rocksdb/util/auto_skip_compressor.h
Sujit Maharjan 34d8f03af4 Moving predictor to WorkingArea to make it thread safe (#13706)
Summary:
**Summary:**

The Predictor is moved into the WorkingArea so that each thread works with its own instance, which makes it thread-safe.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13706

Test Plan: It should pass the test case written in ./compression_test.

Reviewed By: pdillinger

Differential Revision: D76836846

Pulled By: shubhajeet

fbshipit-source-id: 0d0170baf65f4bb95ba107fec77151e66b8a4449
2025-06-17 19:17:25 -07:00

90 lines
3.4 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Defines an auto-skip compressor wrapper that intelligently decides whether
// to bypass compression based on past data.
#pragma once
#include <memory>
#include "rocksdb/advanced_compression.h"
namespace ROCKSDB_NAMESPACE {
// Predicts the compression rejection probability using a moving window
// approach.
//
// The object holds the latest prediction, a count of rejected blocks, a count
// of compressed blocks and the window size. Predict(), Record() and
// attempted_compression_count() are only declared here; their definitions
// live outside this header.
class CompressionRejectionProbabilityPredictor {
 public:
  // Creates a predictor whose prediction and counters all start at zero.
  //
  // `window_size` is stored as the size of the moving window.
  // NOTE(review): the value is converted to size_t as-is, so a negative
  // argument wraps around to a very large window -- callers are expected to
  // pass a positive size.
  //
  // The constructor is `explicit` so that a bare int is never silently
  // converted into a predictor.
  explicit CompressionRejectionProbabilityPredictor(int window_size)
      : pred_rejection_prob_percentage_(0),
        rejected_count_(0),
        compressed_count_(0),
        window_size_(static_cast<size_t>(window_size)) {}

  // Returns the current prediction.
  // NOTE(review): presumably the rejection probability expressed as a
  // percentage (cf. pred_rejection_prob_percentage_) -- confirm against the
  // definition.
  int Predict() const;

  // Feeds the result of one compression attempt into the predictor.
  // NOTE(review): the meaning of the returned bool is not visible from this
  // header -- confirm against the definition.
  bool Record(Slice uncompressed_block_data, std::string* compressed_output,
              const CompressionOptions& opts);

  // NOTE(review): presumably rejected_count_ + compressed_count_ -- confirm
  // against the definition.
  size_t attempted_compression_count() const;

 protected:
  // Most recent prediction; starts at 0.
  int pred_rejection_prob_percentage_;
  // Counters; both start at 0.
  size_t rejected_count_;
  size_t compressed_count_;
  // Size of the moving window, taken from the constructor argument.
  size_t window_size_;
};
// Working area that pairs a wrapped ManagedWorkingArea with its own
// CompressionRejectionProbabilityPredictor.
//
// The type is move-only: copying is disabled, moving transfers both the
// wrapped working area and the predictor.
class AutoSkipWorkingArea : public Compressor::WorkingArea {
 public:
  // Takes over `wa` and creates a fresh predictor with a window size of 10.
  explicit AutoSkipWorkingArea(Compressor::ManagedWorkingArea&& wa)
      : wrapped(std::move(wa)),
        predictor(
            std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}

  ~AutoSkipWorkingArea() = default;

  // Not copyable.
  AutoSkipWorkingArea(const AutoSkipWorkingArea&) = delete;
  AutoSkipWorkingArea& operator=(const AutoSkipWorkingArea&) = delete;

  // Move construction steals both members from `rhs`.
  AutoSkipWorkingArea(AutoSkipWorkingArea&& rhs) noexcept
      : wrapped(std::move(rhs.wrapped)), predictor(std::move(rhs.predictor)) {}

  // Move assignment steals both members from `rhs`; self-assignment is a
  // no-op.
  AutoSkipWorkingArea& operator=(AutoSkipWorkingArea&& rhs) noexcept {
    if (this == &rhs) {
      return *this;
    }
    wrapped = std::move(rhs.wrapped);
    predictor = std::move(rhs.predictor);
    return *this;
  }

  // Working area this object wraps.
  Compressor::ManagedWorkingArea wrapped;
  // Predictor that travels with this working area.
  std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor;
};
// CompressorWrapper that can skip compression of a block.
//
// Only declarations are present in this header; every member function is
// defined elsewhere.
class AutoSkipCompressorWrapper : public CompressorWrapper {
 public:
  // Builds the wrapper from the compressor to wrap and the compression
  // options to use.
  explicit AutoSkipCompressorWrapper(std::unique_ptr<Compressor> compressor,
                                     const CompressionOptions& opts);

  const char* Name() const override;

  // Compression entry point.
  // NOTE(review): presumably consults the predictor held by `wa` to decide
  // whether to compress or bypass -- confirm against the definition.
  Status CompressBlock(Slice uncompressed_data, std::string* compressed_output,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override;

  // Working-area management.
  // NOTE(review): presumably hands out / takes back AutoSkipWorkingArea
  // objects -- confirm against the definitions.
  ManagedWorkingArea ObtainWorkingArea() override;
  void ReleaseWorkingArea(WorkingArea* wa) override;

 private:
  // Helper taking the concrete AutoSkipWorkingArea.
  // NOTE(review): by its name, compresses the block and records the outcome
  // in the working area's predictor -- confirm against the definition.
  Status CompressBlockAndRecord(Slice uncompressed_data,
                                std::string* compressed_output,
                                CompressionType* out_compression_type,
                                AutoSkipWorkingArea* wa);

  // Tuning constants; how they are applied is decided by the definitions
  // outside this header.
  static constexpr int kExplorationPercentage = 10;
  static constexpr int kProbabilityCutOff = 50;

  // Compression options held by value for the lifetime of the wrapper.
  const CompressionOptions kOpts;
  // NOTE(review): AutoSkipWorkingArea carries its own predictor; confirm
  // whether this wrapper-level predictor is still used.
  std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor_;
};
// CompressionManagerWrapper whose GetCompressorForSST() is overridden for the
// auto-skip feature.
// NOTE(review): presumably the returned compressor is wrapped in an
// AutoSkipCompressorWrapper -- confirm against the definition, which lives
// outside this header.
class AutoSkipCompressorManager : public CompressionManagerWrapper {
 public:
  // Reuse the constructors of CompressionManagerWrapper.
  using CompressionManagerWrapper::CompressionManagerWrapper;

  // The overrides are public, matching AutoSkipCompressorWrapper, so they can
  // also be called directly on an AutoSkipCompressorManager rather than only
  // through a base-class pointer or reference.
  const char* Name() const override;
  std::unique_ptr<Compressor> GetCompressorForSST(
      const FilterBuildingContext& context, const CompressionOptions& opts,
      CompressionType preferred) override;
};
} // namespace ROCKSDB_NAMESPACE