Summary: and remove deprecated DB::MaxMemCompactionLevel(). In the process of pushing through a relatively clean refactoring of uses of the old functions, some other minor public APIs are also migrated from raw DB pointers to unique_ptr. Claude did pretty much all the work, but it required dozens of prompts to actually push through a relatively clean phase-out of raw DB pointers from what needed to be touched, leaving that code in better shape. (Hundreds of `DB*` still remain all over the place, even outside the C and Java bindings.) Pull Request resolved: https://github.com/facebook/rocksdb/pull/14335 Test Plan: existing tests; no functional changes intended. Reviewed By: xingbowang, mszeszko-meta. Differential Revision: D93523820. Pulled By: pdillinger. fbshipit-source-id: e4ca22ad81cd2cfe91122d7507d7ca34fe03d043
377 lines
12 KiB
C++
377 lines
12 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#if !defined(GFLAGS)
|
|
#include <cstdio>
|
|
// Fallback entry point compiled when GFLAGS is not defined: the benchmark
// cannot parse its options, so it reports that on stderr and exits with a
// failure code.
int main() {
  std::fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
|
|
#elif defined(OS_MACOSX) || defined(OS_WIN)
|
|
// Block forward_iterator_bench under MAC and Windows
|
|
// Stub entry point for platforms where the benchmark is disabled
// (OS_MACOSX / OS_WIN): the binary still builds, does nothing, and reports
// success.
int main() {
  return 0;
}
|
|
#else
|
|
#include <semaphore.h>
|
|
|
|
#include <atomic>
|
|
#include <bitset>
|
|
#include <chrono>
|
|
#include <climits>
|
|
#include <condition_variable>
|
|
#include <limits>
|
|
#include <mutex>
|
|
#include <queue>
|
|
#include <random>
|
|
#include <thread>
|
|
|
|
#include "port/port.h"
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/status.h"
|
|
#include "rocksdb/table.h"
|
|
#include "test_util/testharness.h"
|
|
#include "util/gflags_compat.h"
|
|
|
|
const int MAX_SHARDS = 100000;
|
|
|
|
DEFINE_int32(writers, 8, "");
|
|
DEFINE_int32(readers, 8, "");
|
|
DEFINE_int64(rate, 100000, "");
|
|
DEFINE_int64(value_size, 300, "");
|
|
DEFINE_int64(shards, 1000, "");
|
|
DEFINE_int64(memtable_size, 500000000, "");
|
|
DEFINE_int64(block_cache_size, 300000000, "");
|
|
DEFINE_int64(block_size, 65536, "");
|
|
DEFINE_double(runtime, 300.0, "");
|
|
DEFINE_bool(cache_only_first, true, "");
|
|
DEFINE_bool(iterate_upper_bound, true, "");
|
|
|
|
// Benchmark-wide counters. Writers bump `written`, readers bump `read` and
// `cache_misses`, and the stats thread samples all three.
// NOTE(review): the 128-byte pad arrays presumably keep the writer-side and
// reader-side counters on separate cache lines -- confirm intent.
struct Stats {
  char pad1[128] __attribute__((__unused__));

  // Number of successful Put() calls.
  std::atomic<uint64_t> written{0};

  char pad2[128] __attribute__((__unused__));

  // Number of entries consumed by readers.
  std::atomic<uint64_t> read{0};
  // Number of times an iterator reported Incomplete.
  std::atomic<uint64_t> cache_misses{0};

  char pad3[128] __attribute__((__unused__));
};

// The single global instance, referenced elsewhere as `::stats`.
Stats stats;
|
|
|
|
// Fixed-size 16-byte key: (shard, seqno), each stored big-endian so that the
// raw bytes sort first by shard and then by sequence number. The struct is
// packed and is copied to/from RocksDB slices as raw memory.
struct Key {
  // Default-constructed keys are all-zero rather than indeterminate.
  Key() = default;

  // Builds a key from host-order values; both are converted to big-endian.
  Key(uint64_t shard_in, uint64_t seqno_in)
      : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}

  // Accessors return host-order values.
  uint64_t shard() const { return be64toh(shard_be); }
  uint64_t seqno() const { return be64toh(seqno_be); }

 private:
  uint64_t shard_be = 0;
  uint64_t seqno_be = 0;
} __attribute__((__packed__));

