Summary: While working on some compression refactoring, I noticed that `NotifyCollectTableCollectorsOnBlockAdd()` was being called from multiple threads (with `parallel_threads` > 1), meaning we were violating the promise that TablePropertiesCollectors need not be thread safe (and typically will not be, for efficiency). Fixing this is a bit awkward or intrusive. Even though it seems weird to expose `block_compressed_bytes_fast` and `block_compressed_bytes_slow` in the public `BlockAdd()` function, and NOT the actual compressed block size used, there are some Meta-internal uses that would at least require negotiation / coordination to deprecate and remove. So it's probably easiest to keep the awkward functionality and make the modifications necessary to call it from a single thread.

The simplest solution that preserves the functionality with `parallel_threads` > 1 (provide the sampling data, expected ordering between `BlockAdd()` and `AddUserKey()`, no races) is to do the compression sampling in the thread building uncompressed blocks. Specifically, this moves `NotifyCollectTableCollectorsOnBlockAdd()` and the compression sampling from `CompressAndVerifyBlock()`, which is called in parallel, to the table builder's `Flush()`, which is only called serially (per file); see the schematic sketch below. Even though this adds some compression work to that single thread when sampling is enabled, that should be tolerable without complicating the code or regressing performance. Some related or nearby optimizations are included to ensure this:

* Got rid of a lot of unnecessary indirection and unnecessary fields in BlockRep, which should be a step toward improving parallel compression performance (still bad IMHO).
* Restructured some `if`s etc. to streamline some logic.

This satisfies my original refactoring need: moving the sampling code higher up the stack from `CompressBlock()`, to set up some other upcoming refactorings. The other caller of `CompressBlock()` (legacy BlobDB) doesn't need it, and in fact is better off calling `CompressData()` directly because it does not appear to be dealing with the various "no compression" outcomes introduced by `CompressBlock()`.

Eventual follow-up:
* Performance data below shows how the overhead of parallel compression can make it slower, with available CPUs, compared to serial compression. This infrastructure should be re-designed/re-engineered to reduce thread creation, context switches, etc. Also, more of the processing, such as checksumming, could be parallelized. (Things dependent on the block's location in the file, such as ChecksumModifierForContext and cache warming, cannot be parallelized.)
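To make the before/after shape concrete, here is a minimal, compilable sketch; it is not the actual RocksDB code. `Collector`, `SampleCompression`, and `TableBuilderSketch` are hypothetical stand-ins, and only the names in the comments (`Flush()`, `CompressAndVerifyBlock()`, `AddUserKey()`, `parallel_threads`) come from the real code paths discussed above:

```
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a TablePropertiesCollector: deliberately NOT
// thread safe, matching the documented contract.
struct Collector {
  uint64_t blocks_added = 0;
  void BlockAdd(uint64_t /*uncomp*/, uint64_t /*fast*/, uint64_t /*slow*/) {
    ++blocks_added;  // unsynchronized on purpose
  }
};

// Hypothetical sampling helper; the real code compresses the block with a
// fast and a slow algorithm to produce these sampled sizes.
void SampleCompression(const std::string& block, uint64_t* fast,
                       uint64_t* slow) {
  *fast = block.size() / 2;
  *slow = block.size() / 3;
}

struct TableBuilderSketch {
  std::vector<Collector*> collectors;

  // AFTER the fix: Flush() runs serially (per file) in the same thread that
  // calls AddUserKey(), so notifying collectors here is race-free and
  // preserves the expected BlockAdd()/AddUserKey() ordering. BEFORE the fix,
  // the equivalent notification happened in CompressAndVerifyBlock(), which
  // runs concurrently when parallel_threads > 1.
  void Flush(const std::string& uncompressed_block) {
    uint64_t fast = 0;
    uint64_t slow = 0;
    SampleCompression(uncompressed_block, &fast, &slow);
    for (Collector* c : collectors) {
      c->BlockAdd(uncompressed_block.size(), fast, slow);
    }
    // Only now is the block handed to the (possibly parallel) compression
    // workers, which no longer touch the collectors.
  }
};
```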
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13583

Test Plan: Along with existing unit tests + CI, expanded crash test to make its TablePropertiesCollector non-trivial, to exercise the bug (and other potential bugs), which was confirmed with local run of whitebox_crash_test with TSAN:

```
ThreadSanitizer: data race /data/users/peterd/rocksdb/./db_stress_tool/db_stress_table_properties_collector.h:36:5 in rocksdb::DbStressTablePropertiesCollector::BlockAdd(unsigned long, unsigned long, unsigned long)
```

Performance:

```
SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=none" "-compression_parallel_threads=1 -compression_type=snappy" "-compression_parallel_threads=4 -compression_type=snappy"; do echo $ARGS; (for I in `seq 1 100`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done
```

Average ops/s of 100 runs, running before & after at the same time, using clang DEBUG_LEVEL=0:

-compression_parallel_threads=1 -compression_type=none
Before: 1976319  After: 1983840 (+0.3%)

-compression_parallel_threads=1 -compression_type=snappy
Before: 1945576  After: 1953473 (+0.4%)

-compression_parallel_threads=4 -compression_type=snappy
Before: 1573190  After: 1611881 (+2.4%)

-compression_parallel_threads=4 -sample_for_compression=100 (pretty high sample rate)
Before: 1577167  After: 1589704 (+0.8%)

-compression_parallel_threads=4 -sample_for_compression=10 (crazy high sample rate)
Before: 1581276  After: 1393453 (-11.9%)

As seen, you need a very very high compression sample rate to see a regression. I would expect a setting like 1000 to be more typical.

Reviewed By: hx235

Differential Revision: D73944593

Pulled By: pdillinger

fbshipit-source-id: f1dcba4ebdc01e735251037395003945c9b34e62
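For context on the "expanded crash test" note above, the sketch below shows how a collector factory like the one in the header that follows gets plugged into a DB. `table_properties_collector_factories` and `compression_opts.parallel_threads` are real RocksDB options fields, but the DB path and the standalone `main()` wiring are illustrative, not the actual db_stress hookup (the header also depends on a gflags flag defined inside db_stress):

```
#include <memory>

#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  // Every table built by this DB will run the (non-thread-safe) collector.
  options.table_properties_collector_factories.emplace_back(
      std::make_shared<
          ROCKSDB_NAMESPACE::DbStressTablePropertiesCollectorFactory>());
  // With >1 compression thread, the old code would call BlockAdd() from
  // multiple threads, which TSAN flags as a data race on the counters.
  options.compression_opts.parallel_threads = 4;

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  ROCKSDB_NAMESPACE::Status s =
      ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/collector_sketch", &db);
  // ... write enough data to trigger flushes and table building ...
  delete db;
  return s.ok() ? 0 : 1;
}
```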
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include "rocksdb/table.h"
#include "util/gflags_compat.h"
#include "util/random.h"

DECLARE_int32(mark_for_compaction_one_file_in);

namespace ROCKSDB_NAMESPACE {

// A `DbStressTablePropertiesCollector` ignores what keys/values were added to
// the table, adds no properties to the table, and decides at random whether
// the table will be marked for compaction according to
// `FLAGS_mark_for_compaction_one_file_in`.
class DbStressTablePropertiesCollector : public TablePropertiesCollector {
 public:
  DbStressTablePropertiesCollector()
      : need_compact_(Random::GetTLSInstance()->OneInOpt(
            FLAGS_mark_for_compaction_one_file_in)) {}

  Status AddUserKey(const Slice& /* key */, const Slice& /* value */,
                    EntryType /*type*/, SequenceNumber /*seq*/,
                    uint64_t /*file_size*/) override {
    ++keys_added;
    ++all_calls;
    return Status::OK();
  }

  void BlockAdd(uint64_t /* block_uncomp_bytes */,
                uint64_t /* block_compressed_bytes_fast */,
                uint64_t /* block_compressed_bytes_slow */) override {
    ++blocks_added;
    ++all_calls;
  }

  Status Finish(UserCollectedProperties* properties) override {
    ++all_calls;
    (*properties)["db_stress_collector_property"] =
        std::to_string(keys_added) + ";" + std::to_string(blocks_added) +
        ";" + std::to_string(all_calls);
    return Status::OK();
  }

  UserCollectedProperties GetReadableProperties() const override {
    UserCollectedProperties props;
    const_cast<DbStressTablePropertiesCollector*>(this)->Finish(&props);
    return props;
  }

  const char* Name() const override {
    return "DbStressTablePropertiesCollector";
  }

  bool NeedCompact() const override {
    ++all_calls;
    return need_compact_;
  }

 private:
  const bool need_compact_;
  // These are tracked to detect race conditions that would arise from RocksDB
  // invoking TablePropertiesCollector functions in an unsynchronized way, as
  // TablePropertiesCollectors are allowed (encouraged) not to be thread safe.
  size_t keys_added = 0;
  size_t blocks_added = 0;
  // Including race between BlockAdd and AddUserKey (etc.)
  mutable size_t all_calls = 0;
};

// A `DbStressTablePropertiesCollectorFactory` creates
// `DbStressTablePropertiesCollector`s.
class DbStressTablePropertiesCollectorFactory
    : public TablePropertiesCollectorFactory {
 public:
  TablePropertiesCollector* CreateTablePropertiesCollector(
      TablePropertiesCollectorFactory::Context /* context */) override {
    return new DbStressTablePropertiesCollector();
  }

  const char* Name() const override {
    return "DbStressTablePropertiesCollectorFactory";
  }
};

}  // namespace ROCKSDB_NAMESPACE
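As a usage note on the property that `Finish()` emits: each table file ends up with a `db_stress_collector_property` string of the form `<keys_added>;<blocks_added>;<all_calls>`. The sketch below (a hypothetical helper, assuming an open `DB* db` with the factory above registered) reads it back; `GetPropertiesOfAllTables()` and `user_collected_properties` are real RocksDB APIs:

```
#include <cstdio>

#include "rocksdb/db.h"

// Hypothetical helper: dump the stress collector's property for each table.
void DumpStressCollectorProperties(ROCKSDB_NAMESPACE::DB* db) {
  ROCKSDB_NAMESPACE::TablePropertiesCollection props;
  ROCKSDB_NAMESPACE::Status s = db->GetPropertiesOfAllTables(&props);
  if (!s.ok()) {
    return;
  }
  for (const auto& [file, table_props] : props) {
    const auto& ucp = table_props->user_collected_properties;
    auto it = ucp.find("db_stress_collector_property");
    if (it != ucp.end()) {
      // Prints "<keys_added>;<blocks_added>;<all_calls>" per table file.
      std::printf("%s: %s\n", file.c_str(), it->second.c_str());
    }
  }
}
```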