Skip to content

Instantly share code, notes, and snippets.

@rubpy
Last active October 6, 2024 20:54
Show Gist options
  • Save rubpy/f77b025771928882ae824531716355ee to your computer and use it in GitHub Desktop.
Save rubpy/f77b025771928882ae824531716355ee to your computer and use it in GitHub Desktop.
use std::str::FromStr as _;
use solana_client::{
client_error::{ClientError, ClientErrorKind},
rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_request::RpcRequest,
};
use solana_program::pubkey::Pubkey;
use solana_sdk::commitment_config::CommitmentConfig;
use solext::rpc::{Client as _, GetAllSignaturesForAddress as _, GetMarkerSignature as _};
// -----------------------------------------------------------------------------
#[tokio::main]
async fn main() {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.format_target(false)
.init();
println!("\n");
const RPC_ENDPOINT_VAR: &str = "SOL_RPC_URL";
let Ok(rpc_endpoint) = std::env::var(RPC_ENDPOINT_VAR) else {
log::error!("missing environment variable: \"{}\"", RPC_ENDPOINT_VAR);
return;
};
let rpc_client = solana_client::nonblocking::rpc_client::RpcClient::new(rpc_endpoint);
if let Err(err) = run(&rpc_client, std::env::args()).await {
log::error!("run: {:?}", err);
};
println!("\n");
}
// -----------------------------------------------------------------------------
async fn run(
rpc_client: &solana_client::nonblocking::rpc_client::RpcClient,
mut args: std::env::Args,
) -> Result<(), ClientError> {
let pool = args
.nth(1)
.and_then(|s| Pubkey::from_str(&s).ok())
.ok_or_else(|| ClientErrorKind::Custom("invalid pool address".to_string()))?;
let rpc_cost = solext::rpc::QuotaMap::from([
(RpcRequest::GetSignaturesForAddress, solext::rpc::Quota(10)),
(RpcRequest::GetBlock, solext::rpc::Quota(10)),
(RpcRequest::GetBlocks, solext::rpc::Quota(10)),
]);
let rpc_client_ext =
solext::rpc::WrappedClient::from(rpc_client).with_limiter(solext::rpc::QuotaLimiter::new(
solext::rpc::Quota(400),
solext::rpc::Quota(1),
Some((&rpc_cost).into()),
));
log::info!(
"Searching for the first transactions of pool \"{}\"\n",
pool
);
let start_time = tokio::time::Instant::now();
let current_slot = rpc_client_ext.get_slot().await?;
let solext::raydium::amm::PoolInitSignatureResponse {
pool_state,
base_mpl_metadata,
pool_init,
} = solext::raydium::amm::find_pool_init_signature_with_max_slot(
&rpc_client_ext,
&pool,
Some(current_slot),
)
.await?;
let Some((pool_init_status, pool_init_marker)) = pool_init else {
log::info!("Unable to find pool_init_status for pool \"{}\"", pool);
return Ok(());
};
if let Some(mpl_token_metadata::accounts::Metadata {
mint,
name,
symbol,
uri,
..
}) = &base_mpl_metadata
{
log::info!(
"Metaplex metadata of base token:\n{}\n",
format!("\tmint: {mint}\n\tname: \"{name}\"\n\tsymbol: \"{symbol}\"\n\turi: \"{uri}\"")
);
}
log::info!(
"`pool_init_status` for pool \"{}\":\n{:?}\n",
pool,
pool_init_status,
);
let pool_init_forward_marker_offset: u64 = 500;
let pool_init_forward_marker = rpc_client_ext
.get_marker_signature(
pool_init_status
.slot
.saturating_add(pool_init_forward_marker_offset)
.min(current_slot),
Some(16),
)
.await?;
let pool_oldest_statuses = rpc_client_ext
.get_all_signatures_for_address_with_config(
&pool_state.coin_vault,
&GetConfirmedSignaturesForAddress2Config {
before: Some(pool_init_forward_marker.signature),
until: Some(pool_init_marker.signature),
limit: None,
commitment: Some(CommitmentConfig::finalized()),
},
)
.await?;
let end_time = tokio::time::Instant::now().duration_since(start_time);
/*
/// Example of how to use a predicate.
let pool_oldest_statuses = rpc_client_ext
.get_all_signatures_for_address_with_config_while(
&pool_state.coin_vault,
&solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config {
before: Some(pool_init_forward_marker.signature),
until: Some(pool_init_marker.signature),
limit: None,
commitment: Some(solana_sdk::commitment_config::CommitmentConfig::finalized()),
},
Some(
|_statuses: &Vec<solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature>, processed: usize|
async move { processed < 2000 },
),
)
.await?;
*/
let statuses_dump = pool_oldest_statuses
.0
.iter()
.rev()
// .filter_map(|s| s.err.is_none().then_some(s.signature.as_str()))
.map(|s| s.signature.as_str())
.collect::<Vec<&str>>()
.join("\n");
log::info!(
"It took {:.2} seconds. Consumed API credits: {}\n",
end_time.as_secs_f64(),
rpc_client_ext
.limiter()
.await
.map(|l| l.consumed())
.unwrap_or_default()
.as_u64()
);
log::info!(
"First {} transactions for pool \"{}\":\n{}\n",
pool_oldest_statuses.0.len(),
pool,
statuses_dump,
);
Ok(())
}
// -----------------------------------------------------------------------------
pub mod solext {
pub mod raydium {
pub mod amm {
use std::future::{self, Future};
use solana_client::{
client_error::{ClientError, ClientErrorKind},
rpc_client, rpc_response,
};
use solana_program::pubkey::Pubkey;
use solana_sdk::commitment_config::CommitmentConfig;
use crate::solext::{self};
// -----------------------------------------------------------------
pub mod state {
pub const SIZE: usize = 752;
pub mod offsets {
pub const COIN_VAULT: usize = 336;
pub const PC_VAULT: usize = 368;
pub const COIN_VAULT_MINT: usize = 400;
pub const PC_VAULT_MINT: usize = 432;
pub const LP_MINT: usize = 464;
}
pub mod minimal {
use solana_program::pubkey::Pubkey;
#[derive(Debug, Clone, Copy, Default)]
#[repr(u8)]
pub enum PoolDecodeError {
#[default]
Unknown = 0,
InvalidSize,
}
impl std::error::Error for PoolDecodeError {}
impl std::fmt::Display for PoolDecodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PoolDecodeError::Unknown => write!(f, "Unknown"),
PoolDecodeError::InvalidSize => {
write!(f, "InvalidSize")
}
}
}
}
#[derive(Debug)]
pub struct Pool {
pub coin_vault: Pubkey,
pub pc_vault: Pubkey,
pub coin_vault_mint: Pubkey,
pub pc_vault_mint: Pubkey,
pub lp_mint: Pubkey,
}
macro_rules! buf_pubkey {
($buf:ident, $offset:expr) => {
solana_program::pubkey::Pubkey::new_from_array(*arrayref::array_ref![
$buf,
$offset,
std::mem::size_of::<solana_program::pubkey::Pubkey>()
])
};
}
impl TryFrom<&[u8]> for Pool {
type Error = PoolDecodeError;
fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
if buf.len() != super::SIZE {
return Err(PoolDecodeError::InvalidSize);
}
Ok(Self {
coin_vault: buf_pubkey!(buf, super::offsets::COIN_VAULT),
pc_vault: buf_pubkey!(buf, super::offsets::PC_VAULT),
coin_vault_mint: buf_pubkey!(buf, super::offsets::COIN_VAULT_MINT),
pc_vault_mint: buf_pubkey!(buf, super::offsets::PC_VAULT_MINT),
lp_mint: buf_pubkey!(buf, super::offsets::LP_MINT),
})
}
}
}
}
#[derive(Debug, Clone, Default)]
pub struct PoolInitSignatureAuthorityAccount {
pub account: Pubkey,
pub marker_offset: Option<u64>,
pub marker_offset_seek: Option<Option<u64>>,
}
impl From<Pubkey> for PoolInitSignatureAuthorityAccount {
fn from(account: Pubkey) -> Self {
Self {
account,
..Default::default()
}
}
}
pub async fn pool_init_signature_authority_standard_mapper(
ctx: PoolInitSignatureAuthorityContext<'_>,
) -> Result<Vec<PoolInitSignatureAuthorityAccount>, ClientError> {
static KNOWN_KEYS: [Pubkey; 2] = [
solana_program::system_program::ID,
mpl_token_metadata::programs::MPL_TOKEN_METADATA_ID,
];
static PUMPFUN_AUTHORITY: Pubkey =
solana_program::pubkey!("TSLvdd1pWpHVjahSpsvCXUbgwsL3JAcvokwaKt1eokM");
static PUMPFUN_ATA_OWNERS: [Pubkey; 2] = [
solana_program::pubkey!("39azUYFWPz3VHgKCf3VChUwbpURdCHRxjWVowf5jUJjg"),
solana_program::pubkey!("5PXxuZkvftsg5CAGjv5LL5tEtvBRskdx1AAjxw8hK2Qx"),
];
let Some(update_authority) = ctx.base_mpl_metadata.map(|m| m.update_authority)
else {
return Ok(Vec::default());
};
let mut candidates = Vec::new();
if update_authority == PUMPFUN_AUTHORITY {
candidates.extend_from_slice(&[
PoolInitSignatureAuthorityAccount {
account: spl_associated_token_account::get_associated_token_address(
&PUMPFUN_ATA_OWNERS[0],
&ctx.base_mint,
),
marker_offset: Some(512),
..Default::default()
},
PoolInitSignatureAuthorityAccount {
account: spl_associated_token_account::get_associated_token_address(
&PUMPFUN_ATA_OWNERS[1],
&ctx.pool_state.lp_mint,
),
marker_offset: Some(2048),
..Default::default()
},
]);
}
if (update_authority != ctx.pool_state.coin_vault_mint
&& update_authority != ctx.pool_state.pc_vault_mint)
&& update_authority.is_on_curve()
&& !KNOWN_KEYS.contains(&update_authority)
{
candidates.extend_from_slice(&[
PoolInitSignatureAuthorityAccount::from(
spl_associated_token_account::get_associated_token_address(
&update_authority,
&ctx.base_mint,
),
),
PoolInitSignatureAuthorityAccount::from(update_authority),
]);
}
Ok(candidates)
}
#[derive(Debug, Clone)]
pub struct PoolInitSignatureAuthorityContext<'a> {
pub pool_state: &'a state::minimal::Pool,
pub base_mint: Pubkey,
pub quote_mint: Pubkey,
pub base_mpl_metadata: Option<&'a mpl_token_metadata::accounts::Metadata>,
}
pub struct PoolInitSignatureResponse {
pub pool_state: state::minimal::Pool,
pub base_mpl_metadata: Option<mpl_token_metadata::accounts::Metadata>,
pub pool_init: Option<(
rpc_response::RpcConfirmedTransactionStatusWithSignature,
solext::rpc::MarkerSignature,
)>,
}
#[derive(Debug, Clone, Default)]
pub struct PoolInitSignatureConfig<AM> {
pub authority_marker_offset: Option<u64>,
pub authority_marker_offset_seek: Option<Option<u64>>,
pub authority_mapper: Option<AM>,
pub max_slot: Option<u64>,
}
pub fn find_pool_init_signature<'a, C>(
rpc_client: &'a C,
pool: &'a Pubkey,
) -> impl Future<Output = Result<PoolInitSignatureResponse, ClientError>> + 'a
where
C: solext::rpc::Client
+ solext::rpc::GetMarkerSignature
+ solext::rpc::GetOldestSignatureForAddress,
{
find_pool_init_signature_with_config::<
C,
fn(
PoolInitSignatureAuthorityContext,
) -> future::Ready<
Result<Vec<PoolInitSignatureAuthorityAccount>, ClientError>,
>,
_,
>(rpc_client, pool, None)
}
pub fn find_pool_init_signature_with_max_slot<'a, C>(
rpc_client: &'a C,
pool: &'a Pubkey,
max_slot: Option<u64>,
) -> impl Future<Output = Result<PoolInitSignatureResponse, ClientError>> + 'a
where
C: solext::rpc::Client
+ solext::rpc::GetMarkerSignature
+ solext::rpc::GetOldestSignatureForAddress,
{
find_pool_init_signature_with_config::<
C,
fn(
PoolInitSignatureAuthorityContext,
) -> future::Ready<
Result<Vec<PoolInitSignatureAuthorityAccount>, ClientError>,
>,
_,
>(
rpc_client,
pool,
Some(PoolInitSignatureConfig {
authority_marker_offset: None,
authority_marker_offset_seek: None,
authority_mapper: None,
max_slot,
}),
)
}
pub async fn find_pool_init_signature_with_config<C, AM, AMFut>(
rpc_client: &C,
pool: &Pubkey,
config: Option<PoolInitSignatureConfig<AM>>,
) -> Result<PoolInitSignatureResponse, ClientError>
where
C: solext::rpc::Client
+ solext::rpc::GetMarkerSignature
+ solext::rpc::GetOldestSignatureForAddress,
AM: Fn(PoolInitSignatureAuthorityContext) -> AMFut,
AMFut: Future<Output = Result<Vec<PoolInitSignatureAuthorityAccount>, ClientError>>,
{
let pool_state = rpc_client.get_account_data(pool).await.and_then(|v| {
state::minimal::Pool::try_from(v.as_slice()).map_err(|e| {
ClientErrorKind::Custom(format!(
"failed deserialization of pool state: {}",
e
))
.into()
})
})?;
let (base_mint, quote_mint) =
match spl_token::native_mint::check_id(&pool_state.coin_vault_mint) {
true => (&pool_state.pc_vault_mint, &pool_state.coin_vault_mint),
false => (&pool_state.coin_vault_mint, &pool_state.pc_vault_mint),
};
let (base_mpl, _) = mpl_token_metadata::accounts::Metadata::find_pda(base_mint);
let base_mpl_metadata = mpl_token_metadata::accounts::Metadata::safe_deserialize(
&rpc_client.get_account_data(&base_mpl).await?,
)
.map_err(|e| {
ClientErrorKind::Custom(format!(
"failed deserialization of Metaplex metadata: {}",
e
))
})?;
let authority_ctx = PoolInitSignatureAuthorityContext {
pool_state: &pool_state,
base_mint: *base_mint,
quote_mint: *quote_mint,
base_mpl_metadata: Some(&base_mpl_metadata),
};
let authority_candidates =
match config.as_ref().and_then(|c| c.authority_mapper.as_ref()) {
Some(mapper) => mapper(authority_ctx).await,
None => pool_init_signature_authority_standard_mapper(authority_ctx).await,
}?;
let authority_candidate_last_status = 'status_search: {
async fn get_last_status<C: solext::rpc::Client>(
rpc_client: &C,
address: &Pubkey,
) -> Result<
Option<rpc_response::RpcConfirmedTransactionStatusWithSignature>,
ClientError,
> {
Ok(rpc_client
.get_signatures_for_address(address)
.await?
.into_iter()
.filter(|s| s.err.is_none())
.last())
}
for candidate in authority_candidates.iter() {
if let Some(status) =
get_last_status(rpc_client, &candidate.account).await?
{
break 'status_search Some((candidate, status));
}
}
None
};
let authority_marker = match &authority_candidate_last_status {
None => None,
Some((candidate, status)) => {
let config = config.as_ref();
let marker_offset = candidate.marker_offset.unwrap_or_else(|| {
config
.and_then(|c| c.authority_marker_offset)
.unwrap_or(16384)
});
let marker_offset_seek =
candidate.marker_offset_seek.unwrap_or_else(|| {
config
.and_then(|c| c.authority_marker_offset_seek)
.unwrap_or(Some(16))
});
let target_slot = status
.slot
.saturating_add(marker_offset)
.min(config.and_then(|c| c.max_slot).unwrap_or(u64::MAX));
rpc_client
.get_marker_signature(target_slot, marker_offset_seek)
.await
.ok()
}
};
let lp_last_signature = rpc_client
.get_oldest_signature_for_address_with_config(
&pool_state.lp_mint,
&rpc_client::GetConfirmedSignaturesForAddress2Config {
before: authority_marker.map(|s| s.signature),
until: None,
limit: None,
commitment: Some(CommitmentConfig::finalized()),
},
)
.await?;
Ok(PoolInitSignatureResponse {
pool_state,
base_mpl_metadata: Some(base_mpl_metadata),
pool_init: match lp_last_signature {
None => None,
Some(s) => {
solext::rpc::MarkerSignature::try_from((&s, s.slot, s.block_time))
.ok()
.map(|m| (s, m))
}
},
})
}
}
}
// -------------------------------------------------------------------------
pub mod rpc {
use std::borrow::{Borrow, Cow};
use std::cmp::{Eq, Ord, PartialEq, PartialOrd};
use std::future::{self, Future};
use std::sync::Arc;
use std::{collections::HashMap, hash::Hash, str::FromStr as _};
use solana_sdk::transaction::TransactionError;
use tokio::sync::Mutex;
pub use solana_client::client_error::{ClientError, ClientErrorKind};
use solana_client::{rpc_client, rpc_config, rpc_request, rpc_response};
use solana_program::pubkey::Pubkey;
use solana_sdk::account;
use solana_sdk::clock::{Slot, UnixTimestamp};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::signature::Signature;
use solana_transaction_status::{
EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
UiConfirmedBlock, UiTransactionEncoding,
};
// ---------------------------------------------------------------------
pub trait Client {
fn get_account(
&self,
pubkey: &Pubkey,
) -> impl Future<Output = Result<account::Account, ClientError>>;
fn get_account_data(
&self,
pubkey: &Pubkey,
) -> impl Future<Output = Result<Vec<u8>, ClientError>>;
fn get_signatures_for_address_with_config(
&self,
address: &Pubkey,
config: rpc_client::GetConfirmedSignaturesForAddress2Config,
) -> impl Future<
Output = Result<
Vec<rpc_response::RpcConfirmedTransactionStatusWithSignature>,
ClientError,
>,
>;
fn get_signatures_for_address(
&self,
address: &Pubkey,
) -> impl Future<
Output = Result<
Vec<rpc_response::RpcConfirmedTransactionStatusWithSignature>,
ClientError,
>,
>;
fn get_blocks(
&self,
start_slot: Slot,
end_slot: Option<Slot>,
) -> impl Future<Output = Result<Vec<Slot>, ClientError>>;
fn get_block_with_config(
&self,
slot: Slot,
config: rpc_config::RpcBlockConfig,
) -> impl Future<Output = Result<UiConfirmedBlock, ClientError>>;
fn get_block(
&self,
slot: Slot,
) -> impl Future<Output = Result<EncodedConfirmedBlock, ClientError>>;
fn get_block_with_encoding(
&self,
slot: Slot,
encoding: UiTransactionEncoding,
) -> impl Future<Output = Result<EncodedConfirmedBlock, ClientError>>;
fn get_slot(&self) -> impl Future<Output = Result<Slot, ClientError>>;
}
pub struct WrappedClient<'a, L = QuotaLimiter<'a>> {
client: &'a solana_client::nonblocking::rpc_client::RpcClient,
limiter: Option<Arc<Mutex<L>>>,
}
impl<'a> From<&'a solana_client::nonblocking::rpc_client::RpcClient> for WrappedClient<'a> {
fn from(rpc_client: &'a solana_client::nonblocking::rpc_client::RpcClient) -> Self {
Self {
client: rpc_client,
limiter: None,
}
}
}
impl<'a, L> WrappedClient<'a, L> {
pub fn with_limiter(mut self, limiter: L) -> Self {
self.limiter = Some(Arc::new(Mutex::new(limiter)));
self
}
#[inline]
pub const fn client(&self) -> &solana_client::nonblocking::rpc_client::RpcClient {
self.client
}
#[inline]
pub async fn limiter(&self) -> Option<tokio::sync::MutexGuard<'_, L>> {
match &self.limiter {
None => None,
Some(limiter) => Some(limiter.lock().await),
}
}
}
macro_rules! mark {
($self:ident, $action:expr) => {
if let Some(limiter) = &$self.limiter {
limiter
.lock()
.await
.mark($action)
.ok_or_else(quota_exceeded)?;
}
};
}
impl<L> Client for WrappedClient<'_, L>
where
L: Limiter,
{
async fn get_account(&self, pubkey: &Pubkey) -> Result<account::Account, ClientError> {
mark!(self, &rpc_request::RpcRequest::GetAccountInfo.into());
self.client.get_account(pubkey).await
}
async fn get_account_data(&self, pubkey: &Pubkey) -> Result<Vec<u8>, ClientError> {
Ok(self.get_account(pubkey).await?.data)
}
async fn get_signatures_for_address_with_config(
&self,
address: &Pubkey,
config: rpc_client::GetConfirmedSignaturesForAddress2Config,
) -> Result<Vec<rpc_response::RpcConfirmedTransactionStatusWithSignature>, ClientError>
{
mark!(
self,
&rpc_request::RpcRequest::GetSignaturesForAddress.into()
);
self.client
.get_signatures_for_address_with_config(address, config)
.await
}
async fn get_signatures_for_address(
&self,
address: &Pubkey,
) -> Result<Vec<rpc_response::RpcConfirmedTransactionStatusWithSignature>, ClientError>
{
self.get_signatures_for_address_with_config(
address,
rpc_client::GetConfirmedSignaturesForAddress2Config::default(),
)
.await
}
async fn get_blocks(
&self,
start_slot: Slot,
end_slot: Option<Slot>,
) -> Result<Vec<Slot>, ClientError> {
mark!(self, &rpc_request::RpcRequest::GetBlocks.into());
self.client.get_blocks(start_slot, end_slot).await
}
async fn get_block_with_config(
&self,
slot: Slot,
config: rpc_config::RpcBlockConfig,
) -> Result<UiConfirmedBlock, ClientError> {
mark!(self, &rpc_request::RpcRequest::GetBlock.into());
self.client.get_block_with_config(slot, config).await
}
async fn get_block(&self, slot: Slot) -> Result<EncodedConfirmedBlock, ClientError> {
self.get_block_with_encoding(slot, UiTransactionEncoding::Json)
.await
}
async fn get_block_with_encoding(
&self,
slot: Slot,
encoding: UiTransactionEncoding,
) -> Result<EncodedConfirmedBlock, ClientError> {
mark!(self, &rpc_request::RpcRequest::GetBlock.into());
self.client.get_block_with_encoding(slot, encoding).await
}
async fn get_slot(&self) -> Result<Slot, ClientError> {
mark!(self, &rpc_request::RpcRequest::GetSlot.into());
self.client.get_slot().await
}
}
// ---------------------------------------------------------------------
type TransactionStatus = rpc_response::RpcConfirmedTransactionStatusWithSignature;
pub trait IterAllSignaturesForAddress {
fn iter_all_signatures_for_address_with_config<'a, F, PF>(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
callback: F,
) -> impl Future<Output = Result<usize, ClientError>>
where
PF: Future<Output = bool> + 'a,
F: Fn(Vec<TransactionStatus>, usize) -> PF;
fn iter_all_signatures_for_address<'a, F, PF>(
&self,
address: &Pubkey,
callback: F,
) -> impl Future<Output = Result<usize, ClientError>>
where
PF: Future<Output = bool> + 'a,
F: Fn(Vec<TransactionStatus>, usize) -> PF;
}
impl<C: Client> IterAllSignaturesForAddress for C {
async fn iter_all_signatures_for_address_with_config<'a, F, PF>(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
callback: F,
) -> Result<usize, ClientError>
where
PF: Future<Output = bool> + 'a,
F: Fn(Vec<TransactionStatus>, usize) -> PF,
{
if config.limit.is_some_and(|v| v == 0) {
return Ok(0);
}
let signature_limit = config.limit.unwrap_or(1000);
let mut last_signature: Option<Signature> = None;
let mut processed = 0usize;
loop {
let c = rpc_client::GetConfirmedSignaturesForAddress2Config {
limit: Some(signature_limit),
before: last_signature.or(config.before),
until: config.until,
commitment: config.commitment,
};
let statuses = self
.get_signatures_for_address_with_config(address, c)
.await?;
let status_count = statuses.len();
match statuses.iter().last() {
None => break,
Some(last_status) => {
last_signature.replace(
Signature::from_str(&last_status.signature)
.map_err(|e| ClientErrorKind::Custom(e.to_string()))?,
);
}
}
processed = processed.saturating_add(status_count);
if !callback(statuses, processed).await || status_count < signature_limit {
break;
}
}
Ok(processed)
}
#[inline]
async fn iter_all_signatures_for_address<'a, F, PF>(
&self,
address: &Pubkey,
callback: F,
) -> Result<usize, ClientError>
where
PF: Future<Output = bool> + 'a,
F: Fn(Vec<TransactionStatus>, usize) -> PF,
{
self.iter_all_signatures_for_address_with_config(
address,
&rpc_client::GetConfirmedSignaturesForAddress2Config::default(),
callback,
)
.await
}
}
pub trait GetAllSignaturesForAddress {
fn get_all_signatures_for_address_with_config(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
) -> impl Future<Output = Result<(Vec<TransactionStatus>, usize), ClientError>>;
fn get_all_signatures_for_address_with_config_while<'a, P, PF>(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
predicate: Option<P>,
) -> impl Future<Output = Result<(Vec<TransactionStatus>, usize), ClientError>>
where
PF: Future<Output = bool> + 'a,
P: for<'b> Fn(&'b Vec<TransactionStatus>, usize) -> PF + 'a;
fn get_all_signatures_for_address(
&self,
address: &Pubkey,
) -> impl Future<Output = Result<(Vec<TransactionStatus>, usize), ClientError>>;
}
impl<C: Client> GetAllSignaturesForAddress for C
where
C: IterAllSignaturesForAddress,
{
#[inline]
async fn get_all_signatures_for_address_with_config(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
) -> Result<(Vec<TransactionStatus>, usize), ClientError> {
self.get_all_signatures_for_address_with_config_while::<fn(&Vec<TransactionStatus>, usize) -> future::Ready<bool>, _>(
address, config, None)
.await
}
async fn get_all_signatures_for_address_with_config_while<'a, P, PF>(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
predicate: Option<P>,
) -> Result<(Vec<TransactionStatus>, usize), ClientError>
where
PF: Future<Output = bool> + 'a,
P: for<'b> Fn(&'b Vec<TransactionStatus>, usize) -> PF + 'a,
{
let no_predicate = predicate.is_none();
let ctx = Arc::new(Mutex::new((
<Option<Vec<TransactionStatus>>>::None,
&predicate,
)));
let processed = self
.iter_all_signatures_for_address_with_config(
address,
config,
|statuses, processed| {
let ctx = ctx.clone();
async move {
let mut ctx_guard = ctx.lock_owned().await;
let proceed = no_predicate
|| (match &ctx_guard.1 {
None => true,
Some(predicate) => predicate(&statuses, processed).await,
});
match &mut ctx_guard.0 {
None => {
ctx_guard.0.replace(statuses);
}
Some(all_statuses) => all_statuses.extend(statuses),
}
proceed
}
},
)
.await?;
Ok((
ctx.lock_owned().await.0.take().unwrap_or_default(),
processed,
))
}
#[inline]
async fn get_all_signatures_for_address(
&self,
address: &Pubkey,
) -> Result<(Vec<TransactionStatus>, usize), ClientError> {
self.get_all_signatures_for_address_with_config(
address,
&rpc_client::GetConfirmedSignaturesForAddress2Config::default(),
)
.await
}
}
pub trait GetOldestSignatureForAddress {
fn get_oldest_signature_for_address_with_config(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
) -> impl Future<Output = Result<Option<TransactionStatus>, ClientError>>;
fn get_oldest_signature_for_address(
&self,
address: &Pubkey,
) -> impl Future<Output = Result<Option<TransactionStatus>, ClientError>>;
}
impl<C: Client> GetOldestSignatureForAddress for C
where
C: IterAllSignaturesForAddress,
{
async fn get_oldest_signature_for_address_with_config(
&self,
address: &Pubkey,
config: &rpc_client::GetConfirmedSignaturesForAddress2Config,
) -> Result<Option<TransactionStatus>, ClientError> {
let last_status = Arc::new(Mutex::new(<Option<TransactionStatus>>::None));
self.iter_all_signatures_for_address_with_config(address, config, |statuses, _| {
let last_status = last_status.clone();
async move {
match statuses.into_iter().last() {
None => false,
Some(status) => {
last_status.lock_owned().await.replace(status);
true
}
}
}
})
.await?;
Ok(last_status.lock_owned().await.take())
}
#[inline]
async fn get_oldest_signature_for_address(
&self,
address: &Pubkey,
) -> Result<Option<TransactionStatus>, ClientError> {
self.get_oldest_signature_for_address_with_config(
address,
&rpc_client::GetConfirmedSignaturesForAddress2Config::default(),
)
.await
}
}
// ---------------------------------------------------------------------
#[derive(Debug, Clone, Copy, Default)]
#[repr(u8)]
pub enum MarkerSignatureDecodeError {
#[default]
Unknown = 0,
UnsupportedEncoding,
MissingMeta,
InvalidSignature,
}
impl std::error::Error for MarkerSignatureDecodeError {}
impl std::fmt::Display for MarkerSignatureDecodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MarkerSignatureDecodeError::Unknown => write!(f, "Unknown"),
MarkerSignatureDecodeError::UnsupportedEncoding => {
write!(f, "UnsupportedEncoding")
}
MarkerSignatureDecodeError::MissingMeta => write!(f, "MissingMeta"),
MarkerSignatureDecodeError::InvalidSignature => write!(f, "InvalidSignature"),
}
}
}
#[derive(Debug, Clone)]
pub struct MarkerSignature {
pub signature: Signature,
pub slot: Slot,
pub err: Option<TransactionError>,
pub block_time: Option<UnixTimestamp>,
}
impl PartialEq for MarkerSignature {
fn eq(&self, rhs: &Self) -> bool {
self.slot == rhs.slot && self.signature == rhs.signature
}
}
impl std::fmt::Display for MarkerSignature {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.signature.fmt(f)
}
}
impl From<MarkerSignature> for Signature {
fn from(this: MarkerSignature) -> Self {
this.signature
}
}
impl From<&MarkerSignature> for Signature {
fn from(this: &MarkerSignature) -> Self {
this.signature
}
}
impl
TryFrom<(
&EncodedTransactionWithStatusMeta,
Slot,
Option<UnixTimestamp>,
)> for MarkerSignature
{
type Error = MarkerSignatureDecodeError;
fn try_from(
(status, slot, block_time): (
&EncodedTransactionWithStatusMeta,
Slot,
Option<UnixTimestamp>,
),
) -> Result<Self, Self::Error> {
let EncodedTransaction::Json(tx) = &status.transaction else {
return Err(MarkerSignatureDecodeError::UnsupportedEncoding);
};
let Some(meta) = &status.meta else {
return Err(MarkerSignatureDecodeError::MissingMeta);
};
let signature = tx
.signatures
.last()
.and_then(|s| Signature::from_str(s).ok())
.ok_or(MarkerSignatureDecodeError::InvalidSignature)?;
Ok(Self {
signature,
slot,
err: meta.err.clone(),
block_time,
})
}
}
impl
TryFrom<(
&rpc_response::RpcConfirmedTransactionStatusWithSignature,
Slot,
Option<UnixTimestamp>,
)> for MarkerSignature
{
type Error = MarkerSignatureDecodeError;
fn try_from(
(status, slot, block_time): (
&rpc_response::RpcConfirmedTransactionStatusWithSignature,
Slot,
Option<UnixTimestamp>,
),
) -> Result<Self, Self::Error> {
let signature = Signature::from_str(&status.signature)
.map_err(|_| MarkerSignatureDecodeError::InvalidSignature)?;
Ok(Self {
signature,
slot,
err: status.err.clone(),
block_time,
})
}
}
pub trait GetMarkerSignature {
fn get_marker_signature(
&self,
slot: u64,
seek_valid: Option<u64>,
) -> impl Future<Output = Result<MarkerSignature, ClientError>>;
}
impl GetMarkerSignature for WrappedClient<'_> {
async fn get_marker_signature(
&self,
slot: u64,
seek_valid: Option<u64>,
) -> Result<MarkerSignature, ClientError> {
let first_valid_slot = if let Some(seek_offset) = seek_valid {
self.get_blocks(slot, Some(slot + seek_offset))
.await?
.first()
.copied()
.ok_or_else(|| {
ClientErrorKind::Custom("no valid block range".to_string())
})?
} else {
slot
};
let block = self
.get_block_with_config(
first_valid_slot,
solana_client::rpc_config::RpcBlockConfig {
encoding: Some(UiTransactionEncoding::JsonParsed),
transaction_details: None,
rewards: None,
commitment: Some(CommitmentConfig::finalized()),
max_supported_transaction_version: Some(0),
},
)
.await?;
let last_tx_marker = block
.transactions
.and_then(|ss| {
ss.into_iter().rev().find_map(|s| {
MarkerSignature::try_from((&s, block.parent_slot, block.block_time))
.ok()
})
})
.ok_or_else(|| {
ClientErrorKind::Custom("no valid marker transaction in block".to_string())
})?;
Ok(last_tx_marker)
}
}
// ---------------------------------------------------------------------
#[inline]
pub fn quota_exceeded() -> ClientError {
ClientErrorKind::Custom("quota exceeded".to_string()).into()
}
#[derive(Clone, Default)]
pub struct QuotaLimiter<'a> {
pub quota: Quota,
pub initial_quota: Quota,
pub default_cost: Quota,
pub cost: Option<Cow<'a, QuotaMap<Action>>>,
}
impl Limiter for QuotaLimiter<'_> {
#[inline]
fn is_ok(&self) -> bool {
self.quota.0 != 0
}
#[inline]
fn mark(&mut self, action: &Action) -> Status {
self.mark_with_cost(action, None)
}
fn mark_with_cost(&mut self, action: &Action, cost: Option<Quota>) -> Status {
let cost = cost.unwrap_or_else(|| {
self.cost
.as_ref()
.and_then(|m| m.get(action).copied())
.unwrap_or(self.default_cost)
});
let quota = self.quota;
match quota.checked_sub(cost) {
None => Status::QuotaExceeded((quota, cost, cost - quota)),
Some(new_quota) => {
self.quota = new_quota;
Status::Ok((new_quota, cost))
}
}
}
}
impl<'a> QuotaLimiter<'a> {
/// Creates a quota-based rate limiter.
#[inline]
pub fn new(
quota: Quota,
default_cost: Quota,
cost: Option<Cow<'a, QuotaMap<Action>>>,
) -> Self {
Self {
quota,
initial_quota: quota,
default_cost,
cost,
}
}
/// Returns initial quota (i.e., the quota provided during initialization
/// of the limiter).
#[inline]
pub fn initial(&self) -> Quota {
self.initial_quota
}
/// Returns remaining quota.
#[inline]
pub fn remaining(&self) -> Quota {
self.quota
}
/// Returns consumed quota (i.e., `initial - remaining`).
#[inline]
pub fn consumed(&self) -> Quota {
self.initial_quota
.checked_sub(self.quota)
.unwrap_or_default()
}
/// Drains remaining quota (i.e., sets remaining quota to zero), and returns
/// previous quota.
#[inline]
pub fn drain(&mut self) -> Quota {
let quota = self.quota;
self.quota = Quota(0);
quota
}
/// Refills counter to initial quota (i.e., sets remaining quota to the
/// value provided during initialization of the limiter), and returns
/// previous quota.
#[inline]
pub fn refill(&mut self) -> Quota {
let quota = self.quota;
self.quota = Quota(0);
quota
}
/// Sets remaining quota.
#[inline]
pub fn assign(&mut self, quota: Quota) -> Quota {
let prev_quota = self.quota;
self.quota = quota;
prev_quota
}
}
// ---------------------------------------------------------------------
pub trait Limiter
where
Self: Send + Sync,
{
fn is_ok(&self) -> bool;
fn mark(&mut self, action: &Action) -> Status;
fn mark_with_cost(&mut self, action: &Action, cost: Option<Quota>) -> Status;
}
// ---------------------------------------------------------------------
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum Action {
Call(Call),
}
impl From<Call> for Action {
fn from(call: Call) -> Self {
Self::Call(call)
}
}
impl From<rpc_request::RpcRequest> for Action {
fn from(rpc_request: rpc_request::RpcRequest) -> Self {
Self::Call(rpc_request.into())
}
}
#[derive(Debug, Clone, Default, Hash, PartialEq, Eq)]
#[repr(u32)]
pub enum Call {
#[default]
Generic = 0,
RpcRequest(rpc_request::RpcRequest),
Custom(Cow<'static, str>),
CustomCode(i32),
}
impl From<rpc_request::RpcRequest> for Call {
fn from(rpc_request: rpc_request::RpcRequest) -> Self {
Self::RpcRequest(rpc_request)
}
}
// ---------------------------------------------------------------------
#[derive(Debug, Copy, Clone)]
#[repr(u32)]
pub enum Status {
/// `(remaining, cost)`
Ok((Quota, Quota)),
/// `(remaining, cost, exceeded_by)`
QuotaExceeded((Quota, Quota, Quota)),
}
impl Status {
#[inline]
pub const fn is_ok(&self) -> bool {
matches!(self, Self::Ok(_))
}
#[inline]
pub fn ok_or_else<E, F>(self, err: F) -> Result<(), E>
where
F: FnOnce() -> E,
{
match self {
Self::Ok(_) => Ok(()),
Self::QuotaExceeded(_) => Err(err()),
}
}
}
// ---------------------------------------------------------------------
#[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Quota(pub(crate) u64);
impl From<u64> for Quota {
fn from(x: u64) -> Self {
Self(x)
}
}
impl From<Quota> for u64 {
fn from(x: Quota) -> Self {
x.0
}
}
impl std::ops::Sub for Quota {
type Output = Self;
#[inline]
fn sub(self, rhs: Self) -> Self::Output {
Self(self.0 - rhs.0)
}
}
impl Quota {
#[inline]
pub const fn as_u64(self) -> u64 {
self.0
}
#[inline]
pub const fn checked_sub(self, rhs: Self) -> Option<Self> {
if self.0 < rhs.0 {
None
} else {
Some(Quota(self.0 - rhs.0))
}
}
}
#[derive(Clone, Default)]
pub struct QuotaMap<K>(HashMap<K, Quota>);
impl<K, Q> std::ops::Index<&Q> for QuotaMap<K>
where
K: Eq + Hash + Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
type Output = Quota;
fn index(&self, index: &Q) -> &Self::Output {
&self.0[index]
}
}
impl<K> QuotaMap<K>
where
K: Eq + Hash,
{
#[inline]
pub fn inner(&self) -> &HashMap<K, Quota> {
&self.0
}
#[inline]
pub fn inner_mut(&mut self) -> &mut HashMap<K, Quota> {
&mut self.0
}
#[inline]
pub fn len(&self) -> usize {
self.inner().len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner().is_empty()
}
#[inline]
pub fn entry(&mut self, key: K) -> std::collections::hash_map::Entry<'_, K, Quota> {
self.inner_mut().entry(key)
}
#[inline]
pub fn insert(&mut self, key: K, value: Quota) -> Option<Quota> {
self.inner_mut().insert(key, value)
}
#[inline]
pub fn remove<Q>(&mut self, key: &Q) -> Option<Quota>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner_mut().remove(key)
}
#[inline]
pub fn remove_entry<Q>(&mut self, key: &Q) -> Option<(K, Quota)>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner_mut().remove_entry(key)
}
#[inline]
pub fn get<Q>(&self, key: &Q) -> Option<&Quota>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner().get(key)
}
#[inline]
pub fn get_key_value<Q>(&self, key: &Q) -> Option<(&K, &Quota)>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner().get_key_value(key)
}
#[inline]
pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut Quota>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner_mut().get_mut(key)
}
#[inline]
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner().contains_key(key)
}
#[inline]
pub fn clear(&mut self) {
self.inner_mut().clear();
}
#[inline]
pub fn iter(&self) -> std::collections::hash_map::Iter<'_, K, Quota> {
self.inner().iter()
}
}
impl<'a, K> From<&'a QuotaMap<K>> for &'a HashMap<K, Quota>
where
K: Eq + Hash,
{
fn from(this: &'a QuotaMap<K>) -> Self {
this.inner()
}
}
impl<'a, K> From<&'a mut QuotaMap<K>> for &'a mut HashMap<K, Quota>
where
K: Eq + Hash,
{
fn from(this: &'a mut QuotaMap<K>) -> Self {
this.inner_mut()
}
}
impl<K, Q, const N: usize> From<[(Q, Quota); N]> for QuotaMap<K>
where
K: Eq + Hash,
Q: Into<K>,
{
fn from(arr: [(Q, Quota); N]) -> Self {
Self(HashMap::from(arr.map(|(k, v)| (k.into(), v))))
}
}
impl<'a, K> From<QuotaMap<K>> for Cow<'a, QuotaMap<K>>
where
K: Clone,
{
fn from(this: QuotaMap<K>) -> Self {
Self::Owned(this)
}
}
impl<'a, K> From<&'a QuotaMap<K>> for Cow<'a, QuotaMap<K>>
where
K: Clone,
{
fn from(this: &'a QuotaMap<K>) -> Self {
Self::Borrowed(this)
}
}
}
}
/* --- Dependencies:
tokio = { version = "1.37.0", features = ["full"] }
log = "0.4.22"
env_logger = "0.11.5"
arrayref = "0.3.7"
solana-sdk = "1.18.18"
solana-client = "1.18.18"
solana-transaction-status = "1.18.18"
solana-program = "1.18.18"
spl-associated-token-account = "3.0.4"
spl-token = "4.0.0"
mpl-token-metadata = "4.1.2"
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment