rocksdb/db/compaction/compaction_service_job.cc
Jay Huh d46d1fa105 Skip building remote compaction output file when not OK (#13183)
Summary:
During `FinishCompactionOutputFile()` if there's an IOError, we may end up having the output in memory, but table properties are not populated, because `outputs.UpdateTableProperties();` is called only when `s.ok()` is true.

However, during remote compaction result serialization, we always try to access the `table_properties` which may be null.  This was causing a segfault.

We can skip building the output files in the result completely if the status is not ok.

# Unit Test
New test added
```
./compaction_service_test --gtest_filter="*CompactionOutputFileIOError*"
```

Before the fix
```
Received signal 11 (Segmentation fault)
Invoking GDB for stack trace...
https://github.com/facebook/rocksdb/issues/4  0x00000000004708ed in rocksdb::TableProperties::TableProperties (this=0x7fae070fb4e8) at ./include/rocksdb/table_properties.h:212
212     struct TableProperties {
https://github.com/facebook/rocksdb/issues/5  0x00007fae0b195b9e in rocksdb::CompactionServiceOutputFile::CompactionServiceOutputFile (this=0x7fae070fb400, name=..., smallest=0, largest=0, _smallest_internal_key=..., _largest_internal_key=..., _oldest_ancester_time=1733335023, _file_creation_time=1733335026, _epoch_number=1, _file_checksum=..., _file_checksum_func_name=..., _paranoid_hash=0, _marked_for_compaction=false, _unique_id=..., _table_properties=...) at ./db/compaction/compaction_job.h:450
450             table_properties(_table_properties) {}
```

After the fix
```
[ RUN      ] CompactionServiceTest.CompactionOutputFileIOError
[       OK ] CompactionServiceTest.CompactionOutputFileIOError (4499 ms)
[----------] 1 test from CompactionServiceTest (4499 ms total)

[----------] Global test environment tear-down
[==========] 1 test from 1 test case ran. (4499 ms total)
[  PASSED  ] 1 test.
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13183

Reviewed By: anand1976

Differential Revision: D66770876

Pulled By: jaykorean

fbshipit-source-id: 63df7c2786ce0353f38a93e493ae4e7b591f4ed9
2024-12-04 17:25:31 -08:00

879 lines
38 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/compaction_job.h"
#include "db/compaction/compaction_state.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/utilities/options_type.h"
namespace ROCKSDB_NAMESPACE {
class SubcompactionState;
CompactionServiceJobStatus
CompactionJob::ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
assert(db_options_.compaction_service);
const Compaction* compaction = sub_compact->compaction;
CompactionServiceInput compaction_input;
compaction_input.output_level = compaction->output_level();
compaction_input.db_id = db_id_;
const std::vector<CompactionInputFiles>& inputs =
*(compact_->compaction->inputs());
for (const auto& files_per_level : inputs) {
for (const auto& file : files_per_level.files) {
compaction_input.input_files.emplace_back(
MakeTableFileName(file->fd.GetNumber()));
}
}
compaction_input.cf_name = compaction->column_family_data()->GetName();
compaction_input.snapshots = existing_snapshots_;
compaction_input.has_begin = sub_compact->start.has_value();
compaction_input.begin =
compaction_input.has_begin ? sub_compact->start->ToString() : "";
compaction_input.has_end = sub_compact->end.has_value();
compaction_input.end =
compaction_input.has_end ? sub_compact->end->ToString() : "";
compaction_input.options_file_number =
sub_compact->compaction->input_version()
->version_set()
->options_file_number();
TEST_SYNC_POINT_CALLBACK(
"CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
&compaction_input);
std::string compaction_input_binary;
Status s = compaction_input.Write(&compaction_input_binary);
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}
std::ostringstream input_files_oss;
bool is_first_one = true;
for (const auto& file : compaction_input.input_files) {
input_files_oss << (is_first_one ? "" : ", ") << file;
is_first_one = false;
}
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(
dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact),
thread_pri_, compaction->compaction_reason(),
compaction->is_full_compaction(), compaction->is_manual_compaction(),
compaction->bottommost_level());
CompactionServiceScheduleResponse response =
db_options_.compaction_service->Schedule(info, compaction_input_binary);
switch (response.status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction->column_family_data()->GetName().c_str(),
job_id_);
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction->column_family_data()->GetName().c_str(), job_id_);
return response.status;
default:
assert(false); // unknown status
break;
}
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
compaction->column_family_data()->GetName().c_str(), job_id_);
std::string compaction_result_binary;
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
&compaction_result_binary);
if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}
CompactionServiceResult compaction_result;
s = CompactionServiceResult::Read(compaction_result_binary,
&compaction_result);
if (compaction_status == CompactionServiceJobStatus::kFailure) {
if (s.ok()) {
if (compaction_result.status.ok()) {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (even though "
"the internal status is okay).");
} else {
// set the current sub compaction status with the status returned from
// remote
sub_compact->status = compaction_result.status;
}
} else {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (and no valid "
"result is returned).");
compaction_result.status.PermitUncheckedError();
}
ROCKS_LOG_WARN(
db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}
// CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to
// read the result. Consider this as an installation failure
if (!s.ok()) {
sub_compact->status = s;
compaction_result.status.PermitUncheckedError();
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
sub_compact->status = compaction_result.status;
std::ostringstream output_files_oss;
is_first_one = true;
for (const auto& file : compaction_result.output_files) {
output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
is_first_one = false;
}
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Received remote compaction result, output path: "
"%s, files: %s",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_result.output_path.c_str(), output_files_oss.str().c_str());
// Installation Starts
for (const auto& file : compaction_result.output_files) {
uint64_t file_num = versions_->NewFileNumber();
auto src_file = compaction_result.output_path + "/" + file.file_name;
auto tgt_file = TableFileName(compaction->immutable_options()->cf_paths,
file_num, compaction->output_path_id());
s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
if (!s.ok()) {
sub_compact->status = s;
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
FileMetaData meta;
uint64_t file_size;
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
if (!s.ok()) {
sub_compact->status = s;
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
file.smallest_seqno, file.largest_seqno);
meta.smallest.DecodeFrom(file.smallest_internal_key);
meta.largest.DecodeFrom(file.largest_internal_key);
meta.oldest_ancester_time = file.oldest_ancester_time;
meta.file_creation_time = file.file_creation_time;
meta.epoch_number = file.epoch_number;
meta.file_checksum = file.file_checksum;
meta.file_checksum_func_name = file.file_checksum_func_name;
meta.marked_for_compaction = file.marked_for_compaction;
meta.unique_id = file.unique_id;
auto cfd = compaction->column_family_data();
sub_compact->Current().AddOutput(std::move(meta),
cfd->internal_comparator(), false, true,
file.paranoid_hash);
sub_compact->Current().UpdateTableProperties(file.table_properties);
}
sub_compact->compaction_job_stats = compaction_result.stats;
sub_compact->Current().SetNumOutputRecords(
compaction_result.stats.num_output_records);
sub_compact->Current().SetNumOutputFiles(
compaction_result.stats.num_output_files);
sub_compact->Current().AddBytesWritten(compaction_result.bytes_written);
RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
compaction_result.bytes_written);
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kSuccess);
return CompactionServiceJobStatus::kSuccess;
}
std::string CompactionServiceCompactionJob::GetTableFileName(
uint64_t file_number) {
return MakeTableFileName(output_path_, file_number);
}
void CompactionServiceCompactionJob::RecordCompactionIOStats() {
compaction_result_->bytes_read += IOSTATS(bytes_read);
compaction_result_->bytes_written += IOSTATS(bytes_written);
CompactionJob::RecordCompactionIOStats();
}
void CompactionServiceCompactionJob::UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const {
compaction_job_stats_->elapsed_micros = stats.micros;
// output information only in remote compaction
compaction_job_stats_->total_output_bytes = stats.bytes_written;
compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
compaction_job_stats_->num_output_records = stats.num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files;
compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
}
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
VersionSet* versions, const std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string output_path,
const CompactionServiceInput& compaction_service_input,
CompactionServiceResult* compaction_service_result)
: CompactionJob(
job_id, compaction, db_options, mutable_db_options, file_options,
versions, shutting_down, log_buffer, nullptr, output_directory,
nullptr, stats, db_mutex, db_error_handler,
std::move(existing_snapshots), kMaxSequenceNumber, nullptr, nullptr,
std::move(table_cache), event_logger,
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,
manual_compaction_canceled, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(std::move(output_path)),
compaction_input_(compaction_service_input),
compaction_result_(compaction_service_result) {}
Status CompactionServiceCompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
Slice begin = compaction_input_.begin;
Slice end = compaction_input_.end;
compact_->sub_compact_states.emplace_back(
c,
compaction_input_.has_begin ? std::optional<Slice>(begin)
: std::optional<Slice>(),
compaction_input_.has_end ? std::optional<Slice>(end)
: std::optional<Slice>(),
/*sub_job_id*/ 0);
log_buffer_->FlushBufferToLog();
LogCompaction();
compaction_result_->stats.Reset();
const uint64_t start_micros = db_options_.clock->NowMicros();
c->GetOrInitInputTableProperties();
// Pick the only sub-compaction we should have
assert(compact_->sub_compact_states.size() == 1);
SubcompactionState* sub_compact = compact_->sub_compact_states.data();
ProcessKeyValueCompaction(sub_compact);
compaction_stats_.stats.micros =
db_options_.clock->NowMicros() - start_micros;
compaction_stats_.stats.cpu_micros =
sub_compact->compaction_job_stats.cpu_micros;
RecordTimeToHistogram(stats_, COMPACTION_TIME,
compaction_stats_.stats.micros);
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
compaction_stats_.stats.cpu_micros);
Status status = sub_compact->status;
IOStatus io_s = sub_compact->io_status;
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
constexpr IODebugContext* dbg = nullptr;
if (output_directory_) {
io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
DirFsyncOptions());
}
}
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
status = io_s;
}
LogFlush(db_options_.info_log);
compact_->status = status;
compact_->status.PermitUncheckedError();
// Build Compaction Job Stats
// 1. Aggregate CompactionOutputStats into Internal Compaction Stats
// (compaction_stats_) and aggregate Compaction Job Stats
// (compaction_job_stats_) from the sub compactions
compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
// 2. Update the Output information in the Compaction Job Stats with
// aggregated Internal Compaction Stats.
UpdateCompactionJobStats(compaction_stats_.stats);
// 3. Set fields that are not propagated as part of aggregations above
compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();
compaction_result_->stats.is_full_compaction = c->is_full_compaction();
compaction_result_->stats.is_remote_compaction = true;
// 4. Update IO Stats that are not part of the aggregations above (bytes_read,
// bytes_written)
RecordCompactionIOStats();
// Build Output
compaction_result_->output_level = compact_->compaction->output_level();
compaction_result_->output_path = output_path_;
if (status.ok()) {
for (const auto& output_file : sub_compact->GetOutputs()) {
auto& meta = output_file.meta;
compaction_result_->output_files.emplace_back(
MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
meta.largest.Encode().ToString(), meta.oldest_ancester_time,
meta.file_creation_time, meta.epoch_number, meta.file_checksum,
meta.file_checksum_func_name, output_file.validator.GetHash(),
meta.marked_for_compaction, meta.unique_id,
*output_file.table_properties);
}
}
TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
&compaction_result_);
return status;
}
void CompactionServiceCompactionJob::CleanupCompaction() {
CompactionJob::CleanupCompaction();
}
// Internal binary format for the input and result data
enum BinaryFormatVersion : uint32_t {
kOptionsString = 1, // Use string format similar to Option string format
};
static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
{"name",
{offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"options",
{offsetof(struct ColumnFamilyDescriptor, options),
OptionType::kConfigurable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
value, cf_options);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
std::string result;
auto status =
GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& name, const void* addr1,
const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
auto this_conf = CFOptionsAsConfigurable(*this_one);
auto that_conf = CFOptionsAsConfigurable(*that_one);
std::string mismatch_opt;
bool result =
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
if (!result) {
*mismatch = name + "." + mismatch_opt;
}
return result;
}}},
};
static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"cf_name",
{offsetof(struct CompactionServiceInput, cf_name),
OptionType::kEncodedString}},
{"snapshots", OptionTypeInfo::Vector<uint64_t>(
offsetof(struct CompactionServiceInput, snapshots),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kUInt64T})},
{"input_files", OptionTypeInfo::Vector<std::string>(
offsetof(struct CompactionServiceInput, input_files),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kEncodedString})},
{"output_level",
{offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"db_id",
{offsetof(struct CompactionServiceInput, db_id),
OptionType::kEncodedString}},
{"has_begin",
{offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"begin",
{offsetof(struct CompactionServiceInput, begin),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"has_end",
{offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"end",
{offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"options_file_number",
{offsetof(struct CompactionServiceInput, options_file_number),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo>
cs_output_file_type_info = {
{"file_name",
{offsetof(struct CompactionServiceOutputFile, file_name),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_seqno",
{offsetof(struct CompactionServiceOutputFile, smallest_seqno),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_seqno",
{offsetof(struct CompactionServiceOutputFile, largest_seqno),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_internal_key",
{offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_internal_key",
{offsetof(struct CompactionServiceOutputFile, largest_internal_key),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"oldest_ancester_time",
{offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_creation_time",
{offsetof(struct CompactionServiceOutputFile, file_creation_time),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"epoch_number",
{offsetof(struct CompactionServiceOutputFile, epoch_number),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_checksum",
{offsetof(struct CompactionServiceOutputFile, file_checksum),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_checksum_func_name",
{offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"paranoid_hash",
{offsetof(struct CompactionServiceOutputFile, paranoid_hash),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"marked_for_compaction",
{offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"unique_id",
OptionTypeInfo::Array<uint64_t, 2>(
offsetof(struct CompactionServiceOutputFile, unique_id),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kUInt64T})},
{"table_properties",
{offsetof(struct CompactionServiceOutputFile, table_properties),
OptionType::kStruct, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto table_properties = static_cast<TableProperties*>(addr);
return TableProperties::Parse(opts, value, table_properties);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto table_properties =
static_cast<const TableProperties*>(addr);
std::string result;
auto status = table_properties->Serialize(opts, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr1, const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const TableProperties*>(addr1);
const auto that_one = static_cast<const TableProperties*>(addr2);
return this_one->AreEqual(opts, that_one, mismatch);
}}}};
static std::unordered_map<std::string, OptionTypeInfo>
compaction_job_stats_type_info = {
{"elapsed_micros",
{offsetof(struct CompactionJobStats, elapsed_micros),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"cpu_micros",
{offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"num_input_records",
{offsetof(struct CompactionJobStats, num_input_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_blobs_read",
{offsetof(struct CompactionJobStats, num_blobs_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files",
{offsetof(struct CompactionJobStats, num_input_files),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files_at_output_level",
{offsetof(struct CompactionJobStats, num_input_files_at_output_level),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_records",
{offsetof(struct CompactionJobStats, num_output_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files",
{offsetof(struct CompactionJobStats, num_output_files),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files_blob",
{offsetof(struct CompactionJobStats, num_output_files_blob),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_full_compaction",
{offsetof(struct CompactionJobStats, is_full_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_manual_compaction",
{offsetof(struct CompactionJobStats, is_manual_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_remote_compaction",
{offsetof(struct CompactionJobStats, is_remote_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_bytes",
{offsetof(struct CompactionJobStats, total_input_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_blob_bytes_read",
{offsetof(struct CompactionJobStats, total_blob_bytes_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_output_bytes",
{offsetof(struct CompactionJobStats, total_output_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_output_bytes_blob",
{offsetof(struct CompactionJobStats, total_output_bytes_blob),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_records_replaced",
{offsetof(struct CompactionJobStats, num_records_replaced),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_raw_key_bytes",
{offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_raw_value_bytes",
{offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_deletion_records",
{offsetof(struct CompactionJobStats, num_input_deletion_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_expired_deletion_records",
{offsetof(struct CompactionJobStats, num_expired_deletion_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_corrupt_keys",
{offsetof(struct CompactionJobStats, num_corrupt_keys),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_write_nanos",
{offsetof(struct CompactionJobStats, file_write_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_range_sync_nanos",
{offsetof(struct CompactionJobStats, file_range_sync_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_fsync_nanos",
{offsetof(struct CompactionJobStats, file_fsync_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_prepare_write_nanos",
{offsetof(struct CompactionJobStats, file_prepare_write_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_output_key_prefix",
{offsetof(struct CompactionJobStats, smallest_output_key_prefix),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_output_key_prefix",
{offsetof(struct CompactionJobStats, largest_output_key_prefix),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_single_del_fallthru",
{offsetof(struct CompactionJobStats, num_single_del_fallthru),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_single_del_mismatch",
{offsetof(struct CompactionJobStats, num_single_del_mismatch),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
namespace {
// this is a helper struct to serialize and deserialize class Status, because
// Status's members are not public.
struct StatusSerializationAdapter {
uint8_t code;
uint8_t subcode;
uint8_t severity;
std::string message;
StatusSerializationAdapter() = default;
explicit StatusSerializationAdapter(const Status& s) {
code = s.code();
subcode = s.subcode();
severity = s.severity();
auto msg = s.getState();
message = msg ? msg : "";
}
Status GetStatus() const {
return Status{static_cast<Status::Code>(code),
static_cast<Status::SubCode>(subcode),
static_cast<Status::Severity>(severity), message};
}
};
} // namespace
static std::unordered_map<std::string, OptionTypeInfo>
status_adapter_type_info = {
{"code",
{offsetof(struct StatusSerializationAdapter, code),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"subcode",
{offsetof(struct StatusSerializationAdapter, subcode),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"severity",
{offsetof(struct StatusSerializationAdapter, severity),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"message",
{offsetof(struct StatusSerializationAdapter, message),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
{"status",
{offsetof(struct CompactionServiceResult, status),
OptionType::kCustomizable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto status_obj = static_cast<Status*>(addr);
StatusSerializationAdapter adapter;
Status s = OptionTypeInfo::ParseType(
opts, value, status_adapter_type_info, &adapter);
*status_obj = adapter.GetStatus();
return s;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto status_obj = static_cast<const Status*>(addr);
StatusSerializationAdapter adapter(*status_obj);
std::string result;
Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
&adapter, &result);
*value = "{" + result + "}";
return s;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr1, const void* addr2, std::string* mismatch) {
const auto status1 = static_cast<const Status*>(addr1);
const auto status2 = static_cast<const Status*>(addr2);
StatusSerializationAdapter adatper1(*status1);
StatusSerializationAdapter adapter2(*status2);
return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
&adatper1, &adapter2, mismatch);
}}},
{"output_files",
OptionTypeInfo::Vector<CompactionServiceOutputFile>(
offsetof(struct CompactionServiceResult, output_files),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
OptionVerificationType::kNormal,
OptionTypeFlags::kNone))},
{"output_level",
{offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"output_path",
{offsetof(struct CompactionServiceResult, output_path),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read",
{offsetof(struct CompactionServiceResult, bytes_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_written",
{offsetof(struct CompactionServiceResult, bytes_written),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"stats", OptionTypeInfo::Struct(
"stats", &compaction_job_stats_type_info,
offsetof(struct CompactionServiceResult, stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
};
Status CompactionServiceInput::Read(const std::string& data_str,
CompactionServiceInput* obj) {
if (data_str.size() <= sizeof(BinaryFormatVersion)) {
return Status::InvalidArgument("Invalid CompactionServiceInput string");
}
auto format_version = DecodeFixed32(data_str.data());
if (format_version == kOptionsString) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
cf.ignore_unknown_options = true;
return OptionTypeInfo::ParseType(
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
obj);
} else {
return Status::NotSupported(
"Compaction Service Input data version not supported: " +
std::to_string(format_version));
}
}
Status CompactionServiceInput::Write(std::string* output) {
char buf[sizeof(BinaryFormatVersion)];
EncodeFixed32(buf, kOptionsString);
output->append(buf, sizeof(BinaryFormatVersion));
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
}
Status CompactionServiceResult::Read(const std::string& data_str,
CompactionServiceResult* obj) {
if (data_str.size() <= sizeof(BinaryFormatVersion)) {
return Status::InvalidArgument("Invalid CompactionServiceResult string");
}
auto format_version = DecodeFixed32(data_str.data());
if (format_version == kOptionsString) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
cf.ignore_unknown_options = true;
return OptionTypeInfo::ParseType(
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
obj);
} else {
return Status::NotSupported(
"Compaction Service Result data version not supported: " +
std::to_string(format_version));
}
}
Status CompactionServiceResult::Write(std::string* output) {
char buf[sizeof(BinaryFormatVersion)];
EncodeFixed32(buf, kOptionsString);
output->append(buf, sizeof(BinaryFormatVersion));
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
}
#ifndef NDEBUG
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
std::string mismatch;
return TEST_Equals(other, &mismatch);
}
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
std::string* mismatch) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
mismatch);
}
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
std::string mismatch;
return TEST_Equals(other, &mismatch);
}
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
std::string* mismatch) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
mismatch);
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE