rocksdb/table/block_based/block_based_table_builder.cc
Peter Dillinger 78c83ac1ec Publish/support format_version=7, related enhancements (#13713)
Summary:
* Make new format_version=7 a supported setting.
* Fix a bug in compressed_secondary_cache.cc that is newly exercised by custom compression types and was showing up in the crash test with tiered secondary cache
* Small change to handling of disabled compression in fv=7: use empty compression manager compatibility name.
* Remove GetDefaultBuiltinCompressionManager() from the public API because it could cause an unexpected and unsafe schema change in a user's CompressionManager if built upon the default built-in manager and we add a new built-in schema. Built-in managers must now be referenced by an explicit compression schema version in the public API. (That notion was already exposed in the compressed secondary cache API, for better or worse.)
* Improve some error messages for compression misconfiguration
* Improve testing with ObjectLibrary and CompressionManagers
* Improve testing of compression_name table property in BlockBasedTableTest.BlockBasedTableProperties2
* Improve some comments

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13713

Test Plan: existing and updated tests. Notably, the crash test has already been running with (unpublished) format_version=7

Reviewed By: mszeszko-meta, hx235

Differential Revision: D77035482

Pulled By: pdillinger

fbshipit-source-id: 95278de8734a79706a22361bff2184b1edb230ca
2025-06-20 17:39:47 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/block_based_table_builder.h"
#include <atomic>
#include <cassert>
#include <cstdio>
#include <list>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include "block_cache.h"
#include "cache/cache_entry_roles.h"
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
#include "logging/logging.h"
#include "memory/memory_allocator_impl.h"
#include "options/options_helper.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "rocksdb/types.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/work_queue.h"
namespace ROCKSDB_NAMESPACE {
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;
// Without an anonymous namespace here, we fail the -Wmissing-prototypes warning
namespace {
constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
// Create a filter block builder based on its type.
FilterBlockBuilder* CreateFilterBlockBuilder(
const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
const FilterBuildingContext& context,
const bool use_delta_encoding_for_index_values,
PartitionedIndexBuilder* const p_index_builder, size_t ts_sz,
const bool persist_user_defined_timestamps) {
const BlockBasedTableOptions& table_opt = context.table_options;
assert(table_opt.filter_policy); // precondition
FilterBitsBuilder* filter_bits_builder =
BloomFilterPolicy::GetBuilderFromContext(context);
if (filter_bits_builder == nullptr) {
return nullptr;
} else {
if (table_opt.partition_filters) {
assert(p_index_builder != nullptr);
// Since it takes time after the filter builder requests a partition cut
// until the index builder actually cuts the partition (at the end of a
// data block, potentially with many keys), we take the lower bound as the
// partition size.
assert(table_opt.block_size_deviation <= 100);
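// For example (illustrative values): metadata_block_size=4096 and
// block_size_deviation=10 give a lower-bound partition size of
// ceil(4096 * 90 / 100) = 3687 bytes.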
auto partition_size =
static_cast<uint32_t>(((table_opt.metadata_block_size *
(100 - table_opt.block_size_deviation)) +
99) /
100);
partition_size = std::max(partition_size, static_cast<uint32_t>(1));
return new PartitionedFilterBlockBuilder(
mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
filter_bits_builder, table_opt.index_block_restart_interval,
use_delta_encoding_for_index_values, p_index_builder, partition_size,
ts_sz, persist_user_defined_timestamps,
table_opt.decouple_partitioned_filters);
} else {
return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
table_opt.whole_key_filtering,
filter_bits_builder);
}
}
}
} // namespace
// kBlockBasedTableMagicNumber was picked by running
// echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
// .cc files; for that reason we declare it extern in the header, but to get
// the space allocated it must be non-extern in exactly one place.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// We also support reading and writing legacy block based table format (for
// backwards compatibility)
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
// A collector that collects properties of interest to the block-based table.
// For now this class looks heavy-weight since we only write one additional
// property.
// But in the foreseeable future, we will add more and more properties that are
// specific to the block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
: public InternalTblPropColl {
public:
explicit BlockBasedTablePropertiesCollector(
BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
bool prefix_filtering, bool decoupled_partitioned_filters)
: index_type_(index_type),
whole_key_filtering_(whole_key_filtering),
prefix_filtering_(prefix_filtering),
decoupled_partitioned_filters_(decoupled_partitioned_filters) {}
Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
uint64_t /*file_size*/) override {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
}
void BlockAdd(uint64_t /* block_uncomp_bytes */,
uint64_t /* block_compressed_bytes_fast */,
uint64_t /* block_compressed_bytes_slow */) override {
// Intentionally left blank. No interest in collecting stats for
// blocks.
}
Status Finish(UserCollectedProperties* properties) override {
std::string val;
PutFixed32(&val, static_cast<uint32_t>(index_type_));
properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
whole_key_filtering_ ? kPropTrue : kPropFalse});
properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
prefix_filtering_ ? kPropTrue : kPropFalse});
if (decoupled_partitioned_filters_) {
properties->insert(
{BlockBasedTablePropertyNames::kDecoupledPartitionedFilters,
kPropTrue});
}
return Status::OK();
}
// The name of the properties collector can be used for debugging purposes.
const char* Name() const override {
return "BlockBasedTablePropertiesCollector";
}
UserCollectedProperties GetReadableProperties() const override {
// Intentionally left blank.
return UserCollectedProperties();
}
private:
BlockBasedTableOptions::IndexType index_type_;
bool whole_key_filtering_;
bool prefix_filtering_;
bool decoupled_partitioned_filters_;
};
struct BlockBasedTableBuilder::WorkingAreaPair {
Compressor::ManagedWorkingArea compress;
Decompressor::ManagedWorkingArea verify;
};
struct BlockBasedTableBuilder::ParallelCompressionRep {
// TODO: consider replacing with autovector or similar
// Keys is a wrapper around a vector of strings that avoids releasing the
// strings' memory during vector clear(), in order to save memory
// allocation overhead
class Keys {
public:
Keys() : keys_(kKeysInitSize), size_(0) {}
void PushBack(const Slice& key) {
if (size_ == keys_.size()) {
keys_.emplace_back(key.data(), key.size());
} else {
keys_[size_].assign(key.data(), key.size());
}
size_++;
}
void SwapAssign(std::vector<std::string>& keys) {
size_ = keys.size();
std::swap(keys_, keys);
}
void Clear() { size_ = 0; }
size_t Size() { return size_; }
std::string& Back() { return keys_[size_ - 1]; }
std::string& operator[](size_t idx) {
assert(idx < size_);
return keys_[idx];
}
private:
static constexpr size_t kKeysInitSize = 32;
std::vector<std::string> keys_;
size_t size_;
};
Keys curr_block_keys;
struct BlockRep;
// Use BlockRepSlot to keep block order in the write thread.
// slot_ will pass references to BlockRep
class BlockRepSlot {
public:
BlockRepSlot() : slot_(1) {}
template <typename T>
void Fill(T&& rep) {
slot_.push(std::forward<T>(rep));
}
void Take(BlockRep*& rep) { slot_.pop(rep); }
private:
// slot_ will pass references to BlockRep in block_rep_buf,
// and those references are always valid before the destruction of
// block_rep_buf.
WorkQueue<BlockRep*> slot_;
};
// BlockRep instances are fetched from and recycled to
// block_rep_pool during parallel compression.
struct ALIGN_AS(CACHE_LINE_SIZE) BlockRep {
// Uncompressed block contents
std::string uncompressed;
std::string compressed;
CompressionType compression_type = kNoCompression;
// For efficiency, the std::string is repeatedly overwritten without
// checking for "has no value". Only at the end of its life will it be
// assigned "no value". Thus, it needs to start with a value.
std::optional<std::string> first_key_in_next_block = std::string{};
Keys keys;
BlockRepSlot slot;
Status status;
};
// Use a vector of BlockRep as a buffer for a fixed number of BlockRep
// structures. All data referenced by pointers in BlockRep will be freed
// when this vector is destructed.
using BlockRepBuffer = std::vector<BlockRep>;
BlockRepBuffer block_rep_buf;
// Use a thread-safe queue for concurrent access from block
// building thread and writer thread.
using BlockRepPool = WorkQueue<BlockRep*>;
BlockRepPool block_rep_pool;
// Compression queue will pass references to BlockRep in block_rep_buf,
// and those references are always valid before the destruction of
// block_rep_buf.
using CompressQueue = WorkQueue<BlockRep*>;
CompressQueue compress_queue;
std::vector<port::Thread> compress_thread_pool;
// Write queue will pass references to BlockRep::slot in block_rep_buf,
// and those references are always valid before the corresponding
// BlockRep::slot is destructed, which is before the destruction of
// block_rep_buf.
using WriteQueue = WorkQueue<BlockRepSlot*>;
WriteQueue write_queue;
std::unique_ptr<port::Thread> write_thread;
// Estimate output file size when parallel compression is enabled. This is
// necessary because compression & flush are no longer synchronized,
// and BlockBasedTableBuilder::FileSize() is no longer accurate.
// memory_order_relaxed suffices because accurate statistics are not required.
class FileSizeEstimator {
public:
explicit FileSizeEstimator()
: uncomp_bytes_compressed(0),
uncomp_bytes_curr_block(0),
uncomp_bytes_curr_block_set(false),
uncomp_bytes_inflight(0),
blocks_inflight(0),
curr_compression_ratio(0),
estimated_file_size(0) {}
// Estimate file size when a block is about to be emitted to the
// compression thread.
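// The estimate maintained is roughly: current file size +
// (uncompressed bytes in flight * current compression ratio) +
// (blocks in flight * kBlockTrailerSize).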
void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) {
uint64_t new_uncomp_bytes_inflight =
uncomp_bytes_inflight.fetch_add(uncomp_block_size,
std::memory_order_relaxed) +
uncomp_block_size;
uint64_t new_blocks_inflight =
blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
estimated_file_size.store(
curr_file_size +
static_cast<uint64_t>(
static_cast<double>(new_uncomp_bytes_inflight) *
curr_compression_ratio.load(std::memory_order_relaxed)) +
new_blocks_inflight * kBlockTrailerSize,
std::memory_order_relaxed);
}
// Estimate file size when a block has already been reaped from the
// compression thread.
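// Also updates the running compression ratio so that it approximates
// (total compressed bytes so far) / (total uncompressed bytes so far):
// new_ratio = (old_ratio * old_uncomp_total + compressed_block_size) /
//             (old_uncomp_total + uncomp_bytes_curr_block).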
void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
assert(uncomp_bytes_curr_block_set);
uint64_t new_uncomp_bytes_compressed =
uncomp_bytes_compressed + uncomp_bytes_curr_block;
assert(new_uncomp_bytes_compressed > 0);
curr_compression_ratio.store(
(curr_compression_ratio.load(std::memory_order_relaxed) *
uncomp_bytes_compressed +
compressed_block_size) /
static_cast<double>(new_uncomp_bytes_compressed),
std::memory_order_relaxed);
uncomp_bytes_compressed = new_uncomp_bytes_compressed;
uint64_t new_uncomp_bytes_inflight =
uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block,
std::memory_order_relaxed) -
uncomp_bytes_curr_block;
uint64_t new_blocks_inflight =
blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
estimated_file_size.store(
curr_file_size +
static_cast<uint64_t>(
static_cast<double>(new_uncomp_bytes_inflight) *
curr_compression_ratio.load(std::memory_order_relaxed)) +
new_blocks_inflight * kBlockTrailerSize,
std::memory_order_relaxed);
uncomp_bytes_curr_block_set = false;
}
void SetEstimatedFileSize(uint64_t size) {
estimated_file_size.store(size, std::memory_order_relaxed);
}
uint64_t GetEstimatedFileSize() {
return estimated_file_size.load(std::memory_order_relaxed);
}
void SetCurrBlockUncompSize(uint64_t size) {
uncomp_bytes_curr_block = size;
uncomp_bytes_curr_block_set = true;
}
private:
// Input bytes compressed so far.
uint64_t uncomp_bytes_compressed;
// Size of current block being appended.
uint64_t uncomp_bytes_curr_block;
// Whether uncomp_bytes_curr_block has been set for next
// ReapBlock call.
bool uncomp_bytes_curr_block_set;
// Input bytes under compression and not appended yet.
std::atomic<uint64_t> uncomp_bytes_inflight;
// Number of blocks under compression and not appended yet.
std::atomic<uint64_t> blocks_inflight;
// Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock.
std::atomic<double> curr_compression_ratio;
// Estimated SST file size.
std::atomic<uint64_t> estimated_file_size;
};
FileSizeEstimator file_size_estimator;
// Facilities used to wait for the completion of the first block. We need to
// wait for the first block's compression and flush to complete in order to
// get a non-zero compression ratio.
std::atomic<bool> first_block_processed;
std::condition_variable first_block_cond;
std::mutex first_block_mutex;
explicit ParallelCompressionRep(uint32_t parallel_threads)
: block_rep_buf(parallel_threads),
block_rep_pool(parallel_threads),
compress_queue(parallel_threads),
write_queue(parallel_threads),
first_block_processed(false) {
for (uint32_t i = 0; i < parallel_threads; i++) {
// Prime the queue of available BlockReps
block_rep_pool.push(&block_rep_buf[i]);
}
}
~ParallelCompressionRep() { block_rep_pool.finish(); }
// Prepare a block to be emitted to the compression thread.
// Used in non-buffered mode
BlockRep* PrepareBlock(const Slice* first_key_in_next_block,
BlockBuilder* data_block) {
BlockRep* block_rep = PrepareBlockInternal(first_key_in_next_block);
assert(block_rep != nullptr);
data_block->SwapAndReset(block_rep->uncompressed);
std::swap(block_rep->keys, curr_block_keys);
curr_block_keys.Clear();
return block_rep;
}
// Used in EnterUnbuffered
BlockRep* PrepareBlock(const Slice* first_key_in_next_block,
std::string* data_block,
std::vector<std::string>* keys) {
BlockRep* block_rep = PrepareBlockInternal(first_key_in_next_block);
assert(block_rep != nullptr);
std::swap(block_rep->uncompressed, *data_block);
block_rep->keys.SwapAssign(*keys);
return block_rep;
}
// Emit a block to the compression thread
void EmitBlock(BlockRep* block_rep) {
assert(block_rep != nullptr);
assert(block_rep->status.ok());
if (!write_queue.push(&block_rep->slot)) {
return;
}
if (!compress_queue.push(block_rep)) {
return;
}
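// Wait for the first block to be compressed and flushed so that the
// file size estimator has a non-zero compression ratio to work with.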
if (!first_block_processed.load(std::memory_order_relaxed)) {
std::unique_lock<std::mutex> lock(first_block_mutex);
first_block_cond.wait(lock, [this] {
return first_block_processed.load(std::memory_order_relaxed);
});
}
}
// Reap a block from the compression thread
void ReapBlock(BlockRep* block_rep) {
assert(block_rep != nullptr);
block_rep->compressed.clear();
block_rep_pool.push(block_rep);
if (!first_block_processed.load(std::memory_order_relaxed)) {
std::lock_guard<std::mutex> lock(first_block_mutex);
first_block_processed.store(true, std::memory_order_relaxed);
first_block_cond.notify_one();
}
}
private:
BlockRep* PrepareBlockInternal(const Slice* first_key_in_next_block) {
BlockRep* block_rep = nullptr;
block_rep_pool.pop(block_rep);
assert(block_rep != nullptr);
block_rep->compression_type = kNoCompression;
if (first_key_in_next_block == nullptr) {
block_rep->first_key_in_next_block = {};
} else {
block_rep->first_key_in_next_block->assign(
first_key_in_next_block->data(), first_key_in_next_block->size());
}
return block_rep;
}
};
struct BlockBasedTableBuilder::Rep {
const ImmutableOptions ioptions;
// BEGIN from MutableCFOptions
std::shared_ptr<const SliceTransform> prefix_extractor;
// END from MutableCFOptions
const WriteOptions write_options;
const BlockBasedTableOptions table_options;
const InternalKeyComparator& internal_comparator;
// Size in bytes for the user-defined timestamps.
size_t ts_sz;
// When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
// user key will be stripped when creating the block based table. This
// stripping happens for all user keys, including the keys in data block,
// index block for data block, index block for index block (if index type is
// `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
// filters), the `first_internal_key` in `IndexValue`, the `end_key` for range
// deletion entries.
// As long as the user keys are sorted when added via the `Add` API, their
// logical ordering won't change after timestamps are stripped. However, for
// each user key to be logically equivalent before and after the timestamp is
// stripped, the user key should contain the minimum timestamp.
bool persist_user_defined_timestamps;
WritableFileWriter* file;
std::atomic<uint64_t> offset;
size_t alignment;
BlockBuilder data_block;
// Buffers uncompressed data blocks to replay later. Needed when
// compression dictionary is enabled so we can finalize the dictionary before
// compressing any data blocks.
std::vector<std::string> data_block_buffers;
BlockBuilder range_del_block;
InternalKeySliceTransform internal_prefix_transform;
std::unique_ptr<IndexBuilder> index_builder;
std::string index_separator_scratch;
PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_ikey; // Internal key or empty (unset)
const Slice* first_key_in_next_block = nullptr;
bool warm_cache = false;
bool uses_explicit_compression_manager = false;
uint64_t sample_for_compression;
std::atomic<uint64_t> compressible_input_data_bytes;
std::atomic<uint64_t> uncompressible_input_data_bytes;
std::atomic<uint64_t> sampled_input_data_bytes;
std::atomic<uint64_t> sampled_output_slow_data_bytes;
std::atomic<uint64_t> sampled_output_fast_data_bytes;
uint32_t compression_parallel_threads;
int max_compressed_bytes_per_kb;
size_t max_dict_sample_bytes = 0;
// *** Compressors & decompressors - Yes, it seems like a lot here but ***
// *** these are distinct fields to minimize extra conditionals and ***
// *** field reads on hot code paths. ***
// A compressor for blocks in general, without dictionary compression
std::unique_ptr<Compressor> basic_compressor;
// A compressor using dictionary compression (when applicable)
std::unique_ptr<Compressor> compressor_with_dict;
// Once configured/determined, points to one of the above Compressors to
// use on data blocks.
Compressor* data_block_compressor = nullptr;
// A decompressor corresponding to basic_compressor (when non-nullptr).
// Used for verification and cache warming.
std::shared_ptr<Decompressor> basic_decompressor;
// When needed, a decompressor for verifying compression using a
// dictionary sampled/trained from this file.
std::unique_ptr<Decompressor> verify_decompressor_with_dict;
// When non-nullptr, compression should be verified with this corresponding
// decompressor, except for data blocks. (Points to same as basic_decompressor
// when verify_compression is set.)
UnownedPtr<Decompressor> verify_decompressor;
// Once configured/determined, points to one of the above Decompressors to use
// in verifying data blocks.
UnownedPtr<Decompressor> data_block_verify_decompressor;
// Set of compression types used for blocks in this file (mixing compression
// algorithms in a single file is allowed, using a CompressionManager)
SmallEnumSet<CompressionType, kDisableCompressionOption>
compression_types_used;
// Working area for basic_compressor when compression_parallel_threads==1
WorkingAreaPair basic_working_area;
// Working areas for data_block_compressor, for each of
// compression_parallel_threads
std::vector<WorkingAreaPair> data_block_working_areas;
size_t data_begin_offset = 0;
TableProperties props;
// States of the builder.
//
// - `kBuffered`: This is the initial state where zero or more data blocks are
// accumulated uncompressed in-memory. From this state, call
// `EnterUnbuffered()` to finalize the compression dictionary if enabled,
// compress/write out any buffered blocks, and proceed to the `kUnbuffered`
// state.
//
// - `kUnbuffered`: This is the state when compression dictionary is finalized
// either because it wasn't enabled in the first place or it's been created
// from sampling previously buffered data. In this state, blocks are simply
// compressed/written out as they fill up. From this state, call `Finish()`
// to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
// the partially created file.
//
// - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
// called, so the table builder is no longer usable. We must be in this
// state by the time the destructor runs.
enum class State {
kBuffered,
kUnbuffered,
kClosed,
};
State state = State::kUnbuffered;
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
uint64_t buffer_limit = 0;
std::shared_ptr<CacheReservationManager>
compression_dict_buffer_cache_res_mgr;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
OffsetableCacheKey base_cache_key;
const TableFileCreationReason reason;
BlockHandle pending_handle; // Handle to add to index block
std::string single_threaded_compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
std::vector<std::unique_ptr<InternalTblPropColl>> table_properties_collectors;
std::unique_ptr<ParallelCompressionRep> pc_rep;
BlockCreateContext create_context;
// The size of the "tail" part of a SST file. "Tail" refers to
// all blocks after data blocks till the end of the SST file.
uint64_t tail_size;
// The total size of all blocks in this file before they are compressed.
// This is used for logging compaction stats.
uint64_t pre_compression_size = 0;
// See class Footer
uint32_t base_context_checksum;
uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
bool IsParallelCompressionEnabled() const {
return compression_parallel_threads > 1;
}
Status GetStatus() {
// We need to make modifications of status visible when status_ok is set
// to false, and this is ensured by status_mutex, so no special memory
// order for status_ok is required.
if (status_ok.load(std::memory_order_relaxed)) {
return Status::OK();
} else {
return CopyStatus();
}
}
Status CopyStatus() {
std::lock_guard<std::mutex> lock(status_mutex);
return status;
}
IOStatus GetIOStatus() {
// We need to make modifications of io_status visible when status_ok is set
// to false, and this is ensured by io_status_mutex, so no special memory
// order for io_status_ok is required.
if (io_status_ok.load(std::memory_order_relaxed)) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition
auto ios = CopyIOStatus();
ios.PermitUncheckedError();
// Assume no races in unit tests
assert(ios.ok());
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
return IOStatus::OK();
} else {
return CopyIOStatus();
}
}
IOStatus CopyIOStatus() {
std::lock_guard<std::mutex> lock(io_status_mutex);
return io_status;
}
// Never erase an existing status that is not OK.
void SetStatus(Status s) {
if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
// Locking is overkill for the non-parallel-compression case, but since
// it's unlikely that s is not OK, we accept this cost for simplicity.
std::lock_guard<std::mutex> lock(status_mutex);
status = s;
status_ok.store(false, std::memory_order_relaxed);
}
}
// Never erase an existing I/O status that is not OK.
// Calling this will also SetStatus(ios)
void SetIOStatus(IOStatus ios) {
if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
// Locking is overkill for the non-parallel-compression case, but since
// it's unlikely that ios is not OK, we accept this cost for simplicity.
std::lock_guard<std::mutex> lock(io_status_mutex);
io_status = ios;
io_status_ok.store(false, std::memory_order_relaxed);
}
SetStatus(ios);
}
Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
WritableFileWriter* f)
: ioptions(tbo.ioptions),
prefix_extractor(tbo.moptions.prefix_extractor),
write_options(tbo.write_options),
table_options(table_opt),
internal_comparator(tbo.internal_comparator),
ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()),
persist_user_defined_timestamps(
tbo.ioptions.persist_user_defined_timestamps),
file(f),
offset(0),
alignment(table_options.block_align
? std::min(static_cast<size_t>(table_options.block_size),
kDefaultPageSize)
: 0),
data_block(table_options.block_restart_interval,
table_options.use_delta_encoding,
false /* use_value_delta_encoding */,
tbo.internal_comparator.user_comparator()
->CanKeysWithDifferentByteContentsBeEqual()
? BlockBasedTableOptions::kDataBlockBinarySearch
: table_options.data_block_index_type,
table_options.data_block_hash_table_util_ratio, ts_sz,
persist_user_defined_timestamps),
range_del_block(
1 /* block_restart_interval */, true /* use_delta_encoding */,
false /* use_value_delta_encoding */,
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
persist_user_defined_timestamps),
internal_prefix_transform(prefix_extractor.get()),
sample_for_compression(tbo.moptions.sample_for_compression),
compressible_input_data_bytes(0),
uncompressible_input_data_bytes(0),
sampled_input_data_bytes(0),
sampled_output_slow_data_bytes(0),
sampled_output_fast_data_bytes(0),
compression_parallel_threads(tbo.compression_opts.parallel_threads),
max_compressed_bytes_per_kb(
tbo.compression_opts.max_compressed_bytes_per_kb),
data_block_working_areas(compression_parallel_threads),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
reason(tbo.reason),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
table_options, data_block)),
create_context(&table_options, &ioptions, ioptions.stats,
/*decompressor=*/nullptr,
tbo.moptions.block_protection_bytes_per_key,
tbo.internal_comparator.user_comparator(),
!use_delta_encoding_for_index_values,
table_opt.index_type ==
BlockBasedTableOptions::kBinarySearchWithFirstKey),
tail_size(0),
status_ok(true),
io_status_ok(true) {
FilterBuildingContext filter_context(table_options);
filter_context.info_log = ioptions.logger;
filter_context.column_family_name = tbo.column_family_name;
filter_context.reason = reason;
// Only populate other fields if known to be in LSM rather than
// generating external SST file
if (reason != TableFileCreationReason::kMisc) {
filter_context.compaction_style = ioptions.compaction_style;
filter_context.num_levels = ioptions.num_levels;
filter_context.level_at_creation = tbo.level_at_creation;
filter_context.is_bottommost = tbo.is_bottommost;
assert(filter_context.level_at_creation < filter_context.num_levels);
}
props.compression_options =
CompressionOptionsToString(tbo.compression_opts);
auto* mgr = tbo.moptions.compression_manager.get();
if (mgr == nullptr) {
uses_explicit_compression_manager = false;
mgr = GetBuiltinCompressionManager(
GetCompressFormatForVersion(
static_cast<uint32_t>(table_opt.format_version)))
.get();
} else {
uses_explicit_compression_manager = true;
// Stuff some extra debugging info as extra pseudo-options. Using
// underscore prefix to indicate they are special.
props.compression_options.append("_compression_manager=");
props.compression_options.append(mgr->GetId());
props.compression_options.append("; ");
}
// Sanitize to only allowing compression when it saves space.
max_compressed_bytes_per_kb =
std::min(int{1023}, tbo.compression_opts.max_compressed_bytes_per_kb);
basic_compressor = mgr->GetCompressorForSST(
filter_context, tbo.compression_opts, tbo.compression_type);
if (basic_compressor) {
if (table_options.enable_index_compression) {
basic_working_area.compress = basic_compressor->ObtainWorkingArea();
}
max_dict_sample_bytes = basic_compressor->GetMaxSampleSizeIfWantDict(
CacheEntryRole::kDataBlock);
if (max_dict_sample_bytes > 0) {
state = State::kBuffered;
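// Buffer uncompressed data blocks so a dictionary can be trained from
// them. Cap the buffering at whichever of target_file_size and
// max_dict_buffer_bytes is smaller, treating 0 as "no limit" for that
// option.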
if (tbo.target_file_size == 0) {
buffer_limit = tbo.compression_opts.max_dict_buffer_bytes;
} else if (tbo.compression_opts.max_dict_buffer_bytes == 0) {
buffer_limit = tbo.target_file_size;
} else {
buffer_limit = std::min(tbo.target_file_size,
tbo.compression_opts.max_dict_buffer_bytes);
}
} else {
// No distinct data block compressor using dictionary
data_block_compressor = basic_compressor.get();
for (uint32_t i = 0; i < compression_parallel_threads; i++) {
data_block_working_areas[i].compress =
data_block_compressor->ObtainWorkingArea();
}
}
basic_decompressor = mgr->GetDecompressorForCompressor(*basic_compressor);
create_context.decompressor = basic_decompressor.get();
if (table_options.verify_compression) {
verify_decompressor = basic_decompressor.get();
if (table_options.enable_index_compression) {
basic_working_area.verify = verify_decompressor->ObtainWorkingArea(
basic_compressor->GetPreferredCompressionType());
}
if (state == State::kUnbuffered) {
assert(data_block_compressor);
data_block_verify_decompressor = verify_decompressor.get();
for (uint32_t i = 0; i < compression_parallel_threads; i++) {
data_block_working_areas[i].verify =
data_block_verify_decompressor->ObtainWorkingArea(
data_block_compressor->GetPreferredCompressionType());
}
}
}
}
switch (table_options.prepopulate_block_cache) {
case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
warm_cache = (reason == TableFileCreationReason::kFlush);
break;
case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
warm_cache = false;
break;
default:
// missing case
assert(false);
warm_cache = false;
}
const auto compress_dict_build_buffer_charged =
table_options.cache_usage_options.options_overrides
.at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
.charged;
if (table_options.block_cache &&
(compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kEnabled ||
compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kFallback)) {
compression_dict_buffer_cache_res_mgr =
std::make_shared<CacheReservationManagerImpl<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
table_options.block_cache);
} else {
compression_dict_buffer_cache_res_mgr = nullptr;
}
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
&internal_comparator, use_delta_encoding_for_index_values,
table_options, ts_sz, persist_user_defined_timestamps);
index_builder.reset(p_index_builder_);
} else {
index_builder.reset(IndexBuilder::CreateIndexBuilder(
table_options.index_type, &internal_comparator,
&this->internal_prefix_transform, use_delta_encoding_for_index_values,
table_options, ts_sz, persist_user_defined_timestamps));
}
if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
// Apply optimize_filters_for_hits setting here when applicable by
// skipping filter generation
filter_builder.reset();
} else if (tbo.skip_filters) {
// For SstFileWriter skip_filters
filter_builder.reset();
} else if (!table_options.filter_policy) {
// Null filter_policy -> no filter
filter_builder.reset();
} else {
filter_builder.reset(CreateFilterBlockBuilder(
ioptions, tbo.moptions, filter_context,
use_delta_encoding_for_index_values, p_index_builder_, ts_sz,
persist_user_defined_timestamps));
}
assert(tbo.internal_tbl_prop_coll_factories);
for (auto& factory : *tbo.internal_tbl_prop_coll_factories) {
assert(factory);
std::unique_ptr<InternalTblPropColl> collector{
factory->CreateInternalTblPropColl(
tbo.column_family_id, tbo.level_at_creation,
tbo.ioptions.num_levels,
tbo.last_level_inclusive_max_seqno_threshold)};
if (collector) {
table_properties_collectors.emplace_back(std::move(collector));
}
}
table_properties_collectors.emplace_back(
new BlockBasedTablePropertiesCollector(
table_options.index_type, table_options.whole_key_filtering,
prefix_extractor != nullptr,
table_options.decouple_partitioned_filters));
if (ts_sz > 0 && persist_user_defined_timestamps) {
table_properties_collectors.emplace_back(
new TimestampTablePropertiesCollector(
tbo.internal_comparator.user_comparator()));
}
// These are only needed for populating table properties
props.column_family_id = tbo.column_family_id;
props.column_family_name = tbo.column_family_name;
props.oldest_key_time = tbo.oldest_key_time;
props.newest_key_time = tbo.newest_key_time;
props.file_creation_time = tbo.file_creation_time;
props.orig_file_number = tbo.cur_file_num;
props.db_id = tbo.db_id;
props.db_session_id = tbo.db_session_id;
props.db_host_id = ioptions.db_host_id;
props.format_version = table_options.format_version;
if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
}
// Default is UINT64_MAX for unknown. Setting it to 0 here
// to allow updating it by taking max in BlockBasedTableBuilder::Add().
props.key_largest_seqno = 0;
PrePopulateCompressionProperties(mgr);
if (FormatVersionUsesContextChecksum(table_options.format_version)) {
// Must be non-zero and semi- or quasi-random
// TODO: ideally guaranteed different for related files (e.g. use file
// number and db_session, for benefit of SstFileWriter)
do {
base_context_checksum = Random::GetTLSInstance()->Next();
} while (UNLIKELY(base_context_checksum == 0));
} else {
base_context_checksum = 0;
}
if (alignment > 0 && basic_compressor) {
// With better sanitization in `CompactionPicker::CompactFiles()`, we
// would not need to handle this case here and could change it to an
// assertion instead.
SetStatus(Status::InvalidArgument(
"Enable block_align, but compression enabled"));
}
}
Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete;
void PrePopulateCompressionProperties(UnownedPtr<CompressionManager> mgr) {
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
assert(mgr);
// Use newer compression_name property
props.compression_name.reserve(32);
// If compression is disabled, use empty manager name
if (basic_compressor) {
props.compression_name.append(mgr->CompatibilityName());
}
props.compression_name.push_back(';');
// Rest of property to be filled out at the end of building the file
} else {
// Use legacy compression_name property, populated at the end of building
// the file. Not compatible with compression managers using custom
// algorithms / compression types.
assert(Slice(mgr->CompatibilityName())
.compare(GetBuiltinCompressionManager(
GetCompressFormatForVersion(
static_cast<uint32_t>(props.format_version)))
->CompatibilityName()) == 0);
}
}
void PostPopulateCompressionProperties() {
// Do not include "no compression" in the set. It's not really useful
// information whether there are any uncompressed blocks. Some kinds of
// blocks are never compressed anyway.
compression_types_used.Remove(kNoCompression);
size_t ctype_count = compression_types_used.count();
if (uses_explicit_compression_manager) {
// Stuff some extra debugging info as extra pseudo-options. Using
// underscore prefix to indicate they are special.
std::string& compression_options = props.compression_options;
compression_options.append("_compressor=");
compression_options.append(data_block_compressor
? data_block_compressor->GetId()
: std::string{});
compression_options.append("; ");
} else {
// No explicit compression manager
assert(compression_types_used.count() <= 1);
}
std::string& compression_name = props.compression_name;
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
// Fill in extended field of "compression name" property, which is the set
// of compression types used, sorted by unsigned byte and then hex
// encoded with two digits each (so that table properties are human
// readable).
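// For example (illustrative): a file containing blocks compressed with
// LZ4 (0x04) and ZSTD (0x07) would append "0407", so the full property
// would look like "<manager compatibility name>;0407;".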
assert(*compression_name.rbegin() == ';');
size_t pos = compression_name.size();
// Make space for the field contents
compression_name.append(ctype_count * 2, '\0');
char* ptr = compression_name.data() + pos;
// Populate the field contents
for (CompressionType t : compression_types_used) {
PutBaseChars<16>(&ptr, /*digits=*/2, static_cast<unsigned char>(t),
/*uppercase=*/true);
}
assert(ptr == compression_name.data() + pos + ctype_count * 2);
// Allow additional fields in the future
compression_name.push_back(';');
} else {
// Use legacy compression naming. To adhere to requirements described in
// TableProperties::compression_name, we might have to replace the name
// based on the legacy configured compression type.
assert(compression_name.empty());
if (ctype_count == 0) {
// We could get a slight performance boost in the reader by marking the
// file as "no compression" if compression is configured but
// consistently rejected, but that would give misleading info for
// debugging purposes. So instead we record the configured compression
// type, matching the historical behavior.
if (data_block_compressor) {
compression_name = CompressionTypeToString(
data_block_compressor->GetPreferredCompressionType());
} else {
assert(basic_compressor == nullptr);
compression_name = CompressionTypeToString(kNoCompression);
}
} else if (compression_types_used.Contains(kZSTD)) {
compression_name = CompressionTypeToString(kZSTD);
} else {
compression_name =
CompressionTypeToString(*compression_types_used.begin());
}
}
}
private:
// Synchronize status & io_status accesses across threads from main thread,
// compression thread and write thread in parallel compression.
std::mutex status_mutex;
std::atomic<bool> status_ok;
Status status;
std::mutex io_status_mutex;
std::atomic<bool> io_status_ok;
IOStatus io_status;
};
BlockBasedTableBuilder::BlockBasedTableBuilder(
const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
WritableFileWriter* file) {
BlockBasedTableOptions sanitized_table_options(table_options);
auto ucmp = tbo.internal_comparator.user_comparator();
assert(ucmp);
(void)ucmp; // avoids unused variable error.
rep_ = new Rep(sanitized_table_options, tbo, file);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
const_cast<TableProperties*>(&rep_->props));
BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
tbo.cur_file_num, &rep_->base_cache_key);
if (rep_->IsParallelCompressionEnabled()) {
StartParallelCompression();
} else if (rep_->basic_compressor) {
rep_->single_threaded_compressed_output.reserve(table_options.block_size);
}
}
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
// Catch errors where caller forgot to call Finish()
assert(rep_->state == Rep::State::kClosed);
delete rep_;
}
void BlockBasedTableBuilder::Add(const Slice& ikey, const Slice& value) {
Rep* r = rep_;
assert(rep_->state != Rep::State::kClosed);
if (!ok()) {
return;
}
ValueType value_type;
SequenceNumber seq;
UnPackSequenceAndType(ExtractInternalKeyFooter(ikey), &seq, &value_type);
r->props.key_largest_seqno = std::max(r->props.key_largest_seqno, seq);
if (IsValueType(value_type)) {
#ifndef NDEBUG
if (r->props.num_entries > r->props.num_range_deletions) {
assert(r->internal_comparator.Compare(ikey, Slice(r->last_ikey)) > 0);
}
bool skip = false;
TEST_SYNC_POINT_CALLBACK("BlockBasedTableBuilder::Add::skip", (void*)&skip);
if (skip) {
return;
}
#endif // !NDEBUG
auto should_flush = r->flush_block_policy->Update(ikey, value);
if (should_flush) {
assert(!r->data_block.empty());
r->first_key_in_next_block = &ikey;
Flush();
if (r->state == Rep::State::kBuffered) {
bool exceeds_buffer_limit =
(r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
bool exceeds_global_block_cache_limit = false;
// Increase cache charging for the last buffered data block
// only if the block is not going to be unbuffered immediately
// and there exists a cache reservation manager
if (!exceeds_buffer_limit &&
r->compression_dict_buffer_cache_res_mgr != nullptr) {
Status s =
r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
r->data_begin_offset);
exceeds_global_block_cache_limit = s.IsMemoryLimit();
}
if (exceeds_buffer_limit || exceeds_global_block_cache_limit) {
EnterUnbuffered();
}
}
// Add item to index block.
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys "the quick brown fox" and "the who". We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
if (ok() && r->state == Rep::State::kUnbuffered) {
if (r->IsParallelCompressionEnabled()) {
r->pc_rep->curr_block_keys.Clear();
} else {
r->index_builder->AddIndexEntry(r->last_ikey, &ikey,
r->pending_handle,
&r->index_separator_scratch);
}
}
}
// Note: PartitionedFilterBlockBuilder requires keys to be added to the
// filter builder after being added to the index builder.
if (r->state == Rep::State::kUnbuffered) {
if (r->IsParallelCompressionEnabled()) {
r->pc_rep->curr_block_keys.PushBack(ikey);
} else {
if (r->filter_builder != nullptr) {
r->filter_builder->AddWithPrevKey(
ExtractUserKeyAndStripTimestamp(ikey, r->ts_sz),
r->last_ikey.empty()
? Slice{}
: ExtractUserKeyAndStripTimestamp(r->last_ikey, r->ts_sz));
}
}
}
r->data_block.AddWithLastKey(ikey, value, r->last_ikey);
r->last_ikey.assign(ikey.data(), ikey.size());
assert(!r->last_ikey.empty());
if (r->state == Rep::State::kBuffered) {
// Buffered keys will be replayed from data_block_buffers during
// `Finish()` once compression dictionary has been finalized.
} else {
if (!r->IsParallelCompressionEnabled()) {
r->index_builder->OnKeyAdded(ikey);
}
}
// TODO offset passed in is not accurate for parallel compression case
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.logger);
} else if (value_type == kTypeRangeDeletion) {
Slice persisted_end = value;
// When timestamps should not be persisted, we physically strip away range
// tombstone end key's user timestamp before passing it along to block
// builder. Physically stripping away start key's user timestamp is
// handled at the block builder level in the same way as the other data
// blocks.
if (r->ts_sz > 0 && !r->persist_user_defined_timestamps) {
persisted_end = StripTimestampFromUserKey(value, r->ts_sz);
}
r->range_del_block.Add(ikey, persisted_end);
// TODO offset passed in is not accurate for parallel compression case
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.logger);
} else {
assert(false);
r->SetStatus(Status::InvalidArgument(
"BlockBasedBuilder::Add() received a key with invalid value type " +
std::to_string(static_cast<unsigned int>(value_type))));
return;
}
r->props.num_entries++;
r->props.raw_key_size += ikey.size();
if (!r->persist_user_defined_timestamps) {
r->props.raw_key_size -= r->ts_sz;
}
r->props.raw_value_size += value.size();
if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
value_type == kTypeDeletionWithTimestamp) {
r->props.num_deletions++;
} else if (value_type == kTypeRangeDeletion) {
r->props.num_deletions++;
r->props.num_range_deletions++;
} else if (value_type == kTypeMerge) {
r->props.num_merge_operands++;
}
}
void BlockBasedTableBuilder::Flush() {
Rep* r = rep_;
assert(rep_->state != Rep::State::kClosed);
if (!ok()) {
return;
}
if (r->data_block.empty()) {
return;
}
Slice uncompressed_block_data = r->data_block.Finish();
// NOTE: compression sampling is done here in the same thread as building
// the uncompressed block because of the requirements to call table
// property collectors:
// * BlockAdd function expects block_compressed_bytes_{fast,slow} for
// historical reasons. Probably a hassle to remove.
// * Collector is not thread safe so calls need to be serialized/synchronized.
// * Ideally, AddUserKey and BlockAdd calls need to line up such that a
// reported block corresponds to all the keys reported since the previous
// block.
// If requested, we sample one in every N blocks with a
// fast and a slow compression algorithm and report the stats.
// Users can use these stats to decide if it is worthwhile
// enabling compression, and they also get a hint about which
// compression algorithm will be beneficial.
if (r->sample_for_compression > 0 &&
Random::GetTLSInstance()->OneIn(
static_cast<int>(r->sample_for_compression))) {
std::string sampled_output_fast;
std::string sampled_output_slow;
// Sampling with a fast compression algorithm
if (LZ4_Supported() || Snappy_Supported()) {
CompressionType c =
LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
CompressionOptions options;
CompressionContext context(c, options);
CompressionInfo info_tmp(options, context,
CompressionDict::GetEmptyDict(), c);
OLD_CompressData(
uncompressed_block_data, info_tmp,
GetCompressFormatForVersion(r->table_options.format_version),
&sampled_output_fast);
}
// Sampling with a slow but high-compression algorithm
if (ZSTD_Supported() || Zlib_Supported()) {
CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
CompressionOptions options;
CompressionContext context(c, options);
CompressionInfo info_tmp(options, context,
CompressionDict::GetEmptyDict(), c);
OLD_CompressData(
uncompressed_block_data, info_tmp,
GetCompressFormatForVersion(r->table_options.format_version),
&sampled_output_slow);
}
if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
// Currently compression sampling is only enabled for data block.
r->sampled_input_data_bytes.fetch_add(uncompressed_block_data.size(),
std::memory_order_relaxed);
r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
std::memory_order_relaxed);
r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
std::memory_order_relaxed);
}
NotifyCollectTableCollectorsOnBlockAdd(
r->table_properties_collectors, uncompressed_block_data.size(),
sampled_output_slow.size(), sampled_output_fast.size());
} else {
NotifyCollectTableCollectorsOnBlockAdd(
r->table_properties_collectors, uncompressed_block_data.size(),
0 /*block_compressed_bytes_slow*/, 0 /*block_compressed_bytes_fast*/);
}
if (rep_->state == Rep::State::kBuffered) {
std::string uncompressed_block_holder;
uncompressed_block_holder.reserve(rep_->table_options.block_size);
r->data_block.SwapAndReset(uncompressed_block_holder);
assert(uncompressed_block_data.size() == uncompressed_block_holder.size());
rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_holder));
rep_->data_begin_offset += uncompressed_block_data.size();
} else if (r->IsParallelCompressionEnabled()) {
assert(rep_->state == Rep::State::kUnbuffered);
ParallelCompressionRep::BlockRep* block_rep =
r->pc_rep->PrepareBlock(r->first_key_in_next_block, &(r->data_block));
assert(block_rep != nullptr);
r->pc_rep->file_size_estimator.EmitBlock(block_rep->uncompressed.size(),
r->get_offset());
r->pc_rep->EmitBlock(block_rep);
} else {
assert(rep_->state == Rep::State::kUnbuffered);
WriteBlock(uncompressed_block_data, &r->pending_handle, BlockType::kData);
r->data_block.Reset();
}
}
void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
BlockHandle* handle,
BlockType block_type) {
Rep* r = rep_;
assert(r->state == Rep::State::kUnbuffered);
CompressionType type;
Status compress_status;
bool is_data_block = block_type == BlockType::kData;
CompressAndVerifyBlock(
uncompressed_block_data, is_data_block,
is_data_block ? r->data_block_working_areas[0] : r->basic_working_area,
&r->single_threaded_compressed_output, &type, &compress_status);
r->SetStatus(compress_status);
if (!ok()) {
return;
}
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
&r->single_threaded_compressed_output);
WriteMaybeCompressedBlock(type == kNoCompression
? uncompressed_block_data
: Slice(r->single_threaded_compressed_output),
type, handle, block_type, &uncompressed_block_data);
r->single_threaded_compressed_output.clear();
if (is_data_block) {
r->props.data_size = r->get_offset();
++r->props.num_data_blocks;
}
}
void BlockBasedTableBuilder::BGWorkCompression(WorkingAreaPair& working_area) {
ParallelCompressionRep::BlockRep* block_rep = nullptr;
while (rep_->pc_rep->compress_queue.pop(block_rep)) {
assert(block_rep != nullptr);
// Skip compression if we are aborting anyway
if (ok()) {
CompressAndVerifyBlock(block_rep->uncompressed, true, /* is_data_block*/
working_area, &block_rep->compressed,
&block_rep->compression_type, &block_rep->status);
}
block_rep->slot.Fill(block_rep);
}
}
void BlockBasedTableBuilder::CompressAndVerifyBlock(
const Slice& uncompressed_block_data, bool is_data_block,
WorkingAreaPair& working_area, std::string* compressed_output,
CompressionType* result_compression_type, Status* out_status) {
Rep* r = rep_;
Compressor* compressor = nullptr;
Decompressor* verify_decomp = nullptr;
if (is_data_block) {
compressor = r->data_block_compressor;
verify_decomp = r->data_block_verify_decompressor.get();
} else {
compressor = r->basic_compressor.get();
verify_decomp = r->verify_decompressor.get();
}
CompressionType type = kNoCompression;
if (LIKELY(uncompressed_block_data.size() < kCompressionSizeLimit)) {
if (compressor) {
StopWatchNano timer(
r->ioptions.clock,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
*out_status =
compressor->CompressBlock(uncompressed_block_data, compressed_output,
&type, &working_area.compress);
// Post-condition of Compressor::CompressBlock
assert(type == kNoCompression || out_status->ok());
assert(type == kNoCompression ||
r->table_options.verify_compression == (verify_decomp != nullptr));
// Check for acceptable compression ratio. (For efficiency, avoid floating
// point and division.)
// TODO: integrate into Compressor?
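// For example, with max_compressed_bytes_per_kb = 896 (illustrative),
// compressed output is kept only if it is at most 896/1024 = 87.5% of the
// uncompressed size.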
if (compressed_output->size() >
(static_cast<uint64_t>(r->max_compressed_bytes_per_kb) *
uncompressed_block_data.size()) >>
10) {
// Prefer to keep uncompressed
type = kNoCompression;
}
// Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the
// compressed data and compare to the input.
if (verify_decomp && type != kNoCompression) {
BlockContents contents;
Status uncompress_status = DecompressBlockData(
compressed_output->data(), compressed_output->size(), type,
*verify_decomp, &contents, r->ioptions,
/*allocator=*/nullptr, &working_area.verify);
if (uncompress_status.ok()) {
bool data_match = contents.data.compare(uncompressed_block_data) == 0;
if (!data_match) {
// The result of the compression was invalid. abort.
const char* const msg =
"Decompressed block did not match pre-compression block";
ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
*out_status = Status::Corruption(msg);
type = kNoCompression;
}
} else {
// Decompression reported an error. abort.
*out_status =
Status::Corruption(std::string("Could not decompress: ") +
uncompress_status.getState());
type = kNoCompression;
}
}
if (timer.IsStarted()) {
RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
}
}
if (is_data_block) {
r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(),
std::memory_order_relaxed);
r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
std::memory_order_relaxed);
}
} else {
// Block is too big to be compressed.
if (is_data_block) {
r->uncompressible_input_data_bytes.fetch_add(
uncompressed_block_data.size() + kBlockTrailerSize,
std::memory_order_relaxed);
}
}
// Abort compression if the block is too big, or did not pass
// verification.
if (type == kNoCompression) {
bool compression_attempted = !compressed_output->empty();
RecordTick(r->ioptions.stats, compression_attempted
? NUMBER_BLOCK_COMPRESSION_REJECTED
: NUMBER_BLOCK_COMPRESSION_BYPASSED);
RecordTick(r->ioptions.stats,
compression_attempted ? BYTES_COMPRESSION_REJECTED
: BYTES_COMPRESSION_BYPASSED,
uncompressed_block_data.size());
} else {
RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_FROM,
uncompressed_block_data.size());
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_TO,
compressed_output->size());
}
*result_compression_type = type;
}
void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
BlockType block_type, const Slice* uncompressed_block_data) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// compression_type: uint8
// checksum: uint32
Rep* r = rep_;
bool is_data_block = block_type == BlockType::kData;
IOOptions io_options;
IOStatus io_s =
WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
if (!io_s.ok()) {
r->SetIOStatus(io_s);
return;
}
// Old, misleading name of this function: WriteRawBlock
StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
const uint64_t offset = r->get_offset();
handle->set_offset(offset);
handle->set_size(block_contents.size());
assert(status().ok());
assert(io_status().ok());
if (uncompressed_block_data == nullptr) {
uncompressed_block_data = &block_contents;
assert(comp_type == kNoCompression);
}
// TODO: consider a variant of this function that puts the trailer after
// block_contents (if it comes from a std::string) so we only need one
// r->file->Append call
{
io_s = r->file->Append(io_options, block_contents);
if (!io_s.ok()) {
r->SetIOStatus(io_s);
return;
}
}
r->compression_types_used.Add(comp_type);
std::array<char, kBlockTrailerSize> trailer;
trailer[0] = comp_type;
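// The checksum covers the block contents plus the compression type byte
// (passed as the last byte), then is adjusted by a modifier derived from
// base_context_checksum and the block's offset in the file.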
uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
r->table_options.checksum, block_contents.data(), block_contents.size(),
/*last_byte*/ comp_type);
checksum += ChecksumModifierForContext(r->base_context_checksum, offset);
if (block_type == BlockType::kFilter) {
Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);
if (!s.ok()) {
r->SetStatus(s);
return;
}
}
EncodeFixed32(trailer.data() + 1, checksum);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
trailer.data());
{
io_s = r->file->Append(io_options, Slice(trailer.data(), trailer.size()));
if (!io_s.ok()) {
r->SetIOStatus(io_s);
return;
}
}
if (r->warm_cache) {
Status s =
InsertBlockInCacheHelper(*uncompressed_block_data, handle, block_type);
if (!s.ok()) {
r->SetStatus(s);
return;
}
}
r->pre_compression_size +=
uncompressed_block_data->size() + kBlockTrailerSize;
r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
if (r->table_options.block_align && is_data_block) {
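// block_align requires a power-of-two block size, so alignment is a power
// of two here and this bitmask arithmetic rounds (block contents + trailer)
// up to the next alignment boundary. For example (illustrative),
// alignment=4096 with a 5000-byte block-plus-trailer needs 3192 pad bytes.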
size_t pad_bytes =
(r->alignment -
((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
(r->alignment - 1);
io_s = r->file->Pad(io_options, pad_bytes);
if (io_s.ok()) {
r->pre_compression_size += pad_bytes;
r->set_offset(r->get_offset() + pad_bytes);
} else {
r->SetIOStatus(io_s);
return;
}
}
if (r->IsParallelCompressionEnabled()) {
if (is_data_block) {
r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
r->get_offset());
} else {
r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
}
}
}
void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
Rep* r = rep_;
ParallelCompressionRep::BlockRepSlot* slot = nullptr;
ParallelCompressionRep::BlockRep* block_rep = nullptr;
// Starts empty; see FilterBlockBuilder::AddWithPrevKey
std::string prev_block_last_key_no_ts;
while (r->pc_rep->write_queue.pop(slot)) {
// FIXME: it is weird to pop off the write queue just to wait again on the
// compress queue
assert(slot != nullptr);
slot->Take(block_rep);
assert(block_rep != nullptr);
if (!block_rep->status.ok()) {
r->SetStatus(block_rep->status);
// Reap block so that blocked Flush() can finish
// if there is one, and Flush() will notice !ok() next time.
block_rep->status = Status::OK();
r->pc_rep->ReapBlock(block_rep);
continue;
}
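// Carry the last user key (without timestamp) of the previous block so that
// AddWithPrevKey() sees a contiguous sequence of keys across block
// boundaries.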
Slice prev_key_no_ts = prev_block_last_key_no_ts;
for (size_t i = 0; i < block_rep->keys.Size(); i++) {
auto& key = block_rep->keys[i];
if (r->filter_builder != nullptr) {
Slice key_no_ts = ExtractUserKeyAndStripTimestamp(key, r->ts_sz);
r->filter_builder->AddWithPrevKey(key_no_ts, prev_key_no_ts);
prev_key_no_ts = key_no_ts;
}
r->index_builder->OnKeyAdded(key);
}
if (r->filter_builder != nullptr) {
prev_block_last_key_no_ts.assign(prev_key_no_ts.data(),
prev_key_no_ts.size());
}
r->pc_rep->file_size_estimator.SetCurrBlockUncompSize(
block_rep->uncompressed.size());
Slice compressed = block_rep->compressed;
Slice uncompressed = block_rep->uncompressed;
WriteMaybeCompressedBlock(block_rep->compression_type == kNoCompression
? uncompressed
: compressed,
block_rep->compression_type, &r->pending_handle,
BlockType::kData, &uncompressed);
if (!ok()) {
break;
}
r->props.data_size = r->get_offset();
++r->props.num_data_blocks;
if (!block_rep->first_key_in_next_block.has_value()) {
r->index_builder->AddIndexEntry(block_rep->keys.Back(), nullptr,
r->pending_handle,
&r->index_separator_scratch);
} else {
Slice first_key_in_next_block =
Slice(*block_rep->first_key_in_next_block);
r->index_builder->AddIndexEntry(
block_rep->keys.Back(), &first_key_in_next_block, r->pending_handle,
&r->index_separator_scratch);
}
r->pc_rep->ReapBlock(block_rep);
}
}
void BlockBasedTableBuilder::StartParallelCompression() {
rep_->pc_rep.reset(
new ParallelCompressionRep(rep_->compression_parallel_threads));
rep_->pc_rep->compress_thread_pool.reserve(
rep_->compression_parallel_threads);
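// One compression worker per configured thread; a single writer thread
// drains the write queue so blocks reach the file in emit order.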
for (uint32_t i = 0; i < rep_->compression_parallel_threads; i++) {
rep_->pc_rep->compress_thread_pool.emplace_back(
[this, i] { BGWorkCompression(rep_->data_block_working_areas[i]); });
}
rep_->pc_rep->write_thread.reset(
new port::Thread([this] { BGWorkWriteMaybeCompressedBlock(); }));
}
void BlockBasedTableBuilder::StopParallelCompression() {
rep_->pc_rep->compress_queue.finish();
for (auto& thread : rep_->pc_rep->compress_thread_pool) {
thread.join();
}
rep_->pc_rep->write_queue.finish();
rep_->pc_rep->write_thread->join();
}
Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
IOStatus BlockBasedTableBuilder::io_status() const {
return rep_->GetIOStatus();
}
Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
const Slice& block_contents, const BlockHandle* handle,
BlockType block_type) {
Cache* block_cache = rep_->table_options.block_cache.get();
Status s;
auto helper =
GetCacheItemHelper(block_type, rep_->ioptions.lowest_used_cache_tier);
if (block_cache && helper && helper->create_cb) {
CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
size_t charge;
// NOTE: data blocks (and everything else) will be warmed in decompressed
// state, so they do not need a dictionary-aware decompressor. The only thing
// needing a decompressor here (in create_context) is warming the
// (de)compression dictionary, which will clone and save a dict-based
// decompressor from the corresponding non-dict decompressor.
s = WarmInCache(block_cache, key.AsSlice(), block_contents,
&rep_->create_context, helper, Cache::Priority::LOW,
&charge);
if (s.ok()) {
BlockBasedTable::UpdateCacheInsertionMetrics(
block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
rep_->ioptions.stats);
} else {
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
}
}
return s;
}
void BlockBasedTableBuilder::WriteFilterBlock(
MetaIndexBuilder* meta_index_builder) {
if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
// No filter block needed
return;
}
if (!rep_->last_ikey.empty()) {
// We might have been using AddWithPrevKey, so need PrevKeyBeforeFinish
// to be safe. And because we are re-synchronized after buffered/parallel
// operation, rep_->last_ikey is accurate.
rep_->filter_builder->PrevKeyBeforeFinish(
ExtractUserKeyAndStripTimestamp(rep_->last_ikey, rep_->ts_sz));
}
BlockHandle filter_block_handle;
bool is_partitioned_filter = rep_->table_options.partition_filters;
if (ok()) {
rep_->props.num_filter_entries +=
rep_->filter_builder->EstimateEntriesAdded();
Status s = Status::Incomplete();
while (ok() && s.IsIncomplete()) {
// filter_owner holds the transferred filter data payload from
// FilterBlockBuilder and deallocates the payload when it goes out of scope.
// Otherwise, the payload would unnecessarily remain until
// BlockBasedTableBuilder is deallocated.
//
// See FilterBlockBuilder::Finish() for more on the difference in
// transferred filter data payload among different FilterBlockBuilder
// subtypes.
std::unique_ptr<const char[]> filter_owner;
Slice filter_content;
s = rep_->filter_builder->Finish(filter_block_handle, &filter_content,
&filter_owner);
assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
if (s.IsCorruption()) {
rep_->SetStatus(s);
break;
}
rep_->props.filter_size += filter_content.size();
BlockType btype = is_partitioned_filter && /* last */ s.ok()
? BlockType::kFilterPartitionIndex
: BlockType::kFilter;
WriteMaybeCompressedBlock(filter_content, kNoCompression,
&filter_block_handle, btype);
}
rep_->filter_builder->ResetFilterBitsBuilder();
}
if (ok()) {
// Add mapping from "<filter_block_prefix>.Name" to location
// of filter data.
std::string key;
key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
: BlockBasedTable::kFullFilterBlockPrefix;
key.append(rep_->table_options.filter_policy->CompatibilityName());
meta_index_builder->Add(key, filter_block_handle);
}
}
void BlockBasedTableBuilder::WriteIndexBlock(
MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
if (!ok()) {
return;
}
IndexBuilder::IndexBlocks index_blocks;
auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
if (index_builder_status.IsIncomplete()) {
// If we have more than one index partition then meta_blocks are not
// supported for the index. Currently meta_blocks are used only by
// HashIndexBuilder, which is not multi-partition.
assert(index_blocks.meta_blocks.empty());
} else if (ok() && !index_builder_status.ok()) {
rep_->SetStatus(index_builder_status);
}
if (ok()) {
for (const auto& item : index_blocks.meta_blocks) {
BlockHandle block_handle;
WriteBlock(item.second, &block_handle, BlockType::kIndex);
if (!ok()) {
break;
}
meta_index_builder->Add(item.first, block_handle);
}
}
if (ok()) {
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle,
BlockType::kIndex);
} else {
WriteMaybeCompressedBlock(index_blocks.index_block_contents,
kNoCompression, index_block_handle,
BlockType::kIndex);
}
}
// If there are more index partitions, finish them and write them out
if (index_builder_status.IsIncomplete()) {
bool index_building_finished = false;
while (ok() && !index_building_finished) {
Status s =
rep_->index_builder->Finish(&index_blocks, *index_block_handle);
if (s.ok()) {
index_building_finished = true;
} else if (s.IsIncomplete()) {
// More partitioned index after this one
assert(!index_building_finished);
} else {
// Error
rep_->SetStatus(s);
return;
}
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle,
BlockType::kIndex);
} else {
WriteMaybeCompressedBlock(index_blocks.index_block_contents,
kNoCompression, index_block_handle,
BlockType::kIndex);
}
// The last index_block_handle will be for the partition index block
}
}
// If success and need to record in metaindex rather than footer...
if (!FormatVersionUsesIndexHandleInFooter(
rep_->table_options.format_version)) {
meta_index_builder->Add(kIndexBlockName, *index_block_handle);
}
}
void BlockBasedTableBuilder::WritePropertiesBlock(
MetaIndexBuilder* meta_index_builder) {
BlockHandle properties_block_handle;
if (ok()) {
PropertyBlockBuilder property_block_builder;
rep_->props.filter_policy_name =
rep_->table_options.filter_policy != nullptr
? rep_->table_options.filter_policy->Name()
: "";
rep_->props.index_size =
rep_->index_builder->IndexSize() + kBlockTrailerSize;
rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
? rep_->ioptions.user_comparator->Name()
: "nullptr";
rep_->props.merge_operator_name =
rep_->ioptions.merge_operator != nullptr
? rep_->ioptions.merge_operator->Name()
: "nullptr";
rep_->props.prefix_extractor_name =
rep_->prefix_extractor ? rep_->prefix_extractor->AsString() : "nullptr";
std::string property_collectors_names = "[";
for (size_t i = 0;
i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
if (i != 0) {
property_collectors_names += ",";
}
property_collectors_names +=
rep_->ioptions.table_properties_collector_factories[i]->Name();
}
property_collectors_names += "]";
rep_->props.property_collectors_names = property_collectors_names;
rep_->PostPopulateCompressionProperties();
if (rep_->table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
assert(rep_->p_index_builder_ != nullptr);
rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
rep_->props.top_level_index_size =
rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
}
rep_->props.index_key_is_user_key =
!rep_->index_builder->seperator_is_key_plus_seq();
rep_->props.index_value_is_delta_encoded =
rep_->use_delta_encoding_for_index_values;
if (rep_->sampled_input_data_bytes > 0) {
rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
static_cast<double>(rep_->sampled_output_slow_data_bytes) /
rep_->sampled_input_data_bytes *
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes + 0.5);
rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
static_cast<double>(rep_->sampled_output_fast_data_bytes) /
rep_->sampled_input_data_bytes *
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes + 0.5);
} else if (rep_->sample_for_compression > 0) {
// We tried to sample but none were found. Assume worst-case (compression
// ratio 1.0) so data is complete and aggregatable.
rep_->props.slow_compression_estimated_data_size =
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes;
rep_->props.fast_compression_estimated_data_size =
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes;
}
rep_->props.user_defined_timestamps_persisted =
rep_->persist_user_defined_timestamps;
assert(IsEmpty() || rep_->props.key_largest_seqno != UINT64_MAX);
// Add basic properties
property_block_builder.AddTableProperty(rep_->props);
// Add user collected properties
NotifyCollectTableCollectorsOnFinish(
rep_->table_properties_collectors, rep_->ioptions.logger,
&property_block_builder, rep_->props.user_collected_properties,
rep_->props.readable_properties);
Slice block_data = property_block_builder.Finish();
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
WriteMaybeCompressedBlock(block_data, kNoCompression,
&properties_block_handle, BlockType::kProperties);
}
if (ok()) {
#ifndef NDEBUG
{
uint64_t props_block_offset = properties_block_handle.offset();
uint64_t props_block_size = properties_block_handle.size();
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
&props_block_offset);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
&props_block_size);
}
#endif // !NDEBUG
const std::string* properties_block_meta = &kPropertiesBlockName;
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:Meta",
&properties_block_meta);
meta_index_builder->Add(*properties_block_meta, properties_block_handle);
}
}
void BlockBasedTableBuilder::WriteCompressionDictBlock(
MetaIndexBuilder* meta_index_builder) {
Slice compression_dict;
if (rep_->compressor_with_dict) {
compression_dict = rep_->compressor_with_dict->GetSerializedDict();
}
if (!compression_dict.empty()) {
BlockHandle compression_dict_block_handle;
if (ok()) {
WriteMaybeCompressedBlock(compression_dict, kNoCompression,
&compression_dict_block_handle,
BlockType::kCompressionDictionary);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
&compression_dict);
}
if (ok()) {
meta_index_builder->Add(kCompressionDictBlockName,
compression_dict_block_handle);
}
}
}
void BlockBasedTableBuilder::WriteRangeDelBlock(
MetaIndexBuilder* meta_index_builder) {
if (ok() && !rep_->range_del_block.empty()) {
BlockHandle range_del_block_handle;
WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
&range_del_block_handle,
BlockType::kRangeDeletion);
meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
}
}
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
BlockHandle& index_block_handle) {
assert(ok());
Rep* r = rep_;
// this is guaranteed by BlockBasedTableBuilder's constructor
assert(r->table_options.checksum == kCRC32c ||
r->table_options.format_version != 0);
FooterBuilder footer;
Status s = footer.Build(kBlockBasedTableMagicNumber,
r->table_options.format_version, r->get_offset(),
r->table_options.checksum, metaindex_block_handle,
index_block_handle, r->base_context_checksum);
if (!s.ok()) {
r->SetStatus(s);
return;
}
IOOptions io_options;
IOStatus ios =
WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
if (!ios.ok()) {
r->SetIOStatus(ios);
return;
}
ios = r->file->Append(io_options, footer.GetSlice());
if (ios.ok()) {
r->pre_compression_size += footer.GetSlice().size();
r->set_offset(r->get_offset() + footer.GetSlice().size());
} else {
r->SetIOStatus(ios);
}
}
void BlockBasedTableBuilder::EnterUnbuffered() {
Rep* r = rep_;
assert(r->state == Rep::State::kBuffered);
r->state = Rep::State::kUnbuffered;
const size_t kNumBlocksBuffered = r->data_block_buffers.size();
if (kNumBlocksBuffered == 0) {
// The below code is neither safe nor necessary for handling zero data
// blocks.
// For PostPopulateCompressionProperties()
r->data_block_compressor = r->basic_compressor.get();
return;
}
// Abstract algebra teaches us that a finite cyclic group (such as the
// additive group of integers modulo N) can be generated by a number that is
// coprime with N. Since N is variable (number of buffered data blocks), we
// must then pick a prime number in order to guarantee coprimeness with any N.
//
// One downside of this approach is the spread will be poor when
// `kPrimeGeneratorRemainder` is close to zero or close to
// `kNumBlocksBuffered`.
//
// Picked a random number between one and one trillion and then chose the
// next prime number greater than or equal to it.
const uint64_t kPrimeGenerator = 545055921143ull;
// Can avoid repeated division by just adding the remainder repeatedly.
const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
Compressor::DictSampleArgs samples;
size_t buffer_idx = kInitSampleIdx;
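// Visit the buffered blocks in a pseudo-random order by repeatedly adding the
// remainder modulo kNumBlocksBuffered, collecting up to
// r->max_dict_sample_bytes of sample data.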
for (size_t i = 0; i < kNumBlocksBuffered &&
samples.sample_data.size() < r->max_dict_sample_bytes;
++i) {
size_t copy_len =
std::min(r->max_dict_sample_bytes - samples.sample_data.size(),
r->data_block_buffers[buffer_idx].size());
samples.sample_data.append(r->data_block_buffers[buffer_idx], 0, copy_len);
samples.sample_lens.emplace_back(copy_len);
buffer_idx += kPrimeGeneratorRemainder;
if (buffer_idx >= kNumBlocksBuffered) {
buffer_idx -= kNumBlocksBuffered;
}
}
assert(samples.sample_data.size() > 0);
// Samples collected; now we can generate the dictionary
r->compressor_with_dict = r->basic_compressor->MaybeCloneSpecialized(
CacheEntryRole::kDataBlock, std::move(samples));
// The compressor might opt not to use a dictionary, in which case we
// can use the same compressor as for e.g. index blocks.
r->data_block_compressor = r->compressor_with_dict
? r->compressor_with_dict.get()
: r->basic_compressor.get();
for (uint32_t i = 0; i < r->compression_parallel_threads; i++) {
r->data_block_working_areas[i].compress =
r->data_block_compressor->ObtainWorkingArea();
}
Slice serialized_dict = r->data_block_compressor->GetSerializedDict();
if (r->verify_decompressor) {
if (serialized_dict.empty()) {
// No dictionary
r->data_block_verify_decompressor = r->verify_decompressor.get();
} else {
// Get an updated dictionary-aware decompressor for verification.
Status s = r->verify_decompressor->MaybeCloneForDict(
serialized_dict, &r->verify_decompressor_with_dict);
// Dictionary support must be present on the decompressor side if it's on
// the compressor side.
assert(r->verify_decompressor_with_dict);
if (r->verify_decompressor_with_dict) {
r->data_block_verify_decompressor =
r->verify_decompressor_with_dict.get();
for (uint32_t i = 0; i < r->compression_parallel_threads; i++) {
r->data_block_working_areas[i].verify =
r->data_block_verify_decompressor->ObtainWorkingArea(
r->data_block_compressor->GetPreferredCompressionType());
}
assert(s.ok());
} else {
assert(!s.ok());
r->SetStatus(s);
}
}
}
auto get_iterator_for_block = [&r](size_t i) {
auto& data_block = r->data_block_buffers[i];
assert(!data_block.empty());
Block reader{BlockContents{data_block}};
DataBlockIter* iter = reader.NewDataIterator(
r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber,
nullptr /* iter */, nullptr /* stats */,
false /* block_contents_pinned */, r->persist_user_defined_timestamps);
iter->SeekToFirst();
assert(iter->Valid());
return std::unique_ptr<DataBlockIter>(iter);
};
std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
if (iter == nullptr) {
iter = get_iterator_for_block(i);
assert(iter != nullptr);
}
if (i + 1 < r->data_block_buffers.size()) {
next_block_iter = get_iterator_for_block(i + 1);
}
auto& data_block = r->data_block_buffers[i];
if (r->IsParallelCompressionEnabled()) {
Slice first_key_in_next_block;
const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
if (i + 1 < r->data_block_buffers.size()) {
assert(next_block_iter != nullptr);
first_key_in_next_block = next_block_iter->key();
} else {
first_key_in_next_block_ptr = r->first_key_in_next_block;
}
std::vector<std::string> keys;
for (; iter->Valid(); iter->Next()) {
keys.emplace_back(iter->key().ToString());
}
ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
first_key_in_next_block_ptr, &data_block, &keys);
assert(block_rep != nullptr);
r->pc_rep->file_size_estimator.EmitBlock(block_rep->uncompressed.size(),
r->get_offset());
r->pc_rep->EmitBlock(block_rep);
} else {
for (; iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (r->filter_builder != nullptr) {
// NOTE: AddWithPrevKey here would only save key copying if prev is
// pinned (iter->IsKeyPinned()), which is probably rare with delta
// encoding. OK to go from Add() here to AddWithPrevKey() in
// unbuffered operation.
r->filter_builder->Add(
ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
}
r->index_builder->OnKeyAdded(key);
}
WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
if (ok() && i + 1 < r->data_block_buffers.size()) {
assert(next_block_iter != nullptr);
Slice first_key_in_next_block = next_block_iter->key();
Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
iter->SeekToLast();
assert(iter->Valid());
r->index_builder->AddIndexEntry(
iter->key(), first_key_in_next_block_ptr, r->pending_handle,
&r->index_separator_scratch);
}
}
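// The iterator for block i + 1 becomes the current iterator on the next loop
// iteration, so each block's iterator is created only once.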
std::swap(iter, next_block_iter);
}
r->data_block_buffers.clear();
r->data_begin_offset = 0;
// Release all reserved cache for data block buffers
if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
r->data_begin_offset);
s.PermitUncheckedError();
}
}
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
assert(r->state != Rep::State::kClosed);
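// Capture whether the pending data block is empty before Flush() writes it
// out.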
bool empty_data_block = r->data_block.empty();
r->first_key_in_next_block = nullptr;
Flush();
if (r->state == Rep::State::kBuffered) {
EnterUnbuffered();
}
if (r->IsParallelCompressionEnabled()) {
StopParallelCompression();
#ifndef NDEBUG
for (const auto& br : r->pc_rep->block_rep_buf) {
assert(br.status.ok());
}
#endif // !NDEBUG
} else {
// To make sure the properties block records the accurate size of the index
// block, we finish writing all index entries first.
if (ok() && !empty_data_block) {
r->index_builder->AddIndexEntry(
r->last_ikey, nullptr /* no next data block */, r->pending_handle,
&r->index_separator_scratch);
}
}
r->props.tail_start_offset = r->offset;
// Write meta blocks, metaindex block and footer in the following order.
// 1. [meta block: filter]
// 2. [meta block: index]
// 3. [meta block: compression dictionary]
// 4. [meta block: range deletion tombstone]
// 5. [meta block: properties]
// 6. [metaindex block]
// 7. Footer
BlockHandle metaindex_block_handle, index_block_handle;
MetaIndexBuilder meta_index_builder;
WriteFilterBlock(&meta_index_builder);
WriteIndexBlock(&meta_index_builder, &index_block_handle);
WriteCompressionDictBlock(&meta_index_builder);
WriteRangeDelBlock(&meta_index_builder);
WritePropertiesBlock(&meta_index_builder);
if (ok()) {
// flush the meta index block
WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
&metaindex_block_handle, BlockType::kMetaIndex);
}
if (ok()) {
WriteFooter(metaindex_block_handle, index_block_handle);
}
r->state = Rep::State::kClosed;
r->tail_size = r->offset - r->props.tail_start_offset;
Status ret_status = r->CopyStatus();
IOStatus ios = r->GetIOStatus();
if (!ios.ok() && ret_status.ok()) {
// Let io_status supersede ok status (otherwise status takes precedence)
ret_status = ios;
}
return ret_status;
}
void BlockBasedTableBuilder::Abandon() {
assert(rep_->state != Rep::State::kClosed);
if (rep_->IsParallelCompressionEnabled()) {
StopParallelCompression();
}
rep_->state = Rep::State::kClosed;
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition
rep_->CopyStatus().PermitUncheckedError();
rep_->CopyIOStatus().PermitUncheckedError();
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
}
uint64_t BlockBasedTableBuilder::NumEntries() const {
return rep_->props.num_entries;
}
bool BlockBasedTableBuilder::IsEmpty() const {
return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
}
uint64_t BlockBasedTableBuilder::PreCompressionSize() const {
return rep_->pre_compression_size;
}
uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
if (rep_->IsParallelCompressionEnabled()) {
// Use compression ratio so far and inflight uncompressed bytes to estimate
// final SST size.
return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
} else {
return FileSize();
}
}
uint64_t BlockBasedTableBuilder::GetTailSize() const { return rep_->tail_size; }
bool BlockBasedTableBuilder::NeedCompact() const {
for (const auto& collector : rep_->table_properties_collectors) {
if (collector->NeedCompact()) {
return true;
}
}
return false;
}
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
return rep_->props;
}
std::string BlockBasedTableBuilder::GetFileChecksum() const {
if (rep_->file != nullptr) {
return rep_->file->GetFileChecksum();
} else {
return kUnknownFileChecksum;
}
}
const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
if (rep_->file != nullptr) {
return rep_->file->GetFileChecksumFuncName();
} else {
return kUnknownFileChecksumFuncName;
}
}
void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
const SeqnoToTimeMapping& relevant_mapping, uint64_t oldest_ancestor_time) {
assert(rep_->props.seqno_to_time_mapping.empty());
relevant_mapping.EncodeTo(rep_->props.seqno_to_time_mapping);
rep_->props.creation_time = oldest_ancestor_time;
}
const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
"partitionedfilter.";
} // namespace ROCKSDB_NAMESPACE