solana_central/utilities/
load_pools.rs

1use crate::CentralContext;
2use crate::utilities::process_get_program_accounts_pool::process_get_program_accounts_pool;
3use futures::future::join_all;
4use solana_account_decoder::UiAccountEncoding;
5use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
6use solana_client::rpc_filter::RpcFilterType;
7use solana_sdk::account::Account;
8use solana_sdk::pubkey::Pubkey;
9use std::sync::Arc;
10use std::thread;
11
12/// Load pools from multiple protocols using async getProgramAccounts JSON RPC calls. Fetches and
13/// processes pool accounts for the specified protocols in parallel using multiple threads. 
14/// Processed pools are inserted into the central context for later access.
15pub async fn load_pools(
16  protocols_to_load: &[(Pubkey, usize)],
17  central_context: Arc<CentralContext>,
18  threads: usize,
19) {
20  println!(
21    "load_pools: Loading pools for the following protocols: {:?}",
22    protocols_to_load
23  );
24  // Use async await to fetch all dex pools using get program accounts and wait for results
25  let mut futures = Vec::new();
26  for (protocol, size) in protocols_to_load {
27    let config = RpcProgramAccountsConfig {
28      filters: Some(Vec::from([RpcFilterType::DataSize(*size as u64)])),
29      account_config: RpcAccountInfoConfig {
30        encoding: Some(UiAccountEncoding::Base64),
31        data_slice: None,
32        commitment: None,
33        min_context_slot: None,
34      },
35      with_context: None,
36      sort_results: None,
37    };
38    futures.push(
39      central_context
40        .json_rpc_client_async
41        .get_program_accounts_with_config(protocol, config),
42    );
43  }
44  let results = join_all(futures).await;
45  println!("load_pools: Loaded pools from rpc");
46
47  // Pack results into big raw data array to be parsed
48  let mut accounts_raw_data: Vec<(Pubkey, Account)> = Vec::new();
49  for result in results {
50    let accounts = result.unwrap();
51    accounts_raw_data.extend(accounts);
52  }
53
54  // Compute base chunk size for each thread to go through
55  let chunk_size = accounts_raw_data.len() / threads;
56  let accounts_raw_data = Arc::new(accounts_raw_data);
57  println!("Length of accounts_raw_data: {}", accounts_raw_data.len());
58
59  // Create threads to process raw account data into chunks
60  let mut handles = Vec::with_capacity(threads);
61  for i in 0..threads {
62    let accounts_raw_data = Arc::clone(&accounts_raw_data);
63    let central_context = Arc::clone(&central_context);
64    // Clone the Arc to bump the strong count; each thread gets its own Arc pointer.
65
66    // Compute start/end indices for this chunk.  The last thread grabs any “leftovers.”
67    let start = i * chunk_size;
68    let end = if i == threads - 1 {
69      accounts_raw_data.len()
70    } else {
71      start + chunk_size
72    };
73
74    let handle = thread::spawn(move || {
75      process_get_program_accounts_pool(accounts_raw_data, central_context, start, end)
76    });
77
78    // println!("Pushed handle with start and end indices: {}, {}", start, end);
79    handles.push(handle);
80  }
81  // println!("load_pools: handles created");
82
83  /*
84  Load processed pools into the markets cache on a single thread, write locks would make it single
85  thread anyways
86  */
87  for handle in handles {
88    let results = handle.join().unwrap();
89    for result in results {
90      central_context.insert_pool(result);
91    }
92  }
93}