rocksdb/util/compression_test.cc
Peter Dillinger 60a0172096 Compression API clarifcations/minor fixes (#13775)
Summary:
* A number of comments clarifying contracts, etc.
* Make ReleaseWorkingArea public instead of protected because there are some limited cases where a wrapper implementation might want to call it directly
* Check non-empty dictionary precondition on MaybeCloneForDict
* Expand testing of wrapped WorkingAreas
* Random documentation improvement in block_builder.cc

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13775

Test Plan: existing and expanded tests and assertions

Reviewed By: hx235

Differential Revision: D78304550

Pulled By: pdillinger

fbshipit-source-id: e5f064e8405a5a49be123ee13145cb3626bbbfbf
2025-07-14 17:26:22 -07:00

1893 lines
73 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Testing various compression features
#include <cstdlib>
#include <memory>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/utilities/object_registry.h"
#include "table/block_based/block_builder.h"
#include "test_util/testutil.h"
#include "util/auto_tune_compressor.h"
#include "util/random.h"
#include "util/simple_mixed_compressor.h"
namespace ROCKSDB_NAMESPACE {
class DBCompressionTest : public DBTestBase {
public:
DBCompressionTest() : DBTestBase("compression_test", /*env_do_fsync=*/true) {}
};
TEST_F(DBCompressionTest, PresetCompressionDict) {
// Verifies that compression ratio improves when dictionary is enabled, and
// improves even further when the dictionary is trained by ZSTD.
const size_t kBlockSizeBytes = 4 << 10;
const size_t kL0FileBytes = 128 << 10;
const size_t kApproxPerBlockOverheadBytes = 50;
const int kNumL0Files = 5;
Options options;
// Make sure to use any custom env that the test is configured with.
options.env = CurrentOptions().env;
options.allow_concurrent_memtable_write = false;
options.arena_block_size = kBlockSizeBytes;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
options.num_levels = 2;
options.target_file_size_base = kL0FileBytes;
options.target_file_size_multiplier = 2;
options.write_buffer_size = kL0FileBytes;
BlockBasedTableOptions table_options;
table_options.block_size = kBlockSizeBytes;
std::vector<CompressionType> compression_types;
if (Zlib_Supported()) {
compression_types.push_back(kZlibCompression);
}
#if LZ4_VERSION_NUMBER >= 10400 // r124+
compression_types.push_back(kLZ4Compression);
compression_types.push_back(kLZ4HCCompression);
#endif // LZ4_VERSION_NUMBER >= 10400
if (ZSTD_Supported()) {
compression_types.push_back(kZSTD);
}
enum DictionaryTypes : int {
kWithoutDict,
kWithDict,
kWithZSTDfinalizeDict,
kWithZSTDTrainedDict,
kDictEnd,
};
for (auto compression_type : compression_types) {
options.compression = compression_type;
size_t bytes_without_dict = 0;
size_t bytes_with_dict = 0;
size_t bytes_with_zstd_finalize_dict = 0;
size_t bytes_with_zstd_trained_dict = 0;
for (int i = kWithoutDict; i < kDictEnd; i++) {
// First iteration: compress without preset dictionary
// Second iteration: compress with preset dictionary
// Third iteration (zstd only): compress with zstd-trained dictionary
//
// To make sure the compression dictionary has the intended effect, we
// verify the compressed size is smaller in successive iterations. Also in
// the non-first iterations, verify the data we get out is the same data
// we put in.
switch (i) {
case kWithoutDict:
options.compression_opts.max_dict_bytes = 0;
options.compression_opts.zstd_max_train_bytes = 0;
break;
case kWithDict:
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = 0;
break;
case kWithZSTDfinalizeDict:
if (compression_type != kZSTD ||
!ZSTD_FinalizeDictionarySupported()) {
continue;
}
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
options.compression_opts.use_zstd_dict_trainer = false;
break;
case kWithZSTDTrainedDict:
if (compression_type != kZSTD || !ZSTD_TrainDictionarySupported()) {
continue;
}
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
options.compression_opts.use_zstd_dict_trainer = true;
break;
default:
assert(false);
}
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
std::string seq_datas[10];
for (int j = 0; j < 10; ++j) {
seq_datas[j] =
rnd.RandomString(kBlockSizeBytes - kApproxPerBlockOverheadBytes);
}
ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
for (int j = 0; j < kNumL0Files; ++j) {
for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
seq_datas[(key_num / 10) % 10]));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
}
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
true /* disallow_trivial_move */));
ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
// Get the live sst files size
size_t total_sst_bytes = TotalSize(1);
if (i == kWithoutDict) {
bytes_without_dict = total_sst_bytes;
} else if (i == kWithDict) {
bytes_with_dict = total_sst_bytes;
} else if (i == kWithZSTDfinalizeDict) {
bytes_with_zstd_finalize_dict = total_sst_bytes;
} else if (i == kWithZSTDTrainedDict) {
bytes_with_zstd_trained_dict = total_sst_bytes;
}
for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
j++) {
ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
}
if (i == kWithDict) {
ASSERT_GT(bytes_without_dict, bytes_with_dict);
} else if (i == kWithZSTDTrainedDict) {
// In zstd compression, it is sometimes possible that using a finalized
// dictionary does not get as good a compression ratio as raw content
// dictionary. But using a dictionary should always get better
// compression ratio than not using one.
ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_finalize_dict ||
bytes_without_dict > bytes_with_zstd_finalize_dict);
} else if (i == kWithZSTDTrainedDict) {
// In zstd compression, it is sometimes possible that using a trained
// dictionary does not get as good a compression ratio as without
// training.
// But using a dictionary (with or without training) should always get
// better compression ratio than not using one.
ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_trained_dict ||
bytes_without_dict > bytes_with_zstd_trained_dict);
}
DestroyAndReopen(options);
}
}
}
TEST_F(DBCompressionTest, PresetCompressionDictLocality) {
if (!ZSTD_Supported()) {
return;
}
// Verifies that compression dictionary is generated from local data. The
// verification simply checks all output SSTs have different compression
// dictionaries. We do not verify effectiveness as that'd likely be flaky in
// the future.
const int kNumEntriesPerFile = 1 << 10; // 1KB
const int kNumBytesPerEntry = 1 << 10; // 1KB
const int kNumFiles = 4;
Options options = CurrentOptions();
options.compression = kZSTD;
options.compression_opts.max_dict_bytes = 1 << 14; // 16KB
options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumEntriesPerFile; ++j) {
ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
rnd.RandomString(kNumBytesPerEntry)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
}
// Store all the dictionaries generated during a full compaction.
std::vector<std::string> compression_dicts;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
[&](void* arg) {
compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
CompactRangeOptions compact_range_opts;
compact_range_opts.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
// Dictionary compression should not be so good as to compress four totally
// random files into one. If it does then there's probably something wrong
// with the test.
ASSERT_GT(NumTableFilesAtLevel(1), 1);
// Furthermore, there should be one compression dictionary generated per file.
// And they should all be different from each other.
ASSERT_EQ(NumTableFilesAtLevel(1),
static_cast<int>(compression_dicts.size()));
for (size_t i = 1; i < compression_dicts.size(); ++i) {
std::string& a = compression_dicts[i - 1];
std::string& b = compression_dicts[i];
size_t alen = a.size();
size_t blen = b.size();
ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
}
}
static std::string CompressibleString(Random* rnd, int len) {
std::string r;
test::CompressibleString(rnd, 0.8, len, &r);
return r;
}
TEST_F(DBCompressionTest, DynamicLevelCompressionPerLevel) {
if (!Snappy_Supported()) {
return;
}
const int kNKeys = 120;
int keys[kNKeys];
for (int i = 0; i < kNKeys; i++) {
keys[i] = i;
}
Random rnd(301);
Options options;
options.env = env_;
options.create_if_missing = true;
options.db_write_buffer_size = 20480;
options.write_buffer_size = 20480;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 2;
options.target_file_size_base = 20480;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 102400;
options.max_bytes_for_level_multiplier = 4;
options.max_background_compactions = 1;
options.num_levels = 5;
options.statistics = CreateDBStatistics();
options.compression_per_level.resize(3);
// No compression for L0
options.compression_per_level[0] = kNoCompression;
// No compression for the Ln whre L0 is compacted to
options.compression_per_level[1] = kNoCompression;
// Snappy compression for Ln+1
options.compression_per_level[2] = kSnappyCompression;
OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
// Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
// be compressed, so there shouldn't be any compression.
for (int i = 0; i < 20; i++) {
ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_EQ(NumTableFilesAtLevel(3), 0);
ASSERT_TRUE(NumTableFilesAtLevel(0) > 0 || NumTableFilesAtLevel(4) > 0);
// Verify there was no compression
auto num_block_compressed =
options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
ASSERT_EQ(num_block_compressed, 0);
// Insert 400KB and there will be some files end up in L3. According to the
// above compression settings for each level, there will be some compression.
ASSERT_OK(options.statistics->Reset());
ASSERT_EQ(num_block_compressed, 0);
for (int i = 20; i < 120; i++) {
ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_GE(NumTableFilesAtLevel(3), 1);
ASSERT_GE(NumTableFilesAtLevel(4), 1);
// Verify there was compression
num_block_compressed =
options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
ASSERT_GT(num_block_compressed, 0);
// Make sure data in files in L3 is not compacted by removing all files
// in L4 and calculate number of rows
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
}));
ColumnFamilyMetaData cf_meta;
db_->GetColumnFamilyMetaData(&cf_meta);
// Ensure that L1+ files are non-overlapping and together with L0 encompass
// full key range between smallestkey and largestkey from CF file metadata.
int largestkey_in_prev_level = -1;
int keys_found = 0;
for (int level = (int)cf_meta.levels.size() - 1; level >= 0; level--) {
int files_in_level = (int)cf_meta.levels[level].files.size();
int largestkey_in_prev_file = -1;
for (int j = 0; j < files_in_level; j++) {
int smallestkey = IdFromKey(cf_meta.levels[level].files[j].smallestkey);
int largestkey = IdFromKey(cf_meta.levels[level].files[j].largestkey);
int num_entries = (int)cf_meta.levels[level].files[j].num_entries;
ASSERT_EQ(num_entries, largestkey - smallestkey + 1);
keys_found += num_entries;
if (level > 0) {
if (j == 0) {
ASSERT_GT(smallestkey, largestkey_in_prev_level);
}
if (j > 0) {
ASSERT_GT(smallestkey, largestkey_in_prev_file);
}
if (j == files_in_level - 1) {
largestkey_in_prev_level = largestkey;
}
}
largestkey_in_prev_file = largestkey;
}
}
ASSERT_EQ(keys_found, kNKeys);
for (const auto& file : cf_meta.levels[4].files) {
listener->SetExpectedFileName(dbname_ + file.name);
const RangeOpt ranges(file.smallestkey, file.largestkey);
// Given verification from above, we're guaranteed that by deleting all the
// files in [<smallestkey>, <largestkey>] range, we're effectively deleting
// that very single file and nothing more.
EXPECT_OK(dbfull()->DeleteFilesInRanges(dbfull()->DefaultColumnFamily(),
&ranges, true /* include_end */));
}
listener->VerifyMatchedCount(cf_meta.levels[4].files.size());
int num_keys = 0;
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_GE(NumTableFilesAtLevel(3), 1);
ASSERT_EQ(NumTableFilesAtLevel(4), 0);
ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys * 4000U + num_keys * 10U);
}
TEST_F(DBCompressionTest, DynamicLevelCompressionPerLevel2) {
if (!Snappy_Supported() || !LZ4_Supported() || !Zlib_Supported()) {
return;
}
const int kNKeys = 500;
int keys[kNKeys];
for (int i = 0; i < kNKeys; i++) {
keys[i] = i;
}
RandomShuffle(std::begin(keys), std::end(keys));
Random rnd(301);
Options options;
options.create_if_missing = true;
options.db_write_buffer_size = 6000000;
options.write_buffer_size = 600000;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 2;
options.soft_pending_compaction_bytes_limit = 1024 * 1024;
options.target_file_size_base = 20;
options.env = env_;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 200;
options.max_bytes_for_level_multiplier = 8;
options.max_background_compactions = 1;
options.num_levels = 5;
std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
options.table_factory = mtf;
options.compression_per_level.resize(3);
options.compression_per_level[0] = kNoCompression;
options.compression_per_level[1] = kLZ4Compression;
options.compression_per_level[2] = kZlibCompression;
DestroyAndReopen(options);
// When base level is L4, L4 is LZ4.
std::atomic<int> num_zlib(0);
std::atomic<int> num_lz4(0);
std::atomic<int> num_no(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = static_cast<Compaction*>(arg);
if (compaction->output_level() == 4) {
ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
num_lz4.fetch_add(1);
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
auto* compression = static_cast<CompressionType*>(arg);
ASSERT_TRUE(*compression == kNoCompression);
num_no.fetch_add(1);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < 100; i++) {
std::string value = rnd.RandomString(200);
ASSERT_OK(Put(Key(keys[i]), value));
if (i % 25 == 24) {
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_EQ(NumTableFilesAtLevel(3), 0);
ASSERT_GT(NumTableFilesAtLevel(4), 0);
ASSERT_GT(num_no.load(), 2);
ASSERT_GT(num_lz4.load(), 0);
int prev_num_files_l4 = NumTableFilesAtLevel(4);
// After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
num_lz4.store(0);
num_no.store(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = static_cast<Compaction*>(arg);
if (compaction->output_level() == 4 && compaction->start_level() == 3) {
ASSERT_TRUE(compaction->output_compression() == kZlibCompression);
num_zlib.fetch_add(1);
} else {
ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
num_lz4.fetch_add(1);
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
auto* compression = static_cast<CompressionType*>(arg);
ASSERT_TRUE(*compression == kNoCompression);
num_no.fetch_add(1);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 101; i < 500; i++) {
std::string value = rnd.RandomString(200);
ASSERT_OK(Put(Key(keys[i]), value));
if (i % 100 == 99) {
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_GT(NumTableFilesAtLevel(3), 0);
ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
ASSERT_GT(num_no.load(), 2);
ASSERT_GT(num_lz4.load(), 0);
ASSERT_GT(num_zlib.load(), 0);
}
class PresetCompressionDictTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
public:
PresetCompressionDictTest()
: DBTestBase("db_test2", false /* env_do_fsync */),
compression_type_(std::get<0>(GetParam())),
bottommost_(std::get<1>(GetParam())) {}
protected:
const CompressionType compression_type_;
const bool bottommost_;
};
INSTANTIATE_TEST_CASE_P(
DBCompressionTest, PresetCompressionDictTest,
::testing::Combine(::testing::ValuesIn(GetSupportedDictCompressions()),
::testing::Bool()));
TEST_P(PresetCompressionDictTest, Flush) {
// Verifies that dictionary is generated and written during flush only when
// `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
// size of the dictionary is within expectations according to the limit on
// buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.memtable_factory.reset(test::NewSpecialSkipListFactory(kKeysPerFile));
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
Random rnd(301);
for (size_t i = 0; i <= kKeysPerFile; ++i) {
ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// compression dictionary exists since dictionaries would be preloaded when
// the flush finishes.
if (bottommost_) {
// Flush is never considered bottommost. This should change in the future
// since flushed files may have nothing underneath them, like the one in
// this test case.
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
} else {
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
// TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
// number of bytes needs to be adjusted in case the cached block is in
// ZSTD's digested dictionary format.
if (compression_type_ != kZSTD) {
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit
// after each block is built.
ASSERT_LE(TestGetTickerCount(options,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
2 * kBlockLen);
}
}
}
TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when `ColumnFamilyOptions::compression` enables
// dictionary. Also verifies the size of the dictionary is within expectations
// according to the limit on buffering set by
// `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
Random rnd(301);
for (size_t j = 0; j <= kKeysPerFile; ++j) {
ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(2);
for (int i = 0; i < 2; ++i) {
for (size_t j = 0; j <= kKeysPerFile; ++j) {
ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(Flush());
}
ASSERT_EQ("2,0,1", FilesPerLevel(0));
uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
// This L0->L1 compaction merges the two L0 files into L1. The produced L1
// file is not bottommost due to the existing L2 file covering the same key-
// range.
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
ASSERT_EQ("0,1,1", FilesPerLevel(0));
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// compression dictionary exists since dictionaries would be preloaded when
// the compaction finishes.
if (bottommost_) {
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
} else {
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
// TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
// number of bytes needs to be adjusted in case the cached block is in
// ZSTD's digested dictionary format.
if (compression_type_ != kZSTD) {
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit
// after each block is built.
ASSERT_LE(TestGetTickerCount(options,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
}
}
}
TEST_P(PresetCompressionDictTest, CompactBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when either `ColumnFamilyOptions::compression` or
// `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
// verifies the size of the dictionary is within expectations according to the
// limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
Random rnd(301);
for (int i = 0; i < 2; ++i) {
for (size_t j = 0; j <= kKeysPerFile; ++j) {
ASSERT_OK(Put(Key(static_cast<int>(j)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(Flush());
}
ASSERT_EQ("2", FilesPerLevel(0));
uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0));
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
// TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
// number of bytes needs to be adjusted in case the cached block is in ZSTD's
// digested dictionary format.
if (compression_type_ != kZSTD) {
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
}
}
class CompactionCompressionListener : public EventListener {
public:
explicit CompactionCompressionListener(Options* db_options)
: db_options_(db_options) {}
void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
// Figure out last level with files
int bottommost_level = 0;
for (int level = 0; level < db->NumberLevels(); level++) {
std::string files_at_level;
ASSERT_TRUE(
db->GetProperty("rocksdb.num-files-at-level" + std::to_string(level),
&files_at_level));
if (files_at_level != "0") {
bottommost_level = level;
}
}
if (db_options_->bottommost_compression != kDisableCompressionOption &&
ci.output_level == bottommost_level) {
ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
} else if (db_options_->compression_per_level.size() != 0) {
ASSERT_EQ(ci.compression,
db_options_->compression_per_level[ci.output_level]);
} else {
ASSERT_EQ(ci.compression, db_options_->compression);
}
max_level_checked = std::max(max_level_checked, ci.output_level);
}
int max_level_checked = 0;
const Options* db_options_;
};
enum CompressionFailureType {
kTestCompressionFail,
kTestDecompressionFail,
kTestDecompressionCorruption
};
class CompressionFailuresTest
: public DBCompressionTest,
public testing::WithParamInterface<std::tuple<
CompressionFailureType, CompressionType, uint32_t, uint32_t>> {
public:
CompressionFailuresTest() {
std::tie(compression_failure_type_, compression_type_,
compression_max_dict_bytes_, compression_parallel_threads_) =
GetParam();
}
CompressionFailureType compression_failure_type_ = kTestCompressionFail;
CompressionType compression_type_ = kNoCompression;
uint32_t compression_max_dict_bytes_ = 0;
uint32_t compression_parallel_threads_ = 0;
};
INSTANTIATE_TEST_CASE_P(
DBCompressionTest, CompressionFailuresTest,
::testing::Combine(::testing::Values(kTestCompressionFail,
kTestDecompressionFail,
kTestDecompressionCorruption),
::testing::ValuesIn(GetSupportedCompressions()),
::testing::Values(0, 10), ::testing::Values(1, 4)));
TEST_P(CompressionFailuresTest, CompressionFailures) {
if (compression_type_ == kNoCompression) {
return;
}
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 2;
options.max_bytes_for_level_base = 1024;
options.max_bytes_for_level_multiplier = 2;
options.num_levels = 7;
options.max_background_compactions = 1;
options.target_file_size_base = 512;
BlockBasedTableOptions table_options;
table_options.block_size = 512;
table_options.verify_compression = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.compression = compression_type_;
options.compression_opts.parallel_threads = compression_parallel_threads_;
options.compression_opts.max_dict_bytes = compression_max_dict_bytes_;
options.bottommost_compression_opts.parallel_threads =
compression_parallel_threads_;
options.bottommost_compression_opts.max_dict_bytes =
compression_max_dict_bytes_;
if (compression_failure_type_ == kTestCompressionFail) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompressData:TamperWithReturnValue", [](void* arg) {
bool* ret = static_cast<bool*>(arg);
*ret = false;
});
} else if (compression_failure_type_ == kTestDecompressionFail) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DecompressBlockData:TamperWithReturnValue", [](void* arg) {
Status* ret = static_cast<Status*>(arg);
ASSERT_OK(*ret);
*ret = Status::Corruption("kTestDecompressionFail");
});
} else if (compression_failure_type_ == kTestDecompressionCorruption) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DecompressBlockData:TamperWithDecompressionOutput", [](void* arg) {
BlockContents* contents = static_cast<BlockContents*>(arg);
// Ensure uncompressed data != original data
const size_t len = contents->data.size() + 1;
std::unique_ptr<char[]> fake_data(new char[len]());
*contents = BlockContents(std::move(fake_data), len);
});
}
std::map<std::string, std::string> key_value_written;
const int kKeySize = 5;
const int kValUnitSize = 16;
const int kValSize = 256;
Random rnd(405);
Status s = Status::OK();
DestroyAndReopen(options);
// Write 10 random files
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 5; j++) {
std::string key = rnd.RandomString(kKeySize);
// Ensure good compression ratio
std::string valueUnit = rnd.RandomString(kValUnitSize);
std::string value;
for (int k = 0; k < kValSize; k += kValUnitSize) {
value += valueUnit;
}
s = Put(key, value);
if (compression_failure_type_ == kTestCompressionFail) {
key_value_written[key] = value;
ASSERT_OK(s);
}
}
s = Flush();
if (compression_failure_type_ == kTestCompressionFail) {
ASSERT_OK(s);
}
s = dbfull()->TEST_WaitForCompact();
if (compression_failure_type_ == kTestCompressionFail) {
ASSERT_OK(s);
}
if (i == 4) {
// Make compression fail at the mid of table building
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
}
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
if (compression_failure_type_ == kTestCompressionFail) {
// Should be kNoCompression, check content consistency
std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
std::string key = db_iter->key().ToString();
std::string value = db_iter->value().ToString();
ASSERT_NE(key_value_written.find(key), key_value_written.end());
ASSERT_EQ(key_value_written[key], value);
key_value_written.erase(key);
}
ASSERT_OK(db_iter->status());
ASSERT_EQ(0, key_value_written.size());
} else if (compression_failure_type_ == kTestDecompressionFail) {
ASSERT_EQ(std::string(s.getState()),
"Could not decompress: kTestDecompressionFail");
} else if (compression_failure_type_ == kTestDecompressionCorruption) {
ASSERT_EQ(std::string(s.getState()),
"Decompressed block did not match pre-compression block");
}
}
TEST_F(DBCompressionTest, CompressionOptions) {
if (!Zlib_Supported() || !Snappy_Supported()) {
return;
}
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 2;
options.max_bytes_for_level_base = 100;
options.max_bytes_for_level_multiplier = 2;
options.num_levels = 7;
options.max_background_compactions = 1;
CompactionCompressionListener* listener =
new CompactionCompressionListener(&options);
options.listeners.emplace_back(listener);
const int kKeySize = 5;
const int kValSize = 20;
Random rnd(301);
std::vector<uint32_t> compression_parallel_threads = {1, 4};
std::map<std::string, std::string> key_value_written;
for (int iter = 0; iter <= 2; iter++) {
listener->max_level_checked = 0;
if (iter == 0) {
// Use different compression algorithms for different levels but
// always use Zlib for bottommost level
options.compression_per_level = {kNoCompression, kNoCompression,
kNoCompression, kSnappyCompression,
kSnappyCompression, kSnappyCompression,
kZlibCompression};
options.compression = kNoCompression;
options.bottommost_compression = kZlibCompression;
} else if (iter == 1) {
// Use Snappy except for bottommost level use ZLib
options.compression_per_level = {};
options.compression = kSnappyCompression;
options.bottommost_compression = kZlibCompression;
} else if (iter == 2) {
// Use Snappy everywhere
options.compression_per_level = {};
options.compression = kSnappyCompression;
options.bottommost_compression = kDisableCompressionOption;
}
for (auto num_threads : compression_parallel_threads) {
options.compression_opts.parallel_threads = num_threads;
options.bottommost_compression_opts.parallel_threads = num_threads;
DestroyAndReopen(options);
// Write 10 random files
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 5; j++) {
std::string key = rnd.RandomString(kKeySize);
std::string value = rnd.RandomString(kValSize);
key_value_written[key] = value;
ASSERT_OK(Put(key, value));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
// Make sure that we wrote enough to check all 7 levels
ASSERT_EQ(listener->max_level_checked, 6);
// Make sure database content is the same as key_value_written
std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
std::string key = db_iter->key().ToString();
std::string value = db_iter->value().ToString();
ASSERT_NE(key_value_written.find(key), key_value_written.end());
ASSERT_EQ(key_value_written[key], value);
key_value_written.erase(key);
}
ASSERT_OK(db_iter->status());
ASSERT_EQ(0, key_value_written.size());
}
}
}
TEST_F(DBCompressionTest, RoundRobinManager) {
if (ZSTD_Supported()) {
auto mgr =
std::make_shared<RoundRobinManager>(GetBuiltinV2CompressionManager());
std::vector<std::string> values;
for (bool use_wrapper : {true}) {
SCOPED_TRACE((use_wrapper ? "With " : "No ") + std::string("wrapper"));
Options options = CurrentOptions();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.compression_manager = use_wrapper ? mgr : nullptr;
DestroyAndReopen(options);
Random rnd(301);
constexpr int kCount = 13;
// Highly compressible blocks, except 1 non-compressible. Half of the
// compressible are morked for bypass and 1 marked for rejection. Values
// are large enough to ensure just 1 k-v per block.
for (int i = 0; i < kCount; ++i) {
std::string value;
if (i == 6) {
// One non-compressible block
value = rnd.RandomBinaryString(20000);
} else {
test::CompressibleString(&rnd, 0.1, 20000, &value);
}
values.push_back(value);
ASSERT_OK(Put(Key(i), value));
ASSERT_EQ(Get(Key(i)), value);
}
ASSERT_OK(Flush());
// Ensure well-formed for reads
for (int i = 0; i < kCount; ++i) {
ASSERT_NE(Get(Key(i)), "NOT_FOUND");
ASSERT_EQ(Get(Key(i)), values[i]);
}
ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
}
}
}
TEST_F(DBCompressionTest, RandomMixedCompressionManager) {
if (ZSTD_Supported()) {
auto mgr = std::make_shared<RandomMixedCompressionManager>(
GetBuiltinV2CompressionManager());
std::vector<std::string> values;
for (bool use_wrapper : {true}) {
SCOPED_TRACE((use_wrapper ? "With " : "No ") + std::string("wrapper"));
Options options = CurrentOptions();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.compression_manager = use_wrapper ? mgr : nullptr;
DestroyAndReopen(options);
Random rnd(301);
constexpr int kCount = 13;
// Highly compressible blocks, except 1 non-compressible. Half of the
// compressible are morked for bypass and 1 marked for rejection. Values
// are large enough to ensure just 1 k-v per block.
for (int i = 0; i < kCount; ++i) {
std::string value;
if (i == 6) {
// One non-compressible block
value = rnd.RandomBinaryString(20000);
} else {
test::CompressibleString(&rnd, 0.1, 20000, &value);
}
values.push_back(value);
ASSERT_OK(Put(Key(i), value));
ASSERT_EQ(Get(Key(i)), value);
}
ASSERT_OK(Flush());
// Ensure well-formed for reads
for (int i = 0; i < kCount; ++i) {
ASSERT_NE(Get(Key(i)), "NOT_FOUND");
ASSERT_EQ(Get(Key(i)), values[i]);
}
ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
}
}
}
TEST_F(DBCompressionTest, CompressionManagerWrapper) {
// Test that we can use a custom CompressionManager to wrap the built-in
// CompressionManager, thus adopting a custom *strategy* based on existing
// algorithms. This will "mark" some blocks (in their contents) as "do not
// compress", i.e. no attempt to compress, and some blocks as "reject
// compression", i.e. compression attempted but rejected because of ratio
// or otherwise. These cases are distinguishable for statistics that
// approximate "wasted effort".
static std::string kDoNotCompress = "do_not_compress";
static std::string kRejectCompression = "reject_compression";
struct MyCompressor : public CompressorWrapper {
using CompressorWrapper::CompressorWrapper;
const char* Name() const override { return "MyCompressor"; }
Status CompressBlock(Slice uncompressed_data,
std::string* compressed_output,
CompressionType* out_compression_type,
ManagedWorkingArea* working_area) override {
auto begin = uncompressed_data.data();
auto end = uncompressed_data.data() + uncompressed_data.size();
if (std::search(begin, end, kDoNotCompress.begin(),
kDoNotCompress.end()) != end) {
// Do not attempt compression
EXPECT_EQ(*out_compression_type, kNoCompression);
return Status::OK();
} else if (std::search(begin, end, kRejectCompression.begin(),
kRejectCompression.end()) != end) {
// Simulate attempted & rejected compression
*compressed_output = "blah";
EXPECT_EQ(*out_compression_type, kNoCompression);
return Status::OK();
} else {
return wrapped_->CompressBlock(uncompressed_data, compressed_output,
out_compression_type, working_area);
}
}
// Also check WorkingArea handling
struct MyWorkingArea : public WorkingArea {
explicit MyWorkingArea(ManagedWorkingArea&& wrapped)
: wrapped_(std::move(wrapped)) {}
ManagedWorkingArea wrapped_;
};
ManagedWorkingArea ObtainWorkingArea() override {
ManagedWorkingArea rv{
new MyWorkingArea{CompressorWrapper::ObtainWorkingArea()}, this};
if (GetPreferredCompressionType() == kZSTD) {
// ZSTD should always use WorkingArea, so this is our chance to ensure
// CompressorWrapper::ObtainWorkingArea() is properly connected
assert(rv.get() != nullptr);
}
return rv;
}
void ReleaseWorkingArea(WorkingArea* wa) override {
delete static_cast<MyWorkingArea*>(wa);
}
};
struct MyManager : public CompressionManagerWrapper {
using CompressionManagerWrapper::CompressionManagerWrapper;
const char* Name() const override { return "MyManager"; }
std::unique_ptr<Compressor> GetCompressorForSST(
const FilterBuildingContext& context, const CompressionOptions& opts,
CompressionType preferred) override {
return std::make_unique<MyCompressor>(
wrapped_->GetCompressorForSST(context, opts, preferred));
}
};
auto mgr = std::make_shared<MyManager>(GetBuiltinV2CompressionManager());
for (CompressionType type : GetSupportedCompressions()) {
for (bool use_wrapper : {false, true}) {
if (type == kNoCompression) {
continue;
}
SCOPED_TRACE("Compression type: " + std::to_string(type) +
(use_wrapper ? " with " : " no ") + "wrapper");
Options options = CurrentOptions();
options.compression = type;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.compression_manager = use_wrapper ? mgr : nullptr;
DestroyAndReopen(options);
auto PopStat = [&](Tickers t) -> uint64_t {
return options.statistics->getAndResetTickerCount(t);
};
Random rnd(301);
constexpr int kCount = 13;
// Highly compressible blocks, except 1 non-compressible. Half of the
// compressible are morked for bypass and 1 marked for rejection. Values
// are large enough to ensure just 1 k-v per block.
for (int i = 0; i < kCount; ++i) {
std::string value;
if (i == 6) {
// One non-compressible block
value = rnd.RandomBinaryString(20000);
} else {
test::CompressibleString(&rnd, 0.1, 20000, &value);
if ((i % 2) == 0) {
// Half for bypass
value += kDoNotCompress;
} else if (i == 7) {
// One for rejection
value += kRejectCompression;
}
}
ASSERT_OK(Put(Key(i), value));
}
ASSERT_OK(Flush());
if (use_wrapper) {
EXPECT_EQ(kCount / 2 - 1, PopStat(NUMBER_BLOCK_COMPRESSED));
EXPECT_EQ(kCount / 2, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
EXPECT_EQ(1 + 1, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
} else {
EXPECT_EQ(kCount - 1, PopStat(NUMBER_BLOCK_COMPRESSED));
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
EXPECT_EQ(1, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
}
// Ensure well-formed for reads
for (int i = 0; i < kCount; ++i) {
ASSERT_NE(Get(Key(i)), "NOT_FOUND");
}
ASSERT_EQ(Get(Key(kCount)), "NOT_FOUND");
}
}
}
TEST_F(DBCompressionTest, CompressionManagerCustomCompression) {
// Test that we can use a custom CompressionManager to implement custom
// compression algorithms, and that there are appropriate schema guard rails
// to ensure data is not processed by the wrong algorithm.
using Compressor8A = test::CompressorCustomAlg<kCustomCompression8A>;
using Compressor8B = test::CompressorCustomAlg<kCustomCompression8B>;
using Compressor8C = test::CompressorCustomAlg<kCustomCompression8C>;
if (!Compressor8A::Supported() || !LZ4_Supported()) {
fprintf(stderr,
"Prerequisite compression library not supported. Skipping\n");
return;
}
class MyManager : public CompressionManager {
public:
explicit MyManager(const char* compat_name) : compat_name_(compat_name) {}
const char* Name() const override { return name_.c_str(); }
const char* CompatibilityName() const override { return compat_name_; }
bool SupportsCompressionType(CompressionType type) const override {
return type == kCustomCompression8A || type == kCustomCompression8B ||
type == kCustomCompression8C ||
GetBuiltinV2CompressionManager()->SupportsCompressionType(type);
}
int used_compressor8A_count_ = 0;
int used_compressor8B_count_ = 0;
int used_compressor8C_count_ = 0;
std::unique_ptr<Compressor> GetCompressor(const CompressionOptions& opts,
CompressionType type) override {
switch (static_cast<unsigned char>(type)) {
case kCustomCompression8A:
used_compressor8A_count_++;
return std::make_unique<Compressor8A>();
case kCustomCompression8B:
used_compressor8B_count_++;
return std::make_unique<Compressor8B>();
case kCustomCompression8C:
used_compressor8C_count_++;
return std::make_unique<Compressor8C>();
// Also support built-in compression algorithms
default:
return GetBuiltinV2CompressionManager()->GetCompressor(opts, type);
}
}
std::shared_ptr<Decompressor> GetDecompressor() override {
return std::make_shared<test::DecompressorCustomAlg>();
}
RelaxedAtomic<CompressionType> last_specific_decompressor_type_{
kNoCompression};
std::shared_ptr<Decompressor> GetDecompressorForTypes(
const CompressionType* types_begin,
const CompressionType* types_end) override {
assert(types_end > types_begin);
last_specific_decompressor_type_.StoreRelaxed(*types_begin);
auto decomp = std::make_shared<test::DecompressorCustomAlg>();
decomp->SetAllowedTypes(types_begin, types_end);
return decomp;
}
void AddFriend(const std::shared_ptr<CompressionManager>& mgr) {
friends_[mgr->CompatibilityName()] = mgr;
}
std::shared_ptr<CompressionManager> FindCompatibleCompressionManager(
Slice compatibility_name) override {
std::shared_ptr<CompressionManager> rv =
CompressionManager::FindCompatibleCompressionManager(
compatibility_name);
if (!rv) {
auto it = friends_.find(compatibility_name.ToString());
if (it != friends_.end()) {
return it->second.lock();
}
}
return rv;
}
private:
const char* compat_name_;
std::string name_;
// weak_ptr to avoid cycles
std::map<std::string, std::weak_ptr<CompressionManager>> friends_;
};
for (bool use_dict : {false, true}) {
SCOPED_TRACE(use_dict ? "With dict" : "No dict");
// Although these compression managers are actually compatible, we must
// respect their distinct compatibility names and treat them as incompatible
// (or else risk processing data incorrectly)
// NOTE: these are not registered in ObjectRegistry to test what happens
// when the original CompressionManager might not be available, but
// mgr_bar will be registered during the test, with different names to
// prevent interference between iterations.
auto mgr_foo = std::make_shared<MyManager>("Foo");
auto mgr_bar = std::make_shared<MyManager>(use_dict ? "Bar1" : "Bar2");
// And this one claims to be fully compatible with the built-in compression
// manager when it's not fully compatible (for custom CompressionTypes)
auto mgr_claim_compatible = std::make_shared<MyManager>("BuiltinV2");
constexpr uint16_t kValueSize = 10000;
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 20;
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
bbto.format_version = 6; // Before custom compression alg support
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
// Claims not to use custom compression (and doesn't unless setting a custom
// CompressionType)
options.compression_manager = mgr_claim_compatible;
// Use a built-in compression type with dictionary support
options.compression = kLZ4Compression;
options.compression_opts.max_dict_bytes = kValueSize / 2;
DestroyAndReopen(options);
Random rnd(404);
std::string value;
ASSERT_OK(
Put("a", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
ASSERT_OK(Flush());
// That data should be readable without access to the original compression
// manager, because it used the built-in CompatibilityName and a built-in
// CompressionType
options.compression_manager = nullptr;
Reopen(options);
ASSERT_EQ(Get("a"), value);
// Verify it was compressed
Range r = {"a", "a0"};
TablePropertiesCollection tables_properties;
ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
1, &tables_properties));
ASSERT_EQ(tables_properties.size(), 1U);
EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
EXPECT_EQ(tables_properties.begin()->second->compression_name, "LZ4");
// Disallow setting a custom CompressionType with a CompressionManager
// claiming to be built-in compatible.
options.compression_manager = mgr_claim_compatible;
options.compression = kCustomCompression8A;
ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
options.compression_manager = nullptr;
options.compression = kCustomCompressionFE;
ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
options.compression =
static_cast<CompressionType>(kLastBuiltinCompression + 1);
ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
// Custom compression schema (different CompatibilityName) not supported
// before format_version=7
options.compression_manager = mgr_foo;
options.compression = kLZ4Compression;
ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
// Set format version supporting custom compression
bbto.format_version = 7;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
// Custom compression type not supported with built-in schema name, even
// with format_version=7
options.compression_manager = mgr_claim_compatible;
options.compression = kCustomCompression8B;
ASSERT_EQ(TryReopen(options).code(), Status::Code::kInvalidArgument);
// Custom compression schema, but specifying a custom compression type it
// doesn't support.
options.compression_manager = mgr_foo;
options.compression = kCustomCompressionF0;
ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
// Using a built-in compression type with fv=7 but named custom schema
options.compression = kLZ4Compression;
Reopen(options);
ASSERT_OK(
Put("b", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
ASSERT_OK(Flush());
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
ASSERT_EQ(Get("b"), value);
// Verify it was compressed with LZ4
r = {"b", "b0"};
tables_properties.clear();
ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
1, &tables_properties));
ASSERT_EQ(tables_properties.size(), 1U);
EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
// Uses new format for "compression_name" property
EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;04;");
EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
kLZ4Compression);
// Custom compression type
options.compression = kCustomCompression8A;
Reopen(options);
ASSERT_OK(
Put("c", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
EXPECT_EQ(mgr_foo->used_compressor8A_count_, 0);
ASSERT_OK(Flush());
ASSERT_EQ(NumTableFilesAtLevel(0), 3);
ASSERT_EQ(Get("c"), value);
EXPECT_EQ(mgr_foo->used_compressor8A_count_, 1);
// Verify it was compressed with custom format
r = {"c", "c0"};
tables_properties.clear();
ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
1, &tables_properties));
ASSERT_EQ(tables_properties.size(), 1U);
EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;8A;");
EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
kCustomCompression8A);
// Also dynamically changeable, because the compression manager will respect
// the current setting as reported under the legacy logic
ASSERT_OK(dbfull()->SetOptions({{"compression", "kLZ4Compression"}}));
ASSERT_OK(
Put("d", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
ASSERT_OK(Flush());
ASSERT_EQ(NumTableFilesAtLevel(0), 4);
ASSERT_EQ(Get("d"), value);
// Verify it was compressed with LZ4
r = {"d", "d0"};
tables_properties.clear();
ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
1, &tables_properties));
ASSERT_EQ(tables_properties.size(), 1U);
EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;04;");
EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
kLZ4Compression);
// Dynamically changeable to custom compressions also
ASSERT_OK(dbfull()->SetOptions({{"compression", "kCustomCompression8B"}}));
ASSERT_OK(
Put("e", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
ASSERT_OK(Flush());
ASSERT_EQ(NumTableFilesAtLevel(0), 5);
ASSERT_EQ(Get("e"), value);
// Verify it was compressed with custom format
r = {"e", "e0"};
tables_properties.clear();
ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
1, &tables_properties));
ASSERT_EQ(tables_properties.size(), 1U);
EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
EXPECT_EQ(tables_properties.begin()->second->compression_name, "Foo;8B;");
EXPECT_EQ(mgr_foo->last_specific_decompressor_type_.LoadRelaxed(),
kCustomCompression8B);
// Fails to re-open with incompatible compression manager (can't find
// compression manager Foo because it's not registered nor known by Bar)
options.compression_manager = mgr_bar;
options.compression = kLZ4Compression;
ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
// But should re-open if we make Bar aware of the Foo compression manager
mgr_bar->AddFriend(mgr_foo);
Reopen(options);
// Can still read everything
ASSERT_EQ(Get("a").size(), kValueSize);
ASSERT_EQ(Get("b").size(), kValueSize);
ASSERT_EQ(Get("c").size(), kValueSize);
ASSERT_EQ(Get("d").size(), kValueSize);
ASSERT_EQ(Get("e").size(), kValueSize);
// Add a file using mgr_bar
ASSERT_OK(
Put("f", test::CompressibleString(&rnd, 0.1, kValueSize, &value)));
ASSERT_OK(Flush());
ASSERT_EQ(NumTableFilesAtLevel(0), 6);
ASSERT_EQ(Get("f"), value);
// Verify it was compressed appropriately
r = {"f", "f0"};
tables_properties.clear();
ASSERT_OK(db_->GetPropertiesOfTablesInRange(db_->DefaultColumnFamily(), &r,
1, &tables_properties));
ASSERT_EQ(tables_properties.size(), 1U);
EXPECT_LT(tables_properties.begin()->second->data_size, kValueSize / 2);
EXPECT_EQ(mgr_bar->last_specific_decompressor_type_.LoadRelaxed(),
kLZ4Compression);
// Fails to re-open with incompatible compression manager (can't find
// compression manager Bar because it's not registered nor known by Foo)
options.compression_manager = mgr_foo;
ASSERT_EQ(TryReopen(options).code(), Status::Code::kNotSupported);
// Register and re-open
auto& library = *ObjectLibrary::Default();
library.AddFactory<CompressionManager>(
mgr_bar->CompatibilityName(),
[mgr_bar](const std::string& /*uri*/,
std::unique_ptr<CompressionManager>* guard,
std::string* /*errmsg*/) {
*guard = std::make_unique<MyManager>(mgr_bar->CompatibilityName());
return guard->get();
});
Reopen(options);
// Can still read everything
ASSERT_EQ(Get("a").size(), kValueSize);
ASSERT_EQ(Get("b").size(), kValueSize);
ASSERT_EQ(Get("c").size(), kValueSize);
ASSERT_EQ(Get("d").size(), kValueSize);
ASSERT_EQ(Get("e").size(), kValueSize);
ASSERT_EQ(Get("f").size(), kValueSize);
// TODO: test old version of a compression manager unable to read a
// compression type
}
}
TEST_F(DBCompressionTest, FailWhenCompressionNotSupportedTest) {
CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
kLZ4Compression, kLZ4HCCompression,
kXpressCompression};
for (auto comp : compressions) {
if (!CompressionTypeSupported(comp)) {
// not supported, we should fail the Open()
Options options = CurrentOptions();
options.compression = comp;
ASSERT_TRUE(!TryReopen(options).ok());
// Try if CreateColumnFamily also fails
options.compression = kNoCompression;
ASSERT_OK(TryReopen(options));
ColumnFamilyOptions cf_options(options);
cf_options.compression = comp;
ColumnFamilyHandle* handle;
ASSERT_TRUE(!db_->CreateColumnFamily(cf_options, "name", &handle).ok());
}
}
}
class AutoSkipTestFlushBlockPolicy : public FlushBlockPolicy {
public:
explicit AutoSkipTestFlushBlockPolicy(const int window,
const BlockBuilder& data_block_builder,
std::shared_ptr<Statistics> statistics)
: window_(window),
num_keys_(0),
data_block_builder_(data_block_builder),
statistics_(statistics) {}
bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
auto nth_window = num_keys_ / window_;
if (data_block_builder_.empty()) {
// First key in this block
return false;
}
// Check every window
if (num_keys_ % window_ == 0) {
auto set_exploration = [&](void* arg) {
bool* exploration = static_cast<bool*>(arg);
*exploration = true;
};
auto unset_exploration = [&](void* arg) {
bool* exploration = static_cast<bool*>(arg);
*exploration = false;
};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// We force exploration to set the predicted rejection ratio for odd
// window and then test that the prediction is exploited in the even
// window
if (nth_window % 2 == 0) {
SyncPoint::GetInstance()->SetCallBack(
"AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
set_exploration);
} else {
SyncPoint::GetInstance()->SetCallBack(
"AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
unset_exploration);
}
SyncPoint::GetInstance()->EnableProcessing();
auto compressed_count = PopStat(NUMBER_BLOCK_COMPRESSED);
auto bypassed_count = PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED);
auto rejected_count = PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED);
auto total = compressed_count + rejected_count + bypassed_count;
int rejection_percentage, bypassed_percentage, compressed_percentage;
if (total != 0) {
rejection_percentage = static_cast<int>(rejected_count * 100 / total);
bypassed_percentage = static_cast<int>(bypassed_count * 100 / total);
compressed_percentage =
static_cast<int>(compressed_count * 100 / total);
// use nth window to detect test cases and set the expected
switch (nth_window) {
case 1:
// In first window we only explore and thus here we verify that the
// correct prediction has been made by the end of the window
// Since 6 of 10 blocks are compression unfriendly, the predicted
// rejection ratio should be 60%
EXPECT_EQ(rejection_percentage, 60);
EXPECT_EQ(bypassed_percentage, 0);
EXPECT_EQ(compressed_percentage, 40);
break;
case 2:
// With the rejection ratio set to 0.6 all the blocks should be
// bypassed in next window
EXPECT_EQ(rejection_percentage, 0);
EXPECT_EQ(bypassed_percentage, 100);
EXPECT_EQ(compressed_percentage, 0);
break;
case 3:
// In third window we only explore and verify that the correct
// prediction has been made by the end of the window
// since 4 of 10 blocks are compression ufriendly, the predicted
// rejection ratio should be 40%
EXPECT_EQ(rejection_percentage, 40);
EXPECT_EQ(bypassed_percentage, 0);
EXPECT_EQ(compressed_percentage, 60);
break;
case 4:
// With the rejection ratio set to 0.4 all the blocks should be
// attempted to be compressed
// 6 of 10 blocks are compression unfriendly and thus should be
// rejected 4 of 10 blocks are compression friendly and thus should
// be compressed
EXPECT_EQ(rejection_percentage, 60);
EXPECT_EQ(bypassed_percentage, 0);
EXPECT_EQ(compressed_percentage, 40);
}
}
}
num_keys_++;
return true;
}
uint64_t PopStat(Tickers t) { return statistics_->getAndResetTickerCount(t); }
private:
int window_;
int num_keys_;
const BlockBuilder& data_block_builder_;
std::shared_ptr<Statistics> statistics_;
};
class AutoSkipTestFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
explicit AutoSkipTestFlushBlockPolicyFactory(
const int window, std::shared_ptr<Statistics> statistics)
: window_(window), statistics_(statistics) {}
virtual const char* Name() const override {
return "AutoSkipTestFlushBlockPolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/,
const BlockBuilder& data_block_builder) const override {
(void)data_block_builder;
return new AutoSkipTestFlushBlockPolicy(window_, data_block_builder,
statistics_);
}
private:
int window_;
std::shared_ptr<Statistics> statistics_;
};
class DBAutoSkip : public DBTestBase {
public:
Options options;
Random rnd_;
int key_index_;
DBAutoSkip()
: DBTestBase("db_auto_skip", /*env_do_fsync=*/true),
options(CurrentOptions()),
rnd_(231),
key_index_(0) {
options.compression_manager = CreateAutoSkipCompressionManager();
auto statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics = statistics;
options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
bbto.flush_block_policy_factory.reset(
new AutoSkipTestFlushBlockPolicyFactory(10, statistics));
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
}
bool CompressionFriendlyPut(const int no_of_kvs, const int size_of_value) {
auto value = std::string(size_of_value, 'A');
for (int i = 0; i < no_of_kvs; ++i) {
auto status = Put(Key(key_index_), value);
EXPECT_EQ(status.ok(), true);
key_index_++;
}
return true;
}
bool CompressionUnfriendlyPut(const int no_of_kvs, const int size_of_value) {
auto value = rnd_.RandomBinaryString(size_of_value);
for (int i = 0; i < no_of_kvs; ++i) {
auto status = Put(Key(key_index_), value);
EXPECT_EQ(status.ok(), true);
key_index_++;
}
return true;
}
};
TEST_F(DBAutoSkip, AutoSkipCompressionManager) {
for (auto type : GetSupportedCompressions()) {
if (type == kNoCompression) {
continue;
}
options.compression = type;
options.bottommost_compression = type;
DestroyAndReopen(options);
const int kValueSize = 20000;
// This will set the rejection ratio to 60%
CompressionUnfriendlyPut(6, kValueSize);
CompressionFriendlyPut(4, kValueSize);
// This will verify all the data block compressions are bypassed based on
// previous prediction
CompressionUnfriendlyPut(6, kValueSize);
CompressionFriendlyPut(4, kValueSize);
// This will set the rejection ratio to 40%
CompressionUnfriendlyPut(4, kValueSize);
CompressionFriendlyPut(6, kValueSize);
// This will verify all the data block compression are attempted based on
// previous prediction
// Compression will be rejected for 6 compression unfriendly blocks
// Compression will be accepted for 4 compression friendly blocks
CompressionUnfriendlyPut(6, kValueSize);
CompressionFriendlyPut(4, kValueSize);
// Extra block write to ensure that the all above cases are checked
CompressionFriendlyPut(6, kValueSize);
CompressionFriendlyPut(4, kValueSize);
ASSERT_OK(Flush());
}
}
class CostAwareTestFlushBlockPolicy : public FlushBlockPolicy {
public:
explicit CostAwareTestFlushBlockPolicy(const int window,
const BlockBuilder& data_block_builder)
: window_(window),
num_keys_(0),
data_block_builder_(data_block_builder) {}
bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
auto nth_window = num_keys_ / window_;
if (data_block_builder_.empty()) {
// First key in this block
return false;
}
// Check every window
if (num_keys_ % window_ == 0) {
auto get_predictor = [&](void* arg) {
// gets the predictor and sets the mocked cpu and io cost
predictor_ = static_cast<IOCPUCostPredictor*>(arg);
predictor_->CPUPredictor.SetPrediction(1000);
predictor_->IOPredictor.SetPrediction(100);
};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// Add syncpoint to get the cpu and io cost
SyncPoint::GetInstance()->SetCallBack(
"CostAwareCompressor::CompressBlockAndRecord::"
"GetPredictor",
get_predictor);
SyncPoint::GetInstance()->EnableProcessing();
// use nth window to detect test cases and set the expected
switch (nth_window) {
case 0:
break;
case 1:
// Verify that the Mocked cpu cost and io cost are predicted correctly
auto predicted_cpu_time = predictor_->CPUPredictor.Predict();
auto predicted_io_bytes = predictor_->IOPredictor.Predict();
EXPECT_EQ(predicted_io_bytes, 100);
EXPECT_EQ(predicted_cpu_time, 1000);
break;
}
}
num_keys_++;
return true;
}
private:
int window_;
int num_keys_;
const BlockBuilder& data_block_builder_;
IOCPUCostPredictor* predictor_;
};
class CostAwareTestFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
explicit CostAwareTestFlushBlockPolicyFactory(const int window)
: window_(window) {}
virtual const char* Name() const override {
return "CostAwareTestFlushBlockPolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/,
const BlockBuilder& data_block_builder) const override {
(void)data_block_builder;
return new CostAwareTestFlushBlockPolicy(window_, data_block_builder);
}
private:
int window_;
};
class DBCompressionCostPredictor : public DBTestBase {
public:
Options options;
DBCompressionCostPredictor()
: DBTestBase("db_cpuio_skip", /*env_do_fsync=*/true),
options(CurrentOptions()) {
options.compression_manager = CreateCostAwareCompressionManager();
auto statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics = statistics;
options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
bbto.flush_block_policy_factory.reset(
new CostAwareTestFlushBlockPolicyFactory(10));
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
}
};
TEST_F(DBCompressionCostPredictor, CostAwareCompressorManager) {
// making sure that the compression is supported
if (!ZSTD_Supported()) {
return;
}
const int kValueSize = 20000;
int next_key = 0;
Random rnd(231);
auto value = rnd.RandomBinaryString(kValueSize);
int window_size = 10;
auto WindowWrite = [&]() {
for (auto i = 0; i < window_size; ++i) {
auto status = Put(Key(next_key), value);
EXPECT_OK(status);
next_key++;
}
};
// This denotes the first window
// Mocked to have specific cpu utilization and io cost
WindowWrite();
// check the predictor is predicting the correct cpu and io cost
WindowWrite();
ASSERT_OK(Flush());
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}