Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/hotblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,22 @@ impl HotblocksHandle {
Ok(result)
}

/// Like [`Self::get_finalized_head`], but treats a `null` body (no finalized
/// block yet) as `Ok(None)` instead of a deserialization error.
pub async fn get_finalized_head_opt(
&self,
dataset: &str,
) -> Result<Option<sqd_primitives::BlockRef>, HotblocksErr> {
let result = self
.request_finalized_head(dataset)
.await?
.error_for_status()?
.json()
.await?;

Ok(result)
}

pub async fn request_status(&self, dataset: &str) -> Result<reqwest::Response, HotblocksErr> {
let Some(url) = self.urls.get(dataset) else {
return Err(HotblocksErr::UnknownDataset);
Expand Down
37 changes: 35 additions & 2 deletions src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::utils::logging::MethodRouterExt;
use crate::{
config::Config,
controller::task_manager::TaskManager,
hotblocks::HotblocksHandle,
hotblocks::{traceless_key, HotblocksHandle},
network::{NetworkClient, NoWorker, NotReady},
types::{ChunkId, DatasetId, ParsedQuery, RequestError, StreamRequest},
utils::logging,
Expand Down Expand Up @@ -344,7 +344,18 @@ async fn get_finalized_head(
Extension(network): Extension<Arc<NetworkClient>>,
dataset: DatasetConfig,
) -> Response {
if dataset.hotblocks.is_some() {
if let Some(hotblocks_cfg) = &dataset.hotblocks {
// Traced and traceless variants finalize independently, and a stream may use
// either. Report the minimum of their heads so we never advertise a head one
// variant can't yet serve.
if hotblocks_cfg.dataset_traceless.is_some() {
let traceless_name = traceless_key(&dataset.default_name);
let (traced, traceless) = tokio::join!(
hotblocks.get_finalized_head_opt(&dataset.default_name),
hotblocks.get_finalized_head_opt(&traceless_name),
);
return min_finalized_head_response(traced, traceless);
}
return forward_hotblocks_response(
hotblocks
.request_finalized_head(&dataset.default_name)
Expand Down Expand Up @@ -1134,6 +1145,28 @@ where
}
}

/// Build a `/finalized-head` response with the lower of the traced and traceless heads,
/// so it's streamable by whichever variant a later stream uses.
///
/// If either variant has no finalized block yet (`None`), the head is `None`: advertising
/// `null` is safer than a head one variant can't serve. An error from either variant is
/// surfaced as-is rather than degrading to the other's (possibly too-high) head.
fn min_finalized_head_response(
traced: Result<Option<sqd_primitives::BlockRef>, HotblocksErr>,
traceless: Result<Option<sqd_primitives::BlockRef>, HotblocksErr>,
) -> Response {
match (traced, traceless) {
(Ok(traced), Ok(traceless)) => {
let head = match (traced, traceless) {
(Some(a), Some(b)) => Some(if a.number <= b.number { a } else { b }),
_ => None,
};
axum::Json(head).into_response()
}
(Err(e), _) | (_, Err(e)) => forward_hotblocks_response(Err(e)),
}
}

pub(crate) fn forward_hotblocks_response(
response: Result<reqwest::Response, HotblocksErr>,
) -> Response {
Expand Down
Loading