From 4990e9d291b379495ecbfd5ff74225d10d044e15 Mon Sep 17 00:00:00 2001 From: "omer.arap" Date: Wed, 3 Jun 2026 02:43:25 +0800 Subject: [PATCH] fix(parquet): lazily load pending repdefs during page seek Repeated Parquet columns keep two streams of state in sync: the value pages that contain the actual leaf values, and the repetition/definition levels that describe how those leaf values map back to top-level rows. For chunk-level rep/defs, PageReader materializes per-page leaf counts in numLeavesInPage_ and uses pageIndex_ to look up the count for the current non-top-level data page. Sparse selective reads can make the value reader reach a later physical page while numLeavesInPage_ only contains metadata for the pages decoded from earlier rep/def batches. That does not necessarily mean the file or seek is invalid: preloadRepDefs() may already have staged more rep/def batches in preloadedRepDefs_, but those batches have not yet been decoded into numLeavesInPage_. The old code checked pageIndex_ against numLeavesInPage_ immediately and could fail with: Seeking past known repdefs for non top level column page N This was too eager when preloadedRepDefs_ was non-empty. Before enforcing the existing bounds check, decode pending rep/def batches until either the current page index is covered or there are no pending batches left. The original check remains in place, so truly inconsistent state still fails; the reader only recovers when the needed metadata was already preloaded. Add a focused PageReader regression test that constructs this exact state: one known page in numLeavesInPage_, the next page requested by pageIndex_, and one pending encoded rep/def batch. Without the lazy load, the test throws the same "Seeking past known repdefs" error. With the fix, the pending batch is materialized and the page row info is populated. Also add an end-to-end reader scenario with ARRAY, small Parquet pages, batched rep/def settings, and a sparse filter to exercise the same reader path through public APIs. Validation: cmake --build _build/Release --target bolt_dwio_parquet_reader_test _build/Release/bolt/dwio/parquet/tests/reader/bolt_dwio_parquet_reader_test \ --gtest_filter=ParquetPageReaderTest.loadsPendingRepDefsBeforePageRowInfoCheck:ParquetReaderTest.sparseSkipAcrossRepeatedPagesWithBatchedRepDefs --- bolt/dwio/parquet/reader/PageReader.cpp | 12 ++ bolt/dwio/parquet/reader/PageReader.h | 2 + .../tests/reader/ParquetPageReaderTest.cpp | 112 ++++++++++++++++++ .../tests/reader/ParquetReaderTest.cpp | 60 ++++++++++ 4 files changed, 186 insertions(+) diff --git a/bolt/dwio/parquet/reader/PageReader.cpp b/bolt/dwio/parquet/reader/PageReader.cpp index 055e69c2e..03c81373b 100644 --- a/bolt/dwio/parquet/reader/PageReader.cpp +++ b/bolt/dwio/parquet/reader/PageReader.cpp @@ -262,6 +262,18 @@ void PageReader::setPageRowInfo(bool forRepDef) { numRowsInPage_ = numRepDefsInPage_; } else if (hasChunkRepDefs_) { ++pageIndex_; + + // The repeated reader normally decodes rep/def metadata far enough before + // the value reader seeks into a non-top-level data page. However, sparse + // selective reads can make the value path reach the next physical page + // exactly when only the already-consumed rep/def page counts are present in + // numLeavesInPage_. If more preloaded rep/def batches are available, decode + // them here instead of failing the scan immediately. + while (pageIndex_ >= static_cast(numLeavesInPage_.size()) && + !preloadedRepDefs_.empty()) { + loadMoreRepDefs(); + } + BOLT_CHECK_LT( pageIndex_, numLeavesInPage_.size(), diff --git a/bolt/dwio/parquet/reader/PageReader.h b/bolt/dwio/parquet/reader/PageReader.h index 7cca41acf..ed615f290 100644 --- a/bolt/dwio/parquet/reader/PageReader.h +++ b/bolt/dwio/parquet/reader/PageReader.h @@ -74,6 +74,8 @@ struct CryptoContext { /// encodings and presents the combination of pages and encoded values as a /// continuous stream accessible via readWithVisitor(). class PageReader { + friend class PageReaderTestPeer; + public: PageReader( std::unique_ptr stream, diff --git a/bolt/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp b/bolt/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp index 576517e3b..30aac923a 100644 --- a/bolt/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp +++ b/bolt/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp @@ -28,6 +28,7 @@ * -------------------------------------------------------------------------- */ +#include #include "bolt/dwio/parquet/reader/PageReader.h" #include "bolt/dwio/parquet/tests/ParquetTestBase.h" using namespace bytedance::bolt; @@ -37,6 +38,117 @@ using namespace bytedance::bolt::parquet; class ParquetPageReaderTest : public ParquetTestBase {}; +namespace bytedance::bolt::parquet { + +class PageReaderTestPeer { + public: + static raw_vector makeRepDefBatch( + int32_t numRepDefs, + int16_t repetitionLevel, + int16_t definitionLevel, + int32_t maxRepeat, + int32_t maxDefine) { + raw_vector batch; + append(batch, numRepDefs); + appendLevels(batch, numRepDefs, repetitionLevel, maxRepeat); + appendLevels(batch, numRepDefs, definitionLevel, maxDefine); + return batch; + } + + static void initializeForPendingRepDefs(PageReader& pageReader) { + pageReader.hasChunkRepDefs_ = true; + pageReader.pageIndex_ = 0; + pageReader.numLeavesInPage_ = {1}; + pageReader.preloadedRepDefs_.push_back(makeRepDefBatch( + 1, + 0, + pageReader.maxDefine_, + pageReader.maxRepeat_, + pageReader.maxDefine_)); + } + + static void setPageRowInfo(PageReader& pageReader) { + pageReader.setPageRowInfo(false); + } + + static int32_t pageIndex(const PageReader& pageReader) { + return pageReader.pageIndex_; + } + + static int32_t numRowsInPage(const PageReader& pageReader) { + return pageReader.numRowsInPage_; + } + + static size_t numKnownRepDefPages(const PageReader& pageReader) { + return pageReader.numLeavesInPage_.size(); + } + + static size_t numPendingRepDefBatches(const PageReader& pageReader) { + return pageReader.preloadedRepDefs_.size(); + } + + private: + template + static void append(raw_vector& out, T value) { + const auto offset = out.size(); + out.resize(offset + sizeof(T)); + std::memcpy(out.data() + offset, &value, sizeof(T)); + } + + static void appendLevels( + raw_vector& out, + int32_t numLevels, + int16_t level, + int32_t maxLevel) { + const auto bitWidth = ::arrow::bit_util::NumRequiredBits(maxLevel); + std::vector encoded( + ::arrow::util::RleEncoder::MaxBufferSize(bitWidth, numLevels) + + ::arrow::util::RleEncoder::MinBufferSize(bitWidth)); + ::arrow::util::RleEncoder encoder(encoded.data(), encoded.size(), bitWidth); + for (auto i = 0; i < numLevels; ++i) { + encoder.Put(level); + } + const auto encodedSize = encoder.Flush(); + append(out, encodedSize); + const auto offset = out.size(); + out.resize(offset + encodedSize); + std::memcpy(out.data() + offset, encoded.data(), encodedSize); + } +}; + +} // namespace bytedance::bolt::parquet + +TEST_F(ParquetPageReaderTest, loadsPendingRepDefsBeforePageRowInfoCheck) { + auto fileType = std::make_shared( + VARCHAR(), + std::vector>{}, + 0, + 0, + 0, + "items.list.element", + thrift::Type::BYTE_ARRAY, + std::nullopt, + thrift::ConvertedType::UTF8, + 1, + 2, + true, + false); + PageReader pageReader( + nullptr, + *leafPool_, + fileType, + thrift::CompressionCodec::UNCOMPRESSED, + 0, + nullptr); + PageReaderTestPeer::initializeForPendingRepDefs(pageReader); + + ASSERT_NO_THROW(PageReaderTestPeer::setPageRowInfo(pageReader)); + EXPECT_EQ(PageReaderTestPeer::pageIndex(pageReader), 1); + EXPECT_EQ(PageReaderTestPeer::numRowsInPage(pageReader), 1); + EXPECT_EQ(PageReaderTestPeer::numKnownRepDefPages(pageReader), 2); + EXPECT_EQ(PageReaderTestPeer::numPendingRepDefBatches(pageReader), 0); +} + TEST_F(ParquetPageReaderTest, smallPage) { auto readFile = std::make_shared(getExampleFilePath("small_page_header")); diff --git a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp index d70671380..669857c92 100644 --- a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp +++ b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp @@ -1504,6 +1504,66 @@ TEST_F(ParquetReaderTest, skip) { EXPECT_EQ(0, rowReader->next(1000, result)); } +TEST_F(ParquetReaderTest, sparseSkipAcrossRepeatedPagesWithBatchedRepDefs) { + constexpr int32_t kNumRows = 6'000; + constexpr int32_t kTargetRow = 5'000; + const std::string payload(256, 'x'); + + auto ids = makeFlatVector( + kNumRows, [](auto row) { return static_cast(row); }); + std::vector values; + values.reserve(kNumRows); + std::vector> arrays; + arrays.reserve(kNumRows); + for (auto i = 0; i < kNumRows; ++i) { + values.push_back(fmt::format("{}-{}", i, payload)); + arrays.push_back({StringView(values.back())}); + } + + auto rowType = ROW({"id", "items"}, {BIGINT(), ARRAY(VARCHAR())}); + auto data = makeRowVector({ids, makeArrayVector(arrays)}); + + auto tempFile = exec::test::TempFilePath::create(); + auto writeFile = + std::make_unique(tempFile->path, true, false); + auto sink = std::make_unique( + std::move(writeFile), tempFile->path); + + bytedance::bolt::parquet::WriterOptions writerOptions; + writerOptions.memoryPool = rootPool_.get(); + writerOptions.enableDictionary = false; + writerOptions.dataPageSize = 1024; + writerOptions.columnDataPageSizeMap["items.list.element"] = 1024; + writerOptions.writeBatchBytes = 1ULL << 30; + writerOptions.minBatchSize = 1; + auto writer = std::make_unique( + std::move(sink), writerOptions, rowType); + writer->write(data); + writer->close(); + + bytedance::bolt::dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + readerOptions.setFileSchema(rowType); + readerOptions.setFileFormat(dwio::common::FileFormat::PARQUET); + auto reader = createReader(tempFile->path, readerOptions); + + auto rowReaderOpts = getReaderOpts(rowType); + rowReaderOpts.setDecodeRepDefPageCount(1); + rowReaderOpts.setParquetRepDefMemoryLimit(1); + auto scanSpec = makeScanSpec(rowType); + scanSpec->getOrCreateChild(Subfield("id")) + ->setFilter(exec::equal(kTargetRow)); + rowReaderOpts.setScanSpec(scanSpec); + auto rowReader = reader->createRowReader(rowReaderOpts); + + VectorPtr result = BaseVector::create(rowType, 0, leafPool_.get()); + ASSERT_NO_THROW({ + auto rowsRead = rowReader->next(1, result); + ASSERT_EQ(rowsRead, 1); + }); + + assertEqualVectorPart(data, result, kTargetRow); +} + TEST_F(ParquetReaderTest, readVarbinaryFromFLBA) { const std::string filename("varbinary_flba.parquet"); const std::string sample(getExampleFilePath(filename));