rocksdb/util/auto_tune_compressor.h
Peter Dillinger 6a79e02ebd Support pre-defined compression dictionaries (#14253)
Summary:
... in addition to those derived from samples. This can be useful when trade-offs favor an offline-trained dictionary that is good for the whole workload (which may involve heavyweight training) over on-the-fly training on samples for each file (which has limitations).

This involves some breaking changes to some deeper parts of the new compression API. I'm not concerned about performance because this doesn't touch the per-block parts of the API, just the per-file parts.

Bonus: a change to CompressionManagerWrapper::FindCompatibleCompressionManager to implement what is likely the preferred behavior.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14253

Test Plan: unit test included

Reviewed By: hx235

Differential Revision: D91082208

Pulled By: pdillinger

fbshipit-source-id: 1442db65e15c9435437204c19787c96f7a40a207
2026-01-23 10:01:50 -08:00

197 lines
7.6 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Defines the auto-skip compressor wrapper, which decides whether to bypass
// compression based on past data.
// Defines CostAwareCompressor, which currently tries to predict the CPU and I/O
// cost of compression.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "rocksdb/advanced_compression.h"
namespace ROCKSDB_NAMESPACE {
// Auto Skip Compression Components
// Estimates how likely a compression attempt is to be rejected, using a
// moving window over the outcomes of recent attempts.
class CompressionRejectionProbabilityPredictor {
 public:
  // window_size: size of the moving window (stored as size_t).
  explicit CompressionRejectionProbabilityPredictor(int window_size)
      : window_size_(window_size) {}

  // Returns the current predicted rejection probability; presumably the value
  // of pred_rejection_prob_percentage_ (a percentage) -- confirm in the .cc.
  int Predict() const;

  // Records the outcome of one compression attempt, described by the input
  // block, the compressor's output buffer/size and the resulting compression
  // type. Meaning of the bool result is defined in the .cc -- TODO document.
  bool Record(Slice uncompressed_block_data, char* compressed_output,
              size_t compressed_output_size, CompressionType compression_type);

  // Number of compression attempts counted so far (presumably within the
  // current window -- confirm in the .cc).
  size_t attempted_compression_count() const;

 protected:
  int pred_rejection_prob_percentage_ = 0;
  size_t rejected_count_ = 0;
  size_t compressed_count_ = 0;
  size_t window_size_;
};
// Working area used by AutoSkipCompressorWrapper. It bundles the wrapped
// compressor's working area with a rejection-probability predictor (window
// size 10). The type is move-only.
class AutoSkipWorkingArea : public Compressor::WorkingArea {
 public:
  explicit AutoSkipWorkingArea(Compressor::ManagedWorkingArea&& wa)
      : wrapped(std::move(wa)),
        predictor(
            std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}

  ~AutoSkipWorkingArea() = default;

  // Copying is disabled; only moves are allowed.
  AutoSkipWorkingArea(const AutoSkipWorkingArea&) = delete;
  AutoSkipWorkingArea& operator=(const AutoSkipWorkingArea&) = delete;

  // Steals both the wrapped working area and the predictor from `other`.
  AutoSkipWorkingArea(AutoSkipWorkingArea&& other) noexcept
      : wrapped(std::move(other.wrapped)),
        predictor(std::move(other.predictor)) {}

  AutoSkipWorkingArea& operator=(AutoSkipWorkingArea&& other) noexcept {
    // Self-move is a no-op.
    if (this == &other) {
      return *this;
    }
    wrapped = std::move(other.wrapped);
    predictor = std::move(other.predictor);
    return *this;
  }

  Compressor::ManagedWorkingArea wrapped;
  std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor;
};
// Compressor wrapper that may bypass compression based on past data. The
// decision logic lives in the .cc; per-working-area state is kept in an
// AutoSkipWorkingArea (presumably -- confirm in ObtainWorkingArea()).
class AutoSkipCompressorWrapper : public CompressorWrapper {
 public:
  explicit AutoSkipCompressorWrapper(std::unique_ptr<Compressor> compressor,
                                     const CompressionOptions& opts);

  // Identity and cloning.
  const char* Name() const override;
  std::unique_ptr<Compressor> Clone() const override;
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole block_type, DictConfigArgs&& dict_config) const override;

  // Per-block compression entry point.
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override;

  // Working-area lifecycle.
  ManagedWorkingArea ObtainWorkingArea() override;
  void ReleaseWorkingArea(WorkingArea* wa) override;

 private:
  // Compresses a block and records the outcome in `wa` (see the .cc).
  Status CompressBlockAndRecord(Slice uncompressed_data,
                                char* compressed_output,
                                size_t* compressed_output_size,
                                CompressionType* out_compression_type,
                                AutoSkipWorkingArea* wa);

  // Tuning constants for the skip decision; presumably percentages given the
  // naming -- confirm in the .cc.
  static constexpr int kExplorationPercentage = 10;
  static constexpr int kProbabilityCutOff = 50;

  const CompressionOptions opts_;
};
// CompressionManagerWrapper that supplies auto-skip compressors for SST files
// (see GetCompressorForSST in the .cc).
class AutoSkipCompressorManager : public CompressionManagerWrapper {
 public:
  // Inherited constructors keep the base class's access regardless of where
  // this using-declaration appears.
  using CompressionManagerWrapper::CompressionManagerWrapper;

  // Overrides are public, matching the base-class API. Previously they were
  // implicitly private through `class` default access, so they could not be
  // called on the concrete type.
  const char* Name() const override;
  std::unique_ptr<Compressor> GetCompressorForSST(
      const FilterBuildingContext& context, const CompressionOptions& opts,
      CompressionType preferred) override;
};
// Cost Aware Components
// Predicts a value of type T as the mean of the most recently completed window
// of recorded samples.
//
// Samples are summed until `window_size` of them have been recorded. The
// prediction is then replaced by their mean (integer division for integral T)
// and accumulation restarts. Until the first window completes, Predict()
// returns 0, or whatever SetPrediction() last stored.
template <typename T>
class WindowAveragePredictor {
 public:
  // window_size: samples per averaging window. Any value <= 1 makes every
  // Record() call publish a new prediction.
  explicit WindowAveragePredictor(int window_size)
      : sum_(0), prediction_(0), count_(0), kWindowSize(window_size) {}

  // Returns the current prediction without modifying the predictor.
  T Predict() const { return prediction_; }

  // Adds one sample to the current window. Always returns true.
  bool Record(T data) {
    sum_ += data;
    ++count_;
    if (count_ >= kWindowSize) {
      // Window complete: publish its mean and start a fresh window.
      prediction_ = sum_ / count_;
      sum_ = 0;
      count_ = 0;
    }
    return true;
  }

  // Overwrites the current prediction (e.g. to seed it). Leaves the partially
  // accumulated window untouched.
  void SetPrediction(T prediction) { prediction_ = prediction; }

 private:
  T sum_;
  T prediction_;
  int count_;
  const int kWindowSize;
};
// Window-average predictors for the I/O cost (size_t) and CPU utilization
// (uint64_t) of compression.
using IOCostPredictor = WindowAveragePredictor<size_t>;
using CPUUtilPredictor = WindowAveragePredictor<uint64_t>;

// Pairs an I/O-cost predictor with a CPU-utilization predictor. Both are built
// with the same averaging window size.
struct IOCPUCostPredictor {
  IOCostPredictor IOPredictor;
  CPUUtilPredictor CPUPredictor;

  explicit IOCPUCostPredictor(int window_size)
      : IOPredictor(window_size), CPUPredictor(window_size) {}
};
// Working area for CostAwareCompressor. It holds the wrapped compressor's
// working area plus a table of cost predictors. The type is move-only.
class CostAwareWorkingArea : public Compressor::WorkingArea {
 public:
  explicit CostAwareWorkingArea(Compressor::ManagedWorkingArea&& wa)
      : wrapped_(std::move(wa)) {}
  ~CostAwareWorkingArea() {}
  CostAwareWorkingArea(const CostAwareWorkingArea&) = delete;
  CostAwareWorkingArea& operator=(const CostAwareWorkingArea&) = delete;

  // Moves both members, consistent with the move assignment operator. Without
  // moving cost_predictors_, a move-constructed working area would start with
  // an empty predictor table.
  CostAwareWorkingArea(CostAwareWorkingArea&& other) noexcept
      : wrapped_(std::move(other.wrapped_)),
        cost_predictors_(std::move(other.cost_predictors_)) {}

  CostAwareWorkingArea& operator=(CostAwareWorkingArea&& other) noexcept {
    if (this != &other) {
      wrapped_ = std::move(other.wrapped_);
      cost_predictors_ = std::move(other.cost_predictors_);
    }
    return *this;
  }

  Compressor::ManagedWorkingArea wrapped_;
  // Predictor table. It is presumably indexed [compression type][level index],
  // like CostAwareCompressor::kCompressionLevels (TODO confirm). This class's
  // destructor does not delete these pointers; ownership is handled elsewhere.
  std::vector<std::vector<IOCPUCostPredictor*>> cost_predictors_;
};
// Compressor that keeps compressors for a configured set of compression
// algorithm/level combinations (kCompressionLevels). It predicts the CPU and
// I/O cost of compressing with them using per-working-area predictors.
class CostAwareCompressor : public Compressor {
 public:
  explicit CostAwareCompressor(const CompressionOptions& opts);
  const char* Name() const override;
  std::unique_ptr<Compressor> Clone() const override;
  DictConfig GetDictGuidance(CacheEntryRole block_type) const override;
  Slice GetSerializedDict() const override;
  CompressionType GetPreferredCompressionType() const override;
  ManagedWorkingArea ObtainWorkingArea() override;
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole block_type, DictConfigArgs&& dict_config) const override;
  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override;
  void ReleaseWorkingArea(WorkingArea* wa) override;

 private:
  // Compresses with the compressor selected by (chosen_compression_type,
  // compression_level_index) and records the result in `wa`'s predictors.
  // Both arguments are presumably indices into allcompressors_ -- confirm in
  // the .cc.
  Status CompressBlockAndRecord(size_t chosen_compression_type,
                                size_t compression_level_index,
                                Slice uncompressed_data,
                                char* compressed_output,
                                size_t* compressed_output_size,
                                CompressionType* out_compression_type,
                                CostAwareWorkingArea* wa);
  static constexpr int kExplorationPercentage = 10;
  static constexpr int kProbabilityCutOff = 50;
  // Compression levels for which CostAwareCompressor creates compressors and
  // predicts cost. The outer vector follows the order of the CompressionType
  // enum; each inner vector lists the levels used for that algorithm.
  static const std::vector<std::vector<int>> kCompressionLevels;
  const CompressionOptions opts_;
  // Compressors grouped by algorithm, then level; presumably parallel to
  // kCompressionLevels (TODO confirm).
  std::vector<std::vector<std::unique_ptr<Compressor>>> allcompressors_;
  // Pairs of indices into allcompressors_ (presumably [algorithm][level]).
  std::vector<std::pair<size_t, size_t>> allcompressors_index_;
};
// CompressionManagerWrapper that supplies cost-aware compressors for SST files
// (see GetCompressorForSST in the .cc).
class CostAwareCompressorManager : public CompressionManagerWrapper {
 public:
  // Inherited constructors keep the base class's access regardless of where
  // this using-declaration appears.
  using CompressionManagerWrapper::CompressionManagerWrapper;

  // Overrides are public, matching the base-class API. Previously they were
  // implicitly private through `class` default access, so they could not be
  // called on the concrete type.
  const char* Name() const override;
  std::unique_ptr<Compressor> GetCompressorForSST(
      const FilterBuildingContext& context, const CompressionOptions& opts,
      CompressionType preferred) override;
};
} // namespace ROCKSDB_NAMESPACE