Summary: Add per-file sampling of "collapsible" entry reads (deletions, single deletions, merges, and kNotFound results) that may later be used to help inform read-triggered compactions. This is a better metric than `num_reads_sampled` because it is more targeted toward reads that could be avoided via compaction.

The existing behavior of `num_reads_sampled` is that reads only get sampled on iterator creation for a file. This is problematic because Next()/Prev() calls are not sampled, nor are additional Seek() calls. This PR moves sampling to per-seek/next granularity within `LevelIterator` and adds a new `num_collapsible_entry_reads_sampled` counter that tracks how often a file serves entries that could be eliminated by compaction.

Note that only L1+ files have iterator seeks/nexts/prevs sampled. Introducing this at L0 would require wrapping table reader iterators, which would add a performance cost.

## Key changes

- **New counter `num_collapsible_entry_reads_sampled`** in `FileSampledStats` tracks sampled reads that encounter deletions, single deletions, merges, or kNotFound results in both the Get and Iterator paths.
- **Moved sampling from file-open to per-operation** in `LevelIterator`: sampling now happens in `SampleRead()`, called from `Seek()`, `SeekForPrev()`, `SeekToFirst()`, `SeekToLast()`, `Next()`, `NextAndGetResult()`, and `Prev()`. The `should_sample` parameter was removed from `LevelIterator`'s constructor.
- **Differentiated sampling rate for Next() vs Seek()**: `should_sample_file_read_next()` uses a 64x lower sampling rate (`kFileReadSampleRate * 64`) since Next() is cheaper than Seek() and called more frequently.
- **Collapsible tracking in Get path**: `Version::Get()` now increments the collapsible counter when `GetContext::State()` is `kNotFound`, `kMerge`, or `kDeleted`.
- **Collapsible tracking in MultiGet path**: `MultiGetFromSST` also increments the collapsible counter for the same states.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/14434

Test Plan:
- Added new DB tests for both `num_reads_sampled` and `num_collapsible_entry_reads_sampled`

### Benchmark results (readrandom, readseq)

Setup: 1M keys, 16-byte keys, 100-byte values, no compression, fillrandom+compact

| Benchmark  | Params            | ops/s (main) | ops/s (feature) | % change |
|------------|-------------------|--------------|-----------------|----------|
| readrandom | seed=1, threads=1 | 387,194      | 389,449         | +0.6%    |
| readseq    | seed=1, threads=1 | 5,598,371    | 5,572,975       | -0.5%    |

No meaningful performance regression observed — differences are within run-to-run noise.

Reviewed By: xingbowang

Differential Revision: D95613793

Pulled By: joshkang97

