1use crate::{ClusterData, Slot};
2use jito_protos::searcher::{
3 searcher_service_client::SearcherServiceClient, ConnectedLeadersRequest, ConnectedLeadersResponse,
4};
5use log::error;
6use solana_sdk::pubkey::Pubkey;
7use std::{
8 collections::{BTreeMap, HashMap},
9 str::FromStr,
10 sync::{
11 atomic::{AtomicBool, AtomicU64, Ordering},
12 Arc,
13 },
14 time::Duration,
15};
16use tokio::{
17 sync::{
18 mpsc::{channel, Receiver},
19 Mutex,
20 },
21 time::{interval, timeout, Interval},
22};
23use tonic::client::GrpcService;
24use tonic::codegen::{Body, Bytes, StdError};
25
/// Shared, async-guarded map from slot to the Jito-connected validator entry
/// for that slot; a BTreeMap so the soonest (smallest) slot is first.
type JitoLeaderScheduleCache = Arc<Mutex<BTreeMap<Slot, LeaderScheduleCacheEntry>>>;
28
/// Value stored per slot in the Jito leader-schedule cache.
struct LeaderScheduleCacheEntry {
    // Identity pubkey of the validator for this slot. Arc so a single
    // allocation is shared across all of that validator's scheduled slots.
    validator_identity: Arc<Pubkey>,
}
32
/// Cluster-state handle backed by background tasks spawned in [`ClusterDataImpl::new`].
/// Cheap to clone: both fields are `Arc`s shared with the updater tasks.
#[derive(Clone)]
pub struct ClusterDataImpl {
    // Latest slot observed via the RPC pubsub slot-update subscription.
    current_slot: Arc<AtomicU64>,

