Summary: Complete redo of parallel compression in block_based_table_builder.cc to greatly reduce cross-thread hand-off and blocking. A ring buffer of blocks-in-progress is used to essentially bound working memory while enabling high throughput. Unlike before, all threads can participate in compression work, for a kind of work-stealing algorithm that reduces the need for threads to block. This builds on improvements in https://github.com/facebook/rocksdb/pull/13850

Previously, there was either
* parallel_threads==1: the *emit thread* (caller from flush/compaction) doing all the work
* parallel_threads > 1: the emit thread generates uncompressed blocks, `parallel_threads` worker threads compress blocks, and a writer thread writes to the SST file. Total of `parallel_threads + 2` threads participating. (Other bookkeeping in the emit and write steps is omitted from this description for simplicity.)

Now we have either
* parallel_threads==1 (same): the emit thread doing all the work
* parallel_threads > 1: the emit thread generates uncompressed blocks and can take up compression work when the ring buffer is full; `parallel_threads` worker threads have as their top priority to write compressed blocks to the SST file but also take up compression work in priority order of next-to-write. Total of `parallel_threads + 1` threads participating.

In some cases, this could result in less throughput than before, but arguably the previous implementation was using more threads than explicitly allowed.

## Future/alternate considerations

Although we could likely have used some framework for micro-work sharing across threads, that could be difficult with the asymmetry of work loads and thread affinity.
Specifically, (a) it would be quite challenging to allow emit work in other threads, because it happens in the caller of BlockBasedTableBuilder; (b) async programming is unlikely to pay off until we have an async interface for writing SST files; and (c) this implementation will nevertheless serve as a benchmark for what we lose or gain in such a framework vs. a hand-tuned system.

This implementation still creates and destroys threads for each SST file created. We hope in the future to have more governance and/or pooling of worker threads across various flushes and compactions, but that is not available currently and would require significant design and implementation work.

## More details

* This implementation makes use of semaphores for idling and re-waking threads. `std::counting_semaphore` and `binary_semaphore` offer the best performance (see benchmark results below), but some implementations are known to have correctness bugs. Also, my attempt at upgrading CI for C++20 support (required for these) in https://github.com/facebook/rocksdb/pull/13904 is actually incomplete. Therefore, using these structures is opt-in with `-DROCKSDB_USE_STD_SEMAPHORES` at compile time, and a naive semaphore implementation based on mutex and condvar is used by default. A folly alternative (folly::fibers::Semaphore) was dropped in during development and found to be less efficient than the naive implementation. One CI job is upgraded to test with the new opt-in.
* One of the biggest concerns about correctness/reliability for this implementation is the possibility of hitting a deadlock, in part because that is not well checked in the DB crash test (a challenging problem!). Note also that with the parallel compression improvements in this release, I am calling the feature production-ready, so there is an extra level of confidence needed in the reliability of the feature. Thus, for DEBUG builds including the crash test, I have added a watchdog thread to each parallel SST construction that heuristically checks for the most likely kinds of deadlock that could happen, including for the case of buggy semaphore implementations. It periodically verifies that some thread is outside of its "idle" state, and if the watchdog wakes up repeatedly to see all live threads stuck in their idle state (even if wake-up was attempted), then it declares a deadlock. This feature was manually verified for several seeded deadlock bugs. (More details in code comments.)
* For CPU efficiency, this implementation greatly simplifies the logic to estimate the outstanding or "inflight" size not yet written to the SST file. I expect this size to generally be insignificant relative to the full SST file size, so it is not worth careful engineering. And based on Meta's current needs, landing under-size for an SST file is better than over-size. See comments on `estimated_inflight_size` for details.
* Some other existing atomics in block_based_table_builder.cc were modified to use safe atomic wrappers.
* Status handling in BlockBasedTableBuilder was streamlined to get rid of essentially redundant `status`+`io_status` fields and associated code. Made small optimizations to reduce unnecessary IOStatus copies (with StatusOk()) and mark status conditional branches as LIKELY or UNLIKELY.
* Prefer inline field initialization to initialization in the constructor.
* Minimize references to the `parallel_threads` configuration parameter for better separation of concerns / sanitization / etc. For example, use non-nullity of `pc_rep` to indicate that parallel compression is enabled (and active).
* Some other refactoring to aid the new implementation.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13910

