jito_searcher_client/
lib.rs1pub mod client_interceptor;
2pub mod cluster_data_impl;
3pub mod convert;
4
5use crate::convert::proto_packet_from_tx;
6use bytes::Bytes;
7use futures::StreamExt;
8use http;
9use jito_protos::{
10 bundle::{Bundle, BundleResult},
11 searcher::{
12 searcher_service_client::SearcherServiceClient, SendBundleRequest,
13 SubscribeBundleResultsRequest,
14 },
15};
16use log::*;
17use solana_sdk::transaction::Transaction;
18use solana_sdk::{clock::Slot, pubkey::Pubkey};
19use std::sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc,
22};
23use thiserror::Error;
24use tokio::sync::{
25 mpsc::{channel, Receiver},
26 Mutex,
27};
28use tonic::transport::ClientTlsConfig;
29use tonic::{
30 codegen::{Body, StdError},
31 transport,
32 transport::{Channel, Endpoint},
33 Status,
34};
35
36pub type BundleId = String;
43
44#[derive(Error, Debug)]
45pub enum SearcherClientError {
46 #[error("block-engine transport error {0}")]
47 BlockEngineTransportError(#[from] transport::Error),
48
49 #[error("no upcoming validator is running jito-solana")]
50 NoUpcomingJitoValidator,
51
52 #[error("grpc client error {0}")]
53 GrpcClientError(#[from] Status),
54
55 #[error("the grpc stream was closed")]
56 GrpcStreamClosed,
57
58 #[error("error serializing transaction")]
59 TransactionSerializationError,
60
61 #[error("tpu client error")]
62 TpuClientError,
63}
64
65pub type SearcherClientResult<T> = Result<T, SearcherClientError>;
66
67#[tonic::async_trait]
68pub trait ClusterData {
69 async fn current_slot(&self) -> Slot;
70 async fn next_jito_validator(&self) -> Option<(Pubkey, Slot)>;
71}
72
73#[derive(Clone)]
74pub struct SearcherClient<C: ClusterData, T> {
75 cluster_data: Arc<C>,
76 searcher_service_client: Arc<Mutex<SearcherServiceClient<T>>>,
77 exit: Arc<AtomicBool>,
78}
79
80impl<C: ClusterData + Clone, T> SearcherClient<C, T>
81where
82 T: tonic::client::GrpcService<tonic::body::Body>,
83 T::Error: Into<StdError>,
84 T::ResponseBody: Body<Data = Bytes> + Send + 'static,
85 <T::ResponseBody as Body>::Error: Into<StdError> + Send,
86{
87 pub fn new(
88 cluster_data: C,
89 searcher_service_client: SearcherServiceClient<T>,
90 exit: Arc<AtomicBool>,
91 ) -> Self {
92 Self {
93 searcher_service_client: Arc::new(Mutex::new(searcher_service_client)),
94 cluster_data: Arc::new(cluster_data),
95 exit,
96 }
97 }
98
99 pub async fn send_bundle(
103 &self,
104 transactions: Vec<Transaction>,
105 ) -> SearcherClientResult<BundleId> {
106 let resp = self
107 .searcher_service_client
108 .lock()
109 .await
110 .send_bundle(SendBundleRequest {
111 bundle: Some(Bundle {
112 header: None,
113 packets: transactions.iter().map(proto_packet_from_tx).collect(),
114 }),
115 })
116 .await?;
117
118 Ok(resp.into_inner().uuid)
119 }
120
121 pub async fn subscribe_bundle_results(
122 &self,
123 buffer_size: usize,
124 ) -> SearcherClientResult<Receiver<BundleResult>> {
125 let (sender, receiver) = channel(buffer_size);
126
127 let mut stream = self
128 .searcher_service_client
129 .lock()
130 .await
131 .subscribe_bundle_results(SubscribeBundleResultsRequest {})
132 .await?
133 .into_inner();
134
135 let exit = self.exit.clone();
136 tokio::spawn(async move {
137 while !exit.load(Ordering::Relaxed) {
138 let msg = match stream.next().await {
139 None => {
140 error!("bundle results stream closed");
141 return;
142 }
143 Some(res) => {
144 if let Err(e) = res {
145 error!("bundle results stream received error status: {e}");
146 return;
147 }
148 res.unwrap()
149 }
150 };
151
152 if let Err(e) = sender.send(msg).await {
153 error!("error sending bundle result: {e}");
154 return;
155 }
156 }
157 });
158
159 Ok(receiver)
160 }
161}
162
163pub async fn grpc_connect(url: &str) -> SearcherClientResult<Channel> {
164 let endpoint = if url.contains("https") {
165 Endpoint::from_shared(url.to_string())
166 .expect("invalid url")
167 .tls_config(ClientTlsConfig::new().with_native_roots())
168 } else {
169 Endpoint::from_shared(url.to_string())
170 }?;
171
172 Ok(endpoint.connect().await?)
173}
174
175pub mod utils {
176 use solana_sdk::pubkey::Pubkey;
177
178 pub fn derive_tip_accounts(tip_program_pubkey: &Pubkey) -> Vec<Pubkey> {
179 let tip_pda_0 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_0"], tip_program_pubkey).0;
180 let tip_pda_1 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_1"], tip_program_pubkey).0;
181 let tip_pda_2 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_2"], tip_program_pubkey).0;
182 let tip_pda_3 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_3"], tip_program_pubkey).0;
183 let tip_pda_4 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_4"], tip_program_pubkey).0;
184 let tip_pda_5 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_5"], tip_program_pubkey).0;
185 let tip_pda_6 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_6"], tip_program_pubkey).0;
186 let tip_pda_7 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_7"], tip_program_pubkey).0;
187
188 vec![
189 tip_pda_0, tip_pda_1, tip_pda_2, tip_pda_3, tip_pda_4, tip_pda_5, tip_pda_6, tip_pda_7,
190 ]
191 }
192}