Summary: The original intention of the User Defined Index interface was to use the user key. However, the implementation mixed user and internal key usage. This PR makes it consistent. It also clarifies the UDI contract.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13865

Test Plan: Update tests in table_test.cc

Reviewed By: pdillinger

Differential Revision: D80050344

Pulled By: anand1976

fbshipit-source-id: ace47737d21684ec19709640a09e198cee2d98bd
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/block_based/block_based_table_builder.h"

#include <atomic>
#include <cassert>
#include <cstdio>
#include <list>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>

#include "block_cache.h"
#include "cache/cache_entry_roles.h"
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
#include "logging/logging.h"
#include "memory/memory_allocator_impl.h"
#include "options/options_helper.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "rocksdb/types.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/block_based/user_defined_index_wrapper.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/work_queue.h"

namespace ROCKSDB_NAMESPACE {
|
|
|
|
extern const std::string kHashIndexPrefixesBlock;
|
|
extern const std::string kHashIndexPrefixesMetadataBlock;
|
|
|
|
// Without an anonymous namespace here, we would fail the -Wmissing-prototypes warning
|
|
namespace {
|
|
|
|
constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
|
|
|
|
// Create a filter block builder based on its type.
|
|
FilterBlockBuilder* CreateFilterBlockBuilder(
|
|
const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
|
|
const FilterBuildingContext& context,
|
|
const bool use_delta_encoding_for_index_values,
|
|
PartitionedIndexBuilder* const p_index_builder, size_t ts_sz,
|
|
const bool persist_user_defined_timestamps) {
|
|
const BlockBasedTableOptions& table_opt = context.table_options;
|
|
assert(table_opt.filter_policy); // precondition
|
|
|
|
FilterBitsBuilder* filter_bits_builder =
|
|
BloomFilterPolicy::GetBuilderFromContext(context);
|
|
if (filter_bits_builder == nullptr) {
|
|
return nullptr;
|
|
} else {
|
|
if (table_opt.partition_filters) {
|
|
assert(p_index_builder != nullptr);
|
|
// After the filter builder requests a partition cut, it takes time until the
// index builder actually cuts the partition (potentially at the end of a
// data block with many keys), so we take the lower bound as the
// partition size.
|
|
assert(table_opt.block_size_deviation <= 100);
|
|
auto partition_size =
|
|
static_cast<uint32_t>(((table_opt.metadata_block_size *
|
|
(100 - table_opt.block_size_deviation)) +
|
|
99) /
|
|
100);
|
|
partition_size = std::max(partition_size, static_cast<uint32_t>(1));
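// Illustrative arithmetic (not values read from this code): with
// metadata_block_size = 4096 and block_size_deviation = 10, this yields
// (4096 * 90 + 99) / 100 = 3687 as the lower-bound partition size.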
|
|
return new PartitionedFilterBlockBuilder(
|
|
mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
|
|
filter_bits_builder, table_opt.index_block_restart_interval,
|
|
use_delta_encoding_for_index_values, p_index_builder, partition_size,
|
|
ts_sz, persist_user_defined_timestamps,
|
|
table_opt.decouple_partitioned_filters);
|
|
} else {
|
|
return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
|
|
table_opt.whole_key_filtering,
|
|
filter_bits_builder);
|
|
}
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// kBlockBasedTableMagicNumber was picked by running
|
|
// echo rocksdb.table.block_based | sha1sum
|
|
// and taking the leading 64 bits.
|
|
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
// .cc files; for that reason we declare it extern in the header, but to get
// the space allocated it must be defined (non-extern) in exactly one place.
|
|
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
|
|
// We also support reading and writing legacy block based table format (for
|
|
// backwards compatibility)
|
|
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
|
|
|
|
// A collector that collects properties of interest to block-based table.
|
|
// For now this class looks heavy-weight since we only write one additional
|
|
// property.
|
|
// But in the foreseeable future, we will add more and more properties that are
|
|
// specific to block-based table.
|
|
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
|
|
: public InternalTblPropColl {
|
|
public:
|
|
explicit BlockBasedTablePropertiesCollector(
|
|
BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
|
|
bool prefix_filtering, bool decoupled_partitioned_filters)
|
|
: index_type_(index_type),
|
|
whole_key_filtering_(whole_key_filtering),
|
|
prefix_filtering_(prefix_filtering),
|
|
decoupled_partitioned_filters_(decoupled_partitioned_filters) {}
|
|
|
|
Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
|
|
uint64_t /*file_size*/) override {
|
|
// Intentionally left blank. Have no interest in collecting stats for
|
|
// individual key/value pairs.
|
|
return Status::OK();
|
|
}
|
|
|
|
void BlockAdd(uint64_t /* block_uncomp_bytes */,
|
|
uint64_t /* block_compressed_bytes_fast */,
|
|
uint64_t /* block_compressed_bytes_slow */) override {
|
|
// Intentionally left blank. No interest in collecting stats for
|
|
// blocks.
|
|
}
|
|
|
|
Status Finish(UserCollectedProperties* properties) override {
|
|
std::string val;
|
|
PutFixed32(&val, static_cast<uint32_t>(index_type_));
|
|
properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
|
|
properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
|
|
whole_key_filtering_ ? kPropTrue : kPropFalse});
|
|
properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
|
|
prefix_filtering_ ? kPropTrue : kPropFalse});
|
|
if (decoupled_partitioned_filters_) {
|
|
properties->insert(
|
|
{BlockBasedTablePropertyNames::kDecoupledPartitionedFilters,
|
|
kPropTrue});
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
// The name of the properties collector can be used for debugging purpose.
|
|
const char* Name() const override {
|
|
return "BlockBasedTablePropertiesCollector";
|
|
}
|
|
|
|
UserCollectedProperties GetReadableProperties() const override {
|
|
// Intentionally left blank.
|
|
return UserCollectedProperties();
|
|
}
|
|
|
|
private:
|
|
BlockBasedTableOptions::IndexType index_type_;
|
|
bool whole_key_filtering_;
|
|
bool prefix_filtering_;
|
|
bool decoupled_partitioned_filters_;
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::WorkingAreaPair {
|
|
Compressor::ManagedWorkingArea compress;
|
|
Decompressor::ManagedWorkingArea verify;
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::ParallelCompressionRep {
|
|
// TODO: consider replacing with autovector or similar
|
|
// Keys is a wrapper around a vector of strings that avoids releasing the
// strings' memory during vector clear(), in order to save memory
// allocation overhead.
|
|
class Keys {
|
|
public:
|
|
Keys() : keys_(kKeysInitSize), size_(0) {}
|
|
void PushBack(const Slice& key) {
|
|
if (size_ == keys_.size()) {
|
|
keys_.emplace_back(key.data(), key.size());
|
|
} else {
|
|
keys_[size_].assign(key.data(), key.size());
|
|
}
|
|
size_++;
|
|
}
|
|
void SwapAssign(std::vector<std::string>& keys) {
|
|
size_ = keys.size();
|
|
std::swap(keys_, keys);
|
|
}
|
|
void Clear() { size_ = 0; }
|
|
size_t Size() { return size_; }
|
|
std::string& Back() { return keys_[size_ - 1]; }
|
|
std::string& operator[](size_t idx) {
|
|
assert(idx < size_);
|
|
return keys_[idx];
|
|
}
|
|
|
|
private:
|
|
static constexpr size_t kKeysInitSize = 32;
|
|
std::vector<std::string> keys_;
|
|
size_t size_;
|
|
};
|
|
Keys curr_block_keys;
|
|
|
|
struct BlockRep;
|
|
|
|
// Use BlockRepSlot to keep block order in write thread.
|
|
// slot_ will pass references to BlockRep
|
|
class BlockRepSlot {
|
|
public:
|
|
BlockRepSlot() : slot_(1) {}
|
|
template <typename T>
|
|
void Fill(T&& rep) {
|
|
slot_.push(std::forward<T>(rep));
|
|
}
|
|
void Take(BlockRep*& rep) { slot_.pop(rep); }
|
|
|
|
private:
|
|
// slot_ will pass references to BlockRep in block_rep_buf,
// and those references remain valid until block_rep_buf is destructed.
|
|
WorkQueue<BlockRep*> slot_;
|
|
};
|
|
|
|
// BlockRep instances are fetched from and recycled to
|
|
// block_rep_pool during parallel compression.
|
|
struct ALIGN_AS(CACHE_LINE_SIZE) BlockRep {
|
|
// Uncompressed block contents
|
|
std::string uncompressed;
|
|
std::string compressed;
|
|
CompressionType compression_type = kNoCompression;
|
|
// For efficiency, the std::string is repeatedly overwritten without
|
|
// checking for "has no value". Only at the end of its life will it be
|
|
// assigned "no value". Thus, it needs to start with a value.
|
|
std::optional<std::string> first_key_in_next_block = std::string{};
|
|
Keys keys;
|
|
BlockRepSlot slot;
|
|
Status status;
|
|
};
|
|
|
|
// Use a vector of BlockRep as a buffer for a fixed number of BlockRep
// structures. All data referenced by pointers in BlockRep will be freed
// when this vector is destructed.
|
|
using BlockRepBuffer = std::vector<BlockRep>;
|
|
BlockRepBuffer block_rep_buf;
|
|
// Use a thread-safe queue for concurrent access from block
|
|
// building thread and writer thread.
|
|
using BlockRepPool = WorkQueue<BlockRep*>;
|
|
BlockRepPool block_rep_pool;
|
|
|
|
// Compression queue will pass references to BlockRep in block_rep_buf,
// and those references remain valid until block_rep_buf is destructed.
|
|
using CompressQueue = WorkQueue<BlockRep*>;
|
|
CompressQueue compress_queue;
|
|
std::vector<port::Thread> compress_thread_pool;
|
|
|
|
// Write queue will pass references to BlockRep::slot in block_rep_buf,
// and those references remain valid until the corresponding
// BlockRep::slot is destructed, which happens before block_rep_buf is
// destructed.
|
|
using WriteQueue = WorkQueue<BlockRepSlot*>;
|
|
WriteQueue write_queue;
|
|
std::unique_ptr<port::Thread> write_thread;
|
|
|
|
// Estimate output file size when parallel compression is enabled. This is
|
|
// necessary because compression & flush are no longer synchronized,
|
|
// and BlockBasedTableBuilder::FileSize() is no longer accurate.
|
|
// memory_order_relaxed suffices because accurate statistics are not required.
|
|
class FileSizeEstimator {
|
|
public:
|
|
explicit FileSizeEstimator()
|
|
: uncomp_bytes_compressed(0),
|
|
uncomp_bytes_curr_block(0),
|
|
uncomp_bytes_curr_block_set(false),
|
|
uncomp_bytes_inflight(0),
|
|
blocks_inflight(0),
|
|
curr_compression_ratio(0),
|
|
estimated_file_size(0) {}
|
|
|
|
// Estimate file size when a block is about to be emitted to
|
|
// compression thread
|
|
void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) {
|
|
uint64_t new_uncomp_bytes_inflight =
|
|
uncomp_bytes_inflight.fetch_add(uncomp_block_size,
|
|
std::memory_order_relaxed) +
|
|
uncomp_block_size;
|
|
|
|
uint64_t new_blocks_inflight =
|
|
blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
|
|
|
|
estimated_file_size.store(
|
|
curr_file_size +
|
|
static_cast<uint64_t>(
|
|
static_cast<double>(new_uncomp_bytes_inflight) *
|
|
curr_compression_ratio.load(std::memory_order_relaxed)) +
|
|
new_blocks_inflight * kBlockTrailerSize,
|
|
std::memory_order_relaxed);
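// In other words: estimate = current file size
// + (in-flight uncompressed bytes * running compression ratio)
// + (in-flight blocks * block trailer size).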
|
|
}
|
|
|
|
// Estimate file size when a block is already reaped from
|
|
// compression thread
|
|
void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
|
|
assert(uncomp_bytes_curr_block_set);
|
|
|
|
uint64_t new_uncomp_bytes_compressed =
|
|
uncomp_bytes_compressed + uncomp_bytes_curr_block;
|
|
assert(new_uncomp_bytes_compressed > 0);
|
|
|
|
curr_compression_ratio.store(
|
|
(curr_compression_ratio.load(std::memory_order_relaxed) *
|
|
uncomp_bytes_compressed +
|
|
compressed_block_size) /
|
|
static_cast<double>(new_uncomp_bytes_compressed),
|
|
std::memory_order_relaxed);
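// i.e. a weighted running average:
// new_ratio = (old_ratio * previously_compressed_uncomp_bytes +
//              this_block_compressed_size) / new_total_uncomp_bytes.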
|
|
uncomp_bytes_compressed = new_uncomp_bytes_compressed;
|
|
|
|
uint64_t new_uncomp_bytes_inflight =
|
|
uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block,
|
|
std::memory_order_relaxed) -
|
|
uncomp_bytes_curr_block;
|
|
|
|
uint64_t new_blocks_inflight =
|
|
blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
|
|
|
|
estimated_file_size.store(
|
|
curr_file_size +
|
|
static_cast<uint64_t>(
|
|
static_cast<double>(new_uncomp_bytes_inflight) *
|
|
curr_compression_ratio.load(std::memory_order_relaxed)) +
|
|
new_blocks_inflight * kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
|
|
uncomp_bytes_curr_block_set = false;
|
|
}
|
|
|
|
void SetEstimatedFileSize(uint64_t size) {
|
|
estimated_file_size.store(size, std::memory_order_relaxed);
|
|
}
|
|
|
|
uint64_t GetEstimatedFileSize() {
|
|
return estimated_file_size.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
void SetCurrBlockUncompSize(uint64_t size) {
|
|
uncomp_bytes_curr_block = size;
|
|
uncomp_bytes_curr_block_set = true;
|
|
}
|
|
|
|
private:
|
|
// Input bytes compressed so far.
|
|
uint64_t uncomp_bytes_compressed;
|
|
// Size of current block being appended.
|
|
uint64_t uncomp_bytes_curr_block;
|
|
// Whether uncomp_bytes_curr_block has been set for next
|
|
// ReapBlock call.
|
|
bool uncomp_bytes_curr_block_set;
|
|
// Input bytes under compression and not appended yet.
|
|
std::atomic<uint64_t> uncomp_bytes_inflight;
|
|
// Number of blocks under compression and not appended yet.
|
|
std::atomic<uint64_t> blocks_inflight;
|
|
// Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock.
|
|
std::atomic<double> curr_compression_ratio;
|
|
// Estimated SST file size.
|
|
std::atomic<uint64_t> estimated_file_size;
|
|
};
|
|
FileSizeEstimator file_size_estimator;
|
|
|
|
// Facilities used for waiting on first block completion. We need to wait for
// the completion of the first block's compression and flush to obtain a
// non-zero compression ratio.
|
|
std::atomic<bool> first_block_processed;
|
|
std::condition_variable first_block_cond;
|
|
std::mutex first_block_mutex;
|
|
|
|
explicit ParallelCompressionRep(uint32_t parallel_threads)
|
|
: block_rep_buf(parallel_threads),
|
|
block_rep_pool(parallel_threads),
|
|
compress_queue(parallel_threads),
|
|
write_queue(parallel_threads),
|
|
first_block_processed(false) {
|
|
for (uint32_t i = 0; i < parallel_threads; i++) {
|
|
// Prime the queue of available BlockReps
|
|
block_rep_pool.push(&block_rep_buf[i]);
|
|
}
|
|
}
|
|
|
|
~ParallelCompressionRep() {
|
|
block_rep_pool.finish();
|
|
#ifndef NDEBUG
|
|
// Silence ASSERT_STATUS_CHECKED warnings
|
|
for (auto& block_rep : block_rep_buf) {
|
|
assert(block_rep.status.ok());
|
|
}
|
|
#endif
|
|
}
|
|
|
|
// Make a block prepared to be emitted to compression thread
|
|
// Used in non-buffered mode
|
|
BlockRep* PrepareBlock(const Slice* first_key_in_next_block,
|
|
BlockBuilder* data_block) {
|
|
BlockRep* block_rep = PrepareBlockInternal(first_key_in_next_block);
|
|
assert(block_rep != nullptr);
|
|
data_block->SwapAndReset(block_rep->uncompressed);
|
|
std::swap(block_rep->keys, curr_block_keys);
|
|
curr_block_keys.Clear();
|
|
return block_rep;
|
|
}
|
|
|
|
// Used in EnterUnbuffered
|
|
BlockRep* PrepareBlock(const Slice* first_key_in_next_block,
|
|
std::string* data_block,
|
|
std::vector<std::string>* keys) {
|
|
BlockRep* block_rep = PrepareBlockInternal(first_key_in_next_block);
|
|
assert(block_rep != nullptr);
|
|
std::swap(block_rep->uncompressed, *data_block);
|
|
block_rep->keys.SwapAssign(*keys);
|
|
return block_rep;
|
|
}
|
|
|
|
// Emit a block to compression thread
|
|
void EmitBlock(BlockRep* block_rep) {
|
|
assert(block_rep != nullptr);
|
|
assert(block_rep->status.ok());
|
|
if (!write_queue.push(&block_rep->slot)) {
|
|
return;
|
|
}
|
|
if (!compress_queue.push(block_rep)) {
|
|
return;
|
|
}
|
|
|
|
if (!first_block_processed.load(std::memory_order_relaxed)) {
|
|
std::unique_lock<std::mutex> lock(first_block_mutex);
|
|
first_block_cond.wait(lock, [this] {
|
|
return first_block_processed.load(std::memory_order_relaxed);
|
|
});
|
|
}
|
|
}
|
|
|
|
// Reap a block from compression thread
|
|
void ReapBlock(BlockRep* block_rep) {
|
|
assert(block_rep != nullptr);
|
|
block_rep->compressed.clear();
|
|
block_rep_pool.push(block_rep);
|
|
|
|
if (!first_block_processed.load(std::memory_order_relaxed)) {
|
|
std::lock_guard<std::mutex> lock(first_block_mutex);
|
|
first_block_processed.store(true, std::memory_order_relaxed);
|
|
first_block_cond.notify_one();
|
|
}
|
|
}
|
|
|
|
private:
|
|
BlockRep* PrepareBlockInternal(const Slice* first_key_in_next_block) {
|
|
BlockRep* block_rep = nullptr;
|
|
block_rep_pool.pop(block_rep);
|
|
assert(block_rep != nullptr);
|
|
|
|
block_rep->compression_type = kNoCompression;
|
|
|
|
if (first_key_in_next_block == nullptr) {
|
|
block_rep->first_key_in_next_block = {};
|
|
} else {
|
|
block_rep->first_key_in_next_block->assign(
|
|
first_key_in_next_block->data(), first_key_in_next_block->size());
|
|
}
|
|
|
|
return block_rep;
|
|
}
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::Rep {
|
|
const ImmutableOptions ioptions;
|
|
// BEGIN from MutableCFOptions
|
|
std::shared_ptr<const SliceTransform> prefix_extractor;
|
|
// END from MutableCFOptions
|
|
const WriteOptions write_options;
|
|
const BlockBasedTableOptions table_options;
|
|
const InternalKeyComparator& internal_comparator;
|
|
// Size in bytes for the user-defined timestamps.
|
|
size_t ts_sz;
|
|
// When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
|
|
// user key will be stripped when creating the block based table. This
|
|
// stripping happens for all user keys, including the keys in data block,
|
|
// index block for data block, index block for index block (if index type is
|
|
// `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
|
|
// filters), the `first_internal_key` in `IndexValue`, and the `end_key` for
// range deletion entries.
|
|
// As long as the user keys are sorted when added via the `Add` API, their
// logical ordering won't change after timestamps are stripped. However, for
// each user key to be logically equivalent before and after its timestamp is
// stripped, the user key should contain the minimum timestamp.
|
|
bool persist_user_defined_timestamps;
|
|
WritableFileWriter* file;
|
|
std::atomic<uint64_t> offset;
|
|
size_t alignment;
|
|
BlockBuilder data_block;
|
|
// Buffers uncompressed data blocks to replay later. Needed when
|
|
// compression dictionary is enabled so we can finalize the dictionary before
|
|
// compressing any data blocks.
|
|
std::vector<std::string> data_block_buffers;
|
|
BlockBuilder range_del_block;
|
|
|
|
InternalKeySliceTransform internal_prefix_transform;
|
|
std::unique_ptr<IndexBuilder> index_builder;
|
|
std::string index_separator_scratch;
|
|
PartitionedIndexBuilder* p_index_builder_ = nullptr;
|
|
|
|
std::string last_ikey; // Internal key or empty (unset)
|
|
const Slice* first_key_in_next_block = nullptr;
|
|
bool warm_cache = false;
|
|
bool uses_explicit_compression_manager = false;
|
|
|
|
uint64_t sample_for_compression;
|
|
std::atomic<uint64_t> compressible_input_data_bytes;
|
|
std::atomic<uint64_t> uncompressible_input_data_bytes;
|
|
std::atomic<uint64_t> sampled_input_data_bytes;
|
|
std::atomic<uint64_t> sampled_output_slow_data_bytes;
|
|
std::atomic<uint64_t> sampled_output_fast_data_bytes;
|
|
uint32_t compression_parallel_threads;
|
|
int max_compressed_bytes_per_kb;
|
|
size_t max_dict_sample_bytes = 0;
|
|
|
|
// *** Compressors & decompressors - Yes, it seems like a lot here but ***
|
|
// *** these are distinct fields to minimize extra conditionals and ***
|
|
// *** field reads on hot code paths. ***
|
|
|
|
// A compressor for blocks in general, without dictionary compression
|
|
std::unique_ptr<Compressor> basic_compressor;
|
|
// A compressor using dictionary compression (when applicable)
|
|
std::unique_ptr<Compressor> compressor_with_dict;
|
|
// Once configured/determined, points to one of the above Compressors to
|
|
// use on data blocks.
|
|
Compressor* data_block_compressor = nullptr;
|
|
// A decompressor corresponding to basic_compressor (when non-nullptr).
|
|
// Used for verification and cache warming.
|
|
std::shared_ptr<Decompressor> basic_decompressor;
|
|
// When needed, a decompressor for verifying compression using a
|
|
// dictionary sampled/trained from this file.
|
|
std::unique_ptr<Decompressor> verify_decompressor_with_dict;
|
|
// When non-nullptr, compression should be verified with this corresponding
|
|
// decompressor, except for data blocks. (Points to same as basic_decompressor
|
|
// when verify_compression is set.)
|
|
UnownedPtr<Decompressor> verify_decompressor;
|
|
// Once configured/determined, points to one of the above Decompressors to use
|
|
// in verifying data blocks.
|
|
UnownedPtr<Decompressor> data_block_verify_decompressor;
|
|
|
|
// Set of compression types used for blocks in this file (mixing compression
|
|
// algorithms in a single file is allowed, using a CompressionManager)
|
|
SmallEnumSet<CompressionType, kDisableCompressionOption>
|
|
compression_types_used;
|
|
|
|
// Working area for basic_compressor when compression_parallel_threads==1
|
|
WorkingAreaPair basic_working_area;
|
|
// Working areas for data_block_compressor, for each of
|
|
// compression_parallel_threads
|
|
std::vector<WorkingAreaPair> data_block_working_areas;
|
|
|
|
size_t data_begin_offset = 0;
|
|
|
|
TableProperties props;
|
|
|
|
// States of the builder.
|
|
//
|
|
// - `kBuffered`: This is the initial state where zero or more data blocks are
|
|
// accumulated uncompressed in-memory. From this state, call
|
|
// `EnterUnbuffered()` to finalize the compression dictionary if enabled,
|
|
// compress/write out any buffered blocks, and proceed to the `kUnbuffered`
|
|
// state.
|
|
//
|
|
// - `kUnbuffered`: This is the state when compression dictionary is finalized
|
|
// either because it wasn't enabled in the first place or it's been created
|
|
// from sampling previously buffered data. In this state, blocks are simply
|
|
// compressed/written out as they fill up. From this state, call `Finish()`
|
|
// to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
|
|
// the partially created file.
|
|
//
|
|
// - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
|
|
// called, so the table builder is no longer usable. We must be in this
|
|
// state by the time the destructor runs.
|
|
enum class State {
|
|
kBuffered,
|
|
kUnbuffered,
|
|
kClosed,
|
|
};
|
|
State state = State::kUnbuffered;
|
|
// `kBuffered` state is allowed only as long as the buffering of uncompressed
|
|
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
|
|
uint64_t buffer_limit = 0;
|
|
std::shared_ptr<CacheReservationManager>
|
|
compression_dict_buffer_cache_res_mgr;
|
|
const bool use_delta_encoding_for_index_values;
|
|
std::unique_ptr<FilterBlockBuilder> filter_builder;
|
|
OffsetableCacheKey base_cache_key;
|
|
const TableFileCreationReason reason;
|
|
|
|
BlockHandle pending_handle; // Handle to add to index block
|
|
|
|
std::string single_threaded_compressed_output;
|
|
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
|
|
|
|
std::vector<std::unique_ptr<InternalTblPropColl>> table_properties_collectors;
|
|
|
|
std::unique_ptr<ParallelCompressionRep> pc_rep;
|
|
BlockCreateContext create_context;
|
|
|
|
// The size of the "tail" part of a SST file. "Tail" refers to
|
|
// all blocks after data blocks till the end of the SST file.
|
|
uint64_t tail_size;
|
|
|
|
// The total size of all blocks in this file before they are compressed.
|
|
// This is used for logging compaction stats.
|
|
uint64_t pre_compression_size = 0;
|
|
|
|
// See class Footer
|
|
uint32_t base_context_checksum;
|
|
|
|
uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
|
|
void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
|
|
|
|
bool IsParallelCompressionEnabled() const {
|
|
return compression_parallel_threads > 1;
|
|
}
|
|
|
|
Status GetStatus() {
|
|
// We need to make modifications of status visible when status_ok is set
|
|
// to false, and this is ensured by status_mutex, so no special memory
|
|
// order for status_ok is required.
|
|
if (status_ok.load(std::memory_order_relaxed)) {
|
|
return Status::OK();
|
|
} else {
|
|
return CopyStatus();
|
|
}
|
|
}
|
|
|
|
Status CopyStatus() {
|
|
std::lock_guard<std::mutex> lock(status_mutex);
|
|
return status;
|
|
}
|
|
|
|
IOStatus GetIOStatus() {
|
|
// We need to make modifications of io_status visible when status_ok is set
|
|
// to false, and this is ensured by io_status_mutex, so no special memory
|
|
// order for io_status_ok is required.
|
|
if (io_status_ok.load(std::memory_order_relaxed)) {
|
|
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition
|
|
auto ios = CopyIOStatus();
|
|
ios.PermitUncheckedError();
|
|
// Assume no races in unit tests
|
|
assert(ios.ok());
|
|
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
|
|
return IOStatus::OK();
|
|
} else {
|
|
return CopyIOStatus();
|
|
}
|
|
}
|
|
|
|
IOStatus CopyIOStatus() {
|
|
std::lock_guard<std::mutex> lock(io_status_mutex);
|
|
return io_status;
|
|
}
|
|
|
|
// Never erase an existing status that is not OK.
|
|
void SetStatus(Status s) {
|
|
if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
|
|
// Locking is overkill when compression_parallel_threads is not used, but
// since it's unlikely that s is not OK, we accept this cost for the sake
// of simplicity.
|
|
std::lock_guard<std::mutex> lock(status_mutex);
|
|
status = s;
|
|
status_ok.store(false, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
// Never erase an existing I/O status that is not OK.
|
|
// Calling this will also SetStatus(ios)
|
|
void SetIOStatus(IOStatus ios) {
|
|
if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
|
|
// Locking is overkill when compression_parallel_threads is not used, but
// since it's unlikely that ios is not OK, we accept this cost for the sake
// of simplicity.
|
|
std::lock_guard<std::mutex> lock(io_status_mutex);
|
|
io_status = ios;
|
|
io_status_ok.store(false, std::memory_order_relaxed);
|
|
}
|
|
SetStatus(ios);
|
|
}
|
|
|
|
Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
|
|
WritableFileWriter* f)
|
|
: ioptions(tbo.ioptions),
|
|
prefix_extractor(tbo.moptions.prefix_extractor),
|
|
write_options(tbo.write_options),
|
|
table_options(table_opt),
|
|
internal_comparator(tbo.internal_comparator),
|
|
ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()),
|
|
persist_user_defined_timestamps(
|
|
tbo.ioptions.persist_user_defined_timestamps),
|
|
file(f),
|
|
offset(0),
|
|
alignment(table_options.block_align
|
|
? std::min(static_cast<size_t>(table_options.block_size),
|
|
kDefaultPageSize)
|
|
: 0),
|
|
data_block(table_options.block_restart_interval,
|
|
table_options.use_delta_encoding,
|
|
false /* use_value_delta_encoding */,
|
|
tbo.internal_comparator.user_comparator()
|
|
->CanKeysWithDifferentByteContentsBeEqual()
|
|
? BlockBasedTableOptions::kDataBlockBinarySearch
|
|
: table_options.data_block_index_type,
|
|
table_options.data_block_hash_table_util_ratio, ts_sz,
|
|
persist_user_defined_timestamps),
|
|
range_del_block(
|
|
1 /* block_restart_interval */, true /* use_delta_encoding */,
|
|
false /* use_value_delta_encoding */,
|
|
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
|
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
|
persist_user_defined_timestamps),
|
|
internal_prefix_transform(prefix_extractor.get()),
|
|
sample_for_compression(tbo.moptions.sample_for_compression),
|
|
compressible_input_data_bytes(0),
|
|
uncompressible_input_data_bytes(0),
|
|
sampled_input_data_bytes(0),
|
|
sampled_output_slow_data_bytes(0),
|
|
sampled_output_fast_data_bytes(0),
|
|
compression_parallel_threads(tbo.compression_opts.parallel_threads),
|
|
max_compressed_bytes_per_kb(
|
|
tbo.compression_opts.max_compressed_bytes_per_kb),
|
|
data_block_working_areas(compression_parallel_threads),
|
|
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
|
|
!table_opt.block_align),
|
|
reason(tbo.reason),
|
|
flush_block_policy(
|
|
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
|
|
table_options, data_block)),
|
|
create_context(&table_options, &ioptions, ioptions.stats,
|
|
/*decompressor=*/nullptr,
|
|
tbo.moptions.block_protection_bytes_per_key,
|
|
tbo.internal_comparator.user_comparator(),
|
|
!use_delta_encoding_for_index_values,
|
|
table_opt.index_type ==
|
|
BlockBasedTableOptions::kBinarySearchWithFirstKey),
|
|
tail_size(0),
|
|
status_ok(true),
|
|
io_status_ok(true) {
|
|
FilterBuildingContext filter_context(table_options);
|
|
|
|
filter_context.info_log = ioptions.logger;
|
|
filter_context.column_family_name = tbo.column_family_name;
|
|
filter_context.reason = reason;
|
|
|
|
// Only populate other fields if the file is known to be in the LSM rather
// than an externally generated SST file
|
|
if (reason != TableFileCreationReason::kMisc) {
|
|
filter_context.compaction_style = ioptions.compaction_style;
|
|
filter_context.num_levels = ioptions.num_levels;
|
|
filter_context.level_at_creation = tbo.level_at_creation;
|
|
filter_context.is_bottommost = tbo.is_bottommost;
|
|
assert(filter_context.level_at_creation < filter_context.num_levels);
|
|
}
|
|
|
|
props.compression_options =
|
|
CompressionOptionsToString(tbo.compression_opts);
|
|
|
|
auto* mgr = tbo.moptions.compression_manager.get();
|
|
if (mgr == nullptr) {
|
|
uses_explicit_compression_manager = false;
|
|
mgr = GetBuiltinCompressionManager(
|
|
GetCompressFormatForVersion(
|
|
static_cast<uint32_t>(table_opt.format_version)))
|
|
.get();
|
|
} else {
|
|
uses_explicit_compression_manager = true;
|
|
|
|
// Stuff some extra debugging info as extra pseudo-options. Using
|
|
// underscore prefix to indicate they are special.
|
|
props.compression_options.append("_compression_manager=");
|
|
props.compression_options.append(mgr->GetId());
|
|
props.compression_options.append("; ");
|
|
}
|
|
|
|
// Sanitize to only allow compression when it saves space.
|
|
max_compressed_bytes_per_kb =
|
|
std::min(int{1023}, tbo.compression_opts.max_compressed_bytes_per_kb);
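// (Capping at 1023 bytes per KB means compressed output is kept only if it
// saves roughly at least 1 byte per 1024 input bytes.)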
|
|
|
|
basic_compressor = mgr->GetCompressorForSST(
|
|
filter_context, tbo.compression_opts, tbo.compression_type);
|
|
if (basic_compressor) {
|
|
if (table_options.enable_index_compression) {
|
|
basic_working_area.compress = basic_compressor->ObtainWorkingArea();
|
|
}
|
|
max_dict_sample_bytes = basic_compressor->GetMaxSampleSizeIfWantDict(
|
|
CacheEntryRole::kDataBlock);
|
|
if (max_dict_sample_bytes > 0) {
|
|
state = State::kBuffered;
|
|
if (tbo.target_file_size == 0) {
|
|
buffer_limit = tbo.compression_opts.max_dict_buffer_bytes;
|
|
} else if (tbo.compression_opts.max_dict_buffer_bytes == 0) {
|
|
buffer_limit = tbo.target_file_size;
|
|
} else {
|
|
buffer_limit = std::min(tbo.target_file_size,
|
|
tbo.compression_opts.max_dict_buffer_bytes);
|
|
}
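// Net effect: buffer_limit is the smaller of target_file_size and
// max_dict_buffer_bytes, where 0 means "no limit" for either input (and a
// resulting buffer_limit of 0 likewise allows unlimited buffering).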
|
|
} else {
|
|
// No distinct data block compressor using dictionary
|
|
data_block_compressor = basic_compressor.get();
|
|
for (uint32_t i = 0; i < compression_parallel_threads; i++) {
|
|
data_block_working_areas[i].compress =
|
|
data_block_compressor->ObtainWorkingArea();
|
|
}
|
|
}
|
|
basic_decompressor = mgr->GetDecompressorForCompressor(*basic_compressor);
|
|
create_context.decompressor = basic_decompressor.get();
|
|
|
|
if (table_options.verify_compression) {
|
|
verify_decompressor = basic_decompressor.get();
|
|
if (table_options.enable_index_compression) {
|
|
basic_working_area.verify = verify_decompressor->ObtainWorkingArea(
|
|
basic_compressor->GetPreferredCompressionType());
|
|
}
|
|
if (state == State::kUnbuffered) {
|
|
assert(data_block_compressor);
|
|
data_block_verify_decompressor = verify_decompressor.get();
|
|
for (uint32_t i = 0; i < compression_parallel_threads; i++) {
|
|
data_block_working_areas[i].verify =
|
|
data_block_verify_decompressor->ObtainWorkingArea(
|
|
data_block_compressor->GetPreferredCompressionType());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
switch (table_options.prepopulate_block_cache) {
|
|
case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
|
|
warm_cache = (reason == TableFileCreationReason::kFlush);
|
|
break;
|
|
case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
|
|
warm_cache = false;
|
|
break;
|
|
default:
|
|
// missing case
|
|
assert(false);
|
|
warm_cache = false;
|
|
}
|
|
|
|
const auto compress_dict_build_buffer_charged =
|
|
table_options.cache_usage_options.options_overrides
|
|
.at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
|
|
.charged;
|
|
if (table_options.block_cache &&
|
|
(compress_dict_build_buffer_charged ==
|
|
CacheEntryRoleOptions::Decision::kEnabled ||
|
|
compress_dict_build_buffer_charged ==
|
|
CacheEntryRoleOptions::Decision::kFallback)) {
|
|
compression_dict_buffer_cache_res_mgr =
|
|
std::make_shared<CacheReservationManagerImpl<
|
|
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
|
|
table_options.block_cache);
|
|
} else {
|
|
compression_dict_buffer_cache_res_mgr = nullptr;
|
|
}
|
|
|
|
if (table_options.index_type ==
|
|
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
|
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
|
|
&internal_comparator, use_delta_encoding_for_index_values,
|
|
table_options, ts_sz, persist_user_defined_timestamps);
|
|
index_builder.reset(p_index_builder_);
|
|
} else {
|
|
index_builder.reset(IndexBuilder::CreateIndexBuilder(
|
|
table_options.index_type, &internal_comparator,
|
|
&this->internal_prefix_transform, use_delta_encoding_for_index_values,
|
|
table_options, ts_sz, persist_user_defined_timestamps));
|
|
}
|
|
|
|
// If user_defined_index_factory is provided, wrap the index builder with a
// UserDefinedIndexBuilderWrapper
|
|
if (table_options.user_defined_index_factory != nullptr) {
|
|
if (tbo.moptions.compression_opts.parallel_threads > 1 ||
|
|
tbo.moptions.bottommost_compression_opts.parallel_threads > 1) {
|
|
SetStatus(
|
|
Status::InvalidArgument("user_defined_index_factory not supported "
|
|
"with parallel compression"));
|
|
} else if (ioptions.user_comparator != BytewiseComparator()) {
|
|
// TODO: Pass the user_comparator to the UDI and let it validate. Do
|
|
// it in a major release.
|
|
SetStatus(
|
|
Status::InvalidArgument("user_defined_index_factory only supported "
|
|
"with bytewise comparator"));
|
|
} else {
|
|
std::unique_ptr<UserDefinedIndexBuilder> user_defined_index_builder(
|
|
table_options.user_defined_index_factory->NewBuilder());
|
|
if (user_defined_index_builder != nullptr) {
|
|
index_builder.reset(new UserDefinedIndexBuilderWrapper(
|
|
std::string(table_options.user_defined_index_factory->Name()),
|
|
std::move(index_builder), std::move(user_defined_index_builder),
|
|
&internal_comparator, ts_sz, persist_user_defined_timestamps));
|
|
}
|
|
}
|
|
}
|
|
|
|
if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
|
|
// Apply optimize_filters_for_hits setting here when applicable by
|
|
// skipping filter generation
|
|
filter_builder.reset();
|
|
} else if (tbo.skip_filters) {
|
|
// For SstFileWriter skip_filters
|
|
filter_builder.reset();
|
|
} else if (!table_options.filter_policy) {
|
|
// Null filter_policy -> no filter
|
|
filter_builder.reset();
|
|
} else {
|
|
filter_builder.reset(CreateFilterBlockBuilder(
|
|
ioptions, tbo.moptions, filter_context,
|
|
use_delta_encoding_for_index_values, p_index_builder_, ts_sz,
|
|
persist_user_defined_timestamps));
|
|
}
|
|
|
|
assert(tbo.internal_tbl_prop_coll_factories);
|
|
for (auto& factory : *tbo.internal_tbl_prop_coll_factories) {
|
|
assert(factory);
|
|
|
|
std::unique_ptr<InternalTblPropColl> collector{
|
|
factory->CreateInternalTblPropColl(
|
|
tbo.column_family_id, tbo.level_at_creation,
|
|
tbo.ioptions.num_levels,
|
|
tbo.last_level_inclusive_max_seqno_threshold)};
|
|
if (collector) {
|
|
table_properties_collectors.emplace_back(std::move(collector));
|
|
}
|
|
}
|
|
table_properties_collectors.emplace_back(
|
|
new BlockBasedTablePropertiesCollector(
|
|
table_options.index_type, table_options.whole_key_filtering,
|
|
prefix_extractor != nullptr,
|
|
table_options.decouple_partitioned_filters));
|
|
if (ts_sz > 0 && persist_user_defined_timestamps) {
|
|
table_properties_collectors.emplace_back(
|
|
new TimestampTablePropertiesCollector(
|
|
tbo.internal_comparator.user_comparator()));
|
|
}
|
|
|
|
// These are only needed for populating table properties
|
|
props.column_family_id = tbo.column_family_id;
|
|
props.column_family_name = tbo.column_family_name;
|
|
props.oldest_key_time = tbo.oldest_key_time;
|
|
props.newest_key_time = tbo.newest_key_time;
|
|
props.file_creation_time = tbo.file_creation_time;
|
|
props.orig_file_number = tbo.cur_file_num;
|
|
props.db_id = tbo.db_id;
|
|
props.db_session_id = tbo.db_session_id;
|
|
props.db_host_id = ioptions.db_host_id;
|
|
props.format_version = table_options.format_version;
|
|
if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
|
|
ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
|
|
}
|
|
// The default is UINT64_MAX, meaning unknown. Set it to 0 here so it can be
// updated by taking the max in BlockBasedTableBuilder::Add().
|
|
props.key_largest_seqno = 0;
|
|
PrePopulateCompressionProperties(mgr);
|
|
|
|
if (FormatVersionUsesContextChecksum(table_options.format_version)) {
|
|
// Must be non-zero and semi- or quasi-random
|
|
// TODO: ideally guaranteed different for related files (e.g. use file
|
|
// number and db_session, for benefit of SstFileWriter)
|
|
do {
|
|
base_context_checksum = Random::GetTLSInstance()->Next();
|
|
} while (UNLIKELY(base_context_checksum == 0));
|
|
} else {
|
|
base_context_checksum = 0;
|
|
}
|
|
|
|
if (alignment > 0 && basic_compressor) {
|
|
// With better sanitization in `CompactionPicker::CompactFiles()`, we
|
|
// would not need to handle this case here and could change it to an
|
|
// assertion instead.
|
|
SetStatus(Status::InvalidArgument(
|
|
"Enable block_align, but compression enabled"));
|
|
}
|
|
}
|
|
|
|
Rep(const Rep&) = delete;
|
|
Rep& operator=(const Rep&) = delete;
|
|
|
|
void PrePopulateCompressionProperties(UnownedPtr<CompressionManager> mgr) {
|
|
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
|
|
assert(mgr);
|
|
// Use newer compression_name property
|
|
props.compression_name.reserve(32);
|
|
// If compression is disabled, use empty manager name
|
|
if (basic_compressor) {
|
|
props.compression_name.append(mgr->CompatibilityName());
|
|
}
|
|
props.compression_name.push_back(';');
|
|
// Rest of property to be filled out at the end of building the file
|
|
} else {
|
|
// Use legacy compression_name property, populated at the end of building
|
|
// the file. Not compatible with compression managers using custom
|
|
// algorithms / compression types.
|
|
assert(Slice(mgr->CompatibilityName())
|
|
.compare(GetBuiltinCompressionManager(
|
|
GetCompressFormatForVersion(
|
|
static_cast<uint32_t>(props.format_version)))
|
|
->CompatibilityName()) == 0);
|
|
}
|
|
}
|
|
void PostPopulateCompressionProperties() {
|
|
// Do not include "no compression" in the set. It's not really useful
|
|
// information whether there are any uncompressed blocks. Some kinds of
|
|
// blocks are never compressed anyway.
|
|
compression_types_used.Remove(kNoCompression);
|
|
size_t ctype_count = compression_types_used.count();
|
|
|
|
if (uses_explicit_compression_manager) {
|
|
// Stuff some extra debugging info as extra pseudo-options. Using
|
|
// underscore prefix to indicate they are special.
|
|
std::string& compression_options = props.compression_options;
|
|
compression_options.append("_compressor=");
|
|
compression_options.append(data_block_compressor
|
|
? data_block_compressor->GetId()
|
|
: std::string{});
|
|
compression_options.append("; ");
|
|
} else {
|
|
// No explicit compression manager
|
|
assert(compression_types_used.count() <= 1);
|
|
}
|
|
|
|
std::string& compression_name = props.compression_name;
|
|
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
|
|
// Fill in extended field of "compression name" property, which is the set
|
|
// of compression types used, sorted by unsigned byte and then hex
|
|
// encoded with two digits each (so that table properties are human
|
|
// readable).
|
|
assert(*compression_name.rbegin() == ';');
|
|
size_t pos = compression_name.size();
|
|
// Make space for the field contents
|
|
compression_name.append(ctype_count * 2, '\0');
|
|
char* ptr = compression_name.data() + pos;
|
|
// Populate the field contents
|
|
for (CompressionType t : compression_types_used) {
|
|
PutBaseChars<16>(&ptr, /*n=*/2, static_cast<unsigned char>(t),
|
|
/*uppercase=*/true);
|
|
}
|
|
assert(ptr == compression_name.data() + pos + ctype_count * 2);
|
|
// Allow additional fields in the future
|
|
compression_name.push_back(';');
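// For example, a file whose blocks were compressed only with kZSTD (0x07)
// gets "07" appended here, so the property ends in ";07;".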
|
|
} else {
|
|
// Use legacy compression naming. To adhere to requirements described in
|
|
// TableProperties::compression_name, we might have to replace the name
|
|
// based on the legacy configured compression type.
|
|
assert(compression_name.empty());
|
|
if (ctype_count == 0) {
|
|
// We could get a slight performance boost in the reader by marking the
|
|
// file as "no compression" if compression is configured but
|
|
// consistently rejected, but that would give misleading info for
|
|
// debugging purposes. So instead we record the configured compression
|
|
// type, matching the historical behavior.
|
|
if (data_block_compressor) {
|
|
compression_name = CompressionTypeToString(
|
|
data_block_compressor->GetPreferredCompressionType());
|
|
} else {
|
|
assert(basic_compressor == nullptr);
|
|
compression_name = CompressionTypeToString(kNoCompression);
|
|
}
|
|
} else if (compression_types_used.Contains(kZSTD)) {
|
|
compression_name = CompressionTypeToString(kZSTD);
|
|
} else {
|
|
compression_name =
|
|
CompressionTypeToString(*compression_types_used.begin());
|
|
}
|
|
}
|
|
}
|
|
|
|
private:
|
|
// Synchronize status & io_status accesses across threads from main thread,
|
|
// compression thread and write thread in parallel compression.
|
|
std::mutex status_mutex;
|
|
std::atomic<bool> status_ok;
|
|
Status status;
|
|
std::mutex io_status_mutex;
|
|
std::atomic<bool> io_status_ok;
|
|
IOStatus io_status;
|
|
};
|
|
|
|
BlockBasedTableBuilder::BlockBasedTableBuilder(
|
|
const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
|
|
WritableFileWriter* file) {
|
|
BlockBasedTableOptions sanitized_table_options(table_options);
|
|
auto ucmp = tbo.internal_comparator.user_comparator();
|
|
assert(ucmp);
|
|
(void)ucmp; // avoids unused variable error.
|
|
rep_ = new Rep(sanitized_table_options, tbo, file);
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
|
|
const_cast<TableProperties*>(&rep_->props));
|
|
|
|
BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
|
|
tbo.cur_file_num, &rep_->base_cache_key);
|
|
|
|
if (rep_->IsParallelCompressionEnabled()) {
|
|
StartParallelCompression();
|
|
} else if (rep_->basic_compressor) {
|
|
rep_->single_threaded_compressed_output.reserve(table_options.block_size);
|
|
}
|
|
}
|
|
|
|
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
|
|
// Catch errors where caller forgot to call Finish()
|
|
assert(rep_->state == Rep::State::kClosed);
|
|
delete rep_;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Add(const Slice& ikey, const Slice& value) {
|
|
Rep* r = rep_;
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
ValueType value_type;
|
|
SequenceNumber seq;
|
|
UnPackSequenceAndType(ExtractInternalKeyFooter(ikey), &seq, &value_type);
|
|
r->props.key_largest_seqno = std::max(r->props.key_largest_seqno, seq);
|
|
if (IsValueType(value_type)) {
|
|
#ifndef NDEBUG
|
|
if (r->props.num_entries > r->props.num_range_deletions) {
|
|
assert(r->internal_comparator.Compare(ikey, Slice(r->last_ikey)) > 0);
|
|
}
|
|
bool skip = false;
|
|
TEST_SYNC_POINT_CALLBACK("BlockBasedTableBuilder::Add::skip", (void*)&skip);
|
|
if (skip) {
|
|
return;
|
|
}
|
|
#endif // !NDEBUG
|
|
|
|
auto should_flush = r->flush_block_policy->Update(ikey, value);
|
|
if (should_flush) {
|
|
assert(!r->data_block.empty());
|
|
r->first_key_in_next_block = &ikey;
|
|
Flush();
|
|
if (r->state == Rep::State::kBuffered) {
|
|
bool exceeds_buffer_limit =
|
|
(r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
|
|
bool exceeds_global_block_cache_limit = false;
|
|
|
|
// Increase cache charging for the last buffered data block
|
|
// only if the block is not going to be unbuffered immediately
|
|
// and there exists a cache reservation manager
|
|
if (!exceeds_buffer_limit &&
|
|
r->compression_dict_buffer_cache_res_mgr != nullptr) {
|
|
Status s =
|
|
r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
|
|
r->data_begin_offset);
|
|
exceeds_global_block_cache_limit = s.IsMemoryLimit();
|
|
}
|
|
|
|
if (exceeds_buffer_limit || exceeds_global_block_cache_limit) {
|
|
EnterUnbuffered();
|
|
}
|
|
}
|
|
|
|
// Add item to index block.
|
|
// We do not emit the index entry for a block until we have seen the
|
|
// first key for the next data block. This allows us to use shorter
|
|
// keys in the index block. For example, consider a block boundary
|
|
// between the keys "the quick brown fox" and "the who". We can use
|
|
// "the r" as the key for the index block entry since it is >= all
|
|
// entries in the first block and < all entries in subsequent
|
|
// blocks.
|
|
if (ok() && r->state == Rep::State::kUnbuffered) {
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
r->pc_rep->curr_block_keys.Clear();
|
|
} else {
|
|
r->index_builder->AddIndexEntry(r->last_ikey, &ikey,
|
|
r->pending_handle,
|
|
&r->index_separator_scratch);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Note: PartitionedFilterBlockBuilder requires the key to be added to the
// filter builder after it has been added to the index builder.
|
|
if (r->state == Rep::State::kUnbuffered) {
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
r->pc_rep->curr_block_keys.PushBack(ikey);
|
|
} else {
|
|
if (r->filter_builder != nullptr) {
|
|
r->filter_builder->AddWithPrevKey(
|
|
ExtractUserKeyAndStripTimestamp(ikey, r->ts_sz),
|
|
r->last_ikey.empty()
|
|
? Slice{}
|
|
: ExtractUserKeyAndStripTimestamp(r->last_ikey, r->ts_sz));
|
|
}
|
|
}
|
|
}
|
|
|
|
r->data_block.AddWithLastKey(ikey, value, r->last_ikey);
|
|
r->last_ikey.assign(ikey.data(), ikey.size());
|
|
assert(!r->last_ikey.empty());
|
|
if (r->state == Rep::State::kBuffered) {
|
|
// Buffered keys will be replayed from data_block_buffers during
|
|
// `Finish()` once compression dictionary has been finalized.
|
|
} else {
|
|
if (!r->IsParallelCompressionEnabled()) {
|
|
r->index_builder->OnKeyAdded(ikey, value);
|
|
}
|
|
}
|
|
// TODO offset passed in is not accurate for parallel compression case
|
|
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
|
|
r->table_properties_collectors,
|
|
r->ioptions.logger);
|
|
|
|
} else if (value_type == kTypeRangeDeletion) {
|
|
Slice persisted_end = value;
|
|
// When timestamps should not be persisted, we physically strip away range
|
|
// tombstone end key's user timestamp before passing it along to block
|
|
// builder. Physically stripping away start key's user timestamp is
|
|
// handled at the block builder level in the same way as the other data
|
|
// blocks.
|
|
if (r->ts_sz > 0 && !r->persist_user_defined_timestamps) {
|
|
persisted_end = StripTimestampFromUserKey(value, r->ts_sz);
|
|
}
|
|
r->range_del_block.Add(ikey, persisted_end);
|
|
// TODO offset passed in is not accurate for parallel compression case
|
|
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
|
|
r->table_properties_collectors,
|
|
r->ioptions.logger);
|
|
} else {
|
|
assert(false);
|
|
r->SetStatus(Status::InvalidArgument(
|
|
"BlockBasedBuilder::Add() received a key with invalid value type " +
|
|
std::to_string(static_cast<unsigned int>(value_type))));
|
|
return;
|
|
}
|
|
|
|
r->props.num_entries++;
|
|
r->props.raw_key_size += ikey.size();
|
|
if (!r->persist_user_defined_timestamps) {
|
|
r->props.raw_key_size -= r->ts_sz;
|
|
}
|
|
r->props.raw_value_size += value.size();
|
|
if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
|
|
value_type == kTypeDeletionWithTimestamp) {
|
|
r->props.num_deletions++;
|
|
} else if (value_type == kTypeRangeDeletion) {
|
|
r->props.num_deletions++;
|
|
r->props.num_range_deletions++;
|
|
} else if (value_type == kTypeMerge) {
|
|
r->props.num_merge_operands++;
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Flush() {
|
|
Rep* r = rep_;
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
if (r->data_block.empty()) {
|
|
return;
|
|
}
|
|
|
|
Slice uncompressed_block_data = r->data_block.Finish();
|
|
|
|
// NOTE: compression sampling is done here in the same thread as building
|
|
// the uncompressed block because of the requirements to call table
|
|
// property collectors:
|
|
// * BlockAdd function expects block_compressed_bytes_{fast,slow} for
|
|
// historical reasons. Probably a hassle to remove.
|
|
// * Collector is not thread safe so calls need to be serialized/synchronized.
|
|
// * Ideally, AddUserKey and BlockAdd calls need to line up such that a
|
|
// reported block corresponds to all the keys reported since the previous
|
|
// block.
|
|
|
|
// If requested, we sample one in every N blocks with a
// fast and a slow compression algorithm and report the stats.
// Users can use these stats to decide whether it is worthwhile
// enabling compression, and they also get a hint about which
// compression algorithm will be beneficial.
|
|
if (r->sample_for_compression > 0 &&
|
|
Random::GetTLSInstance()->OneIn(
|
|
static_cast<int>(r->sample_for_compression))) {
|
|
std::string sampled_output_fast;
|
|
std::string sampled_output_slow;
|
|
|
|
// Sampling with a fast compression algorithm
|
|
if (LZ4_Supported() || Snappy_Supported()) {
|
|
CompressionType c =
|
|
LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
|
|
CompressionOptions options;
|
|
CompressionContext context(c, options);
|
|
CompressionInfo info_tmp(options, context,
|
|
CompressionDict::GetEmptyDict(), c);
|
|
|
|
OLD_CompressData(
|
|
uncompressed_block_data, info_tmp,
|
|
GetCompressFormatForVersion(r->table_options.format_version),
|
|
&sampled_output_fast);
|
|
}
|
|
|
|
// Sampling with a slow but high-compression algorithm
|
|
if (ZSTD_Supported() || Zlib_Supported()) {
|
|
CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
|
|
CompressionOptions options;
|
|
CompressionContext context(c, options);
|
|
CompressionInfo info_tmp(options, context,
|
|
CompressionDict::GetEmptyDict(), c);
|
|
|
|
OLD_CompressData(
|
|
uncompressed_block_data, info_tmp,
|
|
GetCompressFormatForVersion(r->table_options.format_version),
|
|
&sampled_output_slow);
|
|
}
|
|
|
|
if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
|
|
// Currently compression sampling is only enabled for data block.
|
|
r->sampled_input_data_bytes.fetch_add(uncompressed_block_data.size(),
|
|
std::memory_order_relaxed);
|
|
r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
|
|
std::memory_order_relaxed);
|
|
r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
|
|
std::memory_order_relaxed);
|
|
}
|
|
|
|
NotifyCollectTableCollectorsOnBlockAdd(
|
|
r->table_properties_collectors, uncompressed_block_data.size(),
|
|
sampled_output_slow.size(), sampled_output_fast.size());
|
|
} else {
|
|
NotifyCollectTableCollectorsOnBlockAdd(
|
|
r->table_properties_collectors, uncompressed_block_data.size(),
|
|
0 /*block_compressed_bytes_slow*/, 0 /*block_compressed_bytes_fast*/);
|
|
}
|
|
|
|
if (rep_->state == Rep::State::kBuffered) {
|
|
std::string uncompressed_block_holder;
|
|
uncompressed_block_holder.reserve(rep_->table_options.block_size);
|
|
r->data_block.SwapAndReset(uncompressed_block_holder);
|
|
assert(uncompressed_block_data.size() == uncompressed_block_holder.size());
|
|
rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_holder));
|
|
rep_->data_begin_offset += uncompressed_block_data.size();
|
|
} else if (r->IsParallelCompressionEnabled()) {
|
|
assert(rep_->state == Rep::State::kUnbuffered);
|
|
ParallelCompressionRep::BlockRep* block_rep =
|
|
r->pc_rep->PrepareBlock(r->first_key_in_next_block, &(r->data_block));
|
|
assert(block_rep != nullptr);
|
|
r->pc_rep->file_size_estimator.EmitBlock(block_rep->uncompressed.size(),
|
|
r->get_offset());
|
|
r->pc_rep->EmitBlock(block_rep);
|
|
} else {
|
|
assert(rep_->state == Rep::State::kUnbuffered);
|
|
WriteBlock(uncompressed_block_data, &r->pending_handle, BlockType::kData);
|
|
r->data_block.Reset();
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
|
|
BlockHandle* handle,
|
|
BlockType block_type) {
|
|
Rep* r = rep_;
|
|
assert(r->state == Rep::State::kUnbuffered);
|
|
CompressionType type;
|
|
Status compress_status;
|
|
bool is_data_block = block_type == BlockType::kData;
|
|
CompressAndVerifyBlock(
|
|
uncompressed_block_data, is_data_block,
|
|
is_data_block ? r->data_block_working_areas[0] : r->basic_working_area,
|
|
&r->single_threaded_compressed_output, &type, &compress_status);
|
|
r->SetStatus(compress_status);
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
|
|
&r->single_threaded_compressed_output);
|
|
WriteMaybeCompressedBlock(type == kNoCompression
|
|
? uncompressed_block_data
|
|
: Slice(r->single_threaded_compressed_output),
|
|
type, handle, block_type, &uncompressed_block_data);
|
|
r->single_threaded_compressed_output.clear();
|
|
if (is_data_block) {
|
|
r->props.data_size = r->get_offset();
|
|
++r->props.num_data_blocks;
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::BGWorkCompression(WorkingAreaPair& working_area) {
|
|
ParallelCompressionRep::BlockRep* block_rep = nullptr;
|
|
while (rep_->pc_rep->compress_queue.pop(block_rep)) {
|
|
assert(block_rep != nullptr);
|
|
// Skip compression if we are aborting anyway
|
|
if (ok()) {
|
|
CompressAndVerifyBlock(block_rep->uncompressed, true, /* is_data_block*/
|
|
working_area, &block_rep->compressed,
|
|
&block_rep->compression_type, &block_rep->status);
|
|
}
|
|
block_rep->slot.Fill(block_rep);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::CompressAndVerifyBlock(
    const Slice& uncompressed_block_data, bool is_data_block,
    WorkingAreaPair& working_area, std::string* compressed_output,
    CompressionType* result_compression_type, Status* out_status) {
  Rep* r = rep_;

  Compressor* compressor = nullptr;
  Decompressor* verify_decomp = nullptr;
  if (is_data_block) {
    compressor = r->data_block_compressor;
    verify_decomp = r->data_block_verify_decompressor.get();
  } else {
    compressor = r->basic_compressor.get();
    verify_decomp = r->verify_decompressor.get();
  }

  CompressionType type = kNoCompression;
  if (LIKELY(uncompressed_block_data.size() < kCompressionSizeLimit)) {
    if (compressor) {
      StopWatchNano timer(
          r->ioptions.clock,
          ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));

      *out_status =
          compressor->CompressBlock(uncompressed_block_data, compressed_output,
                                    &type, &working_area.compress);

      // Post-condition of Compressor::CompressBlock
      assert(type == kNoCompression || out_status->ok());
      assert(type == kNoCompression ||
             r->table_options.verify_compression == (verify_decomp != nullptr));

      // Check for acceptable compression ratio. (For efficiency, avoid floating
      // point and division.)
      // TODO: integrate into Compressor?
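      // Illustrative example (added comment, not from the original source):
      // with CompressionOptions::max_compressed_bytes_per_kb at its default
      // of 896 and a 4096-byte block, the integer threshold below works out
      // to (896 * 4096) >> 10 == 3584 bytes, i.e. compression must shave off
      // at least 12.5% of the block or the result is discarded.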
      if (compressed_output->size() >
          (static_cast<uint64_t>(r->max_compressed_bytes_per_kb) *
               uncompressed_block_data.size()) >>
          10) {
        // Prefer to keep uncompressed
        type = kNoCompression;
      }

      // Some of the compression algorithms are known to be unreliable. If
      // the verify_compression flag is set then try to de-compress the
      // compressed data and compare to the input.
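      // Usage sketch (added comment; the option itself is public API):
      //   BlockBasedTableOptions bbto;
      //   bbto.verify_compression = true;
      //   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
      // This trades extra CPU per block for catching a miscompressing codec
      // at build time rather than at read time.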
      if (verify_decomp && type != kNoCompression) {
        BlockContents contents;
        Status uncompress_status = DecompressBlockData(
            compressed_output->data(), compressed_output->size(), type,
            *verify_decomp, &contents, r->ioptions,
            /*allocator=*/nullptr, &working_area.verify);

        if (uncompress_status.ok()) {
          bool data_match = contents.data.compare(uncompressed_block_data) == 0;
          if (!data_match) {
            // The result of the compression was invalid. abort.
            const char* const msg =
                "Decompressed block did not match pre-compression block";
            ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
            *out_status = Status::Corruption(msg);
            type = kNoCompression;
          }
        } else {
          // Decompression reported an error. abort.
          *out_status =
              Status::Corruption(std::string("Could not decompress: ") +
                                 uncompress_status.getState());
          type = kNoCompression;
        }
      }
      if (timer.IsStarted()) {
        RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
                              timer.ElapsedNanos());
      }
    }
    if (is_data_block) {
      r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(),
                                                 std::memory_order_relaxed);
      r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
                                                   std::memory_order_relaxed);
    }
  } else {
    // Block is too big to be compressed.
    if (is_data_block) {
      r->uncompressible_input_data_bytes.fetch_add(
          uncompressed_block_data.size() + kBlockTrailerSize,
          std::memory_order_relaxed);
    }
  }

  // Abort compression if the block is too big, or did not pass
  // verification.
  if (type == kNoCompression) {
    bool compression_attempted = !compressed_output->empty();
    RecordTick(r->ioptions.stats, compression_attempted
                                      ? NUMBER_BLOCK_COMPRESSION_REJECTED
                                      : NUMBER_BLOCK_COMPRESSION_BYPASSED);
    RecordTick(r->ioptions.stats,
               compression_attempted ? BYTES_COMPRESSION_REJECTED
                                     : BYTES_COMPRESSION_BYPASSED,
               uncompressed_block_data.size());
  } else {
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
    RecordTick(r->ioptions.stats, BYTES_COMPRESSED_FROM,
               uncompressed_block_data.size());
    RecordTick(r->ioptions.stats, BYTES_COMPRESSED_TO,
               compressed_output->size());
  }
  *result_compression_type = type;
}

void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
    const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
    BlockType block_type, const Slice* uncompressed_block_data) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    compression_type: uint8
  //    checksum: uint32
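  // Added note (not in the original source): the trailer written below is
  // kBlockTrailerSize == 5 bytes, one compression-type byte followed by a
  // 4-byte checksum, so each block occupies block_data.size() + 5 bytes in
  // the file before any alignment padding.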
  Rep* r = rep_;
  bool is_data_block = block_type == BlockType::kData;
  IOOptions io_options;
  IOStatus io_s =
      WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
  if (!io_s.ok()) {
    r->SetIOStatus(io_s);
    return;
  }
  // Old, misleading name of this function: WriteRawBlock
  StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
  const uint64_t offset = r->get_offset();
  handle->set_offset(offset);
  handle->set_size(block_contents.size());
  assert(status().ok());
  assert(io_status().ok());
  if (uncompressed_block_data == nullptr) {
    uncompressed_block_data = &block_contents;
    assert(comp_type == kNoCompression);
  }

  // TODO: consider a variant of this function that puts the trailer after
  // block_contents (if it comes from a std::string) so we only need one
  // r->file->Append call
  {
    io_s = r->file->Append(io_options, block_contents);
    if (!io_s.ok()) {
      r->SetIOStatus(io_s);
      return;
    }
  }

  r->compression_types_used.Add(comp_type);
  std::array<char, kBlockTrailerSize> trailer;
  trailer[0] = comp_type;
  uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
      r->table_options.checksum, block_contents.data(), block_contents.size(),
      /*last_byte*/ comp_type);
  checksum += ChecksumModifierForContext(r->base_context_checksum, offset);
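  // Added note (not in the original source): the checksum covers the block
  // payload plus the compression-type byte; ChecksumModifierForContext then
  // mixes in base_context_checksum and the block offset (a no-op when context
  // checksums are not enabled for this format version), so a block that is
  // bit-copied to a different offset or file no longer verifies.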

  if (block_type == BlockType::kFilter) {
    Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);
    if (!s.ok()) {
      r->SetStatus(s);
      return;
    }
  }

  EncodeFixed32(trailer.data() + 1, checksum);
  TEST_SYNC_POINT_CALLBACK(
      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
      trailer.data());
  {
    io_s = r->file->Append(io_options, Slice(trailer.data(), trailer.size()));
    if (!io_s.ok()) {
      r->SetIOStatus(io_s);
      return;
    }
  }

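  // Added note (not in the original source): warm_cache is derived from
  // BlockBasedTableOptions::prepopulate_block_cache, e.g. (usage sketch)
  //   bbto.prepopulate_block_cache =
  //       BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
  // so freshly written blocks can be served from block cache without being
  // read back from the file first.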
  if (r->warm_cache) {
    Status s =
        InsertBlockInCacheHelper(*uncompressed_block_data, handle, block_type);
    if (!s.ok()) {
      r->SetStatus(s);
      return;
    }
  }

  r->pre_compression_size +=
      uncompressed_block_data->size() + kBlockTrailerSize;
  r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
  if (r->table_options.block_align && is_data_block) {
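    // Worked example (added comment, not from the original source): with
    // alignment == 4096 and block_contents.size() + kBlockTrailerSize == 5000,
    // (5000 & 4095) == 904 and pad_bytes == (4096 - 904) & 4095 == 3192; a
    // block that already ends on an alignment boundary yields pad_bytes == 0
    // because of the outer mask.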
    size_t pad_bytes =
        (r->alignment -
         ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
        (r->alignment - 1);

    io_s = r->file->Pad(io_options, pad_bytes);
    if (io_s.ok()) {
      r->pre_compression_size += pad_bytes;
      r->set_offset(r->get_offset() + pad_bytes);
    } else {
      r->SetIOStatus(io_s);
      return;
    }
  }

  if (r->IsParallelCompressionEnabled()) {
    if (is_data_block) {
      r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
                                               r->get_offset());
    } else {
      r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
    }
  }
}

void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
  Rep* r = rep_;
  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
  ParallelCompressionRep::BlockRep* block_rep = nullptr;
  // Starts empty; see FilterBlockBuilder::AddWithPrevKey
  std::string prev_block_last_key_no_ts;
  while (r->pc_rep->write_queue.pop(slot)) {
    // FIXME: this is weird popping off write queue just to wait again on
    // compress queue
    assert(slot != nullptr);
    slot->Take(block_rep);
    assert(block_rep != nullptr);
    if (!block_rep->status.ok()) {
      r->SetStatus(block_rep->status);
      // Reap block so that blocked Flush() can finish
      // if there is one, and Flush() will notice !ok() next time.
      block_rep->status = Status::OK();
      r->pc_rep->ReapBlock(block_rep);
      continue;
    }

    Slice prev_key_no_ts = prev_block_last_key_no_ts;
    for (size_t i = 0; i < block_rep->keys.Size(); i++) {
      auto& key = block_rep->keys[i];
      if (r->filter_builder != nullptr) {
        Slice key_no_ts = ExtractUserKeyAndStripTimestamp(key, r->ts_sz);
        r->filter_builder->AddWithPrevKey(key_no_ts, prev_key_no_ts);
        prev_key_no_ts = key_no_ts;
      }
      r->index_builder->OnKeyAdded(key, {});
    }
    if (r->filter_builder != nullptr) {
      prev_block_last_key_no_ts.assign(prev_key_no_ts.data(),
                                       prev_key_no_ts.size());
    }

    r->pc_rep->file_size_estimator.SetCurrBlockUncompSize(
        block_rep->uncompressed.size());
    Slice compressed = block_rep->compressed;
    Slice uncompressed = block_rep->uncompressed;
    WriteMaybeCompressedBlock(block_rep->compression_type == kNoCompression
                                  ? uncompressed
                                  : compressed,
                              block_rep->compression_type, &r->pending_handle,
                              BlockType::kData, &uncompressed);
    if (!ok()) {
      break;
    }

    r->props.data_size = r->get_offset();
    ++r->props.num_data_blocks;

    if (!block_rep->first_key_in_next_block.has_value()) {
      r->index_builder->AddIndexEntry(block_rep->keys.Back(), nullptr,
                                      r->pending_handle,
                                      &r->index_separator_scratch);
    } else {
      Slice first_key_in_next_block =
          Slice(*block_rep->first_key_in_next_block);
      r->index_builder->AddIndexEntry(
          block_rep->keys.Back(), &first_key_in_next_block, r->pending_handle,
          &r->index_separator_scratch);
    }

    r->pc_rep->ReapBlock(block_rep);
  }
}

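// Added note (not in the original source): parallel compression is enabled by
// setting CompressionOptions::parallel_threads to a value greater than 1,
// e.g. (usage sketch)
//   options.compression_opts.parallel_threads = 4;
// which routes data blocks through the compress/write queues above instead of
// the single-threaded WriteBlock() path.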
void BlockBasedTableBuilder::StartParallelCompression() {
  rep_->pc_rep.reset(
      new ParallelCompressionRep(rep_->compression_parallel_threads));
  rep_->pc_rep->compress_thread_pool.reserve(
      rep_->compression_parallel_threads);
  for (uint32_t i = 0; i < rep_->compression_parallel_threads; i++) {
    rep_->pc_rep->compress_thread_pool.emplace_back(
        [this, i] { BGWorkCompression(rep_->data_block_working_areas[i]); });
  }
  rep_->pc_rep->write_thread.reset(
      new port::Thread([this] { BGWorkWriteMaybeCompressedBlock(); }));
}

void BlockBasedTableBuilder::StopParallelCompression() {
  rep_->pc_rep->compress_queue.finish();
  for (auto& thread : rep_->pc_rep->compress_thread_pool) {
    thread.join();
  }
  rep_->pc_rep->write_queue.finish();
  rep_->pc_rep->write_thread->join();
}

Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }

IOStatus BlockBasedTableBuilder::io_status() const {
  return rep_->GetIOStatus();
}

Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
    const Slice& block_contents, const BlockHandle* handle,
    BlockType block_type) {
  Cache* block_cache = rep_->table_options.block_cache.get();
  Status s;
  auto helper =
      GetCacheItemHelper(block_type, rep_->ioptions.lowest_used_cache_tier);
  if (block_cache && helper && helper->create_cb) {
    CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
    size_t charge;
    // NOTE: data blocks (and everything else) will be warmed in decompressed
    // state, so they do not need a dictionary-aware decompressor. The only
    // thing needing a decompressor here (in create_context) is warming the
    // (de)compression dictionary, which will clone and save a dict-based
    // decompressor from the corresponding non-dict decompressor.
    s = WarmInCache(block_cache, key.AsSlice(), block_contents,
                    &rep_->create_context, helper, Cache::Priority::LOW,
                    &charge);
    if (s.ok()) {
      BlockBasedTable::UpdateCacheInsertionMetrics(
          block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
          rep_->ioptions.stats);
    } else {
      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
    }
  }
  return s;
}

void BlockBasedTableBuilder::WriteFilterBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
    // No filter block needed
    return;
  }
  if (!rep_->last_ikey.empty()) {
    // We might have been using AddWithPrevKey, so need PrevKeyBeforeFinish
    // to be safe. And because we are re-synchronized after buffered/parallel
    // operation, rep_->last_ikey is accurate.
    rep_->filter_builder->PrevKeyBeforeFinish(
        ExtractUserKeyAndStripTimestamp(rep_->last_ikey, rep_->ts_sz));
  }
  BlockHandle filter_block_handle;
  bool is_partitioned_filter = rep_->table_options.partition_filters;
  if (ok()) {
    rep_->props.num_filter_entries +=
        rep_->filter_builder->EstimateEntriesAdded();
    Status s = Status::Incomplete();
    while (ok() && s.IsIncomplete()) {
      // filter_owner is used to store the transferred filter data payload from
      // FilterBlockBuilder and deallocate the payload by going out of scope.
      // Otherwise, the payload will unnecessarily remain until
      // BlockBasedTableBuilder is deallocated.
      //
      // See FilterBlockBuilder::Finish() for more on the difference in
      // transferred filter data payload among different FilterBlockBuilder
      // subtypes.
      std::unique_ptr<const char[]> filter_owner;
      Slice filter_content;
      s = rep_->filter_builder->Finish(filter_block_handle, &filter_content,
                                       &filter_owner);

      assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
      if (s.IsCorruption()) {
        rep_->SetStatus(s);
        break;
      }

      rep_->props.filter_size += filter_content.size();

      BlockType btype = is_partitioned_filter && /* last */ s.ok()
                            ? BlockType::kFilterPartitionIndex
                            : BlockType::kFilter;
      WriteMaybeCompressedBlock(filter_content, kNoCompression,
                                &filter_block_handle, btype);
    }
    rep_->filter_builder->ResetFilterBitsBuilder();
  }
  if (ok()) {
    // Add mapping from "<filter_block_prefix>.Name" to location
    // of filter data.
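    // Added example (not in the original source): with the built-in bloom
    // filter policy this key is typically
    // "fullfilter.rocksdb.BuiltinBloomFilter", or
    // "partitionedfilter.rocksdb.BuiltinBloomFilter" when partition_filters
    // is enabled.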
    std::string key;
    key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
                                : BlockBasedTable::kFullFilterBlockPrefix;
    key.append(rep_->table_options.filter_policy->CompatibilityName());
    meta_index_builder->Add(key, filter_block_handle);
  }
}

void BlockBasedTableBuilder::WriteIndexBlock(
    MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  if (!ok()) {
    return;
  }
  IndexBuilder::IndexBlocks index_blocks;
  auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  if (ok() && !index_builder_status.ok() &&
      !index_builder_status.IsIncomplete()) {
    // If the index builder failed for a non-Incomplete error, we should
    // mark the entire builder as having failed with that status. However,
    // if the index builder failed with an Incomplete error, we should
    // continue writing out any meta blocks that may have been generated.
    rep_->SetStatus(index_builder_status);
  }

  if (ok()) {
    for (const auto& item : index_blocks.meta_blocks) {
      BlockHandle block_handle;
      if (item.second.first == BlockType::kIndex) {
        WriteBlock(item.second.second, &block_handle, item.second.first);
      } else {
        assert(item.second.first == BlockType::kUserDefinedIndex);
        WriteMaybeCompressedBlock(item.second.second, kNoCompression,
                                  &block_handle, item.second.first);
      }
      if (!ok()) {
        break;
      }
      meta_index_builder->Add(item.first, block_handle);
    }
  }
  if (ok()) {
    if (rep_->table_options.enable_index_compression) {
      WriteBlock(index_blocks.index_block_contents, index_block_handle,
                 BlockType::kIndex);
    } else {
      WriteMaybeCompressedBlock(index_blocks.index_block_contents,
                                kNoCompression, index_block_handle,
                                BlockType::kIndex);
    }
  }
  // If there are more index partitions, finish them and write them out
  if (index_builder_status.IsIncomplete()) {
    bool index_building_finished = false;
    while (ok() && !index_building_finished) {
      Status s =
          rep_->index_builder->Finish(&index_blocks, *index_block_handle);
      if (s.ok()) {
        index_building_finished = true;
      } else if (s.IsIncomplete()) {
        // More partitioned index after this one
        assert(!index_building_finished);
      } else {
        // Error
        rep_->SetStatus(s);
        return;
      }

      if (rep_->table_options.enable_index_compression) {
        WriteBlock(index_blocks.index_block_contents, index_block_handle,
                   BlockType::kIndex);
      } else {
        WriteMaybeCompressedBlock(index_blocks.index_block_contents,
                                  kNoCompression, index_block_handle,
                                  BlockType::kIndex);
      }
      // The last index_block_handle will be for the partition index block
    }
  }
  // If success and need to record in metaindex rather than footer...
  if (ok() && !FormatVersionUsesIndexHandleInFooter(
                  rep_->table_options.format_version)) {
    meta_index_builder->Add(kIndexBlockName, *index_block_handle);
  }
}

void BlockBasedTableBuilder::WritePropertiesBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle properties_block_handle;
  if (ok()) {
    PropertyBlockBuilder property_block_builder;
    rep_->props.filter_policy_name =
        rep_->table_options.filter_policy != nullptr
            ? rep_->table_options.filter_policy->Name()
            : "";
    rep_->props.index_size =
        rep_->index_builder->IndexSize() + kBlockTrailerSize;
    rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
                                      ? rep_->ioptions.user_comparator->Name()
                                      : "nullptr";
    rep_->props.merge_operator_name =
        rep_->ioptions.merge_operator != nullptr
            ? rep_->ioptions.merge_operator->Name()
            : "nullptr";
    rep_->props.prefix_extractor_name =
        rep_->prefix_extractor ? rep_->prefix_extractor->AsString() : "nullptr";
    std::string property_collectors_names = "[";
    for (size_t i = 0;
         i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
      if (i != 0) {
        property_collectors_names += ",";
      }
      property_collectors_names +=
          rep_->ioptions.table_properties_collector_factories[i]->Name();
    }
    property_collectors_names += "]";
    rep_->props.property_collectors_names = property_collectors_names;

    rep_->PostPopulateCompressionProperties();

    if (rep_->table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      assert(rep_->p_index_builder_ != nullptr);
      rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
      rep_->props.top_level_index_size =
          rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
    }
    rep_->props.index_key_is_user_key =
        !rep_->index_builder->seperator_is_key_plus_seq();
    rep_->props.index_value_is_delta_encoded =
        rep_->use_delta_encoding_for_index_values;
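    // Added worked example (not in the original source): if sampling saw 1 MB
    // of input compress to 0.5 MB with the slow algorithm, and the file has
    // 100 MB of compressible plus 10 MB of uncompressible input, then
    // slow_compression_estimated_data_size is about 0.5 * 100 MB + 10 MB
    // = 60 MB.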
    if (rep_->sampled_input_data_bytes > 0) {
      rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(rep_->sampled_output_slow_data_bytes) /
              rep_->sampled_input_data_bytes *
              rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes + 0.5);
      rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(rep_->sampled_output_fast_data_bytes) /
              rep_->sampled_input_data_bytes *
              rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes + 0.5);
    } else if (rep_->sample_for_compression > 0) {
      // We tried to sample but no blocks were sampled. Assume worst-case
      // (compression ratio 1.0) so data is complete and aggregatable.
      rep_->props.slow_compression_estimated_data_size =
          rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes;
      rep_->props.fast_compression_estimated_data_size =
          rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes;
    }
    rep_->props.user_defined_timestamps_persisted =
        rep_->persist_user_defined_timestamps;

    assert(IsEmpty() || rep_->props.key_largest_seqno != UINT64_MAX);
    // Add basic properties
    property_block_builder.AddTableProperty(rep_->props);

    // Add user collected properties
    NotifyCollectTableCollectorsOnFinish(
        rep_->table_properties_collectors, rep_->ioptions.logger,
        &property_block_builder, rep_->props.user_collected_properties,
        rep_->props.readable_properties);

    Slice block_data = property_block_builder.Finish();
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
    WriteMaybeCompressedBlock(block_data, kNoCompression,
                              &properties_block_handle, BlockType::kProperties);
  }
  if (ok()) {
#ifndef NDEBUG
    {
      uint64_t props_block_offset = properties_block_handle.offset();
      uint64_t props_block_size = properties_block_handle.size();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
          &props_block_offset);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
          &props_block_size);
    }
#endif  // !NDEBUG

    const std::string* properties_block_meta = &kPropertiesBlockName;
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WritePropertiesBlock:Meta",
        &properties_block_meta);
    meta_index_builder->Add(*properties_block_meta, properties_block_handle);
  }
}

void BlockBasedTableBuilder::WriteCompressionDictBlock(
    MetaIndexBuilder* meta_index_builder) {
  Slice compression_dict;
  if (rep_->compressor_with_dict) {
    compression_dict = rep_->compressor_with_dict->GetSerializedDict();
  }
  if (!compression_dict.empty()) {
    BlockHandle compression_dict_block_handle;
    if (ok()) {
      WriteMaybeCompressedBlock(compression_dict, kNoCompression,
                                &compression_dict_block_handle,
                                BlockType::kCompressionDictionary);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
          &compression_dict);
    }
    if (ok()) {
      meta_index_builder->Add(kCompressionDictBlockName,
                              compression_dict_block_handle);
    }
  }
}

void BlockBasedTableBuilder::WriteRangeDelBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (ok() && !rep_->range_del_block.empty()) {
    BlockHandle range_del_block_handle;
    WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
                              &range_del_block_handle,
                              BlockType::kRangeDeletion);
    meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
  }
}

void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  assert(ok());
  Rep* r = rep_;
  // this is guaranteed by BlockBasedTableBuilder's constructor
  assert(r->table_options.checksum == kCRC32c ||
         r->table_options.format_version != 0);
  FooterBuilder footer;
  Status s = footer.Build(kBlockBasedTableMagicNumber,
                          r->table_options.format_version, r->get_offset(),
                          r->table_options.checksum, metaindex_block_handle,
                          index_block_handle, r->base_context_checksum);
  if (!s.ok()) {
    r->SetStatus(s);
    return;
  }
  IOOptions io_options;
  IOStatus ios =
      WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
  if (!ios.ok()) {
    r->SetIOStatus(ios);
    return;
  }
  ios = r->file->Append(io_options, footer.GetSlice());
  if (ios.ok()) {
    r->pre_compression_size += footer.GetSlice().size();
    r->set_offset(r->get_offset() + footer.GetSlice().size());
  } else {
    r->SetIOStatus(ios);
  }
}

void BlockBasedTableBuilder::EnterUnbuffered() {
  Rep* r = rep_;
  assert(r->state == Rep::State::kBuffered);
  r->state = Rep::State::kUnbuffered;
  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
  if (kNumBlocksBuffered == 0) {
    // The below code is neither safe nor necessary for handling zero data
    // blocks.
    // For PostPopulateCompressionProperties()
    r->data_block_compressor = r->basic_compressor.get();
    return;
  }

  // Abstract algebra teaches us that a finite cyclic group (such as the
  // additive group of integers modulo N) can be generated by a number that is
  // coprime with N. Since N is variable (number of buffered data blocks), we
  // must then pick a prime number in order to guarantee coprimeness with any N.
  //
  // One downside of this approach is the spread will be poor when
  // `kPrimeGeneratorRemainder` is close to zero or close to
  // `kNumBlocksBuffered`.
  //
  // Picked a random number between one and one trillion and then chose the
  // next prime number greater than or equal to it.
  const uint64_t kPrimeGenerator = 545055921143ull;
  // Can avoid repeated division by just adding the remainder repeatedly.
  const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
      kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
  const size_t kInitSampleIdx = kNumBlocksBuffered / 2;

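  // Added worked example (not in the original source): with
  // kNumBlocksBuffered == 10, kPrimeGeneratorRemainder == 545055921143 % 10
  // == 3 and kInitSampleIdx == 5, the sampling loop below visits buffers
  // 5, 8, 1, 4, 7, 0, 3, 6, 9, 2, i.e. every buffered block exactly once in a
  // spread-out order before wrapping.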
  Compressor::DictSampleArgs samples;
  size_t buffer_idx = kInitSampleIdx;
  for (size_t i = 0; i < kNumBlocksBuffered &&
                     samples.sample_data.size() < r->max_dict_sample_bytes;
       ++i) {
    size_t copy_len =
        std::min(r->max_dict_sample_bytes - samples.sample_data.size(),
                 r->data_block_buffers[buffer_idx].size());
    samples.sample_data.append(r->data_block_buffers[buffer_idx], 0, copy_len);
    samples.sample_lens.emplace_back(copy_len);

    buffer_idx += kPrimeGeneratorRemainder;
    if (buffer_idx >= kNumBlocksBuffered) {
      buffer_idx -= kNumBlocksBuffered;
    }
  }

  assert(samples.sample_data.size() > 0);

  // final sample data block flushed, now we can generate dictionary
  r->compressor_with_dict = r->basic_compressor->MaybeCloneSpecialized(
      CacheEntryRole::kDataBlock, std::move(samples));

  // The compressor might opt not to use a dictionary, in which case we
  // can use the same compressor as for e.g. index blocks.
  r->data_block_compressor = r->compressor_with_dict
                                 ? r->compressor_with_dict.get()
                                 : r->basic_compressor.get();
  for (uint32_t i = 0; i < r->compression_parallel_threads; i++) {
    r->data_block_working_areas[i].compress =
        r->data_block_compressor->ObtainWorkingArea();
  }
  Slice serialized_dict = r->data_block_compressor->GetSerializedDict();
  if (r->verify_decompressor) {
    if (serialized_dict.empty()) {
      // No dictionary
      r->data_block_verify_decompressor = r->verify_decompressor.get();
    } else {
      // Get an updated dictionary-aware decompressor for verification.
      Status s = r->verify_decompressor->MaybeCloneForDict(
          serialized_dict, &r->verify_decompressor_with_dict);
      // Dictionary support must be present on the decompressor side if it's on
      // the compressor side.
      assert(r->verify_decompressor_with_dict);
      if (r->verify_decompressor_with_dict) {
        r->data_block_verify_decompressor =
            r->verify_decompressor_with_dict.get();
        for (uint32_t i = 0; i < r->compression_parallel_threads; i++) {
          r->data_block_working_areas[i].verify =
              r->data_block_verify_decompressor->ObtainWorkingArea(
                  r->data_block_compressor->GetPreferredCompressionType());
        }
        assert(s.ok());
      } else {
        assert(!s.ok());
        r->SetStatus(s);
      }
    }
  }

  auto get_iterator_for_block = [&r](size_t i) {
    auto& data_block = r->data_block_buffers[i];
    assert(!data_block.empty());

    Block reader{BlockContents{data_block}};
    DataBlockIter* iter = reader.NewDataIterator(
        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber,
        nullptr /* iter */, nullptr /* stats */,
        false /* block_contents_pinned */, r->persist_user_defined_timestamps);

    iter->SeekToFirst();
    assert(iter->Valid());
    return std::unique_ptr<DataBlockIter>(iter);
  };

  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;

  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
    if (iter == nullptr) {
      iter = get_iterator_for_block(i);
      assert(iter != nullptr);
    };

    if (i + 1 < r->data_block_buffers.size()) {
      next_block_iter = get_iterator_for_block(i + 1);
    }

    auto& data_block = r->data_block_buffers[i];
    if (r->IsParallelCompressionEnabled()) {
      Slice first_key_in_next_block;
      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
      if (i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        first_key_in_next_block = next_block_iter->key();
      } else {
        first_key_in_next_block_ptr = r->first_key_in_next_block;
      }

      std::vector<std::string> keys;
      for (; iter->Valid(); iter->Next()) {
        keys.emplace_back(iter->key().ToString());
      }

      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
          first_key_in_next_block_ptr, &data_block, &keys);

      assert(block_rep != nullptr);
      r->pc_rep->file_size_estimator.EmitBlock(block_rep->uncompressed.size(),
                                               r->get_offset());
      r->pc_rep->EmitBlock(block_rep);
    } else {
      for (; iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        if (r->filter_builder != nullptr) {
          // NOTE: AddWithPrevKey here would only save key copying if prev is
          // pinned (iter->IsKeyPinned()), which is probably rare with delta
          // encoding. OK to go from Add() here to AddWithPrevKey() in
          // unbuffered operation.
          r->filter_builder->Add(
              ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
        }
        r->index_builder->OnKeyAdded(key, iter->value());
      }
      WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
      if (ok() && i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        Slice first_key_in_next_block = next_block_iter->key();

        Slice* first_key_in_next_block_ptr = &first_key_in_next_block;

        iter->SeekToLast();
        assert(iter->Valid());
        r->index_builder->AddIndexEntry(
            iter->key(), first_key_in_next_block_ptr, r->pending_handle,
            &r->index_separator_scratch);
      }
    }
    std::swap(iter, next_block_iter);
  }
  r->data_block_buffers.clear();
  r->data_begin_offset = 0;
  // Release all reserved cache for data block buffers
  if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
    Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
        r->data_begin_offset);
    s.PermitUncheckedError();
  }
}

Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  bool empty_data_block = r->data_block.empty();
  r->first_key_in_next_block = nullptr;
  Flush();
  if (r->state == Rep::State::kBuffered) {
    EnterUnbuffered();
  }
  if (r->IsParallelCompressionEnabled()) {
    StopParallelCompression();
#ifndef NDEBUG
    for (const auto& br : r->pc_rep->block_rep_buf) {
      assert(br.status.ok());
    }
#endif  // !NDEBUG
  } else {
    // To make sure the properties block records the accurate size of the
    // index block, we will finish writing all index entries first.
    if (ok() && !empty_data_block) {
      r->index_builder->AddIndexEntry(
          r->last_ikey, nullptr /* no next data block */, r->pending_handle,
          &r->index_separator_scratch);
    }
  }

  r->props.tail_start_offset = r->offset;

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    // flush the meta index block
    WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
                              &metaindex_block_handle, BlockType::kMetaIndex);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  r->tail_size = r->offset - r->props.tail_start_offset;

  Status ret_status = r->CopyStatus();
  IOStatus ios = r->GetIOStatus();
  if (!ios.ok() && ret_status.ok()) {
    // Let io_status supersede ok status (otherwise status takes precedence)
    ret_status = ios;
  }
  return ret_status;
}

void BlockBasedTableBuilder::Abandon() {
  assert(rep_->state != Rep::State::kClosed);
  if (rep_->IsParallelCompressionEnabled()) {
    StopParallelCompression();
  }
  rep_->state = Rep::State::kClosed;
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED  // Avoid unnecessary lock acquisition
  rep_->CopyStatus().PermitUncheckedError();
  rep_->CopyIOStatus().PermitUncheckedError();
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
}

uint64_t BlockBasedTableBuilder::NumEntries() const {
  return rep_->props.num_entries;
}

bool BlockBasedTableBuilder::IsEmpty() const {
  return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
}

uint64_t BlockBasedTableBuilder::PreCompressionSize() const {
  return rep_->pre_compression_size;
}

uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }

uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
  if (rep_->IsParallelCompressionEnabled()) {
    // Use compression ratio so far and inflight uncompressed bytes to estimate
    // final SST size.
    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
  } else {
    return FileSize();
  }
}

uint64_t BlockBasedTableBuilder::GetTailSize() const { return rep_->tail_size; }

bool BlockBasedTableBuilder::NeedCompact() const {
  for (const auto& collector : rep_->table_properties_collectors) {
    if (collector->NeedCompact()) {
      return true;
    }
  }
  return false;
}

TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  return rep_->props;
}

std::string BlockBasedTableBuilder::GetFileChecksum() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksumFuncName();
  } else {
    return kUnknownFileChecksumFuncName;
  }
}

void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
    const SeqnoToTimeMapping& relevant_mapping, uint64_t oldest_ancestor_time) {
  assert(rep_->props.seqno_to_time_mapping.empty());
  relevant_mapping.EncodeTo(rep_->props.seqno_to_time_mapping);
  rep_->props.creation_time = oldest_ancestor_time;
}

const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
    "partitionedfilter.";

}  // namespace ROCKSDB_NAMESPACE