rocksdb/table/block_based/block_based_table_reader_impl.h
Peter Dillinger 7c9b580681 Big refactor for preliminary custom compression API (#13540)
Summary:
Adds new classes etc. in internal compression.h that are intended to become public APIs for supporting custom/pluggable compression. Some steps remain to allow for pluggable compression and to remove a lot of legacy code (e.g. now called `OLD_CompressData` and `OLD_UncompressData`), but this change refactors the key integration points of SST building and reading and compressed secondary cache over to the new APIs.

Compared with the proposed https://github.com/facebook/rocksdb/issues/7650, this fixes a number of issues including
* Making a clean divide between public and internal APIs (currently just indicated with comments)
* Enough generality that built-in compressions generally fit into the framework rather than needing special treatment
* Avoiding exposure of obnoxious idioms like `compress_format_version` to the user
* Enough generality that a compressor mixing algorithms/strategies from other compressors is pretty well supported without an extra schema layer
* Explicit thread-safety contracts (carefully considered)
* Contract details around schema compatibility and extension with code changes (more detail in next PR)
* Customizable "working areas" (e.g. for ZSTD "context")
* Decompression into an arbitrary memory location (rather than involving the decompressor in memory allocation; should facilitate reducing number of objects in block cache)
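
To make the last two points concrete, here is a minimal sketch of what "decompress into a caller-provided location" with an optional working area could look like. Every name in it (`WorkingArea`, `SketchDecompressor`, `RunLengthDecompressor`) is hypothetical and is not the interface added in compression.h; the toy run-length format (pairs of count byte, value byte) is likewise an assumption chosen only to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <string_view>

// Hypothetical names throughout; this is NOT the RocksDB compression.h API.
// Opaque reusable scratch state (comparable to a ZSTD context).
struct WorkingArea {
  virtual ~WorkingArea() = default;
};

class SketchDecompressor {
 public:
  virtual ~SketchDecompressor() = default;
  // May return null when the algorithm needs no scratch state.
  virtual std::unique_ptr<WorkingArea> ObtainWorkingArea() { return nullptr; }
  // Reports the uncompressed size so the caller can pick the destination.
  virtual bool ExtractUncompressedSize(std::string_view in,
                                       size_t* size) const = 0;
  // Writes exactly out_size bytes to out; never allocates the output itself.
  virtual bool DecompressInto(std::string_view in, char* out, size_t out_size,
                              WorkingArea* wa) const = 0;
};

// Toy format: a sequence of (count byte, value byte) pairs.
class RunLengthDecompressor : public SketchDecompressor {
 public:
  bool ExtractUncompressedSize(std::string_view in,
                               size_t* size) const override {
    if (in.size() % 2 != 0) return false;
    size_t total = 0;
    for (size_t i = 0; i < in.size(); i += 2) {
      total += static_cast<uint8_t>(in[i]);
    }
    *size = total;
    return true;
  }

  bool DecompressInto(std::string_view in, char* out, size_t out_size,
                      WorkingArea* /*wa*/) const override {
    assert(out != nullptr || out_size == 0);
    if (in.size() % 2 != 0) return false;
    size_t written = 0;
    for (size_t i = 0; i < in.size(); i += 2) {
      const size_t run = static_cast<uint8_t>(in[i]);
      if (run == 0) continue;
      if (run > out_size - written) return false;  // would overflow the buffer
      std::memset(out + written, in[i + 1], run);
      written += run;
    }
    return written == out_size;
  }
};
```

In this sketch the caller first asks for the uncompressed size, then chooses where the bytes go (for example, directly inside a larger cache entry), so the decompressor never takes part in memory allocation.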

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13540

Test Plan:
This is currently an internal refactor. More testing will come when the new API is migrated to the public API. A test in db_block_cache_test is updated to meaningfully cover a case (cache warming compression dictionary block) that was previously only covered in the crash test.

SST write performance test, like https://github.com/facebook/rocksdb/issues/13583. Compile with CLANG, run before & after simultaneously:

```
SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=none" "-compression_parallel_threads=1 -compression_type=snappy" "-compression_parallel_threads=1 -compression_type=zstd" "-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1" "-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180" "-compression_parallel_threads=4 -compression_type=snappy"; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (this PR and with https://github.com/facebook/rocksdb/issues/13583 reverted):
-compression_parallel_threads=1 -compression_type=none
1908372
-compression_parallel_threads=1 -compression_type=snappy
1926093
-compression_parallel_threads=1 -compression_type=zstd
1208259
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
997583
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934246
-compression_parallel_threads=4 -compression_type=snappy
1644849

After:
-compression_parallel_threads=1 -compression_type=none
1956054 (+2.5%)
-compression_parallel_threads=1 -compression_type=snappy
1911433 (-0.8%)
-compression_parallel_threads=1 -compression_type=zstd
1205668 (-0.3%)
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
999263 (+0.2%)
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934322 (+0.0%)
-compression_parallel_threads=4 -compression_type=snappy
1642519 (-0.2%)

Pretty neutral change(s) overall.
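
For reference, the averaging and the relative-change figures can be sketched as below. This assumes the fifth field of each `micros/op` line from db_bench is the ops/sec figure that the awk script averages; the helper names `MeanOpsPerSec` and `PercentChange` are illustrative and not part of any tool.

```cpp
#include <cassert>
#include <cmath>
#include <numeric>
#include <vector>

// Mean of per-run samples, truncated the same way as awk's int(sum / n).
long MeanOpsPerSec(const std::vector<double>& samples) {
  assert(!samples.empty());
  const double sum = std::accumulate(samples.begin(), samples.end(), 0.0);
  return static_cast<long>(sum / static_cast<double>(samples.size()));
}

// Relative change from `before` to `after` in percent, rounded to one decimal.
double PercentChange(double before, double after) {
  assert(before > 0);
  return std::round((after - before) / before * 1000.0) / 10.0;
}
```

For the uncompressed write case, `PercentChange(1908372, 1956054)` evaluates to 2.5, which matches the figure reported for `-compression_type=none`.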

SST read performance test (related to https://github.com/facebook/rocksdb/issues/13583). Set up:
```
for COMP in none snappy zstd; do echo $COMP; ./db_bench -db=/dev/shm/dbbench-$COMP --benchmarks=fillseq,flush -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -compression_type=$COMP; done
```
Test (compile with CLANG, run before & after simultaneously):
```
for COMP in none snappy zstd; do echo $COMP; (for I in `seq 1 5`; do ./db_bench -readonly -db=/dev/shm/dbbench-$COMP --benchmarks=readrandom -num=10000000 -duration=20 -threads=8 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (this PR and with https://github.com/facebook/rocksdb/issues/13583 reverted):
none
1495646
snappy
1172443
zstd
706036
zstd (after constructing with -compression_max_dict_bytes=8180)
656182

After:
none
1494981 (-0.0%)
snappy
1171846 (-0.1%)
zstd
696363 (-1.4%)
zstd (after constructing with -compression_max_dict_bytes=8180)
667585 (+1.7%)

Pretty neutral.

Reviewed By: hx235

Differential Revision: D74626863

Pulled By: pdillinger

fbshipit-source-id: dc8ff3178da9b4eaa7c16aa1bb910c872afaf14a
2025-05-15 17:14:23 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <type_traits>
#include "block.h"
#include "block_cache.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/reader_common.h"
// This file contains member functions of BlockBasedTable that cannot be
// implemented in block_based_table_reader.cc because they are templates
// and are called from other files (e.g. block_based_iterator.h).
namespace ROCKSDB_NAMESPACE {
namespace {
using IterPlaceholderCacheInterface =
    PlaceholderCacheInterface<CacheEntryRole::kMisc>;

template <typename TBlockIter>
struct IterTraits {};

template <>
struct IterTraits<DataBlockIter> {
  using IterBlocklike = Block_kData;
};

template <>
struct IterTraits<IndexBlockIter> {
  using IterBlocklike = Block_kIndex;
};

}  // namespace
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, allocate a new iterator.
// If input_iter is not null, update that iterator and return it.
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
    const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
    BlockType block_type, GetContext* get_context,
    BlockCacheLookupContext* lookup_context,
    FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read,
    Status& s, bool use_block_cache_for_lookup) const {
  using IterBlocklike = typename IterTraits<TBlockIter>::IterBlocklike;
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  CachableEntry<Block> block;
  {
    CachableEntry<DecompressorDict> dict;
    Decompressor* decomp = rep_->decompressor.get();
    if (rep_->uncompression_dict_reader && block_type == BlockType::kData) {
      // For async scans, don't use the prefetch buffer since an async prefetch
      // might already be under way and this would invalidate it. Also, the
      // uncompression dict is typically at the end of the file and would
      // most likely break the sequentiality of the access pattern.
      // The same applies to auto_readahead_size: it iterates over the index
      // to look up data blocks, which could also break the sequentiality of
      // the access pattern.
      s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
          ((ro.async_io || ro.auto_readahead_size) ? nullptr : prefetch_buffer),
          ro, get_context, lookup_context, &dict);
      if (!s.ok()) {
        iter->Invalidate(s);
        return iter;
      }
      assert(dict.GetValue());
      if (dict.GetValue()) {
        decomp = dict.GetValue()->decompressor_.get();
      }
    }
    s = RetrieveBlock(
        prefetch_buffer, ro, handle, decomp, &block.As<IterBlocklike>(),
        get_context, lookup_context, for_compaction,
        /* use_cache */ true, async_read, use_block_cache_for_lookup);
  }

  if (s.IsTryAgain() && async_read) {
    return iter;
  }

  if (!s.ok()) {
    assert(block.IsEmpty());
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);

  // Block contents are pinned, and remain pinned after the iterator
  // is destroyed as long as the cleanup functions are moved to another
  // object, when:
  // 1. the block cache handle is set to be released in a cleanup function, or
  // 2. the block points to an immortal source. If own_bytes is true then we
  //    are not reading data from the original source, whether immortal or
  //    not. Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
                                       block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache) {
      IterPlaceholderCacheInterface block_cache{
          rep_->table_options.block_cache.get()};
      if (block_cache) {
        // Insert a dummy record into the block cache to track memory usage.
        Cache::Handle* cache_handle = nullptr;
        CacheKey key =
            CacheKey::CreateUniqueForCacheLifetime(block_cache.get());
        s = block_cache.Insert(key.AsSlice(),
                               block.GetValue()->ApproximateMemoryUsage(),
                               &cache_handle);

        if (s.ok()) {
          assert(cache_handle != nullptr);
          iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache.get(),
                                cache_handle);
        }
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);

  return iter;
}

// Convert an uncompressed data block (i.e., a CachableEntry<Block>)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, allocate a new iterator.
// If input_iter is not null, update that iterator and return it.
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
                                                  CachableEntry<Block>& block,
                                                  TBlockIter* input_iter,
                                                  Status s) const {
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);

  // Block contents are pinned, and remain pinned after the iterator
  // is destroyed as long as the cleanup functions are moved to another
  // object, when:
  // 1. the block cache handle is set to be released in a cleanup function, or
  // 2. the block points to an immortal source. If own_bytes is true then we
  //    are not reading data from the original source, whether immortal or
  //    not. Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
                                       iter, block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache) {
      IterPlaceholderCacheInterface block_cache{
          rep_->table_options.block_cache.get()};
      if (block_cache) {
        // Insert a dummy record into the block cache to track memory usage.
        Cache::Handle* cache_handle = nullptr;
        CacheKey key =
            CacheKey::CreateUniqueForCacheLifetime(block_cache.get());
        s = block_cache.Insert(key.AsSlice(),
                               block.GetValue()->ApproximateMemoryUsage(),
                               &cache_handle);

        if (s.ok()) {
          assert(cache_handle != nullptr);
          iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache.get(),
                                cache_handle);
        }
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);

  return iter;
}

} // namespace ROCKSDB_NAMESPACE
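
The pinning rule used by both overloads above can be read as a small truth table. The following standalone sketch restates it outside RocksDB; `BlockState` and `BlockContentsPinned` are hypothetical names introduced only for illustration, standing in for `block.IsCached()`, `block.GetValue()->own_bytes()`, and `rep_->immortal_table`.

```cpp
#include <cassert>

// Hypothetical stand-in for the three inputs to the pinning decision.
struct BlockState {
  bool is_cached;       // block is held through a block cache handle
  bool own_bytes;       // block owns a private copy of its bytes
  bool immortal_table;  // underlying table memory outlives the iterator
};

// Contents are pinned if a cache handle keeps them alive, or if the block
// points directly (no private copy) into an immortal source.
bool BlockContentsPinned(const BlockState& b) {
  return b.is_cached || (!b.own_bytes && b.immortal_table);
}
```

Note that a block that owns its bytes and is not cached is never treated as pinned here, even when the table is immortal, because its bytes are a private copy rather than the original source.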