solana_central/utilities/
load_pools.rs1use crate::CentralContext;
2use crate::utilities::process_get_program_accounts_pool::process_get_program_accounts_pool;
3use futures::future::join_all;
4use solana_account_decoder::UiAccountEncoding;
5use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
6use solana_client::rpc_filter::RpcFilterType;
7use solana_sdk::account::Account;
8use solana_sdk::pubkey::Pubkey;
9use std::sync::Arc;
10use std::thread;
11
12pub async fn load_pools(
16 protocols_to_load: &[(Pubkey, usize)],
17 central_context: Arc<CentralContext>,
18 threads: usize,
19) {
20 println!(
21 "load_pools: Loading pools for the following protocols: {:?}",
22 protocols_to_load
23 );
24 let mut futures = Vec::new();
26 for (protocol, size) in protocols_to_load {
27 let config = RpcProgramAccountsConfig {
28 filters: Some(Vec::from([RpcFilterType::DataSize(*size as u64)])),
29 account_config: RpcAccountInfoConfig {
30 encoding: Some(UiAccountEncoding::Base64),
31 data_slice: None,
32 commitment: None,
33 min_context_slot: None,
34 },
35 with_context: None,
36 sort_results: None,
37 };
38 futures.push(
39 central_context
40 .json_rpc_client_async
41 .get_program_accounts_with_config(protocol, config),
42 );
43 }
44 let results = join_all(futures).await;
45 println!("load_pools: Loaded pools from rpc");
46
47 let mut accounts_raw_data: Vec<(Pubkey, Account)> = Vec::new();
49 for result in results {
50 let accounts = result.unwrap();
51 accounts_raw_data.extend(accounts);
52 }
53
54 let chunk_size = accounts_raw_data.len() / threads;
56 let accounts_raw_data = Arc::new(accounts_raw_data);
57 println!("Length of accounts_raw_data: {}", accounts_raw_data.len());
58
59 let mut handles = Vec::with_capacity(threads);
61 for i in 0..threads {
62 let accounts_raw_data = Arc::clone(&accounts_raw_data);
63 let central_context = Arc::clone(¢ral_context);
64 let start = i * chunk_size;
68 let end = if i == threads - 1 {
69 accounts_raw_data.len()
70 } else {
71 start + chunk_size
72 };
73
74 let handle = thread::spawn(move || {
75 process_get_program_accounts_pool(accounts_raw_data, central_context, start, end)
76 });
77
78 handles.push(handle);
80 }
81 for handle in handles {
88 let results = handle.join().unwrap();
89 for result in results {
90 central_context.insert_pool(result);
91 }
92 }
93}