Summary:
**This PR adds a new statistic to track the total number of sorted runs for running compactions.**

Context:
I am currently working on a separate project, where I am trying to tune the read request sizes made by `FilePrefetchBuffer` to the storage backend. In this particular case, `FilePrefetchBuffer` will issue larger reads and have to buffer larger read responses. This means we expect to see higher memory utilization. At least for the initial rollout, we only want to enable this optimization for compaction reads.

**I want some way to get a sense of what the memory usage _impact_ will be if the prefetch read request size is increased from (for instance) 8MB to 64MB.**

**If I know the number of files that compactions are actively reading from (i.e. the number of sorted runs / "input iterators"), I can determine how much the memory usage will increase if I bump up the readahead size inside `FilePrefetchBuffer`.** For instance, if there are 16 sorted runs at any given point in time and I bump up the readahead size by 64MB, I can project an increase of 16 * 64 MB.

In most cases, the number of sorted runs processed per compaction is the number of L0 files plus the number of non-L0 levels. However, we need to be aware of exceptions like trivial compactions, deletion compactions, and subcompactions. This is a major reason why this PR chooses to implement the stats counting inside `CompactionMergingIterator`: by the time we get down to that part of the stack, we know the "true" values for the number of input iterators / sorted runs.

Alternatives considered:
- https://github.com/facebook/rocksdb/issues/13299 gives you a histogram for the number of sorted runs ("input iterators") for a _single compaction_. While this statistic is interesting and in the direction of what we want, we are going to be assessing the memory impact across _all_ compactions that are currently running. Thus, this statistic does not give us all the information we need.
- https://github.com/facebook/rocksdb/issues/13302 gives you the total prefetch buffer memory usage, but it doesn't tell you what happens when the readahead size is increased. Furthermore, the code change is error prone and very "invasive": look at how many places in the code had to be updated. This would be useful in the future for general memory accounting purposes, but it does not serve our immediate needs.
- https://github.com/facebook/rocksdb/issues/13320 aimed to track the same metric, but did this inside `DBImpl::BackgroundCallCompaction`. It turns out that this does not handle the case where a compaction is divided into multiple subcompactions (in which case, there would be _more_ sorted runs being processed at the same time than you would otherwise predict). The current PR handles subcompactions automatically, and I think it is cleaner overall.

Note:
When I attempted to put this statistic as part of the `cf_stats_value_` array, even after updating the array to use `std::atomic<uint64_t>`, I still was able to get assertions to _fail_ inside the crash tests. These assertions checked that the unsigned integer would not underflow below zero during compaction. I experimented for many hours but could not figure out a solution, even though it would seem like things "should" work with `fetch_add` and `fetch_sub`. One possibility is that the values in `cf_stats_value_` are being cleared to 0, but I added a `fprintf` to that portion of the code and didn't see it getting printed out before my assertions failed. Regardless, I think that this statistic is different enough from the CF-specific and the other DB-wide stats that the best solution is to just have it defined as a separate `std::atomic<uint64_t>`. I also do not want to spend more hours trying to debug why the crash test assertions break, when the solution in the current version of the PR can get the assertions to consistently pass.
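As a rough illustration of the "separate `std::atomic<uint64_t>`" approach described in the note, the increment-on-construction / decrement-on-destruction pattern could be sketched as below. The class names `RunningSortedRunsCounter` and `SortedRunsGuard` are hypothetical and exist only for this example; they are not the actual `InternalStats` or `CompactionMergingIterator` code, and the choice of relaxed memory ordering is an assumption.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a DB-wide counter kept outside cf_stats_value_.
class RunningSortedRunsCounter {
 public:
  void Incr(uint64_t n) { value_.fetch_add(n, std::memory_order_relaxed); }

  void Decr(uint64_t n) {
    // fetch_sub returns the value held before the subtraction, so we can
    // check that the unsigned counter did not wrap below zero.
    uint64_t prev = value_.fetch_sub(n, std::memory_order_relaxed);
    assert(prev >= n);
    (void)prev;  // silence unused-variable warnings when NDEBUG is defined
  }

  uint64_t Get() const { return value_.load(std::memory_order_relaxed); }

 private:
  std::atomic<uint64_t> value_{0};
};

// Hypothetical RAII guard: adds num_runs on construction and removes exactly
// the recorded amount on destruction, so the additions and subtractions cancel.
class SortedRunsGuard {
 public:
  SortedRunsGuard(RunningSortedRunsCounter* counter, uint64_t num_runs)
      : counter_(counter), recorded_(num_runs) {
    if (counter_ != nullptr) {
      counter_->Incr(recorded_);
      // After our own increment, the total is at least what we added.
      assert(recorded_ <= counter_->Get());
    }
  }

  ~SortedRunsGuard() {
    if (counter_ != nullptr) {
      // Before decrementing, our contribution must still be accounted for.
      assert(recorded_ <= counter_->Get());
      counter_->Decr(recorded_);
    }
  }

  SortedRunsGuard(const SortedRunsGuard&) = delete;
  SortedRunsGuard& operator=(const SortedRunsGuard&) = delete;

 private:
  RunningSortedRunsCounter* counter_;
  uint64_t recorded_;
};
```

Recording the amount at construction time means the destructor never has to recompute it, which is presumably why the real iterator stores `num_sorted_runs_recorded_` rather than re-reading a container size.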
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13325

Test Plan:
- I updated one unit test to confirm that `num_running_compaction_sorted_runs` starts and ends at 0. This checks that all the additions and subtractions cancel out. I also made sure the statistic got incremented at least once.
- When I added `fprintf` manually, I confirmed that my statistics updating code was being exercised numerous times inside `db_compaction_test`. I printed out the results before and after the increments/decrements, and the numbers looked good.
- We will monitor the generated statistics after this PR is merged.
- There are assertion checks after each increment and before each decrement. If there are bugs, the crash test will almost certainly find them, since it quickly found issues with my initial implementation for this PR, which tried using the `cf_stats_value_` array (modified to use `std::atomic`).

Reviewed By: anand1976, hx235

Differential Revision: D68527895

Pulled By: archang19

fbshipit-source-id: 135cf210e0ff1550ea28ae4384d429ae620b1784
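Once the statistic is being monitored, the projection described in the Context section can be computed along the following lines. This is a minimal sketch under stated assumptions: the helper names are hypothetical, "MB" is treated as MiB, and every sorted run is assumed to hold one fully used readahead buffer, which makes the result an upper-bound estimate rather than a measurement.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t kMiB = 1024ULL * 1024ULL;

// Typical sorted-run count for one compaction: one run per L0 file plus one
// per non-L0 level. Trivial compactions, deletion compactions, and
// subcompactions are exceptions and are not modeled here.
inline uint64_t TypicalSortedRuns(uint64_t num_l0_files,
                                  uint64_t num_non_l0_levels) {
  return num_l0_files + num_non_l0_levels;
}

// Projected extra memory if the readahead size of every actively read sorted
// run grows by readahead_increase_bytes.
inline uint64_t ProjectedIncreaseBytes(uint64_t running_sorted_runs,
                                       uint64_t readahead_increase_bytes) {
  // Guard against uint64_t overflow in the multiplication.
  assert(readahead_increase_bytes == 0 ||
         running_sorted_runs <= UINT64_MAX / readahead_increase_bytes);
  return running_sorted_runs * readahead_increase_bytes;
}
```

With 16 running sorted runs and a 64 MiB bump, `ProjectedIncreaseBytes(16, 64 * kMiB)` evaluates to 1024 MiB, matching the 16 * 64 MB figure in the summary. Going from 8 MiB to 64 MiB is a 56 MiB increase per run, which gives 896 MiB for the same 16 runs.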
396 lines
13 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/compaction_merging_iterator.h"

#include "db/internal_stats.h"

namespace ROCKSDB_NAMESPACE {
class CompactionMergingIterator : public InternalIterator {
 public:
  CompactionMergingIterator(
      const InternalKeyComparator* comparator, InternalIterator** children,
      int n, bool is_arena_mode,
      std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                            std::unique_ptr<TruncatedRangeDelIterator>**>>&
          range_tombstones,
      InternalStats* internal_stats)
      : is_arena_mode_(is_arena_mode),
        comparator_(comparator),
        current_(nullptr),
        minHeap_(CompactionHeapItemComparator(comparator_)),
        pinned_iters_mgr_(nullptr),
        internal_stats_(internal_stats),
        num_sorted_runs_recorded_(0) {
    children_.resize(n);
    for (int i = 0; i < n; i++) {
      children_[i].level = i;
      children_[i].iter.Set(children[i]);
      assert(children_[i].type == HeapItem::ITERATOR);
    }
    assert(range_tombstones.size() == static_cast<size_t>(n));
    for (auto& p : range_tombstones) {
      range_tombstone_iters_.push_back(std::move(p.first));
    }
    pinned_heap_item_.resize(n);
    for (int i = 0; i < n; ++i) {
      if (range_tombstones[i].second) {
        // for LevelIterator
        *range_tombstones[i].second = &range_tombstone_iters_[i];
      }
      pinned_heap_item_[i].level = i;
      pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
    }
    if (internal_stats_) {
      TEST_SYNC_POINT("CompactionMergingIterator::UpdateInternalStats");
      // The size of children_ or range_tombstone_iters_ (n) should not change
      // but to be safe, we can record the size here so we decrement by the
      // correct amount at destruction time
      num_sorted_runs_recorded_ = n;
      internal_stats_->IncrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
    }
  }

  void considerStatus(const Status& s) {
    if (!s.ok() && status_.ok()) {
      status_ = s;
    }
  }

  ~CompactionMergingIterator() override {
    if (internal_stats_) {
      assert(num_sorted_runs_recorded_ == range_tombstone_iters_.size());
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
      internal_stats_->DecrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
    }

    range_tombstone_iters_.clear();

    for (auto& child : children_) {
      child.iter.DeleteIter(is_arena_mode_);
    }
    status_.PermitUncheckedError();
  }

  bool Valid() const override { return current_ != nullptr && status_.ok(); }

  Status status() const override { return status_; }

  void SeekToFirst() override;

  void Seek(const Slice& target) override;

  void Next() override;

  Slice key() const override {
    assert(Valid());
    return current_->key();
  }

  Slice value() const override {
    assert(Valid());
    if (LIKELY(current_->type == HeapItem::ITERATOR)) {
      return current_->iter.value();
    } else {
      return dummy_tombstone_val;
    }
  }

  // Here we simply relay the MayBeOutOfLowerBound/UpperBoundCheckResult
  // result from the current child iterator. In principle, as long as one
  // child iterator reports that being out of bound is not possible, we know
  // the current key is within bound.
  bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START ||
           current_->iter.MayBeOutOfLowerBound();
  }

  IterBoundCheck UpperBoundCheckResult() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START
               ? IterBoundCheck::kUnknown
               : current_->iter.UpperBoundCheckResult();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    for (auto& child : children_) {
      child.iter.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START;
  }

  // Compaction uses the above subset of InternalIterator interface.
  void SeekToLast() override { assert(false); }

  void SeekForPrev(const Slice&) override { assert(false); }

  void Prev() override { assert(false); }

  bool NextAndGetResult(IterateResult*) override {
    assert(false);
    return false;
  }

  bool IsKeyPinned() const override {
    assert(false);
    return false;
  }

  bool IsValuePinned() const override {
    assert(false);
    return false;
  }

  bool PrepareValue() override {
    assert(false);
    return false;
  }

 private:
  struct HeapItem {
    HeapItem() = default;

    IteratorWrapper iter;
    size_t level = 0;
    std::string tombstone_str;
    enum Type { ITERATOR, DELETE_RANGE_START };
    Type type = ITERATOR;

    explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
        : level(_level), type(Type::ITERATOR) {
      iter.Set(_iter);
    }

    void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
      tombstone_str.clear();
      AppendInternalKey(&tombstone_str, pik);
    }

    [[nodiscard]] Slice key() const {
      return type == ITERATOR ? iter.key() : tombstone_str;
    }
  };

  class CompactionHeapItemComparator {
   public:
    explicit CompactionHeapItemComparator(
        const InternalKeyComparator* comparator)
        : comparator_(comparator) {}

    bool operator()(HeapItem* a, HeapItem* b) const {
      int r = comparator_->Compare(a->key(), b->key());
      // For each file, we assume all range tombstone start keys come before
      // its file boundary sentinel key (file's meta.largest key).
      // In the case when meta.smallest = meta.largest and range tombstone start
      // key is truncated at meta.smallest, the start key will have op_type =
      // kMaxValid to make it smaller (see TruncatedRangeDelIterator
      // constructor). The following assertion validates this assumption.
      assert(a->type == b->type || r != 0);
      return r > 0;
    }

   private:
    const InternalKeyComparator* comparator_;
  };

  using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;
  bool is_arena_mode_;
  const InternalKeyComparator* comparator_;
  // HeapItem for all child point iterators.
  std::vector<HeapItem> children_;
  // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
  // current range tombstone from range_tombstone_iters_[i].
  std::vector<HeapItem> pinned_heap_item_;
  // range_tombstone_iters_[i] contains range tombstones in the sorted run that
  // corresponds to children_[i]. range_tombstone_iters_[i] ==
  // nullptr means the sorted run of children_[i] does not have range
  // tombstones (or the current SSTable does not have range tombstones in the
  // case of LevelIterator).
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>>
      range_tombstone_iters_;
  // Used as value for range tombstone keys
  std::string dummy_tombstone_val{};

  // Skip file boundary sentinel keys.
  void FindNextVisibleKey();

  // top of minHeap_
  HeapItem* current_;
  // If any of the children have non-ok status, this is one of them.
  Status status_;
  CompactionMinHeap minHeap_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  InternalStats* internal_stats_;
  uint64_t num_sorted_runs_recorded_;
  // Process a child that is not in the min heap.
  // If valid, add to the min heap. Otherwise, check status.
  void AddToMinHeapOrCheckStatus(HeapItem*);

  HeapItem* CurrentForward() const {
    return !minHeap_.empty() ? minHeap_.top() : nullptr;
  }

  void InsertRangeTombstoneAtLevel(size_t level) {
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.push(&pinned_heap_item_[level]);
    }
  }
};

void CompactionMergingIterator::SeekToFirst() {
  minHeap_.clear();
  status_ = Status::OK();
  for (auto& child : children_) {
    child.iter.SeekToFirst();
    AddToMinHeapOrCheckStatus(&child);
  }

  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
    if (range_tombstone_iters_[i]) {
      range_tombstone_iters_[i]->SeekToFirst();
      InsertRangeTombstoneAtLevel(i);
    }
  }

  FindNextVisibleKey();
  current_ = CurrentForward();
}

void CompactionMergingIterator::Seek(const Slice& target) {
  minHeap_.clear();
  status_ = Status::OK();
  for (auto& child : children_) {
    child.iter.Seek(target);
    AddToMinHeapOrCheckStatus(&child);
  }

  ParsedInternalKey pik;
  ParseInternalKey(target, &pik, false /* log_err_key */)
      .PermitUncheckedError();
  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
    if (range_tombstone_iters_[i]) {
      range_tombstone_iters_[i]->Seek(pik.user_key);
      // For compaction, output keys should all be after seek target.
      while (range_tombstone_iters_[i]->Valid() &&
             comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <
                 0) {
        range_tombstone_iters_[i]->Next();
      }
      InsertRangeTombstoneAtLevel(i);
    }
  }

  FindNextVisibleKey();
  current_ = CurrentForward();
}

void CompactionMergingIterator::Next() {
  assert(Valid());
  // For the heap modifications below to be correct, current_ must be the
  // current top of the heap.
  assert(current_ == CurrentForward());
  // current_ points to the current record, so move its iterator forward.
  if (current_->type == HeapItem::ITERATOR) {
    current_->iter.Next();
    if (current_->iter.Valid()) {
      // current is still valid after the Next() call above. Call
      // replace_top() to restore the heap property. When the same child
      // iterator yields a sequence of keys, this is cheap.
      assert(current_->iter.status().ok());
      minHeap_.replace_top(current_);
    } else {
      // current stopped being valid, remove it from the heap.
      considerStatus(current_->iter.status());
      minHeap_.pop();
    }
  } else {
    assert(current_->type == HeapItem::DELETE_RANGE_START);
    size_t level = current_->level;
    assert(range_tombstone_iters_[level]);
    range_tombstone_iters_[level]->Next();
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.replace_top(&pinned_heap_item_[level]);
    } else {
      minHeap_.pop();
    }
  }
  FindNextVisibleKey();
  current_ = CurrentForward();
}

void CompactionMergingIterator::FindNextVisibleKey() {
  while (!minHeap_.empty()) {
    HeapItem* current = minHeap_.top();
    // IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
    if (current->type != HeapItem::ITERATOR ||
        !current->iter.IsDeleteRangeSentinelKey()) {
      return;
    }
    // range tombstone start keys from the same SSTable should have been
    // exhausted
    assert(!range_tombstone_iters_[current->level] ||
           !range_tombstone_iters_[current->level]->Valid());
    // current->iter is a LevelIterator, and it enters a new SST file in the
    // Next() call here.
    current->iter.Next();
    if (current->iter.Valid()) {
      assert(current->iter.status().ok());
      minHeap_.replace_top(current);
    } else {
      considerStatus(current->iter.status());
      minHeap_.pop();
    }
    if (range_tombstone_iters_[current->level]) {
      InsertRangeTombstoneAtLevel(current->level);
    }
  }
}

void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
  if (child->iter.Valid()) {
    assert(child->iter.status().ok());
    minHeap_.push(child);
  } else {
    considerStatus(child->iter.status());
  }
}

InternalIterator* NewCompactionMergingIterator(
    const InternalKeyComparator* comparator, InternalIterator** children, int n,
    std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                          std::unique_ptr<TruncatedRangeDelIterator>**>>&
        range_tombstone_iters,
    Arena* arena, InternalStats* stats) {
  assert(n >= 0);
  if (n == 0) {
    return NewEmptyInternalIterator<Slice>(arena);
  } else {
    if (arena == nullptr) {
      return new CompactionMergingIterator(comparator, children, n,
                                           false /* is_arena_mode */,
                                           range_tombstone_iters, stats);
    } else {
      auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
      return new (mem) CompactionMergingIterator(comparator, children, n,
                                                 true /* is_arena_mode */,
                                                 range_tombstone_iters, stats);
    }
  }
}
}  // namespace ROCKSDB_NAMESPACE