Summary: This is setting up for a fix to a data race in SetOptions on BlockBasedTableOptions (BBTO), https://github.com/facebook/rocksdb/issues/10079. The race will be fixed by replacing `table_factory` with a modified copy whenever we want to modify a BBTO field.

An argument could be made that this change creates more entanglement between features (e.g. BlobSource <-> MutableCFOptions), rather than (conceptually) minimizing the dependencies of each feature, but
* Most of these things already depended on ImmutableOptions.
* Historically there has been a lot of plumbing (and possibly small CPU overhead) involved in adding features that need to reach a lot of places, like `block_protection_bytes_per_key`. Keeping those wrapped up in options simplifies that.
* SuperVersion management generally takes care of lifetime management of MutableCFOptions, so is not that difficult. (Crash test agrees so far.)

There are some FIXME places where it is known to be unsafe to replace `block_cache` unless/until we handle shared_ptr tracking properly. HOWEVER, replacing `block_cache` is generally dubious, at least while existing users of the old block cache (e.g. table readers) can continue using it indefinitely.

The change to cf_options.cc is essentially just moving code (not changing it). I'm not concerned about the performance of copying another shared_ptr with MutableCFOptions, but I left a note about considering an improvement if more shared_ptrs are added to it.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13077

Test Plan: existing tests, crash test. Unit test DBOptionsTest.GetLatestCFOptions updated with some temporary logic. MemoryTest required some refactoring (simplification) for the change.

Reviewed By: cbi42

Differential Revision: D64546903

Pulled By: pdillinger

fbshipit-source-id: 69ae97ce5cf4c01b58edc4c5d4687eb1e5bf5855
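The copy-and-replace idea described in the summary above can be sketched roughly as follows. This is a minimal, hypothetical illustration: the simplified types (SimpleBBTO, SimpleTableFactory, SimpleMutableCFOptions) and the WithNewBlockSize helper are stand-ins for this note only, not the actual RocksDB classes or the PR's implementation.

#include <cstddef>
#include <memory>

// Hypothetical, simplified stand-ins for BlockBasedTableOptions,
// TableFactory, and MutableCFOptions.
struct SimpleBBTO {
  size_t block_size = 4096;
};

struct SimpleTableFactory {
  explicit SimpleTableFactory(const SimpleBBTO& opts) : bbto(opts) {}
  SimpleBBTO bbto;
};

struct SimpleMutableCFOptions {
  // Readers hold their own copy of this shared_ptr (via SuperVersion in the
  // real code), so they keep seeing the old, unmodified factory.
  std::shared_ptr<const SimpleTableFactory> table_factory;
};

// Rather than mutating a BBTO field on the live factory (the data race in
// issue 10079), build a modified copy and publish a brand-new factory pointer.
SimpleMutableCFOptions WithNewBlockSize(const SimpleMutableCFOptions& base,
                                        size_t new_block_size) {
  SimpleBBTO bbto = base.table_factory->bbto;  // copy the current BBTO
  bbto.block_size = new_block_size;            // modify only the copy
  SimpleMutableCFOptions updated = base;       // copy the other mutable options
  updated.table_factory =
      std::make_shared<const SimpleTableFactory>(bbto);  // swap in new factory
  return updated;
}

Existing readers that captured the old table_factory shared_ptr keep it alive until they release it, which is why SuperVersion lifetime management does most of the work here.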
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "util/coro_utils.h"

#if defined(WITHOUT_COROUTINES) || \
    (defined(USE_COROUTINES) && defined(WITH_COROUTINES))

namespace ROCKSDB_NAMESPACE {

// Lookup a batch of keys in a single SST file
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
 bool skip_filters, bool skip_range_deletions, FdWithKeyRange* f,
 std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
 TableCache::TypedHandle* table_handle, uint64_t& num_filter_read,
 uint64_t& num_index_read, uint64_t& num_sst_read) {
  bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
                       get_perf_context()->per_level_perf_context_enabled;

  Status s;
  StopWatchNano timer(clock_, timer_enabled /* auto_start */);
  s = CO_AWAIT(table_cache_->MultiGet)(
      read_options, *internal_comparator(), *f->file_metadata, &file_range,
      mutable_cf_options_,
      cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
      skip_range_deletions, hit_file_level, table_handle);
  // TODO: examine the behavior for corrupted key
  if (timer_enabled) {
    PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                              hit_file_level);
  }
  if (!s.ok()) {
    // TODO: Set status for individual keys appropriately
    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
      *iter->s = s;
      file_range.MarkKeyDone(iter);
    }
    CO_RETURN s;
  }
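  // Per-key handling: aggregate this file's per-level stats into the output
  // counters and decide, for each key, whether it is resolved here, needs a
  // blob read, or must be looked up in further files.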
  uint64_t batch_size = 0;
  for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
       ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    // The Status in the KeyContext takes precedence over GetContext state
    // Status may be an error if there were any IO errors in the table
    // reader. We never expect Status to be NotFound(), as that is
    // determined by get_context
    assert(!status->IsNotFound());
    if (!status->ok()) {
      file_range.MarkKeyDone(iter);
      continue;
    }

    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }
    batch_size++;
    num_index_read += get_context.get_context_stats_.num_index_read;
    num_filter_read += get_context.get_context_stats_.num_filter_read;
    num_sst_read += get_context.get_context_stats_.num_sst_read;
    // Reset these stats since they're specific to a level
    get_context.get_context_stats_.num_index_read = 0;
    get_context.get_context_stats_.num_filter_read = 0;
    get_context.get_context_stats_.num_sst_read = 0;

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    } else {
      if (iter->max_covering_tombstone_seq > 0) {
        // The remaining files we look at will only contain covered keys, so
        // we stop here for this key
        file_range.SkipKey(iter);
      }
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (hit_file_level == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (hit_file_level == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (hit_file_level >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }

        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);

        file_range.MarkKeyDone(iter);
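
        // If the found value is a blob reference, decode the BlobIndex and
        // queue a read from the referenced blob file (grouped by blob file
        // number in blob_ctxs); otherwise account for the returned value size.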
        if (iter->is_blob_index) {
          BlobIndex blob_index;
          Status tmp_s;

          if (iter->value) {
            TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
                                     &(*iter));

            tmp_s = blob_index.DecodeFrom(*(iter->value));

          } else {
            assert(iter->columns);

            tmp_s = blob_index.DecodeFrom(
                WideColumnsHelper::GetDefaultColumn(iter->columns->columns()));
          }

          if (tmp_s.ok()) {
            const uint64_t blob_file_num = blob_index.file_number();
            blob_ctxs[blob_file_num].emplace_back(blob_index, &*iter);
          } else {
            *(iter->s) = tmp_s;
          }
        } else {
          if (iter->value) {
            file_range.AddValueSize(iter->value->size());
          } else {
            assert(iter->columns);
            file_range.AddValueSize(iter->columns->serialized_size());
          }

          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
            s = Status::Aborted();
            break;
          }
        }
        continue;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        file_range.MarkKeyDone(iter);
        continue;
      case GetContext::kCorrupt:
        *status =
            Status::Corruption("corrupted key for ", iter->lkey->user_key());
        file_range.MarkKeyDone(iter);
        continue;
      case GetContext::kUnexpectedBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        file_range.MarkKeyDone(iter);
        continue;
      case GetContext::kMergeOperatorFailed:
        *status = Status::Corruption(Status::SubCode::kMergeOperatorFailed);
        file_range.MarkKeyDone(iter);
        continue;
    }
  }

  RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
  CO_RETURN s;
}
}  // namespace ROCKSDB_NAMESPACE
#endif