Test Plan:

## Correctness

Already integrated into unit tests and crash test. CI updated for the opt-in semaphore implementation. Basic semaphore unit tests added/updated.
As for the tremendous simplification of logic relating to hitting the target SST file size: as expected, the new behavior could under-shoot the single-threaded behavior by a small number of blocks, which will typically affect the file size by ~1/1000th or less. I think that's a good trade-off for cutting out unnecessarily complex code with non-trivial CPU cost (FileSizeEstimator).

```
./db_bench -db=/dev/shm/dbbench_filesize_after8 -benchmarks=fillseq,compact -num=10000000 -compression_type=zstd -compression_level=8 -compression_parallel_threads=8
```

Before (PT=8 & PT=1), and After (PT=1), the same or very similar:

```
-rw-r--r-- 1 peterd users 67474097 Sep 12 15:32 000052.sst
-rw-r--r-- 1 peterd users 67474214 Sep 12 15:32 000053.sst
-rw-r--r-- 1 peterd users 67473834 Sep 12 15:32 000054.sst
-rw-r--r-- 1 peterd users 67473437 Sep 12 15:32 000055.sst
-rw-r--r-- 1 peterd users 67473835 Sep 12 15:32 000056.sst
-rw-r--r-- 1 peterd users 67473204 Sep 12 15:33 000057.sst
-rw-r--r-- 1 peterd users 67473294 Sep 12 15:33 000058.sst
-rw-r--r-- 1 peterd users 67473839 Sep 12 15:33 000059.sst
```

After (PT=8), worst case here ~0.05% smaller:

```
-rw-r--r-- 1 peterd users 67463189 Sep 12 14:55 000052.sst
-rw-r--r-- 1 peterd users 67465233 Sep 12 14:55 000053.sst
-rw-r--r-- 1 peterd users 67466822 Sep 12 14:55 000054.sst
-rw-r--r-- 1 peterd users 67466221 Sep 12 14:55 000055.sst
-rw-r--r-- 1 peterd users 67441675 Sep 12 14:55 000056.sst
-rw-r--r-- 1 peterd users 67467855 Sep 12 14:55 000057.sst
-rw-r--r-- 1 peterd users 67455132 Sep 12 14:55 000058.sst
-rw-r--r-- 1 peterd users 67458334 Sep 12 14:55 000059.sst
```

## Performance, modest load

We are primarily interested in balancing throughput in building SST files against the CPU usage in doing so. (For example, we could maximize throughput by having worker threads only spin waiting for work, but that would likely be extra CPU usage we want to avoid, to allow other productive CPU work to be scheduled.) No read path code has been touched.
A benchmark script running "before" and "after" configurations at the same time to minimize random machine load effects:

```
$ SUFFIX=`tty | sed 's|/|_|g'`; for CT in none lz4 zstd; do for PT in 1 2 3 4 6 8; do echo -n "$CT pt=$PT -> "; (for I in `seq 1 10`; do BIN=/tmp/dbbench${SUFFIX}.bin; rm -f $BIN; cp db_bench $BIN; /usr/bin/time $BIN -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -format_version=7 -compression_type=$CT -compression_parallel_threads=$PT 2>&1; done) | awk '/micros.op/ {n++; sum += $5;} /system / { cpu += $1 + $2; } END { print "ops/s: " int(sum/n) " cpu*s: " cpu; }'; done; done
```

Before this change:

