Summary: * Make new format_version=7 a supported setting. * Fix a bug in compressed_secondary_cache.cc that is newly exercised by custom compression types and showing up in crash test with tiered secondary cache * Small change to handling of disabled compression in fv=7: use empty compression manager compatibility name. * Get rid of GetDefaultBuiltinCompressionManager() in public API because it could cause unexpected+unsafe schema change on a user's CompressionManager if built upon the default built-in manager and we add a new built-in schema. Now must be referenced by explicit compression schema version in the public API. (That notion was already exposed in compressed secondary cache API, for better or worse.) * Improve some error messages for compression misconfiguration * Improve testing with ObjectLibrary and CompressionManagers * Improve testing of compression_name table property in BlockBasedTableTest.BlockBasedTableProperties2 * Improve some comments Pull Request resolved: https://github.com/facebook/rocksdb/pull/13713 Test Plan: existing and updated tests. Notably, the crash test has already been running with (unpublished) format_version=7 Reviewed By: mszeszko-meta, hx235 Differential Revision: D77035482 Pulled By: pdillinger fbshipit-source-id: 95278de8734a79706a22361bff2184b1edb230ca
131 lines
5 KiB
C++
131 lines
5 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
|
|
#include "util/auto_skip_compressor.h"
|
|
|
|
#include "options/options_helper.h"
|
|
#include "rocksdb/advanced_compression.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/random.h"
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
int CompressionRejectionProbabilityPredictor::Predict() const {
|
|
return pred_rejection_prob_percentage_;
|
|
}
|
|
|
|
size_t CompressionRejectionProbabilityPredictor::attempted_compression_count()
|
|
const {
|
|
return rejected_count_ + compressed_count_;
|
|
}
|
|
|
|
// Records the outcome of one compression attempt and, once a full window of
// attempts has accumulated, refreshes the predicted rejection percentage.
//
// The attempt is counted as "rejected" when compressed_output is larger than
// opts.max_compressed_bytes_per_kb bytes per 1024 bytes of
// uncompressed_block_data, and as "compressed" otherwise. When the number of
// attempts reaches window_size_, the prediction becomes the integer percentage
// of rejected attempts in that window and both counters restart from zero.
//
// compressed_output must not be null. Always returns true.
bool CompressionRejectionProbabilityPredictor::Record(
    Slice uncompressed_block_data, std::string* compressed_output,
    const CompressionOptions& opts) {
  // Widen to 64 bits before multiplying; the shift by 10 divides by 1024.
  const uint64_t max_accepted_size =
      (static_cast<uint64_t>(opts.max_compressed_bytes_per_kb) *
       uncompressed_block_data.size()) >>
      10;
  const bool was_rejected = compressed_output->size() > max_accepted_size;
  if (was_rejected) {
    ++rejected_count_;
  } else {
    ++compressed_count_;
  }

  const auto attempts_in_window = attempted_compression_count();
  if (attempts_in_window < window_size_) {
    // Window not complete yet: keep the previous prediction.
    return true;
  }

  // Window complete: publish the new prediction and start a fresh window.
  pred_rejection_prob_percentage_ =
      static_cast<int>(rejected_count_ * 100 / attempts_in_window);
  rejected_count_ = 0;
  compressed_count_ = 0;
  assert(attempted_compression_count() == 0);
  return true;
}
|
|
// Takes ownership of `compressor` (handed to the CompressorWrapper base),
// remembers `opts` in kOpts for later use when recording compression
// outcomes, and creates a predictor constructed with the argument 10.
// NOTE(review): 10 is presumably the predictor's window size -- confirm
// against the CompressionRejectionProbabilityPredictor constructor.
AutoSkipCompressorWrapper::AutoSkipCompressorWrapper(
    std::unique_ptr<Compressor> compressor, const CompressionOptions& opts)
    : CompressorWrapper(std::move(compressor)),
      kOpts(opts),
      predictor_(
          std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}
