Summary: This PR does two things: 1. Adds a new table property `newest_key_time` 2. Uses this property to improve TTL and temperature change compaction. ### Context The current `creation_time` table property should really be named `oldest_ancestor_time`. For flush output files, this is the oldest key time in the file. For compaction output files, this is the minimum among all oldest key times in the input files. The problem with using the oldest ancestor time for TTL compaction is that we may end up dropping files earlier than we should. What we really want is the newest (i.e. "youngest") key time. Right now we take a roundabout way to estimate this value -- we take the value of the _oldest_ key time for the _next_ (newer) SST file. This is also why the current code has checks for `index >= 1`. Our new property `newest_key_time` is set to the file creation time during flushes, and the max over all input files for compactions. There were some additional smaller changes that I had to make for testing purposes: - Refactoring the mock table reader to support specifying my own table properties - Refactoring out a test utility method `GetLevelFileMetadatas` that would otherwise be copy/pasted in 3 places Credit to cbi42 for the problem explanation and proposed solution ### Testing - Added a dedicated unit test for my `newest_key_time` logic in isolation (i.e. are we populating the property on flush and compaction) - Updated the existing unit tests (for TTL/temperature change compaction), which were comprehensive enough to break when I first made my code changes. I removed the test setup code which set the file metadata `oldest_ancestor_time`, so we know we are actually only using the new table property instead. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13083 Reviewed By: cbi42 Differential Revision: D65298604 Pulled By: archang19 fbshipit-source-id: 898ef91b692ab33f5129a2a16b64ecadd4c32432
334 lines
12 KiB
C++
334 lines
12 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/event_helpers.h"
|
|
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/listener.h"
|
|
#include "rocksdb/utilities/customizable_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Forwards `config_options`, `id` and `result` to
// LoadSharedObject<EventListener> and returns the Status it produces
// unchanged.
Status EventListener::CreateFromString(const ConfigOptions& config_options,
                                       const std::string& id,
                                       std::shared_ptr<EventListener>* result) {
  Status load_status =
      LoadSharedObject<EventListener>(config_options, id, result);
  return load_status;
}
|
|
|
|
namespace {
// Returns `a / b`, or zero when the divisor `b` compares equal to zero, so
// callers never divide by zero.
template <class T>
inline T SafeDivide(T a, T b) {
  if (b == 0) {
    return 0;
  }
  return a / b;
}
}  // anonymous namespace
|
|
|
|
// Writes a "time_micros" key to `jwriter`, followed by the current
// system_clock time expressed as microseconds since the clock's epoch.
void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
  const auto now_micros =
      std::chrono::duration_cast<std::chrono::microseconds>(since_epoch)
          .count();
  *jwriter << "time_micros" << now_micros;
}
|
|
|
|
void EventHelpers::NotifyTableFileCreationStarted(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id, TableFileCreationReason reason) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
TableFileCreationBriefInfo info;
|
|
info.db_name = db_name;
|
|
info.cf_name = cf_name;
|
|
info.file_path = file_path;
|
|
info.job_id = job_id;
|
|
info.reason = reason;
|
|
for (auto& listener : listeners) {
|
|
listener->OnTableFileCreationStarted(info);
|
|
}
|
|
}
|
|
|
|
void EventHelpers::NotifyOnBackgroundError(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
|
|
bool* auto_recovery) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
db_mutex->AssertHeld();
|
|
// release lock while notifying events
|
|
db_mutex->Unlock();
|
|
for (auto& listener : listeners) {
|
|
listener->OnBackgroundError(reason, bg_error);
|
|
bg_error->PermitUncheckedError();
|
|
if (*auto_recovery) {
|
|
listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
|
|
}
|
|
}
|
|
db_mutex->Lock();
|
|
}
|
|
|
|
// Records the completion of a table (SST) file creation.
//
// When `s` is OK and `event_logger` is non-null, a "table_file_creation" JSON
// event is logged. It contains the file descriptor fields, the checksum, a
// nested "table_properties" object (built-in properties followed by the
// user-collected readable properties) and, when valid, the oldest blob file
// number.
//
// Independently of logging, every listener receives OnTableFileCreated() with
// a TableFileCreationInfo populated from the arguments (including `s`, so
// listeners also see failed creations).
void EventHelpers::LogAndNotifyTableFileCreationFinished(
    EventLogger* event_logger,
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    const std::string& db_name, const std::string& cf_name,
    const std::string& file_path, int job_id, const FileDescriptor& fd,
    uint64_t oldest_blob_file_number, const TableProperties& table_properties,
    TableFileCreationReason reason, const Status& s,
    const std::string& file_checksum,
    const std::string& file_checksum_func_name) {
  if (s.ok() && event_logger) {
    JSONWriter jwriter;
    AppendCurrentTime(&jwriter);
    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
            << "table_file_creation"
            << "file_number" << fd.GetNumber() << "file_size"
            << fd.GetFileSize() << "file_checksum"
            << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
            << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
            << "largest_seqno" << fd.largest_seqno;

    // table_properties
    {
      jwriter << "table_properties";
      jwriter.StartObject();

      // basic properties: one key/value pair per line so that each key is
      // visibly paired with the TableProperties field of the same meaning.
      jwriter << "data_size" << table_properties.data_size
              << "index_size" << table_properties.index_size
              << "index_partitions" << table_properties.index_partitions
              << "top_level_index_size"
              << table_properties.top_level_index_size
              << "index_key_is_user_key"
              << table_properties.index_key_is_user_key
              << "index_value_is_delta_encoded"
              << table_properties.index_value_is_delta_encoded
              << "filter_size" << table_properties.filter_size
              << "raw_key_size" << table_properties.raw_key_size
              // Averages use SafeDivide so an empty table reports 0.
              << "raw_average_key_size"
              << SafeDivide(table_properties.raw_key_size,
                            table_properties.num_entries)
              << "raw_value_size" << table_properties.raw_value_size
              << "raw_average_value_size"
              << SafeDivide(table_properties.raw_value_size,
                            table_properties.num_entries)
              << "num_data_blocks" << table_properties.num_data_blocks
              << "num_entries" << table_properties.num_entries
              << "num_filter_entries" << table_properties.num_filter_entries
              << "num_deletions" << table_properties.num_deletions
              << "num_merge_operands" << table_properties.num_merge_operands
              << "num_range_deletions" << table_properties.num_range_deletions
              << "format_version" << table_properties.format_version
              << "fixed_key_len" << table_properties.fixed_key_len
              << "filter_policy" << table_properties.filter_policy_name
              << "column_family_name" << table_properties.column_family_name
              << "column_family_id" << table_properties.column_family_id
              << "comparator" << table_properties.comparator_name
              << "user_defined_timestamps_persisted"
              << table_properties.user_defined_timestamps_persisted
              << "key_largest_seqno" << table_properties.key_largest_seqno
              << "merge_operator" << table_properties.merge_operator_name
              << "prefix_extractor_name"
              << table_properties.prefix_extractor_name
              << "property_collectors"
              << table_properties.property_collectors_names
              << "compression" << table_properties.compression_name
              << "compression_options" << table_properties.compression_options
              << "creation_time" << table_properties.creation_time
              // Each *_key_time key reports the identically named field.
              << "oldest_key_time" << table_properties.oldest_key_time
              << "newest_key_time" << table_properties.newest_key_time
              << "file_creation_time" << table_properties.file_creation_time
              << "slow_compression_estimated_data_size"
              << table_properties.slow_compression_estimated_data_size
              << "fast_compression_estimated_data_size"
              << table_properties.fast_compression_estimated_data_size
              << "db_id" << table_properties.db_id
              << "db_session_id" << table_properties.db_session_id
              << "orig_file_number" << table_properties.orig_file_number
              << "seqno_to_time_mapping";

      // Value for the "seqno_to_time_mapping" key written just above: "N/A"
      // when absent, the decoded human-readable form when it decodes, and
      // "Invalid" otherwise.
      if (table_properties.seqno_to_time_mapping.empty()) {
        jwriter << "N/A";
      } else {
        SeqnoToTimeMapping tmp;
        Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
        if (status.ok()) {
          jwriter << tmp.ToHumanString();
        } else {
          jwriter << "Invalid";
        }
      }

      // user collected properties
      for (const auto& prop : table_properties.readable_properties) {
        jwriter << prop.first << prop.second;
      }
      jwriter.EndObject();
    }

    if (oldest_blob_file_number != kInvalidBlobFileNumber) {
      jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
    }

    jwriter.EndObject();

    event_logger->Log(jwriter);
  }

  if (listeners.empty()) {
    return;
  }
  TableFileCreationInfo info;
  info.db_name = db_name;
  info.cf_name = cf_name;
  info.file_path = file_path;
  info.file_size = fd.file_size;
  info.job_id = job_id;
  info.table_properties = table_properties;
  info.reason = reason;
  info.status = s;
  info.file_checksum = file_checksum;
  info.file_checksum_func_name = file_checksum_func_name;
  for (auto& listener : listeners) {
    listener->OnTableFileCreated(info);
  }
  // Listeners are not required to inspect the status copy.
  info.status.PermitUncheckedError();
}
|
|
|
|
// Logs a "table_file_deletion" JSON event (job id, file number and, on
// failure, the status string) and then calls OnTableFileDeleted() on every
// listener.
//
// NOTE(review): `event_logger` is dereferenced unconditionally here, unlike
// LogAndNotifyBlobFileDeletion which checks it first; callers presumably
// always pass a non-null logger -- confirm.
void EventHelpers::LogAndNotifyTableFileDeletion(
    EventLogger* event_logger, int job_id, uint64_t file_number,
    const std::string& file_path, const Status& status,
    const std::string& dbname,
    const std::vector<std::shared_ptr<EventListener>>& listeners) {
  JSONWriter json;
  AppendCurrentTime(&json);

  json << "job" << job_id;
  json << "event"
       << "table_file_deletion";
  json << "file_number" << file_number;
  if (!status.ok()) {
    // The status is only recorded for failed deletions.
    json << "status" << status.ToString();
  }
  json.EndObject();

  event_logger->Log(json);

  if (!listeners.empty()) {
    TableFileDeletionInfo deletion;
    deletion.db_name = dbname;
    deletion.job_id = job_id;
    deletion.file_path = file_path;
    deletion.status = status;
    for (const auto& observer : listeners) {
      observer->OnTableFileDeleted(deletion);
    }
    deletion.status.PermitUncheckedError();
  }
}
|
|
|
|
void EventHelpers::NotifyOnErrorRecoveryEnd(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const Status& old_bg_error, const Status& new_bg_error,
|
|
InstrumentedMutex* db_mutex) {
|
|
if (!listeners.empty()) {
|
|
db_mutex->AssertHeld();
|
|
// Make copies before releasing mutex to avoid race.
|
|
Status old_bg_error_cp = old_bg_error;
|
|
Status new_bg_error_cp = new_bg_error;
|
|
// release lock while notifying events
|
|
db_mutex->Unlock();
|
|
TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1");
|
|
TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2");
|
|
for (auto& listener : listeners) {
|
|
BackgroundErrorRecoveryInfo info;
|
|
info.old_bg_error = old_bg_error_cp;
|
|
info.new_bg_error = new_bg_error_cp;
|
|
listener->OnErrorRecoveryCompleted(old_bg_error_cp);
|
|
listener->OnErrorRecoveryEnd(info);
|
|
info.old_bg_error.PermitUncheckedError();
|
|
info.new_bg_error.PermitUncheckedError();
|
|
}
|
|
db_mutex->Lock();
|
|
} else {
|
|
old_bg_error.PermitUncheckedError();
|
|
}
|
|
}
|
|
|
|
void EventHelpers::NotifyBlobFileCreationStarted(
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
const std::string& db_name, const std::string& cf_name,
|
|
const std::string& file_path, int job_id,
|
|
BlobFileCreationReason creation_reason) {
|
|
if (listeners.empty()) {
|
|
return;
|
|
}
|
|
BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
|
|
creation_reason);
|
|
for (const auto& listener : listeners) {
|
|
listener->OnBlobFileCreationStarted(info);
|
|
}
|
|
}
|
|
|
|
// Records the completion of a blob file creation.
//
// When `s` is OK and `event_logger` is non-null, a "blob_file_creation" JSON
// event is logged with the file number, blob count/bytes, checksum info and
// status string. Independently of logging, every listener receives
// OnBlobFileCreated() with a BlobFileCreationInfo built from the arguments.
void EventHelpers::LogAndNotifyBlobFileCreationFinished(
    EventLogger* event_logger,
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    const std::string& db_name, const std::string& cf_name,
    const std::string& file_path, int job_id, uint64_t file_number,
    BlobFileCreationReason creation_reason, const Status& s,
    const std::string& file_checksum,
    const std::string& file_checksum_func_name, uint64_t total_blob_count,
    uint64_t total_blob_bytes) {
  const bool should_log = s.ok() && event_logger;
  if (should_log) {
    JSONWriter json;
    AppendCurrentTime(&json);
    json << "cf_name" << cf_name;
    json << "job" << job_id;
    json << "event"
         << "blob_file_creation";
    json << "file_number" << file_number;
    json << "total_blob_count" << total_blob_count;
    json << "total_blob_bytes" << total_blob_bytes;
    json << "file_checksum" << file_checksum;
    json << "file_checksum_func_name" << file_checksum_func_name;
    json << "status" << s.ToString();

    json.EndObject();
    event_logger->Log(json);
  }

  if (!listeners.empty()) {
    BlobFileCreationInfo created(db_name, cf_name, file_path, job_id,
                                 creation_reason, total_blob_count,
                                 total_blob_bytes, s, file_checksum,
                                 file_checksum_func_name);
    for (const auto& observer : listeners) {
      observer->OnBlobFileCreated(created);
    }
    created.status.PermitUncheckedError();
  }
}
|
|
|
|
// Records the deletion of a blob file.
//
// When `event_logger` is non-null, a "blob_file_deletion" JSON event is
// logged (job id, file number and, on failure, the status string).
// Independently of logging, every listener receives OnBlobFileDeleted() with
// a BlobFileDeletionInfo built from the arguments.
void EventHelpers::LogAndNotifyBlobFileDeletion(
    EventLogger* event_logger,
    const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
    uint64_t file_number, const std::string& file_path, const Status& status,
    const std::string& dbname) {
  if (event_logger) {
    JSONWriter json;
    AppendCurrentTime(&json);

    json << "job" << job_id;
    json << "event"
         << "blob_file_deletion";
    json << "file_number" << file_number;
    if (!status.ok()) {
      // The status is only recorded for failed deletions.
      json << "status" << status.ToString();
    }

    json.EndObject();
    event_logger->Log(json);
  }

  if (!listeners.empty()) {
    BlobFileDeletionInfo deleted(dbname, file_path, job_id, status);
    for (const auto& observer : listeners) {
      observer->OnBlobFileDeleted(deleted);
    }
    deleted.status.PermitUncheckedError();
  }
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|