Summary: This exposes CompressionManager and related classes to the public API and adds `ColumnFamilyOptions::compression_manager` for tying a custom compression strategy to a column family. At the moment, this does not support custom/pluggable compression algorithms, just custom strategies around the built-in algorithms, e.g. which compression to use when and where. A large part of the change is moving code from internal compression.h to a new public header advanced_compression.h, with some minor changes: * `Decompressor::ExtractUncompressedSize()` is out-of-lined * CompressionManager inherits Customizable and some related changes to members of CompressionManager are made. (Core functionality of CompressionManager is unchanged.) This depends on a smart pointer I'm calling `ManagedPtr` which I'm adding to data_structure.h. Additionally, advanced_compression.h gets CompressorWrapper and CompressionManagerWrapper as building blocks for overriding aspects of compression strategy while leveraging existing compression algorithms / schemas. Some pieces needed to support the `compression_manager` option and rudimentary Customizable implementation are included. More work will be needed to make this general and well-behaved (see e.g. https://github.com/facebook/rocksdb/issues/8641; I still hit inscrutable problems every time I touch Customizable). I'll add a release note for the experimental feature once pluggable compression algorithms and more of the Customizable things are working. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13626 Test Plan: Added a unit test demonstrating how a custom compressor can "bypass" or "reject" compressions. Expected next follow-up (probably someone else): use a custom CompressionManager/Compressor to replace the internal hack for testing mixed compressions. Reviewed By: hx235 Differential Revision: D75028850 Pulled By: pdillinger fbshipit-source-id: 8565bb8ba4b5fa923b1e29e76b4f7bb4faa42381
172 lines
6.4 KiB
C++
172 lines
6.4 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#pragma once
|
|
#include "file/file_util.h"
|
|
#include "memory/memory_allocator_impl.h"
|
|
#include "table/block_based/block.h"
|
|
#include "table/block_based/block_type.h"
|
|
#include "table/format.h"
|
|
#include "table/persistent_cache_options.h"
|
|
#include "util/cast_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// Retrieves a single block of a given file. Utilizes the prefetch buffer and/or
// persistent cache provided (if any) to try to avoid reading from the file
// directly. Note that both the prefetch buffer and the persistent cache are
// optional; also, note that the persistent cache may be configured to store
// either compressed or uncompressed blocks.
//
// If the retrieved block is compressed and the do_uncompress flag is set,
// BlockFetcher uncompresses the block (using the uncompression dictionary,
// if provided, to prime the compression algorithm), and returns the resulting
// uncompressed block data. Otherwise, it returns the original block.
//
// Two read options affect the behavior of BlockFetcher: if verify_checksums is
// true, the checksum of the (original) block is checked; if fill_cache is true,
// the block is added to the persistent cache if needed.
//
// Memory for uncompressed and compressed blocks is allocated as needed
// using memory_allocator and memory_allocator_compressed, respectively
// (if provided; otherwise, the default allocator is used).
|
|
|
|
class BlockFetcher {
|
|
public:
|
|
BlockFetcher(RandomAccessFileReader* file,
|
|
FilePrefetchBuffer* prefetch_buffer,
|
|
const Footer& footer /* ref retained */,
|
|
const ReadOptions& read_options,
|
|
const BlockHandle& handle /* ref retained */,
|
|
BlockContents* contents,
|
|
const ImmutableOptions& ioptions /* ref retained */,
|
|
bool do_uncompress, bool maybe_compressed, BlockType block_type,
|
|
UnownedPtr<Decompressor> decompressor,
|
|
const PersistentCacheOptions& cache_options /* ref retained */,
|
|
MemoryAllocator* memory_allocator = nullptr,
|
|
MemoryAllocator* memory_allocator_compressed = nullptr,
|
|
bool for_compaction = false)
|
|
: file_(file),
|
|
prefetch_buffer_(prefetch_buffer),
|
|
footer_(footer),
|
|
read_options_(read_options),
|
|
handle_(handle),
|
|
contents_(contents),
|
|
ioptions_(ioptions),
|
|
do_uncompress_(do_uncompress),
|
|
maybe_compressed_(maybe_compressed),
|
|
block_type_(block_type),
|
|
block_size_(static_cast<size_t>(handle_.size())),
|
|
block_size_with_trailer_(block_size_ + footer.GetBlockTrailerSize()),
|
|
decompressor_(decompressor),
|
|
cache_options_(cache_options),
|
|
memory_allocator_(memory_allocator),
|
|
memory_allocator_compressed_(memory_allocator_compressed),
|
|
for_compaction_(for_compaction) {
|
|
io_status_.PermitUncheckedError(); // TODO(AR) can we improve on this?
|
|
if (CheckFSFeatureSupport(ioptions_.fs.get(), FSSupportedOps::kFSBuffer)) {
|
|
use_fs_scratch_ = true;
|
|
}
|
|
if (CheckFSFeatureSupport(ioptions_.fs.get(),
|
|
FSSupportedOps::kVerifyAndReconstructRead)) {
|
|
retry_corrupt_read_ = true;
|
|
}
|
|
}
|
|
|
|
IOStatus ReadBlockContents();
|
|
IOStatus ReadAsyncBlockContents();
|
|
|
|
inline CompressionType compression_type() const {
|
|
return decomp_args_.compression_type;
|
|
}
|
|
inline CompressionType& compression_type() {
|
|
return decomp_args_.compression_type;
|
|
}
|
|
inline size_t GetBlockSizeWithTrailer() const {
|
|
return block_size_with_trailer_;
|
|
}
|
|
inline Slice& GetCompressedBlock() {
|
|
assert(compression_type() != kNoCompression);
|
|
return slice_;
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
int TEST_GetNumStackBufMemcpy() const { return num_stack_buf_memcpy_; }
|
|
int TEST_GetNumHeapBufMemcpy() const { return num_heap_buf_memcpy_; }
|
|
int TEST_GetNumCompressedBufMemcpy() const {
|
|
return num_compressed_buf_memcpy_;
|
|
}
|
|
|
|
#endif
|
|
private:
|
|
#ifndef NDEBUG
|
|
int num_stack_buf_memcpy_ = 0;
|
|
int num_heap_buf_memcpy_ = 0;
|
|
int num_compressed_buf_memcpy_ = 0;
|
|
|
|
#endif
|
|
static const uint32_t kDefaultStackBufferSize = 5000;
|
|
|
|
RandomAccessFileReader* file_;
|
|
FilePrefetchBuffer* prefetch_buffer_;
|
|
const Footer& footer_;
|
|
const ReadOptions read_options_;
|
|
const BlockHandle& handle_;
|
|
BlockContents* contents_;
|
|
const ImmutableOptions& ioptions_;
|
|
const bool do_uncompress_;
|
|
const bool maybe_compressed_;
|
|
const BlockType block_type_;
|
|
const size_t block_size_;
|
|
const size_t block_size_with_trailer_;
|
|
UnownedPtr<Decompressor> decompressor_;
|
|
const PersistentCacheOptions& cache_options_;
|
|
MemoryAllocator* memory_allocator_;
|
|
MemoryAllocator* memory_allocator_compressed_;
|
|
IOStatus io_status_;
|
|
Slice slice_;
|
|
char* used_buf_ = nullptr;
|
|
AlignedBuf direct_io_buf_;
|
|
CacheAllocationPtr heap_buf_;
|
|
CacheAllocationPtr compressed_buf_;
|
|
char stack_buf_[kDefaultStackBufferSize];
|
|
bool got_from_prefetch_buffer_ = false;
|
|
bool for_compaction_ = false;
|
|
bool use_fs_scratch_ = false;
|
|
bool retry_corrupt_read_ = false;
|
|
FSAllocationPtr fs_buf_;
|
|
Decompressor::Args decomp_args_;
|
|
|
|
// return true if found
|
|
bool TryGetUncompressBlockFromPersistentCache();
|
|
// return true if found
|
|
bool TryGetFromPrefetchBuffer();
|
|
bool TryGetSerializedBlockFromPersistentCache();
|
|
void PrepareBufferForBlockFromFile();
|
|
// Copy content from used_buf_ to new heap_buf_.
|
|
void CopyBufferToHeapBuf();
|
|
// Copy content from used_buf_ to new compressed_buf_.
|
|
void CopyBufferToCompressedBuf();
|
|
void GetBlockContents();
|
|
void InsertCompressedBlockToPersistentCacheIfNeeded();
|
|
void InsertUncompressedBlockToPersistentCacheIfNeeded();
|
|
void ProcessTrailerIfPresent();
|
|
void ReadBlock(bool retry);
|
|
|
|
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
|
|
if (use_fs_scratch_) {
|
|
// Free the scratch buffer allocated by FileSystem.
|
|
if (read_req->fs_scratch != nullptr) {
|
|
read_req->fs_scratch.reset();
|
|
read_req->fs_scratch = nullptr;
|
|
}
|
|
}
|
|
}
|
|
};
|
|
} // namespace ROCKSDB_NAMESPACE
|