Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/controller/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ impl TaskManager {
}

let stream_index = self.next_stream_index.fetch_add(1, Ordering::Relaxed);
let dataset = request.dataset_id.clone();
let request_id = request.request_id.clone();
let mut streamer = StreamController::new(
request,
self.network_client.clone(),
Expand All @@ -93,7 +95,16 @@ impl TaskManager {
None => break,
Some(Ok(chunk)) => yield chunk,
Some(Err(e)) => {
tracing::warn!("Stream got interrupted: {:?}", e);
// This warn is often the only signal of a systemic
// degradation (all streams breaking at once), so it
// must be attributable: which dataset, which request.
tracing::warn!(
dataset = %dataset,
request_id,
stream_index,
error = ?e,
"Stream got interrupted"
);
// There is no way to pass the error to the client
break;
}
Expand Down
13 changes: 11 additions & 2 deletions src/endpoints/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,17 @@ async fn stream_from_network(
res = res.header(FINALIZED_NUMBER_HEADER, head.number);
res = res.header(FINALIZED_HASH_HEADER, head.hash);
}
if let Some(head) = head_task.await.unwrap() {
res = res.header(HEAD_NUMBER_HEADER, head);
match head_task.await {
Ok(Some(head)) => {
res = res.header(HEAD_NUMBER_HEADER, head);
}
Ok(None) => {}
Err(e) => {
// A panicked head lookup must not 500 the whole stream (the header
// is best-effort), but it must leave a trace — a bare `.unwrap()`
// here used to kill the handler with no error-level log at all.
tracing::error!(error = %e, "head lookup task panicked; omitting head header");
}
}
let body = response_body(stream, compression, config.use_gzjoin);
res.header(header::CONTENT_TYPE, "application/jsonl")
Expand Down
24 changes: 24 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,29 @@ fn setup_tracing(json: bool, log_span_durations: bool) {
.init();
}

/// Panics in spawned tokio tasks bypass `tracing` entirely and die as bare
/// stderr text — invisible to JSON log pipelines, so a crash-looping pod shows
/// no structured cause. Log the panic through `tracing` before the default
/// handler runs (the default still prints the backtrace and aborts as before).
fn install_panic_hook() {
let default_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
let message = if let Some(s) = info.payload().downcast_ref::<&str>() {
(*s).to_string()
} else if let Some(s) = info.payload().downcast_ref::<String>() {
s.clone()
} else {
"unknown panic payload".to_string()
};
let location = info
.location()
.map(|l| l.to_string())
.unwrap_or_else(|| "unknown".to_string());
tracing::error!(panic = true, message, location, "thread panicked");
default_hook(info);
}));
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
Expand All @@ -113,6 +136,7 @@ async fn main() -> anyhow::Result<()> {
.then(|| setup_sentry(&args.config, &args));

setup_tracing(args.json_log, args.log_span_durations);
install_panic_hook();

let datasets = Arc::new(RwLock::new(Datasets::load(&args.config).await?, "datasets"));

Expand Down
42 changes: 39 additions & 3 deletions src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl NetworkClient {
let mut first_fetch = true;
let mut current_epoch: u32 = 0;
let mut operator;
let mut consecutive_failures: u32 = 0;
loop {
if first_iteration {
first_iteration = false;
Expand All @@ -339,9 +340,24 @@ impl NetworkClient {
epoch_started,
compute_units_per_epoch,
) = match self.fetch_blockchain_state().await {
Ok(data) => data,
Ok(data) => {
consecutive_failures = 0;
data
}
Err(e) => {
tracing::warn!("Couldn't get blockchain data: {e}");
// Fires once per polling interval while the chain RPC is
// unavailable; the counter separates a one-off blip from a
// persistent outage (contracts_state silently going stale).
consecutive_failures += 1;
if consecutive_failures >= 5 {
tracing::error!(
consecutive_failures,
error = %e,
"Couldn't get blockchain data; contracts state is stale"
);
} else {
tracing::warn!(consecutive_failures, error = %e, "Couldn't get blockchain data");
}
continue;
}
};
Expand Down Expand Up @@ -471,7 +487,15 @@ impl NetworkClient {
self.network_state.get_height(dataset)
}

#[instrument(skip_all, level = "debug", fields(query_id))]
#[instrument(skip_all, level = "debug", fields(
query_id,
peer_id = %lease.worker(),
dataset = %chunk_id.dataset,
chunk = %chunk_id.chunk,
first_block = *block_range.start(),
last_block = *block_range.end(),
request_id,
))]
pub async fn query_worker(
self: Arc<Self>,
lease: WorkerLease,
Expand Down Expand Up @@ -695,16 +719,25 @@ impl NetworkClient {
})
}

// Worker-query failures used to be metrics-only: the most common cause of
// elevated 5xx (worker timeouts / transport errors) left no trace in logs.
// Emit one warn per failure so log readers can localize the failing peer
// and failure kind; events inherit the `query_worker` span fields
// (dataset/chunk/request_id). Known risk: volume scales with the failure
// rate, so a large incident produces a noticeable stream of warns — if
// that proves too noisy, rate-limit here rather than dropping the events.
fn convert_query_failure(&self, peer_id: PeerId, failure: QueryFailure) -> QueryError {
match failure {
QueryFailure::InvalidRequest(e) => {
metrics::report_query_result(&peer_id, "invalid");
self.network_state.report_query_success(peer_id, None);
tracing::warn!(peer_id = %peer_id, kind = "invalid_request", error = %e, "worker query failed");
QueryError::Failure(format!("portal tried to send invalid request: {e}"))
}
QueryFailure::InvalidResponse(e) => {
metrics::report_query_result(&peer_id, "invalid");
self.network_state.report_query_error(peer_id);
tracing::warn!(peer_id = %peer_id, kind = "invalid_response", error = %e, "worker query failed");
QueryError::Retriable(format!("couldn't decode response: {e}"))
}
QueryFailure::Timeout(t) => {
Expand All @@ -714,11 +747,13 @@ impl NetworkClient {
StreamClientTimeout::Connect => "timed out connecting to the peer",
StreamClientTimeout::Request => "timed out reading response",
};
tracing::warn!(peer_id = %peer_id, kind = "timeout", phase = msg, "worker query failed");
QueryError::Retriable(msg.to_owned())
}
QueryFailure::TransportError(e) => {
metrics::report_query_result(&peer_id, "transport_error");
self.network_state.report_query_failure(peer_id);
tracing::warn!(peer_id = %peer_id, kind = "transport_error", error = %e, "worker query failed");
QueryError::Retriable(format!("transport error: {e}"))
}
}
Expand All @@ -731,6 +766,7 @@ impl NetworkClient {
ReadError::TooLarge => "response too large".to_owned(),
ReadError::Transport(e) => format!("transport error: {e}"),
};
tracing::warn!(peer_id = %peer_id, kind = "read_error", error = %msg, "worker query failed");
QueryError::Retriable(msg)
}

Expand Down
3 changes: 3 additions & 0 deletions src/utils/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ impl StreamStats {
// );
tracing::info!(
dataset = %request.dataset_id,
// The summary is written on Drop, outside the `http_request` span —
// without an explicit field it cannot be joined with the access log.
request_id = %request.request_id,
first_block = request.query.first_block(),
last_block = request.query.last_block(),
queries_sent = self.queries_sent,
Expand Down
Loading