rocksdb/util/auto_tune_compressor.h
Sujit Maharjan fd95bc8f5a Custom Compressor for predicting the CPU and IO cost of the block level compression (#13711)
Summary:
This pull request implements the prediction part of auto-tuning compression in RocksDB, as part of Milestone 2. The overall goal is to make compression decisions that meet a given CPU and IO budget, based on the predicted CPU time and resulting compression ratio of each candidate compression setting on a data block.
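
As a rough illustration of the prediction step, the header in this change defines a `WindowAveragePredictor` that averages samples in fixed-size batches and only updates its prediction when a batch completes. The sketch below is a standalone copy of that logic; the class name `WindowAverageSketch`, the helper `PredictionsBeforeEachSample`, and any sample values fed to it are hypothetical and not part of the PR.

```cpp
#include <cstdint>
#include <vector>

// Standalone sketch modeled on WindowAveragePredictor in
// util/auto_tune_compressor.h. The prediction changes only when a full
// window of samples has been recorded; between updates it stays constant.
template <typename T>
class WindowAverageSketch {
 public:
  explicit WindowAverageSketch(int window_size)
      : sum_(0), prediction_(0), count_(0), window_size_(window_size) {}

  T Predict() const { return prediction_; }

  void Record(T sample) {
    sum_ += sample;
    ++count_;
    if (count_ >= window_size_) {
      prediction_ = sum_ / count_;  // average of the completed window
      sum_ = 0;
      count_ = 0;
    }
  }

 private:
  T sum_;
  T prediction_;
  int count_;
  int window_size_;
};

// Hypothetical helper: returns the prediction that was available just
// before each sample was recorded.
std::vector<uint64_t> PredictionsBeforeEachSample(
    const std::vector<uint64_t>& samples, int window_size) {
  WindowAverageSketch<uint64_t> predictor(window_size);
  std::vector<uint64_t> out;
  for (uint64_t s : samples) {
    out.push_back(predictor.Predict());
    predictor.Record(s);
  }
  return out;
}
```

With a window of 3, the first three blocks see the initial prediction of 0, and the next three see the average of the first window. A small window reacts quickly but is noisy, which may partly explain the higher error on very short compression times.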

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13711

Test Plan:
Ran benchmark tests to evaluate performance impact of new algorithm
Verified that optimization does not compromise overall system performance
```bash
SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=zstd -compression_manager=none"  "-compression_parallel_threads=4 -compression_type=zstd -compression_manager=none" "-compression_parallel_threads=1 -compression_type=zstd -compression_manager=costpredictor"  "-compression_parallel_threads=4 -compression_type=zstd -compression_manager=costpredictor" ; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done

```
compression_parallel_threads | 1 | 4
-- | -- | --
master branch | 1076660.5 ops/sec | 1668411.3 ops/sec
new code, compression_manager=none | 1057155.35 ops/sec (-1.81%) | 1648664.2 ops/sec (-1.18%)
new code, compression_manager=costpredictor | 1080794.8 ops/sec (+0.38%) | 1652720.35 ops/sec (-0.94%)

Used the mean absolute percentage error (MAPE) to measure the accuracy of the predictor.
```bash
./db_bench --db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq --compaction_style=2 --num=10000000 --fifo_compaction_max_table_files_size_mb=1000 --fifo_compaction_allow_compaction=0 --disable_wal --write_buffer_size=12000000 --statistics --stats_level=5 --value_size=2000 --compression_manager=costpredictor --compression_type=zstd --progress_reports=false 2>&1 | tee /tmp/predict.log
```

compression_name | compression_level | MAPE (cpu cost, %) | MAPE (io cost, %) | average measured_time (µs) | average predicted_time (µs) | average measured_io (bytes) | average predicted_io (bytes)
-- | -- | -- | -- | -- | -- | -- | --
Snappy | 0 | 16.979548 | 3.138885 | 3.639488 | 2.98755 | 2257.655152 | 2178.070375
LZ4 | 1 | 15.508632 | 3.103681 | 4.733639 | 4.010361 | 2257.803299 | 2179.82233
LZ4 | 4 | 15.471204 | 3.102158 | 4.731955 | 4.006011 | 2258.529203 | 2179.778441
LZ4 | 9 | 15.429305 | 3.09599 | 4.729104 | 4.007059 | 2257.822368 | 2179.927506
LZ4HC | 1 | 7.254545 | 3.112858 | 79.64412 | 76.603272 | 2258.636774 | 2177.464922
LZ4HC | 4 | 7.249132 | 3.085802 | 79.591264 | 76.576416 | 2255.098757 | 2176.126082
LZ4HC | 9 | 7.248921 | 3.09695 | 79.719061 | 76.614155 | 2253.772057 | 2175.882686
ZSTD | 1 | 8.728305 | 3.223971 | 18.93434 | 17.882706 | 1957.773706 | 1890.895071
ZSTD | 15 | 4.853552 | 3.238199 | 329.396574 | 318.277613 | 1918.021616 | 1853.833546
ZSTD | 22 | 4.275209 | 3.243137 | 625.471394 | 596.254939 | 1919.035477 | 1853.44902
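
For reference, MAPE is the mean of |measured - predicted| / measured over all samples, expressed as a percentage. The sketch below assumes the per-block (measured, predicted) pairs have already been extracted from the log; the function name `Mape` and the policy of skipping zero-valued measurements are assumptions of this sketch, not taken from the PR.

```cpp
#include <cmath>
#include <cstddef>
#include <vector>

// Mean absolute percentage error between measured and predicted values.
// Samples whose measured value is zero are skipped to avoid dividing by
// zero (an assumption of this sketch). Returns 0 if no sample is usable.
double Mape(const std::vector<double>& measured,
            const std::vector<double>& predicted) {
  const size_t n = measured.size() < predicted.size() ? measured.size()
                                                      : predicted.size();
  double total = 0.0;
  size_t used = 0;
  for (size_t i = 0; i < n; ++i) {
    if (measured[i] == 0.0) continue;
    total += std::fabs((measured[i] - predicted[i]) / measured[i]);
    ++used;
  }
  return used == 0 ? 0.0 : 100.0 * total / static_cast<double>(used);
}
```

Because the error is computed per sample before averaging, the MAPE column need not equal the relative difference between the two "average" columns in the same row.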

```bash
./db_bench --db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq --compaction_style=2 --num=10000000 --fifo_compaction_max_table_files_size_mb=1000 --fifo_compaction_allow_compaction=0 --disable_wal --write_buffer_size=12000000 --statistics --stats_level=5 --value_size=2000 --compression_manager=costpredictor --compression_type=zstd --progress_reports=false --write_buffer_size=140737488355328 --block_size=16382
```
Increasing the block size, which roughly doubles the measured time, reduces the MAPE by about half.

compression_name | compression_level | MAPE (cpu cost, %) | MAPE (io cost, %) | average measured_time (µs) | average predicted_time (µs) | average measured_io (bytes) | average predicted_io (bytes)
-- | -- | -- | -- | -- | -- | -- | --
Snappy | 0 | 7.933944 | 0.061173 | 7.187587 | 6.815071 | 4466.536629 | 4465.925648
LZ4 | 1 | 5.614279 | 0.050215 | 8.526641 | 8.14445 | 4473.768752 | 4473.159792
LZ4 | 4 | 5.617925 | 0.050317 | 8.525155 | 8.144209 | 4473.772343 | 4473.159782
LZ4 | 9 | 5.65519 | 0.050249 | 8.530569 | 8.14836 | 4473.762187 | 4473.150695
LZ4HC | 1 | 4.259648 | 0.028564 | 98.273778 | 97.820515 | 4471.691596 | 4471.05918
LZ4HC | 4 | 4.269529 | 0.027665 | 98.240579 | 97.788721 | 4465.537078 | 4464.901328
LZ4HC | 9 | 4.274553 | 0.027555 | 98.319357 | 97.8637 | 4465.539437 | 4464.903889
ZSTD | 1 | 4.909716 | 0.155441 | 29.503133 | 29.047057 | 3713.562704 | 3712.978633
ZSTD | 15 | 1.310407 | 0.162864 | 643.803097 | 635.960631 | 3797.544307 | 3705.772419
ZSTD | 22 | 1.011497 | 0.155876 | 1221.189822 | 1220.693678 | 3705.556448 | 3704.972332

Reviewed By: hx235

Differential Revision: D77065528

Pulled By: shubhajeet

fbshipit-source-id: f7f4ae018f786bfeae3eacf0135055c63e142610
2025-06-26 08:59:56 -07:00

189 lines
7.2 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Defines AutoSkipCompressorWrapper, which decides whether to bypass
// compression based on the outcome of past compression attempts.
// Defines CostAwareCompressor, which currently tries to predict the CPU and
// IO cost of compression.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "rocksdb/advanced_compression.h"
namespace ROCKSDB_NAMESPACE {

// Auto Skip Compression Components
// Predict rejection probability using a moving window approach
class CompressionRejectionProbabilityPredictor {
 public:
  explicit CompressionRejectionProbabilityPredictor(int window_size)
      : pred_rejection_prob_percentage_(0),
        rejected_count_(0),
        compressed_count_(0),
        window_size_(window_size) {}
  int Predict() const;
  bool Record(Slice uncompressed_block_data, std::string* compressed_output,
              const CompressionOptions& opts);
  size_t attempted_compression_count() const;

 protected:
  int pred_rejection_prob_percentage_;
  size_t rejected_count_;
  size_t compressed_count_;
  size_t window_size_;
};

class AutoSkipWorkingArea : public Compressor::WorkingArea {
 public:
  explicit AutoSkipWorkingArea(Compressor::ManagedWorkingArea&& wa)
      : wrapped(std::move(wa)),
        predictor(
            std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}
  ~AutoSkipWorkingArea() {}
  AutoSkipWorkingArea(const AutoSkipWorkingArea&) = delete;
  AutoSkipWorkingArea& operator=(const AutoSkipWorkingArea&) = delete;
  AutoSkipWorkingArea(AutoSkipWorkingArea&& other) noexcept
      : wrapped(std::move(other.wrapped)),
        predictor(std::move(other.predictor)) {}
  AutoSkipWorkingArea& operator=(AutoSkipWorkingArea&& other) noexcept {
    if (this != &other) {
      wrapped = std::move(other.wrapped);
      predictor = std::move(other.predictor);
    }
    return *this;
  }
  Compressor::ManagedWorkingArea wrapped;
  std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor;
};

class AutoSkipCompressorWrapper : public CompressorWrapper {
 public:
  const char* Name() const override;
  explicit AutoSkipCompressorWrapper(std::unique_ptr<Compressor> compressor,
                                     const CompressionOptions& opts);

  Status CompressBlock(Slice uncompressed_data, std::string* compressed_output,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override;
  ManagedWorkingArea ObtainWorkingArea() override;
  void ReleaseWorkingArea(WorkingArea* wa) override;

 private:
  Status CompressBlockAndRecord(Slice uncompressed_data,
                                std::string* compressed_output,
                                CompressionType* out_compression_type,
                                AutoSkipWorkingArea* wa);
  static constexpr int kExplorationPercentage = 10;
  static constexpr int kProbabilityCutOff = 50;
  const CompressionOptions opts_;
};

class AutoSkipCompressorManager : public CompressionManagerWrapper {
  using CompressionManagerWrapper::CompressionManagerWrapper;
  const char* Name() const override;
  std::unique_ptr<Compressor> GetCompressorForSST(
      const FilterBuildingContext& context, const CompressionOptions& opts,
      CompressionType preferred) override;
};

// Cost Aware Components
// Averages samples in consecutive, non-overlapping windows of kWindowSize
// entries. The prediction is updated only when a window completes and keeps
// its previous value (initially 0) in between.
template <typename T>
class WindowAveragePredictor {
 public:
  explicit WindowAveragePredictor(int window_size)
      : sum_(0), prediction_(0), count_(0), kWindowSize(window_size) {}
  T Predict() const { return prediction_; }
  bool Record(T data) {
    sum_ += data;
    count_++;
    if (count_ >= kWindowSize) {
      // Window is full: publish its average and start a new window.
      prediction_ = sum_ / count_;
      sum_ = 0;
      count_ = 0;
    }
    return true;
  }
  void SetPrediction(T prediction) { prediction_ = prediction; }

 private:
  T sum_;
  T prediction_;
  int count_;
  const int kWindowSize;
};

using IOCostPredictor = WindowAveragePredictor<size_t>;
using CPUUtilPredictor = WindowAveragePredictor<uint64_t>;

struct IOCPUCostPredictor {
  explicit IOCPUCostPredictor(int window_size)
      : IOPredictor(window_size), CPUPredictor(window_size) {}
  IOCostPredictor IOPredictor;
  CPUUtilPredictor CPUPredictor;
};

class CostAwareWorkingArea : public Compressor::WorkingArea {
 public:
  explicit CostAwareWorkingArea(Compressor::ManagedWorkingArea&& wa)
      : wrapped_(std::move(wa)) {}
  ~CostAwareWorkingArea() {}
  CostAwareWorkingArea(const CostAwareWorkingArea&) = delete;
  CostAwareWorkingArea& operator=(const CostAwareWorkingArea&) = delete;
  // Move both members so that move construction matches move assignment.
  CostAwareWorkingArea(CostAwareWorkingArea&& other) noexcept
      : wrapped_(std::move(other.wrapped_)),
        cost_predictors_(std::move(other.cost_predictors_)) {}
  CostAwareWorkingArea& operator=(CostAwareWorkingArea&& other) noexcept {
    if (this != &other) {
      wrapped_ = std::move(other.wrapped_);
      cost_predictors_ = std::move(other.cost_predictors_);
    }
    return *this;
  }
  Compressor::ManagedWorkingArea wrapped_;
  std::vector<std::vector<IOCPUCostPredictor*>> cost_predictors_;
};

class CostAwareCompressor : public Compressor {
 public:
  explicit CostAwareCompressor(const CompressionOptions& opts);
  const char* Name() const override;
  size_t GetMaxSampleSizeIfWantDict(CacheEntryRole block_type) const override;
  Slice GetSerializedDict() const override;
  CompressionType GetPreferredCompressionType() const override;
  ManagedWorkingArea ObtainWorkingArea() override;
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole block_type, DictSampleArgs&& dict_samples) override;

  Status CompressBlock(Slice uncompressed_data, std::string* compressed_output,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override;
  void ReleaseWorkingArea(WorkingArea* wa) override;

 private:
  Status CompressBlockAndRecord(size_t chosen_compression_type,
                                size_t compression_level_ptr,
                                Slice uncompressed_data,
                                std::string* compressed_output,
                                CompressionType* out_compression_type,
                                CostAwareWorkingArea* wa);
  static constexpr int kExplorationPercentage = 10;
  static constexpr int kProbabilityCutOff = 50;
  // Lists the compression levels that CostAwareCompressor uses to create
  // compressors and predict their cost. The outer vector holds one list of
  // compression levels per compression algorithm, in the order defined by
  // enum CompressionType.
  static const std::vector<std::vector<int>> kCompressionLevels;
  const CompressionOptions opts_;
  std::vector<std::vector<std::unique_ptr<Compressor>>> allcompressors_;
  std::vector<std::pair<size_t, size_t>> allcompressors_index_;
};

class CostAwareCompressorManager : public CompressionManagerWrapper {
  using CompressionManagerWrapper::CompressionManagerWrapper;
  const char* Name() const override;
  std::unique_ptr<Compressor> GetCompressorForSST(
      const FilterBuildingContext& context, const CompressionOptions& opts,
      CompressionType preferred) override;
};

} // namespace ROCKSDB_NAMESPACE