Summary:
Add `open_files_async` option for faster DB startup. When enabled, SST file opening and validation are deferred to a background thread after `DB::Open` returns, reducing startup latency for databases with many SST files. WAL recovery remains synchronous.

To support this, `FindTable` is extended with a pinning mechanism that stores the cache handle directly on `FileMetaData` via a new `PinnedTableReader` class, and sets the table reader atomically so subsequent reads skip cache lookups. `FileDescriptor::table_reader` is replaced with `PinnedTableReader pinned_reader`, which wraps a `std::atomic<TableReader*>` with acquire/release ordering to safely handle concurrent access between the background opener and read threads.

If validation fails, the background opener sets a `kAsyncFileOpen` background error. Future read requests look up the table reader again via the cache, and any validation failure there is propagated to the user (existing behavior when `max_open_files > 0`).

This feature is most useful when `max_open_files=-1`, because otherwise file opening is already capped at 16 files and DB open should be fast.

## Restrictions
- This feature is incompatible with FIFO compaction because FIFO compaction requires reading table properties under the DB mutex. When the table reader is unpinned, this may cause a DB hang.
- This feature is also incompatible with `skip_stats_update_on_db_open=false` because it would result in an even longer DB open.

## Key changes
- New `open_files_async` DB option with C, Java, and `db_bench` bindings
- `BGWorkAsyncFileOpen` background worker that opens all SST files post-`DB::Open`, with shutdown awareness via the `shutting_down_` flag
- New `PinnedTableReader` class in `version_edit.h`: a thread-safe wrapper holding `std::atomic<TableReader*>` and `Cache::Handle*` with proper acquire/release ordering. Replaces the old `FileDescriptor::table_reader` raw pointer and `FileMetaData::table_reader_handle`
- Extract `LoadTableHandlersHelper` into `db/version_util.cc`, shared between `VersionBuilder::LoadTableHandlers` (for version edits during recovery) and `BGWorkAsyncFileOpen` (for base storage post-open)
- `FindTable` extended with `pin_table_handle` and `out_table_reader` params. When pinning is enabled, the table reader is stored on `FileMetaData` so Get/MultiGet/Iterator skip redundant cache lookups. `FindTable` now performs the pinned-reader fast-path check internally instead of requiring callers to check `fd.table_reader` beforehand
  - Note: pinning is explicit (not default) because some callers create temporary `FileMetaData`s that would need to properly clean up table handles
- `CompactedDBImpl` updated to use `FindTable` + pinning instead of raw `fd.table_reader` access for Get/MultiGet
- New `kAsyncFileOpen` background error reason in `listener.h` and `error_handler.cc`
- Add a check in `~DBImpl` to ensure that (future) subclasses of `DBImpl` do not forget to schedule the async file open task. Subclasses that never use it will need to explicitly mark it.
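The pinning idea described for `PinnedTableReader` can be illustrated with a minimal, standalone sketch. This is not the RocksDB implementation: `FakeTableReader`, `FakeCacheHandle`, `PinnedReaderSketch`, `TryPin`, and `DemoPinTwice` are hypothetical names invented here, and the sketch only assumes what the summary states, namely that an atomic reader pointer is published with release semantics and read with acquire semantics, next to the cache handle that keeps the reader alive.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-ins for illustration only; these are not RocksDB types.
struct FakeTableReader {
  int file_number;
};
struct FakeCacheHandle {
  FakeTableReader reader;
};

// Sketch of a pinned-reader slot: an atomic reader pointer plus the cache
// handle that keeps the reader alive.
class PinnedReaderSketch {
 public:
  // Read path: the acquire load pairs with the release half of the
  // compare-exchange in TryPin(), so a non-null result points at a fully
  // constructed reader.
  FakeTableReader* Get() const {
    return reader_.load(std::memory_order_acquire);
  }

  // Publishes handle->reader if no reader is pinned yet. Returns false when
  // another caller already pinned one; the loser keeps ownership of its
  // handle and must release it itself.
  bool TryPin(FakeCacheHandle* handle) {
    FakeTableReader* expected = nullptr;
    if (!reader_.compare_exchange_strong(expected, &handle->reader,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
      return false;
    }
    // Only the single winner writes handle_, and it is only read at teardown.
    handle_ = handle;
    return true;
  }

  FakeCacheHandle* handle() const { return handle_; }

 private:
  std::atomic<FakeTableReader*> reader_{nullptr};
  FakeCacheHandle* handle_ = nullptr;
};

// Pins twice and reports whether exactly the first attempt won.
bool DemoPinTwice() {
  PinnedReaderSketch slot;
  FakeCacheHandle first{{7}};
  FakeCacheHandle second{{7}};
  bool first_won = slot.TryPin(&first);
  bool second_won = slot.TryPin(&second);
  return first_won && !second_won && slot.Get() == &first.reader;
}
```

In this sketch a reader thread would call `Get()` first and fall back to a cache lookup followed by `TryPin()` only when it returns null. A caller that loses the race keeps its own handle and is responsible for releasing it, which mirrors the note that pinning is opt-in because handle cleanup has to be done deliberately.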
Pull Request resolved: https://github.com/facebook/rocksdb/pull/14322

Test Plan:
- `OpenFilesAsyncTest` parameterized over `num_flushes` (1, 20), `ReadType` (Get, MultiGet, Iterator), `max_open_files` (-1, 10), and `read_only` (true, false)
  - **ConcurrentFileAccess**: concurrent reads and compactions race with async opener
  - **AfterRead**: reads happen before async opener, verifying lazy open and that the opener sees already-pinned readers
  - **BeforeRead**: async opener completes first, verifying reads use pre-loaded table readers
  - **Shutdown**: DB closes before async opener starts, verifying clean cancellation with 0 file opens
  - **Error**: corrupted SST files, verifying `kAsyncFileOpen` background error is set and reads return corruption
  - **DropColumnFamily**: CF dropped before async opener runs, verifying the opener gracefully skips dropped CFs
- Added to crash test

### Benchmark
To simulate a high-latency remote filesystem, I set up a virtual filesystem with dm-delay using 10 ms reads and 0 ms writes.

```
# Generate a DB with many L0 files
TEST_TMPDIR=/data/users/jkangs/dm-delay-test/mnt ./db_bench -benchmarks=fillseq -disable_auto_compactions=true -write_buffer_size=1000 -num=1000000
```

```
./db_bench -use_existing_db=true -db=/data/users/jkangs/dm-delay-test/mnt/dbbench -benchmarks=readrandom -reads=1 -report_open_timing=true -open_files_async=true -use_direct_reads -file_opening_threads=1 -skip_stats_update_on_db_open
OpenDb: 25.1419 milliseconds
```

```
./db_bench -use_existing_db=true -db=/data/users/jkangs/dm-delay-test/mnt/dbbench -benchmarks=readrandom -reads=1 -report_open_timing=true -open_files_async=false -use_direct_reads -file_opening_threads=1 -skip_stats_update_on_db_open
OpenDb: 23109.4 milliseconds
```

### No read regressions
On main branch
```
./db_bench -use_existing_db=true -db=/dev/shm/dbbench -benchmarks=readrandom -seed=1 -threads=8 -duration=30
readrandom : 4.827 micros/op 1657100 ops/sec 30.005 seconds 49720992 operations; 183.3 MB/s (6198999 of 6198999 found)
```

On this branch
```
./db_bench -use_existing_db=true -db=/dev/shm/dbbench -benchmarks=readrandom -seed=1 -threads=8 -duration=30
readrandom : 4.863 micros/op 1644808 ops/sec 30.007 seconds 49354992 operations; 182.0 MB/s (6099999 of 6099999 found)

./db_bench -use_existing_db=true -db=/dev/shm/dbbench -benchmarks=readrandom -seed=1 -threads=8 -duration=30 -open_files_async=true
readrandom : 4.803 micros/op 1665392 ops/sec 30.004 seconds 49968992 operations; 184.2 MB/s (6222999 of 6222999 found)
```

Reviewed By: pdillinger, xingbowang

Differential Revision: D93538033

Pulled By: joshkang97

fbshipit-source-id: 32ac70c112cd733b7c1e1c1e2e7ce6422318a5ae
298 lines
11 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_impl/compacted_db_impl.h"

#include "db/db_impl/db_impl.h"
#include "db/version_set.h"
#include "logging/logging.h"
#include "table/get_context.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

CompactedDBImpl::CompactedDBImpl(const DBOptions& options,
                                 const std::string& dbname)
    : DBImpl(options, dbname, /*seq_per_batch*/ false, /*batch_per_txn*/ true,
             /*read_only*/ true),
      cfd_(nullptr),
      version_(nullptr),
      user_comparator_(nullptr) {}

CompactedDBImpl::~CompactedDBImpl() = default;

size_t CompactedDBImpl::FindFile(const Slice& key) {
  size_t right = files_.num_files - 1;
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return user_comparator_->Compare(ExtractUserKey(f.largest_key), k) < 0;
  };
  return static_cast<size_t>(
      std::lower_bound(files_.files, files_.files + right, key, cmp) -
      files_.files);
}

Status CompactedDBImpl::Get(const ReadOptions& _read_options,
                            ColumnFamilyHandle*, const Slice& key,
                            PinnableSlice* value, std::string* timestamp) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kGet) {
    return Status::InvalidArgument(
        "Can only call Get with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGet;
  }

  assert(user_comparator_);
  if (read_options.timestamp) {
    Status s =
        FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp));
    if (!s.ok()) {
      return s;
    }
    if (read_options.timestamp->size() > 0) {
      s = FailIfReadCollapsedHistory(cfd_, cfd_->GetSuperVersion(),
                                     *(read_options.timestamp));
      if (!s.ok()) {
        return s;
      }
    }
  } else {
    const Status s = FailIfCfHasTs(DefaultColumnFamily());
    if (!s.ok()) {
      return s;
    }
  }

  // Clear the timestamps for returning results so that we can distinguish
  // between tombstone or key that has never been written
  if (timestamp) {
    timestamp->clear();
  }

  GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
  std::string* ts =
      user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
  LookupKey lkey(key, kMaxSequenceNumber, read_options.timestamp);
  GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
                         GetContext::kNotFound, lkey.user_key(), value,
                         /*columns=*/nullptr, ts, nullptr, nullptr, true,
                         nullptr, nullptr, nullptr, nullptr, &read_cb);

  const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
  if (user_comparator_->CompareWithoutTimestamp(
          key, /*a_has_ts=*/false,
          ExtractUserKeyAndStripTimestamp(f.smallest_key,
                                          user_comparator_->timestamp_size()),
          /*b_has_ts=*/false) < 0) {
    return Status::NotFound();
  }
  TableReader* t = nullptr;
  TableCache::TypedHandle* handle = nullptr;
  Status s = cfd_->table_cache()->FindTable(
      read_options, cfd_->table_cache()->file_options(),
      cfd_->internal_comparator(), *f.file_metadata, &handle,
      version_->GetMutableCFOptions(), &t, false /* no_io */,
      nullptr /* file_read_hist */, false /* skip_filters */, files_level_,
      true /* prefetch_index_and_filter_in_cache */,
      0 /* max_file_size_for_l0_meta_pin */, f.file_metadata->temperature,
      true /* pin_table_handle */);
  if (s.ok()) {
    assert(handle == nullptr);
    // Use TableReader directly to avoid extra table_cache->Get() overheads
    s = t->Get(read_options, lkey.internal_key(), &get_context, nullptr);
  }
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (get_context.State() == GetContext::kFound) {
    return Status::OK();
  }
  return Status::NotFound();
}

void CompactedDBImpl::MultiGet(const ReadOptions& _read_options,
                               size_t num_keys,
                               ColumnFamilyHandle** /*column_families*/,
                               const Slice* keys, PinnableSlice* values,
                               std::string* timestamps, Status* statuses,
                               const bool /*sorted_input*/) {
  assert(user_comparator_);
  Status s;
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
    s = Status::InvalidArgument(
        "Can only call MultiGet with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
  }

  ReadOptions read_options(_read_options);
  if (s.ok()) {
    if (read_options.io_activity == Env::IOActivity::kUnknown) {
      read_options.io_activity = Env::IOActivity::kMultiGet;
    }

    if (read_options.timestamp) {
      s = FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp));
      if (s.ok()) {
        if (read_options.timestamp->size() > 0) {
          s = FailIfReadCollapsedHistory(cfd_, cfd_->GetSuperVersion(),
                                         *(read_options.timestamp));
        }
      }
    } else {
      s = FailIfCfHasTs(DefaultColumnFamily());
    }
  }

  if (!s.ok()) {
    for (size_t i = 0; i < num_keys; ++i) {
      statuses[i] = s;
    }
    return;
  }

  // Clear the timestamps for returning results so that we can distinguish
  // between tombstone or key that has never been written
  if (timestamps) {
    for (size_t i = 0; i < num_keys; ++i) {
      timestamps[i].clear();
    }
  }

  GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
  for (size_t i = 0; i < num_keys; ++i) {
    const Slice& key = keys[i];
    LookupKey lkey(key, kMaxSequenceNumber, read_options.timestamp);
    const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
    if (user_comparator_->CompareWithoutTimestamp(
            key, /*a_has_ts=*/false,
            ExtractUserKeyAndStripTimestamp(f.smallest_key,
                                            user_comparator_->timestamp_size()),
            /*b_has_ts=*/false) < 0) {
      statuses[i] = Status::NotFound();
      continue;
    }
    PinnableSlice& pinnable_val = values[i];
    std::string* timestamp = timestamps ? &timestamps[i] : nullptr;
    GetContext get_context(
        user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound,
        lkey.user_key(), &pinnable_val, /*columns=*/nullptr,
        user_comparator_->timestamp_size() > 0 ? timestamp : nullptr, nullptr,
        nullptr, true, nullptr, nullptr, nullptr, nullptr, &read_cb);
    TableReader* t = nullptr;
    TableCache::TypedHandle* handle = nullptr;
    Status status = cfd_->table_cache()->FindTable(
        read_options, cfd_->table_cache()->file_options(),
        cfd_->internal_comparator(), *f.file_metadata, &handle,
        version_->GetMutableCFOptions(), &t, false /* no_io */,
        nullptr /* file_read_hist */, false /* skip_filters */, files_level_,
        true /* prefetch_index_and_filter_in_cache */,
        0 /* max_file_size_for_l0_meta_pin */, f.file_metadata->temperature,
        true /* pin_table_handle */);
    if (status.ok()) {
      assert(handle == nullptr);
      // Use TableReader directly to avoid extra table_cache->Get() overheads
      status = t->Get(read_options, lkey.internal_key(), &get_context, nullptr);
    }
    if (!status.ok() && !status.IsNotFound()) {
      statuses[i] = status;
    } else {
      if (get_context.State() == GetContext::kFound) {
        statuses[i] = Status::OK();
      } else {
        statuses[i] = Status::NotFound();
      }
    }
  }
}

Status CompactedDBImpl::Init(const Options& options) {
  SuperVersionContext sv_context(/* create_superversion */ true);
  mutex_.Lock();
  ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
                            ColumnFamilyOptions(options));
  Status s = Recover({cf}, true /* read only */, false, true);
  if (s.ok()) {
    cfd_ = static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
               ->cfd();
    cfd_->InstallSuperVersion(&sv_context, &mutex_);
  }
  mutex_.Unlock();
  sv_context.Clean();
  if (!s.ok()) {
    return s;
  }
  NewThreadStatusCfInfo(cfd_);
  version_ = cfd_->GetSuperVersion()->current;
  user_comparator_ = cfd_->user_comparator();
  auto* vstorage = version_->storage_info();
  if (vstorage->num_non_empty_levels() == 0) {
    return Status::NotSupported("no file exists");
  }
  const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
  // L0 should not have files
  if (l0.num_files > 1) {
    return Status::NotSupported("L0 contain more than 1 file");
  }
  if (l0.num_files == 1) {
    if (vstorage->num_non_empty_levels() > 1) {
      return Status::NotSupported("Both L0 and other level contain files");
    }
    files_ = l0;
    files_level_ = 0;
    return Status::OK();
  }

  for (int i = 1; i < vstorage->num_non_empty_levels() - 1; ++i) {
    if (vstorage->LevelFilesBrief(i).num_files > 0) {
      return Status::NotSupported("Other levels also contain files");
    }
  }

  int level = vstorage->num_non_empty_levels() - 1;
  if (vstorage->LevelFilesBrief(level).num_files > 0) {
    files_ = vstorage->LevelFilesBrief(level);
    files_level_ = level;
    return Status::OK();
  }
  return Status::NotSupported("no file exists");
}

Status CompactedDBImpl::Open(const Options& options, const std::string& dbname,
                             std::unique_ptr<DB>* dbptr) {
  *dbptr = nullptr;

  if (options.max_open_files != -1) {
    return Status::InvalidArgument("require max_open_files = -1");
  }
  if (options.merge_operator.get() != nullptr) {
    return Status::InvalidArgument("merge operator is not supported");
  }
  DBOptions db_options(options);
  std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
  Status s = db->Init(options);
  if (s.ok()) {
    {
      InstrumentedMutexLock l(&db->mutex_);
      db->opened_successfully_ = true;
      if (db->immutable_db_options_.open_files_async) {
        db->ScheduleAsyncFileOpening();
      }
    }
    s = db->StartPeriodicTaskScheduler();
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
                   "Opened the db as fully compacted mode");
    LogFlush(db->immutable_db_options_.info_log);
    *dbptr = std::move(db);
  }
  return s;
}

}  // namespace ROCKSDB_NAMESPACE