```
none pt=1 -> ops/s: 1999603 cpu*s: 72.08
none pt=2 -> ops/s: 1871094 cpu*s: 148.3
none pt=3 -> ops/s: 1882907 cpu*s: 147.7
lz4 pt=1 -> ops/s: 1987858 cpu*s: 94.74
lz4 pt=2 -> ops/s: 1590192 cpu*s: 182.65
lz4 pt=3 -> ops/s: 1896294 cpu*s: 174.7
lz4 pt=4 -> ops/s: 1949174 cpu*s: 172.26
lz4 pt=6 -> ops/s: 1912517 cpu*s: 175.91
lz4 pt=8 -> ops/s: 1930585 cpu*s: 176.71
zstd pt=1 -> ops/s: 1239379 cpu*s: 129.85
zstd pt=2 -> ops/s: 1171742 cpu*s: 226.12
zstd pt=3 -> ops/s: 1832574 cpu*s: 214.21
zstd pt=4 -> ops/s: 1887124 cpu*s: 212.51
zstd pt=6 -> ops/s: 1920936 cpu*s: 211.7
zstd pt=8 -> ops/s: 1885544 cpu*s: 214.87
```

After this change:

```
none pt=1 -> ops/s: 1964361 cpu*s: 72.66
none pt=2 -> ops/s: 1914033 cpu*s: 104.95
none pt=3 -> ops/s: 1978567 cpu*s: 100.24
lz4 pt=1 -> ops/s: 2041703 cpu*s: 92.88
lz4 pt=2 -> ops/s: 1903210 cpu*s: 121.64
lz4 pt=3 -> ops/s: 1973906 cpu*s: 122.22
lz4 pt=4 -> ops/s: 1952605 cpu*s: 123.05
lz4 pt=6 -> ops/s: 1957524 cpu*s: 124.31
lz4 pt=8 -> ops/s: 1986274 cpu*s: 129.06
zstd pt=1 -> ops/s: 1233748 cpu*s: 130.43
zstd pt=2 -> ops/s: 1675226 cpu*s: 158.41
zstd pt=3 -> ops/s: 1929878 cpu*s: 159.77
zstd pt=4 -> ops/s: 1916403 cpu*s: 160.99
zstd pt=6 -> ops/s: 1942526 cpu*s: 166.21
zstd pt=8 -> ops/s: 1966704 cpu*s: 171.56
```

For parallel_threads=1, results are very similar, as expected. For parallel_threads>1, throughput is usually improved a bit, but CPU consumption is dramatically reduced. For zstd, maximum throughput is essentially achieved with pt=3 rather than the previous roughly pt=4 to 6, and the old implementation used about 30% more CPU.

We can also compare with more expensive compression by raising the compression level:

```
SUFFIX=`tty | sed 's|/|_|g'`; CT=zstd; for CL in 4 6 8; do for PT in 1 4 8; do echo -n "$CT@$CL pt=$PT -> "; (for I in `seq 1 10`; do BIN=/tmp/dbbench${SUFFIX}.bin; rm -f $BIN; cp db_bench $BIN; /usr/bin/time $BIN -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 -format_version=7 -compression_type=$CT -compression_parallel_threads=$PT -compression_level=$CL 2>&1; done) | awk '/micros.op/ {n++; sum += $5;} /system / { cpu += $1 + $2; } END { print "ops/s: " int(sum/n) " cpu*s: " cpu; }'; done; done
```

Before:

```
zstd@4 pt=1 -> ops/s: 883630 cpu*s: 161.12
zstd@4 pt=4 -> ops/s: 1878206 cpu*s: 243.25
zstd@4 pt=8 -> ops/s: 1885002 cpu*s: 245.89
zstd@6 pt=1 -> ops/s: 710767 cpu*s: 189.44
zstd@6 pt=4 -> ops/s: 1706377 cpu*s: 277.29
zstd@6 pt=8 -> ops/s: 1866736 cpu*s: 275.07
zstd@8 pt=1 -> ops/s: 529047 cpu*s: 237.87
zstd@8 pt=4 -> ops/s: 1401379 cpu*s: 330.61
zstd@8 pt=8 -> ops/s: 1895601 cpu*s: 321.59
```

After:

