jito_searcher_client/
cluster_data_impl.rs

1use crate::{ClusterData, Slot};
2use jito_protos::searcher::{
3  searcher_service_client::SearcherServiceClient, ConnectedLeadersRequest, ConnectedLeadersResponse,
4};
5use log::error;
6use solana_sdk::pubkey::Pubkey;
7use std::{
8  collections::{BTreeMap, HashMap},
9  str::FromStr,
10  sync::{
11    atomic::{AtomicBool, AtomicU64, Ordering},
12    Arc,
13  },
14  time::Duration,
15};
16use tokio::{
17  sync::{
18    mpsc::{channel, Receiver},
19    Mutex,
20  },
21  time::{interval, timeout, Interval},
22};
23use tonic::client::GrpcService;
24use tonic::codegen::{Body, Bytes, StdError};
25
26/// Convenient types.
27type JitoLeaderScheduleCache = Arc<Mutex<BTreeMap<Slot, LeaderScheduleCacheEntry>>>;
28
29struct LeaderScheduleCacheEntry {
30  validator_identity: Arc<Pubkey>,
31}
32
33/// Main object keeping track
34/// of cluster data.
35#[derive(Clone)]
36pub struct ClusterDataImpl {
37  /// Tracks the current slot.
38  current_slot: Arc<AtomicU64>,
39
40  /// Keeps track of Jito-Solana slot schedules.
41  jito_leader_schedule_cache: JitoLeaderScheduleCache,
42}
43
44impl ClusterDataImpl {
45  /// Creates a new instance of this object; also spawns a few background tokio tasks.
46  pub async fn new<T>(
47    rpc_pubsub_addr: String,
48    searcher_service_client: SearcherServiceClient<T>,
49    exit: Arc<AtomicBool>,
50  ) -> Self
51  where
52    T: GrpcService<tonic::body::Body> + Send + 'static,
53    T::Error: Into<StdError>,
54    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
55    <T::ResponseBody as Body>::Error: Into<StdError> + Send,
56    <T as GrpcService<tonic::body::Body>>::Future: std::marker::Send,
57  {
58    let current_slot = Arc::new(AtomicU64::new(0));
59    let jito_leader_schedule_cache = Arc::new(Mutex::new(BTreeMap::new()));
60
61    let (slot_sender, slot_receiver) = channel(1_000);
62    tokio::spawn(rpc_utils::slot_subscribe_loop(
63      rpc_pubsub_addr,
64      slot_sender,
65      exit.clone(),
66    ));
67    tokio::spawn(Self::current_slot_updater(
68      current_slot.clone(),
69      slot_receiver,
70      exit.clone(),
71    ));
72
73    let fetch_interval = interval(Duration::from_secs(30));
74    tokio::spawn(Self::jito_leader_schedule_cache_updater(
75      jito_leader_schedule_cache.clone(),
76      current_slot.clone(),
77      searcher_service_client,
78      fetch_interval,
79      exit,
80    ));
81
82    Self {
83      current_slot,
84      jito_leader_schedule_cache,
85    }
86  }
87
88  async fn current_slot_updater(
89    current_slot: Arc<AtomicU64>,
90    mut slot_receiver: Receiver<Slot>,
91    exit: Arc<AtomicBool>,
92  ) {
93    let timeout_duration = Duration::from_secs(3);
94    while !exit.load(Ordering::Relaxed) {
95      match timeout(timeout_duration, slot_receiver.recv()).await {
96        Err(_) => continue,
97        Ok(Some(slot)) => {
98          current_slot.store(slot, Ordering::Relaxed);
99        }
100        Ok(None) => {
101          break;
102        }
103      }
104    }
105  }
106
107  async fn jito_leader_schedule_cache_updater<T>(
108    jito_leader_schedule_cache: JitoLeaderScheduleCache,
109    current_slot: Arc<AtomicU64>,
110    mut searcher_service_client: SearcherServiceClient<T>,
111    mut fetch_interval: Interval,
112    exit: Arc<AtomicBool>,
113  ) where
114    T: GrpcService<tonic::body::Body> + Send,
115    T::Error: Into<StdError>,
116    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
117    <T::ResponseBody as Body>::Error: Into<StdError> + Send,
118    <T as GrpcService<tonic::body::Body>>::Future: std::marker::Send,
119  {
120    const MAX_RETRIES: usize = 5;
121    while !exit.load(Ordering::Relaxed) {
122      let _ = fetch_interval.tick().await;
123      if let Some(connected_leaders_resp) =
124        Self::fetch_connected_leaders_with_retries(&mut searcher_service_client, MAX_RETRIES).await
125      {
126        let mut leader_schedule =
127          HashMap::with_capacity(connected_leaders_resp.connected_validators.values().len());
128
129        let current_slot = current_slot.load(Ordering::Relaxed);
130        for (validator_identity, slots) in connected_leaders_resp.connected_validators {
131          if let Ok(validator_identity) = Pubkey::from_str(&validator_identity) {
132            let validator_identity = Arc::new(validator_identity);
133
134            slots
135              .slots
136              .iter()
137              .filter(|&&slot| slot >= current_slot)
138              .for_each(|&slot| {
139                leader_schedule.insert(
140                  slot,
141                  LeaderScheduleCacheEntry {
142                    validator_identity: validator_identity.clone(),
143                  },
144                );
145              });
146          } else {
147            error!("error parsing validator identity: {validator_identity}");
148          }
149        }
150
151        *jito_leader_schedule_cache.lock().await = BTreeMap::from_iter(leader_schedule.into_iter());
152      } else {
153        exit.store(true, Ordering::Relaxed);
154        return;
155      }
156    }
157  }
158
159  async fn fetch_connected_leaders_with_retries<T>(
160    searcher_service_client: &mut SearcherServiceClient<T>,
161    max_retries: usize,
162  ) -> Option<ConnectedLeadersResponse>
163  where
164    T: GrpcService<tonic::body::Body> + Send,
165    T::Error: Into<StdError>,
166    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
167    <T::ResponseBody as Body>::Error: Into<StdError> + Send,
168    <T as GrpcService<tonic::body::Body>>::Future: std::marker::Send,
169  {
170    for _ in 0..max_retries {
171      if let Ok(resp) = searcher_service_client
172        .get_connected_leaders(ConnectedLeadersRequest {})
173        .await
174      {
175        return Some(resp.into_inner());
176      }
177    }
178    None
179  }
180}
181
182#[tonic::async_trait]
183impl ClusterData for ClusterDataImpl {
184  async fn current_slot(&self) -> Slot {
185    self.current_slot.load(Ordering::Relaxed) as Slot
186  }
187
188  async fn next_jito_validator(&self) -> Option<(Pubkey, Slot)> {
189    let l_jito_leader_schedule_cache = self.jito_leader_schedule_cache.lock().await;
190    let (slot, entry) = l_jito_leader_schedule_cache.first_key_value()?;
191    Some((*entry.validator_identity, *slot))
192  }
193}
194
195mod rpc_utils {
196  use std::{
197    sync::{
198      atomic::{AtomicBool, Ordering},
199      Arc,
200    },
201    time::Duration,
202  };
203
204  use futures_util::StreamExt;
205  use solana_client::{nonblocking::pubsub_client::PubsubClient, rpc_response::SlotUpdate};
206  use solana_metrics::{datapoint_error, datapoint_info};
207  use solana_sdk::clock::Slot;
208  use tokio::{sync::mpsc::Sender, time::sleep};
209
210  // Maintains a RPC subscription to keep track of the current slot.
211  pub async fn slot_subscribe_loop(
212    pubsub_addr: String,
213    slot_sender: Sender<Slot>,
214    exit: Arc<AtomicBool>,
215  ) {
216    let mut connect_errors: u64 = 0;
217    let mut slot_subscribe_errors: u64 = 0;
218    let mut slot_subscribe_disconnect_errors: u64 = 0;
219
220    while !exit.load(Ordering::Relaxed) {
221      sleep(Duration::from_secs(1)).await;
222
223      match PubsubClient::new(&pubsub_addr).await {
224        Ok(pubsub_client) => match pubsub_client.slot_updates_subscribe().await {
225          Ok((mut slot_update_subscription, _unsubscribe_fn)) => {
226            while let Some(slot_update) = slot_update_subscription.next().await {
227              if let SlotUpdate::FirstShredReceived { slot, timestamp: _ } = slot_update {
228                datapoint_info!("slot_subscribe_slot", ("slot", slot, i64));
229                if slot_sender.send(slot).await.is_err() {
230                  datapoint_error!("slot_subscribe_send_error", ("errors", 1, i64));
231                  exit.store(true, Ordering::Relaxed);
232                  return;
233                }
234              }
235
236              if exit.load(Ordering::Relaxed) {
237                return;
238              }
239            }
240            slot_subscribe_disconnect_errors += 1;
241            datapoint_error!(
242              "slot_subscribe_disconnect_error",
243              ("errors", slot_subscribe_disconnect_errors, i64)
244            );
245          }
246          Err(e) => {
247            slot_subscribe_errors += 1;
248            datapoint_error!(
249              "slot_subscribe_error",
250              ("errors", slot_subscribe_errors, i64),
251              ("error_str", e.to_string(), String),
252            );
253          }
254        },
255        Err(e) => {
256          connect_errors += 1;
257          datapoint_error!(
258            "slot_subscribe_pubsub_connect_error",
259            ("errors", connect_errors, i64),
260            ("error_str", e.to_string(), String)
261          );
262        }
263      }
264    }
265  }
266}