Summary:
With WP/WUP, we can deadlock on the db mutex here: 8dc3d77b59/db/db_impl/db_impl_compaction_flush.cc (L4626). There we release a snapshot (which acquires the db mutex) while already holding the mutex. This caused some transaction lock timeout errors in the crash test. This PR fixes the deadlock by refactoring snapshot-related context into JobContext and only allowing snapshot-related context to be initialized once. This also reduces the number of parameters being passed around.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13632
Test Plan:
- existing tests
- this fails with timeout before this fix
```
./db_stress --WAL_size_limit_MB=0 --WAL_ttl_seconds=60 --acquire_snapshot_one_in=10000 --adaptive_readahead=1 --adm_policy=2 --advise_random_on_open=1 --allow_data_in_errors=True --allow_fallocate=0 --allow_unprepared_value=1 --async_io=0 --auto_readahead_size=0 --auto_refresh_iterator_with_snapshot=0 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=1 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=8 --bgerror_resume_retry_interval=1000000 --block_align=0 --block_protection_bytes_per_key=4 --block_size=16384 --bloom_before_level=6 --bloom_bits=2 --bottommost_compression_type=lz4 --bottommost_file_compaction_delay=0 --bytes_per_sync=0 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=0 --cache_size=33554432 --cache_type=fixed_hyper_clock_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=1 --charge_table_reader=0 --check_multiget_consistency=1 --check_multiget_entity_consistency=0 --checkpoint_one_in=0 --checksum_type=kxxHash --clear_column_family_one_in=0 --commit_bypass_memtable_one_in=0 --compact_files_one_in=1000 --compact_range_one_in=1000000 --compaction_pri=1 --compaction_readahead_size=0 --compaction_style=1 --compaction_ttl=0 --compress_format_version=2 --compressed_secondary_cache_ratio=0.0 --compressed_secondary_cache_size=0 --compression_checksum=1 --compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 --compression_type=zlib --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --create_timestamped_snapshot_one_in=0 --daily_offpeak_time_utc= --data_block_index_type=1 --db_write_buffer_size=0 --decouple_partitioned_filters=1 --default_temperature=kWarm --default_write_temperature=kHot --delete_obsolete_files_period_micros=30000000 --delpercent=5 --delrangepercent=0 
--destroy_db_initially=1 --detect_filter_construct_corruption=1 --disable_file_deletions_one_in=1000000 --disable_manual_compaction_one_in=1000000 --disable_wal=0 --dump_malloc_stats=1 --enable_checksum_handoff=0 --enable_compaction_filter=0 --enable_custom_split_merge=0 --enable_do_not_compress_roles=0 --enable_index_compression=0 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=0 --enable_remote_compaction=0 --enable_sst_partitioner_factory=0 --enable_thread_tracking=0 --enable_write_thread_adaptive_yield=1 --error_recovery_with_no_fault_injection=0 --exclude_wal_from_write_fault_injection=1 --fifo_allow_compaction=1 --file_checksum_impl=none --file_temperature_age_thresholds= --fill_cache=1 --flush_one_in=1000 --format_version=3 --get_all_column_family_metadata_one_in=10000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=10000 --get_properties_of_all_tables_one_in=100000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0 --index_block_restart_interval=2 --index_shortening=1 --index_type=0 --ingest_external_file_one_in=0 --ingest_wbwi_one_in=0 --initial_auto_readahead_size=16384 --inplace_update_support=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=100 --last_level_temperature=kWarm --level_compaction_dynamic_level_bytes=0 --lock_wal_one_in=1000000 --log2_keys_per_lock=10 --log_file_time_to_roll=60 --log_readahead_size=16777216 --long_running_snapshots=1 --low_pri_pool_ratio=0 --lowest_used_cache_tier=1 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=100000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=1 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16777216 
--max_write_buffer_number=3 --max_write_buffer_size_to_maintain=8388608 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=1000 --memtable_op_scan_flush_trigger=100 --memtable_prefix_bloom_size_ratio=0 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=0 --memtablerep=skip_list --metadata_charge_policy=0 --metadata_read_fault_one_in=0 --metadata_write_fault_one_in=0 --min_write_buffer_number_to_merge=2 --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=500000 --open_metadata_read_fault_one_in=8 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=32 --open_write_fault_one_in=0 --ops_per_thread=200000 --optimize_filters_for_hits=1 --optimize_filters_for_memory=1 --optimize_multiget_for_io=1 --paranoid_file_checks=1 --paranoid_memory_checks=0 --partition_filters=0 --partition_pinning=0 --pause_background_one_in=10000 --periodic_compaction_seconds=0 --prefix_size=-1 --prefixpercent=0 --prepopulate_block_cache=0 --preserve_internal_time_seconds=60 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=32 --readahead_size=524288 --readpercent=50 --recycle_log_file_num=0 --reopen=20 --report_bg_io_stats=1 --reset_stats_one_in=1000000 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=bar --sqfc_version=0 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --stats_dump_period_sec=10 --stats_history_buffer_size=0 --strict_bytes_per_sync=0 --subcompactions=4 --sync=0 --sync_fault_injection=0 --table_cache_numshardbits=6 --target_file_size_base=524288 --target_file_size_multiplier=2 --test_batches_snapshots=0 --test_ingest_standalone_range_deletion_one_in=0 --top_level_index_pinning=0 --two_write_queues=1 --txn_write_policy=2 --uncache_aggressiveness=2 --universal_max_read_amp=10 --unordered_write=0 
--unpartitioned_pinning=1 --use_adaptive_mutex=1 --use_adaptive_mutex_lru=0 --use_attribute_group=0 --use_delta_encoding=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=1 --use_multi_cf_iterator=0 --use_multi_get_entity=0 --use_multiget=1 --use_optimistic_txn=0 --use_put_entity_one_in=0 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_txn=1 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000 --verify_compression=1 --verify_db_one_in=10000 --verify_file_checksums_one_in=0 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=zstd --write_buffer_size=4194304 --write_dbid_to_manifest=1 --write_fault_one_in=0 --write_identity_file=0 --writepercent=35 --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox
```
Reviewed By: hx235
Differential Revision: D75173149
Pulled By: cbi42
fbshipit-source-id: ec68cadc78469730dfe26824e20b8ca4ab993101
550 lines
23 KiB
C++
550 lines
23 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#pragma once
|
|
|
|
#include <atomic>
|
|
#include <deque>
|
|
#include <functional>
|
|
#include <limits>
|
|
#include <set>
|
|
#include <string>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "db/blob/blob_file_completion_callback.h"
|
|
#include "db/column_family.h"
|
|
#include "db/compaction/compaction_iterator.h"
|
|
#include "db/compaction/compaction_outputs.h"
|
|
#include "db/flush_scheduler.h"
|
|
#include "db/internal_stats.h"
|
|
#include "db/job_context.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/memtable_list.h"
|
|
#include "db/range_del_aggregator.h"
|
|
#include "db/seqno_to_time_mapping.h"
|
|
#include "db/version_edit.h"
|
|
#include "db/write_controller.h"
|
|
#include "db/write_thread.h"
|
|
#include "logging/event_logger.h"
|
|
#include "options/cf_options.h"
|
|
#include "options/db_options.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/compaction_filter.h"
|
|
#include "rocksdb/compaction_job_stats.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/memtablerep.h"
|
|
#include "rocksdb/transaction_log.h"
|
|
#include "util/autovector.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/thread_local.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class Arena;
|
|
class CompactionState;
|
|
class ErrorHandler;
|
|
class MemTable;
|
|
class SnapshotChecker;
|
|
class SystemClock;
|
|
class TableCache;
|
|
class Version;
|
|
class VersionEdit;
|
|
class VersionSet;
|
|
|
|
class SubcompactionState;
|
|
|
|
// CompactionJob is responsible for executing the compaction. Each (manual or
// automated) compaction corresponds to a CompactionJob object, and usually
// goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
// will divide the compaction into subcompactions and execute them in parallel
// if needed.
//
// CompactionJob has 2 main stats:
// 1. CompactionJobStats job_stats_
//    CompactionJobStats is a public data structure which is part of the
//    Compaction event listener, through which rocksdb shares the job stats
//    with the user.
//    Internally it's an aggregation of all the compaction_job_stats from each
//    `SubcompactionState`:
//                                           +------------------------+
//                                           | SubcompactionState     |
//                                           |                        |
//                                +--------->|   compaction_job_stats |
//                                |          |                        |
//                                |          +------------------------+
// +------------------------+     |
// | CompactionJob          |     |          +------------------------+
// |                        |     |          | SubcompactionState     |
// |   job_stats            +-----+          |                        |
// |                        |     +--------->|   compaction_job_stats |
// |                        |     |          |                        |
// +------------------------+     |          +------------------------+
//                                |
//                                |          +------------------------+
//                                |          | SubcompactionState     |
//                                |          |                        |
//                                +--------->+   compaction_job_stats |
//                                |          |                        |
//                                |          +------------------------+
//                                |
//                                |          +------------------------+
//                                |          |       ...              |
//                                +--------->+                        |
//                                           +------------------------+
//
// 2. CompactionStatsFull internal_stats_
//    `CompactionStatsFull` holds internal stats about the compaction, which
//    are eventually sent to `ColumnFamilyData::internal_stats_` and used for
//    logging and public metrics.
//    Internally, it's an aggregation of stats_ from each `SubcompactionState`.
//    It has 2 parts, ordinary output level stats and the proximal level output
//    stats.
//                                                +---------------------------+
//                                                | SubcompactionState        |
//                                                |                           |
//                                                | +----------------------+  |
//                                                | | CompactionOutputs    |  |
//                                                | | (normal output)      |  |
//                                            +---->|   stats_             |  |
//                                            |   | +----------------------+  |
//                                            |   |                           |
//                                            |   | +----------------------+  |
// +--------------------------------+         |   | | CompactionOutputs    |  |
// | CompactionJob                  |         |   | | (proximal_level)     |  |
// |                                |    +--------->|   stats_             |  |
// |   internal_stats_              |    |    |   | +----------------------+  |
// |    +-------------------------+ |    |    |   |                           |
// |    |output_level_stats       |------|----+   +---------------------------+
// |    +-------------------------+ |    |    |
// |                                |    |    |
// |    +-------------------------+ |    |    |   +---------------------------+
// |    |proximal_level_stats     |------+    |   | SubcompactionState        |
// |    +-------------------------+ |    |    |   |                           |
// |                                |    |    |   | +----------------------+  |
// |                                |    |    |   | | CompactionOutputs    |  |
// +--------------------------------+    |    |   | | (normal output)      |  |
//                                       |    +---->|   stats_             |  |
//                                       |        | +----------------------+  |
//                                       |        |                           |
//                                       |        | +----------------------+  |
//                                       |        | | CompactionOutputs    |  |
//                                       |        | | (proximal_level)     |  |
//                                       +--------->|   stats_             |  |
//                                                | +----------------------+  |
//                                                |                           |
//                                                +---------------------------+
|
|
|
|
class CompactionJob {
|
|
public:
|
|
CompactionJob(int job_id, Compaction* compaction,
|
|
const ImmutableDBOptions& db_options,
|
|
const MutableDBOptions& mutable_db_options,
|
|
const FileOptions& file_options, VersionSet* versions,
|
|
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
|
|
FSDirectory* db_directory, FSDirectory* output_directory,
|
|
FSDirectory* blob_output_directory, Statistics* stats,
|
|
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
|
|
JobContext* job_context, std::shared_ptr<Cache> table_cache,
|
|
EventLogger* event_logger, bool paranoid_file_checks,
|
|
bool measure_io_stats, const std::string& dbname,
|
|
CompactionJobStats* compaction_job_stats,
|
|
Env::Priority thread_pri,
|
|
const std::shared_ptr<IOTracer>& io_tracer,
|
|
const std::atomic<bool>& manual_compaction_canceled,
|
|
const std::string& db_id = "",
|
|
const std::string& db_session_id = "",
|
|
std::string full_history_ts_low = "", std::string trim_ts = "",
|
|
BlobFileCompletionCallback* blob_callback = nullptr,
|
|
int* bg_compaction_scheduled = nullptr,
|
|
int* bg_bottom_compaction_scheduled = nullptr);
|
|
|
|
virtual ~CompactionJob();
|
|
|
|
// no copy/move
|
|
CompactionJob(CompactionJob&& job) = delete;
|
|
CompactionJob(const CompactionJob& job) = delete;
|
|
CompactionJob& operator=(const CompactionJob& job) = delete;
|
|
|
|
// REQUIRED: mutex held
|
|
// Prepare for the compaction by setting up boundaries for each subcompaction
|
|
// and organizing seqno <-> time info. `known_single_subcompact` is non-null
|
|
// if we already have a known single subcompaction, with optional key bounds
|
|
// (currently for executing a remote compaction).
|
|
void Prepare(
|
|
std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
|
|
known_single_subcompact);
|
|
|
|
// REQUIRED mutex not held
|
|
// Launch threads for each subcompaction and wait for them to finish. After
|
|
// that, verify table is usable and finally do bookkeeping to unify
|
|
// subcompaction results
|
|
Status Run();
|
|
|
|
// REQUIRED: mutex held
|
|
// Add compaction input/output to the current version
|
|
// Releases compaction file through Compaction::ReleaseCompactionFiles().
|
|
// Sets *compaction_released to true if compaction is released.
|
|
Status Install(bool* compaction_released);
|
|
|
|
// Return the IO status
|
|
IOStatus io_status() const { return io_status_; }
|
|
|
|
protected:
|
|
void UpdateCompactionJobOutputStats(
|
|
const InternalStats::CompactionStatsFull& internal_stats) const;
|
|
|
|
void LogCompaction();
|
|
virtual void RecordCompactionIOStats();
|
|
void CleanupCompaction();
|
|
|
|
// Iterate through input and compact the kv-pairs.
|
|
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
|
|
|
|
CompactionState* compact_;
|
|
InternalStats::CompactionStatsFull internal_stats_;
|
|
const ImmutableDBOptions& db_options_;
|
|
const MutableDBOptions mutable_db_options_copy_;
|
|
LogBuffer* log_buffer_;
|
|
FSDirectory* output_directory_;
|
|
Statistics* stats_;
|
|
// Is this compaction creating a file in the bottom most level?
|
|
bool bottommost_level_;
|
|
|
|
Env::WriteLifeTimeHint write_hint_;
|
|
|
|
IOStatus io_status_;
|
|
|
|
CompactionJobStats* job_stats_;
|
|
|
|
private:
|
|
friend class CompactionJobTestBase;
|
|
|
|
// Collect the following stats from Input Table Properties
|
|
// - num_input_files_in_non_output_levels
|
|
// - num_input_files_in_output_level
|
|
// - bytes_read_non_output_levels
|
|
// - bytes_read_output_level
|
|
// - num_input_records
|
|
// - bytes_read_blob
|
|
// - num_dropped_records
|
|
// and set them in internal_stats_.output_level_stats
|
|
//
|
|
// @param num_input_range_del if non-null, will be set to the number of range
|
|
// deletion entries in this compaction input.
|
|
//
|
|
// Returns true iff internal_stats_.output_level_stats.num_input_records and
|
|
// num_input_range_del are calculated successfully.
|
|
//
|
|
// This should be called only once for compactions (not per subcompaction)
|
|
bool BuildStatsFromInputTableProperties(
|
|
uint64_t* num_input_range_del = nullptr);
|
|
|
|
void UpdateCompactionJobInputStats(
|
|
const InternalStats::CompactionStatsFull& internal_stats,
|
|
uint64_t num_input_range_del) const;
|
|
|
|
Status VerifyInputRecordCount(uint64_t num_input_range_del) const;
|
|
|
|
// Generates a histogram representing potential divisions of key ranges from
|
|
// the input. It adds the starting and/or ending keys of certain input files
|
|
// to the working set and then finds the approximate size of data in between
|
|
// each consecutive pair of slices. Then it divides these ranges into
|
|
// consecutive groups such that each group has a similar size.
|
|
void GenSubcompactionBoundaries();
|
|
|
|
// Get the number of planned subcompactions based on max_subcompactions and
|
|
// extra reserved resources
|
|
uint64_t GetSubcompactionsLimit();
|
|
|
|
// Additional reserved threads are reserved and the number is stored in
|
|
// extra_num_subcompaction_threads_reserved__. For now, this happens only if
|
|
// the compaction priority is round-robin and max_subcompactions is not
|
|
// sufficient (extra resources may be needed)
|
|
void AcquireSubcompactionResources(int num_extra_required_subcompactions);
|
|
|
|
// Additional threads may be reserved during IncreaseSubcompactionResources()
|
|
// if num_actual_subcompactions is less than num_planned_subcompactions.
|
|
// Additional threads will be released and the bg_compaction_scheduled_ or
|
|
// bg_bottom_compaction_scheduled_ will be updated if they are used.
|
|
// DB Mutex lock is required.
|
|
void ShrinkSubcompactionResources(uint64_t num_extra_resources);
|
|
|
|
// Release all reserved threads and update the compaction limits.
|
|
void ReleaseSubcompactionResources();
|
|
|
|
CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
|
|
SubcompactionState* sub_compact);
|
|
|
|
// update the thread status for starting a compaction.
|
|
void ReportStartedCompaction(Compaction* compaction);
|
|
|
|
Status FinishCompactionOutputFile(const Status& input_status,
|
|
SubcompactionState* sub_compact,
|
|
CompactionOutputs& outputs,
|
|
const Slice& next_table_min_key,
|
|
const Slice* comp_start_user_key,
|
|
const Slice* comp_end_user_key);
|
|
Status InstallCompactionResults(bool* compaction_released);
|
|
Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
|
|
CompactionOutputs& outputs);
|
|
|
|
void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
|
|
CompactionJobStats* compaction_job_stats = nullptr);
|
|
|
|
void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact);
|
|
|
|
void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact);
|
|
|
|
uint32_t job_id_;
|
|
|
|
// DBImpl state
|
|
const std::string& dbname_;
|
|
const std::string db_id_;
|
|
const std::string db_session_id_;
|
|
const FileOptions file_options_;
|
|
|
|
Env* env_;
|
|
std::shared_ptr<IOTracer> io_tracer_;
|
|
FileSystemPtr fs_;
|
|
// env_option optimized for compaction table reads
|
|
FileOptions file_options_for_read_;
|
|
VersionSet* versions_;
|
|
const std::atomic<bool>* shutting_down_;
|
|
const std::atomic<bool>& manual_compaction_canceled_;
|
|
FSDirectory* db_directory_;
|
|
FSDirectory* blob_output_directory_;
|
|
InstrumentedMutex* db_mutex_;
|
|
ErrorHandler* db_error_handler_;
|
|
|
|
SequenceNumber earliest_snapshot_;
|
|
JobContext* job_context_;
|
|
|
|
std::shared_ptr<Cache> table_cache_;
|
|
|
|
EventLogger* event_logger_;
|
|
|
|
bool paranoid_file_checks_;
|
|
bool measure_io_stats_;
|
|
// Stores the Slices that designate the boundaries for each subcompaction
|
|
std::vector<std::string> boundaries_;
|
|
Env::Priority thread_pri_;
|
|
std::string full_history_ts_low_;
|
|
std::string trim_ts_;
|
|
BlobFileCompletionCallback* blob_callback_;
|
|
|
|
uint64_t GetCompactionId(SubcompactionState* sub_compact) const;
|
|
// Stores the number of reserved threads in shared env_ for the number of
|
|
// extra subcompaction in kRoundRobin compaction priority
|
|
int extra_num_subcompaction_threads_reserved_;
|
|
|
|
// Stores the pointer to bg_compaction_scheduled_,
|
|
// bg_bottom_compaction_scheduled_ in DBImpl. Mutex is required when accessing
|
|
// or updating it.
|
|
int* bg_compaction_scheduled_;
|
|
int* bg_bottom_compaction_scheduled_;
|
|
|
|
// Stores the sequence number to time mapping gathered from all input files
|
|
// it also collects the smallest_seqno -> oldest_ancester_time from the SST.
|
|
SeqnoToTimeMapping seqno_to_time_mapping_;
|
|
|
|
// Max seqno that can be zeroed out in last level, including for preserving
|
|
// write times.
|
|
SequenceNumber preserve_seqno_after_ = kMaxSequenceNumber;
|
|
|
|
// Minimal sequence number to preclude the data from the last level. If the
|
|
// key has bigger (newer) sequence number than this, it will be precluded from
|
|
// the last level (output to proximal level).
|
|
SequenceNumber proximal_after_seqno_ = kMaxSequenceNumber;
|
|
|
|
// Options File Number used for Remote Compaction
|
|
// Setting this requires DBMutex.
|
|
uint64_t options_file_number_ = 0;
|
|
|
|
// Get table file name in where it's outputting to, which should also be in
|
|
// `output_directory_`.
|
|
virtual std::string GetTableFileName(uint64_t file_number);
|
|
// The rate limiter priority (io_priority) is determined dynamically here.
|
|
// The Compaction Read and Write priorities are the same for different
|
|
// scenarios, such as write stalled.
|
|
Env::IOPriority GetRateLimiterPriority();
|
|
};
|
|
|
|
// CompactionServiceInput is used to pass compaction information between two
// db instances. It contains the information needed to do a compaction. It
// doesn't contain the LSM tree information, which is passed through the
// MANIFEST file.
|
|
struct CompactionServiceInput {
|
|
std::string cf_name;
|
|
|
|
std::vector<SequenceNumber> snapshots;
|
|
|
|
// SST files for compaction, it should already be expended to include all the
|
|
// files needed for this compaction, for both input level files and output
|
|
// level files.
|
|
std::vector<std::string> input_files;
|
|
int output_level = 0;
|
|
|
|
// db_id is used to generate unique id of sst on the remote compactor
|
|
std::string db_id;
|
|
|
|
// information for subcompaction
|
|
bool has_begin = false;
|
|
std::string begin;
|
|
bool has_end = false;
|
|
std::string end;
|
|
|
|
uint64_t options_file_number = 0;
|
|
|
|
// serialization interface to read and write the object
|
|
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
|
|
Status Write(std::string* output);
|
|
|
|
#ifndef NDEBUG
|
|
bool TEST_Equals(CompactionServiceInput* other);
|
|
bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
|
|
#endif // NDEBUG
|
|
};
|
|
|
|
// CompactionServiceOutputFile is the metadata for the output SST file
|
|
struct CompactionServiceOutputFile {
|
|
std::string file_name;
|
|
uint64_t file_size{};
|
|
SequenceNumber smallest_seqno{};
|
|
SequenceNumber largest_seqno{};
|
|
std::string smallest_internal_key;
|
|
std::string largest_internal_key;
|
|
uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
|
|
uint64_t file_creation_time = kUnknownFileCreationTime;
|
|
uint64_t epoch_number = kUnknownEpochNumber;
|
|
std::string file_checksum = kUnknownFileChecksum;
|
|
std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
|
|
uint64_t paranoid_hash{};
|
|
bool marked_for_compaction;
|
|
UniqueId64x2 unique_id{};
|
|
TableProperties table_properties;
|
|
bool is_proximal_level_output;
|
|
Temperature file_temperature = Temperature::kUnknown;
|
|
|
|
CompactionServiceOutputFile() = default;
|
|
CompactionServiceOutputFile(
|
|
const std::string& name, uint64_t size, SequenceNumber smallest,
|
|
SequenceNumber largest, std::string _smallest_internal_key,
|
|
std::string _largest_internal_key, uint64_t _oldest_ancester_time,
|
|
uint64_t _file_creation_time, uint64_t _epoch_number,
|
|
const std::string& _file_checksum,
|
|
const std::string& _file_checksum_func_name, uint64_t _paranoid_hash,
|
|
bool _marked_for_compaction, UniqueId64x2 _unique_id,
|
|
const TableProperties& _table_properties, bool _is_proximal_level_output,
|
|
Temperature _file_temperature)
|
|
: file_name(name),
|
|
file_size(size),
|
|
smallest_seqno(smallest),
|
|
largest_seqno(largest),
|
|
smallest_internal_key(std::move(_smallest_internal_key)),
|
|
largest_internal_key(std::move(_largest_internal_key)),
|
|
oldest_ancester_time(_oldest_ancester_time),
|
|
file_creation_time(_file_creation_time),
|
|
epoch_number(_epoch_number),
|
|
file_checksum(_file_checksum),
|
|
file_checksum_func_name(_file_checksum_func_name),
|
|
paranoid_hash(_paranoid_hash),
|
|
marked_for_compaction(_marked_for_compaction),
|
|
unique_id(std::move(_unique_id)),
|
|
table_properties(_table_properties),
|
|
is_proximal_level_output(_is_proximal_level_output),
|
|
file_temperature(_file_temperature) {}
|
|
};
|
|
|
|
// CompactionServiceResult contains the compaction result from a different db
// instance. With this information, the primary db instance with write
// permission is able to install the result to the DB.
|
|
struct CompactionServiceResult {
|
|
Status status;
|
|
std::vector<CompactionServiceOutputFile> output_files;
|
|
int output_level = 0;
|
|
|
|
// location of the output files
|
|
std::string output_path;
|
|
|
|
uint64_t bytes_read = 0;
|
|
uint64_t bytes_written = 0;
|
|
|
|
// Job-level Compaction Stats.
|
|
//
|
|
// NOTE: Job level stats cannot be rebuilt from scratch by simply aggregating
|
|
// per-level stats due to some fields populated directly during compaction
|
|
// (e.g. RecordDroppedKeys()). This is why we need both job-level stats and
|
|
// per-level in the serialized result. If rebuilding job-level stats from
|
|
// per-level stats become possible in the future, consider deprecating this
|
|
// field.
|
|
CompactionJobStats stats;
|
|
|
|
// Per-level Compaction Stats for both output_level_stats and
|
|
// proximal_level_stats
|
|
InternalStats::CompactionStatsFull internal_stats;
|
|
|
|
// serialization interface to read and write the object
|
|
static Status Read(const std::string& data_str, CompactionServiceResult* obj);
|
|
Status Write(std::string* output);
|
|
|
|
#ifndef NDEBUG
|
|
bool TEST_Equals(CompactionServiceResult* other);
|
|
bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch);
|
|
#endif // NDEBUG
|
|
};
|
|
|
|
// CompactionServiceCompactionJob is a read-only compaction job. It takes
// input information from `compaction_service_input` and puts result
// information in `compaction_service_result`; the SST files are generated to
// `output_path`.
|
|
class CompactionServiceCompactionJob : private CompactionJob {
|
|
public:
|
|
CompactionServiceCompactionJob(
|
|
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
|
|
const MutableDBOptions& mutable_db_options,
|
|
const FileOptions& file_options, VersionSet* versions,
|
|
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
|
|
FSDirectory* output_directory, Statistics* stats,
|
|
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
|
|
JobContext* job_context, std::shared_ptr<Cache> table_cache,
|
|
EventLogger* event_logger, const std::string& dbname,
|
|
const std::shared_ptr<IOTracer>& io_tracer,
|
|
const std::atomic<bool>& manual_compaction_canceled,
|
|
const std::string& db_id, const std::string& db_session_id,
|
|
std::string output_path,
|
|
const CompactionServiceInput& compaction_service_input,
|
|
CompactionServiceResult* compaction_service_result);
|
|
|
|
// REQUIRED: mutex held
|
|
// Like CompactionJob::Prepare()
|
|
void Prepare();
|
|
|
|
// Run the compaction in current thread and return the result
|
|
Status Run();
|
|
|
|
void CleanupCompaction();
|
|
|
|
IOStatus io_status() const { return CompactionJob::io_status(); }
|
|
|
|
protected:
|
|
void RecordCompactionIOStats() override;
|
|
|
|
private:
|
|
// Get table file name in output_path
|
|
std::string GetTableFileName(uint64_t file_number) override;
|
|
// Specific the compaction output path, otherwise it uses default DB path
|
|
const std::string output_path_;
|
|
|
|
// Compaction job input
|
|
const CompactionServiceInput& compaction_input_;
|
|
|
|
// Compaction job result
|
|
CompactionServiceResult* compaction_result_;
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|