rocksdb/table/block_based/block_based_table_builder.h
Peter Dillinger 7c9b580681 Big refactor for preliminary custom compression API (#13540)
Summary:
Adds new classes and supporting machinery in the internal compression.h that are intended to become public APIs for supporting custom/pluggable compression. Some steps remain before compression is actually pluggable and before a lot of legacy code can be removed (e.g. legacy functions now renamed `OLD_CompressData` and `OLD_UncompressData`), but this change refactors the key integration points of SST building and reading and the compressed secondary cache over to the new APIs.

Compared with the proposed https://github.com/facebook/rocksdb/issues/7650, this fixes a number of issues, including:
* Making a clean divide between public and internal APIs (currently just indicated with comments)
* Enough generality that built-in compressions generally fit into the framework rather than needing special treatment
* Avoiding exposure of obnoxious idioms like `compress_format_version` to the user
* Enough generality that a compressor mixing algorithms/strategies from other compressors is pretty well supported without an extra schema layer
* Explicit thread-safety contracts (carefully considered)
* Contract details around schema compatibility and extension with code changes (more detail in next PR)
* Customizable "working areas" (e.g. for ZSTD "context")
* Decompression into an arbitrary memory location (rather than involving the decompressor in memory allocation; should facilitate reducing the number of objects in block cache), as sketched below
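
To make these requirements concrete, here is a rough, hypothetical sketch of what a pluggable compression interface with explicit working areas and caller-provided output buffers could look like. All names in the sketch (`WorkingArea`, `Compressor`, `Decompressor`, `IdentityCompressor`) are illustrative assumptions and do not correspond to the classes actually added in the internal compression.h:

```
// Hypothetical sketch only; these are NOT the classes added by this PR.
#include <cstddef>
#include <memory>
#include <string>

// Caller-owned scratch state (e.g. a ZSTD context) reused across calls, so the
// (de)compressor object itself can remain immutable and thread-safe.
struct WorkingArea {
  virtual ~WorkingArea() = default;
};

class Compressor {
 public:
  virtual ~Compressor() = default;
  // Each thread creates and reuses its own working area.
  virtual std::unique_ptr<WorkingArea> CreateWorkingArea() const = 0;
  // Returns false to mean "store this block uncompressed".
  virtual bool Compress(const char* input, std::size_t input_size,
                        std::string* output, WorkingArea* wa) const = 0;
};

class Decompressor {
 public:
  virtual ~Decompressor() = default;
  virtual std::unique_ptr<WorkingArea> CreateWorkingArea() const = 0;
  // Decompresses into a caller-provided buffer of exactly uncompressed_size
  // bytes; the decompressor allocates nothing itself, so the destination can
  // be e.g. a pre-sized block cache allocation.
  virtual bool Decompress(const char* input, std::size_t input_size,
                          char* output, std::size_t uncompressed_size,
                          WorkingArea* wa) const = 0;
};

// Trivial "identity" compressor showing how an implementation plugs in.
class IdentityCompressor : public Compressor {
 public:
  std::unique_ptr<WorkingArea> CreateWorkingArea() const override {
    return std::make_unique<WorkingArea>();
  }
  bool Compress(const char* input, std::size_t input_size, std::string* output,
                WorkingArea* /*wa*/) const override {
    output->assign(input, input_size);
    return true;
  }
};
```

The intent mirrors the bullets above: the compressor/decompressor objects stay immutable and shareable across threads, per-call scratch such as a ZSTD context lives in a caller-managed working area, and decompression writes into memory the caller has already allocated (e.g. a block cache slot).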

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13540

Test Plan:
This is currently an internal refactor. More testing will come when the new API is migrated to the public API. A test in db_block_cache_test is updated to meaningfully cover a case (cache warming of the compression dictionary block) that was previously only covered by the crash test.

SST write performance test, like https://github.com/facebook/rocksdb/issues/13583. Compiled with CLANG; before & after run simultaneously. Each configuration below reports db_bench throughput (ops/sec) averaged over 20 runs:

```
SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=none" "-compression_parallel_threads=1 -compression_type=snappy" "-compression_parallel_threads=1 -compression_type=zstd" "-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1" "-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180" "-compression_parallel_threads=4 -compression_type=snappy"; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (i.e. with this PR and https://github.com/facebook/rocksdb/issues/13583 both reverted):
-compression_parallel_threads=1 -compression_type=none
1908372
-compression_parallel_threads=1 -compression_type=snappy
1926093
-compression_parallel_threads=1 -compression_type=zstd
1208259
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
997583
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934246
-compression_parallel_threads=4 -compression_type=snappy
1644849

After:
-compression_parallel_threads=1 -compression_type=none
1956054 (+2.5%)
-compression_parallel_threads=1 -compression_type=snappy
1911433 (-0.8%)
-compression_parallel_threads=1 -compression_type=zstd
1205668 (-0.3%)
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
999263 (+0.2%)
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934322 (+0.0%)
-compression_parallel_threads=4 -compression_type=snappy
1642519 (-0.2%)

Pretty neutral change(s) overall.

SST read performance test (related to https://github.com/facebook/rocksdb/issues/13583). Set up:
```
for COMP in none snappy zstd; do echo $COMP; ./db_bench -db=/dev/shm/dbbench-$COMP --benchmarks=fillseq,flush -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -compression_type=$COMP; done
```
Test (compiled with CLANG; before & after run simultaneously; numbers are ops/sec averaged over 5 runs):
```
for COMP in none snappy zstd; do echo $COMP; (for I in `seq 1 5`; do ./db_bench -readonly -db=/dev/shm/dbbench-$COMP --benchmarks=readrandom -num=10000000 -duration=20 -threads=8 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (i.e. with this PR and https://github.com/facebook/rocksdb/issues/13583 both reverted):
none
1495646
snappy
1172443
zstd
706036
zstd (after constructing with -compression_max_dict_bytes=8180)
656182

After:
none
1494981 (-0.0%)
snappy
1171846 (-0.1%)
zstd
696363 (-1.4%)
zstd (after constructing with -compression_max_dict_bytes=8180)
667585 (+1.7%)

Pretty neutral.

Reviewed By: hx235

Differential Revision: D74626863

Pulled By: pdillinger

fbshipit-source-id: dc8ff3178da9b4eaa7c16aa1bb910c872afaf14a
2025-05-15 17:14:23 -07:00

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <stdint.h>
#include <array>
#include <limits>
#include <string>
#include <utility>
#include <vector>
#include "db/version_edit.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/atomic.h"
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
class BlockBuilder;
class BlockHandle;
class WritableFile;
struct BlockBasedTableOptions;
extern const uint64_t kBlockBasedTableMagicNumber;
extern const uint64_t kLegacyBlockBasedTableMagicNumber;
class BlockBasedTableBuilder : public TableBuilder {
public:
// Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish().
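// Typical usage (sketch): construct, call Add() for each key/value in order,
// then Finish() (or Abandon()), and finally close the file.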
BlockBasedTableBuilder(const BlockBasedTableOptions& table_options,
const TableBuilderOptions& table_builder_options,
WritableFileWriter* file);
// No copying allowed
BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete;
BlockBasedTableBuilder& operator=(const BlockBasedTableBuilder&) = delete;
// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();
// Add key,value to the table being constructed.
// REQUIRES: Unless key has type kTypeRangeDeletion, key is after any
// previously added non-kTypeRangeDeletion key according to
// comparator.
// REQUIRES: Finish(), Abandon() have not been called
void Add(const Slice& key, const Slice& value) override;
// Return non-ok iff some error has been detected.
Status status() const override;
// Return non-ok iff some error happens during IO.
IOStatus io_status() const override;
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called
Status Finish() override;
// Indicate that the contents of this builder should be abandoned. Stops
// using the file passed to the constructor after this function returns.
// If the caller is not going to call Finish(), it must call Abandon()
// before destroying this builder.
// REQUIRES: Finish(), Abandon() have not been called
void Abandon() override;
// Number of calls to Add() so far.
uint64_t NumEntries() const override;
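// Whether no entries (including range deletions) have been added so far.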
bool IsEmpty() const override;
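// Size of the data added so far, as measured before any compression.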
uint64_t PreCompressionSize() const override;
// Size of the file generated so far. If invoked after a successful
// Finish() call, returns the size of the final generated file.
uint64_t FileSize() const override;
// Estimated size of the file generated so far. This is used when
// FileSize() cannot estimate final SST size, e.g. parallel compression
// is enabled.
uint64_t EstimatedFileSize() const override;
// Get the size of the "tail" part of an SST file. "Tail" refers to
// all blocks after the data blocks, up to the end of the SST file.
uint64_t GetTailSize() const override;
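// Whether a user-defined table properties collector has indicated that this
// file should be further compacted.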
bool NeedCompact() const override;
// Get table properties
TableProperties GetTableProperties() const override;
// Get file checksum
std::string GetFileChecksum() const override;
// Get file checksum function name
const char* GetFileChecksumFuncName() const override;
void SetSeqnoTimeTableProperties(const SeqnoToTimeMapping& relevant_mapping,
uint64_t oldest_ancestor_time) override;
private:
bool ok() const { return status().ok(); }
// Transition state from buffered to unbuffered. See `Rep::State` API comment
// for details of the states.
// REQUIRES: `rep_->state == kBuffered`
void EnterUnbuffered();
// Compress and write block content to the file.
void WriteBlock(const Slice& block_contents, BlockHandle* handle,
BlockType block_type);
// Directly write data to the file.
void WriteMaybeCompressedBlock(
const Slice& block_contents, CompressionType, BlockHandle* handle,
BlockType block_type, const Slice* uncompressed_block_data = nullptr);
void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);
template <typename TBlocklike>
Status InsertBlockInCache(const Slice& block_contents,
const BlockHandle* handle, BlockType block_type);
Status InsertBlockInCacheHelper(const Slice& block_contents,
const BlockHandle* handle,
BlockType block_type);
Status InsertBlockInCompressedCache(const Slice& block_contents,
const CompressionType type,
const BlockHandle* handle);
void WriteFilterBlock(MetaIndexBuilder* meta_index_builder);
void WriteIndexBlock(MetaIndexBuilder* meta_index_builder,
BlockHandle* index_block_handle);
void WritePropertiesBlock(MetaIndexBuilder* meta_index_builder);
void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder);
void WriteRangeDelBlock(MetaIndexBuilder* meta_index_builder);
void WriteFooter(BlockHandle& metaindex_block_handle,
BlockHandle& index_block_handle);
struct Rep;
class BlockBasedTablePropertiesCollectorFactory;
class BlockBasedTablePropertiesCollector;
Rep* rep_;
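// Working areas used when compressing and (optionally) verifying blocks
// (see CompressAndVerifyBlock).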
struct WorkingAreaPair;
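// State shared with the background threads when parallel compression is
// enabled.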
struct ParallelCompressionRep;
// Advanced operation: flush any buffered key/value pairs to file.
// Can be used to ensure that two adjacent entries never live in
// the same data block. Most clients should not need to use this method.
// REQUIRES: Finish(), Abandon() have not been called
void Flush();
// Some compression libraries fail when the uncompressed size does not fit in
// an int. If the uncompressed size is bigger than kCompressionSizeLimit, the
// block is not compressed.
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
// Get blocks from mem-table walking thread, compress them and
// pass them to the write thread. Used in parallel compression mode only
void BGWorkCompression(WorkingAreaPair& working_area);
// Given uncompressed block content, try to compress it and return result and
// compression type
void CompressAndVerifyBlock(const Slice& uncompressed_block_data,
bool is_data_block, WorkingAreaPair& working_area,
std::string* compressed_output,
CompressionType* result_compression_type,
Status* out_status);
// Get compressed blocks from BGWorkCompression and write them into SST
void BGWorkWriteMaybeCompressedBlock();
// Initialize parallel compression context and
// start BGWorkCompression and BGWorkWriteMaybeCompressedBlock threads
void StartParallelCompression();
// Stop BGWorkCompression and BGWorkWriteMaybeCompressedBlock threads
void StopParallelCompression();
};
} // namespace ROCKSDB_NAMESPACE