rocksdb/db/blob/blob_file_builder.cc
Peter Dillinger 7c9b580681 Big refactor for preliminary custom compression API (#13540)
Summary:
Adds new classes etc. in internal compression.h that are intended to become public APIs for supporting custom/pluggable compression. Some steps remain to allow for pluggable compression and to remove a lot of legacy code (e.g. now called `OLD_CompressData` and `OLD_UncompressData`), but this change refactors the key integration points of SST building and reading and compressed secondary cache over to the new APIs.

Compared with the proposed https://github.com/facebook/rocksdb/issues/7650, this fixes a number of issues including
* Making a clean divide between public and internal APIs (currently just indicated with comments)
* Enough generality that built-in compressions generally fit into the framework rather than needing special treatment
* Avoiding exposure of obnoxious idioms like `compress_format_version` to the user
* Enough generality that a compressor mixing algorithms/strategies from other compressors is pretty well supported without an extra schema layer
* Explicit thread-safety contracts (carefully considered)
* Contract details around schema compatibility and extension with code changes (more detail in next PR)
* Customizable "working areas" (e.g. for ZSTD "context")
* Decompression into an arbitrary memory location (rather than involving the decompressor in memory allocation; should facilitate reducing number of objects in block cache)
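
To make the last two bullets concrete, here is a minimal sketch of what a decompressor with per-thread working areas and caller-owned output memory could look like. Every name here (`WorkingArea`, `Decompressor`, `ToyRleDecompressor`) is hypothetical and invented for illustration; this is not the interface in the internal compression.h, and the run-length format is only a stand-in for a real codec.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>

// Hypothetical names throughout; NOT the RocksDB compression.h API.

// Opaque per-thread scratch state (think of a ZSTD context).
struct WorkingArea {
  virtual ~WorkingArea() = default;
};

class Decompressor {
 public:
  virtual ~Decompressor() = default;
  // The decompressor is shared and const; each thread gets its own area.
  virtual std::unique_ptr<WorkingArea> NewWorkingArea() const = 0;
  // Reports the uncompressed size so the caller can allocate memory itself.
  virtual bool GetUncompressedSize(const std::string& input,
                                   size_t* size) const = 0;
  // Writes exactly `size` bytes into caller-owned memory at `out`.
  virtual bool DecompressInto(const std::string& input, WorkingArea* wa,
                              char* out, size_t size) const = 0;
};

// Toy codec: the input is a sequence of (run length, byte) pairs.
class ToyRleDecompressor : public Decompressor {
 public:
  std::unique_ptr<WorkingArea> NewWorkingArea() const override {
    return std::make_unique<WorkingArea>();
  }

  bool GetUncompressedSize(const std::string& input,
                           size_t* size) const override {
    if (input.size() % 2 != 0) {
      return false;
    }
    size_t total = 0;
    for (size_t i = 0; i < input.size(); i += 2) {
      total += static_cast<uint8_t>(input[i]);
    }
    *size = total;
    return true;
  }

  bool DecompressInto(const std::string& input, WorkingArea* wa, char* out,
                      size_t size) const override {
    assert(wa != nullptr);
    (void)wa;  // The toy codec needs no scratch state.
    size_t expected = 0;
    if (!GetUncompressedSize(input, &expected) || expected != size) {
      return false;
    }
    for (size_t i = 0; i < input.size(); i += 2) {
      const size_t run = static_cast<uint8_t>(input[i]);
      std::fill_n(out, run, input[i + 1]);
      out += run;
    }
    return true;
  }
};
```

A caller would ask for the size, allocate the destination wherever it likes (for example inside a cache entry), and then call `DecompressInto` with its own working area. The decompressor never allocates the output itself.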

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13540

Test Plan:
This is currently an internal refactor. More testing will come when the new API is migrated to the public API. A test in db_block_cache_test is updated to meaningfully cover a case (cache warming compression dictionary block) that was previously only covered in the crash test.

SST write performance test, like https://github.com/facebook/rocksdb/issues/13583. Compile with CLANG, run before & after simultaneously:

```
SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=none" "-compression_parallel_threads=1 -compression_type=snappy" "-compression_parallel_threads=1 -compression_type=zstd" "-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1" "-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180" "-compression_parallel_threads=4 -compression_type=snappy"; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (with this PR and https://github.com/facebook/rocksdb/issues/13583 both reverted):
-compression_parallel_threads=1 -compression_type=none
1908372
-compression_parallel_threads=1 -compression_type=snappy
1926093
-compression_parallel_threads=1 -compression_type=zstd
1208259
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
997583
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934246
-compression_parallel_threads=4 -compression_type=snappy
1644849

After:
-compression_parallel_threads=1 -compression_type=none
1956054 (+2.5%)
-compression_parallel_threads=1 -compression_type=snappy
1911433 (-0.8%)
-compression_parallel_threads=1 -compression_type=zstd
1205668 (-0.3%)
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
999263 (+0.2%)
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934322 (+0.0%)
-compression_parallel_threads=4 -compression_type=snappy
1642519 (-0.2%)

Pretty neutral change(s) overall.
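
The figures in parentheses read as the relative change of the averaged ops/sec from "Before" to "After". A small sketch of that arithmetic, assuming this interpretation (the helper names `PercentChange` and `FormatPercent` are made up for illustration):

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Relative change of `after` versus `before`, in percent.
double PercentChange(double before, double after) {
  assert(before > 0.0);
  return (after - before) / before * 100.0;
}

// Formats with an explicit sign and one decimal, e.g. "+2.5%".
std::string FormatPercent(double pct) {
  char buf[32];
  std::snprintf(buf, sizeof(buf), "%+.1f%%", pct);
  return buf;
}
```

For the `-compression_type=none` row, `FormatPercent(PercentChange(1908372, 1956054))` gives `+2.5%`.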

SST read performance test (related to https://github.com/facebook/rocksdb/issues/13583). Set up:
```
for COMP in none snappy zstd; do echo $COMP; ./db_bench -db=/dev/shm/dbbench-$COMP --benchmarks=fillseq,flush -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -compression_type=$COMP; done
```
Test (compile with CLANG, run before & after simultaneously):
```
for COMP in none snappy zstd; do echo $COMP; (for I in `seq 1 5`; do ./db_bench -readonly -db=/dev/shm/dbbench-$COMP --benchmarks=readrandom -num=10000000 -duration=20 -threads=8 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (with this PR and https://github.com/facebook/rocksdb/issues/13583 both reverted):
none
1495646
snappy
1172443
zstd
706036
zstd (after constructing with -compression_max_dict_bytes=8180)
656182

After:
none
1494981 (-0.0%)
snappy
1171846 (-0.1%)
zstd
696363 (-1.4%)
zstd (after constructing with -compression_max_dict_bytes=8180)
667585 (+1.7%)

Pretty neutral.

Reviewed By: hx235

Differential Revision: D74626863

Pulled By: pdillinger

fbshipit-source-id: dc8ff3178da9b4eaa7c16aa1bb910c872afaf14a
2025-05-15 17:14:23 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_file_builder.h"

#include <cassert>

#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_completion_callback.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "db/blob/blob_source.h"
#include "db/event_helpers.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "trace_replay/io_tracer.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

BlobFileBuilder::BlobFileBuilder(
    VersionSet* versions, FileSystem* fs,
    const ImmutableOptions* immutable_options,
    const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
    const WriteOptions* write_options, std::string db_id,
    std::string db_session_id, int job_id, uint32_t column_family_id,
    const std::string& column_family_name, Env::WriteLifeTimeHint write_hint,
    const std::shared_ptr<IOTracer>& io_tracer,
    BlobFileCompletionCallback* blob_callback,
    BlobFileCreationReason creation_reason,
    std::vector<std::string>* blob_file_paths,
    std::vector<BlobFileAddition>* blob_file_additions)
    : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
                      immutable_options, mutable_cf_options, file_options,
                      write_options, db_id, db_session_id, job_id,
                      column_family_id, column_family_name, write_hint,
                      io_tracer, blob_callback, creation_reason,
                      blob_file_paths, blob_file_additions) {}

BlobFileBuilder::BlobFileBuilder(
    std::function<uint64_t()> file_number_generator, FileSystem* fs,
    const ImmutableOptions* immutable_options,
    const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
    const WriteOptions* write_options, std::string db_id,
    std::string db_session_id, int job_id, uint32_t column_family_id,
    const std::string& column_family_name, Env::WriteLifeTimeHint write_hint,
    const std::shared_ptr<IOTracer>& io_tracer,
    BlobFileCompletionCallback* blob_callback,
    BlobFileCreationReason creation_reason,
    std::vector<std::string>* blob_file_paths,
    std::vector<BlobFileAddition>* blob_file_additions)
    : file_number_generator_(std::move(file_number_generator)),
      fs_(fs),
      immutable_options_(immutable_options),
      min_blob_size_(mutable_cf_options->min_blob_size),
      blob_file_size_(mutable_cf_options->blob_file_size),
      blob_compression_type_(mutable_cf_options->blob_compression_type),
      prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
      file_options_(file_options),
      write_options_(write_options),
      db_id_(std::move(db_id)),
      db_session_id_(std::move(db_session_id)),
      job_id_(job_id),
      column_family_id_(column_family_id),
      column_family_name_(column_family_name),
      write_hint_(write_hint),
      io_tracer_(io_tracer),
      blob_callback_(blob_callback),
      creation_reason_(creation_reason),
      blob_file_paths_(blob_file_paths),
      blob_file_additions_(blob_file_additions),
      blob_count_(0),
      blob_bytes_(0) {
  assert(file_number_generator_);
  assert(fs_);
  assert(immutable_options_);
  assert(file_options_);
  assert(write_options_);
  assert(blob_file_paths_);
  assert(blob_file_paths_->empty());
  assert(blob_file_additions_);
  assert(blob_file_additions_->empty());
}

BlobFileBuilder::~BlobFileBuilder() = default;

Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
                            std::string* blob_index) {
  assert(blob_index);
  assert(blob_index->empty());

  if (value.size() < min_blob_size_) {
    return Status::OK();
  }

  {
    const Status s = OpenBlobFileIfNeeded();
    if (!s.ok()) {
      return s;
    }
  }

  Slice blob = value;
  std::string compressed_blob;

  {
    const Status s = CompressBlobIfNeeded(&blob, &compressed_blob);
    if (!s.ok()) {
      return s;
    }
  }

  uint64_t blob_file_number = 0;
  uint64_t blob_offset = 0;

  {
    const Status s =
        WriteBlobToFile(key, blob, &blob_file_number, &blob_offset);
    if (!s.ok()) {
      return s;
    }
  }

  {
    const Status s = CloseBlobFileIfNeeded();
    if (!s.ok()) {
      return s;
    }
  }

  {
    const Status s =
        PutBlobIntoCacheIfNeeded(value, blob_file_number, blob_offset);
    if (!s.ok()) {
      ROCKS_LOG_WARN(immutable_options_->info_log,
                     "Failed to pre-populate the blob into blob cache: %s",
                     s.ToString().c_str());
    }
  }

  BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
                        blob_compression_type_);

  return Status::OK();
}

Status BlobFileBuilder::Finish() {
  if (!IsBlobFileOpen()) {
    return Status::OK();
  }

  return CloseBlobFile();
}

bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_; }

Status BlobFileBuilder::OpenBlobFileIfNeeded() {
  if (IsBlobFileOpen()) {
    return Status::OK();
  }

  assert(!blob_count_);
  assert(!blob_bytes_);

  assert(file_number_generator_);
  const uint64_t blob_file_number = file_number_generator_();

  assert(immutable_options_);
  assert(!immutable_options_->cf_paths.empty());
  std::string blob_file_path =
      BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number);

  if (blob_callback_) {
    blob_callback_->OnBlobFileCreationStarted(
        blob_file_path, column_family_name_, job_id_, creation_reason_);
  }

  std::unique_ptr<FSWritableFile> file;
  FileOptions fo_copy;

  {
    assert(file_options_);
    fo_copy = *file_options_;
    fo_copy.write_hint = write_hint_;
    Status s = NewWritableFile(fs_, blob_file_path, &file, fo_copy);

    TEST_SYNC_POINT_CALLBACK(
        "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);

    if (!s.ok()) {
      return s;
    }
  }

  // Note: files get added to blob_file_paths_ right after the open, so they
  // can be cleaned up upon failure. Contrast this with blob_file_additions_,
  // which only contains successfully written files.
  assert(blob_file_paths_);
  blob_file_paths_->emplace_back(std::move(blob_file_path));

  assert(file);
  file->SetIOPriority(write_options_->rate_limiter_priority);
  // Subsequent attempts to override the hint via SetWriteLifeTimeHint
  // with the very same value will be ignored by the fs.
  file->SetWriteLifeTimeHint(fo_copy.write_hint);
  FileTypeSet tmp_set = immutable_options_->checksum_handoff_file_types;
  Statistics* const statistics = immutable_options_->stats;
  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
      std::move(file), blob_file_paths_->back(), *file_options_,
      immutable_options_->clock, io_tracer_, statistics,
      Histograms::BLOB_DB_BLOB_FILE_WRITE_MICROS, immutable_options_->listeners,
      immutable_options_->file_checksum_gen_factory.get(),
      tmp_set.Contains(FileType::kBlobFile), false));

  constexpr bool do_flush = false;

  std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
      std::move(file_writer), immutable_options_->clock, statistics,
      blob_file_number, immutable_options_->use_fsync, do_flush));

  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;

  BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
                       expiration_range);

  {
    Status s = blob_log_writer->WriteHeader(*write_options_, header);

    TEST_SYNC_POINT_CALLBACK(
        "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);

    if (!s.ok()) {
      return s;
    }
  }

  writer_ = std::move(blob_log_writer);

  assert(IsBlobFileOpen());

  return Status::OK();
}

Status BlobFileBuilder::CompressBlobIfNeeded(
    Slice* blob, std::string* compressed_blob) const {
  assert(blob);
  assert(compressed_blob);
  assert(compressed_blob->empty());
  assert(immutable_options_);

  if (blob_compression_type_ == kNoCompression) {
    return Status::OK();
  }

  // TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
  CompressionOptions opts;
  CompressionContext context(blob_compression_type_, opts);
  CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                       blob_compression_type_);

  constexpr uint32_t compression_format_version = 2;

  bool success = false;

  {
    StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
                         BLOB_DB_COMPRESSION_MICROS);
    success = OLD_CompressData(*blob, info, compression_format_version,
                               compressed_blob);
  }

  if (!success) {
    return Status::Corruption("Error compressing blob");
  }

  *blob = Slice(*compressed_blob);

  return Status::OK();
}

Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
                                        uint64_t* blob_file_number,
                                        uint64_t* blob_offset) {
  assert(IsBlobFileOpen());
  assert(blob_file_number);
  assert(blob_offset);

  uint64_t key_offset = 0;

  Status s =
      writer_->AddRecord(*write_options_, key, blob, &key_offset, blob_offset);

  TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);

  if (!s.ok()) {
    return s;
  }

  *blob_file_number = writer_->get_log_number();

  ++blob_count_;
  blob_bytes_ += BlobLogRecord::kHeaderSize + key.size() + blob.size();

  return Status::OK();
}

Status BlobFileBuilder::CloseBlobFile() {
  assert(IsBlobFileOpen());

  BlobLogFooter footer;
  footer.blob_count = blob_count_;

  std::string checksum_method;
  std::string checksum_value;

  Status s = writer_->AppendFooter(*write_options_, footer, &checksum_method,
                                   &checksum_value);

  TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);

  if (!s.ok()) {
    return s;
  }

  const uint64_t blob_file_number = writer_->get_log_number();

  if (blob_callback_) {
    s = blob_callback_->OnBlobFileCompleted(
        blob_file_paths_->back(), column_family_name_, job_id_,
        blob_file_number, creation_reason_, s, checksum_value, checksum_method,
        blob_count_, blob_bytes_);
  }

  assert(blob_file_additions_);
  blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
                                     std::move(checksum_method),
                                     std::move(checksum_value));

  assert(immutable_options_);
  ROCKS_LOG_INFO(immutable_options_->logger,
                 "[%s] [JOB %d] Generated blob file #%" PRIu64 ": %" PRIu64
                 " total blobs, %" PRIu64 " total bytes",
                 column_family_name_.c_str(), job_id_, blob_file_number,
                 blob_count_, blob_bytes_);

  writer_.reset();
  blob_count_ = 0;
  blob_bytes_ = 0;

  return s;
}

Status BlobFileBuilder::CloseBlobFileIfNeeded() {
  assert(IsBlobFileOpen());

  const WritableFileWriter* const file_writer = writer_->file();
  assert(file_writer);

  if (file_writer->GetFileSize() < blob_file_size_) {
    return Status::OK();
  }

  return CloseBlobFile();
}

void BlobFileBuilder::Abandon(const Status& s) {
  if (!IsBlobFileOpen()) {
    return;
  }

  if (blob_callback_) {
    // BlobFileBuilder::Abandon() is called because of error while writing to
    // Blob files. So we can ignore the below error.
    blob_callback_
        ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_,
                              job_id_, writer_->get_log_number(),
                              creation_reason_, s, "", "", blob_count_,
                              blob_bytes_)
        .PermitUncheckedError();
  }

  writer_.reset();
  blob_count_ = 0;
  blob_bytes_ = 0;
}

Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,
                                                 uint64_t blob_file_number,
                                                 uint64_t blob_offset) const {
  Status s = Status::OK();

  BlobSource::SharedCacheInterface blob_cache{immutable_options_->blob_cache};
  auto statistics = immutable_options_->statistics.get();
  bool warm_cache =
      prepopulate_blob_cache_ == PrepopulateBlobCache::kFlushOnly &&
      creation_reason_ == BlobFileCreationReason::kFlush;

  if (blob_cache && warm_cache) {
    const OffsetableCacheKey base_cache_key(db_id_, db_session_id_,
                                            blob_file_number);
    const CacheKey cache_key = base_cache_key.WithOffset(blob_offset);
    const Slice key = cache_key.AsSlice();

    const Cache::Priority priority = Cache::Priority::BOTTOM;

    s = blob_cache.InsertSaved(key, blob, nullptr /*context*/, priority,
                               immutable_options_->lowest_used_cache_tier);

    if (s.ok()) {
      RecordTick(statistics, BLOB_DB_CACHE_ADD);
      RecordTick(statistics, BLOB_DB_CACHE_BYTES_WRITE, blob.size());
    } else {
      RecordTick(statistics, BLOB_DB_CACHE_ADD_FAILURES);
    }
  }

  return s;
}

}  // namespace ROCKSDB_NAMESPACE
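
The control flow of `Add()` above (skip small values, lazily open a file, account for the record, roll over once the size limit is reached) can be modeled without any I/O. The sketch below is a toy under stated assumptions: `ToyBlobFile` and `ToyBlobBuilder` are hypothetical names, compression and caching are left out, the record header size of 32 is a placeholder for `BlobLogRecord::kHeaderSize`, and rollover is decided on accumulated record bytes, whereas the real code compares the file writer's `GetFileSize()` against `blob_file_size_`.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model only: hypothetical names, no file I/O, no compression.
struct ToyBlobFile {
  uint64_t number = 0;
  uint64_t blob_count = 0;
  uint64_t bytes = 0;
};

class ToyBlobBuilder {
 public:
  // Placeholder value standing in for BlobLogRecord::kHeaderSize.
  static constexpr uint64_t kRecordHeaderSize = 32;

  ToyBlobBuilder(std::function<uint64_t()> file_number_generator,
                 uint64_t min_blob_size, uint64_t blob_file_size)
      : file_number_generator_(std::move(file_number_generator)),
        min_blob_size_(min_blob_size),
        blob_file_size_(blob_file_size) {
    assert(file_number_generator_);
  }

  // Returns true if the value was routed to a blob file, false if it is
  // below the threshold and therefore stays inline.
  bool Add(const std::string& key, const std::string& value) {
    if (value.size() < min_blob_size_) {
      return false;
    }
    if (!open_) {
      // Lazily open a new file, like OpenBlobFileIfNeeded().
      current_ = ToyBlobFile{};
      current_.number = file_number_generator_();
      open_ = true;
    }
    ++current_.blob_count;
    current_.bytes += kRecordHeaderSize + key.size() + value.size();
    if (current_.bytes >= blob_file_size_) {
      Close();  // Roll over, like CloseBlobFileIfNeeded().
    }
    return true;
  }

  // Closes the last partially filled file, if any.
  void Finish() {
    if (open_) {
      Close();
    }
  }

  const std::vector<ToyBlobFile>& finished() const { return finished_; }

 private:
  void Close() {
    finished_.push_back(current_);
    open_ = false;
  }

  std::function<uint64_t()> file_number_generator_;
  uint64_t min_blob_size_;
  uint64_t blob_file_size_;
  bool open_ = false;
  ToyBlobFile current_;
  std::vector<ToyBlobFile> finished_;
};
```

Passing the file number source as a `std::function`, as the second `BlobFileBuilder` constructor does, lets a test supply a simple counter in place of a `VersionSet`.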