Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 139 additions & 32 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,20 @@
#include <Flash/ResourceControl/LocalAdmissionController.h>
#include <etcd/rpc.pb.h>

#include <algorithm>
#include <cctype>
#include <magic_enum.hpp>

namespace DB
{
namespace
{
KeyspaceID resolveResourceGroupKeyspace(const resource_manager::ResourceGroup & group_pb)
{
return group_pb.has_keyspace_id() ? group_pb.keyspace_id().value() : NullspaceID;
}
} // namespace

void ResourceGroup::initStaticTokenBucket(int64_t capacity)
{
std::lock_guard lock(mu);
Expand Down Expand Up @@ -300,12 +310,21 @@ void LocalAdmissionController::warmupResourceGroupInfoCache(const KeyspaceID & k
if (name.empty())
return;

ResourceGroupPtr group = findResourceGroup(keyspace_id, name);
if (group != nullptr)
return;
{
std::lock_guard lock(mu);
if (findResourceGroupWithCompatWithoutLock(keyspace_id, name) != nullptr)
return;
}

resource_manager::GetResourceGroupRequest req;
req.mutable_keyspace_id()->set_value(keyspace_id);
{
std::lock_guard lock(mu);
// Until the backend is identified, try the keyspace-aware request shape first. pd-cse
// ignores the field and still returns the shared legacy definition, while new PD returns
// a keyspace-scoped group with keyspace_id set.
if (with_keyspace && resource_group_backend_mode != ResourceGroupBackendMode::LegacyGlobal)
req.mutable_keyspace_id()->set_value(keyspace_id);
}
req.set_resource_group_name(name);
resource_manager::GetResourceGroupResponse resp;

Expand All @@ -323,16 +342,24 @@ void LocalAdmissionController::warmupResourceGroupInfoCache(const KeyspaceID & k
getCurrentExceptionMessage(false));
}

RUNTIME_CHECK_MSG(
!resp.has_error(),
"warmupResourceGroupInfoCache: {}(keyspace={}) failed: {}",
name,
keyspace_id,
resp.error().message());
if (resp.has_error())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering, isn't keyspace support a cluster-level configuration for PD? If a cluster's PD doesn't support keyspace, it likely won't support it for a long time. If that's the case, do we really need to handle the fallback at the RPC level?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it means the pd dose not support keyspace scope resource group (not the keyspace itself).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the "starter" tier, currently the pd-cse is running with keyspace support but without keyspace scope resource group. And we don't actually know when pd-cse will be upgraded to pd. But we plan to upgrade the tiflash columnar binary on the "starter" tier to unify the codebase in late July, so tiflash need to adapt to the pd-cse.

RUNTIME_CHECK_MSG(
false,
"warmupResourceGroupInfoCache: {}(keyspace={}) failed: {}",
name,
keyspace_id,
resp.error().message());

checkGACRespValid(resp.group());

addResourceGroup(keyspace_id, resp.group());
const auto detected_mode = with_keyspace ? detectBackendMode(resp.group()) : ResourceGroupBackendMode::LegacyGlobal;
// pd-cse returns a group without keyspace_id and keeps using the shared legacy namespace.
// New PD returns the concrete keyspace-scoped definition. TiFlash only needs this one bit to
// choose its cache key shape and which watch prefix to keep following afterwards.
updateBackendMode(detected_mode);
const auto resolved_keyspace_id = detected_mode == ResourceGroupBackendMode::LegacyGlobal
? NullspaceID
: resolveResourceGroupKeyspace(resp.group());
addResourceGroup(resolved_keyspace_id, resp.group());
Comment thread
yongman marked this conversation as resolved.
Comment thread
yongman marked this conversation as resolved.
}

void LocalAdmissionController::mainLoop()
Expand Down Expand Up @@ -470,7 +497,10 @@ std::optional<resource_manager::TokenBucketsRequest> LocalAdmissionController::b
for (const auto & info : request_infos)
{
auto * group_request = gac_req.add_requests();
group_request->mutable_keyspace_id()->set_value(info.keyspace_id);
// Nullspace/legacy requests were encoded without keyspace_id. Preserve that wire format so
// the request can still be understood by older PD/GAC deployments.
if (info.keyspace_id != NullspaceID)
group_request->mutable_keyspace_id()->set_value(info.keyspace_id);
group_request->set_resource_group_name(info.resource_group_name);
assert(info.acquire_tokens > 0.0 || info.ru_consumption_delta > 0.0 || is_final_report);
if (info.acquire_tokens > 0.0 || is_final_report)
Expand Down Expand Up @@ -535,7 +565,8 @@ static std::vector<std::pair<KeyspaceID, std::string>> extractGACReqNames(
std::vector<std::pair<KeyspaceID, std::string>> res;
res.reserve(gac_req.requests_size());
for (const auto & req : gac_req.requests())
res.push_back({req.keyspace_id().value(), req.resource_group_name()});
// Must mirror buildGACRequest(): a missing field means the logical key is NullspaceID.
res.push_back({req.has_keyspace_id() ? req.keyspace_id().value() : NullspaceID, req.resource_group_name()});
return res;
}

Expand Down Expand Up @@ -643,7 +674,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
continue;
}

handled_resource_group_names.emplace_back(keyspace_id, name);
handled_resource_group_names.emplace_back(resource_group->getKeyspaceID(), name);

const String err_msg = fmt::format("handle acquire token resp failed: rg: {}(keyspace={})", name, keyspace_id);
// It's possible for one_resp.granted_r_u_tokens() to be empty
Expand Down Expand Up @@ -719,7 +750,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
// https://github.com/tikv/pd/blob/e9757fbe03260775262763c67f62296fcb26b3c2/pkg/mcs/resourcemanager/server/token_buckets.go#L47
int64_t capacity = granted_token_bucket.granted_tokens().settings().burst_limit();

const auto name_with_keyspace_id = getResourceGroupMetricName(keyspace_id, name);
const auto name_with_keyspace_id = getResourceGroupMetricName(resource_group->getKeyspaceID(), name);
if (added_tokens > 0)
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_gac_resp_tokens, name_with_keyspace_id)
.Set(added_tokens);
Expand Down Expand Up @@ -757,11 +788,22 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle

