rocksdb/db/compaction/subcompaction_state.h
Hui Xiao a1d8318563 Fix resumable compaction to prevent resumption at truncated range deletion boundaries (#14184)
Summary:
**Context/Summary:**

A truncated range deletion in an input file can be output by CompactionIterator with type kMaxValid instead of kTypeRangeDeletion, to satisfy the ordering requirement between the truncated range deletion's start key and the file's point keys. There was a plan to skip such keys in https://github.com/facebook/rocksdb/pull/14122, but blockers remain before that plan can be carried out.

Resumable compaction cannot yet handle resumption from a range deletion, so for resumption purposes it should treat the kMaxValid type the same as kTypeRangeDeletion. Previously it did not, and mistakenly allowed resumption from a range deletion. That led to an assertion failure when the compaction resumed from that range deletion and then happened to cut the output file shortly afterwards, with no point keys in between: cutting an output file in the presence of range deletions needs information to update the file boundaries, and that information was missing.

```
frame #9: 0x00007f4f4743bc93 libc.so.6`__GI___assert_fail(assertion="meta.smallest.size() > 0", file="db/compaction/compaction_outputs.cc", line=530, function="rocksdb::Status rocksdb::CompactionOutputs::AddRangeDels(rocksdb::CompactionRangeDelAggregator&, const rocksdb::Slice*, const rocksdb::Slice*, rocksdb::CompactionIterationStats&, bool, const rocksdb::InternalKeyComparator&, rocksdb::SequenceNumber, std::pair<long unsigned int, long unsigned int>, const rocksdb::Slice&, const string&)") at assert.c:101:3
frame #10: 0x00007f4f4808c68c librocksdb.so.10.9`rocksdb::CompactionOutputs::AddRangeDels(this=0x00007f4f0c27e1a0, range_del_agg=0x00007f4f0c21ecc0, comp_start_user_key=0x0000000000000000, comp_end_user_key=0x0000000000000000, range_del_out_stats=0x00007f4f0dffa140, bottommost_level=false, icmp=0x00007f4ef4c93040, earliest_snapshot=13108729, keep_seqno_range=<unavailable>, next_table_min_key=0x00007f4ef4c8f540, full_history_ts_low="") at compaction_outputs.cc:530:7
frame #11: 0x00007f4f480480dd librocksdb.so.10.9`rocksdb::CompactionJob::FinishCompactionOutputFile(this=0x00007f4f0dffb890, input_status=<unavailable>, prev_table_last_internal_key=0x00007f4f0dffa650, next_table_min_key=0x00007f4ef4c8f540, comp_start_user_key=0x0000000000000000, comp_end_user_key=0x0000000000000000, c_iter=0x00007f4ef4c8f400, sub_compact=0x00007f4f0c27e000, outputs=0x00007f4f0c27e1a0) at compaction_job.cc:1917:31
```

This PR simply prevents kMaxValid from being a resumption point, as is already done for regular range deletions - see commit 842d66eb18ea67e965d6acb1fce12c18eeb778d2
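
The rule described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the commit: `SketchValueType`, `SketchKey`, `IsResumableKeyType`, `SketchProgress` and `SketchUsage` are hypothetical names, and the enum is only a stand-in for the kTypeRangeDeletion and kMaxValid value types mentioned in this summary. It assumes that resumption progress is tracked as "the last key that is safe to resume from".

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical stand-in for the key types discussed above. The enumerator
// values are placeholders and do not match the real encoding.
enum class SketchValueType : uint8_t {
  kValue,
  kDeletion,
  kRangeDeletion,  // stands in for kTypeRangeDeletion
  kMaxValid,       // stands in for kMaxValid (truncated range deletion)
};

// Hypothetical key emitted by a compaction iterator.
struct SketchKey {
  std::string user_key;
  SketchValueType type;
};

// A key may serve as a resumption point only if it is neither a regular
// range deletion nor a kMaxValid-typed (truncated) range deletion.
constexpr bool IsResumableKeyType(SketchValueType t) {
  return t != SketchValueType::kRangeDeletion &&
         t != SketchValueType::kMaxValid;
}

// Remembers the most recent key from which a compaction could resume.
class SketchProgress {
 public:
  void Observe(const SketchKey& key) {
    if (IsResumableKeyType(key.type)) {
      last_resumable_key_ = key.user_key;
    }
    // Otherwise keep the previous resumption point: skipping a candidate
    // only means less resumption, never an incorrect one.
  }

  const std::optional<std::string>& last_resumable_key() const {
    return last_resumable_key_;
  }

 private:
  std::optional<std::string> last_resumable_key_;
};

// Example usage: the kMaxValid key does not advance the resumption point.
inline void SketchUsage() {
  SketchProgress progress;
  progress.Observe({"a", SketchValueType::kValue});
  progress.Observe({"b", SketchValueType::kMaxValid});
  assert(progress.last_resumable_key().has_value());
  assert(*progress.last_resumable_key() == "a");
}
```

In this sketch a rejected key simply leaves the earlier resumption point in place, which matches the observation in the test plan that the worst case of the fix going wrong is less resumption.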

The PR also improves the tests, variable naming, and logging in the resumable compaction code; these changes were needed to debug this assertion failure - see commit https://github.com/facebook/rocksdb/pull/14184/commits/aecd4e7f971f6dd4df672d9e5f1409fe4747c561. These improvements are covered by existing tests.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14184

Test Plan:
- The stress test initially surfaced the error. By reusing the exact LSM shape and files from the stress test in a unit test, I was able to get a deterministic repro and confirm that the fix resolves the error. This is the repro test: 1075936e69
```
./compaction_service_test --gtest_filter=ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
# Before fix
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from ResumableCompactionServiceTest
[ RUN      ] ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
compaction_service_test: db/compaction/compaction_outputs.cc:530: rocksdb::Status rocksdb::CompactionOutputs::AddRangeDels(rocksdb::CompactionRangeDelAggregator&, const rocksdb::Slice*, const rocksdb::Slice*, rocksdb::CompactionIterationStats&, bool, const rocksdb::InternalKeyComparator&, rocksdb::SequenceNumber, std::pair<long unsigned int, long unsigned int>, const rocksdb::Slice&, const string&): Assertion `meta.smallest.size() > 0' failed.
Received signal 6 (Aborted)
Invoking GDB for stack trace...
[New LWP 2621610]
[New LWP 2621611]
[New LWP 2621612]
[New LWP 2621613]
[New LWP 2621614]
[New LWP 2621630]
[New LWP 2621631]

# After fix
Note: Google Test filter = ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from ResumableCompactionServiceTest
[ RUN      ] ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume
[       OK ] ResumableCompactionServiceTest.CompactSpecificFilesFromExistingDBWithCancelAndResume (4722 ms)
[----------] 1 test from ResumableCompactionServiceTest (4722 ms total)

[----------] Global test environment tear-down
[==========] 1 test from 1 test case ran. (4722 ms total)
[  PASSED  ] 1 test.

```
- Follow-up: I tried a couple of times to coerce a truncated range deletion from scratch in the unit test but did not succeed. Considering that (1) kMaxValid may no longer be output by the compaction iterator once https://github.com/facebook/rocksdb/pull/14122/files lands again (which would make this bug obsolete), (2) the fix 842d66eb18ea67e965d6acb1fce12c18eeb778d2 is simple, and (3) the worst case of the fix going wrong is just less resumption, I decided to leave writing a unit test that coerces a truncated range deletion from scratch as a follow-up. Maybe I will draw inspiration from https://github.com/facebook/rocksdb/pull/14122/files.

Reviewed By: jaykorean

Differential Revision: D88912663

Pulled By: hx235

fbshipit-source-id: 80a01135684c8fea659650faaa00c2dc452c482a
2025-12-11 16:50:42 -08:00


// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <optional>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_garbage_meter.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_outputs.h"
#include "db/internal_stats.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
namespace ROCKSDB_NAMESPACE {
// Maintains state and outputs for each sub-compaction
// It contains 2 `CompactionOutputs`:
// 1. one for the normal output files
// 2. another for the proximal level outputs
// A `current` pointer tracks the current output group. When `AddToOutput()` is
// called, it checks the output of the current compaction_iterator key and
// points `current` to the target output group. By default, `current` points
// to the normal compaction_outputs; if the compaction_iterator key should be
// placed on the proximal level, `current` is changed to point to
// `proximal_level_outputs`.
// Later operations use `Current()` to get the target group.
//
//  +----------+          +-----------------------------+      +---------+
//  | *current |--------> | compaction_outputs          |----->| output  |
//  +----------+          +-----------------------------+      +---------+
//       |                                                     | output  |
//       |                                                     +---------+
//       |                                                     |  ...    |
//       |
//       |                +-----------------------------+      +---------+
//       +--------------> | proximal_level_outputs      |----->| output  |
//                        +-----------------------------+      +---------+
//                                                             |  ...    |
class SubcompactionState {
 public:
  const Compaction* compaction;

  // The boundaries of the key-range this compaction is interested in. No two
  // sub-compactions may have overlapping key-ranges.
  // 'start' is inclusive, 'end' is exclusive, and std::nullopt means unbounded
  const std::optional<Slice> start, end;

  // The return status of this sub-compaction
  Status status;

  // The return IO Status of this sub-compaction
  IOStatus io_status;

  // Notify on sub-compaction completion only if listener was notified on
  // sub-compaction begin.
  bool notify_on_subcompaction_completion = false;

  // compaction job stats for this sub-compaction
  CompactionJobStats compaction_job_stats;

  // sub-compaction job id, which is used to identify different
  // sub-compactions within the same compaction job.
  const uint32_t sub_job_id;

  Slice SmallestUserKey() const;

  Slice LargestUserKey() const;

  // Get all outputs from the subcompaction. For per_key_placement compaction,
  // it returns both the last level outputs and proximal level outputs.
  OutputIterator GetOutputs() const;

  // Assign range dels aggregator. The various tombstones will potentially
  // be filtered to different outputs.
  void AssignRangeDelAggregator(
      std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
    assert(range_del_agg_ == nullptr);
    assert(range_del_agg);
    range_del_agg_ = std::move(range_del_agg);
  }

  void RemoveLastEmptyOutput() {
    compaction_outputs_.RemoveLastEmptyOutput();
    proximal_level_outputs_.RemoveLastEmptyOutput();
  }

  void BuildSubcompactionJobInfo(
      SubcompactionJobInfo& subcompaction_job_info) const {
    const Compaction* c = compaction;
    const ColumnFamilyData* cfd = c->column_family_data();

    subcompaction_job_info.cf_id = cfd->GetID();
    subcompaction_job_info.cf_name = cfd->GetName();
    subcompaction_job_info.status = status;
    subcompaction_job_info.subcompaction_job_id = static_cast<int>(sub_job_id);
    subcompaction_job_info.base_input_level = c->start_level();
    subcompaction_job_info.output_level = c->output_level();
    subcompaction_job_info.compaction_reason = c->compaction_reason();
    subcompaction_job_info.compression = c->output_compression();
    subcompaction_job_info.stats = compaction_job_stats;
    subcompaction_job_info.blob_compression_type =
        c->mutable_cf_options().blob_compression_type;
  }

  SubcompactionState() = delete;
  SubcompactionState(const SubcompactionState&) = delete;
  SubcompactionState& operator=(const SubcompactionState&) = delete;

  SubcompactionState(Compaction* c, const std::optional<Slice> _start,
                     const std::optional<Slice> _end, uint32_t _sub_job_id)
      : compaction(c),
        start(_start),
        end(_end),
        sub_job_id(_sub_job_id),
        compaction_outputs_(c, /*is_proximal_level=*/false),
        proximal_level_outputs_(c, /*is_proximal_level=*/true) {
    assert(compaction != nullptr);
    // Set the output split key (used for the RoundRobin feature) only for the
    // normal compaction_outputs. The output-to-proximal-level feature doesn't
    // support RoundRobin (and may never support it, because with RoundRobin
    // the data is mostly naturally sorted by time, so there is no need for
    // per-key placement with output_to_proximal_level).
    compaction_outputs_.SetOutputSlitKey(start, end);
  }

  SubcompactionState(SubcompactionState&& state) noexcept
      : compaction(state.compaction),
        start(state.start),
        end(state.end),
        status(std::move(state.status)),
        io_status(std::move(state.io_status)),
        notify_on_subcompaction_completion(
            state.notify_on_subcompaction_completion),
        compaction_job_stats(std::move(state.compaction_job_stats)),
        sub_job_id(state.sub_job_id),
        compaction_outputs_(std::move(state.compaction_outputs_)),
        proximal_level_outputs_(std::move(state.proximal_level_outputs_)),
        range_del_agg_(std::move(state.range_del_agg_)) {
    // Re-point `current` at this object's own outputs rather than at the
    // moved-from object's members.
    current_outputs_ = state.current_outputs_ == &state.proximal_level_outputs_
                           ? &proximal_level_outputs_
                           : &compaction_outputs_;
  }

  // Add all the new files from this compaction to version_edit
  void AddOutputsEdit(VersionEdit* out_edit) const {
    for (const auto& file : proximal_level_outputs_.outputs_) {
      out_edit->AddFile(compaction->GetProximalLevel(), file.meta);
    }
    for (const auto& file : compaction_outputs_.outputs_) {
      out_edit->AddFile(compaction->output_level(), file.meta);
    }
  }

  void Cleanup(Cache* cache);

  void AggregateCompactionOutputStats(
      InternalStats::CompactionStatsFull& internal_stats) const;

  CompactionOutputs& Current() const {
    assert(current_outputs_);
    return *current_outputs_;
  }

  CompactionOutputs* Outputs(bool is_proximal_level) {
    assert(compaction);
    if (is_proximal_level) {
      assert(compaction->SupportsPerKeyPlacement());
      return &proximal_level_outputs_;
    }
    return &compaction_outputs_;
  }

  // Per-level stats for the output
  InternalStats::CompactionStats* OutputStats(bool is_proximal_level) {
    assert(compaction);
    if (is_proximal_level) {
      assert(compaction->SupportsPerKeyPlacement());
      return &proximal_level_outputs_.stats_;
    }
    return &compaction_outputs_.stats_;
  }

  uint64_t GetWorkerCPUMicros() const {
    uint64_t rv = compaction_outputs_.GetWorkerCPUMicros();
    if (compaction->SupportsPerKeyPlacement()) {
      rv += proximal_level_outputs_.GetWorkerCPUMicros();
    }
    return rv;
  }

  CompactionRangeDelAggregator* RangeDelAgg() const {
    return range_del_agg_.get();
  }

  // Returns true if the outputs have range deletions; range deletions also
  // count as data.
  bool HasRangeDel() const {
    return range_del_agg_ && !range_del_agg_->IsEmpty();
  }

  void SetSubcompactionProgress(
      const SubcompactionProgress& subcompaction_progress) {
    subcompaction_progress_ = subcompaction_progress;
  }

  SubcompactionProgress& GetSubcompactionProgressRef() {
    return subcompaction_progress_;
  }

  // Add compaction_iterator key/value to the `Current` output group.
  Status AddToOutput(const CompactionIterator& iter, bool use_proximal_output,
                     const CompactionFileOpenFunc& open_file_func,
                     const CompactionFileCloseFunc& close_file_func,
                     const ParsedInternalKey& prev_iter_output_internal_key);

  // Close all compaction output files, both output_to_proximal_level outputs
  // and normal outputs.
  Status CloseCompactionFiles(const Status& curr_status,
                              const CompactionFileOpenFunc& open_file_func,
                              const CompactionFileCloseFunc& close_file_func) {
    auto per_key = compaction->SupportsPerKeyPlacement();
    // Call FinishCompactionOutputFile() even if status is not ok: it needs to
    // close the output file.
    // CloseOutput() may open new compaction output files.
    Status s = curr_status;
    if (per_key) {
      s = proximal_level_outputs_.CloseOutput(s, range_del_agg_.get(),
                                              open_file_func, close_file_func);
    } else {
      assert(proximal_level_outputs_.HasBuilder() == false);
      assert(proximal_level_outputs_.HasOutput() == false);
    }
    s = compaction_outputs_.CloseOutput(s, range_del_agg_.get(), open_file_func,
                                        close_file_func);
    return s;
  }

 private:
  // State kept for output being generated
  CompactionOutputs compaction_outputs_;
  CompactionOutputs proximal_level_outputs_;
  CompactionOutputs* current_outputs_ = &compaction_outputs_;
  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
  SubcompactionProgress subcompaction_progress_;
};
} // namespace ROCKSDB_NAMESPACE