Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
scan_details = scan_details_buf.c_str();
}
fmt_buffer.fmtAppend(
R"("connection_details":[{},{}],"scan_details":{})",
R"("is_partition_table_scan":{},"connection_details":[{},{}],"scan_details":{})",
is_partition_table_scan,
local_table_scan_detail.toJson(),
remote_table_scan_detail.toJson(),
scan_details);
Expand Down Expand Up @@ -233,5 +234,6 @@ void TableScanStatistics::collectExtraRuntimeDetail()

// Builds the statistics object for a table-scan executor.
// Both arguments are forwarded unchanged to TableScanStatisticsBase; the only extra
// state captured here is whether `executor` reports a partition table scan.
TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_)
: TableScanStatisticsBase(executor, dag_context_)
// NOTE: `executor` is dereferenced here without a null check.
, is_partition_table_scan(executor->has_partition_table_scan())
{}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Statistics/TableScanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,7 @@ class TableScanStatistics : public TableScanStatisticsBase
private:
void updateTableScanDetail(const std::vector<ConnectionProfileInfo> & connection_profile_infos);
void updateTableScanDetailForDisaggIfNecessary(const IProfilingBlockInputStream * stream);

bool is_partition_table_scan = false;
};
} // namespace DB
55 changes: 47 additions & 8 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,25 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
return updated_segments;
}

namespace
{
/// A split ingest whose elapsed time exceeds this many seconds is treated as "slow".
constexpr double slow_split_ingest_log_threshold_seconds = 10.0;

/// Small logging helper bound to the stopwatch of one split-ingest call.
/// It only reads the stopwatch; it holds a reference, so it must not outlive it.
struct SplitIngestLogContext
{
    const Stopwatch & watch;

    /// Seconds elapsed on the bound stopwatch at the time of the call.
    double elapsedSeconds() const { return watch.elapsedSeconds(); }

    /// True once the elapsed time is strictly greater than the slow threshold.
    bool isSlow() const { return elapsedSeconds() > slow_split_ingest_log_threshold_seconds; }

    /// Log priority for messages that are normally debug-only:
    /// information level when the ingest is slow, debug level otherwise.
    Poco::Message::Priority debugOrInfoLevel() const
    {
        if (isSlow())
            return Poco::Message::PRIO_INFORMATION;
        return Poco::Message::PRIO_DEBUG;
    }
};
} // namespace

/**
* Accept a target ingest range and a vector of DTFiles, ingest these DTFiles (clipped by the target ingest range)
* using logical split.
Expand All @@ -245,6 +264,9 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
const DMFiles & files,
bool clear_data_in_range)
{
Stopwatch watch;
SplitIngestLogContext split_ingest_log{watch};

{
RUNTIME_CHECK(files.size() == external_files.size(), files.size(), external_files.size());
for (size_t i = 0; i < files.size(); ++i)
Expand Down Expand Up @@ -331,9 +353,11 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
* ↑ The segment we ingest DMFile into
*/

LOG_DEBUG(
LOG_IMPL(
log,
"Table ingest using split - split ingest phase - begin, ingest_range={}, files_n={}",
split_ingest_log.debugOrInfoLevel(),
"Table ingest using split - split ingest phase - begin, elapsed_seconds={:.3f} ingest_range={} files_n={}",
split_ingest_log.elapsedSeconds(),
ingest_range.toDebugString(),
files.size());

Expand Down Expand Up @@ -389,8 +413,10 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(

LOG_INFO(
log,
"Table ingest using split - split ingest phase - Try to ingest file into segment, file_idx={} "
"Table ingest using split - split ingest phase - Try to ingest file into segment, "
"elapsed_seconds={:.3f} file_idx={} "
"file_id=dmf_{} file_ingest_range={} segment={} segment_ingest_range={}",
split_ingest_log.elapsedSeconds(),
file_idx,
files[file_idx]->fileId(),
file_ingest_range.toDebugString(),
Expand All @@ -412,15 +438,17 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
}
else
{
// this segment is abandoned, or may be split into multiples.
// this segment is abandoned, or may be split into multiples, or running segmentMerge/segmentMergeDelta, etc.
// retry with current range and file and find segment again.
}
}
}

