rocksdb/db/version_util.cc
Josh Kang f25fb41da6 Add option to validate sst files in the background on DB open (#14322)
Summary:
Add `open_files_async` option for faster DB startup. When enabled, SST file opening and validation are deferred to a background thread after `DB::Open` returns, reducing startup latency for databases with many SST files. WAL recovery remains synchronous.

To support this, `FindTable` is extended with a pinning mechanism that stores the cache handle directly on `FileMetaData` via a new `PinnedTableReader` class, and sets the table reader atomically so subsequent reads skip cache lookups. `FileDescriptor::table_reader` is replaced with `PinnedTableReader pinned_reader` which wraps a `std::atomic<TableReader*>` with acquire/release ordering to safely handle concurrent access between the background opener and read threads.
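
The pinning scheme described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the class from `version_edit.h`: `PinnedReaderSketch`, `FakeReader`, `TryPin`, and `RaceToPin` are hypothetical names, and the `Cache::Handle*` that the real `PinnedTableReader` also holds is omitted. A publisher stores the pointer with release ordering and readers load it with acquire ordering, so everything written to the reader before publication is visible to whoever observes the pointer.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-in for a table reader; not a RocksDB type.
struct FakeReader {
  int file_number;
};

// Illustrative wrapper around an atomically published reader pointer.
class PinnedReaderSketch {
 public:
  // Fast path for reads: returns nullptr until a reader has been pinned.
  FakeReader* Get() const { return reader_.load(std::memory_order_acquire); }

  // Publishes `reader` only if nothing is pinned yet. Returns true when this
  // call installed the pointer, false when another thread got there first.
  bool TryPin(FakeReader* reader) {
    FakeReader* expected = nullptr;
    return reader_.compare_exchange_strong(expected, reader,
                                           std::memory_order_release,
                                           std::memory_order_relaxed);
  }

 private:
  std::atomic<FakeReader*> reader_{nullptr};
};

// Races a simulated background opener against a simulated read thread and
// returns how many of them installed their pointer.
int RaceToPin(PinnedReaderSketch& pinned, FakeReader* a, FakeReader* b) {
  assert(a != nullptr && b != nullptr);
  std::atomic<int> winners{0};
  std::thread opener([&] {
    if (pinned.TryPin(a)) winners.fetch_add(1);
  });
  std::thread read_path([&] {
    if (pinned.TryPin(b)) winners.fetch_add(1);
  });
  opener.join();
  read_path.join();
  return winners.load();
}
```

In this sketch the losing thread would be responsible for releasing its own reader. How the actual change handles that case is not shown here.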

If validation fails, the background opener sets a `kAsyncFileOpen` background error. Subsequent read requests look up the table reader again via the cache, and any validation failure there is propagated to the user (the existing behavior when `max_open_files > 0`).

This feature is most useful when `max_open_files=-1`, because otherwise file opening is already capped at 16 files and DB open should be fast.

## Restrictions
- This feature is incompatible with FIFO compaction, because FIFO compaction requires reading table properties under the DB mutex. When the table reader is unpinned, this may cause a DB hang.
- This feature is also incompatible with `skip_stats_update_on_db_open=false`, because that setting would result in an even longer DB open.

## Key changes

- New `open_files_async` DB option with C, Java, and `db_bench` bindings
- `BGWorkAsyncFileOpen` background worker that opens all SST files post-`DB::Open`, with shutdown awareness via `shutting_down_` flag
- New `PinnedTableReader` class in `version_edit.h` — thread-safe wrapper holding `std::atomic<TableReader*>` and `Cache::Handle*` with proper acquire/release ordering. Replaces the old `FileDescriptor::table_reader` raw pointer and `FileMetaData::table_reader_handle`
- Extract `LoadTableHandlersHelper` into `db/version_util.cc` — shared between `VersionBuilder::LoadTableHandlers` (for version edits during recovery) and `BGWorkAsyncFileOpen` (for base storage post-open)
- `FindTable` extended with `pin_table_handle` and `out_table_reader` params — when pinning is enabled, the table reader is stored on `FileMetaData` so Get/MultiGet/Iterator skip redundant cache lookups. `FindTable` now performs the pinned-reader fast-path check internally instead of requiring callers to check `fd.table_reader` beforehand
  - Note: pinning is explicit (not default) because some callers create temporary `FileMetaData`s that would need to properly clean up table handles
- `CompactedDBImpl` updated to use `FindTable` + pinning instead of raw `fd.table_reader` access for Get/MultiGet
- New `kAsyncFileOpen` background error reason in `listener.h` and `error_handler.cc`
- Add a check in `~DBImpl` to ensure that (future) subclasses of `DBImpl` have not forgotten to schedule the async file open task. Subclasses that never use the task need to mark this explicitly.
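
The work-sharing pattern behind the shared loader (an atomic index that workers claim from, an optional stop flag, and first-error capture) can be sketched in isolation. This is a simplified illustration with assumptions: `OpenAllSketch` and `open_one` are hypothetical names, errors are modeled as strings rather than `Status`, and the table cache capacity check is left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Runs `open_one(i)` for i in [0, num_items) on up to `max_threads` threads.
// `open_one` returns an empty string on success or an error message.
// Returns the first error recorded, or an empty string if none occurred.
// `stop` is an optional cancellation flag (hypothetical shutdown signal).
std::string OpenAllSketch(size_t num_items, int max_threads,
                          const std::function<std::string(size_t)>& open_one,
                          const std::atomic<bool>* stop = nullptr) {
  assert(max_threads >= 1);
  std::atomic<size_t> next_idx{0};
  std::atomic<bool> has_error{false};
  std::string first_error;

  auto worker = [&]() {
    while (true) {
      // Claim the next unprocessed index; each index goes to one worker.
      size_t idx = next_idx.fetch_add(1);
      if (idx >= num_items || has_error.load() ||
          (stop != nullptr && stop->load())) {
        break;
      }
      std::string err = open_one(idx);
      if (!err.empty()) {
        // Only the thread that flips the flag writes the shared result.
        bool expected = false;
        if (has_error.compare_exchange_strong(expected, true)) {
          first_error = err;
        }
      }
    }
  };

  // The calling thread also works, so spawn max_threads - 1 helpers.
  std::vector<std::thread> helpers;
  for (int i = 1; i < max_threads; i++) {
    helpers.emplace_back(worker);
  }
  worker();
  for (auto& t : helpers) {
    t.join();
  }
  return first_error;
}
```

Because the result is written only by the thread that wins the compare-exchange and is read only after all helpers are joined, the sketch needs no mutex around `first_error`.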

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14322

Test Plan:
- `OpenFilesAsyncTest` parameterized over `num_flushes` (1, 20), `ReadType` (Get, MultiGet, Iterator), `max_open_files` (-1, 10), and `read_only` (true, false)
  - **ConcurrentFileAccess**: concurrent reads and compactions race with async opener
  - **AfterRead**: reads happen before async opener, verifying lazy open and that the opener sees already-pinned readers
  - **BeforeRead**: async opener completes first, verifying reads use pre-loaded table readers
  - **Shutdown**: DB closes before async opener starts, verifying clean cancellation with 0 file opens
  - **Error**: corrupted SST files, verifying `kAsyncFileOpen` background error is set and reads return corruption
  - **DropColumnFamily**: CF dropped before async opener runs, verifying the opener gracefully skips dropped CFs
- Added to crash test

### Benchmark

To simulate a high-latency remote filesystem, I set up a virtual filesystem with dm-delay, using 10 ms reads and 0 ms writes.

```
# Generate a DB with many L0 files

TEST_TMPDIR=/data/users/jkangs/dm-delay-test/mnt ./db_bench -benchmarks=fillseq -disable_auto_compactions=true -write_buffer_size=1000 -num=1000000
```

```
./db_bench -use_existing_db=true -db=/data/users/jkangs/dm-delay-test/mnt/dbbench -benchmarks=readrandom -reads=1 -report_open_timing=true -open_files_async=true -use_direct_reads -file_opening_threads=1 -skip_stats_update_on_db_open

OpenDb:     25.1419 milliseconds
```

```
./db_bench -use_existing_db=true -db=/data/users/jkangs/dm-delay-test/mnt/dbbench -benchmarks=readrandom -reads=1 -report_open_timing=true -open_files_async=false -use_direct_reads -file_opening_threads=1 -skip_stats_update_on_db_open

OpenDb:     23109.4 milliseconds
```
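
For reference, the ratio implied by the two `OpenDb` timings above is plain arithmetic on the reported numbers (rounded, single run each):

```latex
\[
\frac{23109.4\ \text{ms}}{25.1419\ \text{ms}} \approx 919
\]
```

On this dm-delay setup, `DB::Open` with `-open_files_async=true` therefore returned roughly 900 times sooner than with the option disabled.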

### No read regressions

On main branch
```
./db_bench -use_existing_db=true -db=/dev/shm/dbbench -benchmarks=readrandom -seed=1 -threads=8 -duration=30

readrandom   :       4.827 micros/op 1657100 ops/sec 30.005 seconds 49720992 operations;  183.3 MB/s (6198999 of 6198999 found)
```

On this branch
```
./db_bench -use_existing_db=true -db=/dev/shm/dbbench -benchmarks=readrandom -seed=1 -threads=8 -duration=30

readrandom   :       4.863 micros/op 1644808 ops/sec 30.007 seconds 49354992 operations;  182.0 MB/s (6099999 of 6099999 found)

./db_bench -use_existing_db=true -db=/dev/shm/dbbench -benchmarks=readrandom -seed=1 -threads=8 -duration=30 -open_files_async=true

readrandom   :       4.803 micros/op 1665392 ops/sec 30.004 seconds 49968992 operations;  184.2 MB/s (6222999 of 6222999 found)
```
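
Relative to the main-branch throughput of 1657100 ops/sec, the two runs on this branch differ by less than one percent (rounded arithmetic on the figures above):

```latex
\[
\frac{1644808 - 1657100}{1657100} \approx -0.74\%, \qquad
\frac{1665392 - 1657100}{1657100} \approx +0.50\%
\]
```

The commit does not report run-to-run variance, so reading these differences as noise is an assumption.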

Reviewed By: pdillinger, xingbowang

Differential Revision: D93538033

Pulled By: joshkang97

fbshipit-source-id: 32ac70c112cd733b7c1e1c1e2e7ce6422318a5ae
2026-03-02 16:18:14 -08:00


// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/version_util.h"

#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

#include "db/internal_stats.h"
#include "db/table_cache.h"
#include "port/port.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

Status LoadTableHandlersHelper(
    const std::vector<std::pair<FileMetaData*, int>>& files_meta,
    TableCache* table_cache, const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator,
    InternalStats* internal_stats, int max_threads,
    bool prefetch_index_and_filter_in_cache,
    const MutableCFOptions& mutable_cf_options,
    size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
    std::atomic<bool>* stop) {
  assert(table_cache != nullptr);

  std::atomic<size_t> next_file_meta_idx(0);
  std::atomic<bool> has_error(false);
  Status ret;

  // Each worker repeatedly claims the next unprocessed file index until the
  // list is exhausted, an error is recorded, a stop is requested, or the
  // table cache is full.
  std::function<void()> load_handlers_func([&]() {
    while (true) {
      size_t file_idx = next_file_meta_idx.fetch_add(1);
      if (has_error.load(std::memory_order_relaxed)) {
        break;
      }
      if (file_idx >= files_meta.size()) {
        break;
      }
      if (stop != nullptr && stop->load()) {
        break;
      }

      auto* cache = table_cache->get_cache().get();
      if (cache->GetCapacity() < TableCache::kInfiniteCapacity &&
          cache->GetUsage() >= cache->GetCapacity()) {
        break;
      }

      auto* file_meta = files_meta[file_idx].first;
      int level = files_meta[file_idx].second;
      TEST_SYNC_POINT_CALLBACK(
          "VersionBuilder::Rep::LoadTableHandlers::BeforeFindTable", file_meta);

      TableCache::TypedHandle* handle = nullptr;
      TableReader* table_reader = nullptr;
      auto status = table_cache->FindTable(
          read_options, file_options, internal_comparator, *file_meta, &handle,
          mutable_cf_options, &table_reader, false /* no_io */,
          internal_stats->GetFileReadHist(level), false /* skip_filters */,
          level, prefetch_index_and_filter_in_cache,
          max_file_size_for_l0_meta_pin, file_meta->temperature,
          true /* pin_table_handle */);
      TEST_SYNC_POINT_CALLBACK(
          "VersionBuilder::Rep::LoadTableHandlers::AfterFindTable", &status);

      if (!status.ok()) {
        // Only the first failing worker records its status.
        bool expected = false;
        if (has_error.compare_exchange_strong(expected, true)) {
          ret = status;
        }
      }
    }
  });

  // The calling thread also runs the loader, so spawn max_threads - 1 extras.
  std::vector<port::Thread> threads;
  for (int i = 1; i < max_threads; i++) {
    threads.emplace_back(load_handlers_func);
  }
  load_handlers_func();
  for (auto& t : threads) {
    t.join();
  }
  return ret;
}

}  // namespace ROCKSDB_NAMESPACE