Summary: Adds new classes etc. in the internal compression.h that are intended to become public APIs for supporting custom/pluggable compression. Some steps remain before pluggable compression is fully supported and before a lot of legacy code (e.g. the functions now called `OLD_CompressData` and `OLD_UncompressData`) can be removed, but this change refactors the key integration points of SST building and reading, as well as the compressed secondary cache, over to the new APIs. Compared with the proposed https://github.com/facebook/rocksdb/issues/7650, this fixes a number of issues, including:
* Making a clean divide between public and internal APIs (currently just indicated with comments)
* Enough generality that built-in compressions generally fit into the framework rather than needing special treatment
* Avoiding exposure of obnoxious idioms like `compress_format_version` to the user
* Enough generality that a compressor mixing algorithms/strategies from other compressors is well supported without an extra schema layer
* Explicit, carefully considered thread-safety contracts
* Contract details around schema compatibility and extension with code changes (more detail in the next PR)
* Customizable "working areas" (e.g. for ZSTD "context")
* Decompression into an arbitrary memory location, rather than involving the decompressor in memory allocation, which should facilitate reducing the number of objects in block cache

(An illustrative sketch of such an interface follows the test plan below.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13540

Test Plan: This is currently an internal refactor; more testing will come when the new API is migrated to the public API. A test in db_block_cache_test is updated to meaningfully cover a case (cache warming of the compression dictionary block) that was previously only covered in the crash test.

SST write performance test, as in https://github.com/facebook/rocksdb/issues/13583.
Compile with CLANG, run before & after simultaneously:
```
SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=none" "-compression_parallel_threads=1 -compression_type=snappy" "-compression_parallel_threads=1 -compression_type=zstd" "-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1" "-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180" "-compression_parallel_threads=4 -compression_type=snappy"; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (this PR and https://github.com/facebook/rocksdb/issues/13583 reverted):
-compression_parallel_threads=1 -compression_type=none 1908372
-compression_parallel_threads=1 -compression_type=snappy 1926093
-compression_parallel_threads=1 -compression_type=zstd 1208259
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1 997583
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180 934246
-compression_parallel_threads=4 -compression_type=snappy 1644849

After:
-compression_parallel_threads=1 -compression_type=none 1956054 (+2.5%)
-compression_parallel_threads=1 -compression_type=snappy 1911433 (-0.8%)
-compression_parallel_threads=1 -compression_type=zstd 1205668 (-0.3%)
-compression_parallel_threads=1 -compression_type=zstd -verify_compression=1 999263 (+0.2%)
-compression_parallel_threads=1 -compression_type=zstd -compression_max_dict_bytes=8180 934322 (+0.0%)
-compression_parallel_threads=4 -compression_type=snappy 1642519 (-0.2%)

A pretty neutral change overall.

SST read performance test (related to https://github.com/facebook/rocksdb/issues/13583). Set up:
```
for COMP in none snappy zstd; do echo $COMP; ./db_bench -db=/dev/shm/dbbench-$COMP --benchmarks=fillseq,flush -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -compression_type=$COMP; done
```

Test (compile with CLANG, run before & after simultaneously):
```
for COMP in none snappy zstd; do echo $COMP; (for I in `seq 1 5`; do ./db_bench -readonly -db=/dev/shm/dbbench-$COMP --benchmarks=readrandom -num=10000000 -duration=20 -threads=8 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Before (this PR and https://github.com/facebook/rocksdb/issues/13583 reverted):
none 1495646
snappy 1172443
zstd 706036
zstd (after constructing with -compression_max_dict_bytes=8180) 656182

After:
none 1494981 (-0.0%)
snappy 1171846 (-0.1%)
zstd 696363 (-1.4%)
zstd (after constructing with -compression_max_dict_bytes=8180) 667585 (+1.7%)

Pretty neutral.

Reviewed By: hx235

Differential Revision: D74626863

Pulled By: pdillinger

fbshipit-source-id: dc8ff3178da9b4eaa7c16aa1bb910c872afaf14a
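To make the API goals above more concrete, here is a hypothetical sketch (not the actual interface added in compression.h; the names `MyCompressor`, `WorkingArea`, `DecompressInto`, and `IdentityCompressor` are all illustrative assumptions) of a pluggable compressor with a reusable working area and decompression into a caller-provided buffer:
```
// Hypothetical sketch only; names and signatures are illustrative, not the
// RocksDB API introduced by this PR.
#include <cstddef>
#include <cstring>
#include <string>

// Per-thread / per-call scratch state, analogous to a ZSTD "context", so the
// compressor object itself can stay logically const and thread-safe.
struct WorkingArea {
  std::string scratch;
};

class MyCompressor {
 public:
  virtual ~MyCompressor() = default;

  // Thread-safe: all mutable state lives in the caller-provided WorkingArea.
  virtual bool Compress(const char* input, std::size_t input_size,
                        std::string* output, WorkingArea* wa) const = 0;

  // Decompress into a buffer owned and sized by the caller, so the
  // decompressor is not involved in memory allocation (e.g. the caller can
  // decompress directly into a block cache allocation).
  virtual bool DecompressInto(const char* input, std::size_t input_size,
                              char* output, std::size_t output_size,
                              WorkingArea* wa) const = 0;
};

// Trivial "identity" compressor, just to show how the interface is used.
class IdentityCompressor : public MyCompressor {
 public:
  bool Compress(const char* input, std::size_t input_size, std::string* output,
                WorkingArea* /*wa*/) const override {
    output->assign(input, input_size);
    return true;
  }
  bool DecompressInto(const char* input, std::size_t input_size, char* output,
                      std::size_t output_size,
                      WorkingArea* /*wa*/) const override {
    if (output_size < input_size) {
      return false;  // Caller's buffer is too small.
    }
    std::memcpy(output, input, input_size);
    return true;
  }
};
```
The points mirrored here are that the compressor object is shared and immutable while scratch state is externalized, and that decompression writes into memory the caller already allocated.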
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include <type_traits>

#include "block.h"
#include "block_cache.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/reader_common.h"

// This file contains some member functions of BlockBasedTable that
// cannot be implemented in block_based_table_reader.cc because
// they are called from other files (e.g. block_based_iterator.h) and
// are templates.

namespace ROCKSDB_NAMESPACE {
namespace {
using IterPlaceholderCacheInterface =
    PlaceholderCacheInterface<CacheEntryRole::kMisc>;

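// Maps a block iterator type to the "block-like" tag type (e.g. Block_kData)
// used for typed block retrieval via RetrieveBlock() below.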
template <typename TBlockIter>
struct IterTraits {};

template <>
struct IterTraits<DataBlockIter> {
  using IterBlocklike = Block_kData;
};

template <>
struct IterTraits<IndexBlockIter> {
  using IterBlocklike = Block_kIndex;
};

}  // namespace

// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, allocate and return a new iterator.
// If input_iter is not null, update that iterator and return it.
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
    const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
    BlockType block_type, GetContext* get_context,
    BlockCacheLookupContext* lookup_context,
    FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read,
    Status& s, bool use_block_cache_for_lookup) const {
  using IterBlocklike = typename IterTraits<TBlockIter>::IterBlocklike;
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  CachableEntry<Block> block;
  {
    CachableEntry<DecompressorDict> dict;
    Decompressor* decomp = rep_->decompressor.get();
    if (rep_->uncompression_dict_reader && block_type == BlockType::kData) {
      // For async scans, don't use the prefetch buffer since an async prefetch
      // might already be under way and this would invalidate it. Also, the
      // uncompression dict is typically at the end of the file and would
      // most likely break the sequentiality of the access pattern.
      // The same applies to auto_readahead_size, which iterates over the index
      // to look up data blocks and could also break the sequentiality of the
      // access pattern.
      s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
          ((ro.async_io || ro.auto_readahead_size) ? nullptr : prefetch_buffer),
          ro, get_context, lookup_context, &dict);
      if (!s.ok()) {
        iter->Invalidate(s);
        return iter;
      }
      assert(dict.GetValue());
      if (dict.GetValue()) {
        decomp = dict.GetValue()->decompressor_.get();
      }
    }
    s = RetrieveBlock(
        prefetch_buffer, ro, handle, decomp, &block.As<IterBlocklike>(),
        get_context, lookup_context, for_compaction,
        /* use_cache */ true, async_read, use_block_cache_for_lookup);
  }

  if (s.IsTryAgain() && async_read) {
    return iter;
  }

  if (!s.ok()) {
    assert(block.IsEmpty());
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);

  // Block contents are pinned and remain pinned after the iterator is
  // destroyed, as long as cleanup functions are moved to another object, when:
  // 1. the block cache handle is set to be released in a cleanup function, or
  // 2. it's pointing to an immortal source. If own_bytes is true then we are
  //    not reading data from the original source, whether immortal or not.
  //    Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
                                       block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache) {
      IterPlaceholderCacheInterface block_cache{
          rep_->table_options.block_cache.get()};
      if (block_cache) {
        // Insert a dummy record into the block cache to track the memory usage
        Cache::Handle* cache_handle = nullptr;
        CacheKey key =
            CacheKey::CreateUniqueForCacheLifetime(block_cache.get());
        s = block_cache.Insert(key.AsSlice(),
                               block.GetValue()->ApproximateMemoryUsage(),
                               &cache_handle);

        if (s.ok()) {
          assert(cache_handle != nullptr);
          iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache.get(),
                                cache_handle);
        }
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);

  return iter;
}

// Convert an uncompressed data block (i.e., CachableEntry<Block>)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, allocate and return a new iterator.
// If input_iter is not null, update that iterator and return it.
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
                                                  CachableEntry<Block>& block,
                                                  TBlockIter* input_iter,
                                                  Status s) const {
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);
  // Block contents are pinned and remain pinned after the iterator is
  // destroyed, as long as cleanup functions are moved to another object, when:
  // 1. the block cache handle is set to be released in a cleanup function, or
  // 2. it's pointing to an immortal source. If own_bytes is true then we are
  //    not reading data from the original source, whether immortal or not.
  //    Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
                                       iter, block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache) {
      IterPlaceholderCacheInterface block_cache{
          rep_->table_options.block_cache.get()};
      if (block_cache) {
        // Insert a dummy record into the block cache to track the memory usage
        Cache::Handle* cache_handle = nullptr;
        CacheKey key =
            CacheKey::CreateUniqueForCacheLifetime(block_cache.get());
        s = block_cache.Insert(key.AsSlice(),
                               block.GetValue()->ApproximateMemoryUsage(),
                               &cache_handle);

        if (s.ok()) {
          assert(cache_handle != nullptr);
          iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache.get(),
                                cache_handle);
        }
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);
  return iter;
}
}  // namespace ROCKSDB_NAMESPACE