rocksdb/port/win/xpress_win.cc
Peter Dillinger 0a169cea0e Compressor::CompressBlock API change and refactoring/improvement (#13805)
Summary:
The main motivation for this change is to more flexibly and efficiently support compressing data without extra copies when we do not want to support saving compressed data that is LARGER than the uncompressed. We believe pretty strongly that for the various workloads served by RocksDB, it is well worth a single byte compression marker so that we have the flexibility to save compressed or uncompressed data when compression is attempted. Why? Compression algorithms can add tens of bytes in fixed overheads and percents of bytes in relative overheads. It is also an advantage for the reader when they can bypass decompression, including at least a buffer copy in most cases, after reading just one byte.

The block-based table format in RocksDB follows this model with a single-byte compression marker, and at least after https://github.com/facebook/rocksdb/pull/13797 so does CompressedSecondaryCache. (Notably, the blob file format DOES NOT. This is left to follow-up work.)
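The single-byte marker model described above can be illustrated with a small sketch. This is a toy layout, not RocksDB's actual on-disk format: the marker values, the marker's position at the front of the payload, and the names `EncodeBlock` and `NeedsDecompression` are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical marker values, used only in this sketch.
enum BlockMarker : char { kStoredRaw = 0, kStoredCompressed = 1 };

// Builds the stored form of a block: one marker byte followed by the payload.
// The compressed form is kept only if it is strictly smaller than the raw
// data; otherwise the raw bytes are stored as-is.
std::string EncodeBlock(const std::string& raw, const std::string& compressed) {
  std::string stored;
  if (!compressed.empty() && compressed.size() < raw.size()) {
    stored.push_back(kStoredCompressed);
    stored.append(compressed);
  } else {
    stored.push_back(kStoredRaw);
    stored.append(raw);
  }
  return stored;
}

// Reader side: inspects only the first byte to decide whether decompression
// is needed. *payload and *payload_size describe the bytes after the marker.
bool NeedsDecompression(const std::string& stored, const char** payload,
                        size_t* payload_size) {
  assert(!stored.empty());
  *payload = stored.data() + 1;
  *payload_size = stored.size() - 1;
  return stored[0] == kStoredCompressed;
}
```

With this layout a reader that sees `kStoredRaw` can use the payload in place, which is the decompression bypass the summary refers to.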

In particular, Compressor::CompressBlock now takes a fixed-size output buffer rather than a `std::string*`. CompressBlock itself rejects the compression if the output would not fit in the provided buffer. This also works well with the `max_compressed_bytes_per_kb` option, rejecting compression even sooner if the ratio is insufficient (implemented in this change). In the future we might use this functionality to save a buffer copy (in many cases) into the WritableFileWriter buffer of the block-based table builder.
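A minimal sketch of the fixed-size output buffer idea follows. It is not the RocksDB `CompressBlock` signature: `OutputBudget` and `ToyCompressInto` are hypothetical names, the run-length encoder merely stands in for a real codec, and the budget formula is an assumed reading of `max_compressed_bytes_per_kb` (compressed bytes allowed per 1024 input bytes). Returning 0 on rejection follows the convention of `CompressWithMaxSize` in this file.

```cpp
#include <cassert>
#include <cstddef>

// Assumed interpretation of the option: at most `max_compressed_bytes_per_kb`
// compressed bytes are acceptable per 1024 bytes of uncompressed input.
size_t OutputBudget(size_t input_size, size_t max_compressed_bytes_per_kb) {
  return input_size * max_compressed_bytes_per_kb / 1024;
}

// Toy run-length encoder standing in for a real codec. Writes (count, byte)
// pairs into out[0, out_cap). Returns the compressed size, or 0 if the output
// would not fit, in which case the caller stores the data uncompressed.
// Note that empty input also yields 0 in this sketch.
size_t ToyCompressInto(const char* in, size_t n, char* out, size_t out_cap) {
  assert(in != nullptr || n == 0);
  assert(out != nullptr || out_cap == 0);
  size_t written = 0;
  size_t i = 0;
  while (i < n) {
    // Measure the current run, capped so the count fits in one byte.
    size_t run = 1;
    while (i + run < n && in[i + run] == in[i] && run < 255) {
      ++run;
    }
    if (written + 2 > out_cap) {
      return 0;  // Would overflow the caller's buffer: reject compression.
    }
    out[written++] = static_cast<char>(static_cast<unsigned char>(run));
    out[written++] = in[i];
    i += run;
  }
  return written;
}
```

Passing `OutputBudget(...)` as `out_cap` makes the compressor give up as soon as the ratio target cannot be met, without ever allocating or writing beyond the caller's buffer.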

This is a large change because we needed to (or were compelled to)
* Update all the existing callers of CompressBlock, sometimes with substantial changes. This includes introducing GrowableBuffer to reuse between calls rather than std::string, which (at least in C++17) requires zeroing out data when allocating/growing a buffer.
* Re-implement built-in Compressors (V2; V1 is obsolete) to efficiently implement the new version of the API, no longer wrapping the `OLD_CompressData()` function. The new compressors appropriately leverage the CompressBlock virtual call required for the customization interface and no longer rely on a `switch` on compression type for each block. The implementations are largely adaptations of the old implementations, except
  * LZ4 and LZ4HC are notably upgraded to take advantage of WorkingArea (see performance tests). And for simplicity in the new implementation, we are dropping support for some super old versions of the library.
  * Getting snappy to work with limited-size output buffer required using the Sink/Source interfaces, which appear to be well supported for a long time and efficient (see performance tests).
* Replace awkward old CompressionManager::GetDecompressorForCompressor with Compressor::GetOptimizedDecompressor (which is optional to implement)
* Small behavior change where we treat lack of support for compression more like not configuring compression, for example with respect to incompatibility with block_align. This is motivated by giving CompressionManager the freedom to determine when compression can be excluded for an entire file despite the configured "compression" type, and thus surfacing only actual incompatibilities, not hypothetical ones that might be irrelevant to the CompressionManager (or build configuration). Unit tests in `table_test` and `compact_files_test` required updates.
* Some lingering clean up of CompressedSecondaryCache and a re-optimization made possible by compressing into an existing buffer.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13805

Test Plan:
for correctness, existing tests

## Performance Test

As I generally only modified compression paths, I'm using a db_bench write benchmark, with before & after configurations running at the same time. vc=1 means verify_compression=1

```
USE_CLANG=1 DEBUG_LEVEL=0 LIB_MODE=static make -j100 db_bench
SUFFIX=`tty | sed 's|/|_|g'`; for CT in zlib bzip2 none snappy zstd lz4 lz4hc none snappy zstd lz4 bzip2; do for VC in 0 1; do echo "$CT vc=$VC"; (for I in `seq 1 20`; do BIN=/dev/shm/dbbench${SUFFIX}.bin; rm -f $BIN; cp db_bench $BIN; $BIN -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -format_version=7 -compression_type=$CT -verify_compression=$VC 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done; done
```

zlib vc=0 524198 -> 524904 (+0.1%)
zlib vc=1 430521 -> 430699 (+0.0%)
bzip2 vc=0 61841 -> 60835 (-1.6%)
bzip2 vc=1 49232 -> 48734 (-1.0%)
none vc=0 1802375 -> 1906227 (+5.8%)
none vc=1 1837181 -> 1950308 (+6.2%)
snappy vc=0 1783266 -> 1901461 (+6.6%)
snappy vc=1 1799703 -> 1879660 (+4.4%)
zstd vc=0 1216779 -> 1230507 (+1.1%)
zstd vc=1 996370 -> 1015415 (+1.9%)
lz4 vc=0 1801473 -> 1943095 (+7.9%)
lz4 vc=1 1799155 -> 1935242 (+7.6%)
lz4hc vc=0 349719 -> 1126909 (+222.2%)
lz4hc vc=1 348099 -> 1108933 (+218.6%)
(Repeating the most important ones)
none vc=0 1816878 -> 1952221 (+7.4%)
none vc=1 1813736 -> 1904622 (+5.0%)
snappy vc=0 1794816 -> 1875062 (+4.5%)
snappy vc=1 1789363 -> 1873771 (+4.7%)
zstd vc=0 1202592 -> 1225164 (+1.9%)
zstd vc=1 994322 -> 1016688 (+2.2%)
lz4 vc=0 1786959 -> 1971518 (+10.3%)
lz4 vc=1 1829483 -> 1935871 (+5.8%)

I confirmed manually that the new WorkingArea for LZ4HC makes the huge difference on that one, but not as much difference for LZ4, presumably because LZ4HC uses much larger buffers/structures/whatever for better compression ratios.

Reviewed By: hx235

Differential Revision: D79111736

Pulled By: pdillinger

fbshipit-source-id: 1ce1b14af9f15365f1b6da49906b5073a8cecc14
2025-07-31 08:39:56 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#if defined(OS_WIN)
#include "port/win/xpress_win.h"
#include <windows.h>
#include <cassert>
#include <iostream>
#include <limits>
#include <memory>
#ifdef XPRESS
// Put this under ifdef so windows systems w/o this
// can still build
#include <compressapi.h>
namespace ROCKSDB_NAMESPACE {
namespace port {
namespace xpress {

// Helpers
namespace {

auto CloseCompressorFun = [](void* h) {
  if (NULL != h) {
    ::CloseCompressor(reinterpret_cast<COMPRESSOR_HANDLE>(h));
  }
};

auto CloseDecompressorFun = [](void* h) {
  if (NULL != h) {
    ::CloseDecompressor(reinterpret_cast<DECOMPRESSOR_HANDLE>(h));
  }
};
}  // namespace
bool Compress(const char* input, size_t length, std::string* output) {
  assert(input != nullptr);
  assert(output != nullptr);

  if (length == 0) {
    output->clear();
    return true;
  }

  COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr;

  COMPRESSOR_HANDLE compressor = NULL;

  BOOL success =
      CreateCompressor(COMPRESS_ALGORITHM_XPRESS,  // Compression Algorithm
                       allocRoutinesPtr,  // Optional allocation routine
                       &compressor);      // Handle

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to create Compressor LastError: "
              << GetLastError() << std::endl;
#endif
    return false;
  }

  std::unique_ptr<void, decltype(CloseCompressorFun)> compressorGuard(
      compressor, CloseCompressorFun);

  SIZE_T compressedBufferSize = 0;

  // Query compressed buffer size.
  success = ::Compress(compressor,                // Compressor Handle
                       const_cast<char*>(input),  // Input buffer
                       length,                    // Uncompressed data size
                       NULL,                      // Compressed Buffer
                       0,                         // Compressed Buffer size
                       &compressedBufferSize);    // Compressed Data size

  if (!success) {
    auto lastError = GetLastError();

    if (lastError != ERROR_INSUFFICIENT_BUFFER) {
#ifdef _DEBUG
      std::cerr
          << "XPRESS: Failed to estimate compressed buffer size LastError "
          << lastError << std::endl;
#endif
      return false;
    }
  }

  assert(compressedBufferSize > 0);

  std::string result;
  result.resize(compressedBufferSize);

  SIZE_T compressedDataSize = 0;

  // Compress
  success = ::Compress(compressor,                // Compressor Handle
                       const_cast<char*>(input),  // Input buffer
                       length,                    // Uncompressed data size
                       &result[0],                // Compressed Buffer
                       compressedBufferSize,      // Compressed Buffer size
                       &compressedDataSize);      // Compressed Data size

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to compress LastError " << GetLastError()
              << std::endl;
#endif
    return false;
  }

  result.resize(compressedDataSize);
  output->swap(result);

  return true;
}
size_t CompressWithMaxSize(const char* input, size_t length, char* output,
                           size_t max_output_size) {
  assert(input != nullptr);

  if (max_output_size == 0) {
    return 0;
  }
  assert(output != nullptr);

  COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr;

  COMPRESSOR_HANDLE compressor = NULL;

  BOOL success =
      CreateCompressor(COMPRESS_ALGORITHM_XPRESS,  // Compression Algorithm
                       allocRoutinesPtr,  // Optional allocation routine
                       &compressor);      // Handle

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to create Compressor LastError: "
              << GetLastError() << std::endl;
#endif
    return 0;
  }

  std::unique_ptr<void, decltype(CloseCompressorFun)> compressorGuard(
      compressor, CloseCompressorFun);

  SIZE_T compressed_size = 0;

  // Compress
  success = ::Compress(compressor,                // Compressor Handle
                       const_cast<char*>(input),  // Input buffer
                       length,                    // Uncompressed data size
                       output,                    // Compressed Buffer
                       max_output_size,           // Compressed Buffer size
                       &compressed_size);         // Compressed Data size

  if (!success) {
#ifdef _DEBUG
    auto error = GetLastError();
    if (error != ERROR_INSUFFICIENT_BUFFER) {
      std::cerr << "XPRESS: Failed to compress LastError " << error
                << std::endl;
    }
#endif
    return 0;
  } else {
    return compressed_size;
  }
}
char* Decompress(const char* input_data, size_t input_length,
                 size_t* uncompressed_size) {
  assert(input_data != nullptr);
  assert(uncompressed_size != nullptr);

  if (input_length == 0) {
    return nullptr;
  }

  COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr;

  DECOMPRESSOR_HANDLE decompressor = NULL;

  BOOL success =
      CreateDecompressor(COMPRESS_ALGORITHM_XPRESS,  // Compression Algorithm
                         allocRoutinesPtr,  // Optional allocation routine
                         &decompressor);    // Handle

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to create Decompressor LastError "
              << GetLastError() << std::endl;
#endif
    return nullptr;
  }

  std::unique_ptr<void, decltype(CloseDecompressorFun)> decompressorGuard(
      decompressor, CloseDecompressorFun);

  SIZE_T decompressedBufferSize = 0;

  // Query the decompressed size by passing a NULL output buffer.
  success = ::Decompress(decompressor,                   // Decompressor Handle
                         const_cast<char*>(input_data),  // Compressed data
                         input_length,              // Compressed data size
                         NULL,                      // Buffer set to NULL
                         0,                         // Buffer size set to 0
                         &decompressedBufferSize);  // Decompressed Data size

  if (!success) {
    auto lastError = GetLastError();

    if (lastError != ERROR_INSUFFICIENT_BUFFER) {
#ifdef _DEBUG
      std::cerr
          << "XPRESS: Failed to estimate decompressed buffer size LastError "
          << lastError << std::endl;
#endif
      return nullptr;
    }
  }

  assert(decompressedBufferSize > 0);

  // The callers are deallocating using delete[]
  // thus we must allocate with new[]
  std::unique_ptr<char[]> outputBuffer(new char[decompressedBufferSize]);

  SIZE_T decompressedDataSize = 0;

  success = ::Decompress(decompressor, const_cast<char*>(input_data),
                         input_length, outputBuffer.get(),
                         decompressedBufferSize, &decompressedDataSize);

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to decompress LastError " << GetLastError()
              << std::endl;
#endif
    return nullptr;
  }

  *uncompressed_size = decompressedDataSize;

  // Return the raw buffer to the caller supporting the tradition
  return outputBuffer.release();
}
int64_t GetDecompressedSize(const char* input_data, size_t input_length) {
  assert(input_data != nullptr);

  if (input_length == 0) {
    return 0;
  }

  COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr;

  DECOMPRESSOR_HANDLE decompressor = NULL;

  BOOL success =
      CreateDecompressor(COMPRESS_ALGORITHM_XPRESS,  // Compression Algorithm
                         allocRoutinesPtr,  // Optional allocation routine
                         &decompressor);    // Handle

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to create Decompressor LastError "
              << GetLastError() << std::endl;
#endif
    return -1;
  }

  std::unique_ptr<void, decltype(CloseDecompressorFun)> decompressorGuard(
      decompressor, CloseDecompressorFun);

  SIZE_T decompressedBufferSize = 0;

  // Size query only: with a NULL output buffer the call is expected to fail
  // with ERROR_INSUFFICIENT_BUFFER and report the required size.
  success = ::Decompress(decompressor,                   // Decompressor Handle
                         const_cast<char*>(input_data),  // Compressed data
                         input_length,              // Compressed data size
                         NULL,                      // Buffer set to NULL
                         0,                         // Buffer size set to 0
                         &decompressedBufferSize);  // Decompressed Data size

  assert(!success);
  auto lastError = GetLastError();

  if (lastError != ERROR_INSUFFICIENT_BUFFER) {
#ifdef _DEBUG
    std::cerr
        << "XPRESS: Failed to estimate decompressed buffer size LastError "
        << lastError << std::endl;
#endif
    return -1;
  }

  assert(decompressedBufferSize > 0);
  return static_cast<int64_t>(decompressedBufferSize);
}
int64_t DecompressToBuffer(const char* input, size_t input_length, char* output,
                           size_t output_length) {
  assert(input != nullptr);
  assert(output != nullptr);

  if (input_length == 0) {
    return 0;
  }

  COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr;

  DECOMPRESSOR_HANDLE decompressor = NULL;

  BOOL success =
      CreateDecompressor(COMPRESS_ALGORITHM_XPRESS,  // Compression Algorithm
                         allocRoutinesPtr,  // Optional allocation routine
                         &decompressor);    // Handle

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to create Decompressor LastError "
              << GetLastError() << std::endl;
#endif
    return -1;
  }

  std::unique_ptr<void, decltype(CloseDecompressorFun)> decompressorGuard(
      decompressor, CloseDecompressorFun);

  SIZE_T decompressedDataSize = 0;

  success = ::Decompress(decompressor, const_cast<char*>(input), input_length,
                         output, output_length, &decompressedDataSize);

  if (!success) {
#ifdef _DEBUG
    std::cerr << "XPRESS: Failed to decompress LastError " << GetLastError()
              << std::endl;
#endif
    return -1;
  }

  return static_cast<int64_t>(decompressedDataSize);
}
} // namespace xpress
} // namespace port
} // namespace ROCKSDB_NAMESPACE
#endif
#endif