```
zstd@4 pt=1 -> ops/s: 889905 cpu*s: 161.03
zstd@4 pt=4 -> ops/s: 1942240 cpu*s: 193.18
zstd@4 pt=8 -> ops/s: 1922367 cpu*s: 205.21
zstd@6 pt=1 -> ops/s: 713870 cpu*s: 188.91
zstd@6 pt=4 -> ops/s: 1832314 cpu*s: 219.66
zstd@6 pt=8 -> ops/s: 1949631 cpu*s: 229.34
zstd@8 pt=1 -> ops/s: 530324 cpu*s: 238.02
zstd@8 pt=4 -> ops/s: 1479767 cpu*s: 271.65
zstd@8 pt=8 -> ops/s: 1949631 cpu*s: 275.6
```

And we can also look at the cumulative effect of this change and https://github.com/facebook/rocksdb/pull/13850, which will combine for the parallel compression improvements in the upcoming 10.7 release.

Before both:

```
lz4 pt=1 -> ops/s: 1954445 cpu*s: 95.14
lz4 pt=3 -> ops/s: 1687043 cpu*s: 186.62
lz4 pt=5 -> ops/s: 1708196 cpu*s: 188.33
zstd pt=1 -> ops/s: 1220649 cpu*s: 131.2
zstd pt=3 -> ops/s: 1658100 cpu*s: 227.08
zstd pt=5 -> ops/s: 1685074 cpu*s: 226.08
```

After:

```
lz4 pt=1 -> ops/s: 2048214 cpu*s: 93.24
lz4 pt=3 -> ops/s: 1922049 cpu*s: 122.9
lz4 pt=5 -> ops/s: 1980165 cpu*s: 122.49
zstd pt=1 -> ops/s: 1245165 cpu*s: 128.84
zstd pt=3 -> ops/s: 1956961 cpu*s: 158.73
zstd pt=5 -> ops/s: 1970458 cpu*s: 161.02
```

In summary, before, with zstd at the default level, you could see only
* about a 38% increase in throughput for about a 73% increase in CPU usage

Now you can get
* about a 58% increase in throughput for about a 25% increase in CPU usage

## Performance, high load

To validate this for usage on remote compaction workers, we also need to test whether it falls over at high load or anything concerning like that. For this I did a lot of testing with concurrent db_bench, zstd compression_level=8, and parallel_threads (PT) in {1,8}, trying to observe "bad" behaviors such as stalls due to preempted threads. On a 166-core machine where a "job" is a db_bench process running a fillseq benchmark similar to above in parallel with others, I could summarize the results like this:

* 10 jobs, PT=8 vs. PT=1 -> 12% more CPU usage, 75% reduction in wall time, 1.9 jobs/sec (vs. 0.5)
* 50 jobs, PT=8 vs. PT=1 -> 89% more CPU usage, 27% reduction in wall time, 3.1 jobs/sec (vs. 2.3)
* 100 jobs, PT=8 vs. PT=1 -> 24% more CPU usage, 5% reduction in wall time, 3.25 jobs/sec (vs. 3.1)
* 150 jobs, PT=8 vs. PT=1 -> 4% more CPU usage, 2% increase in wall time, 3.3 jobs/sec (vs. 3.4)
* 500 jobs, PT=8 vs. PT=1 -> 1% more CPU usage, insignificant difference in wall time, 3.3 jobs/sec

Even when there are 4000 threads potentially competing for 166 cores, the throughput (3.3 jobs/sec) is still very close to maximum (3.4). Enabling parallel compression didn't result in notably less throughput (based on wall clock time for all jobs to complete) in any case tested above, and much higher throughput in many cases. If parallel compression causes us to tip from comfortably under-saturating to over-saturating the cores (as in the 50-jobs case), the overall CPU usage can be much higher, presumably due to lower CPU cache hit rates and maybe clock throttling, but parallel compression still has the throughput advantage in those cases. In other words, what would we stand to gain from being able to intelligently share worker threads between compaction jobs? It doesn't seem like much.

Reviewed By: xingbowang

Differential Revision: D81365623

Pulled By: pdillinger

fbshipit-source-id: 5db5151a959b5d25b84dbe185bc208bd188f2d1c
164 lines
4.8 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#ifdef ROCKSDB_USE_STD_SEMAPHORES
#include <semaphore>
#endif