void LocalAdmissionController::watchGACLoop(const std::string & etcd_path)
{
auto grpc_context = std::make_unique<grpc::ClientContext>();
{
std::lock_guard lock(mu);
active_watch_gac_grpc_context = grpc_context.get();
}
SCOPE_EXIT({
std::lock_guard lock(mu);
if (active_watch_gac_grpc_context == grpc_context.get())
active_watch_gac_grpc_context = nullptr;
});

while (!stopped.load())
{
try
{
doWatch(etcd_path);
doWatch(etcd_path, grpc_context.get());
}
catch (...)
{
Expand All @@ -782,15 +824,17 @@ void LocalAdmissionController::watchGACLoop(const std::string & etcd_path)
}))
return;

// Create new grpc_context for each reader/writer.
watch_gac_grpc_context = std::make_unique<grpc::ClientContext>();
if (active_watch_gac_grpc_context == grpc_context.get())
active_watch_gac_grpc_context = nullptr;
grpc_context = std::make_unique<grpc::ClientContext>();
active_watch_gac_grpc_context = grpc_context.get();
}
}
}

void LocalAdmissionController::doWatch(const std::string & etcd_path)
void LocalAdmissionController::doWatch(const std::string & etcd_path, grpc::ClientContext * grpc_context)
{
auto stream = etcd_client->watch(watch_gac_grpc_context.get());
auto stream = etcd_client->watch(grpc_context);
auto watch_req = setupWatchReq(etcd_path);
LOG_DEBUG(log, "watchGAC req: {}", watch_req.ShortDebugString());
const bool write_ok = stream->Write(watch_req);
Expand Down Expand Up @@ -890,19 +934,22 @@ bool LocalAdmissionController::handlePutEvent(
auto rg = findResourceGroupWithoutLock(keyspace_id, name);
if (rg == nullptr)
{
// It happens when query of this resource group has not came.
// Ignore updates for groups that have never been warmed into the local cache yet.
LOG_DEBUG(
log,
"trying to modify resource group config({}), but cannot find its info",
group_pb.ShortDebugString());
return true;
}
else
{
rg->resetResourceGroup(group_pb);
updateMaxRUPerSecAfterDeleteWithoutLock(rg->user_ru_per_sec);
}
const auto old_user_ru_per_sec = rg->user_ru_per_sec;
rg->resetResourceGroup(group_pb);
updateMaxRUPerSecAfterDeleteWithoutLock(old_user_ru_per_sec);
if (max_ru_per_sec < rg->user_ru_per_sec)
max_ru_per_sec = rg->user_ru_per_sec;
if (refill_token_callback)
refill_token_callback();
Comment thread
yongman marked this conversation as resolved.
}

LOG_INFO(log, "modify resource group {}(keyspace={}) to: {}", name, keyspace_id, group_pb.ShortDebugString());
return true;
}
Expand All @@ -914,8 +961,9 @@ bool LocalAdmissionController::parseResourceGroupNameFromWatchKey(
std::string & parsed_rg_name,
std::string & err_msg)
{
// Expect etcd_key: resource_group/settings/rg_name OR resource_group/settings/keyspace_id/rg_name
// etcd_key_prefix is resource_group/settings
// Expect:
// 1. resource_group/settings/<rg_name>
// 2. resource_group/keyspace/settings/<keyspace_id>/<rg_name>
if (etcd_key.length() <= etcd_key_prefix.length() + 1)
{
err_msg = fmt::format("expect etcd key: {}/resource_group_name, but got {}", etcd_key_prefix, etcd_key);
Expand Down Expand Up @@ -973,14 +1021,20 @@ void LocalAdmissionController::stop()
// So still need to lock.
{
std::lock_guard lock(mu);
watch_gac_grpc_context->TryCancel();
if (active_watch_gac_grpc_context != nullptr)
active_watch_gac_grpc_context->TryCancel();
cv.notify_all();
}
{
std::lock_guard lock(gac_requests_mu);
gac_requests_cv.notify_all();
}
for (auto & thread : background_threads)
std::vector<std::thread> threads_to_join;
{
std::lock_guard lock(mu);
threads_to_join = std::move(background_threads);
}
for (auto & thread : threads_to_join)
{
if (thread.joinable())
thread.join();
Expand Down Expand Up @@ -1027,14 +1081,67 @@ std::unique_ptr<LocalAdmissionController> LocalAdmissionController::global_insta
const std::string LocalAdmissionController::GAC_RESOURCE_GROUP_ETCD_PATH = "resource_group/settings";
const std::string LocalAdmissionController::GAC_KEYSPACE_RESOURCE_GROUP_ETCD_PATH = "resource_group/keyspace/settings";

ResourceGroupBackendMode LocalAdmissionController::detectBackendMode(const resource_manager::ResourceGroup & group_pb)
{
return group_pb.has_keyspace_id() ? ResourceGroupBackendMode::KeyspaceScoped
: ResourceGroupBackendMode::LegacyGlobal;
}

const std::string & LocalAdmissionController::getWatchEtcdPath(ResourceGroupBackendMode mode)
{
switch (mode)
{
case ResourceGroupBackendMode::LegacyGlobal:
return GAC_RESOURCE_GROUP_ETCD_PATH;
case ResourceGroupBackendMode::KeyspaceScoped:
return GAC_KEYSPACE_RESOURCE_GROUP_ETCD_PATH;
case ResourceGroupBackendMode::Unknown:
throw Exception(ErrorCodes::LOGICAL_ERROR, "watch path is unavailable for unknown backend mode");
}

throw Exception(ErrorCodes::LOGICAL_ERROR, "unexpected backend mode {}", magic_enum::enum_name(mode));
}

void LocalAdmissionController::updateBackendMode(ResourceGroupBackendMode detected_mode)
{
std::lock_guard lock(mu);
if (detected_mode == ResourceGroupBackendMode::Unknown)
return;

if (resource_group_backend_mode == detected_mode)
return;

if (resource_group_backend_mode != ResourceGroupBackendMode::Unknown)
{
LOG_WARNING(
log,
"keep resource-group backend mode {} and ignore newly detected mode {}",
magic_enum::enum_name(resource_group_backend_mode),
magic_enum::enum_name(detected_mode));
return;
}
Comment on lines +1114 to +1122

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if we update tiflash-compute columnar version and make it adapt to the existing LegacyGlobal mode. After pd-cse is upgraded to pd in the future, do we need to restart the tiflash-compute node to migrate its mode access to pd to resource group requests with keyspace?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is by design. The tiflash reource group will keeps in legacy mode after pd upgrade to nextgen version. After pd upgrading done we restart tiflash to make keyspace scoped resource group take effects.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


resource_group_backend_mode = detected_mode;
LOG_INFO(log, "resource-group backend mode resolved to {}", magic_enum::enum_name(resource_group_backend_mode));

if (start_background_threads && !watch_thread_started && !stopped.load())
{
// Only keep one watch path alive. After the backend mode is known there is no need to
// continue listening on both legacy and keyspace prefixes at the same time.
const auto etcd_path = getWatchEtcdPath(resource_group_backend_mode);
background_threads.emplace_back([this, etcd_path] { this->watchGACLoop(etcd_path); });
watch_thread_started = true;
}
}

bool LocalAdmissionController::parseKeyspaceEtcdKey(
const std::string & etcd_key_prefix,
const std::string & etcd_key,
KeyspaceID & parsed_keyspace_id,
std::string & parsed_rg_name,
std::string & err_msg)
{
// resource_group/settings/keyspace_id/rg_name -> keyspace_id/rg_name
// resource_group/keyspace/settings/<keyspace_id>/<rg_name> -> <keyspace_id>/<rg_name>
auto tmp_str = std::string(etcd_key.begin() + etcd_key_prefix.length() + 1, etcd_key.end());
size_t slash_pos = tmp_str.find('/');
if (slash_pos == std::string::npos)
Expand Down
Loading