    // Upcoming Jito-connected leader slots, periodically replaced wholesale
    // by the schedule updater task.
    jito_leader_schedule_cache: JitoLeaderScheduleCache,
}
43
44impl ClusterDataImpl {
45 pub async fn new<T>(
47 rpc_pubsub_addr: String,
48 searcher_service_client: SearcherServiceClient<T>,
49 exit: Arc<AtomicBool>,
50 ) -> Self
51 where
52 T: GrpcService<tonic::body::Body> + Send + 'static,
53 T::Error: Into<StdError>,
54 T::ResponseBody: Body<Data = Bytes> + Send + 'static,
55 <T::ResponseBody as Body>::Error: Into<StdError> + Send,
56 <T as GrpcService<tonic::body::Body>>::Future: std::marker::Send,
57 {
58 let current_slot = Arc::new(AtomicU64::new(0));
59 let jito_leader_schedule_cache = Arc::new(Mutex::new(BTreeMap::new()));
60
61 let (slot_sender, slot_receiver) = channel(1_000);
62 tokio::spawn(rpc_utils::slot_subscribe_loop(
63 rpc_pubsub_addr,
64 slot_sender,
65 exit.clone(),
66 ));
67 tokio::spawn(Self::current_slot_updater(
68 current_slot.clone(),
69 slot_receiver,
70 exit.clone(),
71 ));
72
73 let fetch_interval = interval(Duration::from_secs(30));
74 tokio::spawn(Self::jito_leader_schedule_cache_updater(
75 jito_leader_schedule_cache.clone(),
76 current_slot.clone(),
77 searcher_service_client,
78 fetch_interval,
79 exit,
80 ));
81
82 Self {
83 current_slot,
84 jito_leader_schedule_cache,
85 }
86 }
87
88 async fn current_slot_updater(
89 current_slot: Arc<AtomicU64>,
90 mut slot_receiver: Receiver<Slot>,
91 exit: Arc<AtomicBool>,
92 ) {
93 let timeout_duration = Duration::from_secs(3);
94 while !exit.load(Ordering::Relaxed) {
95 match timeout(timeout_duration, slot_receiver.recv()).await {
96 Err(_) => continue,
97 Ok(Some(slot)) => {
98 current_slot.store(slot, Ordering::Relaxed);
99 }
100 Ok(None) => {
101 break;
102 }
103 }
104 }
105 }
106
107 async fn jito_leader_schedule_cache_updater<T>(
108 jito_leader_schedule_cache: JitoLeaderScheduleCache,
109 current_slot: Arc<AtomicU64>,
110 mut searcher_service_client: SearcherServiceClient<T>,
111 mut fetch_interval: Interval,
112 exit: Arc<AtomicBool>,
113 ) where
114 T: GrpcService<tonic::body::Body> + Send,
115 T::Error: Into<StdError>,
116 T::ResponseBody: Body<Data = Bytes> + Send + 'static,
117 <T::ResponseBody as Body>::Error: Into<StdError> + Send,
118 <T as GrpcService<tonic::body::Body>>::Future: std::marker::Send,
119 {
120 const MAX_RETRIES: usize = 5;
121 while !exit.load(Ordering::Relaxed) {
122 let _ = fetch_interval.tick().await;
123 if let Some(connected_leaders_resp) =
124 Self::fetch_connected_leaders_with_retries(&mut searcher_service_client, MAX_RETRIES).await
125 {
126 let mut leader_schedule =
127 HashMap::with_capacity(connected_leaders_resp.connected_validators.values().len());
128
129 let current_slot = current_slot.load(Ordering::Relaxed);
130 for (validator_identity, slots) in connected_leaders_resp.connected_validators {
131 if let Ok(validator_identity) = Pubkey::from_str(&validator_identity) {
132 let validator_identity = Arc::new(validator_identity);
133
134 slots
135 .slots
136 .iter()
137 .filter(|&&slot| slot >= current_slot)
138 .for_each(|&slot| {
139 leader_schedule.insert(
140 slot,
141 LeaderScheduleCacheEntry {
142 validator_identity: validator_identity.clone(),
143 },
144 );
145 });
146 } else {
147 error!("error parsing validator identity: {validator_identity}");
148 }
149 }
150
151 *jito_leader_schedule_cache.lock().await = BTreeMap::from_iter(leader_schedule.into_iter());
152 } else {
153 exit.store(true, Ordering::Relaxed);
154 return;
155 }
156 }
157 }
158
159 async fn fetch_connected_leaders_with_retries<T>(
160 searcher_service_client: &mut SearcherServiceClient<T>,
161 max_retries: usize,
162 ) -> Option<ConnectedLeadersResponse>
163 where
164 T: GrpcService<tonic::body::Body> + Send,
165 T::Error: Into<StdError>,
166 T::ResponseBody: Body<Data = Bytes> + Send + 'static,
167 <T::ResponseBody as Body>::Error: Into<StdError> + Send,
168 <T as GrpcService<tonic::body::Body>>::Future: std::marker::Send,
169 {
170 for _ in 0..max_retries {
171 if let Ok(resp) = searcher_service_client
172 .get_connected_leaders(ConnectedLeadersRequest {})
173 .await
174 {
175 return Some(resp.into_inner());
176 }
177 }
178 None
179 }
180}
181
182#[tonic::async_trait]
183impl ClusterData for ClusterDataImpl {
184 async fn current_slot(&self) -> Slot {
185 self.current_slot.load(Ordering::Relaxed) as Slot
186 }
187
188 async fn next_jito_validator(&self) -> Option<(Pubkey, Slot)> {
189 let l_jito_leader_schedule_cache = self.jito_leader_schedule_cache.lock().await;
190 let (slot, entry) = l_jito_leader_schedule_cache.first_key_value()?;
191 Some((*entry.validator_identity, *slot))
192 }
193}
194
/// Helpers for subscribing to slot updates over the Solana RPC pubsub endpoint.
mod rpc_utils {
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::Duration,
    };

    use futures_util::StreamExt;
    use solana_client::{nonblocking::pubsub_client::PubsubClient, rpc_response::SlotUpdate};
    use solana_metrics::{datapoint_error, datapoint_info};
    use solana_sdk::clock::Slot;
    use tokio::{sync::mpsc::Sender, time::sleep};

    /// Connects to `pubsub_addr` and forwards each slot's `FirstShredReceived`
    /// update into `slot_sender`, reconnecting (with per-category error
    /// metrics) until `exit` is set.
    ///
    /// Sets `exit` itself and returns if the receiving side of `slot_sender`
    /// has been dropped, since there is then no consumer for slot updates.
    pub async fn slot_subscribe_loop(
        pubsub_addr: String,
        slot_sender: Sender<Slot>,
        exit: Arc<AtomicBool>,
    ) {
        // Monotonically increasing per-category error counters, reported as
        // metric values across reconnect attempts.
        let mut connect_errors: u64 = 0;
        let mut slot_subscribe_errors: u64 = 0;
        let mut slot_subscribe_disconnect_errors: u64 = 0;

        while !exit.load(Ordering::Relaxed) {
            // Throttles reconnect attempts (also delays the very first connect).
            sleep(Duration::from_secs(1)).await;

            match PubsubClient::new(&pubsub_addr).await {
                Ok(pubsub_client) => match pubsub_client.slot_updates_subscribe().await {
                    Ok((mut slot_update_subscription, _unsubscribe_fn)) => {
                        while let Some(slot_update) = slot_update_subscription.next().await {
                            // Only FirstShredReceived updates are forwarded;
                            // other SlotUpdate variants are ignored.
                            if let SlotUpdate::FirstShredReceived { slot, timestamp: _ } = slot_update {
                                datapoint_info!("slot_subscribe_slot", ("slot", slot, i64));
                                if slot_sender.send(slot).await.is_err() {
                                    // Receiver dropped: no consumer left, shut everything down.
                                    datapoint_error!("slot_subscribe_send_error", ("errors", 1, i64));
                                    exit.store(true, Ordering::Relaxed);
                                    return;
                                }
                            }

                            if exit.load(Ordering::Relaxed) {
                                return;
                            }
                        }
                        // Stream ended without `exit` being set: treat as a
                        // disconnect and fall through to reconnect.
                        slot_subscribe_disconnect_errors += 1;
                        datapoint_error!(
                            "slot_subscribe_disconnect_error",
                            ("errors", slot_subscribe_disconnect_errors, i64)
                        );
                    }
                    Err(e) => {
                        slot_subscribe_errors += 1;
                        datapoint_error!(
                            "slot_subscribe_error",
                            ("errors", slot_subscribe_errors, i64),
                            ("error_str", e.to_string(), String),
                        );
                    }
                },
                Err(e) => {
                    connect_errors += 1;
                    datapoint_error!(
                        "slot_subscribe_pubsub_connect_error",
                        ("errors", connect_errors, i64),
                        ("error_str", e.to_string(), String)
                    );
                }
            }
        }
    }
}