#include "port/port.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {

// Wrapper providing a chosen counting semaphore implementation. The default
// implementation based on a mutex and condvar unfortunately can result in
// Release() temporarily waiting on another thread to make progress (if that
// other thread is preempted while holding the mutex), but that should be rare.
// However, alternative implementations may have correctness issues or even
// worse performance. See std::counting_semaphore for general contract.
//
// NOTE1: std::counting_semaphore is known to be buggy on many std library
// implementations, so be cautious about enabling it. Reportedly, an acquire()
// can falsely block indefinitely. And we can't easily work around that with
// try_acquire_for because another common bug has that function consistently
// sleeping for the entire timeout duration even if a release() happens earlier.
// Therefore, using std::counting_semaphore/binary_semaphore is strictly opt-in
// for now.
//
// NOTE2: Also tried wrapping folly::fibers::Semaphore here but it was not as
// efficient (for parallel compression) as even the mutex+condvar version.
class ALIGN_AS(CACHE_LINE_SIZE) CountingSemaphore {
 public:
  explicit CountingSemaphore(std::ptrdiff_t starting_count)
#ifdef ROCKSDB_USE_STD_SEMAPHORES
      : sem_(starting_count)
#else
      : count_(static_cast<int32_t>(starting_count))
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  {
    assert(starting_count >= 0);
    assert(starting_count <= INT32_MAX);
  }

  void Acquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.acquire();
#else
    std::unique_lock<std::mutex> lock(mutex_);
    assert(count_ >= 0);
    cv_.wait(lock, [this] { return count_ > 0; });
    --count_;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  bool TryAcquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    return sem_.try_acquire();
#else
    std::unique_lock<std::mutex> lock(mutex_);
    assert(count_ >= 0);
    if (count_ == 0) {
      return false;
    } else {
      --count_;
      return true;
    }
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  void Release(std::ptrdiff_t n = 1) {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.release(n);
#else
    assert(n >= 0);
    assert(n <= INT32_MAX);
    if (n > 0) {
      std::unique_lock<std::mutex> lock(mutex_);
      assert(count_ >= 0);
      count_ += static_cast<int32_t>(n);
      assert(count_ >= 0);  // no overflow
      if (n == 1) {
        cv_.notify_one();
      } else {
        cv_.notify_all();
      }
    }
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

 private:
#ifdef ROCKSDB_USE_STD_SEMAPHORES
  std::counting_semaphore<INT32_MAX> sem_;
#else
  int32_t count_;
  std::mutex mutex_;
  std::condition_variable cv_;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
};  // class CountingSemaphore
// Wrapper providing a chosen binary semaphore implementation. See notes on
// CountingSemaphore above, and on Release() below.
class BinarySemaphore {
 public:
  explicit BinarySemaphore(std::ptrdiff_t starting_count)
#ifdef ROCKSDB_USE_STD_SEMAPHORES
      : sem_(starting_count)
#else
      : state_(starting_count > 0)
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  {
    assert(starting_count >= 0);
  }

  void Acquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.acquire();
#else
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return state_; });
    state_ = false;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  bool TryAcquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    return sem_.try_acquire();
#else
    std::unique_lock<std::mutex> lock(mutex_);
    if (state_) {
      state_ = false;
      return true;
    } else {
      return false;
    }
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  void Release() {
    // NOTE: implementations of std::binary_semaphore::release() tend to behave
    // like counting semaphores in the case of multiple Release() calls without
    // Acquire() in between, though it is undefined behavior. It is also OK to
    // cap the count at 1.
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.release();
#else
    std::unique_lock<std::mutex> lock(mutex_);
    // check precondition to avoid UB in std implementation
    assert(state_ == false);
    state_ = true;
    cv_.notify_one();
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

 private:
#ifdef ROCKSDB_USE_STD_SEMAPHORES
  std::binary_semaphore sem_;
#else
  bool state_;
  std::mutex mutex_;
  std::condition_variable cv_;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
};

}  // namespace ROCKSDB_NAMESPACE