|
|
|
|
// Fixed identifier for this compressor wrapper.
const char* AutoSkipCompressorWrapper::Name() const {
  return "AutoSkipCompressorWrapper";
}
|
|
|
|
// Compresses one block, possibly skipping compression altogether based on the
// rejection prediction stored in the working area.
//
// - If `wa` is null or was not obtained from this object, the call is simply
//   forwarded to the wrapped compressor: such a working area carries no
//   predictor, so nothing can be recorded or decided here.
// - Otherwise, with probability kExplorationPercentage percent the block is
//   compressed and the outcome recorded ("exploration").
// - Otherwise ("exploitation") the block is compressed and recorded only when
//   the predicted rejection percentage is at most kProbabilityCutOff; above
//   the cut-off, *out_compression_type is set to kNoCompression,
//   compressed_output is not written, and OK is returned.
Status AutoSkipCompressorWrapper::CompressBlock(
    Slice uncompressed_data, std::string* compressed_output,
    CompressionType* out_compression_type, ManagedWorkingArea* wa) {
  const bool is_own_working_area = (wa != nullptr) && (wa->owner() == this);
  if (!is_own_working_area) {
    return wrapped_->CompressBlock(uncompressed_data, compressed_output,
                                   out_compression_type, wa);
  }

  bool exploration =
      Random::GetTLSInstance()->PercentTrue(kExplorationPercentage);
  // Lets tests override the explore/exploit decision.
  TEST_SYNC_POINT_CALLBACK(
      "AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
      &exploration);

  auto* autoskip_wa = static_cast<AutoSkipWorkingArea*>(wa->get());

  // The prediction is only consulted when exploiting.
  if (!exploration &&
      autoskip_wa->predictor->Predict() > kProbabilityCutOff) {
    // Predicted rejection rate is too high: bypass compression.
    *out_compression_type = kNoCompression;
    return Status::OK();
  }

  // Either exploring, or the prediction says compression is worthwhile.
  return CompressBlockAndRecord(uncompressed_data, compressed_output,
                                out_compression_type, autoskip_wa);
}
|
|
|
|
// Obtains a working area from the wrapped compressor and embeds it in a new
// AutoSkipWorkingArea. The returned handle names this object as its owner,
// which is what CompressBlock() checks before applying the auto-skip logic;
// the heap object is freed in ReleaseWorkingArea().
Compressor::ManagedWorkingArea AutoSkipCompressorWrapper::ObtainWorkingArea() {
  auto* autoskip_wa = new AutoSkipWorkingArea(wrapped_->ObtainWorkingArea());
  return ManagedWorkingArea(autoskip_wa, this);
}
|
|
// Destroys a working area previously created by ObtainWorkingArea(). The
// pointer is cast back to AutoSkipWorkingArea, the concrete type allocated
// there, before being deleted.
void AutoSkipCompressorWrapper::ReleaseWorkingArea(WorkingArea* wa) {
  auto* autoskip_wa = static_cast<AutoSkipWorkingArea*>(wa);
  delete autoskip_wa;
}
|
|
|
|
// Compresses the block with the wrapped compressor, using the wrapped working
// area held inside `wa`, then reports the result to the working area's
// predictor so it can classify the attempt as compressed or rejected.
// The outcome is recorded regardless of the returned status, and the wrapped
// compressor's status is passed back to the caller unchanged.
Status AutoSkipCompressorWrapper::CompressBlockAndRecord(
    Slice uncompressed_data, std::string* compressed_output,
    CompressionType* out_compression_type, AutoSkipWorkingArea* wa) {
  Status compress_status = wrapped_->CompressBlock(
      uncompressed_data, compressed_output, out_compression_type,
      &(wa->wrapped));
  wa->predictor->Record(uncompressed_data, compressed_output, kOpts);
  return compress_status;
}
|
|
|
|
const char* AutoSkipCompressorManager::Name() const {
|
|
// should have returned "AutoSkipCompressorManager" but we currently have an
|
|
// error so for now returning name of the wrapped container
|
|
return wrapped_->Name();
|
|
}
|
|
|
|
std::unique_ptr<Compressor> AutoSkipCompressorManager::GetCompressorForSST(
|
|
const FilterBuildingContext& context, const CompressionOptions& opts,
|
|
CompressionType preferred) {
|
|
assert(GetSupportedCompressions().size() > 1);
|
|
assert(preferred != kNoCompression);
|
|
return std::make_unique<AutoSkipCompressorWrapper>(
|
|
wrapped_->GetCompressorForSST(context, opts, preferred), opts);
|
|
}
|
|
|
|
std::shared_ptr<CompressionManagerWrapper> CreateAutoSkipCompressionManager(
|
|
std::shared_ptr<CompressionManager> wrapped) {
|
|
return std::make_shared<AutoSkipCompressorManager>(
|
|
wrapped == nullptr ? GetBuiltinV2CompressionManager() : wrapped);
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|