copy_trading/position_manager/
tx_monitor_loop.rs

1use crate::types::position::Position;
2use crate::types::position_manager::PositionManager;
3use solana_central::constants::TOKENS;
4use solana_central::SwapDirection;
5use solana_sdk::signature::Signer;
6use std::sync::{Arc, Mutex};
7
8impl PositionManager {
9  /// Monitor swap transactions and manage position lifecycle based on wallet activity. Processes
10  /// swap transactions from broadcast channel, handling two cases: Our own wallet's 
11  /// swaps: Confirms buy/sell transactions for existing positions and tracked wallets' 
12  /// swaps: Opens new positions when they buy tokens
13  pub async fn tx_monitor_loop(position_manager: Arc<PositionManager>) {
14    let mut tx_monitor_receiver = position_manager.tx_monitor_receiver.resubscribe();
15    while let Ok(tx) = tx_monitor_receiver.recv().await {
16      // println!("Tx monitor loop received tx");
17      // This is a swap tx of our own wallet, pass it to the correct position for confirmation
18      if tx.signers.contains(&position_manager.wallet.pubkey()) {
19        let mut open_positions = position_manager.open_positions.write().unwrap();
20        /*
21        Lookup position by market address and send over to the position for confirmation. Cloning
22        this position resolves borrow checker issue by no holding immutable reference to open
23        positions
24        */
25        if let Some(position_arc) = open_positions.get(&tx.market_address).cloned() {
26          let mut position = position_arc.lock().unwrap();
27          // Buys
28          if tx.direction == SwapDirection::AToB && tx.token_a_address == TOKENS.wsol {
29            Position::process_buy_tx_confirmation(
30              &mut position,
31              &position_arc,
32              &tx,
33              &position_manager,
34            );
35          } else if tx.direction == SwapDirection::BToA && tx.token_b_address == TOKENS.wsol {
36            Position::process_buy_tx_confirmation(
37              &mut position,
38              &position_arc,
39              &tx,
40              &position_manager,
41            );
42          }
43          // Sells.
44          else {
45            Position::process_sell_tx_confirmation(&mut position, &mut open_positions, &tx);
46          }
47        }
48      } else {
49        for signer in tx.signers.iter() {
50          // This is a swap tx of a wallet that we are tracking
51          if let Some(config) = position_manager.wallet_configs.get(signer) {
52            /*
53            Check if this swap tx was to "buy" a token, meaning swap SOL for the token. This will
54            work with pumpfun bonding curves because we store WSOL as a token in the pumpfun bonding
55            curve and it acts as a pool trait
56            */
57            let is_buy;
58            let token_address;
59            let sol_token_a;
60            {
61              let swap_direction = tx.direction;
62
63              if swap_direction == SwapDirection::BToA {
64                is_buy = tx.token_b_address == TOKENS.wsol;
65                token_address = tx.token_a_address;
66                sol_token_a = false;
67              } else {
68                is_buy = tx.token_a_address == TOKENS.wsol;
69                token_address = tx.token_b_address;
70                sol_token_a = true;
71              }
72            }
73            // println!("Is buy: {:?}", is_buy);
74            // println!("Token address: {:?}", token_address);
75            // println!("Sol token a: {:?}", sol_token_a);
76            /*
77            We consider this a buy. We need to create a new position for this market to buy this
78            token, add it to the list of positions, and then start the position.
79            This entire block is now protected by a single write lock to prevent race conditions.
80            */
81            if is_buy {
82              let mut open_positions = position_manager.open_positions.write().unwrap();
83              /*
84              Check the position we are about to enter is good before we proceed. This will only
85              return something if the position is legit and we should proceed. Otherwise the
86              function won't yield anything and the inner block won't be called.
87              */
88              if let Some(position) = Position::create_checked(
89                token_address,
90                sol_token_a,
91                &tx,
92                *config,
93                &mut open_positions,
94                &position_manager,
95              ) {
96                let position_arc = Arc::new(Mutex::new(position));
97                open_positions.insert(tx.market_address, position_arc.clone());
98
99                Position::start(position_arc, position_manager.clone());
100              }
101            }
102            // If we've found a signer to process, break out of the loop, no need to scan more
103            break;
104          }
105        }
106      }
107    }
108  }
109}