Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.12.1"
version = "2.12.2"
edition = "2021"

[[bin]]
Expand Down
9 changes: 9 additions & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ key_store.pyth_price_store_program_key = "3m6sv6HGqEbuyLV84mD7rJn4MAC9LhUa1y1AUN
# takes to fetch all symbols.
# oracle.max_lookup_batch_size = 100

# Number of workers used to wait for the handle_price_account_update
# oracle.handle_price_account_update_worker_poll_size = 25
# Channel size used to wait for the handle_price_account_update
# oracle.handle_price_account_update_channel_size = 1000
# Minimum time for a subscriber to run
# oracle.subscriber_finished_min_time = "30s"
# Time to sleep if the subscriber do not run for more than the minimum time
# oracle.subscriber_finished_sleep_time = "1s"

# How often to refresh the cached network state (current slot and blockhash).
# It is recommended to set this to slightly less than the network's block time,
# as the slot fetched will be used as the time of the price update.
Expand Down
56 changes: 43 additions & 13 deletions src/agent/services/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use {
},
state::oracle::Oracle,
},
anyhow::Result,
anyhow::{
Context,
Result,
},
solana_account_decoder::UiAccountEncoding,
solana_client::{
nonblocking::{
Expand Down Expand Up @@ -67,6 +70,12 @@ where
)));

if config.oracle.subscriber_enabled {
let number_of_workers = config.oracle.handle_price_account_update_worker_poll_size;
let (sender, receiver) =
tokio::sync::mpsc::channel(config.oracle.handle_price_account_update_channel_size);
let min_elapsed_time = config.oracle.subscriber_finished_min_time;
let sleep_time = config.oracle.subscriber_finished_sleep_time;

handles.push(tokio::spawn(async move {
loop {
let current_time = Instant::now();
Expand All @@ -75,17 +84,34 @@ where
network,
state.clone(),
key_store.pyth_oracle_program_key,
sender.clone(),
)
.await
{
tracing::error!(err = ?err, "Subscriber exited unexpectedly.");
if current_time.elapsed() < Duration::from_secs(30) {
tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second.");
tokio::time::sleep(Duration::from_secs(1)).await;
tracing::error!(?err, "Subscriber exited unexpectedly");
if current_time.elapsed() < min_elapsed_time {
tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
tokio::time::sleep(sleep_time).await;
}
}
}
}));

let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
for _ in 0..number_of_workers {
let receiver = receiver.clone();
handles.push(tokio::spawn(async move {
loop {
let mut receiver = receiver.lock().await;
if let Some(task) = receiver.recv().await {
drop(receiver);
if let Err(err) = task.await {
tracing::error!(%err, "error running price update");
}
}
}
}));
}
}

handles
Expand All @@ -102,6 +128,7 @@ async fn subscriber<S>(
network: Network,
state: Arc<S>,
program_key: Pubkey,
sender: tokio::sync::mpsc::Sender<tokio::task::JoinHandle<()>>,
) -> Result<()>
where
S: Oracle,
Expand Down Expand Up @@ -129,14 +156,17 @@ where
Some(account) => {
let pubkey: Pubkey = update.value.pubkey.as_str().try_into()?;
let state = state.clone();
tokio::spawn(async move {
if let Err(err) =
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
.await
{
tracing::error!(err = ?err, "Failed to handle account update.");
}
});
sender
.send(tokio::spawn(async move {
if let Err(err) =
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
.await
{
tracing::error!(?err, "Failed to handle account update");
}
}))
.await
.context("sending handle_price_account_update task to worker")?;
}

None => {
Expand Down
55 changes: 47 additions & 8 deletions src/agent/state/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,22 @@ pub struct Data {
pub publisher_buffer_key: Option<Pubkey>,
}

fn default_handle_price_account_update_channel_size() -> usize {
1000
}

fn default_handle_price_account_update_worker_poll_size() -> usize {
25
}

fn default_subscriber_finished_min_time() -> Duration {
Duration::from_secs(30)
}

fn default_subscriber_finished_sleep_time() -> Duration {
Duration::from_secs(1)
}

#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Config {
Expand All @@ -159,17 +175,36 @@ pub struct Config {
/// socket count at bay, the batches are looked up sequentially,
/// trading off overall time it takes to fetch all symbols.
pub max_lookup_batch_size: usize,

/// Number of workers used to wait for the handle_price_account_update
#[serde(default = "default_handle_price_account_update_worker_poll_size")]
pub handle_price_account_update_worker_poll_size: usize,
/// Channel size used to wait for the handle_price_account_update
#[serde(default = "default_handle_price_account_update_channel_size")]
pub handle_price_account_update_channel_size: usize,
/// Minimum time for a subscriber to run
#[serde(default = "default_subscriber_finished_min_time")]
pub subscriber_finished_min_time: Duration,
/// Time to sleep if the subscriber do not run for more than the minimum time
#[serde(default = "default_subscriber_finished_sleep_time")]
pub subscriber_finished_sleep_time: Duration,
}

impl Default for Config {
fn default() -> Self {
Self {
commitment: CommitmentLevel::Confirmed,
poll_interval_duration: Duration::from_secs(5),
subscriber_enabled: true,
updates_channel_capacity: 10000,
data_channel_capacity: 10000,
max_lookup_batch_size: 100,
commitment: CommitmentLevel::Confirmed,
poll_interval_duration: Duration::from_secs(5),
subscriber_enabled: true,
updates_channel_capacity: 10000,
data_channel_capacity: 10000,
max_lookup_batch_size: 100,
handle_price_account_update_worker_poll_size:
default_handle_price_account_update_worker_poll_size(),
handle_price_account_update_channel_size:
default_handle_price_account_update_channel_size(),
subscriber_finished_min_time: default_subscriber_finished_min_time(),
subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
}
}
}
Expand Down Expand Up @@ -241,6 +276,7 @@ where
);

data.price_accounts.insert(*account_key, price_entry.into());
drop(data);

Prices::update_global_price(
self,
Expand Down Expand Up @@ -333,13 +369,16 @@ where
let mut data = self.into().data.write().await;
log_data_diff(&data, &new_data);
*data = new_data;
let data_publisher_permissions = data.publisher_permissions.clone();
let data_publisher_buffer_key = data.publisher_buffer_key;
drop(data);

Exporter::update_on_chain_state(
self,
network,
publish_keypair,
data.publisher_permissions.clone(),
data.publisher_buffer_key,
data_publisher_permissions,
data_publisher_buffer_key,
)
.await?;

Expand Down