diff --git a/src/controller/task_manager.rs b/src/controller/task_manager.rs index 569ab24..3800d95 100644 --- a/src/controller/task_manager.rs +++ b/src/controller/task_manager.rs @@ -74,6 +74,8 @@ impl TaskManager { } let stream_index = self.next_stream_index.fetch_add(1, Ordering::Relaxed); + let dataset = request.dataset_id.clone(); + let request_id = request.request_id.clone(); let mut streamer = StreamController::new( request, self.network_client.clone(), @@ -93,7 +95,16 @@ impl TaskManager { None => break, Some(Ok(chunk)) => yield chunk, Some(Err(e)) => { - tracing::warn!("Stream got interrupted: {:?}", e); + // This warn is often the only signal of a systemic + // degradation (all streams breaking at once), so it + // must be attributable: which dataset, which request. + tracing::warn!( + dataset = %dataset, + request_id, + stream_index, + error = ?e, + "Stream got interrupted" + ); // There is no way to pass the error to the client break; } diff --git a/src/endpoints/stream.rs b/src/endpoints/stream.rs index 619bdef..6637597 100644 --- a/src/endpoints/stream.rs +++ b/src/endpoints/stream.rs @@ -317,8 +317,17 @@ async fn stream_from_network( res = res.header(FINALIZED_NUMBER_HEADER, head.number); res = res.header(FINALIZED_HASH_HEADER, head.hash); } - if let Some(head) = head_task.await.unwrap() { - res = res.header(HEAD_NUMBER_HEADER, head); + match head_task.await { + Ok(Some(head)) => { + res = res.header(HEAD_NUMBER_HEADER, head); + } + Ok(None) => {} + Err(e) => { + // A panicked head lookup must not 500 the whole stream (the header + // is best-effort), but it must leave a trace — a bare `.unwrap()` + // here used to kill the handler with no error-level log at all. + tracing::error!(error = %e, "head lookup task panicked; omitting head header"); + } } let body = response_body(stream, compression, config.use_gzjoin); res.header(header::CONTENT_TYPE, "application/jsonl") diff --git a/src/main.rs b/src/main.rs index bdde86e..b2787b5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,6 +101,29 @@ fn setup_tracing(json: bool, log_span_durations: bool) { .init(); } +/// Panics in spawned tokio tasks bypass `tracing` entirely and die as bare +/// stderr text — invisible to JSON log pipelines, so a crash-looping pod shows +/// no structured cause. Log the panic through `tracing` before the default +/// handler runs (the default still prints the backtrace and aborts as before). +fn install_panic_hook() { + let default_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + let message = if let Some(s) = info.payload().downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = info.payload().downcast_ref::() { + s.clone() + } else { + "unknown panic payload".to_string() + }; + let location = info + .location() + .map(|l| l.to_string()) + .unwrap_or_else(|| "unknown".to_string()); + tracing::error!(panic = true, message, location, "thread panicked"); + default_hook(info); + })); +} + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); @@ -113,6 +136,7 @@ async fn main() -> anyhow::Result<()> { .then(|| setup_sentry(&args.config, &args)); setup_tracing(args.json_log, args.log_span_durations); + install_panic_hook(); let datasets = Arc::new(RwLock::new(Datasets::load(&args.config).await?, "datasets")); diff --git a/src/network/client.rs b/src/network/client.rs index a82359e..215dd29 100644 --- a/src/network/client.rs +++ b/src/network/client.rs @@ -318,6 +318,7 @@ impl NetworkClient { let mut first_fetch = true; let mut current_epoch: u32 = 0; let mut operator; + let mut consecutive_failures: u32 = 0; loop { if first_iteration { first_iteration = false; @@ -339,9 +340,24 @@ impl NetworkClient { epoch_started, compute_units_per_epoch, ) = match self.fetch_blockchain_state().await { - Ok(data) => data, + Ok(data) => { + consecutive_failures = 0; + data + } Err(e) => { - tracing::warn!("Couldn't get blockchain data: {e}"); + // Fires once per polling interval while the chain RPC is + // unavailable; the counter separates a one-off blip from a + // persistent outage (contracts_state silently going stale). + consecutive_failures += 1; + if consecutive_failures >= 5 { + tracing::error!( + consecutive_failures, + error = %e, + "Couldn't get blockchain data; contracts state is stale" + ); + } else { + tracing::warn!(consecutive_failures, error = %e, "Couldn't get blockchain data"); + } continue; } }; @@ -471,7 +487,15 @@ impl NetworkClient { self.network_state.get_height(dataset) } - #[instrument(skip_all, level = "debug", fields(query_id))] + #[instrument(skip_all, level = "debug", fields( + query_id, + peer_id = %lease.worker(), + dataset = %chunk_id.dataset, + chunk = %chunk_id.chunk, + first_block = *block_range.start(), + last_block = *block_range.end(), + request_id, + ))] pub async fn query_worker( self: Arc, lease: WorkerLease, @@ -695,16 +719,25 @@ impl NetworkClient { }) } + // Worker-query failures used to be metrics-only: the most common cause of + // elevated 5xx (worker timeouts / transport errors) left no trace in logs. + // Emit one warn per failure so log readers can localize the failing peer + // and failure kind; events inherit the `query_worker` span fields + // (dataset/chunk/request_id). Known risk: volume scales with the failure + // rate, so a large incident produces a noticeable stream of warns — if + // that proves too noisy, rate-limit here rather than dropping the events. fn convert_query_failure(&self, peer_id: PeerId, failure: QueryFailure) -> QueryError { match failure { QueryFailure::InvalidRequest(e) => { metrics::report_query_result(&peer_id, "invalid"); self.network_state.report_query_success(peer_id, None); + tracing::warn!(peer_id = %peer_id, kind = "invalid_request", error = %e, "worker query failed"); QueryError::Failure(format!("portal tried to send invalid request: {e}")) } QueryFailure::InvalidResponse(e) => { metrics::report_query_result(&peer_id, "invalid"); self.network_state.report_query_error(peer_id); + tracing::warn!(peer_id = %peer_id, kind = "invalid_response", error = %e, "worker query failed"); QueryError::Retriable(format!("couldn't decode response: {e}")) } QueryFailure::Timeout(t) => { @@ -714,11 +747,13 @@ impl NetworkClient { StreamClientTimeout::Connect => "timed out connecting to the peer", StreamClientTimeout::Request => "timed out reading response", }; + tracing::warn!(peer_id = %peer_id, kind = "timeout", phase = msg, "worker query failed"); QueryError::Retriable(msg.to_owned()) } QueryFailure::TransportError(e) => { metrics::report_query_result(&peer_id, "transport_error"); self.network_state.report_query_failure(peer_id); + tracing::warn!(peer_id = %peer_id, kind = "transport_error", error = %e, "worker query failed"); QueryError::Retriable(format!("transport error: {e}")) } } @@ -731,6 +766,7 @@ impl NetworkClient { ReadError::TooLarge => "response too large".to_owned(), ReadError::Transport(e) => format!("transport error: {e}"), }; + tracing::warn!(peer_id = %peer_id, kind = "read_error", error = %msg, "worker query failed"); QueryError::Retriable(msg) } diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 180a01a..ccd963c 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -86,6 +86,9 @@ impl StreamStats { // ); tracing::info!( dataset = %request.dataset_id, + // The summary is written on Drop, outside the `http_request` span — + // without an explicit field it cannot be joined with the access log. + request_id = %request.request_id, first_block = request.query.first_block(), last_block = request.query.last_block(), queries_sent = self.queries_sent,