rocksdb/db/compaction/compaction_service_job.cc
Changyu Bi 1d94aeea44 Refactor snapshot context into JobContext and fix deadlock on db mutex in WP/WUP (#13632)
Summary:
With WP/WUP, we can deadlock on db mutex here: 8dc3d77b59/db/db_impl/db_impl_compaction_flush.cc (L4626). Here we release a snapshot (which will acquire db mutex) while already holding the mutex. This caused some transaction lock timeout error in crash test. This PR fixes this by refactoring snapshot related context into JobContext and only allow snapshot related context to be initialized once. This also reduces the number of parameters being passed around.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13632

Test Plan:
- existing tests
- this fails with timeout before this fix
```
./db_stress --WAL_size_limit_MB=0 --WAL_ttl_seconds=60 --acquire_snapshot_one_in=10000 --adaptive_readahead=1 --adm_policy=2 --advise_random_on_open=1 --allow_data_in_errors=True --allow_fallocate=0 --allow_unprepared_value=1 --async_io=0 --auto_readahead_size=0 --auto_refresh_iterator_with_snapshot=0 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=1 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=8 --bgerror_resume_retry_interval=1000000 --block_align=0 --block_protection_bytes_per_key=4 --block_size=16384 --bloom_before_level=6 --bloom_bits=2 --bottommost_compression_type=lz4 --bottommost_file_compaction_delay=0 --bytes_per_sync=0 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=0 --cache_size=33554432 --cache_type=fixed_hyper_clock_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=1 --charge_table_reader=0 --check_multiget_consistency=1 --check_multiget_entity_consistency=0 --checkpoint_one_in=0 --checksum_type=kxxHash --clear_column_family_one_in=0 --commit_bypass_memtable_one_in=0 --compact_files_one_in=1000 --compact_range_one_in=1000000 --compaction_pri=1 --compaction_readahead_size=0 --compaction_style=1 --compaction_ttl=0 --compress_format_version=2 --compressed_secondary_cache_ratio=0.0 --compressed_secondary_cache_size=0 --compression_checksum=1 --compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 --compression_type=zlib --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --create_timestamped_snapshot_one_in=0 --daily_offpeak_time_utc= --data_block_index_type=1 --db_write_buffer_size=0 --decouple_partitioned_filters=1 --default_temperature=kWarm --default_write_temperature=kHot --delete_obsolete_files_period_micros=30000000 --delpercent=5 --delrangepercent=0 --destroy_db_initially=1 --detect_filter_construct_corruption=1 --disable_file_deletions_one_in=1000000 --disable_manual_compaction_one_in=1000000 --disable_wal=0 --dump_malloc_stats=1 --enable_checksum_handoff=0 --enable_compaction_filter=0 --enable_custom_split_merge=0 --enable_do_not_compress_roles=0 --enable_index_compression=0 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=0 --enable_remote_compaction=0 --enable_sst_partitioner_factory=0 --enable_thread_tracking=0 --enable_write_thread_adaptive_yield=1 --error_recovery_with_no_fault_injection=0 --exclude_wal_from_write_fault_injection=1 --fifo_allow_compaction=1 --file_checksum_impl=none --file_temperature_age_thresholds= --fill_cache=1 --flush_one_in=1000 --format_version=3 --get_all_column_family_metadata_one_in=10000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=10000 --get_properties_of_all_tables_one_in=100000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0 --index_block_restart_interval=2 --index_shortening=1 --index_type=0 --ingest_external_file_one_in=0 --ingest_wbwi_one_in=0 --initial_auto_readahead_size=16384 --inplace_update_support=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=100 --last_level_temperature=kWarm --level_compaction_dynamic_level_bytes=0 --lock_wal_one_in=1000000 --log2_keys_per_lock=10 --log_file_time_to_roll=60 --log_readahead_size=16777216 --long_running_snapshots=1 --low_pri_pool_ratio=0 --lowest_used_cache_tier=1 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=100000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=1 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16777216 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=8388608 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=1000 --memtable_op_scan_flush_trigger=100 --memtable_prefix_bloom_size_ratio=0 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=0 --memtablerep=skip_list --metadata_charge_policy=0 --metadata_read_fault_one_in=0 --metadata_write_fault_one_in=0 --min_write_buffer_number_to_merge=2 --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=500000 --open_metadata_read_fault_one_in=8 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=32 --open_write_fault_one_in=0 --ops_per_thread=200000 --optimize_filters_for_hits=1 --optimize_filters_for_memory=1 --optimize_multiget_for_io=1 --paranoid_file_checks=1 --paranoid_memory_checks=0 --partition_filters=0 --partition_pinning=0 --pause_background_one_in=10000 --periodic_compaction_seconds=0 --prefix_size=-1 --prefixpercent=0 --prepopulate_block_cache=0 --preserve_internal_time_seconds=60 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=32 --readahead_size=524288 --readpercent=50 --recycle_log_file_num=0 --reopen=20 --report_bg_io_stats=1 --reset_stats_one_in=1000000 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=bar --sqfc_version=0 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --stats_dump_period_sec=10 --stats_history_buffer_size=0 --strict_bytes_per_sync=0 --subcompactions=4 --sync=0 --sync_fault_injection=0 --table_cache_numshardbits=6 --target_file_size_base=524288 --target_file_size_multiplier=2 --test_batches_snapshots=0 --test_ingest_standalone_range_deletion_one_in=0 --top_level_index_pinning=0 --two_write_queues=1 --txn_write_policy=2 --uncache_aggressiveness=2 --universal_max_read_amp=10 --unordered_write=0 --unpartitioned_pinning=1 --use_adaptive_mutex=1 --use_adaptive_mutex_lru=0 --use_attribute_group=0 --use_delta_encoding=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=1 --use_multi_cf_iterator=0 --use_multi_get_entity=0 --use_multiget=1 --use_optimistic_txn=0 --use_put_entity_one_in=0 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_txn=1 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000 --verify_compression=1 --verify_db_one_in=10000 --verify_file_checksums_one_in=0 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=zstd --write_buffer_size=4194304 --write_dbid_to_manifest=1 --write_fault_one_in=0 --write_identity_file=0 --writepercent=35 --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox
```

Reviewed By: hx235

Differential Revision: D75173149

Pulled By: cbi42

fbshipit-source-id: ec68cadc78469730dfe26824e20b8ca4ab993101
2025-05-22 09:42:15 -07:00

1063 lines
46 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/compaction_job.h"
#include "db/compaction/compaction_state.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/utilities/options_type.h"
namespace ROCKSDB_NAMESPACE {
class SubcompactionState;
CompactionServiceJobStatus
CompactionJob::ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
assert(db_options_.compaction_service);
const Compaction* compaction = sub_compact->compaction;
CompactionServiceInput compaction_input;
compaction_input.output_level = compaction->output_level();
compaction_input.db_id = db_id_;
const std::vector<CompactionInputFiles>& inputs =
*(compact_->compaction->inputs());
for (const auto& files_per_level : inputs) {
for (const auto& file : files_per_level.files) {
compaction_input.input_files.emplace_back(
MakeTableFileName(file->fd.GetNumber()));
}
}
compaction_input.cf_name = compaction->column_family_data()->GetName();
compaction_input.snapshots = job_context_->snapshot_seqs;
compaction_input.has_begin = sub_compact->start.has_value();
compaction_input.begin =
compaction_input.has_begin ? sub_compact->start->ToString() : "";
compaction_input.has_end = sub_compact->end.has_value();
compaction_input.end =
compaction_input.has_end ? sub_compact->end->ToString() : "";
compaction_input.options_file_number = options_file_number_;
TEST_SYNC_POINT_CALLBACK(
"CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
&compaction_input);
std::string compaction_input_binary;
Status s = compaction_input.Write(&compaction_input_binary);
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}
std::ostringstream input_files_oss;
bool is_first_one = true;
for (const auto& file : compaction_input.input_files) {
input_files_oss << (is_first_one ? "" : ", ") << file;
is_first_one = false;
}
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(
dbname_, db_id_, db_session_id_,
compaction->column_family_data()->GetID(),
compaction->column_family_data()->GetName(), GetCompactionId(sub_compact),
thread_pri_, compaction->compaction_reason(),
compaction->is_full_compaction(), compaction->is_manual_compaction(),
compaction->bottommost_level(), compaction->start_level(),
compaction->output_level());
CompactionServiceScheduleResponse response =
db_options_.compaction_service->Schedule(info, compaction_input_binary);
switch (response.status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kAborted:
sub_compact->status =
Status::Aborted("Scheduling a remote compaction job was aborted");
ROCKS_LOG_WARN(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction was aborted at Schedule()",
compaction->column_family_data()->GetName().c_str(), job_id_);
return response.status;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction->column_family_data()->GetName().c_str(),
job_id_);
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction->column_family_data()->GetName().c_str(), job_id_);
return response.status;
default:
assert(false); // unknown status
break;
}
std::string debug_str_before_wait =
compaction->input_version()->DebugString(/*hex=*/true);
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
compaction->column_family_data()->GetName().c_str(), job_id_);
std::string compaction_result_binary;
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
&compaction_result_binary);
if (compaction_status != CompactionServiceJobStatus::kSuccess) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"[%s] [JOB %d] Wait() status is not kSuccess. "
"\nDebugString Before Wait():\n%s"
"\nDebugString After Wait():\n%s",
compaction->column_family_data()->GetName().c_str(), job_id_,
debug_str_before_wait.c_str(),
compaction->input_version()->DebugString(/*hex=*/true).c_str());
}
if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}
if (compaction_status == CompactionServiceJobStatus::kAborted) {
sub_compact->status =
Status::Aborted("Waiting a remote compaction job was aborted");
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction was aborted during Wait()",
compaction->column_family_data()->GetName().c_str(),
job_id_);
return compaction_status;
}
CompactionServiceResult compaction_result;
s = CompactionServiceResult::Read(compaction_result_binary,
&compaction_result);
if (compaction_status == CompactionServiceJobStatus::kFailure) {
if (s.ok()) {
if (compaction_result.status.ok()) {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (even though "
"the internal status is okay).");
} else {
// set the current sub compaction status with the status returned from
// remote
sub_compact->status = compaction_result.status;
}
} else {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (and no valid "
"result is returned).");
compaction_result.status.PermitUncheckedError();
}
ROCKS_LOG_WARN(
db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}
// CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to
// read the result. Consider this as an installation failure
if (!s.ok()) {
sub_compact->status = s;
compaction_result.status.PermitUncheckedError();
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
sub_compact->status = compaction_result.status;
std::ostringstream output_files_oss;
is_first_one = true;
for (const auto& file : compaction_result.output_files) {
output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
is_first_one = false;
}
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Received remote compaction result, output path: "
"%s, files: %s",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_result.output_path.c_str(), output_files_oss.str().c_str());
// Installation Starts
for (const auto& file : compaction_result.output_files) {
uint64_t file_num = versions_->NewFileNumber();
auto src_file = compaction_result.output_path + "/" + file.file_name;
auto tgt_file = TableFileName(compaction->immutable_options().cf_paths,
file_num, compaction->output_path_id());
s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
if (!s.ok()) {
sub_compact->status = s;
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
FileMetaData meta;
uint64_t file_size = file.file_size;
// TODO - Clean this up in the next release.
// For backward compatibility - in case the remote worker does not populate
// the file_size yet. If missing, continue to populate this from the file
// system.
if (file_size == 0) {
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
}
if (!s.ok()) {
sub_compact->status = s;
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
return CompactionServiceJobStatus::kFailure;
}
assert(file_size > 0);
meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
file.smallest_seqno, file.largest_seqno);
meta.smallest.DecodeFrom(file.smallest_internal_key);
meta.largest.DecodeFrom(file.largest_internal_key);
meta.oldest_ancester_time = file.oldest_ancester_time;
meta.file_creation_time = file.file_creation_time;
meta.epoch_number = file.epoch_number;
meta.file_checksum = file.file_checksum;
meta.file_checksum_func_name = file.file_checksum_func_name;
meta.marked_for_compaction = file.marked_for_compaction;
meta.unique_id = file.unique_id;
meta.temperature = file.file_temperature;
meta.tail_size =
FileMetaData::CalculateTailSize(file_size, file.table_properties);
auto cfd = compaction->column_family_data();
CompactionOutputs* compaction_outputs =
sub_compact->Outputs(file.is_proximal_level_output);
assert(compaction_outputs);
compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(),
false, true, file.paranoid_hash);
compaction_outputs->UpdateTableProperties(file.table_properties);
}
// Set per-level stats
auto compaction_output_stats =
sub_compact->OutputStats(false /* is_proximal_level */);
assert(compaction_output_stats);
compaction_output_stats->Add(
compaction_result.internal_stats.output_level_stats);
if (compaction->SupportsPerKeyPlacement()) {
compaction_output_stats =
sub_compact->OutputStats(true /* is_proximal_level */);
assert(compaction_output_stats);
compaction_output_stats->Add(
compaction_result.internal_stats.proximal_level_stats);
}
// Set job stats
sub_compact->compaction_job_stats = compaction_result.stats;
RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
compaction_result.bytes_written);
db_options_.compaction_service->OnInstallation(
response.scheduled_job_id, CompactionServiceJobStatus::kSuccess);
return CompactionServiceJobStatus::kSuccess;
}
std::string CompactionServiceCompactionJob::GetTableFileName(
uint64_t file_number) {
return MakeTableFileName(output_path_, file_number);
}
void CompactionServiceCompactionJob::RecordCompactionIOStats() {
compaction_result_->bytes_read += IOSTATS(bytes_read);
compaction_result_->bytes_written += IOSTATS(bytes_written);
CompactionJob::RecordCompactionIOStats();
}
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
VersionSet* versions, const std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
JobContext* job_context, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, const std::string& dbname,
const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string output_path,
const CompactionServiceInput& compaction_service_input,
CompactionServiceResult* compaction_service_result)
: CompactionJob(job_id, compaction, db_options, mutable_db_options,
file_options, versions, shutting_down, log_buffer, nullptr,
output_directory, nullptr, stats, db_mutex,
db_error_handler, job_context, std::move(table_cache),
event_logger,
compaction->mutable_cf_options().paranoid_file_checks,
compaction->mutable_cf_options().report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER,
io_tracer, manual_compaction_canceled, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(std::move(output_path)),
compaction_input_(compaction_service_input),
compaction_result_(compaction_service_result) {}
void CompactionServiceCompactionJob::Prepare() {
std::optional<Slice> begin;
if (compaction_input_.has_begin) {
begin = compaction_input_.begin;
}
std::optional<Slice> end;
if (compaction_input_.has_end) {
end = compaction_input_.end;
}
CompactionJob::Prepare(std::make_pair(begin, end));
}
Status CompactionServiceCompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
auto* c = compact_->compaction;
log_buffer_->FlushBufferToLog();
LogCompaction();
compaction_result_->stats.Reset();
const uint64_t start_micros = db_options_.clock->NowMicros();
c->GetOrInitInputTableProperties();
// Pick the only sub-compaction we should have
assert(compact_->sub_compact_states.size() == 1);
SubcompactionState* sub_compact = compact_->sub_compact_states.data();
ProcessKeyValueCompaction(sub_compact);
uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros;
internal_stats_.SetMicros(elapsed_micros);
internal_stats_.AddCpuMicros(elapsed_micros);
RecordTimeToHistogram(stats_, COMPACTION_TIME,
internal_stats_.output_level_stats.micros);
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
internal_stats_.output_level_stats.cpu_micros);
Status status = sub_compact->status;
IOStatus io_s = sub_compact->io_status;
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
constexpr IODebugContext* dbg = nullptr;
if (output_directory_) {
io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
DirFsyncOptions());
}
}
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
status = io_s;
}
LogFlush(db_options_.info_log);
compact_->status = status;
compact_->status.PermitUncheckedError();
// Build Compaction Job Stats
// 1. Aggregate internal stats and job stats for all subcompactions
// internal stats: sub_compact.proximal_level_outputs_.stats and
// sub_compact.compaction_outputs_.stats into
// internal_stats_.output_level_stats and
// internal_stats_.proximal_level_stats
// job-level stats: sub_compact.compaction_job_stats into compact.job_stats_
//
// For remote compaction, there's only one subcompaction.
compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
// 2. Update job-level output stats with the aggregated internal_stats_
// Please note that input stats will be updated by primary host when all
// subcompactions are finished
UpdateCompactionJobOutputStats(internal_stats_);
// and set fields that are not propagated as part of the update
compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();
compaction_result_->stats.is_full_compaction = c->is_full_compaction();
compaction_result_->stats.is_remote_compaction = true;
// 3. Update IO Stats that are not part of the the update above
// (bytes_read, bytes_written)
RecordCompactionIOStats();
// Build Output
compaction_result_->internal_stats = internal_stats_;
compaction_result_->output_level = compact_->compaction->output_level();
compaction_result_->output_path = output_path_;
if (status.ok()) {
for (const auto& output_file : sub_compact->GetOutputs()) {
auto& meta = output_file.meta;
compaction_result_->output_files.emplace_back(
MakeTableFileName(meta.fd.GetNumber()), meta.fd.GetFileSize(),
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(),
meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number,
meta.file_checksum, meta.file_checksum_func_name,
output_file.validator.GetHash(), meta.marked_for_compaction,
meta.unique_id, *output_file.table_properties,
output_file.is_proximal_level, meta.temperature);
}
}
TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
&compaction_result_);
return status;
}
void CompactionServiceCompactionJob::CleanupCompaction() {
CompactionJob::CleanupCompaction();
}
// Internal binary format for the input and result data
enum BinaryFormatVersion : uint32_t {
kOptionsString = 1, // Use string format similar to Option string format
};
static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
{"name",
{offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"options",
{offsetof(struct ColumnFamilyDescriptor, options),
OptionType::kConfigurable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
value, cf_options);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
std::string result;
auto status =
GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& name, const void* addr1,
const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
auto this_conf = CFOptionsAsConfigurable(*this_one);
auto that_conf = CFOptionsAsConfigurable(*that_one);
std::string mismatch_opt;
bool result =
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
if (!result) {
*mismatch = name + "." + mismatch_opt;
}
return result;
}}},
};
static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"cf_name",
{offsetof(struct CompactionServiceInput, cf_name),
OptionType::kEncodedString}},
{"snapshots", OptionTypeInfo::Vector<uint64_t>(
offsetof(struct CompactionServiceInput, snapshots),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kUInt64T})},
{"input_files", OptionTypeInfo::Vector<std::string>(
offsetof(struct CompactionServiceInput, input_files),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kEncodedString})},
{"output_level",
{offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"db_id",
{offsetof(struct CompactionServiceInput, db_id),
OptionType::kEncodedString}},
{"has_begin",
{offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"begin",
{offsetof(struct CompactionServiceInput, begin),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"has_end",
{offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"end",
{offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"options_file_number",
{offsetof(struct CompactionServiceInput, options_file_number),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo>
cs_output_file_type_info = {
{"file_name",
{offsetof(struct CompactionServiceOutputFile, file_name),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_size",
{offsetof(struct CompactionServiceOutputFile, file_size),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_seqno",
{offsetof(struct CompactionServiceOutputFile, smallest_seqno),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_seqno",
{offsetof(struct CompactionServiceOutputFile, largest_seqno),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_internal_key",
{offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_internal_key",
{offsetof(struct CompactionServiceOutputFile, largest_internal_key),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"oldest_ancester_time",
{offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_creation_time",
{offsetof(struct CompactionServiceOutputFile, file_creation_time),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"epoch_number",
{offsetof(struct CompactionServiceOutputFile, epoch_number),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_checksum",
{offsetof(struct CompactionServiceOutputFile, file_checksum),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_checksum_func_name",
{offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"paranoid_hash",
{offsetof(struct CompactionServiceOutputFile, paranoid_hash),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"marked_for_compaction",
{offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"unique_id",
OptionTypeInfo::Array<uint64_t, 2>(
offsetof(struct CompactionServiceOutputFile, unique_id),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kUInt64T})},
{"table_properties",
{offsetof(struct CompactionServiceOutputFile, table_properties),
OptionType::kStruct, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto table_properties = static_cast<TableProperties*>(addr);
return TableProperties::Parse(opts, value, table_properties);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto table_properties =
static_cast<const TableProperties*>(addr);
std::string result;
auto status = table_properties->Serialize(opts, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr1, const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const TableProperties*>(addr1);
const auto that_one = static_cast<const TableProperties*>(addr2);
return this_one->AreEqual(opts, that_one, mismatch);
}}},
{"is_proximal_level_output",
{offsetof(struct CompactionServiceOutputFile,
is_proximal_level_output),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_temperature",
{offsetof(struct CompactionServiceOutputFile, file_temperature),
OptionType::kTemperature, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}}};
static std::unordered_map<std::string, OptionTypeInfo>
compaction_job_stats_type_info = {
{"elapsed_micros",
{offsetof(struct CompactionJobStats, elapsed_micros),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"cpu_micros",
{offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"num_input_records",
{offsetof(struct CompactionJobStats, num_input_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_blobs_read",
{offsetof(struct CompactionJobStats, num_blobs_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files",
{offsetof(struct CompactionJobStats, num_input_files),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files_at_output_level",
{offsetof(struct CompactionJobStats, num_input_files_at_output_level),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_records",
{offsetof(struct CompactionJobStats, num_output_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files",
{offsetof(struct CompactionJobStats, num_output_files),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files_blob",
{offsetof(struct CompactionJobStats, num_output_files_blob),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_full_compaction",
{offsetof(struct CompactionJobStats, is_full_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_manual_compaction",
{offsetof(struct CompactionJobStats, is_manual_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_remote_compaction",
{offsetof(struct CompactionJobStats, is_remote_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_bytes",
{offsetof(struct CompactionJobStats, total_input_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_blob_bytes_read",
{offsetof(struct CompactionJobStats, total_blob_bytes_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_output_bytes",
{offsetof(struct CompactionJobStats, total_output_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_output_bytes_blob",
{offsetof(struct CompactionJobStats, total_output_bytes_blob),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_records_replaced",
{offsetof(struct CompactionJobStats, num_records_replaced),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_raw_key_bytes",
{offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_raw_value_bytes",
{offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_deletion_records",
{offsetof(struct CompactionJobStats, num_input_deletion_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_expired_deletion_records",
{offsetof(struct CompactionJobStats, num_expired_deletion_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_corrupt_keys",
{offsetof(struct CompactionJobStats, num_corrupt_keys),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_write_nanos",
{offsetof(struct CompactionJobStats, file_write_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_range_sync_nanos",
{offsetof(struct CompactionJobStats, file_range_sync_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_fsync_nanos",
{offsetof(struct CompactionJobStats, file_fsync_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_prepare_write_nanos",
{offsetof(struct CompactionJobStats, file_prepare_write_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_output_key_prefix",
{offsetof(struct CompactionJobStats, smallest_output_key_prefix),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_output_key_prefix",
{offsetof(struct CompactionJobStats, largest_output_key_prefix),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_single_del_fallthru",
{offsetof(struct CompactionJobStats, num_single_del_fallthru),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_single_del_mismatch",
{offsetof(struct CompactionJobStats, num_single_del_mismatch),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo>
compaction_stats_type_info = {
{"micros",
{offsetof(struct InternalStats::CompactionStats, micros),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"cpu_micros",
{offsetof(struct InternalStats::CompactionStats, cpu_micros),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
bytes_read_non_output_levels),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read_output_level",
{offsetof(struct InternalStats::CompactionStats,
bytes_read_output_level),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_skipped_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
bytes_skipped_non_output_levels),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_skipped_output_level",
{offsetof(struct InternalStats::CompactionStats,
bytes_skipped_output_level),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read_blob",
{offsetof(struct InternalStats::CompactionStats, bytes_read_blob),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_written",
{offsetof(struct InternalStats::CompactionStats, bytes_written),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_written_blob",
{offsetof(struct InternalStats::CompactionStats, bytes_written_blob),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_moved",
{offsetof(struct InternalStats::CompactionStats, bytes_moved),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files_in_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
num_input_files_in_non_output_levels),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files_in_output_level",
{offsetof(struct InternalStats::CompactionStats,
num_input_files_in_output_level),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_filtered_input_files_in_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
num_filtered_input_files_in_non_output_levels),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_filtered_input_files_in_output_level",
{offsetof(struct InternalStats::CompactionStats,
num_filtered_input_files_in_output_level),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files",
{offsetof(struct InternalStats::CompactionStats, num_output_files),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files_blob",
{offsetof(struct InternalStats::CompactionStats,
num_output_files_blob),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_records",
{offsetof(struct InternalStats::CompactionStats, num_input_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_dropped_records",
{offsetof(struct InternalStats::CompactionStats, num_dropped_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_records",
{offsetof(struct InternalStats::CompactionStats, num_output_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"count",
{offsetof(struct InternalStats::CompactionStats, count),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"counts", OptionTypeInfo::Array<
int, static_cast<int>(CompactionReason::kNumOfReasons)>(
offsetof(struct InternalStats::CompactionStats, counts),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kInt})},
};
static std::unordered_map<std::string, OptionTypeInfo>
compaction_internal_stats_type_info = {
{"output_level_stats",
OptionTypeInfo::Struct(
"output_level_stats", &compaction_stats_type_info,
offsetof(struct InternalStats::CompactionStatsFull,
output_level_stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"has_proximal_level_output",
{offsetof(struct InternalStats::CompactionStatsFull,
has_proximal_level_output),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"proximal_level_stats",
OptionTypeInfo::Struct(
"proximal_level_stats", &compaction_stats_type_info,
offsetof(struct InternalStats::CompactionStatsFull,
proximal_level_stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
};
namespace {
// this is a helper struct to serialize and deserialize class Status, because
// Status's members are not public.
struct StatusSerializationAdapter {
uint8_t code;
uint8_t subcode;
uint8_t severity;
std::string message;
StatusSerializationAdapter() = default;
explicit StatusSerializationAdapter(const Status& s) {
code = s.code();
subcode = s.subcode();
severity = s.severity();
auto msg = s.getState();
message = msg ? msg : "";
}
Status GetStatus() const {
return Status{static_cast<Status::Code>(code),
static_cast<Status::SubCode>(subcode),
static_cast<Status::Severity>(severity), message};
}
};
} // namespace
static std::unordered_map<std::string, OptionTypeInfo>
status_adapter_type_info = {
{"code",
{offsetof(struct StatusSerializationAdapter, code),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"subcode",
{offsetof(struct StatusSerializationAdapter, subcode),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"severity",
{offsetof(struct StatusSerializationAdapter, severity),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"message",
{offsetof(struct StatusSerializationAdapter, message),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
{"status",
{offsetof(struct CompactionServiceResult, status),
OptionType::kCustomizable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto status_obj = static_cast<Status*>(addr);
StatusSerializationAdapter adapter;
Status s = OptionTypeInfo::ParseType(
opts, value, status_adapter_type_info, &adapter);
*status_obj = adapter.GetStatus();
return s;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto status_obj = static_cast<const Status*>(addr);
StatusSerializationAdapter adapter(*status_obj);
std::string result;
Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
&adapter, &result);
*value = "{" + result + "}";
return s;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr1, const void* addr2, std::string* mismatch) {
const auto status1 = static_cast<const Status*>(addr1);
const auto status2 = static_cast<const Status*>(addr2);
StatusSerializationAdapter adatper1(*status1);
StatusSerializationAdapter adapter2(*status2);
return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
&adatper1, &adapter2, mismatch);
}}},
{"output_files",
OptionTypeInfo::Vector<CompactionServiceOutputFile>(
offsetof(struct CompactionServiceResult, output_files),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
OptionVerificationType::kNormal,
OptionTypeFlags::kNone))},
{"output_level",
{offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"output_path",
{offsetof(struct CompactionServiceResult, output_path),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read",
{offsetof(struct CompactionServiceResult, bytes_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_written",
{offsetof(struct CompactionServiceResult, bytes_written),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"stats", OptionTypeInfo::Struct(
"stats", &compaction_job_stats_type_info,
offsetof(struct CompactionServiceResult, stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"internal_stats",
OptionTypeInfo::Struct(
"internal_stats", &compaction_internal_stats_type_info,
offsetof(struct CompactionServiceResult, internal_stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
};
Status CompactionServiceInput::Read(const std::string& data_str,
CompactionServiceInput* obj) {
if (data_str.size() <= sizeof(BinaryFormatVersion)) {
return Status::InvalidArgument("Invalid CompactionServiceInput string");
}
auto format_version = DecodeFixed32(data_str.data());
if (format_version == kOptionsString) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
cf.ignore_unknown_options = true;
return OptionTypeInfo::ParseType(
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
obj);
} else {
return Status::NotSupported(
"Compaction Service Input data version not supported: " +
std::to_string(format_version));
}
}
Status CompactionServiceInput::Write(std::string* output) {
char buf[sizeof(BinaryFormatVersion)];
EncodeFixed32(buf, kOptionsString);
output->append(buf, sizeof(BinaryFormatVersion));
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
}
Status CompactionServiceResult::Read(const std::string& data_str,
CompactionServiceResult* obj) {
if (data_str.size() <= sizeof(BinaryFormatVersion)) {
return Status::InvalidArgument("Invalid CompactionServiceResult string");
}
auto format_version = DecodeFixed32(data_str.data());
if (format_version == kOptionsString) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
cf.ignore_unknown_options = true;
return OptionTypeInfo::ParseType(
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
obj);
} else {
return Status::NotSupported(
"Compaction Service Result data version not supported: " +
std::to_string(format_version));
}
}
Status CompactionServiceResult::Write(std::string* output) {
char buf[sizeof(BinaryFormatVersion)];
EncodeFixed32(buf, kOptionsString);
output->append(buf, sizeof(BinaryFormatVersion));
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
}
#ifndef NDEBUG
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
std::string mismatch;
return TEST_Equals(other, &mismatch);
}
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
std::string* mismatch) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
mismatch);
}
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
std::string mismatch;
return TEST_Equals(other, &mismatch);
}
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
std::string* mismatch) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
mismatch);
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE