Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions bolt/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,18 @@ void PageReader::setPageRowInfo(bool forRepDef) {
numRowsInPage_ = numRepDefsInPage_;
} else if (hasChunkRepDefs_) {
++pageIndex_;

// The repeated reader normally decodes rep/def metadata far enough before
// the value reader seeks into a non-top-level data page. However, sparse
// selective reads can make the value path reach the next physical page
// exactly when only the already-consumed rep/def page counts are present in
// numLeavesInPage_. If more preloaded rep/def batches are available, decode
// them here instead of failing the scan immediately.
while (pageIndex_ >= static_cast<int32_t>(numLeavesInPage_.size()) &&

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice fix — I hit the exact same crash in production and arrived at a similar conclusion.

I considered an alternative placement: doing the loadMoreRepDefs() loop in seekToPage(),
right before readPageHeader() (i.e., the caller side), rather than inside setPageRowInfo()
(the callee/crash site). I ended up going with that approach in #635

Trade-off I'd like to discuss:

Your approach (callee-side): More defensively robust — protects the CHECK_LT regardless
of how setPageRowInfo is reached. If a future code path forgets to preload, this still
saves the day.

Caller-side approach: Keeps setPageRowInfo() as a pure metadata lookup with no I/O side-effects. The invariant becomes: "seekToPage is the page lifecycle owner and must guarantee repdef coverage before entering the next page." This preserves the fail-fast semantics of BOLT_CHECK_LT for genuinely broken invariants.

My concern with the callee-side fix is that if a future path forgets to preload and there happen to be
leftover batches, the callee-side fix would silently mask the bug.

Not a blocker — both approaches fix the production crash correctly. Just wanted to flag
this trade-off for the discussion. Happy to hear your thoughts, and feel free to check
#635 for the alternative implementation.

!preloadedRepDefs_.empty()) {
loadMoreRepDefs();
}

BOLT_CHECK_LT(
pageIndex_,
numLeavesInPage_.size(),
Expand Down
2 changes: 2 additions & 0 deletions bolt/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ struct CryptoContext {
/// encodings and presents the combination of pages and encoded values as a
/// continuous stream accessible via readWithVisitor().
class PageReader {
friend class PageReaderTestPeer;

public:
PageReader(
std::unique_ptr<dwio::common::SeekableInputStream> stream,
Expand Down
112 changes: 112 additions & 0 deletions bolt/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* --------------------------------------------------------------------------
*/

#include <cstring>
#include "bolt/dwio/parquet/reader/PageReader.h"
#include "bolt/dwio/parquet/tests/ParquetTestBase.h"
using namespace bytedance::bolt;
Expand All @@ -37,6 +38,117 @@ using namespace bytedance::bolt::parquet;

class ParquetPageReaderTest : public ParquetTestBase {};

namespace bytedance::bolt::parquet {

class PageReaderTestPeer {
public:
static raw_vector<char> makeRepDefBatch(
int32_t numRepDefs,
int16_t repetitionLevel,
int16_t definitionLevel,
int32_t maxRepeat,
int32_t maxDefine) {
raw_vector<char> batch;
append<int32_t>(batch, numRepDefs);
appendLevels(batch, numRepDefs, repetitionLevel, maxRepeat);
appendLevels(batch, numRepDefs, definitionLevel, maxDefine);
return batch;
}

static void initializeForPendingRepDefs(PageReader& pageReader) {
pageReader.hasChunkRepDefs_ = true;
pageReader.pageIndex_ = 0;
pageReader.numLeavesInPage_ = {1};
pageReader.preloadedRepDefs_.push_back(makeRepDefBatch(
1,
0,
pageReader.maxDefine_,
pageReader.maxRepeat_,
pageReader.maxDefine_));
}

static void setPageRowInfo(PageReader& pageReader) {
pageReader.setPageRowInfo(false);
}

static int32_t pageIndex(const PageReader& pageReader) {
return pageReader.pageIndex_;
}

static int32_t numRowsInPage(const PageReader& pageReader) {
return pageReader.numRowsInPage_;
}

static size_t numKnownRepDefPages(const PageReader& pageReader) {
return pageReader.numLeavesInPage_.size();
}

static size_t numPendingRepDefBatches(const PageReader& pageReader) {
return pageReader.preloadedRepDefs_.size();
}

private:
template <typename T>
static void append(raw_vector<char>& out, T value) {
const auto offset = out.size();
out.resize(offset + sizeof(T));
std::memcpy(out.data() + offset, &value, sizeof(T));
}

static void appendLevels(
raw_vector<char>& out,
int32_t numLevels,
int16_t level,
int32_t maxLevel) {
const auto bitWidth = ::arrow::bit_util::NumRequiredBits(maxLevel);
std::vector<uint8_t> encoded(
::arrow::util::RleEncoder::MaxBufferSize(bitWidth, numLevels) +
::arrow::util::RleEncoder::MinBufferSize(bitWidth));
::arrow::util::RleEncoder encoder(encoded.data(), encoded.size(), bitWidth);
for (auto i = 0; i < numLevels; ++i) {
encoder.Put(level);
}
const auto encodedSize = encoder.Flush();
append<uint32_t>(out, encodedSize);
const auto offset = out.size();
out.resize(offset + encodedSize);
std::memcpy(out.data() + offset, encoded.data(), encodedSize);
}
};

} // namespace bytedance::bolt::parquet

TEST_F(ParquetPageReaderTest, loadsPendingRepDefsBeforePageRowInfoCheck) {
auto fileType = std::make_shared<ParquetTypeWithId>(
VARCHAR(),
std::vector<std::shared_ptr<const dwio::common::TypeWithId>>{},
0,
0,
0,
"items.list.element",
thrift::Type::BYTE_ARRAY,
std::nullopt,
thrift::ConvertedType::UTF8,
1,
2,
true,
false);
PageReader pageReader(
nullptr,
*leafPool_,
fileType,
thrift::CompressionCodec::UNCOMPRESSED,
0,
nullptr);
PageReaderTestPeer::initializeForPendingRepDefs(pageReader);

ASSERT_NO_THROW(PageReaderTestPeer::setPageRowInfo(pageReader));
EXPECT_EQ(PageReaderTestPeer::pageIndex(pageReader), 1);
EXPECT_EQ(PageReaderTestPeer::numRowsInPage(pageReader), 1);
EXPECT_EQ(PageReaderTestPeer::numKnownRepDefPages(pageReader), 2);
EXPECT_EQ(PageReaderTestPeer::numPendingRepDefBatches(pageReader), 0);
}

TEST_F(ParquetPageReaderTest, smallPage) {
auto readFile =
std::make_shared<LocalReadFile>(getExampleFilePath("small_page_header"));
Expand Down
60 changes: 60 additions & 0 deletions bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,66 @@ TEST_F(ParquetReaderTest, skip) {
EXPECT_EQ(0, rowReader->next(1000, result));
}

TEST_F(ParquetReaderTest, sparseSkipAcrossRepeatedPagesWithBatchedRepDefs) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI — I ran sparseSkipAcrossRepeatedPagesWithBatchedRepDefs on main without this patch
and it passes. The test doesn't seem to trigger the crash path on an unpatched build.

constexpr int32_t kNumRows = 6'000;
constexpr int32_t kTargetRow = 5'000;
const std::string payload(256, 'x');

auto ids = makeFlatVector<int64_t>(
kNumRows, [](auto row) { return static_cast<int64_t>(row); });
std::vector<std::string> values;
values.reserve(kNumRows);
std::vector<std::vector<StringView>> arrays;
arrays.reserve(kNumRows);
for (auto i = 0; i < kNumRows; ++i) {
values.push_back(fmt::format("{}-{}", i, payload));
arrays.push_back({StringView(values.back())});
}

auto rowType = ROW({"id", "items"}, {BIGINT(), ARRAY(VARCHAR())});
auto data = makeRowVector({ids, makeArrayVector<StringView>(arrays)});

auto tempFile = exec::test::TempFilePath::create();
auto writeFile =
std::make_unique<LocalWriteFile>(tempFile->path, true, false);
auto sink = std::make_unique<dwio::common::WriteFileSink>(
std::move(writeFile), tempFile->path);

bytedance::bolt::parquet::WriterOptions writerOptions;
writerOptions.memoryPool = rootPool_.get();
writerOptions.enableDictionary = false;
writerOptions.dataPageSize = 1024;
writerOptions.columnDataPageSizeMap["items.list.element"] = 1024;
writerOptions.writeBatchBytes = 1ULL << 30;
writerOptions.minBatchSize = 1;
auto writer = std::make_unique<bytedance::bolt::parquet::Writer>(
std::move(sink), writerOptions, rowType);
writer->write(data);
writer->close();

bytedance::bolt::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
readerOptions.setFileSchema(rowType);
readerOptions.setFileFormat(dwio::common::FileFormat::PARQUET);
auto reader = createReader(tempFile->path, readerOptions);

auto rowReaderOpts = getReaderOpts(rowType);
rowReaderOpts.setDecodeRepDefPageCount(1);
rowReaderOpts.setParquetRepDefMemoryLimit(1);
auto scanSpec = makeScanSpec(rowType);
scanSpec->getOrCreateChild(Subfield("id"))
->setFilter(exec::equal(kTargetRow));
rowReaderOpts.setScanSpec(scanSpec);
auto rowReader = reader->createRowReader(rowReaderOpts);

VectorPtr result = BaseVector::create(rowType, 0, leafPool_.get());
ASSERT_NO_THROW({
auto rowsRead = rowReader->next(1, result);
ASSERT_EQ(rowsRead, 1);
});

assertEqualVectorPart(data, result, kTargetRow);
}

TEST_F(ParquetReaderTest, readVarbinaryFromFLBA) {
const std::string filename("varbinary_flba.parquet");
const std::string sample(getExampleFilePath(filename));
Expand Down
Loading