// Keys are memcpy'd in and out of slices, so the layout must stay exact.
static_assert(sizeof(Key) == 2 * sizeof(uint64_t),
              "Key must be exactly two packed 64-bit fields");
|
|
|
|
struct Reader;
|
|
struct Writer;
|
|
|
|
struct ShardState {
|
|
char pad1[128] __attribute__((__unused__));
|
|
std::atomic<uint64_t> last_written{0};
|
|
Writer* writer;
|
|
Reader* reader;
|
|
char pad2[128] __attribute__((__unused__));
|
|
std::atomic<uint64_t> last_read{0};
|
|
std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it;
|
|
std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it_cacheonly;
|
|
Key upper_bound;
|
|
ROCKSDB_NAMESPACE::Slice upper_bound_slice;
|
|
char pad3[128] __attribute__((__unused__));
|
|
};
|
|
|
|
struct Reader {
|
|
public:
|
|
explicit Reader(std::vector<ShardState>* shard_states,
|
|
ROCKSDB_NAMESPACE::DB* db)
|
|
: shard_states_(shard_states), db_(db) {
|
|
sem_init(&sem_, 0, 0);
|
|
thread_ = port::Thread(&Reader::run, this);
|
|
}
|
|
|
|
void run() {
|
|
while (1) {
|
|
sem_wait(&sem_);
|
|
if (done_.load()) {
|
|
break;
|
|
}
|
|
|
|
uint64_t shard;
|
|
{
|
|
std::lock_guard<std::mutex> guard(queue_mutex_);
|
|
assert(!shards_pending_queue_.empty());
|
|
shard = shards_pending_queue_.front();
|
|
shards_pending_queue_.pop();
|
|
shards_pending_set_.reset(shard);
|
|
}
|
|
readOnceFromShard(shard);
|
|
}
|
|
}
|
|
|
|
void readOnceFromShard(uint64_t shard) {
|
|
ShardState& state = (*shard_states_)[shard];
|
|
if (!state.it) {
|
|
// Initialize iterators
|
|
ROCKSDB_NAMESPACE::ReadOptions options;
|
|
options.tailing = true;
|
|
if (FLAGS_iterate_upper_bound) {
|
|
state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
|
|
state.upper_bound_slice = ROCKSDB_NAMESPACE::Slice(
|
|
(const char*)&state.upper_bound, sizeof(state.upper_bound));
|
|
options.iterate_upper_bound = &state.upper_bound_slice;
|
|
}
|
|
|
|
state.it.reset(db_->NewIterator(options));
|
|
|
|
if (FLAGS_cache_only_first) {
|
|
options.read_tier = ROCKSDB_NAMESPACE::ReadTier::kBlockCacheTier;
|
|
state.it_cacheonly.reset(db_->NewIterator(options));
|
|
}
|
|
}
|
|
|
|
const uint64_t upto = state.last_written.load();
|
|
for (ROCKSDB_NAMESPACE::Iterator* it :
|
|
{state.it_cacheonly.get(), state.it.get()}) {
|
|
if (it == nullptr) {
|
|
continue;
|
|
}
|
|
if (state.last_read.load() >= upto) {
|
|
break;
|
|
}
|
|
bool need_seek = true;
|
|
for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
|
|
if (need_seek) {
|
|
Key from(shard, state.last_read.load() + 1);
|
|
it->Seek(ROCKSDB_NAMESPACE::Slice((const char*)&from, sizeof(from)));
|
|
need_seek = false;
|
|
} else {
|
|
it->Next();
|
|
}
|
|
if (it->status().IsIncomplete()) {
|
|
++::stats.cache_misses;
|
|
break;
|
|
}
|
|
assert(it->Valid());
|
|
assert(it->key().size() == sizeof(Key));
|
|
Key key;
|
|
memcpy(&key, it->key().data(), it->key().size());
|
|
// fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
|
|
// shard, seq, key.shard(), key.seqno());
|
|
assert(key.shard() == shard);
|
|
assert(key.seqno() == seq);
|
|
state.last_read.store(seq);
|
|
++::stats.read;
|
|
}
|
|
}
|
|
}
|
|
|
|
void onWrite(uint64_t shard) {
|
|
{
|
|
std::lock_guard<std::mutex> guard(queue_mutex_);
|
|
if (!shards_pending_set_.test(shard)) {
|
|
shards_pending_queue_.push(shard);
|
|
shards_pending_set_.set(shard);
|
|
sem_post(&sem_);
|
|
}
|
|
}
|
|
}
|
|
|
|
~Reader() {
|
|
done_.store(true);
|
|
sem_post(&sem_);
|
|
thread_.join();
|
|
}
|
|
|
|
private:
|
|
char pad1[128] __attribute__((__unused__));
|
|
std::vector<ShardState>* shard_states_;
|
|
ROCKSDB_NAMESPACE::DB* db_;
|
|
ROCKSDB_NAMESPACE::port::Thread thread_;
|
|
sem_t sem_;
|
|
std::mutex queue_mutex_;
|
|
std::bitset<MAX_SHARDS + 1> shards_pending_set_;
|
|
std::queue<uint64_t> shards_pending_queue_;
|
|
std::atomic<bool> done_{false};
|
|
char pad2[128] __attribute__((__unused__));
|
|
};
|
|
|
|
struct Writer {
|
|
explicit Writer(std::vector<ShardState>* shard_states,
|
|
ROCKSDB_NAMESPACE::DB* db)
|
|
: shard_states_(shard_states), db_(db) {}
|
|
|
|
void start() { thread_ = port::Thread(&Writer::run, this); }
|
|
|
|
void run() {
|
|
std::queue<std::chrono::steady_clock::time_point> workq;
|
|
std::chrono::steady_clock::time_point deadline(
|
|
std::chrono::steady_clock::now() +
|
|
std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
|
|
std::vector<uint64_t> my_shards;
|
|
for (int i = 1; i <= FLAGS_shards; ++i) {
|
|
if ((*shard_states_)[i].writer == this) {
|
|
my_shards.push_back(i);
|
|
}
|
|
}
|
|
|
|
std::mt19937 rng{std::random_device()()};
|
|
std::uniform_int_distribution<int> shard_dist(
|
|
0, static_cast<int>(my_shards.size()) - 1);
|
|
std::string value(FLAGS_value_size, '*');
|
|
|
|
while (1) {
|
|
auto now = std::chrono::steady_clock::now();
|
|
if (FLAGS_runtime >= 0 && now >= deadline) {
|
|
break;
|
|
}
|
|
if (workq.empty()) {
|
|
for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
|
|
std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
|
|
workq.push(now + offset);
|
|
}
|
|
}
|
|
while (!workq.empty() && workq.front() < now) {
|
|
workq.pop();
|
|
uint64_t shard = my_shards[shard_dist(rng)];
|
|
ShardState& state = (*shard_states_)[shard];
|
|
uint64_t seqno = state.last_written.load() + 1;
|
|
Key key(shard, seqno);
|
|
// fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
|
|
ROCKSDB_NAMESPACE::Status status =
|
|
db_->Put(ROCKSDB_NAMESPACE::WriteOptions(),
|
|
ROCKSDB_NAMESPACE::Slice((const char*)&key, sizeof(key)),
|
|
ROCKSDB_NAMESPACE::Slice(value));
|
|
assert(status.ok());
|
|
state.last_written.store(seqno);
|
|
state.reader->onWrite(shard);
|
|
++::stats.written;
|
|
}
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
|
}
|
|
// fprintf(stderr, "Writer done\n");
|
|
}
|
|
|
|
~Writer() { thread_.join(); }
|
|
|
|
private:
|
|
char pad1[128] __attribute__((__unused__));
|
|
std::vector<ShardState>* shard_states_;
|
|
ROCKSDB_NAMESPACE::DB* db_;
|
|
ROCKSDB_NAMESPACE::port::Thread thread_;
|
|
char pad2[128] __attribute__((__unused__));
|
|
};
|
|
|
|
struct StatsThread {
|
|
explicit StatsThread(ROCKSDB_NAMESPACE::DB* db)
|
|
: db_(db), thread_(&StatsThread::run, this) {}
|
|
|
|
void run() {
|
|
auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
|
|
uint64_t wlast = 0, rlast = 0;
|
|
while (!done_.load()) {
|
|
{
|
|
std::unique_lock<std::mutex> lock(cvm_);
|
|
cv_.wait_for(lock, std::chrono::seconds(1));
|
|
}
|
|
auto now = std::chrono::steady_clock::now();
|
|
double elapsed =
|
|
std::chrono::duration_cast<std::chrono::duration<double> >(now -
|
|
tlast)
|
|
.count();
|
|
uint64_t w = ::stats.written.load();
|
|
uint64_t r = ::stats.read.load();
|
|
fprintf(stderr,
|
|
"%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
|
|
"r/s %10.0f | cache misses %10ld\n",
|
|
db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
|
|
std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
|
|
.count(),
|
|
w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
|
|
::stats.cache_misses.load());
|
|
wlast = w;
|
|
rlast = r;
|
|
tlast = now;
|
|
}
|
|
}
|
|
|
|
~StatsThread() {
|
|
{
|
|
std::lock_guard<std::mutex> guard(cvm_);
|
|
done_.store(true);
|
|
}
|
|
cv_.notify_all();
|
|
thread_.join();
|
|
}
|
|
|
|
private:
|
|
ROCKSDB_NAMESPACE::DB* db_;
|
|
std::mutex cvm_;
|
|
std::condition_variable cv_;
|
|
ROCKSDB_NAMESPACE::port::Thread thread_;
|
|
std::atomic<bool> done_{false};
|
|
};
|
|
|
|
int main(int argc, char** argv) {
|
|
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
|
|
|
|
std::mt19937 rng{std::random_device()()};
|
|
ROCKSDB_NAMESPACE::Status status;
|
|
std::string path =
|
|
ROCKSDB_NAMESPACE::test::PerThreadDBPath("forward_iterator_test");
|
|
fprintf(stderr, "db path is %s\n", path.c_str());
|
|
ROCKSDB_NAMESPACE::Options options;
|
|
options.create_if_missing = true;
|
|
options.compression = ROCKSDB_NAMESPACE::CompressionType::kNoCompression;
|
|
options.compaction_style =
|
|
ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleNone;
|
|
options.level0_slowdown_writes_trigger = 99999;
|
|
options.level0_stop_writes_trigger = 99999;
|
|
options.use_direct_io_for_flush_and_compaction = true;
|
|
options.write_buffer_size = FLAGS_memtable_size;
|
|
ROCKSDB_NAMESPACE::BlockBasedTableOptions table_options;
|
|
table_options.block_cache =
|
|
ROCKSDB_NAMESPACE::NewLRUCache(FLAGS_block_cache_size);
|
|
table_options.block_size = FLAGS_block_size;
|
|
options.table_factory.reset(
|
|
ROCKSDB_NAMESPACE::NewBlockBasedTableFactory(table_options));
|
|
|
|
status = ROCKSDB_NAMESPACE::DestroyDB(path, options);
|
|
assert(status.ok());
|
|
std::unique_ptr<ROCKSDB_NAMESPACE::DB> db;
|
|
status = ROCKSDB_NAMESPACE::DB::Open(options, path, &db);
|
|
assert(status.ok());
|
|
|
|
std::vector<ShardState> shard_states(FLAGS_shards + 1);
|
|
std::deque<Reader> readers;
|
|
while (static_cast<int>(readers.size()) < FLAGS_readers) {
|
|
readers.emplace_back(&shard_states, db.get());
|
|
}
|
|
std::deque<Writer> writers;
|
|
while (static_cast<int>(writers.size()) < FLAGS_writers) {
|
|
writers.emplace_back(&shard_states, db.get());
|
|
}
|
|
|
|
// Each shard gets a random reader and random writer assigned to it
|
|
for (int i = 1; i <= FLAGS_shards; ++i) {
|
|
std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
|
|
std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
|
|
shard_states[i].reader = &readers[reader_dist(rng)];
|
|
shard_states[i].writer = &writers[writer_dist(rng)];
|
|
}
|
|
|
|
StatsThread stats_thread(db.get());
|
|
for (Writer& w : writers) {
|
|
w.start();
|
|
}
|
|
|
|
writers.clear();
|
|
readers.clear();
|
|
}
|
|
#endif // !defined(GFLAGS)
|