rocksdb/table/block_based/block_based_table_reader_impl.h
Josh Kang 901c88e37b Separate keys and values in data blocks (#14287)
Summary:
Introduce a new table option that stores keys and values separately within data blocks.

This PR implements a new SST block format where keys and values are stored in separate sections within data blocks, rather than interleaved. All keys are stored first, followed by all values in a contiguous section. The motivation is a better CPU cache hit rate during seeks and potentially better compression.

The additional storage cost is one varint per restart point plus 4 extra bytes in the block footer. For a data block with a restart interval of 16, that is approximately 1 bit of overhead per entry. Compression actually performs better with this layout, however, yielding ~3% storage savings in the benchmarks below.
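
The overhead estimate above can be reproduced with a few lines of arithmetic. This is only a back-of-the-envelope sketch: `ExtraBitsPerEntry` is a hypothetical helper, and the entries-per-block count and varint width are inputs I made up for illustration, not measured values.

```cpp
#include <cassert>
#include <cstdint>

// Extra bits per entry introduced by the separated layout:
// one value-offset varint per restart point, plus 4 bytes per block.
inline double ExtraBitsPerEntry(uint32_t entries_per_block,
                                uint32_t restart_interval,
                                uint32_t varint_bytes) {
  assert(entries_per_block > 0 && restart_interval > 0);
  // Number of restart points, rounding up for a partial last interval.
  const uint32_t restarts =
      (entries_per_block + restart_interval - 1) / restart_interval;
  const uint64_t extra_bytes = uint64_t{restarts} * varint_bytes + 4;
  return 8.0 * static_cast<double>(extra_bytes) / entries_per_block;
}
```

For an assumed block of 160 entries with a restart interval of 16, this gives 0.7 bits per entry with one-byte varints and 1.2 bits with two-byte varints, which is in line with the "approximately 1 bit" figure.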

For now I've opted not to separate keys and values in non-data blocks, since the restart interval for those blocks is typically 1 and their values are typically small and probably better left inline.

### New block layout

```
+------------------+
| Keys Section     |  <- Key entries with delta encoding
+------------------+
| Values Section   |  <- (new) Values stored contiguously
+------------------+
| Restart Array    |  <- Fixed32 offsets to restart points
+------------------+
| Values Offset    |  <- (new) 4 bytes: offset to values section
| Footer           |  <- 4 bytes: packed index_type + num_restarts
+------------------+
```
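
Reading this layout starts from the tail of the block. The sketch below is not RocksDB's reader: `BlockTrailer`, `ParseTrailer`, and `kAssumedCountMask` are hypothetical names, fixed32 fields are assumed to be little-endian, and the flag bits packed into the footer are simply masked off under the assumption that they occupy the two highest bits.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical summary of the fixed-size tail of a separated-KV data block.
struct BlockTrailer {
  uint32_t num_restarts = 0;     // restart count taken from the footer
  uint32_t values_offset = 0;    // where the values section begins
  uint32_t restarts_offset = 0;  // where the restart array begins
};

// Assumption: the two highest footer bits carry flags, the rest is the count.
constexpr uint32_t kAssumedCountMask = (1u << 30) - 1;

inline void AppendFixed32LE(std::string& dst, uint32_t v) {
  for (int i = 0; i < 4; ++i) {
    dst.push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
  }
}

inline uint32_t ReadFixed32LE(const std::string& s, size_t pos) {
  uint32_t v = 0;
  for (int i = 0; i < 4; ++i) {
    v |= static_cast<uint32_t>(static_cast<unsigned char>(s[pos + i]))
         << (8 * i);
  }
  return v;
}

// Returns false if the block is too small or the fields are inconsistent.
inline bool ParseTrailer(const std::string& block, BlockTrailer* out) {
  assert(out != nullptr);
  constexpr size_t kTailBytes = 8;  // values offset + footer
  if (block.size() < kTailBytes) return false;
  const uint32_t footer = ReadFixed32LE(block, block.size() - 4);
  const uint32_t values_offset = ReadFixed32LE(block, block.size() - 8);
  const uint64_t num_restarts = footer & kAssumedCountMask;
  const uint64_t body = block.size() - kTailBytes;
  if (num_restarts * 4 > body) return false;  // restart array does not fit
  const uint64_t restarts_offset = body - num_restarts * 4;
  if (values_offset > restarts_offset) return false;  // values overlap restarts
  out->num_restarts = static_cast<uint32_t>(num_restarts);
  out->values_offset = values_offset;
  out->restarts_offset = static_cast<uint32_t>(restarts_offset);
  return true;
}
```

With the trailer decoded, the keys section spans `[0, values_offset)` and the values section spans `[values_offset, restarts_offset)`.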

### Entry Format

**At restart points**
```
+--------------+------------------+----------------+-----------------+-----------+
| shared (v32) | non_shared (v32) | value_sz (v32) | value_off (v32) | key_delta |
+--------------+------------------+----------------+-----------------+-----------+
```

**At non-restart points**
```
+--------------+------------------+----------------+-----------+
| shared (v32) | non_shared (v32) | value_sz (v32) | key_delta |
+--------------+------------------+----------------+-----------+
```

- `value_off` (the value offset) is stored only at restart points to save space
- For non-restart entries, the value offset is computed as `prev_value_offset + prev_value_size`
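
The offset rule can be sketched as a single pass over already-decoded entry headers. `DecodedEntry` and `ResolveValueOffsets` are hypothetical names used only for illustration; the sketch also assumes that entry `i` is a restart point exactly when `i % restart_interval == 0`, and it does not care what base the stored offsets are relative to.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical view of one entry header after varint decoding.
struct DecodedEntry {
  uint32_t value_size;
  uint32_t stored_value_offset;  // meaningful only at restart points
};

// Returns the value offset of every entry in the block.
inline std::vector<uint32_t> ResolveValueOffsets(
    const std::vector<DecodedEntry>& entries, uint32_t restart_interval) {
  assert(restart_interval > 0);
  std::vector<uint32_t> offsets;
  offsets.reserve(entries.size());
  uint32_t prev_offset = 0;
  uint32_t prev_size = 0;
  for (size_t i = 0; i < entries.size(); ++i) {
    const bool at_restart = (i % restart_interval == 0);
    // Restart points carry an explicit offset; other entries continue
    // directly after the previous value.
    const uint32_t offset = at_restart ? entries[i].stored_value_offset
                                       : prev_offset + prev_size;
    offsets.push_back(offset);
    prev_offset = offset;
    prev_size = entries[i].value_size;
  }
  return offsets;
}
```

This is also why iteration has to track the entry index and the restart interval: without them a reader cannot tell whether the next header contains a `value_off` field.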

### Forward Compatibility
- We use reserved block footer bits to mark whether a block uses the separated key-value format. An older version that reads such a block will interpret the footer as a very large block restart interval and fail with a corruption error.
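
One way to picture this failure mode is shown below. The bit positions are assumptions made for the sketch (`kSeparatedKVBit` in particular is hypothetical, not taken from the PR), and the "old reader" is modeled as code that masks off only the index-type bit and treats everything else as a count.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint32_t kIndexTypeBit = 1u << 31;    // assumed position
constexpr uint32_t kSeparatedKVBit = 1u << 30;  // hypothetical position

inline uint32_t PackFooter(uint32_t num_restarts, bool hash_index,
                           bool separated_kv) {
  assert(num_restarts < kSeparatedKVBit);  // count must fit below the flags
  uint32_t footer = num_restarts;
  if (hash_index) footer |= kIndexTypeBit;
  if (separated_kv) footer |= kSeparatedKVBit;
  return footer;
}

// A reader that predates the flag only strips the index-type bit.
inline uint32_t NumRestartsSeenByOldReader(uint32_t footer) {
  return footer & ~kIndexTypeBit;
}

// A reader that knows about the flag strips both bits.
inline uint32_t NumRestartsSeenByNewReader(uint32_t footer) {
  return footer & ~(kIndexTypeBit | kSeparatedKVBit);
}

// Sanity check a reader can apply: the restart array (4 bytes per restart)
// plus the 4-byte footer must fit inside the block.
inline bool RestartArrayFits(uint32_t num_restarts, uint64_t block_size) {
  return block_size >= 4 && num_restarts <= (block_size - 4) / 4;
}
```

Under these assumptions, a 4 KB block with 8 restarts passes the check in a new reader, while the old reader sees a count above one billion and rejects the block instead of misreading it.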

### Key Changes
- **BlockBuilder**: Accumulates values in a separate buffer; value offsets are stored only at restart points (other entries derive offset from previous value's position). There is an additional memcpy cost to place the value data after the key data.
- **Block iteration**: Iteration now needs to know whether the current entry is at a restart point. This relies on `cur_entry_idx_`, which was previously used only for per-kv checksums. The new format also needs the `block_restart_interval`, which was likewise previously calculated only for per-kv checksums.
- **Table properties**: Store `data_block_restart_interval`, `index_block_restart_interval`, and `separate_key_value_in_data_block` in table properties
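
To make the builder change concrete, here is a minimal sketch of a builder that keeps two buffers and joins them at the end. It is not the actual `BlockBuilder`: the class name `SeparatedKVBlockSketch` is hypothetical, value offsets are assumed to be relative to the start of the values section, fixed32 fields are assumed little-endian, and the footer holds only the restart count (index-type and format bits are omitted).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

class SeparatedKVBlockSketch {
 public:
  explicit SeparatedKVBlockSketch(uint32_t restart_interval)
      : restart_interval_(restart_interval) {
    assert(restart_interval_ > 0);
  }

  // Keys must be added in sorted order, as with any prefix-compressed block.
  void Add(const std::string& key, const std::string& value) {
    const bool at_restart = (num_entries_ % restart_interval_ == 0);
    size_t shared = 0;
    if (at_restart) {
      restarts_.push_back(static_cast<uint32_t>(keys_.size()));
    } else {
      const size_t limit = std::min(last_key_.size(), key.size());
      while (shared < limit && last_key_[shared] == key[shared]) ++shared;
    }
    PutVarint32(&keys_, static_cast<uint32_t>(shared));
    PutVarint32(&keys_, static_cast<uint32_t>(key.size() - shared));
    PutVarint32(&keys_, static_cast<uint32_t>(value.size()));
    if (at_restart) {
      // The value offset is written only at restart points.
      PutVarint32(&keys_, static_cast<uint32_t>(values_.size()));
    }
    keys_.append(key, shared, std::string::npos);  // key_delta
    values_.append(value);  // values accumulate in their own buffer
    last_key_ = key;
    ++num_entries_;
  }

  std::string Finish() const {
    std::string out = keys_;
    out.append(values_);  // the extra copy: values land after all keys
    for (uint32_t r : restarts_) PutFixed32(&out, r);
    PutFixed32(&out, static_cast<uint32_t>(keys_.size()));     // values offset
    PutFixed32(&out, static_cast<uint32_t>(restarts_.size()));  // footer
    return out;
  }

 private:
  static void PutVarint32(std::string* dst, uint32_t v) {
    while (v >= 0x80) {
      dst->push_back(static_cast<char>((v & 0x7F) | 0x80));
      v >>= 7;
    }
    dst->push_back(static_cast<char>(v));
  }
  static void PutFixed32(std::string* dst, uint32_t v) {
    for (int i = 0; i < 4; ++i) {
      dst->push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
    }
  }

  uint32_t restart_interval_;
  uint32_t num_entries_ = 0;
  std::string keys_, values_, last_key_;
  std::vector<uint32_t> restarts_;
};
```

Buffering values separately is what forces the extra copy in `Finish()`: the final position of the values section is not known until every key entry has been written.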

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14287

Test Plan:
- Extended block_test, table_test, and compaction_test with a new separated_kv parameter
- Added new parameter to crash test

 ---

## Benchmark

### Varying Value Size

**Write:** `db_bench --num=1000000 --separate_key_value_in_data_block=<bool> --value_size=<X> --benchmarks=fillrandom,compact`
**Read:** `db_bench --db=$DB --use_existing_db --benchmarks=readrandom,readseq`

| Value Size | FillRandom (ops/s) | | Compact (s) | | ReadRandom (ops/s) | | ReadSeq (ops/s) | | SST Size (MB) | |
|----------:|-------------------:|-----:|----------:|-----:|-------------------:|-----:|----------------:|-----:|-------------:|-----:|
| | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv |
| 1 | 253,280 | 264,977 | 0.516 | 0.497 (**-3.7%**) | 596,328 | 586,625 (**-1.6%**) | 5,904,681 | 6,069,581 (**+2.8%**) | 5.1 | 4.2 (**-18.6%**) |
| 16 | 235,757 | 242,367 | 0.572 | 0.533 (**-6.8%**) | 570,751 | 572,815 (**+0.4%**) | 5,371,138 | 5,604,908 (**+4.4%**) | 14.7 | 13.4 (**-8.5%**) |
| 100 | 264,299 | 265,427 | 0.461 | 0.454 (**-1.5%**) | 323,696 | 332,790 (**+2.8%**) | 4,239,725 | 4,232,416 (**-0.2%**) | 38.8 | 37.6 (**-3.2%**) |
| 1,000 | 238,992 | 242,764 | 2.349 | 2.329 (**-0.9%**) | 244,608 | 261,403 (**+6.9%**) | 1,285,394 | 1,265,868 (**-1.5%**) | 342.1 | 342.0 (**-0.0%**) |

### Varying Block Restart Interval

**Write:** `db_bench --num=1000000 --separate_key_value_in_data_block=<bool> --block_restart_interval=<X> --benchmarks=fillrandom,compact`
**Read:** `db_bench --db=$DB --use_existing_db --benchmarks=readrandom,readseq`

| BRI | FillRandom (ops/s) | | Compact (s) | | ReadRandom (ops/s) | | ReadSeq (ops/s) | | SST Size (MB) | |
|----:|-------------------:|-----:|----------:|-----:|-------------------:|-----:|----------------:|-----:|-------------:|-----:|
| | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv |
| 1 | 251,654 | 263,707 | 0.453 | 0.485 (**+7.1%**) | 334,653 | 328,708 (**-1.8%**) | 4,194,342 | 3,954,291 (**-5.7%**) | 40.6 | 42.4 (**+4.5%**) |
| 4 | 253,797 | 252,394 | 0.476 | 0.476 (**+0.0%**) | 332,719 | 341,676 (**+2.7%**) | 4,135,691 | 4,051,151 (**-2.0%**) | 39.2 | 39.3 (**+0.3%**) |
| 8 | 260,143 | 262,273 | 0.496 | 0.460 (**-7.3%**) | 330,859 | 337,567 (**+2.0%**) | 4,144,081 | 4,187,389 (**+1.0%**) | 38.9 | 38.1 (**-2.1%**) |
| 16 | 252,875 | 263,176 | 0.464 | 0.455 (**-1.9%**) | 323,783 | 335,418 (**+3.6%**) | 4,127,310 | 4,217,028 (**+2.2%**) | 38.8 | 37.6 (**-3.2%**) |
| 32 | 260,224 | 269,422 | 0.464 | 0.451 (**-2.8%**) | 304,001 | 314,989 (**+3.6%**) | 4,310,162 | 4,247,248 (**-1.5%**) | 38.8 | 37.3 (**-3.8%**) |

### Varying Compression

**Write:** `db_bench --num=1000000 --separate_key_value_in_data_block=<bool> --compression_type=<X> [--compression_level=<N>] --benchmarks=fillrandom,compact`
**Read:** `db_bench --db=$DB --use_existing_db --benchmarks=readrandom,readseq`

| Compression | FillRandom (ops/s) | | Compact (s) | | ReadRandom (ops/s) | | ReadSeq (ops/s) | | SST Size (MB) | |
|------------|-------------------:|-----:|----------:|-----:|-------------------:|-----:|----------------:|-----:|-------------:|-----:|
| | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv |
| None | 252,494 | 260,552 | 0.413 | 0.419 (**+1.5%**) | 356,290 | 371,535 (**+4.3%**) | 4,479,261 | 4,507,133 (**+0.6%**) | 73.5 | 73.6 (**+0.2%**) |
| LZ4 | 246,010 | 256,360 | 0.477 | 0.455 (**-4.6%**) | 342,497 | 345,882 (**+1.0%**) | 4,400,570 | 4,268,102 (**-3.0%**) | 38.3 | 37.6 (**-2.0%**) |
| ZSTD (L3) | 254,748 | 258,556 | 1.067 | 1.055 (**-1.1%**) | 176,724 | 177,566 (**+0.5%**) | 2,736,841 | 2,717,739 (**-0.7%**) | 32.9 | 31.3 (**-4.7%**) |
| ZSTD (L6) | 256,459 | 259,388 | 1.556 | 1.462 (**-6.0%**) | 177,390 | 176,691 (**-0.4%**) | 2,754,336 | 2,688,682 (**-2.4%**) | 32.8 | 31.1 (**-5.1%**) |

### Varying Block Size

**Write:** `db_bench --num=1000000 --separate_key_value_in_data_block=<bool> --block_size=<X> --benchmarks=fillrandom,compact`
**Read:** `db_bench --db=$DB --use_existing_db --benchmarks=readrandom,readseq`

| Block Size | FillRandom (ops/s) | | Compact (s) | | ReadRandom (ops/s) | | ReadSeq (ops/s) | | SST Size (MB) | |
|-----------:|-------------------:|-----:|----------:|-----:|-------------------:|-----:|----------------:|-----:|-------------:|-----:|
| | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv |
| 4 KB | 263,203 | 260,362 | 0.469 | 0.461 (**-1.7%**) | 324,178 | 332,249 (**+2.5%**) | 4,231,537 | 4,217,763 (**-0.3%**) | 38.8 | 37.6 (**-3.1%**) |
| 16 KB | 252,742 | 263,161 | 0.426 | 0.428 (**+0.5%**) | 227,805 | 222,873 (**-2.2%**) | 5,146,997 | 5,081,080 (**-1.3%**) | 38.1 | 36.7 (**-3.6%**) |
| 64 KB | 257,490 | 260,225 | 0.423 | 0.414 (**-2.1%**) | 86,807 | 91,586 (**+5.5%**) | 5,380,403 | 5,372,372 (**-0.1%**) | 36.3 | 35.0 (**-3.5%**) |

### Varying Key Size

**Write:** `db_bench --num=1000000 --separate_key_value_in_data_block=<bool> --min_key_size=10 --max_key_size=100 --benchmarks=fillrandom,compact`
**Read:** `db_bench --db=$DB --use_existing_db --benchmarks=readrandom,readseq`

| Key Size | FillRandom (ops/s) | | Compact (s) | | ReadRandom (ops/s) | | ReadSeq (ops/s) | | SST Size (MB) | |
|---------:|-------------------:|-----:|----------:|-----:|-------------------:|-----:|----------------:|-----:|-------------:|-----:|
| | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv | baseline | sep_kv |
| 10–100 | 243,740 | 255,183 | 0.618 | 0.622 (**+0.6%**) | 284,304 | 307,569 (**+8.2%**) | 3,738,921 | 3,686,676 (**-1.4%**) | 41.5 | 41.2 (**-0.8%**) |

### CPU Profile Notes
- No compression: DataBlock::SeekForGet uses less CPU (13.2% vs 13.9%)
  - https://fburl.com/strobelight/6mwwebft with separated KV
  - https://fburl.com/strobelight/m9m798ka without
- ZSTD compression: rocksdb::DecompressSerializedBlock uses more CPU (45.8% vs 44.9%), while DataBlock::SeekForGet uses less CPU (5.09% vs 6.52%)
  - https://fburl.com/strobelight/3x5nw1k4 with separated KV
  - https://fburl.com/strobelight/e7809046 without

 ---

Reviewed By: xingbowang, pdillinger

Differential Revision: D92103024

Pulled By: joshkang97

fbshipit-source-id: 47cfeb656ff3c20d34975f0b6c4c0462935a83dc
2026-02-23 12:42:05 -08:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <type_traits>
#include "block.h"
#include "block_cache.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/reader_common.h"
// This file contains member functions of BlockBasedTable that cannot be
// implemented in block_based_table_reader.cc because they are templates
// and are called from other files (e.g. block_based_iterator.h).
namespace ROCKSDB_NAMESPACE {
namespace {
using IterPlaceholderCacheInterface =
    PlaceholderCacheInterface<CacheEntryRole::kMisc>;

template <typename TBlockIter>
struct IterTraits {};

template <>
struct IterTraits<DataBlockIter> {
  using IterBlocklike = Block_kData;
};

template <>
struct IterTraits<IndexBlockIter> {
  using IterBlocklike = Block_kIndex;
};

}  // namespace

// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, allocate a new iterator.
// If input_iter is not null, update that iterator and return it.
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
    const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
    BlockType block_type, GetContext* get_context,
    BlockCacheLookupContext* lookup_context,
    FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read,
    Status& s, bool use_block_cache_for_lookup) const {
  using IterBlocklike = typename IterTraits<TBlockIter>::IterBlocklike;
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  CachableEntry<Block> block;
  {
    CachableEntry<DecompressorDict> dict;
    Decompressor* decomp = rep_->decompressor.get();
    if (rep_->uncompression_dict_reader && block_type == BlockType::kData) {
      // For async scans, don't use the prefetch buffer since an async prefetch
      // might already be under way and this would invalidate it. Also, the
      // uncompression dict is typically at the end of the file and would
      // most likely break the sequentiality of the access pattern.
      // The same applies to auto_readahead_size: it iterates over the index
      // to look up data blocks, which could also break the sequentiality of
      // the access pattern.
      s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
          ((ro.async_io || ro.auto_readahead_size) ? nullptr : prefetch_buffer),
          ro, get_context, lookup_context, &dict);
      if (!s.ok()) {
        iter->Invalidate(s);
        return iter;
      }
      assert(dict.GetValue());
      if (dict.GetValue()) {
        decomp = dict.GetValue()->decompressor_.get();
      }
    }
    if (block_type == BlockType::kRangeDeletion) {
      s = RetrieveBlock(prefetch_buffer, ro, handle, decomp,
                        &block.As<Block_kRangeDeletion>(), get_context,
                        lookup_context, for_compaction, /* use_cache */ true,
                        async_read, use_block_cache_for_lookup);
    } else {
      s = RetrieveBlock(
          prefetch_buffer, ro, handle, decomp, &block.As<IterBlocklike>(),
          get_context, lookup_context, for_compaction,
          /* use_cache */ true, async_read, use_block_cache_for_lookup);
    }
  }

  if (s.IsTryAgain() && async_read) {
    return iter;
  }

  if (!s.ok()) {
    assert(block.IsEmpty());
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);
  assert(block_type != BlockType::kData ||
         block.GetValue()->HasSeparatedKV() ==
             rep_->separate_key_value_in_data_block);

  // Block contents are pinned, and remain pinned after the iterator is
  // destroyed as long as cleanup functions are moved to another object,
  // when:
  // 1. block cache handle is set to be released in cleanup function, or
  // 2. it's pointing to immortal source. If own_bytes is true then we are
  //    not reading data from the original source, whether immortal or not.
  //    Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
                                       block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache) {
      IterPlaceholderCacheInterface block_cache{
          rep_->table_options.block_cache.get()};
      if (block_cache) {
        // Insert a dummy record into the block cache to track memory usage.
        Cache::Handle* cache_handle = nullptr;
        CacheKey key =
            CacheKey::CreateUniqueForCacheLifetime(block_cache.get());
        s = block_cache.Insert(key.AsSlice(),
                               block.GetValue()->ApproximateMemoryUsage(),
                               &cache_handle);
        if (s.ok()) {
          assert(cache_handle != nullptr);
          iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache.get(),
                                cache_handle);
        }
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);
  return iter;
}

// Convert an uncompressed data block (i.e., CachableEntry<Block>)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, allocate a new iterator.
// If input_iter is not null, update that iterator and return it.
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
                                                  CachableEntry<Block>& block,
                                                  TBlockIter* input_iter,
                                                  Status s) const {
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);
  assert(block.GetValue()->HasSeparatedKV() ==
         rep_->separate_key_value_in_data_block);

  // Block contents are pinned, and remain pinned after the iterator is
  // destroyed as long as cleanup functions are moved to another object,
  // when:
  // 1. block cache handle is set to be released in cleanup function, or
  // 2. it's pointing to immortal source. If own_bytes is true then we are
  //    not reading data from the original source, whether immortal or not.
  //    Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
                                       iter, block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache) {
      IterPlaceholderCacheInterface block_cache{
          rep_->table_options.block_cache.get()};
      if (block_cache) {
        // Insert a dummy record into the block cache to track memory usage.
        Cache::Handle* cache_handle = nullptr;
        CacheKey key =
            CacheKey::CreateUniqueForCacheLifetime(block_cache.get());
        s = block_cache.Insert(key.AsSlice(),
                               block.GetValue()->ApproximateMemoryUsage(),
                               &cache_handle);
        if (s.ok()) {
          assert(cache_handle != nullptr);
          iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache.get(),
                                cache_handle);
        }
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);
  return iter;
}

} // namespace ROCKSDB_NAMESPACE