LOG_DEBUG(
LOG_IMPL(
log,
"Table ingest using split - split ingest phase - finished, updated_segments_n={}",
split_ingest_log.debugOrInfoLevel(),
"Table ingest using split - split ingest phase - finished, elapsed_seconds={:.3f} updated_segments_n={}",
split_ingest_log.elapsedSeconds(),
updated_segments.size());

return std::vector<SegmentPtr>(updated_segments.begin(), updated_segments.end());
Expand Down Expand Up @@ -787,10 +815,21 @@ UInt64 DeltaMergeStore::ingestFiles(
if (has_segments || !external_files.empty())
{
if (use_split_replace)
updated_segments
= ingestDTFilesUsingSplit(dm_context, range, external_files, files, clear_data_in_range);
{
// For large files, we use split+replace to ingest the files into stable layer directly.
// See `ingestDTFilesUsingSplit` for the detailed steps.
updated_segments = ingestDTFilesUsingSplit( //
dm_context,
range,
external_files,
files,
clear_data_in_range);
}
else
{
// For small files, we ingest them into the delta layer directly, which is more efficient than split+replace.
updated_segments = ingestDTFilesUsingColumnFile(dm_context, range, files, clear_data_in_range);
}
}
}

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ SegmentPair DeltaMergeStore::segmentSplit(

LOG_INFO(
log,
"Split - {} - Finish, segment is split into two, old_segment={} new_left={} new_right={}",
"Split - {} - Finish, segment is split into two, reason={} "
", old_segment={} new_left={} new_right={}",
split_info.is_logical ? "SplitLogical" : "SplitPhysical",
magic_enum::enum_name(reason),
segment->info(),
new_left->info(),
new_right->info());
Expand Down Expand Up @@ -331,6 +333,7 @@ SegmentPtr DeltaMergeStore::segmentMerge(
dm_context.min_version,
Segment::simpleInfo(ordered_segments));

// keep "for_update=true" snapshot for all related segments
std::vector<SegmentSnapshotPtr> ordered_snapshots;
ordered_snapshots.reserve(ordered_segments.size());
ColumnDefinesPtr schema_snap;
Expand Down Expand Up @@ -775,7 +778,7 @@ void DeltaMergeStore::segmentEnsureStableLocalIndex(
DMFile::info(index_build_info.dm_files));

// 3. Update the meta version of the segments to the latest one.
// To avoid logical split between step 2 and 3, get lastest segments to update again.
// To avoid logical split between step 2 and 3, get latest segments to update again.
// If TiFlash crashes during updating the meta version, some segments' meta are updated and some are not.
// So after TiFlash restarts, we will update meta versions to latest versions again.
{
Expand Down Expand Up @@ -1307,7 +1310,9 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

LOG_INFO(
log,
"MergeDelta - Finish, delta is merged, old_segment={} new_segment={}",
"MergeDelta - Finish, delta is merged, reason={} "
"old_segment={} new_segment={}",
magic_enum::enum_name(reason),
segment->info(),
new_segment->info());
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class DMFile : private boost::noncopyable
const DMFileMeta::PackProperties & getPackProperties() const { return meta->pack_properties; }
const ColumnStats & getColumnStats() const { return meta->column_stats; }
const std::unordered_set<ColId> & getColumnIndices() const { return meta->column_indices; }
// Number of entries in the file meta's `column_stats`.
size_t getNumColumns() const { return meta->column_stats.size(); }

// only used in gtest
void clearPackProperties() const { meta->pack_properties.clear_property(); }
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2555,7 +2555,8 @@ String Segment::info() const
"<segment_id={} epoch={} range={}{} next_segment_id={} "
"delta_rows={} delta_bytes={} delta_deletes={} "
"stable_file={} stable_rows={} stable_bytes={} "
"dmf_rows={} dmf_bytes={} dmf_packs={}>",
"stable_cols={} "
"dmf_rows={} dmf_bytes={} dmf_disk_bytes={} dmf_packs={}>",
segment_id,
epoch,
rowkey_range.toDebugString(),
Expand All @@ -2569,9 +2570,11 @@ String Segment::info() const
stable->getDMFilesString(),
stable->getRows(),
stable->getBytes(),
stable->getDMFilesNumColumns(),

stable->getDMFilesRows(),
stable->getDMFilesBytes(),
stable->getDMFilesBytesOnDisk(),
stable->getDMFilesPacks());
}

Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,14 @@ size_t StableValueSpace::getDMFilesBytes() const
return bytes;
}