fbshipit-source-id: 9dd09c9b7527b148424bde5686f4157c7a9e1214
173 lines
6.3 KiB
C++
173 lines
6.3 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
//
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "util/coro_utils.h"
|
|
|
|
#if defined(WITHOUT_COROUTINES) || \
|
|
(defined(USE_COROUTINES) && defined(WITH_COROUTINES))
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// Lookup a batch of keys in a single SST file
|
|
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
|
|
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
|
|
bool skip_filters, bool skip_range_deletions, FdWithKeyRange* f,
|
|
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
|
|
TableCache::TypedHandle* table_handle, uint64_t& num_filter_read,
|
|
uint64_t& num_index_read, uint64_t& num_sst_read) {
|
|
bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
|
|
get_perf_context()->per_level_perf_context_enabled;
|
|
|
|
Status s;
|
|
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
|
|
s = CO_AWAIT(table_cache_->MultiGet)(
|
|
read_options, *internal_comparator(), *f->file_metadata, &file_range,
|
|
mutable_cf_options_,
|
|
cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
|
|
skip_range_deletions, hit_file_level, table_handle);
|
|
// TODO: examine the behavior for corrupted key
|
|
if (timer_enabled) {
|
|
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
|
|
hit_file_level);
|
|
}
|
|
if (!s.ok()) {
|
|
// TODO: Set status for individual keys appropriately
|
|
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
|
|
*iter->s = s;
|
|
file_range.MarkKeyDone(iter);
|
|
}
|
|
CO_RETURN s;
|
|
}
|
|
uint64_t batch_size = 0;
|
|
for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
|
|
++iter) {
|
|
GetContext& get_context = *iter->get_context;
|
|
Status* status = iter->s;
|
|
// The Status in the KeyContext takes precedence over GetContext state
|
|
// Status may be an error if there were any IO errors in the table
|
|
// reader. We never expect Status to be NotFound(), as that is
|
|
// determined by get_context
|
|
assert(!status->IsNotFound());
|
|
if (!status->ok()) {
|
|
file_range.MarkKeyDone(iter);
|
|
continue;
|
|
}
|
|
|
|
if (get_context.sample()) {
|
|
sample_file_read_inc(f->file_metadata);
|
|
if (get_context.State() == GetContext::kNotFound ||
|
|
get_context.State() == GetContext::kMerge ||
|
|
get_context.State() == GetContext::kDeleted) {
|
|
sample_collapsible_entry_file_read_inc(f->file_metadata);
|
|
}
|
|
}
|
|
batch_size++;
|
|
num_index_read += get_context.get_context_stats_.num_index_read;
|
|
num_filter_read += get_context.get_context_stats_.num_filter_read;
|
|
num_sst_read += get_context.get_context_stats_.num_sst_read;
|
|
// Reset these stats since they're specific to a level
|
|
get_context.get_context_stats_.num_index_read = 0;
|
|
get_context.get_context_stats_.num_filter_read = 0;
|
|
get_context.get_context_stats_.num_sst_read = 0;
|
|
|
|
// report the counters before returning
|
|
if (get_context.State() != GetContext::kNotFound &&
|
|
get_context.State() != GetContext::kMerge &&
|
|
db_statistics_ != nullptr) {
|
|
get_context.ReportCounters();
|
|
} else {
|
|
if (iter->max_covering_tombstone_seq > 0) {
|
|
// The remaining files we look at will only contain covered keys, so
|
|
// we stop here for this key
|
|
file_range.SkipKey(iter);
|
|
}
|
|
}
|
|
switch (get_context.State()) {
|
|
case GetContext::kNotFound:
|
|
// Keep searching in other files
|
|
break;
|
|
case GetContext::kMerge:
|
|
// TODO: update per-level perfcontext user_key_return_count for kMerge
|
|
break;
|
|
case GetContext::kFound:
|
|
if (hit_file_level == 0) {
|
|
RecordTick(db_statistics_, GET_HIT_L0);
|
|
} else if (hit_file_level == 1) {
|
|
RecordTick(db_statistics_, GET_HIT_L1);
|
|
} else if (hit_file_level >= 2) {
|
|
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
|
|
}
|
|
|
|
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);
|
|
|
|
file_range.MarkKeyDone(iter);
|
|
|
|
if (iter->is_blob_index) {
|
|
BlobIndex blob_index;
|
|
Status tmp_s;
|
|
|
|
if (iter->value) {
|
|
TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
|
|
&(*iter));
|
|
|
|
tmp_s = blob_index.DecodeFrom(*(iter->value));
|
|
|
|
} else {
|
|
assert(iter->columns);
|
|
|
|
tmp_s = blob_index.DecodeFrom(
|
|
WideColumnsHelper::GetDefaultColumn(iter->columns->columns()));
|
|
}
|
|
|
|
if (tmp_s.ok()) {
|
|
const uint64_t blob_file_num = blob_index.file_number();
|
|
blob_ctxs[blob_file_num].emplace_back(blob_index, &*iter);
|
|
} else {
|
|
*(iter->s) = tmp_s;
|
|
}
|
|
} else {
|
|
if (iter->value) {
|
|
file_range.AddValueSize(iter->value->size());
|
|
} else {
|
|
assert(iter->columns);
|
|
file_range.AddValueSize(iter->columns->serialized_size());
|
|
}
|
|
|
|
if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
|
|
s = Status::Aborted();
|
|
break;
|
|
}
|
|
}
|
|
continue;
|
|
case GetContext::kDeleted:
|
|
// Use empty error message for speed
|
|
*status = Status::NotFound();
|
|
file_range.MarkKeyDone(iter);
|
|
continue;
|
|
case GetContext::kCorrupt:
|
|
*status =
|
|
Status::Corruption("corrupted key for ", iter->lkey->user_key());
|
|
file_range.MarkKeyDone(iter);
|
|
continue;
|
|
case GetContext::kUnexpectedBlobIndex:
|
|
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
|
|
*status = Status::NotSupported(
|
|
"Encounter unexpected blob index. Please open DB with "
|
|
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
|
|
file_range.MarkKeyDone(iter);
|
|
continue;
|
|
case GetContext::kMergeOperatorFailed:
|
|
*status = Status::Corruption(Status::SubCode::kMergeOperatorFailed);
|
|
file_range.MarkKeyDone(iter);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
|
|
CO_RETURN s;
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
#endif
|