Summary: 1. LogAndNotifyTableFileDeletion checks for null event logger like other functions 2. LogAndNotifyBlobFileCreationFinished and LogAndNotifyTablebFileCreationFinished log on success similar to deletions 3. LogAndNotify functions log status on success ## Verification Ran the code on [kvrocks](https://github.com/apache/kvrocks/tree/unstable) which implements event hooks, and the logging is now observable / consistent. ``` 2025/06/05-10:00:49.644611 92065 EVENT_LOG_v1 {"time_micros": 1749132049644595, "cf_name": "metadata", "job": 5, "event": "blob_file_creation", "file_number": 34, "total_blob_count": 68, "total_blob_bytes": 272018457, "file_checksum": "", "file_checksum_func_name": "Unknown", "status": "OK"} ``` ``` 2025/06/02-09:42:29.343893 122068 EVENT_LOG_v1 {"time_micros": 1748871749343853, "cf_name": "metadata", "job": 93, "event": "table_file_creation", "file_number": 853, "file_size": 0, "file_checksum": "", "file_checksum_func_name": "Unknown", "smallest_seqno": 23371, "largest_seqno": 24182, "table_properties": {"data_size": 0, "index_size": 0, "index_partitions": 0, "top_level_index_size": 0, "index_key_is_user_key": 0, "index_value_is_delta_encoded": 0, "filter_size": 0, "raw_key_size": 0, "raw_average_key_size": 0, "raw_value_size": 0, "raw_average_value_size": 0, "num_data_blocks": 0, "num_entries": 0, "num_filter_entries": 0, "num_deletions": 0, "num_merge_operands": 0, "num_range_deletions": 0, "format_version": 0, "fixed_key_len": 0, "filter_policy": "", "column_family_name": "", "column_family_id": 2147483647, "comparator": "", "user_defined_timestamps_persisted": 1, "key_largest_seqno": 18446744073709551615, "merge_operator": "", "prefix_extractor_name": "", "property_collectors": "", "compression": "", "compression_options": "", "creation_time": 0, "oldest_key_time": 0, "newest_key_time": 0, "file_creation_time": 0, "slow_compression_estimated_data_size": 0, "fast_compression_estimated_data_size": 0, "db_id": "", "db_session_id": "", "orig_file_number": 0, "seqno_to_time_mapping": "N/A"}, "oldest_blob_file_number": 821, "status": "Shutdown in progress: Database shutdown"} ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/13670 Reviewed By: jaykorean Differential Revision: D76173710 Pulled By: hx235 fbshipit-source-id: 1f81623c1edade0c122bd0e73391a1b76abc13d9
349 lines
13 KiB
C++
349 lines
13 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/event_helpers.h"
|
|
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/listener.h"
|
|
#include "rocksdb/utilities/customizable_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
Status EventListener::CreateFromString(const ConfigOptions& config_options,
|
|
const std::string& id,
|
|
std::shared_ptr<EventListener>* result) {
|
|
return LoadSharedObject<EventListener>(config_options, id, result);
|
|
}
|
|
|
|
namespace {
|
|
template <class T>
|
|
inline T SafeDivide(T a, T b) {
|
|
return b == 0 ? 0 : a / b;
|
|
}
|
|
} // anonymous namespace
|
|
|
|
void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
|
|
*jwriter << "time_micros"
|
|
<< std::chrono::duration_cast<std::chrono::microseconds>(
|
|
std::chrono::system_clock::now().time_since_epoch())
|
|
.count();
|
|
}
|
|
|
|
void EventHelpers::NotifyTableFileCreationStarted(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id, TableFileCreationReason reason) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
TableFileCreationBriefInfo info;
|
|
info.db_name = db_name;
|
|
info.cf_name = cf_name;
|
|
info.file_path = file_path;
|
|
info.job_id = job_id;
|
|
info.reason = reason;
|
|
for (auto& listener : listeners) {
|
|
listener->OnTableFileCreationStarted(info);
|
|
}
|
|
}
|
|
|
|
void EventHelpers::NotifyOnBackgroundError(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
|
|
bool* auto_recovery) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
db_mutex->AssertHeld();
|
|
// release lock while notifying events
|
|
db_mutex->Unlock();
|
|
for (auto& listener : listeners) {
|
|
listener->OnBackgroundError(reason, bg_error);
|
|
bg_error->PermitUncheckedError();
|
|
if (*auto_recovery) {
|
|
listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
|
|
}
|
|
}
|
|
db_mutex->Lock();
|
|
}
|
|
|
|
void EventHelpers::LogAndNotifyTableFileCreationFinished(
|
|
EventLogger* event_logger,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id, const FileDescriptor& fd,
|
|
uint64_t oldest_blob_file_number, const TableProperties& table_properties,
|
|
TableFileCreationReason reason, const Status& s,
|
|
const std::string& file_checksum,
|
|
const std::string& file_checksum_func_name) {
|
|
if (!event_logger && listeners.empty()) {
|
|
s.PermitUncheckedError();
|
|
return;
|
|
}
|
|
|
|
if (event_logger) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
|
|
<< "table_file_creation" << "file_number" << fd.GetNumber()
|
|
<< "file_size" << fd.GetFileSize() << "file_checksum"
|
|
<< Slice(file_checksum).ToString(true) << "file_checksum_func_name"
|
|
<< file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
|
|
<< "largest_seqno" << fd.largest_seqno;
|
|
|
|
// table_properties
|
|
{
|
|
jwriter << "table_properties";
|
|
jwriter.StartObject();
|
|
|
|
// basic properties:
|
|
jwriter << "data_size" << table_properties.data_size << "index_size"
|
|
<< table_properties.index_size << "index_partitions"
|
|
<< table_properties.index_partitions << "top_level_index_size"
|
|
<< table_properties.top_level_index_size
|
|
<< "index_key_is_user_key"
|
|
<< table_properties.index_key_is_user_key
|
|
<< "index_value_is_delta_encoded"
|
|
<< table_properties.index_value_is_delta_encoded << "filter_size"
|
|
<< table_properties.filter_size << "raw_key_size"
|
|
<< table_properties.raw_key_size << "raw_average_key_size"
|
|
<< SafeDivide(table_properties.raw_key_size,
|
|
table_properties.num_entries)
|
|
<< "raw_value_size" << table_properties.raw_value_size
|
|
<< "raw_average_value_size"
|
|
<< SafeDivide(table_properties.raw_value_size,
|
|
table_properties.num_entries)
|
|
<< "num_data_blocks" << table_properties.num_data_blocks
|
|
<< "num_entries" << table_properties.num_entries
|
|
<< "num_filter_entries" << table_properties.num_filter_entries
|
|
<< "num_deletions" << table_properties.num_deletions
|
|
<< "num_merge_operands" << table_properties.num_merge_operands
|
|
<< "num_range_deletions" << table_properties.num_range_deletions
|
|
<< "format_version" << table_properties.format_version
|
|
<< "fixed_key_len" << table_properties.fixed_key_len
|
|
<< "filter_policy" << table_properties.filter_policy_name
|
|
<< "column_family_name" << table_properties.column_family_name
|
|
<< "column_family_id" << table_properties.column_family_id
|
|
<< "comparator" << table_properties.comparator_name
|
|
<< "user_defined_timestamps_persisted"
|
|
<< table_properties.user_defined_timestamps_persisted
|
|
<< "key_largest_seqno" << table_properties.key_largest_seqno
|
|
<< "merge_operator" << table_properties.merge_operator_name
|
|
<< "prefix_extractor_name"
|
|
<< table_properties.prefix_extractor_name << "property_collectors"
|
|
<< table_properties.property_collectors_names << "compression"
|
|
<< table_properties.compression_name << "compression_options"
|
|
<< table_properties.compression_options << "creation_time"
|
|
<< table_properties.creation_time << "oldest_key_time"
|
|
<< table_properties.newest_key_time << "newest_key_time"
|
|
<< table_properties.oldest_key_time << "file_creation_time"
|
|
<< table_properties.file_creation_time
|
|
<< "slow_compression_estimated_data_size"
|
|
<< table_properties.slow_compression_estimated_data_size
|
|
<< "fast_compression_estimated_data_size"
|
|
<< table_properties.fast_compression_estimated_data_size
|
|
<< "db_id" << table_properties.db_id << "db_session_id"
|
|
<< table_properties.db_session_id << "orig_file_number"
|
|
<< table_properties.orig_file_number << "seqno_to_time_mapping";
|
|
|
|
if (table_properties.seqno_to_time_mapping.empty()) {
|
|
jwriter << "N/A";
|
|
} else {
|
|
SeqnoToTimeMapping tmp;
|
|
Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
|
|
if (status.ok()) {
|
|
jwriter << tmp.ToHumanString();
|
|
} else {
|
|
jwriter << "Invalid";
|
|
}
|
|
}
|
|
|
|
// user collected properties
|
|
for (const auto& prop : table_properties.readable_properties) {
|
|
jwriter << prop.first << prop.second;
|
|
}
|
|
jwriter.EndObject();
|
|
}
|
|
|
|
if (oldest_blob_file_number != kInvalidBlobFileNumber) {
|
|
jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
|
|
}
|
|
|
|
jwriter << "status" << s.ToString();
|
|
|
|
jwriter.EndObject();
|
|
|
|
event_logger->Log(jwriter);
|
|
}
|
|
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
TableFileCreationInfo info;
|
|
info.db_name = db_name;
|
|
info.cf_name = cf_name;
|
|
info.file_path = file_path;
|
|
info.file_size = fd.file_size;
|
|
info.job_id = job_id;
|
|
info.table_properties = table_properties;
|
|
info.reason = reason;
|
|
info.status = s;
|
|
info.file_checksum = file_checksum;
|
|
info.file_checksum_func_name = file_checksum_func_name;
|
|
for (auto& listener : listeners) {
|
|
listener->OnTableFileCreated(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
}
|
|
|
|
void EventHelpers::LogAndNotifyTableFileDeletion(
|
|
EventLogger* event_logger, int job_id, uint64_t file_number,
|
|
const std::string& file_path, const Status& status,
|
|
const std::string& dbname,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners) {
|
|
if (!event_logger && listeners.empty()) {
|
|
status.PermitUncheckedError();
|
|
return;
|
|
}
|
|
|
|
if (event_logger) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
|
|
jwriter << "job" << job_id << "event" << "table_file_deletion"
|
|
<< "file_number" << file_number << "status" << status.ToString();
|
|
|
|
jwriter.EndObject();
|
|
|
|
event_logger->Log(jwriter);
|
|
}
|
|
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
TableFileDeletionInfo info;
|
|
info.db_name = dbname;
|
|
info.job_id = job_id;
|
|
info.file_path = file_path;
|
|
info.status = status;
|
|
for (auto& listener : listeners) {
|
|
listener->OnTableFileDeleted(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
}
|
|
|
|
void EventHelpers::NotifyOnErrorRecoveryEnd(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const Status& old_bg_error, const Status& new_bg_error,
|
|
InstrumentedMutex* db_mutex) {
|
|
if (!listeners.empty()) {
|
|
db_mutex->AssertHeld();
|
|
// Make copies before releasing mutex to avoid race.
|
|
Status old_bg_error_cp = old_bg_error;
|
|
Status new_bg_error_cp = new_bg_error;
|
|
// release lock while notifying events
|
|
db_mutex->Unlock();
|
|
TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1");
|
|
TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2");
|
|
for (auto& listener : listeners) {
|
|
BackgroundErrorRecoveryInfo info;
|
|
info.old_bg_error = old_bg_error_cp;
|
|
info.new_bg_error = new_bg_error_cp;
|
|
listener->OnErrorRecoveryCompleted(old_bg_error_cp);
|
|
listener->OnErrorRecoveryEnd(info);
|
|
info.old_bg_error.PermitUncheckedError();
|
|
info.new_bg_error.PermitUncheckedError();
|
|
}
|
|
db_mutex->Lock();
|
|
} else {
|
|
old_bg_error.PermitUncheckedError();
|
|
}
|
|
}
|
|
|
|
void EventHelpers::NotifyBlobFileCreationStarted(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id,
|
|
BlobFileCreationReason creation_reason) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
|
|
creation_reason);
|
|
for (const auto& listener : listeners) {
|
|
listener->OnBlobFileCreationStarted(info);
|
|
}
|
|
}
|
|
|
|
void EventHelpers::LogAndNotifyBlobFileCreationFinished(
|
|
EventLogger* event_logger,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id, uint64_t file_number,
|
|
BlobFileCreationReason creation_reason, const Status& s,
|
|
const std::string& file_checksum,
|
|
const std::string& file_checksum_func_name, uint64_t total_blob_count,
|
|
uint64_t total_blob_bytes) {
|
|
if (!event_logger && listeners.empty()) {
|
|
s.PermitUncheckedError();
|
|
return;
|
|
}
|
|
|
|
if (event_logger) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
|
|
<< "blob_file_creation" << "file_number" << file_number
|
|
<< "total_blob_count" << total_blob_count << "total_blob_bytes"
|
|
<< total_blob_bytes << "file_checksum" << file_checksum
|
|
<< "file_checksum_func_name" << file_checksum_func_name << "status"
|
|
<< s.ToString();
|
|
|
|
jwriter.EndObject();
|
|
event_logger->Log(jwriter);
|
|
}
|
|
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
|
|
creation_reason, total_blob_count, total_blob_bytes,
|
|
s, file_checksum, file_checksum_func_name);
|
|
for (const auto& listener : listeners) {
|
|
listener->OnBlobFileCreated(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
}
|
|
|
|
void EventHelpers::LogAndNotifyBlobFileDeletion(
|
|
EventLogger* event_logger,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
|
|
uint64_t file_number, const std::string& file_path, const Status& status,
|
|
const std::string& dbname) {
|
|
if (!event_logger && listeners.empty()) {
|
|
status.PermitUncheckedError();
|
|
return;
|
|
}
|
|
|
|
if (event_logger) {
|
|
JSONWriter jwriter;
|
|
AppendCurrentTime(&jwriter);
|
|
|
|
jwriter << "job" << job_id << "event" << "blob_file_deletion"
|
|
<< "file_number" << file_number << "status" << status.ToString();
|
|
|
|
jwriter.EndObject();
|
|
event_logger->Log(jwriter);
|
|
}
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
BlobFileDeletionInfo info(dbname, file_path, job_id, status);
|
|
for (const auto& listener : listeners) {
|
|
listener->OnBlobFileDeleted(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|