-
Notifications
You must be signed in to change notification settings - Fork 3
Export hotblocks storage size metrics NET-910 #78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ use anyhow::{Context, anyhow}; | |
| use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered}; | ||
| use sqd_data_client::reqwest::ReqwestDataClient; | ||
| use sqd_primitives::{BlockNumber, BlockRef}; | ||
| use sqd_storage::db::{Chunk, CompactionStatus, DatasetId}; | ||
| use sqd_storage::db::{Chunk, CompactionStatus, Database, DatasetId}; | ||
| use tokio::{select, task::JoinHandle, time::Instant}; | ||
| use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; | ||
|
|
||
|
|
@@ -24,17 +24,29 @@ pub struct DatasetController { | |
| head_receiver: tokio::sync::watch::Receiver<Option<BlockRef>>, | ||
| finalized_head_receiver: tokio::sync::watch::Receiver<Option<BlockRef>>, | ||
| compaction_enabled_sender: tokio::sync::watch::Sender<bool>, | ||
| stats_receiver: tokio::sync::watch::Receiver<DatasetStats>, | ||
| task: JoinHandle<()>, | ||
| compaction_task: JoinHandle<()> | ||
| compaction_task: JoinHandle<()>, | ||
| stats_task: JoinHandle<()> | ||
| } | ||
|
|
||
| /// Aborts the controller's spawned tasks when the controller is dropped. | ||
| impl Drop for DatasetController { | ||
| fn drop(&mut self) { | ||
| // Each field is a `JoinHandle`; without an explicit abort the tasks would keep running detached. | ||
| self.task.abort(); | ||
| self.compaction_task.abort(); | ||
| self.stats_task.abort(); | ||
| } | ||
| } | ||
|
|
||
| /// Dataset metrics refreshed by [`dataset_stats_loop`] every ~60s. | ||
| /// | ||
| /// The stats channel is seeded with `DatasetStats::default()`, so every field is `None` | ||
| /// until the first successful refresh is published. | ||
| #[derive(Clone, Debug, Default)] | ||
| pub struct DatasetStats { | ||
| /// First block of the dataset's first chunk; `None` when no first chunk is found. | ||
| pub first_block: Option<BlockNumber>, | ||
| /// Last block time reported by the dataset's last chunk, if any. | ||
| /// NOTE(review): the unit of this value is not visible here — confirm before exporting. | ||
| pub last_block_time: Option<i64>, | ||
| /// `None` until computed once, distinguishing a fresh process from an empty dataset. | ||
| /// The value comes from `estimate_dataset_size`, so treat it as an estimate. | ||
| pub size_bytes: Option<u64> | ||
| } | ||
|
|
||
| impl DatasetController { | ||
| #[instrument(name = "dataset", skip_all, fields(dataset_id = %dataset_id))] | ||
| pub fn new( | ||
|
|
@@ -54,6 +66,7 @@ impl DatasetController { | |
| let (head_sender, head_receiver) = tokio::sync::watch::channel(None); | ||
| let (finalized_head_sender, finalized_head_receiver) = tokio::sync::watch::channel(None); | ||
| let (compaction_enabled_sender, compaction_enabled_receiver) = tokio::sync::watch::channel(false); | ||
| let (stats_sender, stats_receiver) = tokio::sync::watch::channel(DatasetStats::default()); | ||
|
|
||
| let _ = head_sender.send(write.head().cloned()); | ||
| let _ = finalized_head_sender.send(write.finalized_head().cloned()); | ||
|
|
@@ -70,6 +83,8 @@ impl DatasetController { | |
|
|
||
| let task = tokio::spawn(ctl.run(write).in_current_span()); | ||
|
|
||
| let stats_task = tokio::spawn(dataset_stats_loop(db.clone(), dataset_id, stats_sender).in_current_span()); | ||
|
|
||
| let compaction_task = | ||
| tokio::spawn(compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span()); | ||
|
|
||
|
|
@@ -80,8 +95,10 @@ impl DatasetController { | |
| head_receiver, | ||
| finalized_head_receiver, | ||
| compaction_enabled_sender, | ||
| stats_receiver, | ||
| task, | ||
| compaction_task | ||
| compaction_task, | ||
| stats_task | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -117,6 +134,10 @@ impl DatasetController { | |
| self.retention_sender.borrow().clone() | ||
| } | ||
|
|
||
| /// Returns a clone of the most recently published [`DatasetStats`]. | ||
| /// | ||
| /// Reads the current value of the stats watch channel; it does not compute anything itself. | ||
| pub fn get_stats(&self) -> DatasetStats { | ||
| self.stats_receiver.borrow().clone() | ||
| } | ||
|
|
||
| pub async fn wait_for_block(&self, block_number: BlockNumber) -> BlockNumber { | ||
| let mut recv = self.head_receiver.clone(); | ||
| loop { | ||
|
|
@@ -591,6 +612,51 @@ async fn fetch_chain_top(clients: Vec<ReqwestDataClient>) -> BlockNumber { | |
| } | ||
| } | ||
|
|
||
| #[instrument(name = "dataset_stats", skip_all)] | ||
| async fn dataset_stats_loop(db: DBRef, dataset_id: DatasetId, sender: tokio::sync::watch::Sender<DatasetStats>) { | ||
| const REFRESH: Duration = Duration::from_secs(60); | ||
|
|
||
| // Delay the first run by a per-dataset offset so the loops don't hit RocksDB together. | ||
| let offset = dataset_id | ||
| .as_ref() | ||
| .iter() | ||
| .fold(0u64, |acc, &b| acc.wrapping_add(b as u64)) | ||
| % REFRESH.as_secs(); | ||
| tokio::time::sleep(Duration::from_secs(offset)).await; | ||
|
|
||
| loop { | ||
| let db = db.clone(); | ||
| let span = tracing::Span::current(); | ||
| let result = tokio::task::spawn_blocking(move || { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One more blocking task per dataset per minute doesn't look right to me.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've measured the run time, and it's <40 ms per dataset at worst, without blocking anything.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sounds good. Would you mind sharing some details about the overall load, the size of the DB, etc.? |
||
| let _s = span.enter(); | ||
| compute_dataset_stats(&db, dataset_id) | ||
| }) | ||
| .await; | ||
|
|
||
| match result { | ||
| Ok(Ok(stats)) => { | ||
| let _ = sender.send(stats); | ||
| } | ||
| Ok(Err(err)) => error!(reason =? err, "failed to estimate dataset stats"), | ||
| Err(err) => error!(reason =? err, "dataset stats task panicked") | ||
| } | ||
|
|
||
| tokio::time::sleep(REFRESH).await; | ||
| } | ||
| } | ||
|
|
||
| /// Builds one [`DatasetStats`] sample for `dataset_id`. | ||
| /// | ||
| /// Returns an error if any of the three snapshot lookups fails; a missing first or last | ||
| /// chunk is not an error and yields `None` for the corresponding field. | ||
| fn compute_dataset_stats(db: &Database, dataset_id: DatasetId) -> anyhow::Result<DatasetStats> { | ||
| // All three reads below go through the same snapshot. | ||
| let snapshot = db.snapshot(); | ||
| let first_block = snapshot.get_first_chunk(dataset_id)?.map(|c| c.first_block()); | ||
| let last_block_time = snapshot.get_last_chunk(dataset_id)?.and_then(|c| c.last_block_time()); | ||
| let size_bytes = snapshot.estimate_dataset_size(dataset_id)?; | ||
| Ok(DatasetStats { | ||
| first_block, | ||
| last_block_time, | ||
| // Always `Some` once computed, so readers can tell "not yet measured" from a measured size. | ||
| size_bytes: Some(size_bytes) | ||
| }) | ||
| } | ||
|
|
||
| #[instrument(name = "compaction", skip_all)] | ||
| async fn compaction_loop(db: DBRef, dataset_id: DatasetId, mut enabled: tokio::sync::watch::Receiver<bool>) { | ||
| let mut skips = 0; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to move this into config parameters, to better control the overhead that this and similar loops cause.