Summary: Adds new classes etc. in internal compression.h that are intended to become public APIs for supporting custom/pluggable compression. Some steps remain to allow for pluggable compression and to remove a lot of legacy code (e.g. now called `OLD_CompressData` and `OLD_UncompressData`), but this change refactors the key integration points of SST building and reading and compressed secondary cache over to the new APIs. Compared with the proposed https://github.com/facebook/rocksdb/issues/7650, this fixes a number of issues including * Making a clean divide between public and internal APIs (currently just indicated with comments) * Enough generality that built-in compressions generally fit into the framework rather than needing special treatment * Avoid exposing obnoxious idioms like `compress_format_version` to the user. * Enough generality that a compressor mixing algorithms/strategies from other compressors is pretty well supported without an extra schema layer * Explicit thread-safety contracts (carefully considered) * Contract details around schema compatibility and extension with code changes (more detail in next PR) * Customizable "working areas" (e.g. for ZSTD "context") * Decompression into an arbitrary memory location (rather than involving the decompressor in memory allocation; should facilitate reducing number of objects in block cache) Pull Request resolved: https://github.com/facebook/rocksdb/pull/13540 Test Plan: This is currently an internal refactor. More testing will come when the new API is migrated to the public API. A test in db_block_cache_test is updated to meaningfully cover a case (cache warming compression dictionary block) that was previously only covered in the crash test. SST write performance test, like https://github.com/facebook/rocksdb/issues/13583. 
Compile with CLANG, run before & after simultaneously: ``` SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=none" "-compression_parallel_threads=1 -compression_type=snappy" "-compression_parallel_threads=1 -compression_type=zstd" "-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1" "-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180" "-compression_parallel_threads=4 -compression_type=snappy"; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done ``` Before (this PR and with https://github.com/facebook/rocksdb/issues/13583 reverted): -compression_parallel_threads=1 -compression_type=none 1908372 -compression_parallel_threads=1 -compression_type=snappy 1926093 -compression_parallel_threads=1 -compression_type=zstd 1208259 -compression_parallel_threads=1 -compression_type=zstd -verify_compression=1 997583 -compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180 934246 -compression_parallel_threads=4 -compression_type=snappy 1644849 After: -compression_parallel_threads=1 -compression_type=none 1956054 (+2.5%) -compression_parallel_threads=1 -compression_type=snappy 1911433 (-0.8%) -compression_parallel_threads=1 -compression_type=zstd 1205668 (-0.3%) -compression_parallel_threads=1 -compression_type=zstd -verify_compression=1 999263 (+0.2%) -compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180 934322 (+0.0%) -compression_parallel_threads=4 -compression_type=snappy 1642519 (-0.2%) Pretty neutral change(s) overall. SST read performance test (related to https://github.com/facebook/rocksdb/issues/13583). 
Set up: ``` for COMP in none snappy zstd; do echo $ARGS; ./db_bench -db=/dev/shm/dbbench-$COMP --benchmarks=fillseq,flush -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -compression_type=$COMP; done ``` Test (compile with CLANG, run before & after simultaneously): ``` for COMP in none snappy zstd; do echo $COMP; (for I in `seq 1 5`; do ./db_bench -readonly -db=/dev/shm/dbbench-$COMP --benchmarks=readrandom -num=10000000 -duration=20 -threads=8 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done ``` Before (this PR and with https://github.com/facebook/rocksdb/issues/13583 reverted): none 1495646 snappy 1172443 zstd 706036 zstd (after constructing with -compression_max_dict_bytes=8180) 656182 After: none 1494981 (-0.0%) snappy 1171846 (-0.1%) zstd 696363 (-1.4%) zstd (after constructing with -compression_max_dict_bytes=8180) 667585 (+1.7%) Pretty neutral. Reviewed By: hx235 Differential Revision: D74626863 Pulled By: pdillinger fbshipit-source-id: dc8ff3178da9b4eaa7c16aa1bb910c872afaf14a
117 lines
3.9 KiB
C++
117 lines
3.9 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
|
|
#include "table/block_based/uncompression_dict_reader.h"
|
|
|
|
#include "logging/logging.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "table/block_based/block_based_table_reader.h"
|
|
#include "util/compression.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// Constructs an UncompressionDictReader for `table` and stores it in
// `*uncompression_dict_reader`.
//
// The dictionary block is read up front when `prefetch` is set or when the
// block cache is not used (`!use_cache`). If that read fails, its status is
// returned and no reader is created. When the block cache is used and `pin`
// is false, the entry obtained by the read is reset before the reader is
// constructed, so the reader is built with an empty entry.
//
// Preconditions (asserted): `table` and its rep are non-null, `pin` implies
// `prefetch`, and `uncompression_dict_reader` is non-null.
Status UncompressionDictReader::Create(
    const BlockBasedTable* table, const ReadOptions& ro,
    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
    bool pin, BlockCacheLookupContext* lookup_context,
    std::unique_ptr<UncompressionDictReader>* uncompression_dict_reader) {
  assert(table);
  assert(table->get_rep());
  assert(!pin || prefetch);
  assert(uncompression_dict_reader);

  CachableEntry<DecompressorDict> dict_entry;

  const bool read_now = prefetch || !use_cache;
  if (read_now) {
    const Status read_status = ReadUncompressionDictionary(
        table, prefetch_buffer, ro, use_cache, nullptr /* get_context */,
        lookup_context, &dict_entry);
    if (!read_status.ok()) {
      return read_status;
    }

    // Hold on to the entry only when it is pinned or the cache is bypassed.
    const bool keep_entry = pin || !use_cache;
    if (!keep_entry) {
      dict_entry.Reset();
    }
  }

  uncompression_dict_reader->reset(
      new UncompressionDictReader(table, std::move(dict_entry)));
  return Status::OK();
}
|
|
|
|
// Retrieves the compression dictionary block referenced by the table rep's
// `compression_dict_handle` into `*uncompression_dict` via
// BlockBasedTable::RetrieveBlock().
//
// Returns the status produced by RetrieveBlock(); a non-OK status is also
// logged as a warning through the table's logger.
//
// Preconditions (asserted): `table` and its rep are non-null, the dictionary
// handle is not null, and `uncompression_dict` is non-null and empty.
Status UncompressionDictReader::ReadUncompressionDictionary(
    const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
    const ReadOptions& read_options, bool use_cache, GetContext* get_context,
    BlockCacheLookupContext* lookup_context,
    CachableEntry<DecompressorDict>* uncompression_dict) {
  // TODO: add perf counter for compression dictionary read time

  assert(table);
  assert(uncompression_dict);
  assert(uncompression_dict->IsEmpty());

  const BlockBasedTable::Rep* const table_rep = table->get_rep();
  assert(table_rep);
  assert(!table_rep->compression_dict_handle.IsNull());

  const Status status = table->RetrieveBlock(
      prefetch_buffer, read_options, table_rep->compression_dict_handle,
      /* decomp */ nullptr, uncompression_dict, get_context, lookup_context,
      /* for_compaction */ false, use_cache,
      /* async_read */ false, /* use_block_cache_for_lookup */ true);

  if (status.ok()) {
    return status;
  }

  // Failure path: report the error, then hand the status back unchanged.
  ROCKS_LOG_WARN(
      table_rep->ioptions.logger,
      "Encountered error while reading data from compression dictionary "
      "block %s",
      status.ToString().c_str());
  return status;
}
|
|
|
|
// Supplies the decompression dictionary through `*uncompression_dict`.
//
// If this reader already holds a dictionary entry, the output is set to an
// unowned reference to that value and OK is returned. Otherwise the
// dictionary is read via ReadUncompressionDictionary(), with the use of the
// block cache decided by cache_dictionary_blocks(), and that call's status
// is returned.
Status UncompressionDictReader::GetOrReadUncompressionDictionary(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    CachableEntry<DecompressorDict>* uncompression_dict) const {
  assert(uncompression_dict);

  if (uncompression_dict_.IsEmpty()) {
    // Nothing held by this reader: go read the dictionary block.
    const bool use_cache = cache_dictionary_blocks();
    return ReadUncompressionDictionary(table_, prefetch_buffer, ro, use_cache,
                                       get_context, lookup_context,
                                       uncompression_dict);
  }

  uncompression_dict->SetUnownedValue(uncompression_dict_.GetValue());
  return Status::OK();
}
|
|
|
|
// Returns an estimate of the memory attributable to this reader: the
// dictionary's own ApproximateMemoryUsage() when the held entry reports
// GetOwnValue(), plus the size of this object (malloc_usable_size when
// ROCKSDB_MALLOC_USABLE_SIZE is defined, sizeof otherwise).
size_t UncompressionDictReader::ApproximateMemoryUsage() const {
  const bool owns_dict = uncompression_dict_.GetOwnValue();
  assert(!owns_dict || uncompression_dict_.GetValue() != nullptr);

  size_t total = 0;
  if (owns_dict) {
    total += uncompression_dict_.GetValue()->ApproximateMemoryUsage();
  }

#ifdef ROCKSDB_MALLOC_USABLE_SIZE
  total += malloc_usable_size(const_cast<UncompressionDictReader*>(this));
#else
  total += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE

  return total;
}
|
|
|
|
bool UncompressionDictReader::cache_dictionary_blocks() const {
|
|
assert(table_);
|
|
assert(table_->get_rep());
|
|
|
|
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|