forked from continuwuation/rocksdb
Summary: * Address feedback from https://github.com/facebook/rocksdb/issues/14040 * Add additional test for MultiScan * Fix a bug when del range and data are in same file for multi-scan * Rewrite the cases need to be handled in SeekMultiScan Pull Request resolved: https://github.com/facebook/rocksdb/pull/14055 Test Plan: Unit test Reviewed By: cbi42, anand1976 Differential Revision: D84851788 Pulled By: xingbowang fbshipit-source-id: 0f69632733afb99685f6341badbf239681010c38
2002 lines
82 KiB
C++
2002 lines
82 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "table/block_based/block_based_table_reader.h"
|
|
|
|
#include <cmath>
|
|
#include <memory>
|
|
#include <string>
|
|
|
|
#include "cache/cache_reservation_manager.h"
|
|
#include "db/db_test_util.h"
|
|
#include "db/table_properties_collector.h"
|
|
#include "file/file_util.h"
|
|
#include "options/options_helper.h"
|
|
#include "port/port.h"
|
|
#include "port/stack_trace.h"
|
|
#include "rocksdb/compression_type.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/file_system.h"
|
|
#include "rocksdb/options.h"
|
|
#include "table/block_based/block_based_table_builder.h"
|
|
#include "table/block_based/block_based_table_factory.h"
|
|
#include "table/block_based/block_based_table_iterator.h"
|
|
#include "table/block_based/partitioned_index_iterator.h"
|
|
#include "table/format.h"
|
|
#include "test_util/testharness.h"
|
|
#include "test_util/testutil.h"
|
|
#include "util/random.h"
|
|
|
|
// Enable io_uring support for this test
|
|
extern "C" bool RocksDbIOUringEnable() { return true; }
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class BlockBasedTableReaderBaseTest : public testing::Test {
|
|
public:
|
|
static constexpr int kBytesPerEntry = 256;
|
|
// 16 = (default block size) 4 * 1024 / kBytesPerEntry
|
|
static constexpr int kEntriesPerBlock = 16;
|
|
|
|
protected:
|
|
// Prepare key-value pairs to occupy multiple blocks.
|
|
// Each (key, value) pair is `kBytesPerEntry` byte, every kEntriesPerBlock
|
|
// pairs constitute 1 block.
|
|
// If mixed_with_human_readable_string_value == true,
|
|
// then adjacent blocks contain values with different compression
|
|
// complexity: human readable strings are easier to compress than random
|
|
// strings. key is an internal key.
|
|
// When ts_sz > 0 and `same_key_diff_ts` is true, this
|
|
// function generate keys with the same user provided key, with different
|
|
// user defined timestamps and different sequence number to differentiate them
|
|
static std::vector<std::pair<std::string, std::string>> GenerateKVMap(
|
|
int num_block = 2, bool mixed_with_human_readable_string_value = false,
|
|
size_t ts_sz = 0, bool same_key_diff_ts = false,
|
|
const Comparator* comparator = BytewiseComparator()) {
|
|
std::vector<std::pair<std::string, std::string>> kv;
|
|
|
|
SequenceNumber seq_no = 0;
|
|
uint64_t current_udt = 0;
|
|
if (same_key_diff_ts) {
|
|
// These numbers are based on the number of keys to create + an arbitrary
|
|
// buffer number (100) to avoid overflow.
|
|
current_udt = kEntriesPerBlock * num_block + 100;
|
|
seq_no = kEntriesPerBlock * num_block + 100;
|
|
}
|
|
Random rnd(101);
|
|
uint32_t key = 0;
|
|
// To make each (key, value) pair occupy exactly kBytesPerEntry bytes.
|
|
int value_size = kBytesPerEntry - (8 + static_cast<int>(ts_sz) +
|
|
static_cast<int>(kNumInternalBytes));
|
|
for (int block = 0; block < num_block; block++) {
|
|
for (int i = 0; i < kEntriesPerBlock; i++) {
|
|
char k[9] = {0};
|
|
// Internal key is constructed directly from this key,
|
|
// and internal key size is required to be >= 8 bytes,
|
|
// so use %08u as the format string.
|
|
snprintf(k, sizeof(k), "%08u", key);
|
|
std::string v;
|
|
if (mixed_with_human_readable_string_value) {
|
|
v = (block % 2) ? rnd.HumanReadableString(value_size)
|
|
: rnd.RandomString(value_size);
|
|
} else {
|
|
v = rnd.RandomString(value_size);
|
|
}
|
|
std::string user_key = std::string(k);
|
|
if (ts_sz > 0) {
|
|
if (same_key_diff_ts) {
|
|
PutFixed64(&user_key, current_udt);
|
|
current_udt -= 1;
|
|
} else {
|
|
PutFixed64(&user_key, 0);
|
|
}
|
|
}
|
|
InternalKey internal_key(user_key, seq_no, ValueType::kTypeValue);
|
|
kv.emplace_back(internal_key.Encode().ToString(), v);
|
|
if (same_key_diff_ts) {
|
|
seq_no -= 1;
|
|
} else {
|
|
key++;
|
|
}
|
|
}
|
|
}
|
|
auto comparator_name = std::string(comparator->Name());
|
|
if (comparator_name.find("Reverse") != std::string::npos) {
|
|
std::reverse(kv.begin(), kv.end());
|
|
}
|
|
return kv;
|
|
}
|
|
|
|
void SetUp() override {
|
|
SetupSyncPointsToMockDirectIO();
|
|
test_dir_ = test::PerThreadDBPath("block_based_table_reader_test");
|
|
env_ = Env::Default();
|
|
fs_ = FileSystem::Default();
|
|
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
|
|
ConfigureTableFactory();
|
|
}
|
|
|
|
virtual void ConfigureTableFactory() = 0;
|
|
|
|
void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
|
|
|
|
// Creates a table with the specificied key value pairs (kv).
|
|
void CreateTable(const std::string& table_name,
|
|
const ImmutableOptions& ioptions,
|
|
const CompressionType& compression_type,
|
|
const std::vector<std::pair<std::string, std::string>>& kv,
|
|
uint32_t compression_parallel_threads = 1,
|
|
uint32_t compression_dict_bytes = 0) {
|
|
std::unique_ptr<WritableFileWriter> writer;
|
|
NewFileWriter(table_name, &writer);
|
|
|
|
InternalKeyComparator comparator(ioptions.user_comparator);
|
|
ColumnFamilyOptions cf_options;
|
|
cf_options.comparator = ioptions.user_comparator;
|
|
cf_options.prefix_extractor = options_.prefix_extractor;
|
|
MutableCFOptions moptions(cf_options);
|
|
CompressionOptions compression_opts;
|
|
compression_opts.parallel_threads = compression_parallel_threads;
|
|
// Enable compression dictionary and set a buffering limit that is the same
|
|
// as each block's size.
|
|
compression_opts.max_dict_bytes = compression_dict_bytes;
|
|
compression_opts.max_dict_buffer_bytes = compression_dict_bytes;
|
|
InternalTblPropCollFactories factories;
|
|
const ReadOptions read_options;
|
|
const WriteOptions write_options;
|
|
std::unique_ptr<TableBuilder> table_builder(
|
|
options_.table_factory->NewTableBuilder(
|
|
TableBuilderOptions(ioptions, moptions, read_options, write_options,
|
|
comparator, &factories, compression_type,
|
|
compression_opts, 0 /* column_family_id */,
|
|
kDefaultColumnFamilyName, -1 /* level */,
|
|
kUnknownNewestKeyTime),
|
|
writer.get()));
|
|
|
|
// Build table.
|
|
for (auto it = kv.begin(); it != kv.end(); it++) {
|
|
std::string v = it->second;
|
|
table_builder->Add(it->first, v);
|
|
}
|
|
ASSERT_OK(table_builder->Finish());
|
|
}
|
|
|
|
void NewBlockBasedTableReader(const FileOptions& foptions,
|
|
const ImmutableOptions& ioptions,
|
|
const InternalKeyComparator& comparator,
|
|
const std::string& table_name,
|
|
std::unique_ptr<BlockBasedTable>* table,
|
|
bool prefetch_index_and_filter_in_cache = true,
|
|
Status* status = nullptr,
|
|
bool user_defined_timestamps_persisted = true) {
|
|
const MutableCFOptions moptions(options_);
|
|
TableReaderOptions table_reader_options = TableReaderOptions(
|
|
ioptions, moptions.prefix_extractor, moptions.compression_manager.get(),
|
|
foptions, comparator, 0 /* block_protection_bytes_per_key */,
|
|
false /* _skip_filters */, false /* _immortal */,
|
|
false /* _force_direct_prefetch */, -1 /* _level */,
|
|
nullptr /* _block_cache_tracer */,
|
|
0 /* _max_file_size_for_l0_meta_pin */, "" /* _cur_db_session_id */,
|
|
table_num_++ /* _cur_file_num */, {} /* _unique_id */,
|
|
0 /* _largest_seqno */, 0 /* _tail_size */,
|
|
user_defined_timestamps_persisted);
|
|
|
|
std::unique_ptr<RandomAccessFileReader> file;
|
|
NewFileReader(table_name, foptions, &file, ioptions.statistics.get());
|
|
|
|
uint64_t file_size = 0;
|
|
ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));
|
|
|
|
ReadOptions read_opts;
|
|
read_opts.verify_checksums = true;
|
|
std::unique_ptr<TableReader> general_table;
|
|
Status s = options_.table_factory->NewTableReader(
|
|
read_opts, table_reader_options, std::move(file), file_size,
|
|
&general_table, prefetch_index_and_filter_in_cache);
|
|
|
|
if (s.ok()) {
|
|
table->reset(static_cast<BlockBasedTable*>(general_table.release()));
|
|
}
|
|
|
|
if (status) {
|
|
*status = s;
|
|
} else {
|
|
ASSERT_OK(s);
|
|
}
|
|
}
|
|
|
|
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
|
|
|
|
std::string test_dir_;
|
|
Env* env_;
|
|
std::shared_ptr<FileSystem> fs_;
|
|
Options options_;
|
|
uint64_t table_num_{0};
|
|
|
|
private:
|
|
void WriteToFile(const std::string& content, const std::string& filename) {
|
|
std::unique_ptr<FSWritableFile> f;
|
|
ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
|
|
ASSERT_OK(f->Append(content, IOOptions(), nullptr));
|
|
ASSERT_OK(f->Close(IOOptions(), nullptr));
|
|
}
|
|
|
|
void NewFileWriter(const std::string& filename,
|
|
std::unique_ptr<WritableFileWriter>* writer) {
|
|
std::string path = Path(filename);
|
|
EnvOptions env_options;
|
|
FileOptions foptions;
|
|
std::unique_ptr<FSWritableFile> file;
|
|
ASSERT_OK(fs_->NewWritableFile(path, foptions, &file, nullptr));
|
|
writer->reset(new WritableFileWriter(std::move(file), path, env_options));
|
|
}
|
|
|
|
void NewFileReader(const std::string& filename, const FileOptions& opt,
|
|
std::unique_ptr<RandomAccessFileReader>* reader,
|
|
Statistics* stats = nullptr) {
|
|
std::string path = Path(filename);
|
|
std::unique_ptr<FSRandomAccessFile> f;
|
|
ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
|
|
reader->reset(new RandomAccessFileReader(std::move(f), path,
|
|
env_->GetSystemClock().get(),
|
|
/*io_tracer=*/nullptr,
|
|
/*stats=*/stats));
|
|
}
|
|
};
|
|
|
|
struct BlockBasedTableReaderTestParam {
|
|
BlockBasedTableReaderTestParam(
|
|
CompressionType _compression_type, bool _use_direct_reads,
|
|
BlockBasedTableOptions::IndexType _index_type, bool _no_block_cache,
|
|
test::UserDefinedTimestampTestMode _udt_test_mode,
|
|
uint32_t _compression_parallel_threads, uint32_t _compression_dict_bytes,
|
|
bool _same_key_diff_ts, const Comparator* _comparator, bool _fill_cache,
|
|
bool _use_async_io, bool _block_align, size_t _super_block_alignment_size,
|
|
size_t _super_block_alignment_space_overhead_ratio)
|
|
: compression_type(_compression_type),
|
|
use_direct_reads(_use_direct_reads),
|
|
index_type(_index_type),
|
|
no_block_cache(_no_block_cache),
|
|
udt_test_mode(_udt_test_mode),
|
|
compression_parallel_threads(_compression_parallel_threads),
|
|
compression_dict_bytes(_compression_dict_bytes),
|
|
same_key_diff_ts(_same_key_diff_ts),
|
|
comparator(_comparator),
|
|
fill_cache(_fill_cache),
|
|
use_async_io(_use_async_io),
|
|
block_align(_block_align),
|
|
super_block_alignment_size(_super_block_alignment_size),
|
|
super_block_alignment_space_overhead_ratio(
|
|
_super_block_alignment_space_overhead_ratio) {}
|
|
|
|
CompressionType compression_type;
|
|
bool use_direct_reads;
|
|
BlockBasedTableOptions::IndexType index_type;
|
|
bool no_block_cache;
|
|
test::UserDefinedTimestampTestMode udt_test_mode;
|
|
uint32_t compression_parallel_threads;
|
|
uint32_t compression_dict_bytes;
|
|
bool same_key_diff_ts;
|
|
const Comparator* comparator;
|
|
bool fill_cache;
|
|
bool use_async_io;
|
|
bool block_align;
|
|
size_t super_block_alignment_size;
|
|
size_t super_block_alignment_space_overhead_ratio;
|
|
};
|
|
|
|
// Define operator<< for SpotLockManagerTestParam to stop valgrind from
|
|
// complaining uinitialized value when printing SpotLockManagerTestParam.
|
|
std::ostream& operator<<(std::ostream& os,
|
|
const BlockBasedTableReaderTestParam& param) {
|
|
os << "compression_type: " << CompressionTypeToString(param.compression_type)
|
|
<< " use_direct_reads: " << param.use_direct_reads
|
|
<< " index_type: " << static_cast<int>(param.index_type)
|
|
<< " no_block_cache: " << param.no_block_cache
|
|
<< " udt_test_mode: " << static_cast<int>(param.udt_test_mode)
|
|
<< " compression_parallel_threads: " << param.compression_parallel_threads
|
|
<< " compression_dict_bytes: " << param.compression_dict_bytes
|
|
<< " same_key_diff_ts: " << param.same_key_diff_ts
|
|
<< " comparator: " << param.comparator->Name()
|
|
<< " fill_cache: " << param.fill_cache
|
|
<< " use_async_io: " << param.use_async_io
|
|
<< " block_align: " << param.block_align
|
|
<< " super_block_alignment_size: " << param.super_block_alignment_size
|
|
<< " super_block_alignment_space_overhead_ratio: "
|
|
<< param.super_block_alignment_space_overhead_ratio;
|
|
|
|
return os;
|
|
}
|
|
|
|
// Param 1: compression type
|
|
// Param 2: whether to use direct reads
|
|
// Param 3: Block Based Table Index type
|
|
// Param 4: BBTO no_block_cache option
|
|
// Param 5: test mode for the user-defined timestamp feature
|
|
// Param 6: number of parallel compression threads
|
|
// Param 7: CompressionOptions.max_dict_bytes and
|
|
// CompressionOptions.max_dict_buffer_bytes to enable/disable
|
|
// compression dictionary.
|
|
// Param 8: test mode to specify the pattern for generating key / value. When
|
|
// true, generate keys with the same user provided key, different
|
|
// user-defined timestamps (if udt enabled), different sequence
|
|
// numbers. This test mode is used for testing `Get`. When false,
|
|
// generate keys with different user provided key, same user-defined
|
|
// timestamps (if udt enabled), same sequence number. This test mode is
|
|
// used for testing `Get`, `MultiGet`, and `NewIterator`.
|
|
// Param 9: test both the default comparator and a reverse comparator.
|
|
class BlockBasedTableReaderTest
|
|
: public BlockBasedTableReaderBaseTest,
|
|
public testing::WithParamInterface<BlockBasedTableReaderTestParam> {
|
|
protected:
|
|
void SetUp() override {
|
|
auto param = GetParam();
|
|
compression_type_ = param.compression_type;
|
|
use_direct_reads_ = param.use_direct_reads;
|
|
test::UserDefinedTimestampTestMode udt_test_mode = param.udt_test_mode;
|
|
udt_enabled_ = test::IsUDTEnabled(udt_test_mode);
|
|
persist_udt_ = test::ShouldPersistUDT(udt_test_mode);
|
|
compression_parallel_threads_ = param.compression_parallel_threads;
|
|
compression_dict_bytes_ = param.compression_dict_bytes;
|
|
same_key_diff_ts_ = param.same_key_diff_ts;
|
|
comparator_ = param.comparator;
|
|
BlockBasedTableReaderBaseTest::SetUp();
|
|
}
|
|
|
|
void ConfigureTableFactory() override {
|
|
BlockBasedTableOptions opts;
|
|
auto param = GetParam();
|
|
opts.index_type = param.index_type;
|
|
opts.no_block_cache = param.no_block_cache;
|
|
opts.super_block_alignment_size = param.super_block_alignment_size;
|
|
opts.super_block_alignment_space_overhead_ratio =
|
|
param.super_block_alignment_space_overhead_ratio;
|
|
opts.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
|
opts.partition_filters =
|
|
opts.index_type ==
|
|
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
|
opts.metadata_cache_options.partition_pinning = PinningTier::kAll;
|
|
options_.table_factory.reset(
|
|
static_cast<BlockBasedTableFactory*>(NewBlockBasedTableFactory(opts)));
|
|
options_.prefix_extractor =
|
|
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(3));
|
|
}
|
|
|
|
CompressionType compression_type_;
|
|
bool use_direct_reads_;
|
|
bool udt_enabled_;
|
|
bool persist_udt_;
|
|
uint32_t compression_parallel_threads_;
|
|
uint32_t compression_dict_bytes_;
|
|
bool same_key_diff_ts_;
|
|
const Comparator* comparator_{};
|
|
};
|
|
|
|
class BlockBasedTableReaderGetTest : public BlockBasedTableReaderTest {};
|
|
|
|
TEST_P(BlockBasedTableReaderGetTest, Get) {
|
|
Options options;
|
|
if (udt_enabled_) {
|
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
}
|
|
options.persist_user_defined_timestamps = persist_udt_;
|
|
size_t ts_sz = options.comparator->timestamp_size();
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
100 /* num_block */,
|
|
true /* mixed_with_human_readable_string_value */, ts_sz,
|
|
same_key_diff_ts_);
|
|
|
|
std::string table_name = "BlockBasedTableReaderGetTest_Get" +
|
|
CompressionTypeToString(compression_type_);
|
|
|
|
ImmutableOptions ioptions(options);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
|
|
ReadOptions read_opts;
|
|
ASSERT_OK(
|
|
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
|
|
|
|
for (size_t i = 0; i < kv.size(); i += 1) {
|
|
Slice key = kv[i].first;
|
|
Slice lkey = key;
|
|
std::string lookup_ikey;
|
|
if (udt_enabled_ && !persist_udt_) {
|
|
// When user-defined timestamps are collapsed to be the minimum timestamp,
|
|
// we also read with the minimum timestamp to be able to retrieve each
|
|
// value.
|
|
ReplaceInternalKeyWithMinTimestamp(&lookup_ikey, key, ts_sz);
|
|
lkey = lookup_ikey;
|
|
}
|
|
// Reading the first entry in a block caches the whole block.
|
|
if (i % kEntriesPerBlock == 0) {
|
|
ASSERT_FALSE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
|
|
} else {
|
|
ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
|
|
}
|
|
PinnableSlice value;
|
|
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
|
|
GetContext::kNotFound, ExtractUserKey(key), &value,
|
|
nullptr, nullptr, nullptr, nullptr,
|
|
true /* do_merge */, nullptr, nullptr, nullptr,
|
|
nullptr, nullptr, nullptr);
|
|
ASSERT_OK(table->Get(read_opts, lkey, &get_context, nullptr));
|
|
ASSERT_EQ(value.ToString(), kv[i].second);
|
|
ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
|
|
}
|
|
}
|
|
|
|
// Tests MultiGet in both direct IO and non-direct IO mode.
|
|
// The keys should be in cache after MultiGet.
|
|
TEST_P(BlockBasedTableReaderTest, MultiGet) {
|
|
Options options;
|
|
ReadOptions read_opts;
|
|
std::string dummy_ts(sizeof(uint64_t), '\0');
|
|
Slice read_timestamp = dummy_ts;
|
|
if (udt_enabled_) {
|
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
read_opts.timestamp = &read_timestamp;
|
|
}
|
|
options.persist_user_defined_timestamps = persist_udt_;
|
|
size_t ts_sz = options.comparator->timestamp_size();
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
100 /* num_block */,
|
|
true /* mixed_with_human_readable_string_value */, ts_sz);
|
|
|
|
// Prepare keys, values, and statuses for MultiGet.
|
|
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
|
|
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys_without_timestamps;
|
|
autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
|
|
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
|
|
autovector<const std::string*, MultiGetContext::MAX_BATCH_SIZE>
|
|
expected_values;
|
|
{
|
|
const int step =
|
|
static_cast<int>(kv.size()) / MultiGetContext::MAX_BATCH_SIZE;
|
|
auto it = kv.begin();
|
|
for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
|
|
keys.emplace_back(it->first);
|
|
if (ts_sz > 0) {
|
|
Slice ukey_without_ts =
|
|
ExtractUserKeyAndStripTimestamp(it->first, ts_sz);
|
|
keys_without_timestamps.push_back(ukey_without_ts);
|
|
} else {
|
|
keys_without_timestamps.emplace_back(ExtractUserKey(it->first));
|
|
}
|
|
values.emplace_back();
|
|
statuses.emplace_back();
|
|
expected_values.push_back(&(it->second));
|
|
std::advance(it, step);
|
|
}
|
|
}
|
|
|
|
std::string table_name = "BlockBasedTableReaderTest_MultiGet" +
|
|
CompressionTypeToString(compression_type_);
|
|
|
|
ImmutableOptions ioptions(options);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
|
|
ASSERT_OK(
|
|
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
|
|
|
|
// Ensure that keys are not in cache before MultiGet.
|
|
for (auto& key : keys) {
|
|
ASSERT_FALSE(table->TEST_KeyInCache(read_opts, key.ToString()));
|
|
}
|
|
|
|
// Prepare MultiGetContext.
|
|
autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_context;
|
|
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
|
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
|
for (size_t i = 0; i < keys.size(); ++i) {
|
|
get_context.emplace_back(options.comparator, nullptr, nullptr, nullptr,
|
|
GetContext::kNotFound, ExtractUserKey(keys[i]),
|
|
&values[i], nullptr, nullptr, nullptr, nullptr,
|
|
true /* do_merge */, nullptr, nullptr, nullptr,
|
|
nullptr, nullptr, nullptr);
|
|
key_context.emplace_back(nullptr, keys_without_timestamps[i], &values[i],
|
|
nullptr, nullptr, &statuses.back());
|
|
key_context.back().get_context = &get_context.back();
|
|
}
|
|
for (auto& key_ctx : key_context) {
|
|
sorted_keys.emplace_back(&key_ctx);
|
|
}
|
|
MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, read_opts,
|
|
fs_.get(), nullptr);
|
|
|
|
// Execute MultiGet.
|
|
MultiGetContext::Range range = ctx.GetMultiGetRange();
|
|
PerfContext* perf_ctx = get_perf_context();
|
|
perf_ctx->Reset();
|
|
table->MultiGet(read_opts, &range, nullptr);
|
|
|
|
ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count -
|
|
perf_ctx->filter_block_read_count -
|
|
perf_ctx->compression_dict_block_read_count,
|
|
1);
|
|
ASSERT_GE(perf_ctx->block_read_byte, 1);
|
|
|
|
for (const Status& status : statuses) {
|
|
ASSERT_OK(status);
|
|
}
|
|
// Check that keys are in cache after MultiGet.
|
|
for (size_t i = 0; i < keys.size(); i++) {
|
|
ASSERT_TRUE(table->TEST_KeyInCache(read_opts, keys[i]));
|
|
ASSERT_EQ(values[i].ToString(), *expected_values[i]);
|
|
}
|
|
}
|
|
|
|
TEST_P(BlockBasedTableReaderTest, NewIterator) {
|
|
Options options;
|
|
ReadOptions read_opts;
|
|
std::string dummy_ts(sizeof(uint64_t), '\0');
|
|
Slice read_timestamp = dummy_ts;
|
|
if (udt_enabled_) {
|
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
read_opts.timestamp = &read_timestamp;
|
|
}
|
|
options.persist_user_defined_timestamps = persist_udt_;
|
|
size_t ts_sz = options.comparator->timestamp_size();
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
100 /* num_block */,
|
|
true /* mixed_with_human_readable_string_value */, ts_sz);
|
|
|
|
std::string table_name = "BlockBasedTableReaderTest_NewIterator" +
|
|
CompressionTypeToString(compression_type_);
|
|
|
|
ImmutableOptions ioptions(options);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
ASSERT_OK(
|
|
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
|
|
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
// Test forward scan.
|
|
ASSERT_TRUE(!iter->Valid());
|
|
iter->SeekToFirst();
|
|
ASSERT_OK(iter->status());
|
|
for (auto kv_iter = kv.begin(); kv_iter != kv.end(); kv_iter++) {
|
|
ASSERT_EQ(iter->key().ToString(), kv_iter->first);
|
|
ASSERT_EQ(iter->value().ToString(), kv_iter->second);
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
}
|
|
ASSERT_TRUE(!iter->Valid());
|
|
ASSERT_OK(iter->status());
|
|
|
|
// Test backward scan.
|
|
iter->SeekToLast();
|
|
ASSERT_OK(iter->status());
|
|
for (auto kv_iter = kv.rbegin(); kv_iter != kv.rend(); kv_iter++) {
|
|
ASSERT_EQ(iter->key().ToString(), kv_iter->first);
|
|
ASSERT_EQ(iter->value().ToString(), kv_iter->second);
|
|
iter->Prev();
|
|
ASSERT_OK(iter->status());
|
|
}
|
|
ASSERT_TRUE(!iter->Valid());
|
|
ASSERT_OK(iter->status());
|
|
}
|
|
|
|
class ChargeTableReaderTest
|
|
: public BlockBasedTableReaderBaseTest,
|
|
public testing::WithParamInterface<
|
|
CacheEntryRoleOptions::Decision /* charge_table_reader_mem */> {
|
|
protected:
|
|
static std::size_t CalculateMaxTableReaderNumBeforeCacheFull(
|
|
std::size_t cache_capacity, std::size_t approx_table_reader_mem) {
|
|
// To make calculation easier for testing
|
|
assert(cache_capacity % CacheReservationManagerImpl<
|
|
CacheEntryRole::kBlockBasedTableReader>::
|
|
GetDummyEntrySize() ==
|
|
0 &&
|
|
cache_capacity >= 2 * CacheReservationManagerImpl<
|
|
CacheEntryRole::kBlockBasedTableReader>::
|
|
GetDummyEntrySize());
|
|
|
|
// We need to subtract 1 for max_num_dummy_entry to account for dummy
|
|
// entries' overhead, assumed the overhead is no greater than 1 dummy entry
|
|
// size
|
|
std::size_t max_num_dummy_entry =
|
|
(size_t)std::floor((
|
|
1.0 * cache_capacity /
|
|
CacheReservationManagerImpl<
|
|
CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize())) -
|
|
1;
|
|
std::size_t cache_capacity_rounded_to_dummy_entry_multiples =
|
|
max_num_dummy_entry *
|
|
CacheReservationManagerImpl<
|
|
CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize();
|
|
std::size_t max_table_reader_num_capped = static_cast<std::size_t>(
|
|
std::floor(1.0 * cache_capacity_rounded_to_dummy_entry_multiples /
|
|
approx_table_reader_mem));
|
|
|
|
return max_table_reader_num_capped;
|
|
}
|
|
|
|
void SetUp() override {
|
|
// To cache and re-use the same kv map and compression type in the test
|
|
// suite for elimiating variance caused by these two factors
|
|
kv_ = BlockBasedTableReaderBaseTest::GenerateKVMap();
|
|
compression_type_ = CompressionType::kNoCompression;
|
|
|
|
table_reader_charge_tracking_cache_ = std::make_shared<
|
|
TargetCacheChargeTrackingCache<
|
|
CacheEntryRole::kBlockBasedTableReader>>((NewLRUCache(
|
|
4 * CacheReservationManagerImpl<
|
|
CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize(),
|
|
0 /* num_shard_bits */, true /* strict_capacity_limit */)));
|
|
|
|
// To ApproximateTableReaderMem() without being affected by
|
|
// the feature of charging its memory, we turn off the feature
|
|
charge_table_reader_ = CacheEntryRoleOptions::Decision::kDisabled;
|
|
BlockBasedTableReaderBaseTest::SetUp();
|
|
approx_table_reader_mem_ = ApproximateTableReaderMem();
|
|
|
|
// Now we condtionally turn on the feature to test
|
|
charge_table_reader_ = GetParam();
|
|
ConfigureTableFactory();
|
|
}
|
|
|
|
void ConfigureTableFactory() override {
|
|
BlockBasedTableOptions table_options;
|
|
table_options.cache_usage_options.options_overrides.insert(
|
|
{CacheEntryRole::kBlockBasedTableReader,
|
|
{/*.charged = */ charge_table_reader_}});
|
|
table_options.block_cache = table_reader_charge_tracking_cache_;
|
|
|
|
table_options.cache_index_and_filter_blocks = false;
|
|
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
|
table_options.partition_filters = true;
|
|
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
|
|
|
|
options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
|
}
|
|
|
|
CacheEntryRoleOptions::Decision charge_table_reader_;
|
|
std::shared_ptr<
|
|
TargetCacheChargeTrackingCache<CacheEntryRole::kBlockBasedTableReader>>
|
|
table_reader_charge_tracking_cache_;
|
|
std::size_t approx_table_reader_mem_;
|
|
std::vector<std::pair<std::string, std::string>> kv_;
|
|
CompressionType compression_type_;
|
|
|
|
private:
|
|
std::size_t ApproximateTableReaderMem() {
|
|
std::size_t approx_table_reader_mem = 0;
|
|
|
|
std::string table_name = "table_for_approx_table_reader_mem";
|
|
ImmutableOptions ioptions(options_);
|
|
CreateTable(table_name, ioptions, compression_type_, kv_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
Status s;
|
|
NewBlockBasedTableReader(
|
|
FileOptions(), ImmutableOptions(options_),
|
|
InternalKeyComparator(options_.comparator), table_name, &table,
|
|
false /* prefetch_index_and_filter_in_cache */, &s);
|
|
assert(s.ok());
|
|
|
|
approx_table_reader_mem = table->ApproximateMemoryUsage();
|
|
assert(approx_table_reader_mem > 0);
|
|
return approx_table_reader_mem;
|
|
}
|
|
};
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
ChargeTableReaderTest, ChargeTableReaderTest,
|
|
::testing::Values(CacheEntryRoleOptions::Decision::kEnabled,
|
|
CacheEntryRoleOptions::Decision::kDisabled));
|
|
|
|
TEST_P(ChargeTableReaderTest, Basic) {
|
|
const std::size_t max_table_reader_num_capped =
|
|
ChargeTableReaderTest::CalculateMaxTableReaderNumBeforeCacheFull(
|
|
table_reader_charge_tracking_cache_->GetCapacity(),
|
|
approx_table_reader_mem_);
|
|
|
|
// Acceptable estimtation errors coming from
|
|
// 1. overstimate max_table_reader_num_capped due to # dummy entries is high
|
|
// and results in metadata charge overhead greater than 1 dummy entry size
|
|
// (violating our assumption in calculating max_table_reader_num_capped)
|
|
// 2. overestimate/underestimate max_table_reader_num_capped due to the gap
|
|
// between ApproximateTableReaderMem() and actual table reader mem
|
|
std::size_t max_table_reader_num_capped_upper_bound =
|
|
(std::size_t)(max_table_reader_num_capped * 1.05);
|
|
std::size_t max_table_reader_num_capped_lower_bound =
|
|
(std::size_t)(max_table_reader_num_capped * 0.95);
|
|
std::size_t max_table_reader_num_uncapped =
|
|
(std::size_t)(max_table_reader_num_capped * 1.1);
|
|
ASSERT_GT(max_table_reader_num_uncapped,
|
|
max_table_reader_num_capped_upper_bound)
|
|
<< "We need `max_table_reader_num_uncapped` > "
|
|
"`max_table_reader_num_capped_upper_bound` to differentiate cases "
|
|
"between "
|
|
"charge_table_reader_ == kDisabled and == kEnabled)";
|
|
|
|
Status s = Status::OK();
|
|
std::size_t opened_table_reader_num = 0;
|
|
std::string table_name;
|
|
std::vector<std::unique_ptr<BlockBasedTable>> tables;
|
|
ImmutableOptions ioptions(options_);
|
|
// Keep creating BlockBasedTableReader till hiting the memory limit based on
|
|
// cache capacity and creation fails (when charge_table_reader_ ==
|
|
// kEnabled) or reaching a specfied big number of table readers (when
|
|
// charge_table_reader_ == kDisabled)
|
|
while (s.ok() && opened_table_reader_num < max_table_reader_num_uncapped) {
|
|
table_name = "table_" + std::to_string(opened_table_reader_num);
|
|
CreateTable(table_name, ioptions, compression_type_, kv_);
|
|
tables.push_back(std::unique_ptr<BlockBasedTable>());
|
|
NewBlockBasedTableReader(
|
|
FileOptions(), ImmutableOptions(options_),
|
|
InternalKeyComparator(options_.comparator), table_name, &tables.back(),
|
|
false /* prefetch_index_and_filter_in_cache */, &s);
|
|
if (s.ok()) {
|
|
++opened_table_reader_num;
|
|
}
|
|
}
|
|
|
|
if (charge_table_reader_ == CacheEntryRoleOptions::Decision::kEnabled) {
|
|
EXPECT_TRUE(s.IsMemoryLimit()) << "s: " << s.ToString();
|
|
EXPECT_TRUE(s.ToString().find(
|
|
kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
|
|
CacheEntryRole::kBlockBasedTableReader)]) !=
|
|
std::string::npos);
|
|
EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
|
|
std::string::npos);
|
|
|
|
EXPECT_GE(opened_table_reader_num, max_table_reader_num_capped_lower_bound);
|
|
EXPECT_LE(opened_table_reader_num, max_table_reader_num_capped_upper_bound);
|
|
|
|
std::size_t updated_max_table_reader_num_capped =
|
|
ChargeTableReaderTest::CalculateMaxTableReaderNumBeforeCacheFull(
|
|
table_reader_charge_tracking_cache_->GetCapacity() / 2,
|
|
approx_table_reader_mem_);
|
|
|
|
// Keep deleting BlockBasedTableReader to lower down memory usage from the
|
|
// memory limit to make the next creation succeeds
|
|
while (opened_table_reader_num >= updated_max_table_reader_num_capped) {
|
|
tables.pop_back();
|
|
--opened_table_reader_num;
|
|
}
|
|
table_name = "table_for_successful_table_reader_open";
|
|
CreateTable(table_name, ioptions, compression_type_, kv_);
|
|
tables.push_back(std::unique_ptr<BlockBasedTable>());
|
|
NewBlockBasedTableReader(
|
|
FileOptions(), ImmutableOptions(options_),
|
|
InternalKeyComparator(options_.comparator), table_name, &tables.back(),
|
|
false /* prefetch_index_and_filter_in_cache */, &s);
|
|
EXPECT_TRUE(s.ok()) << s.ToString();
|
|
|
|
tables.clear();
|
|
EXPECT_EQ(table_reader_charge_tracking_cache_->GetCacheCharge(), 0);
|
|
} else {
|
|
EXPECT_TRUE(s.ok() &&
|
|
opened_table_reader_num == max_table_reader_num_uncapped)
|
|
<< "s: " << s.ToString() << " opened_table_reader_num: "
|
|
<< std::to_string(opened_table_reader_num);
|
|
EXPECT_EQ(table_reader_charge_tracking_cache_->GetCacheCharge(), 0);
|
|
}
|
|
}
|
|
|
|
class StrictCapacityLimitReaderTest : public BlockBasedTableReaderTest {
|
|
public:
|
|
StrictCapacityLimitReaderTest() : BlockBasedTableReaderTest() {}
|
|
|
|
protected:
|
|
void ConfigureTableFactory() override {
|
|
BlockBasedTableOptions table_options;
|
|
|
|
table_options.block_cache = std::make_shared<
|
|
TargetCacheChargeTrackingCache<CacheEntryRole::kBlockBasedTableReader>>(
|
|
(NewLRUCache(4 * 1024, 0 /* num_shard_bits */,
|
|
true /* strict_capacity_limit */)));
|
|
|
|
table_options.cache_index_and_filter_blocks = false;
|
|
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
|
table_options.partition_filters = true;
|
|
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
|
|
|
|
options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
|
}
|
|
};
|
|
|
|
TEST_P(StrictCapacityLimitReaderTest, Get) {
|
|
// Test that we get error status when we exceed
|
|
// the strict_capacity_limit
|
|
Options options;
|
|
size_t ts_sz = options.comparator->timestamp_size();
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
2 /* num_block */, true /* mixed_with_human_readable_string_value */,
|
|
ts_sz, false);
|
|
|
|
std::string table_name = "StrictCapacityLimitReaderTest_Get" +
|
|
CompressionTypeToString(compression_type_);
|
|
|
|
ImmutableOptions ioptions(options);
|
|
CreateTable(table_name, ioptions, compression_type_, kv);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = true;
|
|
InternalKeyComparator comparator(options.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */);
|
|
|
|
ReadOptions read_opts;
|
|
ASSERT_OK(
|
|
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
|
|
|
|
bool hit_memory_limit = false;
|
|
for (size_t i = 0; i < kv.size(); i += 1) {
|
|
Slice key = kv[i].first;
|
|
Slice lkey = key;
|
|
std::string lookup_ikey;
|
|
// Reading the first entry in a block caches the whole block.
|
|
if (i % kEntriesPerBlock == 0) {
|
|
ASSERT_FALSE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
|
|
} else if (!hit_memory_limit) {
|
|
ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
|
|
}
|
|
PinnableSlice value;
|
|
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
|
|
GetContext::kNotFound, ExtractUserKey(key), &value,
|
|
nullptr, nullptr, nullptr, nullptr,
|
|
true /* do_merge */, nullptr, nullptr, nullptr,
|
|
nullptr, nullptr, nullptr);
|
|
Status s = table->Get(read_opts, lkey, &get_context, nullptr);
|
|
if (!s.ok()) {
|
|
EXPECT_TRUE(s.IsMemoryLimit());
|
|
EXPECT_TRUE(s.ToString().find("Memory limit reached: Insert failed due "
|
|
"to LRU cache being full") !=
|
|
std::string::npos);
|
|
hit_memory_limit = true;
|
|
} else {
|
|
ASSERT_EQ(value.ToString(), kv[i].second);
|
|
ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
|
|
}
|
|
}
|
|
|
|
ASSERT_TRUE(hit_memory_limit);
|
|
}
|
|
|
|
TEST_P(StrictCapacityLimitReaderTest, MultiGet) {
|
|
// Test that we get error status when we exceed
|
|
// the strict_capacity_limit
|
|
Options options;
|
|
ReadOptions read_opts;
|
|
std::string dummy_ts(sizeof(uint64_t), '\0');
|
|
Slice read_timestamp = dummy_ts;
|
|
if (udt_enabled_) {
|
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
read_opts.timestamp = &read_timestamp;
|
|
}
|
|
options.persist_user_defined_timestamps = persist_udt_;
|
|
size_t ts_sz = options.comparator->timestamp_size();
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
2 /* num_block */, true /* mixed_with_human_readable_string_value */,
|
|
ts_sz);
|
|
|
|
// Prepare keys, values, and statuses for MultiGet.
|
|
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
|
|
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys_without_timestamps;
|
|
autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
|
|
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
|
|
autovector<const std::string*, MultiGetContext::MAX_BATCH_SIZE>
|
|
expected_values;
|
|
{
|
|
const int step =
|
|
static_cast<int>(kv.size()) / MultiGetContext::MAX_BATCH_SIZE;
|
|
auto it = kv.begin();
|
|
for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
|
|
keys.emplace_back(it->first);
|
|
if (ts_sz > 0) {
|
|
Slice ukey_without_ts =
|
|
ExtractUserKeyAndStripTimestamp(it->first, ts_sz);
|
|
keys_without_timestamps.push_back(ukey_without_ts);
|
|
} else {
|
|
keys_without_timestamps.emplace_back(ExtractUserKey(it->first));
|
|
}
|
|
values.emplace_back();
|
|
statuses.emplace_back();
|
|
expected_values.push_back(&(it->second));
|
|
std::advance(it, step);
|
|
}
|
|
}
|
|
|
|
std::string table_name = "StrictCapacityLimitReaderTest_MultiGet" +
|
|
CompressionTypeToString(compression_type_);
|
|
|
|
ImmutableOptions ioptions(options);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
|
|
ASSERT_OK(
|
|
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
|
|
|
|
// Ensure that keys are not in cache before MultiGet.
|
|
for (auto& key : keys) {
|
|
ASSERT_FALSE(table->TEST_KeyInCache(read_opts, key.ToString()));
|
|
}
|
|
|
|
// Prepare MultiGetContext.
|
|
autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_context;
|
|
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
|
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
|
for (size_t i = 0; i < keys.size(); ++i) {
|
|
get_context.emplace_back(options.comparator, nullptr, nullptr, nullptr,
|
|
GetContext::kNotFound, ExtractUserKey(keys[i]),
|
|
&values[i], nullptr, nullptr, nullptr, nullptr,
|
|
true /* do_merge */, nullptr, nullptr, nullptr,
|
|
nullptr, nullptr, nullptr);
|
|
key_context.emplace_back(nullptr, keys_without_timestamps[i], &values[i],
|
|
nullptr, nullptr, &statuses.back());
|
|
key_context.back().get_context = &get_context.back();
|
|
}
|
|
for (auto& key_ctx : key_context) {
|
|
sorted_keys.emplace_back(&key_ctx);
|
|
}
|
|
MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, read_opts,
|
|
fs_.get(), nullptr);
|
|
|
|
// Execute MultiGet.
|
|
MultiGetContext::Range range = ctx.GetMultiGetRange();
|
|
PerfContext* perf_ctx = get_perf_context();
|
|
perf_ctx->Reset();
|
|
table->MultiGet(read_opts, &range, nullptr);
|
|
|
|
ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count -
|
|
perf_ctx->filter_block_read_count -
|
|
perf_ctx->compression_dict_block_read_count,
|
|
1);
|
|
ASSERT_GE(perf_ctx->block_read_byte, 1);
|
|
|
|
bool hit_memory_limit = false;
|
|
for (const Status& status : statuses) {
|
|
if (!status.ok()) {
|
|
EXPECT_TRUE(status.IsMemoryLimit());
|
|
hit_memory_limit = true;
|
|
}
|
|
}
|
|
ASSERT_TRUE(hit_memory_limit);
|
|
}
|
|
|
|
class BlockBasedTableReaderTestVerifyChecksum
|
|
: public BlockBasedTableReaderTest {
|
|
public:
|
|
BlockBasedTableReaderTestVerifyChecksum() : BlockBasedTableReaderTest() {}
|
|
};
|
|
|
|
TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
|
|
Options options;
|
|
ReadOptions read_opts;
|
|
std::string dummy_ts(sizeof(uint64_t), '\0');
|
|
Slice read_timestamp = dummy_ts;
|
|
if (udt_enabled_) {
|
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
read_opts.timestamp = &read_timestamp;
|
|
}
|
|
options.persist_user_defined_timestamps = persist_udt_;
|
|
size_t ts_sz = options.comparator->timestamp_size();
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
800 /* num_block */,
|
|
false /* mixed_with_human_readable_string_value=*/, ts_sz);
|
|
|
|
options.statistics = CreateDBStatistics();
|
|
ImmutableOptions ioptions(options);
|
|
std::string table_name =
|
|
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
|
|
// Use the top level iterator to find the offset/size of the first
|
|
// 2nd level index block and corrupt the block
|
|
IndexBlockIter iiter_on_stack;
|
|
BlockCacheLookupContext context{TableReaderCaller::kUserVerifyChecksum};
|
|
InternalIteratorBase<IndexValue>* iiter = table->NewIndexIterator(
|
|
read_opts, /*need_upper_bound_check=*/false, &iiter_on_stack,
|
|
/*get_context=*/nullptr, &context);
|
|
std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
|
|
if (iiter != &iiter_on_stack) {
|
|
iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
|
|
}
|
|
ASSERT_OK(iiter->status());
|
|
iiter->SeekToFirst();
|
|
BlockHandle handle = static_cast<PartitionedIndexIterator*>(iiter)
|
|
->index_iter_->value()
|
|
.handle;
|
|
table.reset();
|
|
|
|
// Corrupt the block pointed to by handle
|
|
ASSERT_OK(test::CorruptFile(options.env, Path(table_name),
|
|
static_cast<int>(handle.offset()), 128));
|
|
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
ASSERT_EQ(0,
|
|
options.statistics->getTickerCount(BLOCK_CHECKSUM_MISMATCH_COUNT));
|
|
Status s =
|
|
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum);
|
|
ASSERT_EQ(1,
|
|
options.statistics->getTickerCount(BLOCK_CHECKSUM_MISMATCH_COUNT));
|
|
ASSERT_EQ(s.code(), Status::kCorruption);
|
|
}
|
|
|
|
class BlockBasedTableReaderMultiScanTest : public BlockBasedTableReaderTest {
|
|
public:
|
|
void SetUp() override {
|
|
BlockBasedTableReaderTest::SetUp();
|
|
options_.comparator = comparator_;
|
|
}
|
|
};
|
|
|
|
class BlockBasedTableReaderMultiScanAsyncIOTest
|
|
: public BlockBasedTableReaderMultiScanTest {};
|
|
|
|
// TODO: test no block cache case
|
|
TEST_P(BlockBasedTableReaderMultiScanAsyncIOTest, MultiScanPrepare) {
|
|
auto param = GetParam();
|
|
auto fill_cache = param.fill_cache;
|
|
auto use_async_io = param.use_async_io;
|
|
|
|
options_.statistics = CreateDBStatistics();
|
|
std::shared_ptr<FileSystem> fs = options_.env->GetFileSystem();
|
|
ReadOptions read_opts;
|
|
read_opts.fill_cache = fill_cache;
|
|
size_t ts_sz = options_.comparator->timestamp_size();
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
100 /* num_block */,
|
|
true /* mixed_with_human_readable_string_value */, ts_sz,
|
|
same_key_diff_ts_, comparator_);
|
|
std::string table_name = "BlockBasedTableReaderTest_NewIterator" +
|
|
CompressionTypeToString(compression_type_) +
|
|
"_async" + std::to_string(use_async_io);
|
|
ImmutableOptions ioptions(options_);
|
|
// Only insert 60 out of 100 blocks
|
|
CreateTable(table_name, ioptions, compression_type_,
|
|
std::vector<std::pair<std::string, std::string>>{
|
|
kv.begin() + 20 * kEntriesPerBlock,
|
|
kv.begin() + 80 * kEntriesPerBlock},
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options_.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
|
|
// 1. Should coalesce into a single I/O
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
MultiScanArgs scan_options(comparator_);
|
|
scan_options.use_async_io = use_async_io;
|
|
scan_options.insert(ExtractUserKey(kv[30 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[31 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[32 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[33 * kEntriesPerBlock].first));
|
|
auto read_count_before =
|
|
options_.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
|
|
|
|
iter->Prepare(&scan_options);
|
|
iter->Seek(kv[30 * kEntriesPerBlock].first);
|
|
for (size_t i = 30 * kEntriesPerBlock; i <= 31 * kEntriesPerBlock; ++i) {
|
|
ASSERT_TRUE(iter->status().ok()) << iter->status().ToString();
|
|
ASSERT_TRUE(iter->Valid()) << i;
|
|
ASSERT_EQ(iter->key().ToString(), kv[i].first);
|
|
iter->Next();
|
|
}
|
|
// Iter may still be valid after scan range. Upper layer (DBIter) handles
|
|
// exact upper bound checking. So we don't check !iter->Valid() here.
|
|
ASSERT_OK(iter->status());
|
|
iter->Seek(kv[32 * kEntriesPerBlock].first);
|
|
for (size_t i = 32 * kEntriesPerBlock; i < 33 * kEntriesPerBlock; ++i) {
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(iter->key().ToString(), kv[i].first);
|
|
iter->Next();
|
|
}
|
|
ASSERT_OK(iter->status());
|
|
auto read_count_after =
|
|
options_.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
|
|
ASSERT_EQ(read_count_before + 1, read_count_after);
|
|
|
|
// 2. No IO coalesce, should do MultiRead/ReadAsync with 2 read requests.
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
scan_options = MultiScanArgs(comparator_);
|
|
scan_options.insert(ExtractUserKey(kv[40 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[45 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[70 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[75 * kEntriesPerBlock].first));
|
|
|
|
read_count_before =
|
|
options_.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
|
|
iter->Prepare(&scan_options);
|
|
|
|
iter->Seek(kv[40 * kEntriesPerBlock].first);
|
|
for (size_t i = 40 * kEntriesPerBlock; i < 45 * kEntriesPerBlock; ++i) {
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(iter->key().ToString(), kv[i].first);
|
|
iter->Next();
|
|
}
|
|
ASSERT_OK(iter->status());
|
|
iter->Seek(kv[70 * kEntriesPerBlock].first);
|
|
for (size_t i = 70 * kEntriesPerBlock; i < 75 * kEntriesPerBlock; ++i) {
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(iter->key().ToString(), kv[i].first);
|
|
iter->Next();
|
|
}
|
|
ASSERT_OK(iter->status());
|
|
|
|
read_count_after =
|
|
options_.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
|
|
ASSERT_EQ(read_count_before + 2, read_count_after);
|
|
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
// 3. Tests I/O excludes blocks already in cache.
|
|
// Reading blocks from 40-79
|
|
// From reads above, blocks 40-44 and 70-74 already in cache
|
|
// So we should read 45-69, 75-79 in two I/Os.
|
|
// If fill_cache is false, then we'll do one giant I/O.
|
|
scan_options = MultiScanArgs(comparator_);
|
|
scan_options.use_async_io = use_async_io;
|
|
scan_options.insert(ExtractUserKey(kv[40 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[80 * kEntriesPerBlock].first));
|
|
read_count_before =
|
|
options_.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
|
|
iter->Prepare(&scan_options);
|
|
read_count_after =
|
|
options_.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
|
|
if (!use_async_io) {
|
|
if (!fill_cache) {
|
|
ASSERT_EQ(read_count_before + 1, read_count_after);
|
|
} else {
|
|
ASSERT_EQ(read_count_before + 2, read_count_after);
|
|
}
|
|
} else {
|
|
// stat is recorded in async callback which happens in Poll(), and
|
|
// Poll() happens during scanning.
|
|
ASSERT_EQ(read_count_before, read_count_after);
|
|
}
|
|
|
|
iter->Seek(kv[40 * kEntriesPerBlock].first);
|
|
for (size_t i = 40 * kEntriesPerBlock; i < 80 * kEntriesPerBlock; ++i) {
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(iter->key().ToString(), kv[i].first);
|
|
iter->Next();
|
|
}
|
|
ASSERT_FALSE(iter->Valid());
|
|
ASSERT_OK(iter->status());
|
|
read_count_after =
|
|
options_.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
|
|
if (!fill_cache) {
|
|
ASSERT_EQ(read_count_before + 1, read_count_after);
|
|
} else {
|
|
ASSERT_EQ(read_count_before + 2, read_count_after);
|
|
}
|
|
|
|
// 4. Check cases when Seek key does not match start key in ScanOptions
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
scan_options = MultiScanArgs(comparator_);
|
|
scan_options.use_async_io = use_async_io;
|
|
scan_options.insert(ExtractUserKey(kv[30 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[40 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[50 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[60 * kEntriesPerBlock].first));
|
|
iter->Prepare(&scan_options);
|
|
// Match start key
|
|
iter->Seek(kv[30 * kEntriesPerBlock].first);
|
|
for (size_t i = 30 * kEntriesPerBlock; i < 40 * kEntriesPerBlock; ++i) {
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(iter->key().ToString(), kv[i].first);
|
|
iter->Next();
|
|
}
|
|
ASSERT_OK(iter->status());
|
|
|
|
// Seek a key that is larger than next start key is allowed, as long as it is
|
|
// larger than the previous key
|
|
iter->Seek(kv[50 * kEntriesPerBlock + 1].first);
|
|
ASSERT_OK(iter->status());
|
|
|
|
// Check seek key going backward
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
scan_options = MultiScanArgs(comparator_);
|
|
scan_options.use_async_io = use_async_io;
|
|
scan_options.insert(ExtractUserKey(kv[30 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[31 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[32 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[33 * kEntriesPerBlock].first));
|
|
iter->Prepare(&scan_options);
|
|
iter->Seek(kv[32 * kEntriesPerBlock].first);
|
|
auto key = iter->key();
|
|
ASSERT_OK(iter->status());
|
|
iter->Seek(kv[30 * kEntriesPerBlock].first);
|
|
// When seek key goes backward, it is adjusted to the last seeked position.
|
|
// Assert the key read is same as before.
|
|
ASSERT_EQ(key, iter->key());
|
|
ASSERT_OK(iter->status());
|
|
|
|
// Test prefetch limit reached.
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
scan_options = MultiScanArgs(comparator_);
|
|
scan_options.use_async_io = use_async_io;
|
|
scan_options.max_prefetch_size = 1024; // less than block size
|
|
scan_options.insert(ExtractUserKey(kv[30 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[40 * kEntriesPerBlock].first));
|
|
iter->Prepare(&scan_options);
|
|
iter->Seek(kv[31 * kEntriesPerBlock].first);
|
|
ASSERT_TRUE(iter->status().IsIncomplete());
|
|
|
|
// Randomly seek keys on the file, as long as the key is moving forward, it
|
|
// is allowed
|
|
|
|
if (use_async_io) {
|
|
// Skip following test when async io is enabled. There is some issue with
|
|
// IO_uring that I am still trying to root cause.
|
|
// TODO : enable the test again with async IO
|
|
return;
|
|
}
|
|
for (int i = 0; i < 100; i++) {
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
scan_options = MultiScanArgs(comparator_);
|
|
scan_options.use_async_io = use_async_io;
|
|
scan_options.insert(ExtractUserKey(kv[5 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[10 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[25 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[35 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[35 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[40 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[45 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[50 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[75 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[85 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[85 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[95 * kEntriesPerBlock].first));
|
|
|
|
iter->Prepare(&scan_options);
|
|
|
|
auto random_seed = static_cast<uint32_t>(
|
|
std::chrono::duration_cast<std::chrono::nanoseconds>(
|
|
std::chrono::system_clock::now().time_since_epoch())
|
|
.count());
|
|
Random rnd(random_seed);
|
|
std::cout << random_seed << std::endl;
|
|
SCOPED_TRACE("Random seed " + std::to_string(random_seed));
|
|
|
|
// Search key always start from the start key of first prepared range.
|
|
int last_read_key_index = rnd.Uniform(100) + 5 * kEntriesPerBlock;
|
|
while (last_read_key_index < 100 * kEntriesPerBlock) {
|
|
iter->Seek(kv[last_read_key_index].first);
|
|
EXPECT_OK(iter->status());
|
|
// iterate for a few keys
|
|
while (iter->Valid()) {
|
|
iter->Next();
|
|
last_read_key_index++;
|
|
EXPECT_OK(iter->status());
|
|
}
|
|
last_read_key_index += rnd.Uniform(100);
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_P(BlockBasedTableReaderMultiScanTest, MultiScanPrefetchSizeLimit) {
|
|
if (compression_type_ != kNoCompression) {
|
|
// This test relies on block sizes to be close to what's set in option.
|
|
ROCKSDB_GTEST_BYPASS("This test assumes no compression.");
|
|
return;
|
|
}
|
|
ReadOptions read_opts;
|
|
size_t ts_sz = options_.comparator->timestamp_size();
|
|
|
|
// Generate data that spans multiple blocks
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
20 /* num_block */, true /* mixed_with_human_readable_string_value */,
|
|
ts_sz, same_key_diff_ts_, comparator_);
|
|
|
|
std::string table_name = "BlockBasedTableReaderTest_PrefetchSizeLimit" +
|
|
CompressionTypeToString(compression_type_);
|
|
|
|
ImmutableOptions ioptions(options_);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options_.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
|
|
// Default block size is 4KB
|
|
//
|
|
// Tests when no block is loaded
|
|
{
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
MultiScanArgs scan_options(comparator_);
|
|
scan_options.max_prefetch_size = 1024; // less than block size
|
|
scan_options.insert(ExtractUserKey(kv[0].first),
|
|
ExtractUserKey(kv[5].first));
|
|
|
|
iter->Prepare(&scan_options);
|
|
|
|
// Should be able to scan the first block, but not more
|
|
iter->Seek(kv[0].first);
|
|
ASSERT_FALSE(iter->Valid());
|
|
ASSERT_TRUE(iter->status().IsPrefetchLimitReached());
|
|
}
|
|
|
|
// Some blocks are loaded
|
|
{
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
MultiScanArgs scan_options(comparator_);
|
|
scan_options.max_prefetch_size = 9 * 1024; // 9KB - 2 blocks with buffer
|
|
scan_options.insert(ExtractUserKey(kv[1 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[8 * kEntriesPerBlock].first));
|
|
|
|
iter->Prepare(&scan_options);
|
|
iter->Seek(kv[1 * kEntriesPerBlock].first);
|
|
size_t scanned_keys = 0;
|
|
|
|
// Should be able to scan up to 2 blocks worth of data
|
|
while (iter->Valid()) {
|
|
ASSERT_EQ(iter->key().ToString(),
|
|
kv[scanned_keys + 1 * kEntriesPerBlock].first);
|
|
iter->Next();
|
|
scanned_keys++;
|
|
}
|
|
|
|
ASSERT_TRUE(iter->status().IsPrefetchLimitReached());
|
|
ASSERT_EQ(scanned_keys, 2 * kEntriesPerBlock);
|
|
}
|
|
|
|
// Tests with some block loaded in cache already:
|
|
// Blocks 1 and 2 are already in cache by the above test.
|
|
// Here we try blocks 0 - 5, with prefetch limit to 3 blocks, and expect to
|
|
// read 3 blocks.
|
|
{
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
MultiScanArgs scan_options(comparator_);
|
|
scan_options.max_prefetch_size = 3 * 4 * 1024 + 1024; // 3 blocks + 1KB
|
|
scan_options.insert(ExtractUserKey(kv[0].first),
|
|
ExtractUserKey(kv[5 * kEntriesPerBlock].first));
|
|
|
|
iter->Prepare(&scan_options);
|
|
iter->Seek(kv[0].first);
|
|
size_t scanned_keys = 0;
|
|
// Should only read 3 blocks (blocks 0, 1, 2)
|
|
// already cached.
|
|
while (iter->Valid()) {
|
|
ASSERT_EQ(iter->key().ToString(), kv[scanned_keys].first);
|
|
iter->Next();
|
|
scanned_keys++;
|
|
}
|
|
ASSERT_TRUE(iter->status().IsPrefetchLimitReached());
|
|
ASSERT_EQ(scanned_keys, 3 * kEntriesPerBlock);
|
|
}
|
|
|
|
// Multiple scan ranges with prefetch limit
|
|
{
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
MultiScanArgs scan_options(comparator_);
|
|
scan_options.max_prefetch_size = 5 * 4 * 1024 + 1024; // 5 blocks + 1KB
|
|
// Will read 5 entries from first scan range, and 4 blocks from the second
|
|
// scan range
|
|
scan_options.insert(ExtractUserKey(kv[0].first),
|
|
ExtractUserKey(kv[5].first));
|
|
scan_options.insert(ExtractUserKey(kv[12 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[17 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[18 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[19 * kEntriesPerBlock].first));
|
|
|
|
iter->Prepare(&scan_options);
|
|
|
|
iter->Seek(kv[0].first);
|
|
size_t scanned_keys = 0;
|
|
size_t key_idx = 0;
|
|
while (iter->Valid()) {
|
|
ASSERT_EQ(iter->key().ToString(), kv[key_idx].first);
|
|
iter->Next();
|
|
scanned_keys++;
|
|
key_idx++;
|
|
if (key_idx == 5) {
|
|
iter->Seek(kv[12 * kEntriesPerBlock].first);
|
|
key_idx = 12 * kEntriesPerBlock;
|
|
}
|
|
}
|
|
ASSERT_EQ(scanned_keys, 5 + 4 * kEntriesPerBlock);
|
|
ASSERT_TRUE(iter->status().IsPrefetchLimitReached());
|
|
}
|
|
|
|
// Prefetch limit is big enough for all scan ranges.
|
|
{
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
MultiScanArgs scan_options(comparator_);
|
|
scan_options.max_prefetch_size = 10 * 1024 * 1024; // 10MB
|
|
scan_options.insert(ExtractUserKey(kv[0].first),
|
|
ExtractUserKey(kv[5].first));
|
|
scan_options.insert(ExtractUserKey(kv[8 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[12 * kEntriesPerBlock].first));
|
|
scan_options.insert(ExtractUserKey(kv[18 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[19 * kEntriesPerBlock].first));
|
|
|
|
iter->Prepare(&scan_options);
|
|
|
|
iter->Seek(kv[0].first);
|
|
size_t scanned_keys = 0;
|
|
size_t key_idx = 0;
|
|
// Scan first range
|
|
while (iter->Valid() && key_idx < 5) {
|
|
ASSERT_EQ(iter->key().ToString(), kv[key_idx].first);
|
|
iter->Next();
|
|
scanned_keys++;
|
|
key_idx++;
|
|
}
|
|
// Move to second range
|
|
iter->Seek(kv[8 * kEntriesPerBlock].first);
|
|
key_idx = 8 * kEntriesPerBlock;
|
|
while (iter->Valid() && key_idx < 12 * kEntriesPerBlock) {
|
|
ASSERT_EQ(iter->key().ToString(), kv[key_idx].first);
|
|
iter->Next();
|
|
scanned_keys++;
|
|
key_idx++;
|
|
}
|
|
// Move to third range
|
|
iter->Seek(kv[18 * kEntriesPerBlock].first);
|
|
key_idx = 18 * kEntriesPerBlock;
|
|
while (iter->Valid() && key_idx < 19 * kEntriesPerBlock) {
|
|
ASSERT_EQ(iter->key().ToString(), kv[key_idx].first);
|
|
iter->Next();
|
|
scanned_keys++;
|
|
key_idx++;
|
|
}
|
|
// Should not hit prefetch limit
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_EQ(scanned_keys, 5 + 4 * kEntriesPerBlock + 1 * kEntriesPerBlock);
|
|
}
|
|
}
|
|
|
|
TEST_P(BlockBasedTableReaderMultiScanTest, MultiScanUnpinPreviousBlocks) {
|
|
std::vector<std::pair<std::string, std::string>> kv =
|
|
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
|
30 /* num_block */, true /* mixed_with_human_readable_string_value */,
|
|
comparator_->timestamp_size(), same_key_diff_ts_, comparator_);
|
|
std::string table_name = "BlockBasedTableReaderTest_UnpinPreviousBlocks" +
|
|
CompressionTypeToString(compression_type_);
|
|
ImmutableOptions ioptions(options_);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
FileOptions foptions;
|
|
foptions.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator comparator(options_.comparator);
|
|
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
|
true /* bool prefetch_index_and_filter_in_cache */,
|
|
nullptr /* status */, persist_udt_);
|
|
|
|
ReadOptions read_opts;
|
|
std::unique_ptr<InternalIterator> iter;
|
|
iter.reset(table->NewIterator(
|
|
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
|
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
|
|
|
MultiScanArgs scan_options(BytewiseComparator());
|
|
// Range 1: block 0-4, Range 2: block 4-4, Range 3: block 5-15
|
|
scan_options.insert(ExtractUserKey(kv[0 * kEntriesPerBlock].first),
|
|
ExtractUserKey(kv[5 * kEntriesPerBlock - 5].first));
|
|
scan_options.insert(ExtractUserKey(kv[5 * kEntriesPerBlock - 4].first),
|
|
ExtractUserKey(kv[5 * kEntriesPerBlock - 3].first));
|
|
scan_options.insert(ExtractUserKey(kv[5 * kEntriesPerBlock - 2].first),
|
|
ExtractUserKey(kv[15 * kEntriesPerBlock - 1].first));
|
|
|
|
iter->Prepare(&scan_options);
|
|
auto* bbiter = dynamic_cast<BlockBasedTableIterator*>(iter.get());
|
|
ASSERT_TRUE(bbiter);
|
|
for (int block = 0; block < 15; ++block) {
|
|
ASSERT_TRUE(bbiter->TEST_IsBlockPinnedByMultiScan(block)) << block;
|
|
}
|
|
|
|
// MultiScan require seeks to be called in scan_option order
|
|
iter->Seek(kv[0 * kEntriesPerBlock].first);
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_OK(iter->status());
|
|
|
|
// Seek to second range - should unpin blocks from first range
|
|
iter->Seek(kv[5 * kEntriesPerBlock - 4].first);
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_EQ(iter->key(), kv[5 * kEntriesPerBlock - 4].first);
|
|
ASSERT_EQ(iter->value(), kv[5 * kEntriesPerBlock - 4].second);
|
|
|
|
// The last block (block 4) is shared with the second range, so
|
|
// it's not unpinned yet.
|
|
for (int block = 0; block < 4; ++block) {
|
|
ASSERT_FALSE(bbiter->TEST_IsBlockPinnedByMultiScan(block)) << block;
|
|
}
|
|
// Blocks from second range still in cache.
|
|
// We skip block 4 here since it's ownership is moved to the actual data
|
|
// block iter.
|
|
for (int block = 5; block < 15; ++block) {
|
|
ASSERT_TRUE(bbiter->TEST_IsBlockPinnedByMultiScan(block)) << block;
|
|
}
|
|
|
|
iter->Seek(kv[5 * kEntriesPerBlock - 2].first);
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_EQ(iter->key(), kv[5 * kEntriesPerBlock - 2].first);
|
|
ASSERT_EQ(iter->value(), kv[5 * kEntriesPerBlock - 2].second);
|
|
|
|
// Still pinned
|
|
for (int block = 5; block < 15; ++block) {
|
|
ASSERT_TRUE(bbiter->TEST_IsBlockPinnedByMultiScan(block)) << block;
|
|
}
|
|
}
|
|
|
|
// Test that fs_prefetch_support flag is correctly initialized during table
|
|
// construction based on filesystem capabilities
|
|
TEST_P(BlockBasedTableReaderTest, FSPrefetchSupportInitializedCorrectly) {
|
|
class ConfigurablePrefetchFS : public FileSystemWrapper {
|
|
public:
|
|
ConfigurablePrefetchFS(const std::shared_ptr<FileSystem>& target,
|
|
bool support_prefetch)
|
|
: FileSystemWrapper(target), support_prefetch_(support_prefetch) {}
|
|
|
|
static const char* kClassName() { return "ConfigurablePrefetchFS"; }
|
|
const char* Name() const override { return kClassName(); }
|
|
|
|
void SupportedOps(int64_t& supported_ops) override {
|
|
target()->SupportedOps(supported_ops);
|
|
if (!support_prefetch_) { // Disable prefetch support if requested
|
|
supported_ops &= ~(1 << FSSupportedOps::kFSPrefetch);
|
|
}
|
|
}
|
|
|
|
private:
|
|
bool support_prefetch_;
|
|
};
|
|
|
|
// Prepare test table
|
|
Options options;
|
|
options.persist_user_defined_timestamps = persist_udt_;
|
|
if (udt_enabled_) {
|
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
}
|
|
size_t ts_sz = options.comparator->timestamp_size();
|
|
auto kv = BlockBasedTableReaderBaseTest::GenerateKVMap(5, true, ts_sz);
|
|
std::string table_name = "BlockBasedTableReaderTest_BlockPrefetcherTest" +
|
|
CompressionTypeToString(compression_type_);
|
|
ImmutableOptions ioptions(options);
|
|
CreateTable(table_name, ioptions, compression_type_, kv,
|
|
compression_parallel_threads_, compression_dict_bytes_);
|
|
|
|
// Test Case 1: Filesystem supports prefetch, fs_prefetch_support should be
|
|
// true
|
|
{
|
|
auto fs_with_prefetch = std::make_shared<ConfigurablePrefetchFS>(
|
|
env_->GetFileSystem(), true /* support_prefetch */);
|
|
std::unique_ptr<Env> env_wrapper(
|
|
new CompositeEnvWrapper(env_, fs_with_prefetch));
|
|
options.env = env_wrapper.get();
|
|
|
|
FileOptions fopts;
|
|
fopts.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator cmp(options.comparator);
|
|
ImmutableOptions iopts(options);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
NewBlockBasedTableReader(fopts, iopts, cmp, table_name, &table,
|
|
false /* prefetch_index_and_filter_in_cache */,
|
|
nullptr, persist_udt_);
|
|
|
|
ASSERT_TRUE(table->get_rep()->fs_prefetch_support);
|
|
ASSERT_TRUE(CheckFSFeatureSupport(fs_with_prefetch.get(),
|
|
FSSupportedOps::kFSPrefetch));
|
|
}
|
|
|
|
// Test Case 2: Filesystem doesn't support prefetch, fs_prefetch_support
|
|
// should be false
|
|
{
|
|
auto fs_without_prefetch = std::make_shared<ConfigurablePrefetchFS>(
|
|
env_->GetFileSystem(), false /* support_prefetch */);
|
|
std::unique_ptr<Env> env_wrapper(
|
|
new CompositeEnvWrapper(env_, fs_without_prefetch));
|
|
options.env = env_wrapper.get();
|
|
|
|
FileOptions fopts;
|
|
fopts.use_direct_reads = use_direct_reads_;
|
|
InternalKeyComparator cmp(options.comparator);
|
|
ImmutableOptions iopts(options);
|
|
|
|
std::unique_ptr<BlockBasedTable> table;
|
|
NewBlockBasedTableReader(fopts, iopts, cmp, table_name, &table,
|
|
false /* prefetch_index_and_filter_in_cache */,
|
|
nullptr, persist_udt_);
|
|
|
|
ASSERT_FALSE(table->get_rep()->fs_prefetch_support);
|
|
ASSERT_FALSE(CheckFSFeatureSupport(fs_without_prefetch.get(),
|
|
FSSupportedOps::kFSPrefetch));
|
|
}
|
|
}
|
|
std::vector<BlockBasedTableReaderTestParam> GenerateCombinedParameters(
|
|
const std::vector<CompressionType>& compression_types,
|
|
const std::vector<bool>& use_direct_read_flags,
|
|
const std::vector<BlockBasedTableOptions::IndexType>& index_types,
|
|
const std::vector<bool>& no_block_cache_flags,
|
|
const std::vector<test::UserDefinedTimestampTestMode>& udt_test_modes,
|
|
const std::vector<int>& parallel_compression_thread_counts,
|
|
const std::vector<uint32_t>& compression_dict_byte_counts,
|
|
const std::vector<bool>& same_key_diff_ts_flags,
|
|
const std::vector<const Comparator*>& comparators,
|
|
const std::vector<bool>& fill_cache_flags,
|
|
const std::vector<bool>& use_async_io_flags,
|
|
const std::vector<bool>& block_align_flags,
|
|
const std::vector<size_t>& super_block_alignment_sizes,
|
|
const std::vector<size_t>& super_block_alignment_space_overhead_ratios) {
|
|
std::vector<BlockBasedTableReaderTestParam> params;
|
|
for (const auto& compression_type : compression_types) {
|
|
for (auto use_direct_read : use_direct_read_flags) {
|
|
for (const auto& index_type : index_types) {
|
|
for (auto no_block_cache : no_block_cache_flags) {
|
|
for (const auto& udt_test_mode : udt_test_modes) {
|
|
for (auto parallel_compression_thread_count :
|
|
parallel_compression_thread_counts) {
|
|
for (auto compression_dict_byte_count :
|
|
compression_dict_byte_counts) {
|
|
for (auto same_key_diff_ts_flag : same_key_diff_ts_flags) {
|
|
for (const auto& comparator : comparators) {
|
|
for (auto fill_cache : fill_cache_flags) {
|
|
for (auto use_async_io : use_async_io_flags) {
|
|
for (auto block_align : block_align_flags) {
|
|
for (auto super_block_alignment_size :
|
|
super_block_alignment_sizes) {
|
|
for (
|
|
auto
|
|
super_block_alignment_space_overhead_ratio :
|
|
super_block_alignment_space_overhead_ratios) {
|
|
if (super_block_alignment_size == 0) {
|
|
// Override padding size to 0 if alignment size
|
|
// is 0, which means no super block alignment
|
|
super_block_alignment_space_overhead_ratio = 0;
|
|
}
|
|
params.emplace_back(
|
|
compression_type, use_direct_read, index_type,
|
|
no_block_cache, udt_test_mode,
|
|
parallel_compression_thread_count,
|
|
compression_dict_byte_count,
|
|
same_key_diff_ts_flag, comparator, fill_cache,
|
|
use_async_io, block_align,
|
|
super_block_alignment_size,
|
|
super_block_alignment_space_overhead_ratio);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return params;
|
|
}
|
|
|
|
std::vector<bool> Bool() { return {true, false}; }
|
|
|
|
struct BlockBasedTableReaderTestParamBuilder {
|
|
BlockBasedTableReaderTestParamBuilder() {
|
|
// Default values
|
|
compression_types = GetSupportedCompressions();
|
|
use_direct_read_flags = Bool();
|
|
index_types = {
|
|
BlockBasedTableOptions::IndexType::kBinarySearch,
|
|
BlockBasedTableOptions::IndexType::kHashSearch,
|
|
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
|
|
BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey};
|
|
no_block_cache_flags = {false};
|
|
udt_test_modes = {
|
|
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp};
|
|
parallel_compression_thread_counts = {1, 2};
|
|
compression_dict_byte_counts = {0, 4096};
|
|
same_key_diff_ts_flags = {false};
|
|
comparators = {BytewiseComparator()};
|
|
fill_cache_flags = {true};
|
|
use_async_io_flags = {false};
|
|
block_align_flags = {false};
|
|
super_block_alignment_sizes = {0};
|
|
super_block_alignment_space_overhead_ratios = {128};
|
|
}
|
|
|
|
// builder methods for each member
|
|
BlockBasedTableReaderTestParamBuilder& WithCompressionTypes(
|
|
const std::vector<CompressionType>& _compression_types) {
|
|
compression_types = _compression_types;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithUseDirectReadFlags(
|
|
const std::vector<bool>& _use_direct_read_flags) {
|
|
use_direct_read_flags = _use_direct_read_flags;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithIndexTypes(
|
|
const std::vector<BlockBasedTableOptions::IndexType>& _index_types) {
|
|
index_types = _index_types;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithNoBlockCacheFlags(
|
|
const std::vector<bool>& _no_block_cache_flags) {
|
|
no_block_cache_flags = _no_block_cache_flags;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithUDTTestModes(
|
|
const std::vector<test::UserDefinedTimestampTestMode>& _udt_test_modes) {
|
|
udt_test_modes = _udt_test_modes;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithParallelCompressionThreadCounts(
|
|
const std::vector<int>& _parallel_compression_thread_counts) {
|
|
parallel_compression_thread_counts = _parallel_compression_thread_counts;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithCompressionDictByteCounts(
|
|
const std::vector<uint32_t>& _compression_dict_byte_counts) {
|
|
compression_dict_byte_counts = _compression_dict_byte_counts;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithSameKeyDiffTsFlags(
|
|
const std::vector<bool>& _same_key_diff_ts_flags) {
|
|
same_key_diff_ts_flags = _same_key_diff_ts_flags;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithComparators(
|
|
const std::vector<const Comparator*>& _comparators) {
|
|
comparators = _comparators;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithFillCacheFlags(
|
|
const std::vector<bool>& _fill_cache_flags) {
|
|
fill_cache_flags = _fill_cache_flags;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithUseAsyncIoFlags(
|
|
const std::vector<bool>& _use_async_io_flags) {
|
|
use_async_io_flags = _use_async_io_flags;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithBlockAlignFlags(
|
|
const std::vector<bool>& _block_align_flags) {
|
|
block_align_flags = _block_align_flags;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder& WithSuperBlockAlignmentSizes(
|
|
const std::vector<size_t>& _super_block_alignment_sizes) {
|
|
super_block_alignment_sizes = _super_block_alignment_sizes;
|
|
return *this;
|
|
}
|
|
|
|
BlockBasedTableReaderTestParamBuilder&
|
|
WithSuperBlockAlignmentSpaceOverheadRatios(
|
|
const std::vector<size_t>& _super_block_alignment_space_overhead_ratios) {
|
|
super_block_alignment_space_overhead_ratios =
|
|
_super_block_alignment_space_overhead_ratios;
|
|
return *this;
|
|
}
|
|
|
|
std::vector<BlockBasedTableReaderTestParam> build() {
|
|
return GenerateCombinedParameters(
|
|
compression_types, use_direct_read_flags, index_types,
|
|
no_block_cache_flags, udt_test_modes,
|
|
parallel_compression_thread_counts, compression_dict_byte_counts,
|
|
same_key_diff_ts_flags, comparators, fill_cache_flags,
|
|
use_async_io_flags, block_align_flags, super_block_alignment_sizes,
|
|
super_block_alignment_space_overhead_ratios);
|
|
}
|
|
|
|
std::vector<CompressionType> compression_types;
|
|
std::vector<bool> use_direct_read_flags;
|
|
std::vector<BlockBasedTableOptions::IndexType> index_types;
|
|
std::vector<bool> no_block_cache_flags;
|
|
std::vector<test::UserDefinedTimestampTestMode> udt_test_modes;
|
|
std::vector<int> parallel_compression_thread_counts;
|
|
std::vector<uint32_t> compression_dict_byte_counts;
|
|
std::vector<bool> same_key_diff_ts_flags;
|
|
std::vector<const Comparator*> comparators;
|
|
std::vector<bool> fill_cache_flags;
|
|
std::vector<bool> use_async_io_flags;
|
|
std::vector<bool> block_align_flags;
|
|
std::vector<size_t> super_block_alignment_sizes;
|
|
std::vector<size_t> super_block_alignment_space_overhead_ratios;
|
|
};
|
|
|
|
std::vector<bool> IOUringFlags() {
|
|
#ifdef ROCKSDB_IOURING_PRESENT
|
|
return {false, true};
|
|
#else
|
|
return {false};
|
|
#endif
|
|
}
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
BlockBasedTableReaderTest, BlockBasedTableReaderTest,
|
|
::testing::ValuesIn(BlockBasedTableReaderTestParamBuilder()
|
|
.WithUDTTestModes(test::GetUDTTestModes())
|
|
.build()));
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
BlockBasedTableReaderMultiScanAsyncIOTest,
|
|
BlockBasedTableReaderMultiScanAsyncIOTest,
|
|
::testing::ValuesIn(BlockBasedTableReaderTestParamBuilder()
|
|
.WithComparators({BytewiseComparator(),
|
|
ReverseBytewiseComparator()})
|
|
.WithFillCacheFlags(Bool())
|
|
.WithUseAsyncIoFlags(IOUringFlags())
|
|
.build()));
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
BlockBasedTableReaderMultiScanTest, BlockBasedTableReaderMultiScanTest,
|
|
::testing::ValuesIn(BlockBasedTableReaderTestParamBuilder()
|
|
.WithComparators({BytewiseComparator(),
|
|
ReverseBytewiseComparator()})
|
|
.build()));
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
BlockBasedTableReaderGetTest, BlockBasedTableReaderGetTest,
|
|
::testing::ValuesIn(BlockBasedTableReaderTestParamBuilder()
|
|
.WithUDTTestModes(test::GetUDTTestModes())
|
|
.WithSameKeyDiffTsFlags(Bool())
|
|
.WithComparators({BytewiseComparator(),
|
|
ReverseBytewiseComparator()})
|
|
.WithFillCacheFlags({false})
|
|
.build()));
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
BlockBasedTableReaderSuperBlockAlignTest, BlockBasedTableReaderGetTest,
|
|
::testing::ValuesIn(
|
|
BlockBasedTableReaderTestParamBuilder()
|
|
.WithIndexTypes(
|
|
{BlockBasedTableOptions::IndexType::kBinarySearch,
|
|
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch})
|
|
.WithFillCacheFlags({false})
|
|
.WithBlockAlignFlags(Bool())
|
|
.WithSuperBlockAlignmentSizes({0, 32 * 1024, 16 * 1024})
|
|
.WithSuperBlockAlignmentSpaceOverheadRatios({0, 4, 256})
|
|
.build()));
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
StrictCapacityLimitReaderTest, StrictCapacityLimitReaderTest,
|
|
::testing::ValuesIn(
|
|
BlockBasedTableReaderTestParamBuilder()
|
|
.WithIndexTypes(
|
|
{BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch})
|
|
.WithUDTTestModes(test::GetUDTTestModes())
|
|
.WithCompressionDictByteCounts({0})
|
|
.WithSameKeyDiffTsFlags(Bool())
|
|
.WithFillCacheFlags({false})
|
|
.build()));
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
VerifyChecksum, BlockBasedTableReaderTestVerifyChecksum,
|
|
::testing::ValuesIn(
|
|
BlockBasedTableReaderTestParamBuilder()
|
|
.WithUseDirectReadFlags({false})
|
|
.WithIndexTypes(
|
|
{BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch})
|
|
.WithNoBlockCacheFlags({true})
|
|
.WithUDTTestModes(test::GetUDTTestModes())
|
|
.WithCompressionDictByteCounts({0})
|
|
.WithFillCacheFlags({false})
|
|
.build()));
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|