217 lines
8.5 KiB
Rust
217 lines
8.5 KiB
Rust
// Copyright 2026 Tyler Neely
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
//! Smoke tests for the async-IO `MultiGet` path.
|
|
//!
|
|
//! These tests run in both feature configurations. When the `coroutines`
|
|
//! feature is disabled, `async_io=true` parallelizes only within a single
|
|
//! LSM level; when enabled, the multi-level coroutine path activates. The
|
|
//! tests verify that results are *correct* in both configurations - they do
|
|
//! not assert which dispatch branch ran.
|
|
//!
|
|
//! The multi-level test explicitly populates several LSM levels and asserts
|
|
//! the layout via `rocksdb.num-files-at-levelN` before running the
|
|
//! MultiGet, so failures from a "MultiGet across multiple levels" bug land
|
|
//! in a test that actually has data in multiple levels.
|
|
|
|
mod util;
|
|
|
|
use pretty_assertions::assert_eq;
|
|
use rust_rocksdb::{
|
|
CompactOptions, DB, Options, ReadOptions, WaitForCompactOptions, WriteOptions,
|
|
properties::num_files_at_level,
|
|
};
|
|
|
|
use util::DBPath;
|
|
|
|
/// Counts non-empty levels by querying `rocksdb.num-files-at-level{N}` for
|
|
/// each level. Returns `(non_empty_count, [files_per_level; max_levels])`.
|
|
fn level_layout(db: &DB, max_levels: usize) -> (usize, Vec<u64>) {
|
|
let counts: Vec<u64> = (0..max_levels)
|
|
.map(|lvl| {
|
|
db.property_int_value(num_files_at_level(lvl))
|
|
.unwrap()
|
|
.unwrap_or(0)
|
|
})
|
|
.collect();
|
|
let non_empty = counts.iter().filter(|&&n| n > 0).count();
|
|
(non_empty, counts)
|
|
}
|
|
|
|
/// Verifies that `built_with_coroutines()` agrees with the crate's `coroutines`
|
|
/// cargo feature.
|
|
///
|
|
/// This is intentionally tautological at the source level - the function is
|
|
/// implemented as `cfg!(feature = "coroutines")` - but it's not pointless. If
|
|
/// the function is ever refactored to read its answer from a runtime symbol
|
|
/// (for example, after the upstream PR adding `rocksdb_compiled_with_coroutines`
|
|
/// is merged and we bump the submodule), this test would catch a wiring bug
|
|
/// where the runtime value disagrees with the build feature.
|
|
#[test]
|
|
fn built_with_coroutines_matches_feature_flag() {
|
|
assert_eq!(
|
|
rust_rocksdb::built_with_coroutines(),
|
|
cfg!(feature = "coroutines"),
|
|
"built_with_coroutines() must match the cargo feature flag"
|
|
);
|
|
}
|
|
|
|
/// Verifies that `multi_get` with `async_io=true` returns correct results
|
|
/// when keys are actually spread across multiple LSM levels. Without an
|
|
/// explicit level-layout check, a regression that breaks the multi-level
|
|
/// dispatch path could pass silently because the test setup never produced
|
|
/// such a layout.
|
|
#[test]
|
|
fn multi_get_async_io_matches_serial_get_across_levels() {
|
|
let path = DBPath::new("_rust_rocksdb_multi_get_async_io_multi_level");
|
|
{
|
|
// Configure tiny memtables and SSTs so a moderate number of writes
|
|
// forces L0->L1->L2 compactions. Disable auto compactions so the
|
|
// test drives the layout deterministically via `compact_range_opt`.
|
|
let mut opts = Options::default();
|
|
opts.create_if_missing(true);
|
|
opts.set_write_buffer_size(4 * 1024);
|
|
opts.set_target_file_size_base(4 * 1024);
|
|
opts.set_max_bytes_for_level_base(16 * 1024);
|
|
opts.set_max_bytes_for_level_multiplier(2.0);
|
|
opts.set_num_levels(5);
|
|
opts.set_disable_auto_compactions(true);
|
|
opts.set_level_zero_file_num_compaction_trigger(64); // effectively disabled
|
|
|
|
let db = DB::open(&opts, &path).unwrap();
|
|
let mut wo = WriteOptions::default();
|
|
wo.set_sync(false);
|
|
|
|
// Three batches, each flushed to its own L0 file, then compacted
|
|
// down one level at a time. After the dance there should be data
|
|
// in at least L0 and one deeper level.
|
|
let value = vec![b'v'; 200];
|
|
|
|
// 30-second timeout for each compaction wait. With
|
|
// disable_auto_compactions=true and a high L0 trigger, no background
|
|
// work runs concurrently, but we still call wait_for_compact after
|
|
// each manual compaction as defense-in-depth so the level layout is
|
|
// settled before we measure it.
|
|
let make_wait_opts = || {
|
|
let mut o = WaitForCompactOptions::default();
|
|
o.set_flush(true);
|
|
o.set_timeout(30 * 1_000_000); // 30s in microseconds
|
|
o
|
|
};
|
|
|
|
let put_batch = |start: u32, end: u32| {
|
|
for i in start..end {
|
|
let key = format!("key-{i:08}");
|
|
db.put_opt(key.as_bytes(), &value, &wo).unwrap();
|
|
}
|
|
db.flush().unwrap();
|
|
};
|
|
|
|
// Batch 1 -> L0 -> compact to L2.
|
|
put_batch(0, 200);
|
|
let mut co = CompactOptions::default();
|
|
co.set_change_level(true);
|
|
co.set_target_level(2);
|
|
db.compact_range_opt::<&[u8], &[u8]>(None, None, &co);
|
|
db.wait_for_compact(&make_wait_opts()).unwrap();
|
|
|
|
// Batch 2 -> L0 -> compact to L1.
|
|
put_batch(200, 400);
|
|
let mut co = CompactOptions::default();
|
|
co.set_change_level(true);
|
|
co.set_target_level(1);
|
|
db.compact_range_opt::<&[u8], &[u8]>(None, None, &co);
|
|
db.wait_for_compact(&make_wait_opts()).unwrap();
|
|
|
|
// Batch 3 -> stays in L0 (no further compaction).
|
|
put_batch(400, 600);
|
|
db.wait_for_compact(&make_wait_opts()).unwrap();
|
|
|
|
let (non_empty, counts) = level_layout(&db, 5);
|
|
assert!(
|
|
non_empty >= 2,
|
|
"test setup failed to spread data across multiple LSM levels: \
|
|
non_empty={non_empty}, counts={counts:?}; \
|
|
coroutines feature: {}",
|
|
cfg!(feature = "coroutines")
|
|
);
|
|
|
|
// Build a key batch spanning all three insertion batches plus some
|
|
// misses. Each batch landed in a different level, so a MultiGet
|
|
// across this set forces multi-level lookup.
|
|
let mut keys: Vec<Vec<u8>> = (0..600u32)
|
|
.step_by(37)
|
|
.map(|i| format!("key-{i:08}").into_bytes())
|
|
.collect();
|
|
keys.push(b"key-99999999".to_vec()); // miss
|
|
keys.push(b"key-00001234".to_vec()); // miss
|
|
|
|
let reference: Vec<Option<Vec<u8>>> = keys.iter().map(|k| db.get(k).unwrap()).collect();
|
|
|
|
let mut ro = ReadOptions::default();
|
|
ro.set_async_io(true);
|
|
let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
|
|
let async_results: Vec<Option<Vec<u8>>> = db
|
|
.multi_get_opt(key_refs, &ro)
|
|
.into_iter()
|
|
.map(Result::unwrap)
|
|
.collect();
|
|
|
|
assert_eq!(
|
|
reference,
|
|
async_results,
|
|
"async_io MultiGet results must match a serial Get loop \
|
|
(coroutines feature: {}, level counts: {counts:?})",
|
|
cfg!(feature = "coroutines")
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Reads a batch of keys that all live in a single SST after a flush.
|
|
/// Exercises the same-level fast path even when the coroutines feature is
|
|
/// enabled, since version_set.cc's `MultiGetFromSST` short-circuits the
|
|
/// coroutine dispatch when only one file is involved.
|
|
#[test]
|
|
fn multi_get_async_io_matches_serial_get_single_level() {
|
|
let path = DBPath::new("_rust_rocksdb_multi_get_async_io_single_level");
|
|
{
|
|
let mut opts = Options::default();
|
|
opts.create_if_missing(true);
|
|
|
|
let db = DB::open(&opts, &path).unwrap();
|
|
for i in 0..32u32 {
|
|
let key = format!("key-{i:04}");
|
|
db.put(key.as_bytes(), format!("value-{i}").as_bytes())
|
|
.unwrap();
|
|
}
|
|
db.flush().unwrap();
|
|
|
|
let keys: Vec<Vec<u8>> = (0..32u32)
|
|
.map(|i| format!("key-{i:04}").into_bytes())
|
|
.collect();
|
|
|
|
let reference: Vec<Option<Vec<u8>>> = keys.iter().map(|k| db.get(k).unwrap()).collect();
|
|
|
|
let mut ro = ReadOptions::default();
|
|
ro.set_async_io(true);
|
|
let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
|
|
let async_results: Vec<Option<Vec<u8>>> = db
|
|
.multi_get_opt(key_refs, &ro)
|
|
.into_iter()
|
|
.map(Result::unwrap)
|
|
.collect();
|
|
|
|
assert_eq!(reference, async_results);
|
|
}
|
|
}
|