From 635d84dfad10ae86bb5a9d86a5942d6c08094416 Mon Sep 17 00:00:00 2001 From: "wangxinshuo.db" Date: Fri, 3 Jul 2026 15:58:01 +0800 Subject: [PATCH 1/2] [Docs] add doc for features: row based spill, adaptive hash partition, memory management pushdown --- .../2026-07-03-adaptive-spill-partition.md | 472 ++++++++++++++++++ .../2026-07-03-memory-management-offload.md | 387 ++++++++++++++ doc/blog/_posts/2026-07-03-row-based-spill.md | 466 +++++++++++++++++ 3 files changed, 1325 insertions(+) create mode 100644 doc/blog/_posts/2026-07-03-adaptive-spill-partition.md create mode 100644 doc/blog/_posts/2026-07-03-memory-management-offload.md create mode 100644 doc/blog/_posts/2026-07-03-row-based-spill.md diff --git a/doc/blog/_posts/2026-07-03-adaptive-spill-partition.md b/doc/blog/_posts/2026-07-03-adaptive-spill-partition.md new file mode 100644 index 000000000..7b827d97b --- /dev/null +++ b/doc/blog/_posts/2026-07-03-adaptive-spill-partition.md @@ -0,0 +1,472 @@ +--- +layout: post +title: "Adaptive Spill Partitioning: Using Runtime Row Counts to Reduce Recursive HashJoin Spill" +date: 2026-07-03 +author: "Bolt Community" +parent: Blog +nav_order: 8 +--- + +When the build side of a HashJoin exceeds its memory budget, the hash table has to be split by hash partition and written to disk. The basic mechanism is straightforward: split build rows into partitions, read partitions back one by one, rebuild the hash table for each partition, and then match it with the probe side. + +The hard part is choosing the partition count. If the first spill produces partitions that are still too large, reading back a partition will exceed memory again. The engine then has to repartition and spill that partition again. Data goes through repeated write, read, repartition, and write cycles, which can multiply spill size, I/O, compression work, and hash computation. + +Adaptive spill partitioning does not simply make the partition count larger by default. Instead, when the first memory pressure signal appears, it uses runtime row-count information and the observed HashBuild memory state to estimate a better spill partition count. A static configuration problem becomes a runtime decision: how many build rows can the current memory budget approximately hold, how many build rows are expected in total, and therefore how many partitions are needed so that each partition is more likely to fit in memory when restored. + +## Why Fixed Partition Counts Amplify Spill + +Traditional HashJoin spill is usually controlled by `joinPartitionBits`: + +```text +partition_count = 2 ^ joinPartitionBits +``` + +If `joinPartitionBits = 2`, the first spill creates only 4 partitions. If the build side is much larger than the memory available to the task, each partition can still exceed memory after the first split. + +The execution path then looks like this: + +```text +Build side rows + | + v +HashBuild runs out of memory + | + v +Split into 4 partitions with fixed bits and write them to disk + | + v +Read partition 0 back + | + v +partition 0 still does not fit in memory + | + v +Split again into 0-0, 0-1, 0-2, 0-3 and write them to disk + | + v +Other partitions repeat the same process +``` + +The cost of this recursive spill is not just a few extra files. Each spill level adds hash repartitioning, serialization, compression, decompression, reader construction, file metadata management, and CPU work during restore. For a large build side, total spill writes can approach two or three times the original data size, or even more. + +The fixed-partition approach lacks two pieces of information: + +- How many rows the whole build side contains. +- How many rows the current task was able to process before the first spill. + +The first spill point gives the engine the second signal. If the upstream plan can also provide the first signal, the engine can choose a partition count that is much closer to the actual data scale. + +## Key Insight: The First Spill Is a Capacity Sample + +The first spill is not only a failure signal. It is also a useful capacity sample. + +During HashBuild, if memory pressure appears after `processedRowCount` rows have been processed, the engine can treat that point as an approximation of: + +```text +How many build rows this HashBuild can hold under the current memory budget +``` + +If the upstream side also provides the total build-side row count `totalRowCount`, the target partition count can be estimated as: + +```text +target_partitions ~= totalRowCount / rows_fit_in_memory +``` + +Here, `rows_fit_in_memory` does not come from a static knob. It comes from runtime state. The implementation also caps this capacity estimate with the hash table bucket limit, so that the estimated number of rows per partition is not overly optimistic. + +For example, suppose the build side is estimated to contain 10 million rows, and the first memory shortage appears around 800 thousand processed rows. With a fixed 4-way split, each partition would still average 2.5 million rows and would likely spill again during restore. Adaptive partitioning tends to raise the partition count toward a size such as 16 or 32, so that each partition is closer to the capacity observed at the first spill point. + +This idea has two important boundaries: + +- Row count is only a proxy for capacity, not byte size. Wide rows, variable-length fields, and hash table overhead all affect real memory usage. +- More partitions reduce the average partition size, but they do not eliminate hot partitions caused by hash key skew. + +For that reason, the mechanism is a conservative enhancement: it only increases partition bits when the estimate is larger than the current configuration. It never lowers the existing partition count. + +## How Row Counts Reach HashBuild + +Adaptive partitioning depends on a lightweight data contract. An upstream operator exposes total row count and processed row count through runtime metrics. When HashBuild needs a spill decision, it scans preceding operators to find those metrics. + +The flow can be simplified as follows: + +```text +Spark / Gluten collects or estimates row counts + | + v +Substrait / native plan carries a row-count hint + | + v +Bolt source or blocking operator writes runtime metrics + | + v +OperatorCtx scans backward from the current HashBuild operator + | + v +HashBuild reads totalRowCount / processedRowCount + | + v +SpillConfig partition bits are updated + | + v +Spiller writes partitions with the new hash bit range +``` + +The relevant runtime metrics have the following meaning: + +| Metric | Meaning | +|---|---| +| `kCanUsedToEstimateHashBuildPartitionNum` | This operator has row-count information usable for HashBuild partition estimation | +| `kTotalRowCount` | The total output row count represented by the operator, or an upstream total-row estimate | +| `kHasBeenProcessedRowCount` | The number of rows already emitted or processed toward the downstream operator | + +Different operators provide these metrics in different ways: + +| Operator Type | Row-Count Source | +|---|---| +| TableScan | Row-count estimate carried by the scan plan; processed rows are updated as output batches are produced | +| ValueStream / shuffle input | Total row count carried by the upstream stream input; processed rows are accumulated as RowVectors are emitted | +| Sort | As a blocking operator, input rows can represent total rows after input is fully consumed; output rows represent current output progress | +| HashAggregation | In non-spill cases, group count and output rows can be used; in spill cases, spill-row statistics can be used | + +HashBuild does not depend on a specific source operator. It only needs a preceding operator in the same driver to expose these three metrics. `OperatorCtx` scans backward from the current operator and uses the first provider marked as usable for estimation. + +This keeps the row-count path loosely coupled. Gluten, TableScan, shuffle input, and blocking operators can each provide the best row count they know, while HashBuild consumes the same runtime metric contract. + +## First Spill: Adjusting `joinPartitionBits` + +When HashBuild first fails to reserve enough memory for input, it tries to calculate new join partition bits. The simplified flow is: + +```text +HashBuild::ensureInputFits(input) + | + v +reserveMemory(input) fails + | + v +If this is the first spill: + | + +--> Read totalRowCount / processedRowCount from preceding operators + | + +--> Record spillThreshold = current RowContainer row count + | + +--> Record memoryUsedForFirstSpill = current pool usage + | + +--> Estimate the target partition count + | + +--> If estimated bits exceed current joinPartitionBits: + update joinPartitionBits + resetSpiller() + setupSpiller() + | + v +requestSpill(input) +``` + +The core computation can be expressed in pseudocode close to the implementation: + +```cpp +rowsInMem = rowContainer.numRows(); +readRuntimeMetric(totalRowCount, processedRowCount); + +spillThreshold = rowsInMem; +memoryUsedForFirstSpill = pool.currentBytes(); + +if (totalRowCount == 0 || processedRowCount == 0) { + return unchanged; +} + +maxRowsPerPartition = + min(maxHashTableBucketCount, processedRowCount); + +targetPartitions = + ceil(totalRowCount / maxRowsPerPartition); + +estimatedBits = conservativeBitLength(targetPartitions); + +if (targetPartitions is close to a small power-of-two boundary) { + estimatedBits += 1; +} + +if (estimatedBits > joinPartitionBits) { + joinPartitionBits = min(estimatedBits, maxAllowedBits); +} +``` + +There are several details worth calling out. + +First, `processedRowCount` is used as a capacity signal, but not blindly. The implementation takes the smaller value between `processedRowCount` and `maxHashTableBucketCount`, which prevents the estimated per-partition capacity from exceeding a reasonable structural limit of the hash table. + +Second, the partition count must eventually be converted into bits because Spiller works with hash bit ranges. The implementation rounds conservatively instead of trying to find the mathematically smallest `2^n`. If the target partition count is already close to the upper bound of a small power-of-two range, it adds one more bit to leave room for skew, row-width changes, and estimation error. + +Third, the algorithm only increases bits: + +```text +estimatedBits <= current joinPartitionBits + | + v +keep the existing configuration +``` + +This rule matters. The adaptive logic enhances the fixed configuration; it does not replace all of its semantics. If row-count information is missing, the estimate is too small, or the current configuration is already sufficient, HashBuild continues to use the existing bits. + +## Restoring a Partition: Adjusting `joinRepartitionBits` + +The first-spill decision only answers whether the first split is fine-grained enough. Execution still needs to handle another case: a spilled partition may still be too large when it is restored. + +For this reason, when HashBuild sets up the reader for a spilled partition, it also estimates `joinRepartitionBits` for the next repartition level. + +The inputs differ between the first spill and the restore path: + +| Phase | Total-Size Signal | Capacity Signal | Field Adjusted | +|---|---|---|---| +| First spill | Total build-side row count `totalRowCount` | Current processed row count `processedRowCount` | `joinPartitionBits` | +| Restoring a spilled partition | Maximum row count of the current partition `maxPartitionRowCount` | First-spill threshold `spillThreshold` | `joinRepartitionBits` | + +The restore logic can be understood as: + +```text +Restoring partition P + | + v +P may still contain more rows than the first-spill capacity sample + | + v +Estimate next-level partitions with maxPartitionRowCount / spillThreshold + | + v +If more bits are needed, increase joinRepartitionBits +``` + +This path directly targets recursive spill. If the first split is already sufficient, the restored partition does not need to be split again. If a partition is still too large, the next repartition level uses a more suitable number of bits instead of mechanically reusing the initial configuration. + +The implementation also uses the remaining hash-bit range as an upper bound to avoid overflowing the partition bit range: + +```text +startBit = hash bit offset already consumed by the current partition +maxAllowedBits = 64 - startBit +joinRepartitionBits <= maxAllowedBits +``` + +`maxSpillLevel` still applies. Once the maximum spill level is exceeded, the system stops continuing recursive spill, which prevents unbounded splitting under extreme data volume or severe skew. + +## `SpillConfig` Is Where the Decision Takes Effect + +Adaptive partitioning does not bypass Spiller or manipulate files directly. It modifies the partition-bit fields in `SpillConfig`, then lets the existing Spiller operate with the new hash bit range. + +The relevant fields are: + +| Field | Purpose | +|---|---| +| `startPartitionBit` | Starting hash bit used for spill partitioning | +| `joinPartitionBits` | Partition bits used by the first HashJoin spill | +| `joinRepartitionBits` | Partition bits used for repartition when spill level is greater than 0 | +| `maxSpillLevel` | Maximum allowed level of recursive spill | +| `spillPartitionsAdaptiveThreshold` | Threshold controlling extra conservative growth for small partition counts | + +`setupSpiller()` turns these fields into a concrete `HashBitRange`: + +```text +First spill: + [startPartitionBit, startPartitionBit + joinPartitionBits) + +Repartition after restore: + [startBit, startBit + joinRepartitionBits) +``` + +The adaptive logic only changes how many hash bits are used to split build rows. It does not change join semantics or the basic spill and restore model. + +## Already-Spilled Partitions Go Directly to Disk + +Adaptive bits work well with another HashBuild spill behavior: once a partition is marked as spilled, new input rows belonging to that partition are written directly to disk instead of being inserted into the in-memory hash table again. + +In simplified form: + +```text +New input RowVector + | + v +Compute partition with the current hash bit range + | + +-- partition has not spilled + | | + | v + | insert into in-memory RowContainer / hash table + | + +-- partition has spilled + | + v + append directly to the spill file +``` + +When the partition count is more reasonable, this direct-to-disk routing also becomes finer grained. It reduces the chance that data already known to require disk repeatedly inflates the in-memory hash table, and it lowers the risk of spilling again during restore. + +## The Same Row-Count Signal Can Guide Spill Compression + +The row-count path is also reused for spill compression decisions. + +At the first spill, if the current spill configuration does not explicitly specify a compression algorithm, Bolt estimates the final spill size from current memory usage and the row-count ratio: + +```text +estimatedSpillSize = + currentUsage * totalRowCount / (processedRowCount + 1) +``` + +It then selects compression by threshold: + +| Estimated Spill Size | Compression Strategy | +|---:|---| +| Below the low threshold | Keep the existing configuration, usually without forcing compression | +| At or above the low threshold | Use LZ4, favoring low CPU overhead | +| At or above the high threshold | Use ZSTD, favoring a better compression ratio | + +The default thresholds are: + +| Threshold | Default | +|---|---:| +| Low compression threshold | 4GB | +| High compression threshold | 20GB | + +This makes runtime row count a more general execution signal. At the first sign of memory pressure, the engine not only knows that the current input does not fit; it can also estimate how much data the full build side may spill. Small spill avoids unnecessary compression CPU, while large spill prioritizes reducing I/O. + +## Where the Performance Gain Comes From + +The performance gain comes mainly from avoiding repeated work, not from making a single spill operation inherently faster. + +| Scenario | Typical Behavior With Fixed Bits | Expected Change With Adaptive Bits | Source of Gain | +|---|---|---|---| +| Build side far exceeds memory, hash distribution is relatively even | Multiple recursive spill levels | More likely to finish at the first or a lower spill level | Less repeated writing, reading, and hash repartitioning | +| Default partition count is clearly too small | A restored partition still does not fit | Partition count is raised before the first spill | Fewer rows per partition | +| Large spill with no explicit compression setting | Large amounts of data may be spilled uncompressed | LZ4 or ZSTD is selected based on estimated size | Better tradeoff between CPU and I/O | +| Row-count information is missing, or estimated bits do not exceed current bits | Fixed configuration is used | Behavior is essentially unchanged | Stable fallback path | +| Severe key skew | Hot partitions may still be too large | Only partially mitigated | Still requires a skew-aware mechanism | + +The before-and-after execution paths can be summarized as: + +```text +Fixed partitioning: + +large build side + | + v +4 partitions + | + +--> P0 too large -> repartition -> spill again + +--> P1 too large -> repartition -> spill again + +--> P2 too large -> repartition -> spill again + +--> P3 too large -> repartition -> spill again + +Adaptive partitioning: + +large build side + | + v +estimate rows per memory budget + | + v +more partitions at first spill + | + +--> smaller P0 -> restore likely fits + +--> smaller P1 -> restore likely fits + +--> smaller P2 -> restore likely fits + +--> smaller Pn -> restore likely fits +``` + +The most useful observable signals are: + +- Lower `spillLevel`. +- Fewer recursive spill events. +- Less secondary amplification in spill read/write bytes. +- Less cascading growth in spill files or runs. +- `useAdaptiveSpill`, which indicates whether HashBuild actually used adaptive bits. +- In large-spill cases, compression may change from NONE to LZ4 or ZSTD. + +These signals are more explanatory than total elapsed time alone. Total runtime is also affected by disk behavior, compression libraries, CPU, data skew, probe-side size, and scheduling concurrency. A reduction in recursive spill depth and repeated I/O directly shows whether the optimization hit the intended problem. + +## Correctness and Fallback + +Adaptive partitioning does not change HashJoin matching semantics. It only changes how many hash bits are used to split build-side data during spill. Each row still enters a partition based on its hash value, and restore still rebuilds the hash table partition by partition. + +When required information is missing, the logic falls back to the existing configuration: + +| Condition | Behavior | +|---|---| +| No usable runtime metric is found | Do not adjust bits | +| `totalRowCount` or `processedRowCount` is 0 | Do not adjust bits | +| Estimated bits do not exceed current bits | Keep the current configuration | +| The current task has multiple drivers | Do not use this row-count traversal path | +| Remaining hash bits are insufficient | Clamp to the upper bound or stop increasing bits | +| Maximum spill level is exceeded | Stop continuing recursive spill | +| Spill compression is explicitly configured | Do not override the user-specified compression strategy | + +This fallback behavior keeps the optimization additive: it improves execution when enough information is available, and preserves the original behavior when it is not. + +## Engineering Tradeoffs + +### Row Count Is Not Byte Size + +HashBuild memory pressure is fundamentally driven by bytes, hash table overhead, row width, variable-length fields, and allocator behavior. Row count is only a low-cost proxy. + +If the first part of the data contains narrow rows but the later part contains many wide strings or complex values, the `processedRowCount` observed at the first spill may overestimate how many rows a partition can hold, and later spill can still happen. Conversely, if the earlier rows are wider, the estimate may be conservative and produce more partitions than necessary. + +In this design, being somewhat conservative is usually preferable to being too aggressive. More partitions have management cost, but too few partitions directly cause recursive spill. + +### Too Many Partitions Also Cost Something + +Increasing bits is not always better. More partitions can introduce: + +- More partition buffers. +- More spill runs or file fragments. +- More metadata. +- More small I/O. +- More scheduling and reader-management work during restore. + +This is why the implementation has safeguards such as `spillPartitionsAdaptiveThreshold`, the remaining hash-bit upper bound, and `maxSpillLevel`. They keep adaptation within a reasonable range and avoid trading recursive spill for excessive fragmentation. + +### Skew Cannot Be Solved Only by Adding Partitions + +If join keys are heavily skewed, one hot key can still produce a partition far larger than the average. Increasing the partition count reduces average partition size under a reasonably even distribution, but it cannot split rows for the same hot key by itself. + +Those cases need additional skew-aware strategies, such as special handling for hot keys or a join-strategy rewrite at the planning layer. Adaptive spill partitioning reduces recursive spill risk for ordinary large HashJoins, but it is not a complete skew-join solution. + +### Multi-Driver Execution Needs More Careful Statistics + +The current row-count traversal path only applies to single-driver execution. With multiple drivers, each driver may observe different `processedRowCount`, memory capacity, input splits, and hash distribution. Treating one driver's local progress as global capacity can easily mislead the estimate. + +Supporting multiple drivers requires additional per-driver or task-level aggregated statistics, such as: + +```text +global total row count +per-driver processed row count +per-driver memory usage +per-driver spill threshold +partition-level row distribution +``` + +Before that aggregation exists, restricting this path to a single driver is conservative but reasonable. + +## When It Applies + +This optimization is most useful when: + +- The HashJoin build side is much larger than the memory available to a single task. +- The default `joinPartitionBits` is too small and recursive spill is likely. +- Build-key distribution is relatively even, without extreme hot keys. +- The upstream path can provide a credible total row count, and preceding operators can update processed row count continuously. +- Spill I/O or compression cost is already a query bottleneck. + +It is less useful, or needs caution, when: + +- The build side already fits in memory and does not spill. +- The fixed partition bits are already large enough. +- Row-count estimates are missing or clearly unreliable. +- Row width varies so much that row count is weakly correlated with memory footprint. +- Join keys are severely skewed and hot partitions dominate memory pressure. +- Too many partitions create more small-file and metadata cost than the recursive spill they avoid. + +## Summary + +Adaptive spill partitioning addresses a specific but expensive HashJoin spill problem: fixed partition counts cannot see the real build-side scale or the task's observed memory capacity, so partitions can remain too large and trigger recursive spill. + +The core idea is to treat the first spill as a runtime capacity sample. The engine uses upstream total row count and current processed row count to estimate better partition bits. The first spill adjusts `joinPartitionBits`; restoring a spilled partition uses that partition's row count and the first-spill threshold to adjust `joinRepartitionBits`. The same row-count signal can also estimate spill size and select a better compression strategy for large spill cases. + +The goal is not to make HashJoin never spill. It is to reduce unnecessary repeated spill. The design keeps the original configuration as a lower bound, falls back automatically when information is missing, and controls risk with hash-bit limits, spill level, and partition thresholds. For large build sides with undersized fixed partition counts and obvious recursive spill, it moves execution closer to the desired path: split once at a reasonable granularity, restore partitions one by one, and avoid paying for repeated I/O and CPU work. diff --git a/doc/blog/_posts/2026-07-03-memory-management-offload.md b/doc/blog/_posts/2026-07-03-memory-management-offload.md new file mode 100644 index 000000000..9269b81df --- /dev/null +++ b/doc/blog/_posts/2026-07-03-memory-management-offload.md @@ -0,0 +1,387 @@ +--- +layout: post +title: "From Correctness to Utilization: Moving Memory Management into Bolt and Calibrating Quota with RSS" +date: 2026-07-03 +author: "Bolt Community" +parent: Blog +nav_order: 6 +--- + +The first goal of moving Bolt memory management into Bolt was not performance, but correctness: reserve, repay, spill, and OOM decisions for off-heap memory had to move out of the cross-language path between Spark/Gluten and Bolt, and converge into one closed loop on the Bolt side. + +After this loop was in place, experiments exposed another problem: the logical Quota was already exhausted, while the process RSS was still clearly below the requested off-heap budget. In other words, the system became tight on accounted memory too early, but physical resident memory had not actually been filled. RSS-based Quota calibration was designed in this context. It does not break the real memory budget of the container or process. Instead, it uses RSS feedback to correct an overly conservative logical Quota, so the execution path falls into fewer ineffective spills and premature OOMs. + +## The Problem Started with Correctness + +The memory management path in Spark on Gluten originally crossed both JVM and C++ sides. Bolt operators allocated memory on the C++ side, while memory accounting and task-level arbitration crossed JNI and returned to Gluten/Spark through `MemoryTarget`, `MemoryConsumer`, and `TaskMemoryManager`. + +Abstracted as one path, it looked roughly like this: + +```text +Bolt / Velox operator requests memory + | + v +Bolt MemoryPool / Arbitrator + | + v +JNI AllocationListener + | + v +Gluten MemoryTarget.borrow() + | + v +Spark TaskMemoryManager.acquireExecutionMemory() + | + v +Spark ExecutionMemoryPool + | + +-- grant: increase logical accounting + | + +-- not enough: trigger spill or throw OOM +``` + +This path worked, but it carried several engineering risks: + +1. **Distributed state**: the Bolt side knew the concrete `MemoryPool`, operator, and spill state, while the Spark side owned task-level Quota. The two sides were synchronized through a JNI listener. +2. **Unsafe failure paths**: allocation failure, spill failure, OOM, and cross-language exceptions could all interrupt the control flow. If one layer had already updated accounting while another layer did not fully roll it back, memory accounting could become inconsistent. +3. **Mixed failure semantics**: APIs such as `maybeReserve` are better suited to returning success or failure, but in the old path some failures propagated as exceptions. Once exceptions are used as normal control flow, keeping memory accounting symmetric on every path becomes difficult. +4. **Spill decisions were too far from execution state**: the Bolt execution side is the component that really knows which Bolt pools can shrink and which operators can spill. When task-level management lives on the JVM side, the decision path becomes longer and harder to keep consistent. + +Therefore, the core goal of moving memory management into Bolt was to let Bolt own the task-level memory management structures and put memory allocation, release, spill, shrink, and OOM decisions into the same C++ control flow. + +## The Closed Loop After the Move + +After the move, the Bolt side added several abstractions similar to Spark execution memory: + +| Module | Responsibility | +|---|---| +| `ExecutionMemoryPool` | Executor-level off-heap Quota pool that allocates execution memory by active task count | +| `TaskMemoryManager` | Task-level entry point for execution memory allocation; triggers spill for the current task when Quota is insufficient | +| `MemoryConsumer` / `ConsumerTargetBridge` | Connects Spark-style consumers with the Gluten-style `MemoryTarget` tree | +| `TreeMemoryTargetNode` | Maintains hierarchical memory accounting, child nodes, spillers, and virtual statistic nodes | +| `ManagedAllocationListener` | Receives capacity changes from the Bolt allocator / arbitrator and calls `borrow/repay` | +| `ListenableArbitrator` | Notifies the listener when Bolt `MemoryPool` grows or shrinks, synchronizing task-level Quota | +| `BoltMemoryManagerHolder` | Holds the Bolt `MemoryManager`, Arrow pool, listener, target, and spiller in one place | + +The overall structure can be simplified as: + +```text + executor level + +------------------------------+ + | ExecutionMemoryPool | + | poolSize / active tasks / | + | dynamic quota extension | + +---------------+--------------+ + | + v + task level + +------------------------------+ + | TaskMemoryManager | + | acquire -> spill -> retry | + +---------------+--------------+ + | + v + target tree + +------------------------------+ + | ConsumerTargetBridge | + | root TreeTarget | + +---------------+--------------+ + | + +--------------+--------------+ + | | + v v + +---------------+ +---------------+ + | normal target | | over-acquire | + | operator | | backup target | + +-------+-------+ +---------------+ + | + v + +----------------+ + | SpillTrigger | + | shrink / spill | + +----------------+ +``` + +The grow path of Bolt `MemoryPool` also became: + +```text +operator allocate / maybeReserve + | + v +MemoryPoolForGluten::maybeReserve() + | + v +ListenableArbitrator::growCapacity() + | + v +ManagedAllocationListener::reserve() + | + v +MemoryTarget.borrow() + | + v +TaskMemoryManager.acquireExecutionMemory() + | + +-- ExecutionMemoryPool grant + | + +-- insufficient grant: + SpillTrigger shrinks first, then spills + reacquire after memory is released + if still insufficient, return actual granted bytes +``` + +The most important change here is not the class names, but the failure semantics: `ManagedAllocationListener::reserve()` directly returns the granted bytes, and allocation failure no longer depends on exceptions as normal control flow. This lets the upper-level `maybeReserve()` decide whether to continue based on the return value, instead of compensating memory accounting during exception unwinding. + +At the same time, when `TaskMemoryManager` is destructed, it releases all remaining Quota for that task from `ExecutionMemoryPool`; when `MemoryConsumer` is destructed, it checks whether its own `used_` has returned to zero; when `BoltMemoryManager` is destroyed, it checks whether the Bolt pool, Arrow pool, and retained child pools still have unreleased memory. These checks are not performance optimizations. They make it observable and diagnosable whether memory accounting is closed. + +## Why Preserve Spark's 1/N and 1/2N Policy + +If Quota were implemented as a simple global counter, tasks that start earlier could easily grab a large amount of memory, while later tasks would spill frequently. Spark's execution memory design has a key constraint: when there are `N` active tasks in an executor, each task should have a chance to obtain about `1/2N` of the memory before spilling, and usually no more than `1/N`. + +Bolt's `ExecutionMemoryPool` preserves this idea: + +```text +active task count = N + +maxMemoryPerTask = poolSize / N +minMemoryPerTask = poolSize / (2N) + +If the current task has not reached minMemoryPerTask, +and the pool does not have enough free memory yet, +wait for other tasks to release memory instead of failing immediately. +``` + +This policy addresses fairness and stability, not maximum throughput for a single task. It prevents a task from consuming too much Quota just because it started earlier, and it also reduces sharp oscillations when the number of active tasks changes. + +Engineering-wise, a wait timeout was also added. Spark's original implementation can wait without a timeout, but in a cross-component scenario, if another lock blocks the release path while a task is waiting, a deadlock may form. Bolt adds an upper bound to the minimum-memory wait. After timeout, it returns failure and lets the upper layer proceed to spill or OOM. + +## The Second Problem Exposed by Experiments: Quota Is Not RSS + +Moving memory management into Bolt solved the question of whether accounting was correct, but experiments revealed another phenomenon: logical Quota looked tight, while RSS was far below the off-heap budget requested by the process. + +A typical case was that the logical memory usage of a task was already close to 1.4 GB, but the process RSS at failure time was only about 1.09 GB. After explicitly writing through allocated memory, RSS for similar tasks increased noticeably. This shows that at least part of the logical allocation did not translate into an equal amount of resident physical pages. + +This is not rare. In off-heap memory management, there are at least three kinds of gaps: + +1. **Allocated but untouched pages**: virtual address space or allocator accounting has increased, but physical pages may not have faulted into RSS yet. +2. **Reserved capacity that does not carry data**: to avoid frequent grow operations, `MemoryPool` reserves capacity by granularity. Reservation is not the same as real data usage. +3. **Capacity slack in operator internals**: structures such as hash tables, row containers, and vector buffers may hold extra capacity because of their growth strategies. + +After marking and checking a batch of allocation sites on release, the memory that was not really used was clearly not a single-site problem: + +| Observation | Value | +|---|---:| +| Allocation call sites with potential waste | 785 | +| Estimated total waste | 479 MB | +| Waste from top 10 call sites | 80 MB | +| Waste from top 50 call sites | 268 MB | +| Waste from top 100 call sites | 389 MB | +| Maximum waste from one call site | 8 MB | +| Minimum waste from one call site | 61 B | + +This shows that fixing one operator or one allocation site cannot systematically solve low utilization. A more stable approach is to acknowledge that logical Quota and RSS have a mapping ratio, then dynamically calibrate that ratio while the task is running. + +## RSS Quota Calibration Is Not Memory Overcommit + +First, three concepts need to be separated: + +| Concept | Meaning | +|---|---| +| `PoolSize` | The real off-heap budget configured by the user or executor | +| `Quota` | The limit used by Bolt memory management for logical allocation and arbitration | +| `RSS` | The current resident memory of the process, reported by the operating system | + +Initially, `Quota = PoolSize`. But if off-heap RSS is only 6 GB when logical allocation reaches 10 GB, using 10 GB as a hard Quota will trigger spill or OOM too early. RSS Quota calibration increases the logical Quota without breaking the real memory budget, so RSS can move closer to `PoolSize`. + +Therefore, the more accurate name is Quota overcommit, not memory overcommit: + +```text +Memory overcommit: + actual RSS exceeds the physical memory requested or allowed by the container + +Quota overcommit: + logical Quota is larger than the configured PoolSize, + in order to offset gaps between logical allocation and actual RSS, + so actual RSS approaches but does not exceed PoolSize +``` + +The core RSS calibration decision can be written as: + +```text +countUsed = logical Quota already granted by ExecutionMemoryPool +rss = max(32 MB, processRSS - onHeapRSS) + +mapping ratio = rss / countUsed +gap ratio = countUsed / rss +``` + +If `rss / countUsed` is clearly low, the logical Quota contains gaps. If `rss` is already close to or above `PoolSize`, Quota must not be expanded further. + +## Why Not PID or eBPF + +This problem can be framed as feedback control: runtime observes RSS continuously and adjusts Quota so RSS approaches `PoolSize`. The most direct idea is PID or PI control, but it has two obvious problems: + +1. Parameters are hard to tune. Spark SQL workloads vary widely, and one set of control parameters is hard to fit scan, aggregation, join, shuffle, writer, and other scenarios at the same time. +2. Sampling cost is high. If RSS is read on every memory allocation, operating-system queries and control logic enter a high-frequency path. + +An eBPF-based approach is theoretically more precise, because it can observe physical page allocation from the kernel side. But it is also more expensive: it depends on kernel capabilities, is more complex to deploy, and if the execution model expands from a single thread to more threads in the future, attribution becomes harder. + +The final choice is repeated proportional calibration: the logic is simple, the sampling frequency is controllable, and problems are easier to trace. + +## How Repeated Proportional Calibration Works + +Dynamic Quota calibration is triggered in `ExecutionMemoryPool::acquireMemory()`. It does not read RSS on every allocation. Instead, it attempts sampling under two conditions: + +1. The current request cannot obtain enough Quota. +2. Dynamic Quota has already been triggered, and cumulative Quota growth has exceeded the sampling threshold. + +The simplified decision flow is: + +```text +memory request + | + v +try to grant by the 1/N policy in ExecutionMemoryPool + | + +-- grant is enough -> return + | + +-- grant is insufficient or sampling threshold is reached + | + v + read process RSS and on-heap usage + | + v + offHeapRSS = max(32 MB, processRSS - onHeapRSS) + | + v + check whether the mapping ratio is in a reasonable range + | + +-- reasonable -> do not adjust Quota + | + +-- unreasonable + | + v + ratio = clamp(countUsed / offHeapRSS, + extendMinRatio, + extendMaxRatio) + | + v + poolExtendSize = PoolSize * (ratio - 1) * scale + | + v + if the change exceeds the threshold, update logical Quota +``` + +The default parameters can be summarized as: + +| Parameter | Default | Purpose | +|---|---:|---| +| `quotaTriggerRatio` | 0.5 | Start observing only after Quota usage reaches a certain ratio, avoiding inaccurate early sampling | +| `rssMinRatio` | 0.9 | If RSS / logical usage is below this value, treat it as a clear gap | +| `rssMaxRatio` | 1.0 | Once RSS is close to `PoolSize`, stop expanding further | +| `extendMinRatio` | 1.0 | Lower bound for Quota expansion; 1.0 means no expansion | +| `extendMaxRatio` | 6.0 | Upper bound for Quota expansion, preventing aggressive estimation | +| `extendScaleRatio` | 1.0 | Scales the computed expansion size | +| `sampleRatio` | 0.05 | Allow another sample after cumulative Quota growth exceeds 5% of `PoolSize` | +| `changeThresholdRatio` | 0.0 | Controls how much the new extension must differ from the old one before updating | +| `logPrintFreq` | 0.05 | Controls log frequency so logging itself does not become overhead | + +This algorithm is not a one-shot decision. The first sample may be affected by the current operator, input batch, or allocator state. Therefore, if Quota later grows by a certain amount again, the algorithm samples again and corrects `poolExtendSize`. + +## Key Safeguards: Prevent Calibration from Polluting the Signal + +RSS calibration sounds straightforward, but several engineering details determine whether it remains stable. + +First, when dynamic Quota is enabled, `MemoryPoolForGluten::maybeReserve()` releases unused reservation: + +```text +maybeReserve(size) + -> reserve upward by 8 MB granularity + -> if dynamic Quota is enabled: + releaseThreadSafe(0, false) + release capacity that was reserved but not used +``` + +This prevents reserved-but-unused Quota from being mistaken as real logical usage, which would overestimate the gap ratio. + +Second, RSS is a process-level metric, so on-heap usage must be subtracted: + +```text +offHeapRSS = processRSS - onHeapRSS +``` + +If the on-heap estimate is abnormally large, the code falls back to a 32 MB lower bound to avoid a negative or excessively small denominator. + +Third, some critical memory-pressure paths temporarily disable dynamic Quota. For example, when `HashBuild` performs extra admission reservation before probe, it uses a scoped guard to disable dynamic expansion, preventing Quota from being expanded further just because the code is checking memory pressure. This prevents the control logic from chasing itself. + +Fourth, when dynamic Quota is triggered, it records `borrowFromRssWatermarkBytes`. Later, when `HashBuild` performs probe admission, it considers this watermark together with the reclaim watermark: + +```text +admissionWatermark = + min(non-zero reclaimWatermark, + non-zero borrowFromRssWatermark, + configuredTaskQuota fallback) +``` + +The meaning is that the build phase can usually still spill, while the probe phase has less room for remediation. If RSS calibration has already shown that this task encountered memory pressure, memory should be reserved earlier and more conservatively before probe. + +## Where the Performance Gains Come From + +RSS Quota calibration does not directly optimize operator algorithms. Its benefits mainly come from three places: + +1. **Fewer premature spills**: logical Quota is no longer exhausted too early because of untouched pages or reservation gaps. +2. **Fewer ineffective shrink/spill loops**: when some spills can release very few bytes or even 0 bytes, forcing more spill work is not useful. +3. **Higher actual memory utilization**: RSS moves closer to the requested off-heap budget, so the same resources can hold more intermediate state. + +In a replay of tasks that spilled while memory utilization was low, the overall trend was: + +| Metric | Median Change | Notes | +|---|---:|---| +| Peak memory utilization | +20% | Original changes in measurable samples were 9%, 11%, 17%, 31%, 34%, and 47% | +| Spill rows | -20% | Some cases dropped significantly, while some were almost unchanged | +| Spill time | -30% | Generally consistent with the trend in spill rows | +| Spill bytes | -20% | Generally consistent with the trend in spill rows | + +In another double-run experiment, after excluding failed groups and all-zero groups, there were 12 valid groups: + +| Metric | Result | +|---|---:| +| Total spill rows | Down 24% | +| Total spill time | Down 16% | + +This result better reflects the two-stage benefit: moving memory management into Bolt first reduced failures caused by correctness issues; RSS calibration then improved spill behavior in low-utilization scenarios. + +## Costs and Boundaries + +This design has clear boundaries. + +First, RSS is a process-level metric, not a task-level metric. The current implementation approximates the mapping using task logical usage and process off-heap RSS, so it is suitable when workloads inside an executor are relatively homogeneous and the main pressure comes from off-heap memory. If on-heap usage fluctuates heavily, or if memory behavior differs greatly across tasks, the mapping ratio becomes coarse. + +Second, sampling timing affects the result. If sampling happens too early, many pages may not have been touched yet and the gap ratio may be overestimated. If sampling happens too late, the task may already have entered a spill or OOM path. `quotaTriggerRatio` and `sampleRatio` are used to trade off accuracy and overhead. + +Third, Quota expansion is capped, but it is still a statistical strategy. The default `extendMaxRatio = 6.0` prevents one estimate from expanding Quota too aggressively. Repeated sampling and the change threshold can reduce oscillation. However, if the RSS signal itself is unstable, expansion may still be insufficient or excessive. + +Fourth, this does not replace operator-level optimization. If spills are caused by too few hash partitions, data skew, recursive spill, insufficient probe admission, or an operator that cannot spill effectively, RSS calibration can only mitigate part of the problem and cannot remove the root cause. + +Fifth, it does not break the real memory budget. The goal is to make RSS approach `PoolSize`, not exceed it. In production, cgroup kills, RSS peaks, spill bytes, spill time, and task failure rate still need to be monitored. + +The suitable scenarios can be summarized as: + +| Scenario | Suitable? | +|---|---| +| High logical Quota, low RSS, frequent spill | Yes | +| High logical Quota, low RSS, OOM after reducing memory | Yes | +| The root cause is accounting error from the cross-language exception path | Move memory management into Bolt first | +| RSS is already close to `PoolSize` | Quota should not be expanded further | +| Spill comes from severe data skew or an unreasonable partitioning strategy | Needs operator or partitioning optimization as well | +| RSS is dominated by on-heap fluctuations | Tune and observe carefully | + +## Summary + +This optimization can be understood in two layers. + +The first layer is correctness: moving Bolt memory management into Bolt converges the memory accounting path that used to be scattered across Spark, Gluten, JNI, and Bolt onto the C++ side. Reserve, repay, spill, shrink, and OOM now go through one rollback-friendly and observable control flow. This solves the question of whether accounting can become wrong. + +The second layer is utilization: after the correctness loop was established, experiments found a systematic gap between logical Quota and RSS. RSS-based Quota calibration does not allow real memory overuse. It uses actual resident memory to correct logical Quota, so requested resources can actually be used by the execution path. This solves the question of whether the accounting, although correct, is too conservative. + +The relationship between the two should not be reversed. Without the correctness loop after moving memory management into Bolt, RSS calibration would be hard to land stably. Without RSS calibration, the moved memory management would be more correct, but could still spill too early in low-RSS, high-Quota scenarios. The final benefit comes from the combination of both stages: first make memory accounting correct, then make that accounting better reflect real physical memory usage. diff --git a/doc/blog/_posts/2026-07-03-row-based-spill.md b/doc/blog/_posts/2026-07-03-row-based-spill.md new file mode 100644 index 000000000..5b39bf6d5 --- /dev/null +++ b/doc/blog/_posts/2026-07-03-row-based-spill.md @@ -0,0 +1,466 @@ +--- +layout: post +title: "Row-Based Spill: Aligning the On-Disk Format with the Execution Format" +date: 2026-07-03 +author: "Bolt Community" +parent: Blog +nav_order: 9 +--- + +The goal of row-based spill is straightforward: when an operator's primary execution state is already row-oriented, it should not temporarily convert that state back to a columnar format just to spill. Instead, rows in RowContainer are written to disk in a format that stays close to their in-memory layout. This reduces row-column conversion, columnar serialization, multi-column random access, and intermediate object construction during restore. + +This is not as simple as changing the spill file format from columnar to row-oriented. Spill sits between memory management, operator state, serialization, sorted merge, and aggregate function interfaces. If any one part is handled poorly, the saved conversion cost can move into restore, comparison, or accumulator handling. A practical row-based spill design needs to answer three questions: + +- How can in-memory rows be written to disk safely, especially variable-width data such as strings and complex types? +- How can RowContainer-based operators such as HashBuild and HashAggregation reuse the same spill capability? +- After faster writes, can restore and merge still preserve batch processing instead of falling back to row-at-a-time interpretation? + +## Spill Bottlenecks Are Not Just I/O + +In large-scale execution engines, spill is often understood as an I/O problem: memory is insufficient, data is written to disk, and the engine reads it back later. In a vectorized execution engine, however, the expensive part is often not the disk write itself, but the data-shape conversion before and after that write. + +Take operators that use RowContainer to maintain state. Their in-memory working state is usually row-oriented: + +- HashBuild organizes the build side into RowContainer and a hash table so the probe phase can access rows randomly. +- HashAggregation stores group keys and accumulators in RowContainer and continuously updates aggregate state. +- Operators such as Sort and Window also organize data into row formats at certain stages for comparison, sorting, or window computation. + +If the spill framework only supports columnar files, the execution path has to convert formats back and forth: + +```text +input RowVector + | + v +operator-internal RowContainer + | + | memory pressure, spill required + v +temporary RowVector / columnar batches + | + v +columnar serialization and disk write + | + | restore + v +read RowVector + | + v +rebuild RowContainer +``` + +The problem with this path is not that columnar format is bad. Columnar layout is a good fit for scan, projection, filter, and batched expression evaluation. The problem is that when both the input and output sides of spill are already row-oriented, forcing a columnar intermediate format introduces extra work: + +- Rows in RowContainer must be split back into multiple vectors. +- Each column needs to gather and copy cells according to partition indexes. +- Variable-width strings and complex types go through additional serialization paths. +- During restore, RowVector data has to be written back into RowContainer. +- After multi-way merge, if rows can only be passed to aggregate functions one at a time, the path creates many virtual calls and SelectivityVector construction costs. + +The HashBuild spill flame graph shows this directly: hot spots concentrate around call chains such as `HashBuild::spillPartition`, `SpillState::appendToPartition`, `SpillFileList::write`, and `serializer::presto`. Pure file writing is only part of the cost. + +This means spill bottlenecks cannot be treated as I/O alone. Row-based spill targets the CPU and memory-copy costs in serialization, format conversion, and aggregate-state updates after merge. + +## Core Idea: Keep the Spill Format Close to RowContainer + +The central judgment behind row-based spill is: + +> If an operator ultimately needs to restore data into RowContainer, the intermediate format on disk should stay as close as possible to RowContainer instead of going through a columnar detour. + +A row in RowContainer can be roughly split into two parts: + +```text ++------+-----------+--------------+-------------------+---------+--------------+ +| keys | flag bits | accumulators | dependent columns | rowSize | next pointer | ++------+-----------+--------------+-------------------+---------+--------------+ + fixed-width area +``` + +The fixed-width area stores keys, flag bits, aggregate accumulators, dependent columns, row size, hash-chain pointers, and related metadata. For fixed-width types, this part is already contiguous memory. For variable-width types, a RowContainer row only stores a view or pointer: + +- Short strings can be inlined in `StringView`. +- Long strings, varbinary values, and complex type payloads are managed by an allocator, with the row storing views that point to the actual content. +- Complex types are usually serialized into contiguous payloads and referenced by in-row views. + +Therefore, row-based spill cannot simply write the in-row fixed area as-is. Otherwise, pointers become invalid after restore. The actual on-disk format needs to combine the fixed-width area and variable-width payload into self-contained row records: + +```text +in-memory RowContainer row + ++---------------- fixed area ----------------+ +| key | flags | acc | StringView(ptr,len) ... | ++--------------------------------------------+ + | + v + variable-width payload in allocator + +row-based spill record + ++---------------- fixed area ----------------+---------------- payload ---------------+ +| key | flags | acc | StringView(offset,len) | string bytes | complex bytes | ... | ++--------------------------------------------+----------------------------------------+ +``` + +When writing, Spiller builds a row-format description from RowContainer column information, offsets, alignment, row-size position, and the order of variable-width columns. For each row, it: + +1. Copies the in-row fixed-width area. +2. Appends non-inlined variable-width content to the end of the current row record. +3. Makes the views in the row record able to point back to that payload after readback. +4. Writes rows to spill files in batches and compresses them when configured. + +When reading back, the row-based reader reads a batch of row bytes, decompresses them if needed, restores in-row views according to the same row-format description, and directly returns a set of `char*` row pointers. The restore side does not need to construct RowVector first and then write RowVector data back into RowContainer. + +```text +disk file + | + v +RowBasedSpillReadFile + | + | read block, decompress, deserialize row views + v +vector rows + | + +--> HashBuild restore + | + +--> HashAgg ordered merge +``` + +## Overall Architecture + +Row-based spill does not replace the whole Spiller. It adds a write mode inside the existing Spiller framework. Spiller still collects data, organizes it by partition, sorts it when needed, writes files, and creates readers and merge streams. + +You can think of it as two modes sharing the same scheduling framework: + +```text + +------------------+ + | Operator | + | HashBuild/HashAgg| + +---------+--------+ + | + v + +------------------+ + | RowContainer | + +---------+--------+ + | + v + +------------------+ + | Spiller | + +---------+--------+ + | + +----------------+----------------+ + | | + v v + RowVector spill mode RowContainer spill mode + | | + v v + extract RowVector batches serialize rows directly + | | + v v + columnar spill files row-based spill files +``` + +Configuration-wise, row-based spill has three states: + +| Mode | Meaning | Typical trade-off | +|---|---|---| +| `disable` | Use the original columnar spill path | Conservative, best compatibility | +| `raw` | Write rows to disk without compressing row bytes | Lower CPU overhead, potentially larger files | +| `compression` | Write rows to disk and compress row bytes | Lower I/O, additional compression/decompression CPU | + +At the file level, row-based spill writes data in blocks. Each block starts with two length fields: + +```text ++-------------------+-----------------+------------------------+ +| uncompressed size | compressed size | row bytes or compressed | ++-------------------+-----------------+------------------------+ +``` + +Without compression, the two lengths are the same. With compression, the reader first decompresses contiguous row bytes and then restores in-row views row by row. This allows the reader to read in batches, decompress in batches, and return row pointers in batches while avoiding one system call or small-object allocation per row. + +## HashBuild: Avoiding a Columnar Intermediate + +HashBuild's execution state is naturally row-oriented. The input is RowVector, but the build side ultimately goes into RowContainer and a hash table. Under memory pressure, the original columnar spill path roughly looks like this: + +```text +input RowVector + | + v +RowContainer / hash table + | + | spill + v +extract RowVector by partition + | + v +columnar serialization and disk write + | + | restore + v +read RowVector + | + v +call addInput again, rebuild RowContainer / hash table +``` + +With row-based spill, the key change is that build rows already in RowContainer are no longer split back into RowVector. They are written directly in row format, and after restore the read rows go straight into HashBuild's row-input path. + +```text +input RowVector + | + v +RowContainer / hash table + | + | row-based spill + v +partitioned row records + | + v +row-based spill files + | + | restore + v +vector rows + | + v +HashBuild add spilled row input +``` + +The gains come from several places: + +- In-memory build rows no longer go through `RowContainer -> RowVector`. +- Restore no longer goes through the full `RowVector -> RowContainer` columnar intermediate. +- For many columns, strings, and complex types, writing is organized around one row at a time, reducing random access from multi-column gather. +- For fixed-width data that is already in row format, writing is closer to contiguous memory copy. + +The original columnar path still needs to stay. Not every HashBuild scenario is suitable for row-based spill. For example, with multiple builders, multiple drivers, or hybrid join, row-pointer ownership, restore ordering, and concurrency coordination are more complex. A practical implementation usually enables this first for the single-builder, single-driver, non-hybrid-join path to keep correctness and rollout controllable. + +## HashAggregation: Accumulators and Merge Are the Hard Parts + +Row-based spill for HashAggregation is more complex because RowContainer stores not only group keys but also accumulators for each aggregate function. + +The aggregation execution flow can be simplified as: + +```text +input batch + | + v +probe group key + | + +--> new group: create RowContainer row, initialize accumulators + | + +--> existing group: update accumulators in the row +``` + +When the number of groups or memory usage exceeds a threshold, group rows in RowContainer need to be spilled. A typical columnar spill path is: + +```text +RowContainer groups + | + v +sort by key + | + v +extract keys and accumulators into RowVector + | + v +write columnar data to disk + | + | merge read + v +RowVector rows + | + v +update final accumulators / output results +``` + +Row-based spill changes the middle two steps: + +```text +RowContainer groups + | + v +sort by key + | + v +row-based write + | + | row-based ordered merge + v +spilled rows + | + v +extract accumulators in batches and update groups +``` + +There are two key points here. + +First, accumulators must be safe to spill. Fixed-width accumulators can be written together with the in-row fixed area. Variable-width accumulators need aggregate functions to provide serialization/deserialization support so extra memory can be packed into the row record payload. Otherwise, the spill file would only contain in-process pointers that cannot be restored. + +Therefore, HashAggregation is suitable for row-based spill only when these conditions hold: + +- The aggregate function accumulator is fixed-size or explicitly supports accumulator serde. +- The aggregate function does not have state such as sorting keys that requires additional ordering semantics. +- The merge phase can correctly extract intermediate aggregate results from row records. + +Second, the merge phase must not degrade into single-row updates. In earlier columnar spill paths, after multi-way merge, each output row could come from a different file and easily fall into row-at-a-time paths such as `addSingleGroupIntermediateResults`. Every row and every aggregate function would trigger one update, making virtual-call and temporary selection-vector overhead very visible. + +The row-based spill merge path first collects a batch of spilled rows that belong to the current output window. It then extracts accumulators from those rows into vectors by aggregate function and calls batch interfaces to update target groups: + +```text +row-based ordered streams + | + v +multi-way merge by key + | + v +collect a batch of rows and corresponding groups + | + v +extract accumulator vectors + | + v +addIntermediateResults(group rows, batch vectors) + | + v +extract final output +``` + +This preserves the benefit of row-oriented spill while keeping aggregate functions on their existing vector interfaces as much as possible. It avoids the trap where data is row-oriented on disk but aggregate functions can only consume it one row at a time. + +## Why It Is Faster + +The benefits of row-based spill fall into four categories. + +First, it reduces format conversion. For native RowContainer state, the original path needs `row -> column` before spill and `column -> row` after restore. Row-based spill makes the on-disk format a recoverable row record, reducing intermediate RowVector construction and decomposition. + +Second, it reduces multi-column gather and copy. Columnar spill needs to collect cells for each column according to partition indexes and then write column buffers into output streams. With many columns, strings, or complex types, this process can easily hurt locality. Row-oriented writes organize data around rows: the fixed-width part is contiguous, and variable-width payloads are appended sequentially. + +Third, it lowers row-at-a-time overhead in the aggregation merge phase. HashAggregation can extract accumulators from spilled rows into vectors in batches and then update accumulators through batch interfaces, reducing virtual calls and temporary object construction. + +Fourth, it reduces intermediate objects during restore. HashBuild restore can obtain row pointers from the row-based reader and return directly to the row-input path. HashAgg merge can compare rows and extract accumulators directly on row-based ordered streams. + +All these gains depend on one premise: the operator's main working state is truly RowContainer. If a path is already purely columnar, or if spilled data is ultimately consumed directly by a columnar operator, row-based spill may not be the better choice. + +## Performance Observations + +### General Row Spill Microbench + +In a mixed-data workload with 10 columns, the data contains 5 fixed-width columns, 4 string columns, and 1 `array` column. The batch size is 4096 rows, and string length is about 50 bytes. The following table shows results for several representative partition counts: + +| Partition count | Column-to-column spill | Row-to-column spill | Row-to-row spill | Row-to-row vs. column-to-column | Row-to-row vs. row-to-column | +|---:|---:|---:|---:|---:|---:| +| 64 | 3.54s | 4.76s | 2.77s | 1.28x | 1.72x | +| 128 | 5.18s | 5.45s | 3.37s | 1.54x | 1.62x | +| 256 | 5.96s | 5.48s | 3.38s | 1.77x | 1.62x | + +On this mixed-data workload, row-to-row spill is roughly `1.58x ~ 1.83x` faster than row-to-column spill and roughly `1.15x ~ 1.76x` faster than column-to-column spill. For typical production partition counts in the tens to low hundreds, the advantage of row-based spill is relatively stable. + +With all variable-width strings, the gain from row-based spill is more visible: + +| Scenario | Row-to-row vs. row-to-column | Row-to-row vs. column-to-column | +|---|---:|---:| +| 10 varchar columns, batch size 4096 | 1.81x ~ 2.05x | 1.16x ~ 1.46x | + +This matches expectations: the more variable-width columns there are, the easier it is for columnar gather, serialization, and repeated copying costs to grow. + +### Local HashAggregation Benchmark + +In aggregate-function benchmarks, row-based spill improves most numeric aggregate functions by roughly `1.3x ~ 1.9x`, with string-related scenarios reaching close to `4x` in the best cases. + +| Aggregation scenario | Observed trend | +|---|---| +| Numeric aggregates such as `count` / `sum` / `avg` / `min` / `max` | Most cases are 1.3x ~ 1.9x | +| Aggregates with slightly more complex state, such as `stddev` | Most cases still show stable improvement | +| String-related aggregates | Some cases approach 4x | + +This should not be interpreted as "row-based spill makes aggregate functions themselves faster." More precisely, the gain comes from less format conversion during spill and merge, fewer row-at-a-time intermediate-result updates, and better memory locality. The actual compute logic of aggregate functions does not change because of the spill format. + +### Production Aggregation Jobs + +In the following production results, HashAgg time is accumulated operator time across tasks, while stage median and total job time are wall-clock metrics. They cannot be divided directly, but together they show the overall trend. + +| Job | Metric | Column spill | Row-based spill | Change | +|---|---|---:|---:|---:| +| A | Stage 3 Regular HashAgg accumulated time | 449.88h | 165.86h | About 2.7x | +| A | Stage 6 Regular HashAgg accumulated time | 56.89h | 25.59h | About 2.2x | +| A | Total job time | 2.9h | 1.2h | About 2.4x | +| B | Stage 6 Regular HashAgg accumulated time | 116.99h | 56.12h | About 2.1x | +| B | Total job time | 55min | 33min | About 1.7x | + +These results show that row-based spill benefits can carry through to job-level latency, but the size of the improvement depends on many factors: the share of total time spent in spill, aggregate function types, number of key columns, string ratio, stage parallelism, and whether other bottlenecks remain. + +### Comparison After Column Spill Batch Optimization + +Columnar spill can also be optimized. For example, after merge, gathering multi-way input back into batches and then passing those batches to aggregate functions can significantly reduce row-at-a-time update overhead. This optimization recovers part of the HashAgg merge cost, but it still has to deal with the columnar intermediate format. + +In one aggregation job, the accumulated times for the three paths are: + +| Metric | Column spill | Row-based spill | Column spill batch optimization | +|---|---:|---:|---:| +| aggregation total | 369.29h | 140.94h | 177.59h | +| groupingSet output time | 269.45h | 60.40h | 75.24h | +| agg update for output | Not separately tracked | 23.78h | 4.20h | +| spill total | 71.91h | 47.28h | 72.77h | +| convert for spilling | 22.06h | 0h | 21.56h | +| read for spilling | 7.20h | 4.16h | 6.74h | +| sort for spilling | 23.13h | 29.32h | 24.26h | + +This comparison is representative: the column batch optimization makes aggregation output updates faster, but format-conversion costs such as `convert for spilling` still remain. The advantage of row-based spill does not come only from batch updates; it also comes from an on-disk format that stays closer to RowContainer. + +## Engineering Trade-Offs + +Row-based spill is not an unconditional replacement for columnar spill. It has clear applicability boundaries. + +### Suitable Scenarios + +Row-based spill is a better fit in these cases: + +- The operator's in-memory state is already RowContainer. +- Restore after spill still needs to return to RowContainer. +- The data has many columns, or contains many strings, varbinary values, or complex types. +- HashAggregation accumulators can be serialized safely. +- Spill accounts for a large share of total job time. + +In these scenarios, row-based spill focuses the optimization on the most expensive conversion path. + +### Unsuitable or Limited-Benefit Scenarios + +In the following scenarios, the benefit may be small or row-based spill may not be appropriate: + +- Data rarely spills, or the amount of spilled data is small. +- Downstream consumption is naturally columnar and does not need RowContainer after readback. +- Aggregate functions have sorting keys, or accumulators do not support safe serde. +- HashBuild scenarios with more complex row-pointer ownership and restore coordination, such as multiple builders, multiple drivers, or hybrid join. +- In compression mode, CPU becomes the bottleneck while I/O is not the main bottleneck. +- Columnar spill has already removed the main row-at-a-time overhead through techniques such as batch gather, and format conversion is not a large share of the cost. + +These boundaries matter. The goal of row-based spill is not to make every spill row-oriented. It is to avoid making intermediate state from row-oriented operators pay unnecessary columnar conversion costs just because it needs to be written to disk. + +### Where Correctness Risks Concentrate + +When implementing row-based spill, correctness risks mainly concentrate around data ownership and serde contracts. + +Variable-width data cannot write in-process pointers directly to disk. Spill records must be self-contained, and after readback they must be able to rebuild `StringView` or complex-type views. + +Accumulator serde must guarantee that: + +- Serialized data contains all variable-width state needed to restore the accumulator. +- Deserialization does not depend on allocator memory that has already become invalid. +- The sizes, alignment, and offsets of the fixed-width area and variable-width payload are consistent. +- The semantics of `extractAccumulators`, `addIntermediateResults`, and final output remain unchanged. + +Sorting and merge must also use comparison semantics consistent with RowContainer, including edge cases around nulls, strings, floating-point values, and complex types. Otherwise, even if row-based files are written faster, the merge phase may break result ordering or group boundaries. + +## Summary + +Row-based spill addresses a practical engineering problem: when execution state is already row-oriented, the spill path should not force it into a columnar format and then back again. + +Its core benefits come from: + +- Reducing `RowContainer <-> RowVector` conversion. +- Lowering multi-column gather, serialization, and restore costs. +- Letting the HashBuild readback path return directly to row input. +- Letting HashAggregation still update accumulators in batches after row-based merge. + +Its boundaries are equally clear: row-based spill brings stable benefits only when operator state, on-disk format, and restore path form a closed loop. For naturally columnar paths, aggregate functions without accumulator serde, or execution modes with complex concurrent restore relationships, keeping columnar spill or enabling row-based spill selectively is safer. + +A good spill design is not just about writing data to disk. It should avoid meaningless data-shape movement when memory is tight. That is the value of row-based spill: the intermediate format on disk serves the execution state itself, instead of forcing execution state to adapt to an unsuitable spill format. From d2d0c0cd4ae68b80f159da7a5927238d6c725a4e Mon Sep 17 00:00:00 2001 From: "wangxinshuo.db" Date: Fri, 3 Jul 2026 20:45:54 +0800 Subject: [PATCH 2/2] ci: lint blog markdown in one pass --- .pre-commit-config.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 625b7a605..d0ec0da3f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -79,10 +79,12 @@ repos: files: ^doc/blog/_posts/.*\.md$ - id: markdownlint-blog name: Lint and fix blog Markdown - entry: npx markdownlint-cli2@0.17.2 --fix "#doc/blog/vendor/**" + entry: npx markdownlint-cli2@0.17.2 --fix "doc/blog/**/*.md" "#doc/blog/vendor/**" language: system files: ^doc/blog/.*\.md$ exclude: ^doc/blog/vendor/ + pass_filenames: false + require_serial: true - id: license-header-check name: Check for license headers entry: go run -C .github/license_check check_license.go