size_t StableValueSpace::getDMFilesNumColumns() const
{
size_t num_columns = 0;
for (const auto & file : files)
num_columns = std::max(num_columns, file->getNumColumns());
return num_columns;
}

// Returns the description string that `DMFile::info` produces for this stable's `files`.
String StableValueSpace::getDMFilesString()
{
return DMFile::info(files);
}
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>

String getDMFilesString();

/**
* Return the largest column count among the underlying DTFiles (0 if there are none).
*/
size_t getDMFilesNumColumns() const;

/**
* Return the total on-disk size of the underlying DTFiles.
* DTFiles that are not fully included in the segment range are also counted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// FIXME: Disabled because this test is flaky and causes errors in CI.
// We also don't plan to enable FAP support in the short term.
#if 0
#include <Debug/MockKVStore/MockRaftStoreProxy.h>
#include <Debug/TiFlashTestEnv.h>
#include <Interpreters/SharedContexts/Disagg.h>
Expand Down Expand Up @@ -1206,3 +1209,4 @@ CATCH

} // namespace tests
} // namespace DB
#endif
13 changes: 9 additions & 4 deletions dbms/src/Storages/S3/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,11 @@ void downloadToLocal(
wbuf.sync();
}

void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter)
void FileCache::downloadImpl(
const String & s3_key,
FileSegmentPtr & file_seg,
const WriteLimiterPtr & write_limiter,
DownloadType download_type)
{
Stopwatch sw;
auto client = S3::ClientFactory::instance().sharedTiFlashClient();
Expand Down Expand Up @@ -1368,7 +1372,8 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c
file_seg->setComplete(content_length);
LOG_INFO(
log,
"Download success, s3_key={} local={} size={} cost={}ms",
"Download success, type={} s3_key={} local={} size={} cost={}ms",
magic_enum::enum_name(download_type),
s3_key,
local_fname,
content_length,
Expand All @@ -1393,7 +1398,7 @@ void FileCache::bgDownloadExecutor(
SYNC_FOR("before_FileCache::bgDownloadExecutor_fail_point");
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::file_cache_bg_download_fail);
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_download).Increment();
downloadImpl(s3_key, file_seg, write_limiter);
downloadImpl(s3_key, file_seg, write_limiter, DownloadType::Background);
}
catch (...)
{
Expand Down Expand Up @@ -1481,7 +1486,7 @@ void FileCache::fgDownload(const String & s3_key, FileSegmentPtr & file_seg)
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::file_cache_fg_download_fail);
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_download).Increment();
// not limit write speed for foreground download now
downloadImpl(s3_key, file_seg, nullptr);
downloadImpl(s3_key, file_seg, nullptr, DownloadType::Foreground);
}
catch (...)
{
Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Storages/S3/FileCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,18 @@ class FileCache
Int64 running_limit);
void finishBgDownload(const String & s3_key, Int64 running_limit);
void cleanupFailedDownload(const String & s3_key, FileSegmentPtr & file_seg);
void downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter);

// Identifies which download path invoked `downloadImpl`.
// The enumerator name is written into the "Download success" log line via
// `magic_enum::enum_name`, so renaming an enumerator changes that log output.
enum class DownloadType
{
// Passed by `fgDownload`.
Foreground,
// Passed by `bgDownloadExecutor`.
Background,
};

void downloadImpl(
const String & s3_key,
FileSegmentPtr & file_seg,
const WriteLimiterPtr & write_limiter,
DownloadType download_type);

static String toTemporaryFilename(const String & fname);
static bool isTemporaryFilename(const String & fname);
Expand Down
8 changes: 4 additions & 4 deletions tests/docker/next-gen-columnar-yaml/cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
# Use entrypoint instead, create bucket "tiflash-test" on minio at startup
entrypoint: sh -c 'mkdir -p /data/tiflash-test && minio server /data --console-address ":9001" > /log/minio.log 2>&1'
pd0:
image: ${PD_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/pd/image:master-next-gen}
image: ${PD_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/pd/image:master-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand All @@ -35,7 +35,7 @@ services:
command: --name=pd0 --client-urls=http://0.0.0.0:2379 --peer-urls=http://0.0.0.0:2380 --advertise-client-urls=http://pd0:2379 --advertise-peer-urls=http://pd0:2380 --initial-cluster=pd0=http://pd0:2380 --config=/pd.toml --data-dir=/data --log-file=/log/pd.log
restart: on-failure
tikv0:
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-next-gen}
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand All @@ -48,7 +48,7 @@ services:
- "minio0"
restart: on-failure
tikv-worker0:
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-next-gen}
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand All @@ -61,7 +61,7 @@ services:
- "pd0"
restart: on-failure
tidb0:
image: ${TIDB_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tidb/images/tidb-server:master-next-gen}
image: ${TIDB_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tidb/images/tidb-server:master-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand Down
8 changes: 4 additions & 4 deletions tests/docker/next-gen-utils/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
# limitations under the License.

DOCKER ?= docker
TIKV_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-next-gen
PD_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/pd/image:master-next-gen
TIDB_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tidb/images/tidb-server:master-next-gen
TIFLASH_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tiflash/image:master-next-gen
TIKV_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-nextgen
PD_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/pd/image:master-nextgen
TIDB_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tidb/images/tidb-server:master-nextgen
TIFLASH_IMAGE ?= us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tiflash/image:master-nextgen

# Set PULL to 1 to always pull the latest image
PULL ?= 0
Expand Down
8 changes: 4 additions & 4 deletions tests/docker/next-gen-yaml/cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
# Use entrypoint instead, create bucket "tiflash-test" on minio at startup
entrypoint: sh -c 'mkdir -p /data/tiflash-test && minio server /data --console-address ":9001" > /log/minio.log 2>&1'
pd0:
image: ${PD_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/pd/image:master-next-gen}
image: ${PD_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/pd/image:master-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand All @@ -35,7 +35,7 @@ services:
command: --name=pd0 --client-urls=http://0.0.0.0:2379 --peer-urls=http://0.0.0.0:2380 --advertise-client-urls=http://pd0:2379 --advertise-peer-urls=http://pd0:2380 --initial-cluster=pd0=http://pd0:2380 --config=/pd.toml --data-dir=/data --log-file=/log/pd.log
restart: on-failure
tikv0:
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-next-gen}
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand All @@ -48,7 +48,7 @@ services:
- "minio0"
restart: on-failure
tikv-worker0:
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-next-gen}
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/tikv/tikv/image:cloud-engine-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand All @@ -61,7 +61,7 @@ services:
- "pd0"
restart: on-failure
tidb0:
image: ${TIDB_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tidb/images/tidb-server:master-next-gen}
image: ${TIDB_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/tidbx/pingcap/tidb/images/tidb-server:master-nextgen}
security_opt:
- seccomp:unconfined
volumes:
Expand Down
Loading