rocksdb/db/merge_helper.h
Hui Xiao f7e4009de1 Integrate compaction resumption with DB::OpenAndCompact() (#13984)
Summary:
### Context/Summary:
This is stacked on top of https://github.com/facebook/rocksdb/pull/13983 and integrates compaction resumption with OpenAndCompact().

Flow of resuming: DB::OpenAndCompact() -> Compaction progress file -> SubcompactionProgress -> CompactionJob
Flow of persistence: CompactionJob -> SubcompactionProgress -> Compaction progress file -> DB that is called with OpenAndCompact()

This PR focuses on two parts of these flows: DB::OpenAndCompact() -> Compaction progress file -> SubcompactionProgress (resuming), and Compaction progress file -> DB that is called with OpenAndCompact() (persistence).

**Resume Flow**
1. Check configuration. Currently paranoid_file_check=true (false by default) is not yet compatible with allow_resumption=true. Also, only a single subcompaction is supported, since OpenAndCompact() does not partition compaction anyway.
2. Scan the compaction output directory for the latest, old, and temporary compaction progress files and for output files. If a latest compaction progress file exists, we should resume.
3. Clean up older or temporary progress files, if any. They can exist if the last OpenAndCompact() crashed during the resume flow.
4. If a latest progress file exists, parse it into CompactionProgress and clean up extra compaction output files that are not yet tracked. Such files can exist because tracking every output file is only best-effort, and an output file that was interrupted midway is not yet tracked as progress.
5. If allow_resumption=false, or no valid compaction progress is found or parsed, clean up the latest progress file and the existing compaction output files to start a fresh compaction. If the cleanup itself fails, fail the OpenAndCompact() call to prevent resuming with an inconsistency between the output files and the progress file.
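
The decision made by the resume flow above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the RocksDB implementation: `ResumeOptionsSketch`, `ScanResultSketch`, `ResumeDecision`, and `DecideResume` are hypothetical names, the file-system work (scanning, parsing, deleting) is reduced to booleans, and rejecting an incompatible configuration is an assumption, since the text does not say how step 1 reports it.

```cpp
#include <cassert>

// Hypothetical options mirroring the flags named in the steps above.
struct ResumeOptionsSketch {
  bool allow_resumption = false;
  bool paranoid_file_check = false;
  int num_subcompactions = 1;
};

// Hypothetical summary of what scanning the output directory found.
struct ScanResultSketch {
  bool has_latest_progress_file = false;
  bool progress_parses_ok = false;  // latest progress file parsed cleanly
  bool cleanup_ok = true;           // deleting stale files succeeded
};

enum class ResumeDecision { kInvalidConfig, kResume, kStartFresh, kFail };

ResumeDecision DecideResume(const ResumeOptionsSketch& opts,
                            const ScanResultSketch& scan) {
  assert(opts.num_subcompactions >= 1);

  // Step 1: resumption is assumed to be rejected for unsupported settings.
  if (opts.allow_resumption &&
      (opts.paranoid_file_check || opts.num_subcompactions != 1)) {
    return ResumeDecision::kInvalidConfig;
  }

  // Steps 2-4: resume only if a latest progress file exists and parses.
  const bool can_resume = opts.allow_resumption &&
                          scan.has_latest_progress_file &&
                          scan.progress_parses_ok;
  if (can_resume) {
    return ResumeDecision::kResume;
  }

  // Step 5: otherwise start fresh, but only if the cleanup succeeded.
  return scan.cleanup_ok ? ResumeDecision::kStartFresh : ResumeDecision::kFail;
}
```

The point of the last branch is that a failed cleanup is treated as a hard error instead of a silent fallback, which matches the stated goal of never resuming with output files and progress file out of sync.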

**Progress File Creation**
1. Create a temporary progress file.
2. Persist the progress from the latest compaction progress file to the temporary progress file. This simplifies resuming from an interrupted compaction that was itself just resumed, similar to how manifest recovery works.
3. Rename the temporary progress file to the newer compaction progress file name so that it atomically becomes the "new" latest progress file.
4. Delete the "old" latest progress file, since it is no longer needed.
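
The four steps above follow the common write-to-temp-then-rename pattern. The sketch below illustrates that pattern with `std::filesystem` only; it is not how RocksDB does it (RocksDB goes through its own file-system abstraction and syncs files, which standard streams cannot do). `RollProgressFile`, the `.tmp` suffix, and the file names in the usage note are hypothetical.

```cpp
#include <cassert>
#include <filesystem>
#include <fstream>
#include <iterator>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// Hypothetical helper: creates `new_latest`, seeded with the contents of
// `old_latest` when that file exists, then removes `old_latest`.
// Returns false on any I/O error.
bool RollProgressFile(const fs::path& old_latest, const fs::path& new_latest) {
  assert(!new_latest.empty());
  std::error_code ec;

  fs::path tmp = new_latest;
  tmp += ".tmp";  // hypothetical temporary file name

  {
    // Step 1: create the temporary progress file.
    std::ofstream out(tmp, std::ios::binary | std::ios::trunc);
    if (!out) return false;

    // Step 2: carry over the progress recorded so far, if any.
    if (fs::exists(old_latest, ec)) {
      std::ifstream in(old_latest, std::ios::binary);
      if (!in) return false;
      const std::string data((std::istreambuf_iterator<char>(in)),
                             std::istreambuf_iterator<char>());
      out.write(data.data(), static_cast<std::streamsize>(data.size()));
    }
    out.flush();
    if (!out) return false;
  }

  // Step 3: the rename publishes the new file in one step.
  fs::rename(tmp, new_latest, ec);
  if (ec) return false;

  // Step 4: the old file is no longer needed.
  if (old_latest != new_latest && fs::exists(old_latest, ec)) {
    fs::remove(old_latest, ec);
    if (ec) return false;
  }
  return true;
}
```

A call such as `RollProgressFile("out/progress-1", "out/progress-2")` leaves either the old file or the complete new file as the latest one. If the process dies between steps, only a stale temporary or old file is left behind, which is the kind of leftover that step 3 of the resume flow cleans up.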

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13984

Test Plan:
- Integrated unit tests that simulate OpenAndCompact() being canceled and optionally resumed for remote compaction
- Existing UTs and stress/crash test
- Manual stress test with https://github.com/facebook/rocksdb/pull/14041

### Performance testing:
**1. Latency**
Using
```
./db_bench --benchmarks=OpenAndCompact[X5] --openandcompact_test_cancel_on_odd=false --openandcompact_cancel_after_seconds=0 --openandcompact_allow_resumption=$openandcompact_allow_resumption  --use_existing_db=true --db=$db --disable_auto_compactions=true --compression_type=none --secondary_path=$secondary_path  --target_file_size_base=268435456
```
**allow_resumption = false**
Input files: 101 files, 10000 keys
OpenAndCompact() API call : 26766256.000 micros/op 0 ops/sec 26.766 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 27837249.000 micros/op 0 ops/sec 27.837 seconds 1 operations;

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 26546234.000 micros/op 0 ops/sec 26.546 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 27918621.000 micros/op 0 ops/sec 27.919 seconds 1 operations;
OpenAndCompact [AVG 2 runs] : 0 (± 0) ops/sec

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 42243571.000 micros/op 0 ops/sec 42.244 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 43497581.000 micros/op 0 ops/sec 43.498 seconds 1 operations;
OpenAndCompact [AVG 3 runs] : 0 (± 0) ops/sec

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 34241357.000 micros/op 0 ops/sec 34.241 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 35655346.000 micros/op 0 ops/sec 35.655 seconds 1 operations;
OpenAndCompact [AVG 4 runs] : 0 (± 0) ops/sec

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 27083361.000 micros/op 0 ops/sec 27.083 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 28487999.000 micros/op 0 ops/sec 28.488 seconds 1 operations;
OpenAndCompact [AVG 5 runs] : 0 (± 0) ops/sec

OpenAndCompact [AVG    5 runs] : 0 (± 0) ops/sec; 31669.681 ms/op
OpenAndCompact [MEDIAN 5 runs] : 0 ops/sec

**allow_resumption= true**
Input files: 101 files, 10000 keys
OpenAndCompact() API call : 25446470.000 micros/op 0 ops/sec 25.446 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 26833415.000 micros/op 0 ops/sec 26.833 seconds 1 operations;

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 240745.000 micros/op 0 ops/sec 0.241 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact :  244934.000 micros/op 4 ops/sec 0.245 seconds 1 operations;
OpenAndCompact [AVG 2 runs] : 2 (± 3) ops/sec

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 24843383.000 micros/op 0 ops/sec 24.843 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 26192235.000 micros/op 0 ops/sec 26.192 seconds 1 operations;
OpenAndCompact [AVG 3 runs] : 1 (± 2) ops/sec

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 270819.000 micros/op 0 ops/sec 0.271 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact :  275140.000 micros/op 3 ops/sec 0.275 seconds 1 operations;
OpenAndCompact [AVG 4 runs] : 1 (± 2) ops/sec

Input files: 101 files, 10000 keys
OpenAndCompact() API call : 23038311.000 micros/op 0 ops/sec 23.038 seconds 1 operations;
OpenAndCompact status: OK
Output: 92 files, average size: 271747380 bytes (259.16 MB)
OpenAndCompact : 24439097.000 micros/op 0 ops/sec 24.439 seconds 1 operations;
OpenAndCompact [AVG 5 runs] : 1 (± 1) ops/sec

OpenAndCompact [AVG    5 runs] : 1 (± 1) ops/sec; 638.417 ms/op
OpenAndCompact [MEDIAN 5 runs] : 0 ops/sec

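**Persistence cost:** Comparing the odd-numbered OpenAndCompact() calls (runs 1, 3 and 5, which perform the full compaction), allow_resumption=true is actually faster (about 23-25 seconds versus 26-42 seconds), so persisting progress shows no measurable latency cost here.
**Resumption saving:** (26.766 - 0.2) / 26.766 * 100 = 99.25% reduction in latency compared with redoing all of the compaction work, as happens without the allow_resumption feature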
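
For readers who want to re-derive the saving from other runs, the arithmetic is a plain relative reduction. The helper below is only an illustration; `PercentReduction` is a hypothetical name and is not part of db_bench.

```cpp
#include <cassert>
#include <cmath>

// Relative reduction, in percent, when latency drops from `baseline_sec`
// to `resumed_sec`.
double PercentReduction(double baseline_sec, double resumed_sec) {
  assert(baseline_sec > 0.0);
  return (baseline_sec - resumed_sec) / baseline_sec * 100.0;
}
```

With the rounded values used above, `PercentReduction(26.766, 0.2)` gives roughly 99.25. With the measured 0.241 seconds of the second run, the result is about 99.1.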

**2. Memory usage** (in case SubcompactionProgress storing its own in-memory copies of output file metadata in https://github.com/facebook/rocksdb/pull/13983/files is a concern)
```
// ~= 90 output files
/usr/bin/time -f "
Resource Summary:
  Wall time: %e seconds
  CPU time: %U user + %S system (%P total)
  Peak memory: %M KB
  Page faults: %F major + %R minor
" ./db_bench --benchmarks=OpenAndCompact[X1] --openandcompact_test_cancel_on_odd=false --openandcompact_cancel_after_seconds=0 --openandcompact_allow_resumption=$openandcompact_allow_resumption  --use_existing_db=true --db=$db --disable_auto_compactions=true --compression_type=none --secondary_path=$secondary_path  --target_file_size_base=268435456
```
**allow_resumption = false**
Peak memory: 275828 KB
**allow_resumption = true**
Peak memory: 277204 KB (a regression of about 0.5% in memory usage, most likely due to storing separate copies of the output files' metadata in the subcompaction progress)

### Near-term follow up:
- Add statistics to record the successfully resumed compaction output files bytes
- Add stress/crash test support to cover error paths (including progress file sync error), crash/cancel OpenAndCompact() at random compaction progress point and surface feature incompatibility
   - See https://github.com/facebook/rocksdb/pull/14041
- Resolve the TODO https://github.com/facebook/rocksdb/pull/13984/files#diff-17fbdec07244b1f07d1a4e5aed0a6feecf4474d20b3129818c10fc0ff9f3d547R1303-R1314
   - See https://github.com/facebook/rocksdb/pull/14042

Reviewed By: jaykorean

Differential Revision: D84299662

Pulled By: hx235

fbshipit-source-id: 69bbf395401604172a1a5c557ca834011a3d51d7
2025-10-15 13:43:53 -07:00

318 lines
14 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once

#include <atomic>
#include <deque>
#include <string>
#include <vector>

#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "db/wide/wide_column_serialization.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/wide_columns.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class Statistics;
class SystemClock;
class BlobFetcher;
class PrefetchBufferCollection;
struct CompactionIterationStats;

class MergeHelper {
 public:
  MergeHelper(Env* env, const Comparator* user_comparator,
              const MergeOperator* user_merge_operator,
              const CompactionFilter* compaction_filter, Logger* logger,
              bool assert_valid_internal_key, SequenceNumber latest_snapshot,
              const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
              Statistics* stats = nullptr,
              const std::atomic<bool>* shutting_down = nullptr);

  // Wrappers around MergeOperator::FullMergeV3() that record perf statistics.
  // Set `update_num_ops_stats` to true if it is from a user read so that
  // the corresponding statistics are updated.
  // Returns one of the following statuses:
  // - OK: Entries were successfully merged.
  // - Corruption: Merge operator reported unsuccessful merge. The scope of the
  //   damage will be stored in `*op_failure_scope` when `op_failure_scope` is
  //   not nullptr

  // Empty tag types to disambiguate overloads
  struct NoBaseValueTag {};
  static constexpr NoBaseValueTag kNoBaseValue{};

  struct PlainBaseValueTag {};
  static constexpr PlainBaseValueTag kPlainBaseValue{};

  struct WideBaseValueTag {};
  static constexpr WideBaseValueTag kWideBaseValue{};

  template <typename... ResultTs>
  static Status TimedFullMerge(const MergeOperator* merge_operator,
                               const Slice& key, NoBaseValueTag,
                               const std::vector<Slice>& operands,
                               Logger* logger, Statistics* statistics,
                               SystemClock* clock, bool update_num_ops_stats,
                               MergeOperator::OpFailureScope* op_failure_scope,
                               ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  template <typename... ResultTs>
  static Status TimedFullMerge(
      const MergeOperator* merge_operator, const Slice& key, PlainBaseValueTag,
      const Slice& value, const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value(value);

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  template <typename... ResultTs>
  static Status TimedFullMerge(
      const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
      const Slice& entity, const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

    Slice entity_copy(entity);
    WideColumns existing_columns;

    const Status s =
        WideColumnSerialization::Deserialize(entity_copy, existing_columns);
    if (!s.ok()) {
      return s;
    }

    existing_value = std::move(existing_columns);

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  template <typename... ResultTs>
  static Status TimedFullMerge(const MergeOperator* merge_operator,
                               const Slice& key, WideBaseValueTag,
                               const WideColumns& columns,
                               const std::vector<Slice>& operands,
                               Logger* logger, Statistics* statistics,
                               SystemClock* clock, bool update_num_ops_stats,
                               MergeOperator::OpFailureScope* op_failure_scope,
                               ResultTs... results) {
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);

    return TimedFullMergeImpl(
        merge_operator, key, std::move(existing_value), operands, logger,
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  }

  // During compaction, merge entries until we hit
  //     - a corrupted key
  //     - a Put/Delete,
  //     - a different user key,
  //     - a specific sequence number (snapshot boundary),
  //     - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
  //  or - the end of iteration
  //
  // The result(s) of the merge can be accessed in `MergeHelper::keys()` and
  // `MergeHelper::values()`, which are invalidated the next time `MergeUntil()`
  // is called. `MergeOutputIterator` is specially designed to iterate the
  // results of a `MergeHelper`'s most recent `MergeUntil()`.
  //
  // iter: (IN)  points to the first merge type entry
  //       (OUT) points to the first entry not included in the merge process
  // range_del_agg: (IN) filters merge operands covered by range tombstones.
  // stop_before: (IN) a sequence number that merge should not cross.
  //                   0 means no restriction
  // at_bottom:   (IN) true if the iterator covers the bottom level, which means
  //                   we could reach the start of the history of this user key.
  // allow_data_in_errors: (IN) if true, data details will be displayed in
  //                       error/log messages.
  // blob_fetcher: (IN) blob fetcher object for the compaction's input version.
  // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers
  //                            used for compaction readahead.
  // c_iter_stats: (OUT) compaction iteration statistics.
  //
  // Returns one of the following statuses:
  // - OK: Entries were successfully merged.
  // - MergeInProgress: Output consists of merge operands only.
  // - Corruption: Merge operator reported unsuccessful merge or a corrupted
  //   key has been encountered and not expected (applies only when compiling
  //   with asserts removed).
  // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
  //
  // REQUIRED: The first key in the input is not corrupted.
  Status MergeUntil(InternalIterator* iter,
                    CompactionRangeDelAggregator* range_del_agg,
                    const SequenceNumber stop_before, const bool at_bottom,
                    const bool allow_data_in_errors,
                    const BlobFetcher* blob_fetcher,
                    const std::string* const full_history_ts_low,
                    PrefetchBufferCollection* prefetch_buffers,
                    CompactionIterationStats* c_iter_stats);

  // Filters a merge operand using the compaction filter specified
  // in the constructor. Returns the decision that the filter made.
  // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
  // optional outputs of compaction filter.
  // user_key includes timestamp if user-defined timestamp is enabled.
  CompactionFilter::Decision FilterMerge(const Slice& user_key,
                                         const Slice& value_slice);

  // Query the merge result
  // These are valid until the next MergeUntil call
  // If the merging was successful:
  //   - keys() contains a single element with the latest sequence number of
  //     the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
  //   - values() contains a single element with the result of merging all the
  //     operands together
  //
  //   IMPORTANT 1: the key type could change after the MergeUntil call.
  //        Put/Delete + Merge + ... + Merge => Put
  //        Merge + ... + Merge => Merge
  //
  // If the merge operator is not associative, and if a Put/Delete is not found
  // then the merging will be unsuccessful. In this case:
  //   - keys() contains the list of internal keys seen in order of iteration.
  //   - values() contains the list of values (merges) seen in the same order.
  //              values() is parallel to keys() so that the first entry in
  //              keys() is the key associated with the first entry in values()
  //              and so on. These lists will be the same length.
  //              All of these pairs will be merges over the same user key.
  //              See IMPORTANT 2 note below.
  //
  //   IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
  //                So keys().back() was the first key seen by iterator.
  // TODO: Re-style this comment to be like the first one
  const std::deque<std::string>& keys() const { return keys_; }
  const std::vector<Slice>& values() const {
    return merge_context_.GetOperands();
  }
  uint64_t TotalFilterTime() const { return total_filter_time_; }
  bool HasOperator() const { return user_merge_operator_ != nullptr; }

  // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
  // return true and fill *until with the key to which we should skip.
  // If true, keys() and values() are empty.
  bool FilteredUntil(Slice* skip_until) const {
    if (!has_compaction_filter_skip_until_) {
      return false;
    }
    assert(compaction_filter_ != nullptr);
    assert(skip_until != nullptr);
    assert(compaction_filter_skip_until_.Valid());
    *skip_until = compaction_filter_skip_until_.Encode();
    return true;
  }

 private:
  Env* env_;
  SystemClock* clock_;
  const Comparator* user_comparator_;
  const MergeOperator* user_merge_operator_;
  const CompactionFilter* compaction_filter_;
  const std::atomic<bool>* shutting_down_;
  Logger* logger_;
  bool assert_valid_internal_key_;  // enforce no internal key corruption?
  bool allow_single_operand_;
  SequenceNumber latest_snapshot_;
  const SnapshotChecker* const snapshot_checker_;
  int level_;

  // the scratch area that holds the result of MergeUntil
  // valid up to the next MergeUntil call

  // Keeps track of the sequence of keys seen
  std::deque<std::string> keys_;
  // Parallel with keys_; stores the operands
  mutable MergeContext merge_context_;

  StopWatchNano<> filter_timer_;
  uint64_t total_filter_time_;
  Statistics* stats_;

  bool has_compaction_filter_skip_until_ = false;
  std::string compaction_filter_value_;
  InternalKey compaction_filter_skip_until_;

  bool IsShuttingDown() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  }

  template <typename Visitor>
  static Status TimedFullMergeCommonImpl(
      const MergeOperator* merge_operator, const Slice& key,
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
      const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, Visitor&& visitor);

  // Variant that exposes the merge result directly (in serialized form for wide
  // columns) as well as its value type. Used by iterator and compaction.
  static Status TimedFullMergeImpl(
      const MergeOperator* merge_operator, const Slice& key,
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
      const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope, std::string* result,
      Slice* result_operand, ValueType* result_type);

  // Variant that exposes the merge result translated into the form requested by
  // the client. (For example, if the result is a wide-column structure but the
  // client requested the results in plain-value form, the value of the default
  // column is returned.) Used by point lookups.
  static Status TimedFullMergeImpl(
      const MergeOperator* merge_operator, const Slice& key,
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
      const std::vector<Slice>& operands, Logger* logger,
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
      MergeOperator::OpFailureScope* op_failure_scope,
      std::string* result_value, PinnableWideColumns* result_entity);
};

// MergeOutputIterator can be used to iterate over the result of a merge.
class MergeOutputIterator {
 public:
  // The MergeOutputIterator is bound to a MergeHelper instance.
  explicit MergeOutputIterator(const MergeHelper* merge_helper);

  // Seeks to the first record in the output.
  void SeekToFirst();
  // Advances to the next record in the output.
  void Next();

  Slice key() { return Slice(*it_keys_); }
  Slice value() { return Slice(*it_values_); }
  bool Valid() const { return it_keys_ != merge_helper_->keys().rend(); }

 private:
  const MergeHelper* merge_helper_;
  std::deque<std::string>::const_reverse_iterator it_keys_;
  std::vector<Slice>::const_reverse_iterator it_values_;
};

}  // namespace ROCKSDB_NAMESPACE
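
The header comments note that `keys()` and `values()` are stored back to front, and `MergeOutputIterator` holds a pair of reverse iterators over them. The stand-alone sketch below shows that traversal idea with standard containers only. `MergeResultSketch`, `ReverseOutputIteratorSketch`, and `CollectKeysInSeenOrder` are hypothetical names, and the constructor and `SeekToFirst()` bodies are assumptions, since this header only declares the real ones.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a merge helper's parallel keys()/values() lists.
struct MergeResultSketch {
  std::deque<std::string> keys;     // keys.back() was seen first
  std::vector<std::string> values;  // parallel to keys
};

class ReverseOutputIteratorSketch {
 public:
  explicit ReverseOutputIteratorSketch(const MergeResultSketch* result)
      : result_(result),
        it_keys_(result->keys.rend()),
        it_values_(result->values.rend()) {
    assert(result->keys.size() == result->values.size());
  }

  // Positions at the entry that was seen first, i.e. the back of each list.
  void SeekToFirst() {
    it_keys_ = result_->keys.rbegin();
    it_values_ = result_->values.rbegin();
  }
  // Advances both reverse iterators in lockstep.
  void Next() {
    ++it_keys_;
    ++it_values_;
  }
  bool Valid() const { return it_keys_ != result_->keys.rend(); }
  const std::string& key() const { return *it_keys_; }
  const std::string& value() const { return *it_values_; }

 private:
  const MergeResultSketch* result_;
  std::deque<std::string>::const_reverse_iterator it_keys_;
  std::vector<std::string>::const_reverse_iterator it_values_;
};

// Returns the keys in the order in which they were originally seen.
std::vector<std::string> CollectKeysInSeenOrder(const MergeResultSketch& r) {
  std::vector<std::string> out;
  ReverseOutputIteratorSketch it(&r);
  for (it.SeekToFirst(); it.Valid(); it.Next()) {
    out.push_back(it.key());
  }
  return out;
}
```

Because the two containers are kept the same length, checking validity on the key iterator alone is enough, which is also what the `Valid()` method in the header does.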