rocksdb/table/block_based/partitioned_index_reader.cc
Peter Dillinger 7c9b580681 Big refactor for preliminary custom compression API (#13540)
Summary:
Adds new classes, etc., in the internal compression.h that are intended to become public APIs for supporting custom/pluggable compression. Some steps remain before compression is fully pluggable and before a lot of legacy code can be removed (e.g. functions now called `OLD_CompressData` and `OLD_UncompressData`), but this change refactors the key integration points (SST building and reading, and the compressed secondary cache) over to the new APIs.

Compared with the proposed https://github.com/facebook/rocksdb/issues/7650, this fixes a number of issues, including:
* Making a clean divide between public and internal APIs (currently just indicated with comments)
* Enough generality that built-in compressions generally fit into the framework rather than needing special treatment
* Avoiding exposure of obnoxious idioms like `compress_format_version` to the user
* Enough generality that a compressor mixing algorithms/strategies from other compressors is pretty well supported without an extra schema layer
* Explicit thread-safety contracts (carefully considered)
* Contract details around schema compatibility and extension with code changes (more detail in next PR)
* Customizable "working areas" (e.g. for ZSTD "context")
* Decompression into an arbitrary memory location (rather than involving the decompressor in memory allocation; should facilitate reducing number of objects in block cache)
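
To make the intended shape concrete, here is a minimal sketch of what a pluggable compressor/decompressor pair along these lines might look like. All names and signatures below (`Compressor`, `Decompressor`, `WorkingArea`, `CompressBlock`, `DecompressBlock`) are illustrative assumptions, not the actual declarations in compression.h:

```
// Illustrative sketch only; these are NOT the actual compression.h
// declarations, just an approximation of the concepts described above.
#include <cstddef>
#include <memory>
#include <string>

#include "rocksdb/slice.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {

class Compressor {
 public:
  virtual ~Compressor() = default;

  // Optional reusable scratch state (e.g. a ZSTD compression "context").
  // Implementations that need no working area can return nullptr.
  struct WorkingArea {
    virtual ~WorkingArea() = default;
  };
  virtual std::unique_ptr<WorkingArea> ObtainWorkingArea() { return nullptr; }

  // Compress `uncompressed` into `*output`. Must be safe to call
  // concurrently from multiple threads as long as each call uses its own
  // WorkingArea.
  virtual Status CompressBlock(const Slice& uncompressed, std::string* output,
                               WorkingArea* working_area) = 0;
};

class Decompressor {
 public:
  virtual ~Decompressor() = default;

  // Decompress into a caller-provided buffer of exactly
  // `uncompressed_size` bytes, so the caller controls memory allocation
  // (e.g. decompressing directly into a block cache allocation).
  virtual Status DecompressBlock(const Slice& compressed, char* output,
                                 size_t uncompressed_size) = 0;
};

}  // namespace ROCKSDB_NAMESPACE
```

The decompress-into-caller-buffer shape is what allows the final uncompressed block to be written directly into memory owned by the block cache, as mentioned above.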

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13540

Test Plan:
This is currently an internal refactor. More testing will come when the new API is migrated to the public API. A test in db_block_cache_test is updated to meaningfully cover a case (cache warming of the compression dictionary block) that was previously only covered in the crash test.

SST write performance test, like https://github.com/facebook/rocksdb/issues/13583. Compile with CLANG, run before & after simultaneously:

```
SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=none" "-compression_parallel_threads=1 -compression_type=snappy" "-compression_parallel_threads=1 -compression_type=zstd" "-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1" "-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180" "-compression_parallel_threads=4 -compression_type=snappy"; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (with this PR and https://github.com/facebook/rocksdb/issues/13583 reverted):
-compression_parallel_threads=1 -compression_type=none
1908372
-compression_parallel_threads=1 -compression_type=snappy
1926093
-compression_parallel_threads=1 -compression_type=zstd
1208259
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
997583
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934246
-compression_parallel_threads=4 -compression_type=snappy
1644849

After:
-compression_parallel_threads=1 -compression_type=none
1956054 (+2.5%)
-compression_parallel_threads=1 -compression_type=snappy
1911433 (-0.8%)
-compression_parallel_threads=1 -compression_type=zstd
1205668 (-0.3%)
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1
999263 (+0.2%)
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180
934322 (+0.0%)
-compression_parallel_threads=4 -compression_type=snappy
1642519 (-0.2%)

Pretty neutral change(s) overall.

SST read performance test (related to https://github.com/facebook/rocksdb/issues/13583). Set up:
```
for COMP in none snappy zstd; do echo $COMP; ./db_bench -db=/dev/shm/dbbench-$COMP --benchmarks=fillseq,flush -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -compression_type=$COMP; done
```
Test (compile with CLANG, run before & after simultaneously):
```
for COMP in none snappy zstd; do echo $COMP; (for I in `seq 1 5`; do ./db_bench -readonly -db=/dev/shm/dbbench-$COMP --benchmarks=readrandom -num=10000000 -duration=20 -threads=8 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (with this PR and https://github.com/facebook/rocksdb/issues/13583 reverted):
none
1495646
snappy
1172443
zstd
706036
zstd (after constructing with -compression_max_dict_bytes=8180)
656182

After:
none
1494981 (-0.0%)
snappy
1171846 (-0.1%)
zstd
696363 (-1.4%)
zstd (after constructing with -compression_max_dict_bytes=8180)
667585 (+1.7%)

Pretty neutral.

Reviewed By: hx235

Differential Revision: D74626863

Pulled By: pdillinger

fbshipit-source-id: dc8ff3178da9b4eaa7c16aa1bb910c872afaf14a
2025-05-15 17:14:23 -07:00

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"
#include "block_cache.h"
#include "file/random_access_file_reader.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/partitioned_index_iterator.h"
namespace ROCKSDB_NAMESPACE {
Status PartitionIndexReader::Create(
const BlockBasedTable* table, const ReadOptions& ro,
FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
bool pin, BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader) {
assert(table != nullptr);
assert(table->get_rep());
assert(!pin || prefetch);
assert(index_reader != nullptr);
CachableEntry<Block> index_block;
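  // Read the top-level index block eagerly when prefetching is requested or
  // when the block cache is not used; otherwise it is read lazily on first
  // access via GetOrReadIndexBlock().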
if (prefetch || !use_cache) {
const Status s =
ReadIndexBlock(table, prefetch_buffer, ro, use_cache,
/*get_context=*/nullptr, lookup_context, &index_block);
if (!s.ok()) {
return s;
}
if (use_cache && !pin) {
index_block.Reset();
}
}
index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));
return Status::OK();
}
InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) {
CachableEntry<Block> index_block;
const Status s = GetOrReadIndexBlock(get_context, lookup_context,
&index_block, read_options);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
return iter;
}
return NewErrorInternalIterator<IndexValue>(s);
}
const BlockBasedTable::Rep* rep = table()->rep_;
InternalIteratorBase<IndexValue>* it = nullptr;
Statistics* kNullStats = nullptr;
// Filters are already checked before seeking the index
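  // If CacheDependencies() pinned every partition into partition_map_, build
  // a two-level iterator over those pinned blocks; otherwise use an iterator
  // that reads/caches partition blocks on demand.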
if (!partition_map_.empty()) {
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
it = NewTwoLevelIterator(
new BlockBasedTable::PartitionedIndexIteratorState(table(),
&partition_map_),
index_block.GetValue()->NewIndexIterator(
internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
index_has_first_key(), index_key_includes_seq(),
index_value_is_full(), false /* block_contents_pinned */,
user_defined_timestamps_persisted()));
} else {
ReadOptions ro{read_options};
// FIXME? Possible regression seen in prefetch_test if this field is
// propagated
ro.readahead_size = ReadOptions{}.readahead_size;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
index_block.GetValue()->NewIndexIterator(
internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
index_has_first_key(), index_key_includes_seq(),
index_value_is_full(), false /* block_contents_pinned */,
user_defined_timestamps_persisted()));
it = new PartitionedIndexIterator(
table(), ro, *internal_comparator(), std::move(index_iter),
lookup_context ? lookup_context->caller
: TableReaderCaller::kUncategorized);
}
assert(it != nullptr);
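  // Hand ownership of the (possibly cache-pinned) top-level index block to
  // the iterator so it stays alive for the iterator's lifetime.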
index_block.TransferTo(it);
return it;
// TODO(myabandeh): Update TwoLevelIterator to be able to make use of
// on-stack BlockIter while the state is on heap. Currently it assumes
// the first level iter is always on heap and will attempt to delete it
// in its destructor.
}
Status PartitionIndexReader::CacheDependencies(
const ReadOptions& ro, bool pin, FilePrefetchBuffer* tail_prefetch_buffer) {
if (!partition_map_.empty()) {
// The dependencies are already cached since `partition_map_` is filled in
// an all-or-nothing manner.
return Status::OK();
}
// Before reading the partitions, prefetch them to avoid lots of IOs
BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
const BlockBasedTable::Rep* rep = table()->rep_;
IndexBlockIter biter;
BlockHandle handle;
Statistics* kNullStats = nullptr;
CachableEntry<Block> index_block;
{
Status s = GetOrReadIndexBlock(nullptr /* get_context */, &lookup_context,
&index_block, ro);
if (!s.ok()) {
return s;
}
}
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
index_block.GetValue()->NewIndexIterator(
internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
index_has_first_key(), index_key_includes_seq(), index_value_is_full(),
false /* block_contents_pinned */, user_defined_timestamps_persisted());
// Index partitions are assumed to be consecutive. Prefetch them all.
// Read the first block offset
biter.SeekToFirst();
if (!biter.Valid()) {
// Empty index.
return biter.status();
}
handle = biter.value().handle;
uint64_t prefetch_off = handle.offset();
// Read the last block's offset
biter.SeekToLast();
if (!biter.Valid()) {
// Empty index.
return biter.status();
}
handle = biter.value().handle;
uint64_t last_off =
handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
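  // If the tail prefetch buffer does not already cover the partition range,
  // create a dedicated prefetch buffer and read the whole range in one I/O.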
if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
rep->CreateFilePrefetchBuffer(ReadaheadParams(), &prefetch_buffer,
/*readaheadsize_cb*/ nullptr,
/*usage=*/FilePrefetchBufferUsage::kUnknown);
IOOptions opts;
{
Status s = rep->file->PrepareIOOptions(ro, opts);
if (s.ok()) {
s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
static_cast<size_t>(prefetch_len));
}
if (!s.ok()) {
return s;
}
}
}
// For saving "all or nothing" to partition_map_
UnorderedMap<uint64_t, CachableEntry<Block>> map_in_progress;
// After prefetch, read the partitions one by one
biter.SeekToFirst();
size_t partition_count = 0;
for (; biter.Valid(); biter.Next()) {
handle = biter.value().handle;
CachableEntry<Block> block;
++partition_count;
// TODO: Support counter batch update for partitioned index and
// filter blocks
Status s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer ? prefetch_buffer.get() : tail_prefetch_buffer, ro,
handle, rep->decompressor.get(),
/*for_compaction=*/false, &block.As<Block_kIndex>(),
/*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr,
/*async_read=*/false, /*use_block_cache_for_lookup=*/true);
if (!s.ok()) {
return s;
}
if (block.GetValue() != nullptr) {
// Might need to "pin" some mmap-read blocks (GetOwnValue) if some
// partitions are successfully compressed (cached) and some are not
// compressed (mmap eligible)
if (block.IsCached() || block.GetOwnValue()) {
if (pin) {
map_in_progress[handle.offset()] = std::move(block);
}
}
}
}
Status s = biter.status();
// Save (pin) them only if everything checks out
if (map_in_progress.size() == partition_count && s.ok()) {
std::swap(partition_map_, map_in_progress);
}
return s;
}
void PartitionIndexReader::EraseFromCacheBeforeDestruction(
uint32_t uncache_aggressiveness) {
// NOTE: essentially a copy of
// PartitionedFilterBlockReader::EraseFromCacheBeforeDestruction
if (uncache_aggressiveness > 0) {
CachableEntry<Block> top_level_block;
ReadOptions ro_no_io;
ro_no_io.read_tier = ReadTier::kBlockCacheTier;
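    // Cache-only lookup (kBlockCacheTier): fetch the top-level index block
    // from the block cache without any file I/O.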
GetOrReadIndexBlock(/*get_context=*/nullptr,
/*lookup_context=*/nullptr, &top_level_block, ro_no_io)
.PermitUncheckedError();
if (!partition_map_.empty()) {
// All partitions present if any
for (auto& e : partition_map_) {
e.second.ResetEraseIfLastRef();
}
} else if (!top_level_block.IsEmpty()) {
IndexBlockIter biter;
const InternalKeyComparator* const comparator = internal_comparator();
Statistics* kNullStats = nullptr;
top_level_block.GetValue()->NewIndexIterator(
comparator->user_comparator(),
table()->get_rep()->get_global_seqno(BlockType::kIndex), &biter,
kNullStats, true /* total_order_seek */, index_has_first_key(),
index_key_includes_seq(), index_value_is_full(),
false /* block_contents_pinned */,
user_defined_timestamps_persisted());
UncacheAggressivenessAdvisor advisor(uncache_aggressiveness);
for (biter.SeekToFirst(); biter.Valid() && advisor.ShouldContinue();
biter.Next()) {
bool erased = table()->EraseFromCache(biter.value().handle);
advisor.Report(erased);
}
biter.status().PermitUncheckedError();
}
top_level_block.ResetEraseIfLastRef();
}
// Might be needed to un-cache a pinned top-level block
BlockBasedTable::IndexReaderCommon::EraseFromCacheBeforeDestruction(
uncache_aggressiveness);
}
} // namespace ROCKSDB_NAMESPACE