jito_searcher_client/
lib.rs

1pub mod client_interceptor;
2pub mod cluster_data_impl;
3pub mod convert;
4
5use crate::convert::proto_packet_from_tx;
6use bytes::Bytes;
7use futures::StreamExt;
8use http;
9use jito_protos::{
10  bundle::{Bundle, BundleResult},
11  searcher::{
12    searcher_service_client::SearcherServiceClient, SendBundleRequest,
13    SubscribeBundleResultsRequest,
14  },
15};
16use log::*;
17use solana_sdk::transaction::Transaction;
18use solana_sdk::{clock::Slot, pubkey::Pubkey};
19use std::sync::{
20  atomic::{AtomicBool, Ordering},
21  Arc,
22};
23use thiserror::Error;
24use tokio::sync::{
25  mpsc::{channel, Receiver},
26  Mutex,
27};
28use tonic::transport::ClientTlsConfig;
29use tonic::{
30  codegen::{Body, StdError},
31  transport,
32  transport::{Channel, Endpoint},
33  Status,
34};
35
36/// BundleId is expected to be a hash of the contained transaction signatures:
37/// fn derive_bundle_id(transactions: &[VersionedTransaction]) -> String {
38///     let mut hasher = Sha256::new();
39///     hasher.update(transactions.iter().map(|tx| tx.signatures[0]).join(","));
40///     format!("{:x}", hasher.finalize())
41/// }
42pub type BundleId = String;
43
44#[derive(Error, Debug)]
45pub enum SearcherClientError {
46  #[error("block-engine transport error {0}")]
47  BlockEngineTransportError(#[from] transport::Error),
48
49  #[error("no upcoming validator is running jito-solana")]
50  NoUpcomingJitoValidator,
51
52  #[error("grpc client error {0}")]
53  GrpcClientError(#[from] Status),
54
55  #[error("the grpc stream was closed")]
56  GrpcStreamClosed,
57
58  #[error("error serializing transaction")]
59  TransactionSerializationError,
60
61  #[error("tpu client error")]
62  TpuClientError,
63}
64
65pub type SearcherClientResult<T> = Result<T, SearcherClientError>;
66
67#[tonic::async_trait]
68pub trait ClusterData {
69  async fn current_slot(&self) -> Slot;
70  async fn next_jito_validator(&self) -> Option<(Pubkey, Slot)>;
71}
72
73#[derive(Clone)]
74pub struct SearcherClient<C: ClusterData, T> {
75  cluster_data: Arc<C>,
76  searcher_service_client: Arc<Mutex<SearcherServiceClient<T>>>,
77  exit: Arc<AtomicBool>,
78}
79
80impl<C: ClusterData + Clone, T> SearcherClient<C, T>
81where
82  T: tonic::client::GrpcService<tonic::body::Body>,
83  T::Error: Into<StdError>,
84  T::ResponseBody: Body<Data = Bytes> + Send + 'static,
85  <T::ResponseBody as Body>::Error: Into<StdError> + Send,
86{
87  pub fn new(
88    cluster_data: C,
89    searcher_service_client: SearcherServiceClient<T>,
90    exit: Arc<AtomicBool>,
91  ) -> Self {
92    Self {
93      searcher_service_client: Arc::new(Mutex::new(searcher_service_client)),
94      cluster_data: Arc::new(cluster_data),
95      exit,
96    }
97  }
98
99  /**
100  Sends a bundle to the block engine
101  */
102  pub async fn send_bundle(
103    &self,
104    transactions: Vec<Transaction>,
105  ) -> SearcherClientResult<BundleId> {
106    let resp = self
107      .searcher_service_client
108      .lock()
109      .await
110      .send_bundle(SendBundleRequest {
111        bundle: Some(Bundle {
112          header: None,
113          packets: transactions.iter().map(proto_packet_from_tx).collect(),
114        }),
115      })
116      .await?;
117
118    Ok(resp.into_inner().uuid)
119  }
120
121  pub async fn subscribe_bundle_results(
122    &self,
123    buffer_size: usize,
124  ) -> SearcherClientResult<Receiver<BundleResult>> {
125    let (sender, receiver) = channel(buffer_size);
126
127    let mut stream = self
128      .searcher_service_client
129      .lock()
130      .await
131      .subscribe_bundle_results(SubscribeBundleResultsRequest {})
132      .await?
133      .into_inner();
134
135    let exit = self.exit.clone();
136    tokio::spawn(async move {
137      while !exit.load(Ordering::Relaxed) {
138        let msg = match stream.next().await {
139          None => {
140            error!("bundle results stream closed");
141            return;
142          }
143          Some(res) => {
144            if let Err(e) = res {
145              error!("bundle results stream received error status: {e}");
146              return;
147            }
148            res.unwrap()
149          }
150        };
151
152        if let Err(e) = sender.send(msg).await {
153          error!("error sending bundle result: {e}");
154          return;
155        }
156      }
157    });
158
159    Ok(receiver)
160  }
161}
162
163pub async fn grpc_connect(url: &str) -> SearcherClientResult<Channel> {
164  let endpoint = if url.contains("https") {
165    Endpoint::from_shared(url.to_string())
166      .expect("invalid url")
167      .tls_config(ClientTlsConfig::new().with_native_roots())
168  } else {
169    Endpoint::from_shared(url.to_string())
170  }?;
171
172  Ok(endpoint.connect().await?)
173}
174
175pub mod utils {
176  use solana_sdk::pubkey::Pubkey;
177
178  pub fn derive_tip_accounts(tip_program_pubkey: &Pubkey) -> Vec<Pubkey> {
179    let tip_pda_0 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_0"], tip_program_pubkey).0;
180    let tip_pda_1 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_1"], tip_program_pubkey).0;
181    let tip_pda_2 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_2"], tip_program_pubkey).0;
182    let tip_pda_3 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_3"], tip_program_pubkey).0;
183    let tip_pda_4 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_4"], tip_program_pubkey).0;
184    let tip_pda_5 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_5"], tip_program_pubkey).0;
185    let tip_pda_6 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_6"], tip_program_pubkey).0;
186    let tip_pda_7 = Pubkey::find_program_address(&[b"TIP_ACCOUNT_7"], tip_program_pubkey).0;
187
188    vec![
189      tip_pda_0, tip_pda_1, tip_pda_2, tip_pda_3, tip_pda_4, tip_pda_5, tip_pda_6, tip_pda_7,
190    ]
191  }
192}