Summary: * Make new format_version=7 a supported setting. * Fix a bug in compressed_secondary_cache.cc that is newly exercised by custom compression types and showing up in crash test with tiered secondary cache * Small change to handling of disabled compression in fv=7: use empty compression manager compatibility name. * Get rid of GetDefaultBuiltinCompressionManager() in public API because it could cause unexpected+unsafe schema change on a user's CompressionManager if built upon the default built-in manager and we add a new built-in schema. Now must be referenced by explicit compression schema version in the public API. (That notion was already exposed in compressed secondary cache API, for better or worse.) * Improve some error messages for compression misconfiguration * Improve testing with ObjectLibrary and CompressionManagers * Improve testing of compression_name table property in BlockBasedTableTest.BlockBasedTableProperties2 * Improve some comments Pull Request resolved: https://github.com/facebook/rocksdb/pull/13713 Test Plan: existing and updated tests. Notably, the crash test has already been running with (unpublished) format_version=7 Reviewed By: mszeszko-meta, hx235 Differential Revision: D77035482 Pulled By: pdillinger fbshipit-source-id: 95278de8734a79706a22361bff2184b1edb230ca
2356 lines
91 KiB
C++
2356 lines
91 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "table/block_based/block_based_table_builder.h"
|
|
|
|
#include <atomic>
|
|
#include <cassert>
|
|
#include <cstdio>
|
|
#include <list>
|
|
#include <map>
|
|
#include <memory>
|
|
#include <numeric>
|
|
#include <string>
|
|
#include <unordered_map>
|
|
#include <utility>
|
|
|
|
#include "block_cache.h"
|
|
#include "cache/cache_entry_roles.h"
|
|
#include "cache/cache_helpers.h"
|
|
#include "cache/cache_key.h"
|
|
#include "cache/cache_reservation_manager.h"
|
|
#include "db/dbformat.h"
|
|
#include "index_builder.h"
|
|
#include "logging/logging.h"
|
|
#include "memory/memory_allocator_impl.h"
|
|
#include "options/options_helper.h"
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/comparator.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/filter_policy.h"
|
|
#include "rocksdb/flush_block_policy.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "rocksdb/table.h"
|
|
#include "rocksdb/types.h"
|
|
#include "table/block_based/block.h"
|
|
#include "table/block_based/block_based_table_factory.h"
|
|
#include "table/block_based/block_based_table_reader.h"
|
|
#include "table/block_based/block_builder.h"
|
|
#include "table/block_based/filter_block.h"
|
|
#include "table/block_based/filter_policy_internal.h"
|
|
#include "table/block_based/full_filter_block.h"
|
|
#include "table/block_based/partitioned_filter_block.h"
|
|
#include "table/format.h"
|
|
#include "table/meta_blocks.h"
|
|
#include "table/table_builder.h"
|
|
#include "util/coding.h"
|
|
#include "util/compression.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/string_util.h"
|
|
#include "util/work_queue.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
extern const std::string kHashIndexPrefixesBlock;
|
|
extern const std::string kHashIndexPrefixesMetadataBlock;
|
|
|
|
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
|
|
namespace {
|
|
|
|
constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
|
|
|
|
// Create a filter block builder based on its type.
|
|
FilterBlockBuilder* CreateFilterBlockBuilder(
|
|
const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
|
|
const FilterBuildingContext& context,
|
|
const bool use_delta_encoding_for_index_values,
|
|
PartitionedIndexBuilder* const p_index_builder, size_t ts_sz,
|
|
const bool persist_user_defined_timestamps) {
|
|
const BlockBasedTableOptions& table_opt = context.table_options;
|
|
assert(table_opt.filter_policy); // precondition
|
|
|
|
FilterBitsBuilder* filter_bits_builder =
|
|
BloomFilterPolicy::GetBuilderFromContext(context);
|
|
if (filter_bits_builder == nullptr) {
|
|
return nullptr;
|
|
} else {
|
|
if (table_opt.partition_filters) {
|
|
assert(p_index_builder != nullptr);
|
|
// Since after partition cut request from filter builder it takes time
|
|
// until index builder actully cuts the partition, until the end of a
|
|
// data block potentially with many keys, we take the lower bound as
|
|
// partition size.
|
|
assert(table_opt.block_size_deviation <= 100);
|
|
auto partition_size =
|
|
static_cast<uint32_t>(((table_opt.metadata_block_size *
|
|
(100 - table_opt.block_size_deviation)) +
|
|
99) /
|
|
100);
|
|
partition_size = std::max(partition_size, static_cast<uint32_t>(1));
|
|
return new PartitionedFilterBlockBuilder(
|
|
mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
|
|
filter_bits_builder, table_opt.index_block_restart_interval,
|
|
use_delta_encoding_for_index_values, p_index_builder, partition_size,
|
|
ts_sz, persist_user_defined_timestamps,
|
|
table_opt.decouple_partitioned_filters);
|
|
} else {
|
|
return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
|
|
table_opt.whole_key_filtering,
|
|
filter_bits_builder);
|
|
}
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// kBlockBasedTableMagicNumber was picked by running
|
|
// echo rocksdb.table.block_based | sha1sum
|
|
// and taking the leading 64 bits.
|
|
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
|
|
// .cc files
|
|
// for that reason we declare it extern in the header but to get the space
|
|
// allocated
|
|
// it must be not extern in one place.
|
|
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
|
|
// We also support reading and writing legacy block based table format (for
|
|
// backwards compatibility)
|
|
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
|
|
|
|
// A collector that collects properties of interest to block-based table.
|
|
// For now this class looks heavy-weight since we only write one additional
|
|
// property.
|
|
// But in the foreseeable future, we will add more and more properties that are
|
|
// specific to block-based table.
|
|
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
|
|
: public InternalTblPropColl {
|
|
public:
|
|
explicit BlockBasedTablePropertiesCollector(
|
|
BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
|
|
bool prefix_filtering, bool decoupled_partitioned_filters)
|
|
: index_type_(index_type),
|
|
whole_key_filtering_(whole_key_filtering),
|
|
prefix_filtering_(prefix_filtering),
|
|
decoupled_partitioned_filters_(decoupled_partitioned_filters) {}
|
|
|
|
Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
|
|
uint64_t /*file_size*/) override {
|
|
// Intentionally left blank. Have no interest in collecting stats for
|
|
// individual key/value pairs.
|
|
return Status::OK();
|
|
}
|
|
|
|
void BlockAdd(uint64_t /* block_uncomp_bytes */,
|
|
uint64_t /* block_compressed_bytes_fast */,
|
|
uint64_t /* block_compressed_bytes_slow */) override {
|
|
// Intentionally left blank. No interest in collecting stats for
|
|
// blocks.
|
|
}
|
|
|
|
Status Finish(UserCollectedProperties* properties) override {
|
|
std::string val;
|
|
PutFixed32(&val, static_cast<uint32_t>(index_type_));
|
|
properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
|
|
properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
|
|
whole_key_filtering_ ? kPropTrue : kPropFalse});
|
|
properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
|
|
prefix_filtering_ ? kPropTrue : kPropFalse});
|
|
if (decoupled_partitioned_filters_) {
|
|
properties->insert(
|
|
{BlockBasedTablePropertyNames::kDecoupledPartitionedFilters,
|
|
kPropTrue});
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
// The name of the properties collector can be used for debugging purpose.
|
|
const char* Name() const override {
|
|
return "BlockBasedTablePropertiesCollector";
|
|
}
|
|
|
|
UserCollectedProperties GetReadableProperties() const override {
|
|
// Intentionally left blank.
|
|
return UserCollectedProperties();
|
|
}
|
|
|
|
private:
|
|
BlockBasedTableOptions::IndexType index_type_;
|
|
bool whole_key_filtering_;
|
|
bool prefix_filtering_;
|
|
bool decoupled_partitioned_filters_;
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::WorkingAreaPair {
|
|
Compressor::ManagedWorkingArea compress;
|
|
Decompressor::ManagedWorkingArea verify;
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::ParallelCompressionRep {
|
|
// TODO: consider replacing with autovector or similar
|
|
// Keys is a wrapper of vector of strings avoiding
|
|
// releasing string memories during vector clear()
|
|
// in order to save memory allocation overhead
|
|
class Keys {
|
|
public:
|
|
Keys() : keys_(kKeysInitSize), size_(0) {}
|
|
void PushBack(const Slice& key) {
|
|
if (size_ == keys_.size()) {
|
|
keys_.emplace_back(key.data(), key.size());
|
|
} else {
|
|
keys_[size_].assign(key.data(), key.size());
|
|
}
|
|
size_++;
|
|
}
|
|
void SwapAssign(std::vector<std::string>& keys) {
|
|
size_ = keys.size();
|
|
std::swap(keys_, keys);
|
|
}
|
|
void Clear() { size_ = 0; }
|
|
size_t Size() { return size_; }
|
|
std::string& Back() { return keys_[size_ - 1]; }
|
|
std::string& operator[](size_t idx) {
|
|
assert(idx < size_);
|
|
return keys_[idx];
|
|
}
|
|
|
|
private:
|
|
static constexpr size_t kKeysInitSize = 32;
|
|
std::vector<std::string> keys_;
|
|
size_t size_;
|
|
};
|
|
Keys curr_block_keys;
|
|
|
|
struct BlockRep;
|
|
|
|
// Use BlockRepSlot to keep block order in write thread.
|
|
// slot_ will pass references to BlockRep
|
|
class BlockRepSlot {
|
|
public:
|
|
BlockRepSlot() : slot_(1) {}
|
|
template <typename T>
|
|
void Fill(T&& rep) {
|
|
slot_.push(std::forward<T>(rep));
|
|
}
|
|
void Take(BlockRep*& rep) { slot_.pop(rep); }
|
|
|
|
private:
|
|
// slot_ will pass references to BlockRep in block_rep_buf,
|
|
// and those references are always valid before the destruction of
|
|
// block_rep_buf.
|
|
WorkQueue<BlockRep*> slot_;
|
|
};
|
|
|
|
// BlockRep instances are fetched from and recycled to
|
|
// block_rep_pool during parallel compression.
|
|
struct ALIGN_AS(CACHE_LINE_SIZE) BlockRep {
|
|
// Uncompressed block contents
|
|
std::string uncompressed;
|
|
std::string compressed;
|
|
CompressionType compression_type = kNoCompression;
|
|
// For efficiency, the std::string is repeatedly overwritten without
|
|
// checking for "has no value". Only at the end of its life will it be
|
|
// assigned "no value". Thus, it needs to start with a value.
|
|
std::optional<std::string> first_key_in_next_block = std::string{};
|
|
Keys keys;
|
|
BlockRepSlot slot;
|
|
Status status;
|
|
};
|
|
|
|
// Use a vector of BlockRep as a buffer for a determined number
|
|
// of BlockRep structures. All data referenced by pointers in
|
|
// BlockRep will be freed when this vector is destructed.
|
|
using BlockRepBuffer = std::vector<BlockRep>;
|
|
BlockRepBuffer block_rep_buf;
|
|
// Use a thread-safe queue for concurrent access from block
|
|
// building thread and writer thread.
|
|
using BlockRepPool = WorkQueue<BlockRep*>;
|
|
BlockRepPool block_rep_pool;
|
|
|
|
// Compression queue will pass references to BlockRep in block_rep_buf,
|
|
// and those references are always valid before the destruction of
|
|
// block_rep_buf.
|
|
using CompressQueue = WorkQueue<BlockRep*>;
|
|
CompressQueue compress_queue;
|
|
std::vector<port::Thread> compress_thread_pool;
|
|
|
|
// Write queue will pass references to BlockRep::slot in block_rep_buf,
|
|
// and those references are always valid before the corresponding
|
|
// BlockRep::slot is destructed, which is before the destruction of
|
|
// block_rep_buf.
|
|
using WriteQueue = WorkQueue<BlockRepSlot*>;
|
|
WriteQueue write_queue;
|
|
std::unique_ptr<port::Thread> write_thread;
|
|
|
|
// Estimate output file size when parallel compression is enabled. This is
|
|
// necessary because compression & flush are no longer synchronized,
|
|
// and BlockBasedTableBuilder::FileSize() is no longer accurate.
|
|
// memory_order_relaxed suffices because accurate statistics is not required.
|
|
class FileSizeEstimator {
|
|
public:
|
|
explicit FileSizeEstimator()
|
|
: uncomp_bytes_compressed(0),
|
|
uncomp_bytes_curr_block(0),
|
|
uncomp_bytes_curr_block_set(false),
|
|
uncomp_bytes_inflight(0),
|
|
blocks_inflight(0),
|
|
curr_compression_ratio(0),
|
|
estimated_file_size(0) {}
|
|
|
|
// Estimate file size when a block is about to be emitted to
|
|
// compression thread
|
|
void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) {
|
|
uint64_t new_uncomp_bytes_inflight =
|
|
uncomp_bytes_inflight.fetch_add(uncomp_block_size,
|
|
std::memory_order_relaxed) +
|
|
uncomp_block_size;
|
|
|
|
uint64_t new_blocks_inflight =
|
|
blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
|
|
|
|
estimated_file_size.store(
|
|
curr_file_size +
|
|
static_cast<uint64_t>(
|
|
static_cast<double>(new_uncomp_bytes_inflight) *
|
|
curr_compression_ratio.load(std::memory_order_relaxed)) +
|
|
new_blocks_inflight * kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
}
|
|
|
|
// Estimate file size when a block is already reaped from
|
|
// compression thread
|
|
void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
|
|
assert(uncomp_bytes_curr_block_set);
|
|
|
|
uint64_t new_uncomp_bytes_compressed =
|
|
uncomp_bytes_compressed + uncomp_bytes_curr_block;
|
|
assert(new_uncomp_bytes_compressed > 0);
|
|
|
|
curr_compression_ratio.store(
|
|
(curr_compression_ratio.load(std::memory_order_relaxed) *
|
|
uncomp_bytes_compressed +
|
|
compressed_block_size) /
|
|
static_cast<double>(new_uncomp_bytes_compressed),
|
|
std::memory_order_relaxed);
|
|
uncomp_bytes_compressed = new_uncomp_bytes_compressed;
|
|
|
|
uint64_t new_uncomp_bytes_inflight =
|
|
uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block,
|
|
std::memory_order_relaxed) -
|
|
uncomp_bytes_curr_block;
|
|
|
|
uint64_t new_blocks_inflight =
|
|
blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
|
|
|
|
estimated_file_size.store(
|
|
curr_file_size +
|
|
static_cast<uint64_t>(
|
|
static_cast<double>(new_uncomp_bytes_inflight) *
|
|
curr_compression_ratio.load(std::memory_order_relaxed)) +
|
|
new_blocks_inflight * kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
|
|
uncomp_bytes_curr_block_set = false;
|
|
}
|
|
|
|
void SetEstimatedFileSize(uint64_t size) {
|
|
estimated_file_size.store(size, std::memory_order_relaxed);
|
|
}
|
|
|
|
uint64_t GetEstimatedFileSize() {
|
|
return estimated_file_size.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
void SetCurrBlockUncompSize(uint64_t size) {
|
|
uncomp_bytes_curr_block = size;
|
|
uncomp_bytes_curr_block_set = true;
|
|
}
|
|
|
|
private:
|
|
// Input bytes compressed so far.
|
|
uint64_t uncomp_bytes_compressed;
|
|
// Size of current block being appended.
|
|
uint64_t uncomp_bytes_curr_block;
|
|
// Whether uncomp_bytes_curr_block has been set for next
|
|
// ReapBlock call.
|
|
bool uncomp_bytes_curr_block_set;
|
|
// Input bytes under compression and not appended yet.
|
|
std::atomic<uint64_t> uncomp_bytes_inflight;
|
|
// Number of blocks under compression and not appended yet.
|
|
std::atomic<uint64_t> blocks_inflight;
|
|
// Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock.
|
|
std::atomic<double> curr_compression_ratio;
|
|
// Estimated SST file size.
|
|
std::atomic<uint64_t> estimated_file_size;
|
|
};
|
|
FileSizeEstimator file_size_estimator;
|
|
|
|
// Facilities used for waiting first block completion. Need to Wait for
|
|
// the completion of first block compression and flush to get a non-zero
|
|
// compression ratio.
|
|
std::atomic<bool> first_block_processed;
|
|
std::condition_variable first_block_cond;
|
|
std::mutex first_block_mutex;
|
|
|
|
explicit ParallelCompressionRep(uint32_t parallel_threads)
|
|
: block_rep_buf(parallel_threads),
|
|
block_rep_pool(parallel_threads),
|
|
compress_queue(parallel_threads),
|
|
write_queue(parallel_threads),
|
|
first_block_processed(false) {
|
|
for (uint32_t i = 0; i < parallel_threads; i++) {
|
|
// Prime the queue of available BlockReps
|
|
block_rep_pool.push(&block_rep_buf[i]);
|
|
}
|
|
}
|
|
|
|
~ParallelCompressionRep() { block_rep_pool.finish(); }
|
|
|
|
// Make a block prepared to be emitted to compression thread
|
|
// Used in non-buffered mode
|
|
BlockRep* PrepareBlock(const Slice* first_key_in_next_block,
|
|
BlockBuilder* data_block) {
|
|
BlockRep* block_rep = PrepareBlockInternal(first_key_in_next_block);
|
|
assert(block_rep != nullptr);
|
|
data_block->SwapAndReset(block_rep->uncompressed);
|
|
std::swap(block_rep->keys, curr_block_keys);
|
|
curr_block_keys.Clear();
|
|
return block_rep;
|
|
}
|
|
|
|
// Used in EnterUnbuffered
|
|
BlockRep* PrepareBlock(const Slice* first_key_in_next_block,
|
|
std::string* data_block,
|
|
std::vector<std::string>* keys) {
|
|
BlockRep* block_rep = PrepareBlockInternal(first_key_in_next_block);
|
|
assert(block_rep != nullptr);
|
|
std::swap(block_rep->uncompressed, *data_block);
|
|
block_rep->keys.SwapAssign(*keys);
|
|
return block_rep;
|
|
}
|
|
|
|
// Emit a block to compression thread
|
|
void EmitBlock(BlockRep* block_rep) {
|
|
assert(block_rep != nullptr);
|
|
assert(block_rep->status.ok());
|
|
if (!write_queue.push(&block_rep->slot)) {
|
|
return;
|
|
}
|
|
if (!compress_queue.push(block_rep)) {
|
|
return;
|
|
}
|
|
|
|
if (!first_block_processed.load(std::memory_order_relaxed)) {
|
|
std::unique_lock<std::mutex> lock(first_block_mutex);
|
|
first_block_cond.wait(lock, [this] {
|
|
return first_block_processed.load(std::memory_order_relaxed);
|
|
});
|
|
}
|
|
}
|
|
|
|
// Reap a block from compression thread
|
|
void ReapBlock(BlockRep* block_rep) {
|
|
assert(block_rep != nullptr);
|
|
block_rep->compressed.clear();
|
|
block_rep_pool.push(block_rep);
|
|
|
|
if (!first_block_processed.load(std::memory_order_relaxed)) {
|
|
std::lock_guard<std::mutex> lock(first_block_mutex);
|
|
first_block_processed.store(true, std::memory_order_relaxed);
|
|
first_block_cond.notify_one();
|
|
}
|
|
}
|
|
|
|
private:
|
|
BlockRep* PrepareBlockInternal(const Slice* first_key_in_next_block) {
|
|
BlockRep* block_rep = nullptr;
|
|
block_rep_pool.pop(block_rep);
|
|
assert(block_rep != nullptr);
|
|
|
|
block_rep->compression_type = kNoCompression;
|
|
|
|
if (first_key_in_next_block == nullptr) {
|
|
block_rep->first_key_in_next_block = {};
|
|
} else {
|
|
block_rep->first_key_in_next_block->assign(
|
|
first_key_in_next_block->data(), first_key_in_next_block->size());
|
|
}
|
|
|
|
return block_rep;
|
|
}
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::Rep {
|
|
const ImmutableOptions ioptions;
|
|
// BEGIN from MutableCFOptions
|
|
std::shared_ptr<const SliceTransform> prefix_extractor;
|
|
// END from MutableCFOptions
|
|
const WriteOptions write_options;
|
|
const BlockBasedTableOptions table_options;
|
|
const InternalKeyComparator& internal_comparator;
|
|
// Size in bytes for the user-defined timestamps.
|
|
size_t ts_sz;
|
|
// When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
|
|
// user key will be stripped when creating the block based table. This
|
|
// stripping happens for all user keys, including the keys in data block,
|
|
// index block for data block, index block for index block (if index type is
|
|
// `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
|
|
// filters), the `first_internal_key` in `IndexValue`, the `end_key` for range
|
|
// deletion entries.
|
|
// As long as the user keys are sorted when added via `Add` API, their logic
|
|
// ordering won't change after timestamps are stripped. However, for each user
|
|
// key to be logically equivalent before and after timestamp is stripped, the
|
|
// user key should contain the minimum timestamp.
|
|
bool persist_user_defined_timestamps;
|
|
WritableFileWriter* file;
|
|
std::atomic<uint64_t> offset;
|
|
size_t alignment;
|
|
BlockBuilder data_block;
|
|
// Buffers uncompressed data blocks to replay later. Needed when
|
|
// compression dictionary is enabled so we can finalize the dictionary before
|
|
// compressing any data blocks.
|
|
std::vector<std::string> data_block_buffers;
|
|
BlockBuilder range_del_block;
|
|
|
|
InternalKeySliceTransform internal_prefix_transform;
|
|
std::unique_ptr<IndexBuilder> index_builder;
|
|
std::string index_separator_scratch;
|
|
PartitionedIndexBuilder* p_index_builder_ = nullptr;
|
|
|
|
std::string last_ikey; // Internal key or empty (unset)
|
|
const Slice* first_key_in_next_block = nullptr;
|
|
bool warm_cache = false;
|
|
bool uses_explicit_compression_manager = false;
|
|
|
|
uint64_t sample_for_compression;
|
|
std::atomic<uint64_t> compressible_input_data_bytes;
|
|
std::atomic<uint64_t> uncompressible_input_data_bytes;
|
|
std::atomic<uint64_t> sampled_input_data_bytes;
|
|
std::atomic<uint64_t> sampled_output_slow_data_bytes;
|
|
std::atomic<uint64_t> sampled_output_fast_data_bytes;
|
|
uint32_t compression_parallel_threads;
|
|
int max_compressed_bytes_per_kb;
|
|
size_t max_dict_sample_bytes = 0;
|
|
|
|
// *** Compressors & decompressors - Yes, it seems like a lot here but ***
|
|
// *** these are distinct fields to minimize extra conditionals and ***
|
|
// *** field reads on hot code paths. ***
|
|
|
|
// A compressor for blocks in general, without dictionary compression
|
|
std::unique_ptr<Compressor> basic_compressor;
|
|
// A compressor using dictionary compression (when applicable)
|
|
std::unique_ptr<Compressor> compressor_with_dict;
|
|
// Once configured/determined, points to one of the above Compressors to
|
|
// use on data blocks.
|
|
Compressor* data_block_compressor = nullptr;
|
|
// A decompressor corresponding to basic_compressor (when non-nullptr).
|
|
// Used for verification and cache warming.
|
|
std::shared_ptr<Decompressor> basic_decompressor;
|
|
// When needed, a decompressor for verifying compression using a
|
|
// dictionary sampled/trained from this file.
|
|
std::unique_ptr<Decompressor> verify_decompressor_with_dict;
|
|
// When non-nullptr, compression should be verified with this corresponding
|
|
// decompressor, except for data blocks. (Points to same as basic_decompressor
|
|
// when verify_compression is set.)
|
|
UnownedPtr<Decompressor> verify_decompressor;
|
|
// Once configured/determined, points to one of the above Decompressors to use
|
|
// in verifying data blocks.
|
|
UnownedPtr<Decompressor> data_block_verify_decompressor;
|
|
|
|
// Set of compression types used for blocks in this file (mixing compression
|
|
// algorithms in a single file is allowed, using a CompressionManager)
|
|
SmallEnumSet<CompressionType, kDisableCompressionOption>
|
|
compression_types_used;
|
|
|
|
// Working area for basic_compressor when compression_parallel_threads==1
|
|
WorkingAreaPair basic_working_area;
|
|
// Working areas for data_block_compressor, for each of
|
|
// compression_parallel_threads
|
|
std::vector<WorkingAreaPair> data_block_working_areas;
|
|
|
|
size_t data_begin_offset = 0;
|
|
|
|
TableProperties props;
|
|
|
|
// States of the builder.
|
|
//
|
|
// - `kBuffered`: This is the initial state where zero or more data blocks are
|
|
// accumulated uncompressed in-memory. From this state, call
|
|
// `EnterUnbuffered()` to finalize the compression dictionary if enabled,
|
|
// compress/write out any buffered blocks, and proceed to the `kUnbuffered`
|
|
// state.
|
|
//
|
|
// - `kUnbuffered`: This is the state when compression dictionary is finalized
|
|
// either because it wasn't enabled in the first place or it's been created
|
|
// from sampling previously buffered data. In this state, blocks are simply
|
|
// compressed/written out as they fill up. From this state, call `Finish()`
|
|
// to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
|
|
// the partially created file.
|
|
//
|
|
// - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
|
|
// called, so the table builder is no longer usable. We must be in this
|
|
// state by the time the destructor runs.
|
|
enum class State {
|
|
kBuffered,
|
|
kUnbuffered,
|
|
kClosed,
|
|
};
|
|
State state = State::kUnbuffered;
|
|
// `kBuffered` state is allowed only as long as the buffering of uncompressed
|
|
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
|
|
uint64_t buffer_limit = 0;
|
|
std::shared_ptr<CacheReservationManager>
|
|
compression_dict_buffer_cache_res_mgr;
|
|
const bool use_delta_encoding_for_index_values;
|
|
std::unique_ptr<FilterBlockBuilder> filter_builder;
|
|
OffsetableCacheKey base_cache_key;
|
|
const TableFileCreationReason reason;
|
|
|
|
BlockHandle pending_handle; // Handle to add to index block
|
|
|
|
std::string single_threaded_compressed_output;
|
|
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
|
|
|
|
std::vector<std::unique_ptr<InternalTblPropColl>> table_properties_collectors;
|
|
|
|
std::unique_ptr<ParallelCompressionRep> pc_rep;
|
|
BlockCreateContext create_context;
|
|
|
|
// The size of the "tail" part of a SST file. "Tail" refers to
|
|
// all blocks after data blocks till the end of the SST file.
|
|
uint64_t tail_size;
|
|
|
|
// The total size of all blocks in this file before they are compressed.
|
|
// This is used for logging compaction stats.
|
|
uint64_t pre_compression_size = 0;
|
|
|
|
// See class Footer
|
|
uint32_t base_context_checksum;
|
|
|
|
uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
|
|
void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
|
|
|
|
bool IsParallelCompressionEnabled() const {
|
|
return compression_parallel_threads > 1;
|
|
}
|
|
|
|
Status GetStatus() {
|
|
// We need to make modifications of status visible when status_ok is set
|
|
// to false, and this is ensured by status_mutex, so no special memory
|
|
// order for status_ok is required.
|
|
if (status_ok.load(std::memory_order_relaxed)) {
|
|
return Status::OK();
|
|
} else {
|
|
return CopyStatus();
|
|
}
|
|
}
|
|
|
|
Status CopyStatus() {
|
|
std::lock_guard<std::mutex> lock(status_mutex);
|
|
return status;
|
|
}
|
|
|
|
IOStatus GetIOStatus() {
|
|
// We need to make modifications of io_status visible when status_ok is set
|
|
// to false, and this is ensured by io_status_mutex, so no special memory
|
|
// order for io_status_ok is required.
|
|
if (io_status_ok.load(std::memory_order_relaxed)) {
|
|
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition
|
|
auto ios = CopyIOStatus();
|
|
ios.PermitUncheckedError();
|
|
// Assume no races in unit tests
|
|
assert(ios.ok());
|
|
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
|
|
return IOStatus::OK();
|
|
} else {
|
|
return CopyIOStatus();
|
|
}
|
|
}
|
|
|
|
IOStatus CopyIOStatus() {
|
|
std::lock_guard<std::mutex> lock(io_status_mutex);
|
|
return io_status;
|
|
}
|
|
|
|
// Never erase an existing status that is not OK.
|
|
void SetStatus(Status s) {
|
|
if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
|
|
// Locking is an overkill for non compression_parallel_threads
|
|
// case but since it's unlikely that s is not OK, we take this cost
|
|
// to be simplicity.
|
|
std::lock_guard<std::mutex> lock(status_mutex);
|
|
status = s;
|
|
status_ok.store(false, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
// Never erase an existing I/O status that is not OK.
|
|
// Calling this will also SetStatus(ios)
|
|
void SetIOStatus(IOStatus ios) {
|
|
if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
|
|
// Locking is an overkill for non compression_parallel_threads
|
|
// case but since it's unlikely that s is not OK, we take this cost
|
|
// to be simplicity.
|
|
std::lock_guard<std::mutex> lock(io_status_mutex);
|
|
io_status = ios;
|
|
io_status_ok.store(false, std::memory_order_relaxed);
|
|
}
|
|
SetStatus(ios);
|
|
}
|
|
|
|
Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
|
|
WritableFileWriter* f)
|
|
: ioptions(tbo.ioptions),
|
|
prefix_extractor(tbo.moptions.prefix_extractor),
|
|
write_options(tbo.write_options),
|
|
table_options(table_opt),
|
|
internal_comparator(tbo.internal_comparator),
|
|
ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()),
|
|
persist_user_defined_timestamps(
|
|
tbo.ioptions.persist_user_defined_timestamps),
|
|
file(f),
|
|
offset(0),
|
|
alignment(table_options.block_align
|
|
? std::min(static_cast<size_t>(table_options.block_size),
|
|
kDefaultPageSize)
|
|
: 0),
|
|
data_block(table_options.block_restart_interval,
|
|
table_options.use_delta_encoding,
|
|
false /* use_value_delta_encoding */,
|
|
tbo.internal_comparator.user_comparator()
|
|
->CanKeysWithDifferentByteContentsBeEqual()
|
|
? BlockBasedTableOptions::kDataBlockBinarySearch
|
|
: table_options.data_block_index_type,
|
|
table_options.data_block_hash_table_util_ratio, ts_sz,
|
|
persist_user_defined_timestamps),
|
|
range_del_block(
|
|
1 /* block_restart_interval */, true /* use_delta_encoding */,
|
|
false /* use_value_delta_encoding */,
|
|
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
|
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
|
persist_user_defined_timestamps),
|
|
internal_prefix_transform(prefix_extractor.get()),
|
|
sample_for_compression(tbo.moptions.sample_for_compression),
|
|
compressible_input_data_bytes(0),
|
|
uncompressible_input_data_bytes(0),
|
|
sampled_input_data_bytes(0),
|
|
sampled_output_slow_data_bytes(0),
|
|
sampled_output_fast_data_bytes(0),
|
|
compression_parallel_threads(tbo.compression_opts.parallel_threads),
|
|
max_compressed_bytes_per_kb(
|
|
tbo.compression_opts.max_compressed_bytes_per_kb),
|
|
data_block_working_areas(compression_parallel_threads),
|
|
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
|
|
!table_opt.block_align),
|
|
reason(tbo.reason),
|
|
flush_block_policy(
|
|
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
|
|
table_options, data_block)),
|
|
create_context(&table_options, &ioptions, ioptions.stats,
|
|
/*decompressor=*/nullptr,
|
|
tbo.moptions.block_protection_bytes_per_key,
|
|
tbo.internal_comparator.user_comparator(),
|
|
!use_delta_encoding_for_index_values,
|
|
table_opt.index_type ==
|
|
BlockBasedTableOptions::kBinarySearchWithFirstKey),
|
|
tail_size(0),
|
|
status_ok(true),
|
|
io_status_ok(true) {
|
|
FilterBuildingContext filter_context(table_options);
|
|
|
|
filter_context.info_log = ioptions.logger;
|
|
filter_context.column_family_name = tbo.column_family_name;
|
|
filter_context.reason = reason;
|
|
|
|
// Only populate other fields if known to be in LSM rather than
|
|
// generating external SST file
|
|
if (reason != TableFileCreationReason::kMisc) {
|
|
filter_context.compaction_style = ioptions.compaction_style;
|
|
filter_context.num_levels = ioptions.num_levels;
|
|
filter_context.level_at_creation = tbo.level_at_creation;
|
|
filter_context.is_bottommost = tbo.is_bottommost;
|
|
assert(filter_context.level_at_creation < filter_context.num_levels);
|
|
}
|
|
|
|
props.compression_options =
|
|
CompressionOptionsToString(tbo.compression_opts);
|
|
|
|
auto* mgr = tbo.moptions.compression_manager.get();
|
|
if (mgr == nullptr) {
|
|
uses_explicit_compression_manager = false;
|
|
mgr = GetBuiltinCompressionManager(
|
|
GetCompressFormatForVersion(
|
|
static_cast<uint32_t>(table_opt.format_version)))
|
|
.get();
|
|
} else {
|
|
uses_explicit_compression_manager = true;
|
|
|
|
// Stuff some extra debugging info as extra pseudo-options. Using
|
|
// underscore prefix to indicate they are special.
|
|
props.compression_options.append("_compression_manager=");
|
|
props.compression_options.append(mgr->GetId());
|
|
props.compression_options.append("; ");
|
|
}
|
|
|
|
// Sanitize to only allowing compression when it saves space.
|
|
max_compressed_bytes_per_kb =
|
|
std::min(int{1023}, tbo.compression_opts.max_compressed_bytes_per_kb);
|
|
|
|
basic_compressor = mgr->GetCompressorForSST(
|
|
filter_context, tbo.compression_opts, tbo.compression_type);
|
|
if (basic_compressor) {
|
|
if (table_options.enable_index_compression) {
|
|
basic_working_area.compress = basic_compressor->ObtainWorkingArea();
|
|
}
|
|
max_dict_sample_bytes = basic_compressor->GetMaxSampleSizeIfWantDict(
|
|
CacheEntryRole::kDataBlock);
|
|
if (max_dict_sample_bytes > 0) {
|
|
state = State::kBuffered;
|
|
if (tbo.target_file_size == 0) {
|
|
buffer_limit = tbo.compression_opts.max_dict_buffer_bytes;
|
|
} else if (tbo.compression_opts.max_dict_buffer_bytes == 0) {
|
|
buffer_limit = tbo.target_file_size;
|
|
} else {
|
|
buffer_limit = std::min(tbo.target_file_size,
|
|
tbo.compression_opts.max_dict_buffer_bytes);
|
|
}
|
|
} else {
|
|
// No distinct data block compressor using dictionary
|
|
data_block_compressor = basic_compressor.get();
|
|
for (uint32_t i = 0; i < compression_parallel_threads; i++) {
|
|
data_block_working_areas[i].compress =
|
|
data_block_compressor->ObtainWorkingArea();
|
|
}
|
|
}
|
|
basic_decompressor = mgr->GetDecompressorForCompressor(*basic_compressor);
|
|
create_context.decompressor = basic_decompressor.get();
|
|
|
|
if (table_options.verify_compression) {
|
|
verify_decompressor = basic_decompressor.get();
|
|
if (table_options.enable_index_compression) {
|
|
basic_working_area.verify = verify_decompressor->ObtainWorkingArea(
|
|
basic_compressor->GetPreferredCompressionType());
|
|
}
|
|
if (state == State::kUnbuffered) {
|
|
assert(data_block_compressor);
|
|
data_block_verify_decompressor = verify_decompressor.get();
|
|
for (uint32_t i = 0; i < compression_parallel_threads; i++) {
|
|
data_block_working_areas[i].verify =
|
|
data_block_verify_decompressor->ObtainWorkingArea(
|
|
data_block_compressor->GetPreferredCompressionType());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
switch (table_options.prepopulate_block_cache) {
|
|
case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
|
|
warm_cache = (reason == TableFileCreationReason::kFlush);
|
|
break;
|
|
case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
|
|
warm_cache = false;
|
|
break;
|
|
default:
|
|
// missing case
|
|
assert(false);
|
|
warm_cache = false;
|
|
}
|
|
|
|
const auto compress_dict_build_buffer_charged =
|
|
table_options.cache_usage_options.options_overrides
|
|
.at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
|
|
.charged;
|
|
if (table_options.block_cache &&
|
|
(compress_dict_build_buffer_charged ==
|
|
CacheEntryRoleOptions::Decision::kEnabled ||
|
|
compress_dict_build_buffer_charged ==
|
|
CacheEntryRoleOptions::Decision::kFallback)) {
|
|
compression_dict_buffer_cache_res_mgr =
|
|
std::make_shared<CacheReservationManagerImpl<
|
|
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
|
|
table_options.block_cache);
|
|
} else {
|
|
compression_dict_buffer_cache_res_mgr = nullptr;
|
|
}
|
|
|
|
if (table_options.index_type ==
|
|
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
|
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
|
|
&internal_comparator, use_delta_encoding_for_index_values,
|
|
table_options, ts_sz, persist_user_defined_timestamps);
|
|
index_builder.reset(p_index_builder_);
|
|
} else {
|
|
index_builder.reset(IndexBuilder::CreateIndexBuilder(
|
|
table_options.index_type, &internal_comparator,
|
|
&this->internal_prefix_transform, use_delta_encoding_for_index_values,
|
|
table_options, ts_sz, persist_user_defined_timestamps));
|
|
}
|
|
if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
|
|
// Apply optimize_filters_for_hits setting here when applicable by
|
|
// skipping filter generation
|
|
filter_builder.reset();
|
|
} else if (tbo.skip_filters) {
|
|
// For SstFileWriter skip_filters
|
|
filter_builder.reset();
|
|
} else if (!table_options.filter_policy) {
|
|
// Null filter_policy -> no filter
|
|
filter_builder.reset();
|
|
} else {
|
|
filter_builder.reset(CreateFilterBlockBuilder(
|
|
ioptions, tbo.moptions, filter_context,
|
|
use_delta_encoding_for_index_values, p_index_builder_, ts_sz,
|
|
persist_user_defined_timestamps));
|
|
}
|
|
|
|
assert(tbo.internal_tbl_prop_coll_factories);
|
|
for (auto& factory : *tbo.internal_tbl_prop_coll_factories) {
|
|
assert(factory);
|
|
|
|
std::unique_ptr<InternalTblPropColl> collector{
|
|
factory->CreateInternalTblPropColl(
|
|
tbo.column_family_id, tbo.level_at_creation,
|
|
tbo.ioptions.num_levels,
|
|
tbo.last_level_inclusive_max_seqno_threshold)};
|
|
if (collector) {
|
|
table_properties_collectors.emplace_back(std::move(collector));
|
|
}
|
|
}
|
|
table_properties_collectors.emplace_back(
|
|
new BlockBasedTablePropertiesCollector(
|
|
table_options.index_type, table_options.whole_key_filtering,
|
|
prefix_extractor != nullptr,
|
|
table_options.decouple_partitioned_filters));
|
|
if (ts_sz > 0 && persist_user_defined_timestamps) {
|
|
table_properties_collectors.emplace_back(
|
|
new TimestampTablePropertiesCollector(
|
|
tbo.internal_comparator.user_comparator()));
|
|
}
|
|
|
|
// These are only needed for populating table properties
|
|
props.column_family_id = tbo.column_family_id;
|
|
props.column_family_name = tbo.column_family_name;
|
|
props.oldest_key_time = tbo.oldest_key_time;
|
|
props.newest_key_time = tbo.newest_key_time;
|
|
props.file_creation_time = tbo.file_creation_time;
|
|
props.orig_file_number = tbo.cur_file_num;
|
|
props.db_id = tbo.db_id;
|
|
props.db_session_id = tbo.db_session_id;
|
|
props.db_host_id = ioptions.db_host_id;
|
|
props.format_version = table_options.format_version;
|
|
if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
|
|
ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
|
|
}
|
|
// Default is UINT64_MAX for unknown. Setting it to 0 here
|
|
// to allow updating it by taking max in BlockBasedTableBuilder::Add().
|
|
props.key_largest_seqno = 0;
|
|
PrePopulateCompressionProperties(mgr);
|
|
|
|
if (FormatVersionUsesContextChecksum(table_options.format_version)) {
|
|
// Must be non-zero and semi- or quasi-random
|
|
// TODO: ideally guaranteed different for related files (e.g. use file
|
|
// number and db_session, for benefit of SstFileWriter)
|
|
do {
|
|
base_context_checksum = Random::GetTLSInstance()->Next();
|
|
} while (UNLIKELY(base_context_checksum == 0));
|
|
} else {
|
|
base_context_checksum = 0;
|
|
}
|
|
|
|
if (alignment > 0 && basic_compressor) {
|
|
// With better sanitization in `CompactionPicker::CompactFiles()`, we
|
|
// would not need to handle this case here and could change it to an
|
|
// assertion instead.
|
|
SetStatus(Status::InvalidArgument(
|
|
"Enable block_align, but compression enabled"));
|
|
}
|
|
}
|
|
|
|
Rep(const Rep&) = delete;
|
|
Rep& operator=(const Rep&) = delete;
|
|
|
|
void PrePopulateCompressionProperties(UnownedPtr<CompressionManager> mgr) {
|
|
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
|
|
assert(mgr);
|
|
// Use newer compression_name property
|
|
props.compression_name.reserve(32);
|
|
// If compression is disabled, use empty manager name
|
|
if (basic_compressor) {
|
|
props.compression_name.append(mgr->CompatibilityName());
|
|
}
|
|
props.compression_name.push_back(';');
|
|
// Rest of property to be filled out at the end of building the file
|
|
} else {
|
|
// Use legacy compression_name property, populated at the end of building
|
|
// the file. Not compatible with compression managers using custom
|
|
// algorithms / compression types.
|
|
assert(Slice(mgr->CompatibilityName())
|
|
.compare(GetBuiltinCompressionManager(
|
|
GetCompressFormatForVersion(
|
|
static_cast<uint32_t>(props.format_version)))
|
|
->CompatibilityName()) == 0);
|
|
}
|
|
}
|
|
void PostPopulateCompressionProperties() {
|
|
// Do not include "no compression" in the set. It's not really useful
|
|
// information whether there are any uncompressed blocks. Some kinds of
|
|
// blocks are never compressed anyway.
|
|
compression_types_used.Remove(kNoCompression);
|
|
size_t ctype_count = compression_types_used.count();
|
|
|
|
if (uses_explicit_compression_manager) {
|
|
// Stuff some extra debugging info as extra pseudo-options. Using
|
|
// underscore prefix to indicate they are special.
|
|
std::string& compression_options = props.compression_options;
|
|
compression_options.append("_compressor=");
|
|
compression_options.append(data_block_compressor
|
|
? data_block_compressor->GetId()
|
|
: std::string{});
|
|
compression_options.append("; ");
|
|
} else {
|
|
// No explicit compression manager
|
|
assert(compression_types_used.count() <= 1);
|
|
}
|
|
|
|
std::string& compression_name = props.compression_name;
|
|
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
|
|
// Fill in extended field of "compression name" property, which is the set
|
|
// of compression types used, sorted by unsigned byte and then hex
|
|
// encoded with two digits each (so that table properties are human
|
|
// readable).
|
|
assert(*compression_name.rbegin() == ';');
|
|
size_t pos = compression_name.size();
|
|
// Make space for the field contents
|
|
compression_name.append(ctype_count * 2, '\0');
|
|
char* ptr = compression_name.data() + pos;
|
|
// Populate the field contents
|
|
for (CompressionType t : compression_types_used) {
|
|
PutBaseChars<16>(&ptr, /*digits=*/2, static_cast<unsigned char>(t),
|
|
/*uppercase=*/true);
|
|
}
|
|
assert(ptr == compression_name.data() + pos + ctype_count * 2);
|
|
// Allow additional fields in the future
|
|
compression_name.push_back(';');
|
|
} else {
|
|
// Use legacy compression naming. To adhere to requirements described in
|
|
// TableProperties::compression_name, we might have to replace the name
|
|
// based on the legacy configured compression type.
|
|
assert(compression_name.empty());
|
|
if (ctype_count == 0) {
|
|
// We could get a slight performance boost in the reader by marking the
|
|
// file as "no compression" if compression is configured but
|
|
// consistently rejected, but that would give misleading info for
|
|
// debugging purposes. So instead we record the configured compression
|
|
// type, matching the historical behavior.
|
|
if (data_block_compressor) {
|
|
compression_name = CompressionTypeToString(
|
|
data_block_compressor->GetPreferredCompressionType());
|
|
} else {
|
|
assert(basic_compressor == nullptr);
|
|
compression_name = CompressionTypeToString(kNoCompression);
|
|
}
|
|
} else if (compression_types_used.Contains(kZSTD)) {
|
|
compression_name = CompressionTypeToString(kZSTD);
|
|
} else {
|
|
compression_name =
|
|
CompressionTypeToString(*compression_types_used.begin());
|
|
}
|
|
}
|
|
}
|
|
|
|
private:
|
|
// Synchronize status & io_status accesses across threads from main thread,
|
|
// compression thread and write thread in parallel compression.
|
|
std::mutex status_mutex;
|
|
std::atomic<bool> status_ok;
|
|
Status status;
|
|
std::mutex io_status_mutex;
|
|
std::atomic<bool> io_status_ok;
|
|
IOStatus io_status;
|
|
};
|
|
|
|
BlockBasedTableBuilder::BlockBasedTableBuilder(
|
|
const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
|
|
WritableFileWriter* file) {
|
|
BlockBasedTableOptions sanitized_table_options(table_options);
|
|
auto ucmp = tbo.internal_comparator.user_comparator();
|
|
assert(ucmp);
|
|
(void)ucmp; // avoids unused variable error.
|
|
rep_ = new Rep(sanitized_table_options, tbo, file);
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
|
|
const_cast<TableProperties*>(&rep_->props));
|
|
|
|
BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
|
|
tbo.cur_file_num, &rep_->base_cache_key);
|
|
|
|
if (rep_->IsParallelCompressionEnabled()) {
|
|
StartParallelCompression();
|
|
} else if (rep_->basic_compressor) {
|
|
rep_->single_threaded_compressed_output.reserve(table_options.block_size);
|
|
}
|
|
}
|
|
|
|
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
|
|
// Catch errors where caller forgot to call Finish()
|
|
assert(rep_->state == Rep::State::kClosed);
|
|
delete rep_;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Add(const Slice& ikey, const Slice& value) {
|
|
Rep* r = rep_;
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
ValueType value_type;
|
|
SequenceNumber seq;
|
|
UnPackSequenceAndType(ExtractInternalKeyFooter(ikey), &seq, &value_type);
|
|
r->props.key_largest_seqno = std::max(r->props.key_largest_seqno, seq);
|
|
if (IsValueType(value_type)) {
|
|
#ifndef NDEBUG
|
|
if (r->props.num_entries > r->props.num_range_deletions) {
|
|
assert(r->internal_comparator.Compare(ikey, Slice(r->last_ikey)) > 0);
|
|
}
|
|
bool skip = false;
|
|
TEST_SYNC_POINT_CALLBACK("BlockBasedTableBuilder::Add::skip", (void*)&skip);
|
|
if (skip) {
|
|
return;
|
|
}
|
|
#endif // !NDEBUG
|
|
|
|
auto should_flush = r->flush_block_policy->Update(ikey, value);
|
|
if (should_flush) {
|
|
assert(!r->data_block.empty());
|
|
r->first_key_in_next_block = &ikey;
|
|
Flush();
|
|
if (r->state == Rep::State::kBuffered) {
|
|
bool exceeds_buffer_limit =
|
|
(r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
|
|
bool exceeds_global_block_cache_limit = false;
|
|
|
|
// Increase cache charging for the last buffered data block
|
|
// only if the block is not going to be unbuffered immediately
|
|
// and there exists a cache reservation manager
|
|
if (!exceeds_buffer_limit &&
|
|
r->compression_dict_buffer_cache_res_mgr != nullptr) {
|
|
Status s =
|
|
r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
|
|
r->data_begin_offset);
|
|
exceeds_global_block_cache_limit = s.IsMemoryLimit();
|
|
}
|
|
|
|
if (exceeds_buffer_limit || exceeds_global_block_cache_limit) {
|
|
EnterUnbuffered();
|
|
}
|
|
}
|
|
|
|
// Add item to index block.
|
|
// We do not emit the index entry for a block until we have seen the
|
|
// first key for the next data block. This allows us to use shorter
|
|
// keys in the index block. For example, consider a block boundary
|
|
// between the keys "the quick brown fox" and "the who". We can use
|
|
// "the r" as the key for the index block entry since it is >= all
|
|
// entries in the first block and < all entries in subsequent
|
|
// blocks.
|
|
if (ok() && r->state == Rep::State::kUnbuffered) {
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
r->pc_rep->curr_block_keys.Clear();
|
|
} else {
|
|
r->index_builder->AddIndexEntry(r->last_ikey, &ikey,
|
|
r->pending_handle,
|
|
&r->index_separator_scratch);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Note: PartitionedFilterBlockBuilder requires key being added to filter
|
|
// builder after being added to index builder.
|
|
if (r->state == Rep::State::kUnbuffered) {
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
r->pc_rep->curr_block_keys.PushBack(ikey);
|
|
} else {
|
|
if (r->filter_builder != nullptr) {
|
|
r->filter_builder->AddWithPrevKey(
|
|
ExtractUserKeyAndStripTimestamp(ikey, r->ts_sz),
|
|
r->last_ikey.empty()
|
|
? Slice{}
|
|
: ExtractUserKeyAndStripTimestamp(r->last_ikey, r->ts_sz));
|
|
}
|
|
}
|
|
}
|
|
|
|
r->data_block.AddWithLastKey(ikey, value, r->last_ikey);
|
|
r->last_ikey.assign(ikey.data(), ikey.size());
|
|
assert(!r->last_ikey.empty());
|
|
if (r->state == Rep::State::kBuffered) {
|
|
// Buffered keys will be replayed from data_block_buffers during
|
|
// `Finish()` once compression dictionary has been finalized.
|
|
} else {
|
|
if (!r->IsParallelCompressionEnabled()) {
|
|
r->index_builder->OnKeyAdded(ikey);
|
|
}
|
|
}
|
|
// TODO offset passed in is not accurate for parallel compression case
|
|
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
|
|
r->table_properties_collectors,
|
|
r->ioptions.logger);
|
|
|
|
} else if (value_type == kTypeRangeDeletion) {
|
|
Slice persisted_end = value;
|
|
// When timestamps should not be persisted, we physically strip away range
|
|
// tombstone end key's user timestamp before passing it along to block
|
|
// builder. Physically stripping away start key's user timestamp is
|
|
// handled at the block builder level in the same way as the other data
|
|
// blocks.
|
|
if (r->ts_sz > 0 && !r->persist_user_defined_timestamps) {
|
|
persisted_end = StripTimestampFromUserKey(value, r->ts_sz);
|
|
}
|
|
r->range_del_block.Add(ikey, persisted_end);
|
|
// TODO offset passed in is not accurate for parallel compression case
|
|
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
|
|
r->table_properties_collectors,
|
|
r->ioptions.logger);
|
|
} else {
|
|
assert(false);
|
|
r->SetStatus(Status::InvalidArgument(
|
|
"BlockBasedBuilder::Add() received a key with invalid value type " +
|
|
std::to_string(static_cast<unsigned int>(value_type))));
|
|
return;
|
|
}
|
|
|
|
r->props.num_entries++;
|
|
r->props.raw_key_size += ikey.size();
|
|
if (!r->persist_user_defined_timestamps) {
|
|
r->props.raw_key_size -= r->ts_sz;
|
|
}
|
|
r->props.raw_value_size += value.size();
|
|
if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
|
|
value_type == kTypeDeletionWithTimestamp) {
|
|
r->props.num_deletions++;
|
|
} else if (value_type == kTypeRangeDeletion) {
|
|
r->props.num_deletions++;
|
|
r->props.num_range_deletions++;
|
|
} else if (value_type == kTypeMerge) {
|
|
r->props.num_merge_operands++;
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Flush() {
|
|
Rep* r = rep_;
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
if (r->data_block.empty()) {
|
|
return;
|
|
}
|
|
|
|
Slice uncompressed_block_data = r->data_block.Finish();
|
|
|
|
// NOTE: compression sampling is done here in the same thread as building
|
|
// the uncompressed block because of the requirements to call table
|
|
// property collectors:
|
|
// * BlockAdd function expects block_compressed_bytes_{fast,slow} for
|
|
// historical reasons. Probably a hassle to remove.
|
|
// * Collector is not thread safe so calls need to be serialized/synchronized.
|
|
// * Ideally, AddUserKey and BlockAdd calls need to line up such that a
|
|
// reported block corresponds to all the keys reported since the previous
|
|
// block.
|
|
|
|
// If requested, we sample one in every N block with a
|
|
// fast and slow compression algorithm and report the stats.
|
|
// The users can use these stats to decide if it is worthwhile
|
|
// enabling compression and they also get a hint about which
|
|
// compression algorithm wil be beneficial.
|
|
if (r->sample_for_compression > 0 &&
|
|
Random::GetTLSInstance()->OneIn(
|
|
static_cast<int>(r->sample_for_compression))) {
|
|
std::string sampled_output_fast;
|
|
std::string sampled_output_slow;
|
|
|
|
// Sampling with a fast compression algorithm
|
|
if (LZ4_Supported() || Snappy_Supported()) {
|
|
CompressionType c =
|
|
LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
|
|
CompressionOptions options;
|
|
CompressionContext context(c, options);
|
|
CompressionInfo info_tmp(options, context,
|
|
CompressionDict::GetEmptyDict(), c);
|
|
|
|
OLD_CompressData(
|
|
uncompressed_block_data, info_tmp,
|
|
GetCompressFormatForVersion(r->table_options.format_version),
|
|
&sampled_output_fast);
|
|
}
|
|
|
|
// Sampling with a slow but high-compression algorithm
|
|
if (ZSTD_Supported() || Zlib_Supported()) {
|
|
CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
|
|
CompressionOptions options;
|
|
CompressionContext context(c, options);
|
|
CompressionInfo info_tmp(options, context,
|
|
CompressionDict::GetEmptyDict(), c);
|
|
|
|
OLD_CompressData(
|
|
uncompressed_block_data, info_tmp,
|
|
GetCompressFormatForVersion(r->table_options.format_version),
|
|
&sampled_output_slow);
|
|
}
|
|
|
|
if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
|
|
// Currently compression sampling is only enabled for data block.
|
|
r->sampled_input_data_bytes.fetch_add(uncompressed_block_data.size(),
|
|
std::memory_order_relaxed);
|
|
r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
|
|
std::memory_order_relaxed);
|
|
r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
|
|
std::memory_order_relaxed);
|
|
}
|
|
|
|
NotifyCollectTableCollectorsOnBlockAdd(
|
|
r->table_properties_collectors, uncompressed_block_data.size(),
|
|
sampled_output_slow.size(), sampled_output_fast.size());
|
|
} else {
|
|
NotifyCollectTableCollectorsOnBlockAdd(
|
|
r->table_properties_collectors, uncompressed_block_data.size(),
|
|
0 /*block_compressed_bytes_slow*/, 0 /*block_compressed_bytes_fast*/);
|
|
}
|
|
|
|
if (rep_->state == Rep::State::kBuffered) {
|
|
std::string uncompressed_block_holder;
|
|
uncompressed_block_holder.reserve(rep_->table_options.block_size);
|
|
r->data_block.SwapAndReset(uncompressed_block_holder);
|
|
assert(uncompressed_block_data.size() == uncompressed_block_holder.size());
|
|
rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_holder));
|
|
rep_->data_begin_offset += uncompressed_block_data.size();
|
|
} else if (r->IsParallelCompressionEnabled()) {
|
|
assert(rep_->state == Rep::State::kUnbuffered);
|
|
ParallelCompressionRep::BlockRep* block_rep =
|
|
r->pc_rep->PrepareBlock(r->first_key_in_next_block, &(r->data_block));
|
|
assert(block_rep != nullptr);
|
|
r->pc_rep->file_size_estimator.EmitBlock(block_rep->uncompressed.size(),
|
|
r->get_offset());
|
|
r->pc_rep->EmitBlock(block_rep);
|
|
} else {
|
|
assert(rep_->state == Rep::State::kUnbuffered);
|
|
WriteBlock(uncompressed_block_data, &r->pending_handle, BlockType::kData);
|
|
r->data_block.Reset();
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
|
|
BlockHandle* handle,
|
|
BlockType block_type) {
|
|
Rep* r = rep_;
|
|
assert(r->state == Rep::State::kUnbuffered);
|
|
CompressionType type;
|
|
Status compress_status;
|
|
bool is_data_block = block_type == BlockType::kData;
|
|
CompressAndVerifyBlock(
|
|
uncompressed_block_data, is_data_block,
|
|
is_data_block ? r->data_block_working_areas[0] : r->basic_working_area,
|
|
&r->single_threaded_compressed_output, &type, &compress_status);
|
|
r->SetStatus(compress_status);
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
|
|
&r->single_threaded_compressed_output);
|
|
WriteMaybeCompressedBlock(type == kNoCompression
|
|
? uncompressed_block_data
|
|
: Slice(r->single_threaded_compressed_output),
|
|
type, handle, block_type, &uncompressed_block_data);
|
|
r->single_threaded_compressed_output.clear();
|
|
if (is_data_block) {
|
|
r->props.data_size = r->get_offset();
|
|
++r->props.num_data_blocks;
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::BGWorkCompression(WorkingAreaPair& working_area) {
|
|
ParallelCompressionRep::BlockRep* block_rep = nullptr;
|
|
while (rep_->pc_rep->compress_queue.pop(block_rep)) {
|
|
assert(block_rep != nullptr);
|
|
// Skip compression if we are aborting anyway
|
|
if (ok()) {
|
|
CompressAndVerifyBlock(block_rep->uncompressed, true, /* is_data_block*/
|
|
working_area, &block_rep->compressed,
|
|
&block_rep->compression_type, &block_rep->status);
|
|
}
|
|
block_rep->slot.Fill(block_rep);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::CompressAndVerifyBlock(
|
|
const Slice& uncompressed_block_data, bool is_data_block,
|
|
WorkingAreaPair& working_area, std::string* compressed_output,
|
|
CompressionType* result_compression_type, Status* out_status) {
|
|
Rep* r = rep_;
|
|
|
|
Compressor* compressor = nullptr;
|
|
Decompressor* verify_decomp = nullptr;
|
|
if (is_data_block) {
|
|
compressor = r->data_block_compressor;
|
|
verify_decomp = r->data_block_verify_decompressor.get();
|
|
} else {
|
|
compressor = r->basic_compressor.get();
|
|
verify_decomp = r->verify_decompressor.get();
|
|
}
|
|
|
|
CompressionType type = kNoCompression;
|
|
if (LIKELY(uncompressed_block_data.size() < kCompressionSizeLimit)) {
|
|
if (compressor) {
|
|
StopWatchNano timer(
|
|
r->ioptions.clock,
|
|
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
|
|
|
|
*out_status =
|
|
compressor->CompressBlock(uncompressed_block_data, compressed_output,
|
|
&type, &working_area.compress);
|
|
|
|
// Post-condition of Compressor::CompressBlock
|
|
assert(type == kNoCompression || out_status->ok());
|
|
assert(type == kNoCompression ||
|
|
r->table_options.verify_compression == (verify_decomp != nullptr));
|
|
|
|
// Check for acceptable compression ratio. (For efficiency, avoid floating
|
|
// point and division.)
|
|
// TODO: integrate into Compressor?
|
|
if (compressed_output->size() >
|
|
(static_cast<uint64_t>(r->max_compressed_bytes_per_kb) *
|
|
uncompressed_block_data.size()) >>
|
|
10) {
|
|
// Prefer to keep uncompressed
|
|
type = kNoCompression;
|
|
}
|
|
|
|
// Some of the compression algorithms are known to be unreliable. If
|
|
// the verify_compression flag is set then try to de-compress the
|
|
// compressed data and compare to the input.
|
|
if (verify_decomp && type != kNoCompression) {
|
|
BlockContents contents;
|
|
Status uncompress_status = DecompressBlockData(
|
|
compressed_output->data(), compressed_output->size(), type,
|
|
*verify_decomp, &contents, r->ioptions,
|
|
/*allocator=*/nullptr, &working_area.verify);
|
|
|
|
if (uncompress_status.ok()) {
|
|
bool data_match = contents.data.compare(uncompressed_block_data) == 0;
|
|
if (!data_match) {
|
|
// The result of the compression was invalid. abort.
|
|
const char* const msg =
|
|
"Decompressed block did not match pre-compression block";
|
|
ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
|
|
*out_status = Status::Corruption(msg);
|
|
type = kNoCompression;
|
|
}
|
|
} else {
|
|
// Decompression reported an error. abort.
|
|
*out_status =
|
|
Status::Corruption(std::string("Could not decompress: ") +
|
|
uncompress_status.getState());
|
|
type = kNoCompression;
|
|
}
|
|
}
|
|
if (timer.IsStarted()) {
|
|
RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
|
|
timer.ElapsedNanos());
|
|
}
|
|
}
|
|
if (is_data_block) {
|
|
r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(),
|
|
std::memory_order_relaxed);
|
|
r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
}
|
|
} else {
|
|
// Status is not OK, or block is too big to be compressed.
|
|
if (is_data_block) {
|
|
r->uncompressible_input_data_bytes.fetch_add(
|
|
uncompressed_block_data.size() + kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
// Abort compression if the block is too big, or did not pass
|
|
// verification.
|
|
if (type == kNoCompression) {
|
|
bool compression_attempted = !compressed_output->empty();
|
|
RecordTick(r->ioptions.stats, compression_attempted
|
|
? NUMBER_BLOCK_COMPRESSION_REJECTED
|
|
: NUMBER_BLOCK_COMPRESSION_BYPASSED);
|
|
RecordTick(r->ioptions.stats,
|
|
compression_attempted ? BYTES_COMPRESSION_REJECTED
|
|
: BYTES_COMPRESSION_BYPASSED,
|
|
uncompressed_block_data.size());
|
|
} else {
|
|
RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
|
|
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_FROM,
|
|
uncompressed_block_data.size());
|
|
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_TO,
|
|
compressed_output->size());
|
|
}
|
|
*result_compression_type = type;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
|
|
const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
|
|
BlockType block_type, const Slice* uncompressed_block_data) {
|
|
// File format contains a sequence of blocks where each block has:
|
|
// block_data: uint8[n]
|
|
// compression_type: uint8
|
|
// checksum: uint32
|
|
Rep* r = rep_;
|
|
bool is_data_block = block_type == BlockType::kData;
|
|
IOOptions io_options;
|
|
IOStatus io_s =
|
|
WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
|
|
if (!io_s.ok()) {
|
|
r->SetIOStatus(io_s);
|
|
return;
|
|
}
|
|
// Old, misleading name of this function: WriteRawBlock
|
|
StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
|
|
const uint64_t offset = r->get_offset();
|
|
handle->set_offset(offset);
|
|
handle->set_size(block_contents.size());
|
|
assert(status().ok());
|
|
assert(io_status().ok());
|
|
if (uncompressed_block_data == nullptr) {
|
|
uncompressed_block_data = &block_contents;
|
|
assert(comp_type == kNoCompression);
|
|
}
|
|
|
|
// TODO: consider a variant of this function that puts the trailer after
|
|
// block_contents (if it comes from a std::string) so we only need one
|
|
// r->file->Append call
|
|
{
|
|
io_s = r->file->Append(io_options, block_contents);
|
|
if (!io_s.ok()) {
|
|
r->SetIOStatus(io_s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
r->compression_types_used.Add(comp_type);
|
|
std::array<char, kBlockTrailerSize> trailer;
|
|
trailer[0] = comp_type;
|
|
uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
|
|
r->table_options.checksum, block_contents.data(), block_contents.size(),
|
|
/*last_byte*/ comp_type);
|
|
checksum += ChecksumModifierForContext(r->base_context_checksum, offset);
|
|
|
|
if (block_type == BlockType::kFilter) {
|
|
Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);
|
|
if (!s.ok()) {
|
|
r->SetStatus(s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
EncodeFixed32(trailer.data() + 1, checksum);
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
|
|
trailer.data());
|
|
{
|
|
io_s = r->file->Append(io_options, Slice(trailer.data(), trailer.size()));
|
|
if (!io_s.ok()) {
|
|
r->SetIOStatus(io_s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (r->warm_cache) {
|
|
Status s =
|
|
InsertBlockInCacheHelper(*uncompressed_block_data, handle, block_type);
|
|
if (!s.ok()) {
|
|
r->SetStatus(s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
r->pre_compression_size +=
|
|
uncompressed_block_data->size() + kBlockTrailerSize;
|
|
r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
|
|
if (r->table_options.block_align && is_data_block) {
|
|
size_t pad_bytes =
|
|
(r->alignment -
|
|
((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
|
|
(r->alignment - 1);
|
|
|
|
io_s = r->file->Pad(io_options, pad_bytes);
|
|
if (io_s.ok()) {
|
|
r->pre_compression_size += pad_bytes;
|
|
r->set_offset(r->get_offset() + pad_bytes);
|
|
} else {
|
|
r->SetIOStatus(io_s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
if (is_data_block) {
|
|
r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
|
|
r->get_offset());
|
|
} else {
|
|
r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
|
|
}
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
|
|
Rep* r = rep_;
|
|
ParallelCompressionRep::BlockRepSlot* slot = nullptr;
|
|
ParallelCompressionRep::BlockRep* block_rep = nullptr;
|
|
// Starts empty; see FilterBlockBuilder::AddWithPrevKey
|
|
std::string prev_block_last_key_no_ts;
|
|
while (r->pc_rep->write_queue.pop(slot)) {
|
|
// FIXME: this is weird popping off write queue just to wait again on
|
|
// compress queue
|
|
assert(slot != nullptr);
|
|
slot->Take(block_rep);
|
|
assert(block_rep != nullptr);
|
|
if (!block_rep->status.ok()) {
|
|
r->SetStatus(block_rep->status);
|
|
// Reap block so that blocked Flush() can finish
|
|
// if there is one, and Flush() will notice !ok() next time.
|
|
block_rep->status = Status::OK();
|
|
r->pc_rep->ReapBlock(block_rep);
|
|
continue;
|
|
}
|
|
|
|
Slice prev_key_no_ts = prev_block_last_key_no_ts;
|
|
for (size_t i = 0; i < block_rep->keys.Size(); i++) {
|
|
auto& key = block_rep->keys[i];
|
|
if (r->filter_builder != nullptr) {
|
|
Slice key_no_ts = ExtractUserKeyAndStripTimestamp(key, r->ts_sz);
|
|
r->filter_builder->AddWithPrevKey(key_no_ts, prev_key_no_ts);
|
|
prev_key_no_ts = key_no_ts;
|
|
}
|
|
r->index_builder->OnKeyAdded(key);
|
|
}
|
|
if (r->filter_builder != nullptr) {
|
|
prev_block_last_key_no_ts.assign(prev_key_no_ts.data(),
|
|
prev_key_no_ts.size());
|
|
}
|
|
|
|
r->pc_rep->file_size_estimator.SetCurrBlockUncompSize(
|
|
block_rep->uncompressed.size());
|
|
Slice compressed = block_rep->compressed;
|
|
Slice uncompressed = block_rep->uncompressed;
|
|
WriteMaybeCompressedBlock(block_rep->compression_type == kNoCompression
|
|
? uncompressed
|
|
: compressed,
|
|
block_rep->compression_type, &r->pending_handle,
|
|
BlockType::kData, &uncompressed);
|
|
if (!ok()) {
|
|
break;
|
|
}
|
|
|
|
r->props.data_size = r->get_offset();
|
|
++r->props.num_data_blocks;
|
|
|
|
if (!block_rep->first_key_in_next_block.has_value()) {
|
|
r->index_builder->AddIndexEntry(block_rep->keys.Back(), nullptr,
|
|
r->pending_handle,
|
|
&r->index_separator_scratch);
|
|
} else {
|
|
Slice first_key_in_next_block =
|
|
Slice(*block_rep->first_key_in_next_block);
|
|
r->index_builder->AddIndexEntry(
|
|
block_rep->keys.Back(), &first_key_in_next_block, r->pending_handle,
|
|
&r->index_separator_scratch);
|
|
}
|
|
|
|
r->pc_rep->ReapBlock(block_rep);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::StartParallelCompression() {
|
|
rep_->pc_rep.reset(
|
|
new ParallelCompressionRep(rep_->compression_parallel_threads));
|
|
rep_->pc_rep->compress_thread_pool.reserve(
|
|
rep_->compression_parallel_threads);
|
|
for (uint32_t i = 0; i < rep_->compression_parallel_threads; i++) {
|
|
rep_->pc_rep->compress_thread_pool.emplace_back(
|
|
[this, i] { BGWorkCompression(rep_->data_block_working_areas[i]); });
|
|
}
|
|
rep_->pc_rep->write_thread.reset(
|
|
new port::Thread([this] { BGWorkWriteMaybeCompressedBlock(); }));
|
|
}
|
|
|
|
void BlockBasedTableBuilder::StopParallelCompression() {
|
|
rep_->pc_rep->compress_queue.finish();
|
|
for (auto& thread : rep_->pc_rep->compress_thread_pool) {
|
|
thread.join();
|
|
}
|
|
rep_->pc_rep->write_queue.finish();
|
|
rep_->pc_rep->write_thread->join();
|
|
}
|
|
|
|
Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
|
|
|
|
IOStatus BlockBasedTableBuilder::io_status() const {
|
|
return rep_->GetIOStatus();
|
|
}
|
|
|
|
Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
|
|
const Slice& block_contents, const BlockHandle* handle,
|
|
BlockType block_type) {
|
|
Cache* block_cache = rep_->table_options.block_cache.get();
|
|
Status s;
|
|
auto helper =
|
|
GetCacheItemHelper(block_type, rep_->ioptions.lowest_used_cache_tier);
|
|
if (block_cache && helper && helper->create_cb) {
|
|
CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
|
|
size_t charge;
|
|
// NOTE: data blocks (and everything else) will be warmed in decompressed
|
|
// state, so does not need a dictionary-aware decompressor. The only thing
|
|
// needing a decompressor here (in create_context) is warming the
|
|
// (de)compression dictionary, which will clone and save a dict-based
|
|
// decompressor from the corresponding non-dict decompressor.
|
|
s = WarmInCache(block_cache, key.AsSlice(), block_contents,
|
|
&rep_->create_context, helper, Cache::Priority::LOW,
|
|
&charge);
|
|
if (s.ok()) {
|
|
BlockBasedTable::UpdateCacheInsertionMetrics(
|
|
block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
|
|
rep_->ioptions.stats);
|
|
} else {
|
|
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
|
|
}
|
|
}
|
|
return s;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteFilterBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
|
|
// No filter block needed
|
|
return;
|
|
}
|
|
if (!rep_->last_ikey.empty()) {
|
|
// We might have been using AddWithPrevKey, so need PrevKeyBeforeFinish
|
|
// to be safe. And because we are re-synchronized after buffered/parallel
|
|
// operation, rep_->last_ikey is accurate.
|
|
rep_->filter_builder->PrevKeyBeforeFinish(
|
|
ExtractUserKeyAndStripTimestamp(rep_->last_ikey, rep_->ts_sz));
|
|
}
|
|
BlockHandle filter_block_handle;
|
|
bool is_partitioned_filter = rep_->table_options.partition_filters;
|
|
if (ok()) {
|
|
rep_->props.num_filter_entries +=
|
|
rep_->filter_builder->EstimateEntriesAdded();
|
|
Status s = Status::Incomplete();
|
|
while (ok() && s.IsIncomplete()) {
|
|
// filter_data is used to store the transferred filter data payload from
|
|
// FilterBlockBuilder and deallocate the payload by going out of scope.
|
|
// Otherwise, the payload will unnecessarily remain until
|
|
// BlockBasedTableBuilder is deallocated.
|
|
//
|
|
// See FilterBlockBuilder::Finish() for more on the difference in
|
|
// transferred filter data payload among different FilterBlockBuilder
|
|
// subtypes.
|
|
std::unique_ptr<const char[]> filter_owner;
|
|
Slice filter_content;
|
|
s = rep_->filter_builder->Finish(filter_block_handle, &filter_content,
|
|
&filter_owner);
|
|
|
|
assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
|
|
if (s.IsCorruption()) {
|
|
rep_->SetStatus(s);
|
|
break;
|
|
}
|
|
|
|
rep_->props.filter_size += filter_content.size();
|
|
|
|
BlockType btype = is_partitioned_filter && /* last */ s.ok()
|
|
? BlockType::kFilterPartitionIndex
|
|
: BlockType::kFilter;
|
|
WriteMaybeCompressedBlock(filter_content, kNoCompression,
|
|
&filter_block_handle, btype);
|
|
}
|
|
rep_->filter_builder->ResetFilterBitsBuilder();
|
|
}
|
|
if (ok()) {
|
|
// Add mapping from "<filter_block_prefix>.Name" to location
|
|
// of filter data.
|
|
std::string key;
|
|
key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
|
|
: BlockBasedTable::kFullFilterBlockPrefix;
|
|
key.append(rep_->table_options.filter_policy->CompatibilityName());
|
|
meta_index_builder->Add(key, filter_block_handle);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteIndexBlock(
|
|
MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
IndexBuilder::IndexBlocks index_blocks;
|
|
auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
|
|
if (index_builder_status.IsIncomplete()) {
|
|
// We we have more than one index partition then meta_blocks are not
|
|
// supported for the index. Currently meta_blocks are used only by
|
|
// HashIndexBuilder which is not multi-partition.
|
|
assert(index_blocks.meta_blocks.empty());
|
|
} else if (ok() && !index_builder_status.ok()) {
|
|
rep_->SetStatus(index_builder_status);
|
|
}
|
|
if (ok()) {
|
|
for (const auto& item : index_blocks.meta_blocks) {
|
|
BlockHandle block_handle;
|
|
WriteBlock(item.second, &block_handle, BlockType::kIndex);
|
|
if (!ok()) {
|
|
break;
|
|
}
|
|
meta_index_builder->Add(item.first, block_handle);
|
|
}
|
|
}
|
|
if (ok()) {
|
|
if (rep_->table_options.enable_index_compression) {
|
|
WriteBlock(index_blocks.index_block_contents, index_block_handle,
|
|
BlockType::kIndex);
|
|
} else {
|
|
WriteMaybeCompressedBlock(index_blocks.index_block_contents,
|
|
kNoCompression, index_block_handle,
|
|
BlockType::kIndex);
|
|
}
|
|
}
|
|
// If there are more index partitions, finish them and write them out
|
|
if (index_builder_status.IsIncomplete()) {
|
|
bool index_building_finished = false;
|
|
while (ok() && !index_building_finished) {
|
|
Status s =
|
|
rep_->index_builder->Finish(&index_blocks, *index_block_handle);
|
|
if (s.ok()) {
|
|
index_building_finished = true;
|
|
} else if (s.IsIncomplete()) {
|
|
// More partitioned index after this one
|
|
assert(!index_building_finished);
|
|
} else {
|
|
// Error
|
|
rep_->SetStatus(s);
|
|
return;
|
|
}
|
|
|
|
if (rep_->table_options.enable_index_compression) {
|
|
WriteBlock(index_blocks.index_block_contents, index_block_handle,
|
|
BlockType::kIndex);
|
|
} else {
|
|
WriteMaybeCompressedBlock(index_blocks.index_block_contents,
|
|
kNoCompression, index_block_handle,
|
|
BlockType::kIndex);
|
|
}
|
|
// The last index_block_handle will be for the partition index block
|
|
}
|
|
}
|
|
// If success and need to record in metaindex rather than footer...
|
|
if (!FormatVersionUsesIndexHandleInFooter(
|
|
rep_->table_options.format_version)) {
|
|
meta_index_builder->Add(kIndexBlockName, *index_block_handle);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WritePropertiesBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
BlockHandle properties_block_handle;
|
|
if (ok()) {
|
|
PropertyBlockBuilder property_block_builder;
|
|
rep_->props.filter_policy_name =
|
|
rep_->table_options.filter_policy != nullptr
|
|
? rep_->table_options.filter_policy->Name()
|
|
: "";
|
|
rep_->props.index_size =
|
|
rep_->index_builder->IndexSize() + kBlockTrailerSize;
|
|
rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
|
|
? rep_->ioptions.user_comparator->Name()
|
|
: "nullptr";
|
|
rep_->props.merge_operator_name =
|
|
rep_->ioptions.merge_operator != nullptr
|
|
? rep_->ioptions.merge_operator->Name()
|
|
: "nullptr";
|
|
rep_->props.prefix_extractor_name =
|
|
rep_->prefix_extractor ? rep_->prefix_extractor->AsString() : "nullptr";
|
|
std::string property_collectors_names = "[";
|
|
for (size_t i = 0;
|
|
i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
|
|
if (i != 0) {
|
|
property_collectors_names += ",";
|
|
}
|
|
property_collectors_names +=
|
|
rep_->ioptions.table_properties_collector_factories[i]->Name();
|
|
}
|
|
property_collectors_names += "]";
|
|
rep_->props.property_collectors_names = property_collectors_names;
|
|
|
|
rep_->PostPopulateCompressionProperties();
|
|
|
|
if (rep_->table_options.index_type ==
|
|
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
|
assert(rep_->p_index_builder_ != nullptr);
|
|
rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
|
|
rep_->props.top_level_index_size =
|
|
rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
|
|
}
|
|
rep_->props.index_key_is_user_key =
|
|
!rep_->index_builder->seperator_is_key_plus_seq();
|
|
rep_->props.index_value_is_delta_encoded =
|
|
rep_->use_delta_encoding_for_index_values;
|
|
if (rep_->sampled_input_data_bytes > 0) {
|
|
rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
|
|
static_cast<double>(rep_->sampled_output_slow_data_bytes) /
|
|
rep_->sampled_input_data_bytes *
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes + 0.5);
|
|
rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
|
|
static_cast<double>(rep_->sampled_output_fast_data_bytes) /
|
|
rep_->sampled_input_data_bytes *
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes + 0.5);
|
|
} else if (rep_->sample_for_compression > 0) {
|
|
// We tried to sample but none were found. Assume worst-case (compression
|
|
// ratio 1.0) so data is complete and aggregatable.
|
|
rep_->props.slow_compression_estimated_data_size =
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes;
|
|
rep_->props.fast_compression_estimated_data_size =
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes;
|
|
}
|
|
rep_->props.user_defined_timestamps_persisted =
|
|
rep_->persist_user_defined_timestamps;
|
|
|
|
assert(IsEmpty() || rep_->props.key_largest_seqno != UINT64_MAX);
|
|
// Add basic properties
|
|
property_block_builder.AddTableProperty(rep_->props);
|
|
|
|
// Add use collected properties
|
|
NotifyCollectTableCollectorsOnFinish(
|
|
rep_->table_properties_collectors, rep_->ioptions.logger,
|
|
&property_block_builder, rep_->props.user_collected_properties,
|
|
rep_->props.readable_properties);
|
|
|
|
Slice block_data = property_block_builder.Finish();
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
|
|
WriteMaybeCompressedBlock(block_data, kNoCompression,
|
|
&properties_block_handle, BlockType::kProperties);
|
|
}
|
|
if (ok()) {
|
|
#ifndef NDEBUG
|
|
{
|
|
uint64_t props_block_offset = properties_block_handle.offset();
|
|
uint64_t props_block_size = properties_block_handle.size();
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
|
|
&props_block_offset);
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
|
|
&props_block_size);
|
|
}
|
|
#endif // !NDEBUG
|
|
|
|
const std::string* properties_block_meta = &kPropertiesBlockName;
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:Meta",
|
|
&properties_block_meta);
|
|
meta_index_builder->Add(*properties_block_meta, properties_block_handle);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteCompressionDictBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
Slice compression_dict;
|
|
if (rep_->compressor_with_dict) {
|
|
compression_dict = rep_->compressor_with_dict->GetSerializedDict();
|
|
}
|
|
if (!compression_dict.empty()) {
|
|
BlockHandle compression_dict_block_handle;
|
|
if (ok()) {
|
|
WriteMaybeCompressedBlock(compression_dict, kNoCompression,
|
|
&compression_dict_block_handle,
|
|
BlockType::kCompressionDictionary);
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
|
|
&compression_dict);
|
|
}
|
|
if (ok()) {
|
|
meta_index_builder->Add(kCompressionDictBlockName,
|
|
compression_dict_block_handle);
|
|
}
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteRangeDelBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
if (ok() && !rep_->range_del_block.empty()) {
|
|
BlockHandle range_del_block_handle;
|
|
WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
|
|
&range_del_block_handle,
|
|
BlockType::kRangeDeletion);
|
|
meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
|
|
BlockHandle& index_block_handle) {
|
|
assert(ok());
|
|
Rep* r = rep_;
|
|
// this is guaranteed by BlockBasedTableBuilder's constructor
|
|
assert(r->table_options.checksum == kCRC32c ||
|
|
r->table_options.format_version != 0);
|
|
FooterBuilder footer;
|
|
Status s = footer.Build(kBlockBasedTableMagicNumber,
|
|
r->table_options.format_version, r->get_offset(),
|
|
r->table_options.checksum, metaindex_block_handle,
|
|
index_block_handle, r->base_context_checksum);
|
|
if (!s.ok()) {
|
|
r->SetStatus(s);
|
|
return;
|
|
}
|
|
IOOptions io_options;
|
|
IOStatus ios =
|
|
WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
|
|
if (!ios.ok()) {
|
|
r->SetIOStatus(ios);
|
|
return;
|
|
}
|
|
ios = r->file->Append(io_options, footer.GetSlice());
|
|
if (ios.ok()) {
|
|
r->pre_compression_size += footer.GetSlice().size();
|
|
r->set_offset(r->get_offset() + footer.GetSlice().size());
|
|
} else {
|
|
r->SetIOStatus(ios);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::EnterUnbuffered() {
|
|
Rep* r = rep_;
|
|
assert(r->state == Rep::State::kBuffered);
|
|
r->state = Rep::State::kUnbuffered;
|
|
const size_t kNumBlocksBuffered = r->data_block_buffers.size();
|
|
if (kNumBlocksBuffered == 0) {
|
|
// The below code is neither safe nor necessary for handling zero data
|
|
// blocks.
|
|
// For PostPopulateCompressionProperties()
|
|
r->data_block_compressor = r->basic_compressor.get();
|
|
return;
|
|
}
|
|
|
|
// Abstract algebra teaches us that a finite cyclic group (such as the
|
|
// additive group of integers modulo N) can be generated by a number that is
|
|
// coprime with N. Since N is variable (number of buffered data blocks), we
|
|
// must then pick a prime number in order to guarantee coprimeness with any N.
|
|
//
|
|
// One downside of this approach is the spread will be poor when
|
|
// `kPrimeGeneratorRemainder` is close to zero or close to
|
|
// `kNumBlocksBuffered`.
|
|
//
|
|
// Picked a random number between one and one trillion and then chose the
|
|
// next prime number greater than or equal to it.
|
|
const uint64_t kPrimeGenerator = 545055921143ull;
|
|
// Can avoid repeated division by just adding the remainder repeatedly.
|
|
const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
|
|
kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
|
|
const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
|
|
|
|
Compressor::DictSampleArgs samples;
|
|
size_t buffer_idx = kInitSampleIdx;
|
|
for (size_t i = 0; i < kNumBlocksBuffered &&
|
|
samples.sample_data.size() < r->max_dict_sample_bytes;
|
|
++i) {
|
|
size_t copy_len =
|
|
std::min(r->max_dict_sample_bytes - samples.sample_data.size(),
|
|
r->data_block_buffers[buffer_idx].size());
|
|
samples.sample_data.append(r->data_block_buffers[buffer_idx], 0, copy_len);
|
|
samples.sample_lens.emplace_back(copy_len);
|
|
|
|
buffer_idx += kPrimeGeneratorRemainder;
|
|
if (buffer_idx >= kNumBlocksBuffered) {
|
|
buffer_idx -= kNumBlocksBuffered;
|
|
}
|
|
}
|
|
|
|
assert(samples.sample_data.size() > 0);
|
|
|
|
// final sample data block flushed, now we can generate dictionary
|
|
r->compressor_with_dict = r->basic_compressor->MaybeCloneSpecialized(
|
|
CacheEntryRole::kDataBlock, std::move(samples));
|
|
|
|
// The compressor might opt not to use a dictionary, in which case we
|
|
// can use the same compressor as for e.g. index blocks.
|
|
r->data_block_compressor = r->compressor_with_dict
|
|
? r->compressor_with_dict.get()
|
|
: r->basic_compressor.get();
|
|
for (uint32_t i = 0; i < r->compression_parallel_threads; i++) {
|
|
r->data_block_working_areas[i].compress =
|
|
r->data_block_compressor->ObtainWorkingArea();
|
|
}
|
|
Slice serialized_dict = r->data_block_compressor->GetSerializedDict();
|
|
if (r->verify_decompressor) {
|
|
if (serialized_dict.empty()) {
|
|
// No dictionary
|
|
r->data_block_verify_decompressor = r->verify_decompressor.get();
|
|
} else {
|
|
// Get an updated dictionary-aware decompressor for verification.
|
|
Status s = r->verify_decompressor->MaybeCloneForDict(
|
|
serialized_dict, &r->verify_decompressor_with_dict);
|
|
// Dictionary support must be present on the decompressor side if it's on
|
|
// the compressor side.
|
|
assert(r->verify_decompressor_with_dict);
|
|
if (r->verify_decompressor_with_dict) {
|
|
r->data_block_verify_decompressor =
|
|
r->verify_decompressor_with_dict.get();
|
|
for (uint32_t i = 0; i < r->compression_parallel_threads; i++) {
|
|
r->data_block_working_areas[i].verify =
|
|
r->data_block_verify_decompressor->ObtainWorkingArea(
|
|
r->data_block_compressor->GetPreferredCompressionType());
|
|
}
|
|
assert(s.ok());
|
|
} else {
|
|
assert(!s.ok());
|
|
r->SetStatus(s);
|
|
}
|
|
}
|
|
}
|
|
|
|
auto get_iterator_for_block = [&r](size_t i) {
|
|
auto& data_block = r->data_block_buffers[i];
|
|
assert(!data_block.empty());
|
|
|
|
Block reader{BlockContents{data_block}};
|
|
DataBlockIter* iter = reader.NewDataIterator(
|
|
r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber,
|
|
nullptr /* iter */, nullptr /* stats */,
|
|
false /* block_contents_pinned */, r->persist_user_defined_timestamps);
|
|
|
|
iter->SeekToFirst();
|
|
assert(iter->Valid());
|
|
return std::unique_ptr<DataBlockIter>(iter);
|
|
};
|
|
|
|
std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
|
|
|
|
for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
|
|
if (iter == nullptr) {
|
|
iter = get_iterator_for_block(i);
|
|
assert(iter != nullptr);
|
|
};
|
|
|
|
if (i + 1 < r->data_block_buffers.size()) {
|
|
next_block_iter = get_iterator_for_block(i + 1);
|
|
}
|
|
|
|
auto& data_block = r->data_block_buffers[i];
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
Slice first_key_in_next_block;
|
|
const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
|
|
if (i + 1 < r->data_block_buffers.size()) {
|
|
assert(next_block_iter != nullptr);
|
|
first_key_in_next_block = next_block_iter->key();
|
|
} else {
|
|
first_key_in_next_block_ptr = r->first_key_in_next_block;
|
|
}
|
|
|
|
std::vector<std::string> keys;
|
|
for (; iter->Valid(); iter->Next()) {
|
|
keys.emplace_back(iter->key().ToString());
|
|
}
|
|
|
|
ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
|
|
first_key_in_next_block_ptr, &data_block, &keys);
|
|
|
|
assert(block_rep != nullptr);
|
|
r->pc_rep->file_size_estimator.EmitBlock(block_rep->uncompressed.size(),
|
|
r->get_offset());
|
|
r->pc_rep->EmitBlock(block_rep);
|
|
} else {
|
|
for (; iter->Valid(); iter->Next()) {
|
|
Slice key = iter->key();
|
|
if (r->filter_builder != nullptr) {
|
|
// NOTE: AddWithPrevKey here would only save key copying if prev is
|
|
// pinned (iter->IsKeyPinned()), which is probably rare with delta
|
|
// encoding. OK to go from Add() here to AddWithPrevKey() in
|
|
// unbuffered operation.
|
|
r->filter_builder->Add(
|
|
ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
|
|
}
|
|
r->index_builder->OnKeyAdded(key);
|
|
}
|
|
WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
|
|
if (ok() && i + 1 < r->data_block_buffers.size()) {
|
|
assert(next_block_iter != nullptr);
|
|
Slice first_key_in_next_block = next_block_iter->key();
|
|
|
|
Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
|
|
|
|
iter->SeekToLast();
|
|
assert(iter->Valid());
|
|
r->index_builder->AddIndexEntry(
|
|
iter->key(), first_key_in_next_block_ptr, r->pending_handle,
|
|
&r->index_separator_scratch);
|
|
}
|
|
}
|
|
std::swap(iter, next_block_iter);
|
|
}
|
|
r->data_block_buffers.clear();
|
|
r->data_begin_offset = 0;
|
|
// Release all reserved cache for data block buffers
|
|
if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
|
|
Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
|
|
r->data_begin_offset);
|
|
s.PermitUncheckedError();
|
|
}
|
|
}
|
|
|
|
Status BlockBasedTableBuilder::Finish() {
|
|
Rep* r = rep_;
|
|
assert(r->state != Rep::State::kClosed);
|
|
bool empty_data_block = r->data_block.empty();
|
|
r->first_key_in_next_block = nullptr;
|
|
Flush();
|
|
if (r->state == Rep::State::kBuffered) {
|
|
EnterUnbuffered();
|
|
}
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
StopParallelCompression();
|
|
#ifndef NDEBUG
|
|
for (const auto& br : r->pc_rep->block_rep_buf) {
|
|
assert(br.status.ok());
|
|
}
|
|
#endif // !NDEBUG
|
|
} else {
|
|
// To make sure properties block is able to keep the accurate size of index
|
|
// block, we will finish writing all index entries first.
|
|
if (ok() && !empty_data_block) {
|
|
r->index_builder->AddIndexEntry(
|
|
r->last_ikey, nullptr /* no next data block */, r->pending_handle,
|
|
&r->index_separator_scratch);
|
|
}
|
|
}
|
|
|
|
r->props.tail_start_offset = r->offset;
|
|
|
|
// Write meta blocks, metaindex block and footer in the following order.
|
|
// 1. [meta block: filter]
|
|
// 2. [meta block: index]
|
|
// 3. [meta block: compression dictionary]
|
|
// 4. [meta block: range deletion tombstone]
|
|
// 5. [meta block: properties]
|
|
// 6. [metaindex block]
|
|
// 7. Footer
|
|
BlockHandle metaindex_block_handle, index_block_handle;
|
|
MetaIndexBuilder meta_index_builder;
|
|
WriteFilterBlock(&meta_index_builder);
|
|
WriteIndexBlock(&meta_index_builder, &index_block_handle);
|
|
WriteCompressionDictBlock(&meta_index_builder);
|
|
WriteRangeDelBlock(&meta_index_builder);
|
|
WritePropertiesBlock(&meta_index_builder);
|
|
if (ok()) {
|
|
// flush the meta index block
|
|
WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
|
|
&metaindex_block_handle, BlockType::kMetaIndex);
|
|
}
|
|
if (ok()) {
|
|
WriteFooter(metaindex_block_handle, index_block_handle);
|
|
}
|
|
r->state = Rep::State::kClosed;
|
|
r->tail_size = r->offset - r->props.tail_start_offset;
|
|
|
|
Status ret_status = r->CopyStatus();
|
|
IOStatus ios = r->GetIOStatus();
|
|
if (!ios.ok() && ret_status.ok()) {
|
|
// Let io_status supersede ok status (otherwise status takes precedennce)
|
|
ret_status = ios;
|
|
}
|
|
return ret_status;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Abandon() {
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (rep_->IsParallelCompressionEnabled()) {
|
|
StopParallelCompression();
|
|
}
|
|
rep_->state = Rep::State::kClosed;
|
|
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition
|
|
rep_->CopyStatus().PermitUncheckedError();
|
|
rep_->CopyIOStatus().PermitUncheckedError();
|
|
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
|
|
}
|
|
|
|
uint64_t BlockBasedTableBuilder::NumEntries() const {
|
|
return rep_->props.num_entries;
|
|
}
|
|
|
|
bool BlockBasedTableBuilder::IsEmpty() const {
|
|
return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
|
|
}
|
|
|
|
uint64_t BlockBasedTableBuilder::PreCompressionSize() const {
|
|
return rep_->pre_compression_size;
|
|
}
|
|
|
|
uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
|
|
|
|
uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
|
|
if (rep_->IsParallelCompressionEnabled()) {
|
|
// Use compression ratio so far and inflight uncompressed bytes to estimate
|
|
// final SST size.
|
|
return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
|
|
} else {
|
|
return FileSize();
|
|
}
|
|
}
|
|
|
|
uint64_t BlockBasedTableBuilder::GetTailSize() const { return rep_->tail_size; }
|
|
|
|
bool BlockBasedTableBuilder::NeedCompact() const {
|
|
for (const auto& collector : rep_->table_properties_collectors) {
|
|
if (collector->NeedCompact()) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
|
|
return rep_->props;
|
|
}
|
|
|
|
std::string BlockBasedTableBuilder::GetFileChecksum() const {
|
|
if (rep_->file != nullptr) {
|
|
return rep_->file->GetFileChecksum();
|
|
} else {
|
|
return kUnknownFileChecksum;
|
|
}
|
|
}
|
|
|
|
const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
|
|
if (rep_->file != nullptr) {
|
|
return rep_->file->GetFileChecksumFuncName();
|
|
} else {
|
|
return kUnknownFileChecksumFuncName;
|
|
}
|
|
}
|
|
void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
|
|
const SeqnoToTimeMapping& relevant_mapping, uint64_t oldest_ancestor_time) {
|
|
assert(rep_->props.seqno_to_time_mapping.empty());
|
|
relevant_mapping.EncodeTo(rep_->props.seqno_to_time_mapping);
|
|
rep_->props.creation_time = oldest_ancestor_time;
|
|
}
|
|
|
|
const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
|
|
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
|
|
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
|
|
"partitionedfilter.";
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|