rocksdb/db/db_iterator_test.cc
Ryan Hancock b9951ded37 Introducing Prepare all iterators for LevelIterator (#14100)
Summary:
This diff introduces asynchronous preparation of all iterators within a MultiScan. Currently, each iterator is prepared only when it is needed; with this diff, all iterators are prepared during the prepare phase of the LevelIterator. This gives each IO more time to be dispatched and serviced, increasing the odds that a block is ready by the time the scan seeks to it.

Benchmark is prefilled using
```
KEYSIZE=64
VALUESIZE=512
NUMKEYS=5000000
SCAN_SIZE=100
DISTANCE=25000
NUM_SCANS=15
THREADS=1

fill() {
./db_bench --db=$DB \
    --benchmarks="fillseq" \
    --write_buffer_size=5242880 \
    --max_write_buffer_number=4 \
    --target_file_size_base=5242880 \
    --disable_wal=1 --key_size=$KEYSIZE \
    --value_size=$VALUESIZE --num=$NUMKEYS --threads=32

}
```

And benchmark ran is
```
run() {
echo 1 | sudo tee /proc/sys/vm/drop_caches
./db_bench --db=$DB --use_existing_db=1 \
    --benchmarks=multiscan \
    --disable_auto_compactions=1 --seek_nexts=$SCAN_SIZE \
    --multiscan-use-async-io=1 \
    --multiscan-size=$NUM_SCANS --multiscan-stride=$DISTANCE \
    --key_size=$KEYSIZE --value_size=$VALUESIZE \
    --num=$NUMKEYS --threads=$THREADS --duration=60 --statistics
}
```

The benchmark uses large stride sizes to ensure that two scans touch separate files. We reduce the size of the block cache to increase the likelihood of reads (and to simulate larger data sets).

**Branch:**

```
Integrated BlobDB: blob cache disabled
RocksDB:    version 10.8.0
Date:       Tue Nov 11 13:26:29 2025
CPU:        166 * AMD EPYC-Milan Processor
CPUCache:   512 KB
Keys:       64 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2746.6 MB (estimated)
FileSize:   1525.9 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
multiscan_stride = 25000
multiscan_size = 15
seek_nexts = 100
DB path: [/data/rocksdb/mydb]
multiscan    :     837.941 micros/op 1193 ops/sec 60.001 seconds 71605 operations; (multscans:71605)
```

**Baseline:**

```
Set seed to 1762898809121995 because --seed was 0
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
Integrated BlobDB: blob cache disabled
RocksDB:    version 10.9.0
Date:       Tue Nov 11 14:06:49 2025
CPU:        166 * AMD EPYC-Milan Processor
CPUCache:   512 KB
Keys:       64 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2746.6 MB (estimated)
FileSize:   1525.9 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
multiscan_stride = 25000
multiscan_size = 15
seek_nexts = 100
DB path: [/data/rocksdb/mydb]
multiscan    :    1129.916 micros/op 885 ops/sec 60.001 seconds 53102 operations; (multscans:53102)
```
Repeated for confirmation.

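This introduces a ~20% or greater improvement in latency and ops/sec (in the run shown above: 1129.916 → 837.941 micros/op, about 26% lower latency, and 885 → 1193 ops/sec, about 35% higher throughput).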

Note: Benchmarks are single-threaded because, as the thread count increases, block cache contention introduces a large amount of overhead, eventually making the baseline and the branch perform the same.

Furthermore, on high-latency network-attached storage, preparing all iterators in the LevelIterator yields a ~20% improvement even at high thread counts.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14100

Reviewed By: anand1976

Differential Revision: D86913584

Pulled By: krhancoc

fbshipit-source-id: da9d0c890e25e392a33389ce6b80f9bfb84d3f85
2025-11-18 15:57:03 -08:00

5036 lines
159 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <functional>
#include <iomanip>
#include <iostream>
#include "db/arena_wrapped_db_iter.h"
#include "db/db_iter.h"
#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/perf_context.h"
#include "table/block_based/flush_block_policy_impl.h"
#include "util/random.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
namespace ROCKSDB_NAMESPACE {
// A trivial ReadCallback that reports every key as committed:
// IsVisibleFullCheck() accepts any sequence number, and SetSnapshot()
// overwrites the inherited max_visible_seq_ bound.
class DummyReadCallback : public ReadCallback {
 public:
  DummyReadCallback() : ReadCallback(kMaxSequenceNumber) {}

  // Replaces the upper bound stored in the base class with `seq`.
  void SetSnapshot(SequenceNumber seq) { max_visible_seq_ = seq; }

  // Unconditionally treats the sequence number as visible.
  bool IsVisibleFullCheck(SequenceNumber /*seq*/) override { return true; }
};
// Non-parameterized fixture for the iterator tests in this file. It only
// forwards the "db_iterator_test" name and env_do_fsync=true to DBTestBase.
class DBIteratorBaseTest : public DBTestBase {
 public:
  DBIteratorBaseTest()
      : DBTestBase("db_iterator_test", /*env_do_fsync=*/true) {}
};
// Verifies that Seek/SeekToFirst/SeekToLast/SeekForPrev, Next and Prev on a
// DB iterator increment the PerfContext counters iter_seek_count,
// iter_next_count and iter_prev_count respectively.
TEST_F(DBIteratorBaseTest, APICallsWithPerfContext) {
  // Set up the DB
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  for (int i = 1; i <= 3; i++) {
    ASSERT_OK(Put(std::to_string(i), std::to_string(i)));
  }
  // Setup iterator and PerfContext. The iterator is owned by a unique_ptr so
  // it is freed even when an ASSERT_* below returns early.
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  std::string key_str = std::to_string(2);
  Slice key(key_str);
  // SetPerfLevel() changes state that outlives this test body. Restore the
  // previous level on every exit path (including a failed ASSERT_*) so it
  // does not leak into tests that run afterwards.
  struct PerfLevelRestorer {
    PerfLevel saved;
    ~PerfLevelRestorer() { SetPerfLevel(saved); }
  } perf_level_restorer{GetPerfLevel()};
  SetPerfLevel(kEnableCount);
  get_perf_context()->Reset();
  // Initial PerfContext counters
  ASSERT_EQ(0, get_perf_context()->iter_seek_count);
  ASSERT_EQ(0, get_perf_context()->iter_next_count);
  ASSERT_EQ(0, get_perf_context()->iter_prev_count);
  // Test Seek-related API calls PerfContext counter
  iter->Seek(key);
  iter->SeekToFirst();
  iter->SeekToLast();
  iter->SeekForPrev(key);
  ASSERT_EQ(4, get_perf_context()->iter_seek_count);
  ASSERT_EQ(0, get_perf_context()->iter_next_count);
  ASSERT_EQ(0, get_perf_context()->iter_prev_count);
  // Test Next() calls PerfContext counter
  iter->Next();
  ASSERT_EQ(4, get_perf_context()->iter_seek_count);
  ASSERT_EQ(1, get_perf_context()->iter_next_count);
  ASSERT_EQ(0, get_perf_context()->iter_prev_count);
  // Test Prev() calls PerfContext counter
  iter->Prev();
  ASSERT_EQ(4, get_perf_context()->iter_seek_count);
  ASSERT_EQ(1, get_perf_context()->iter_next_count);
  ASSERT_EQ(1, get_perf_context()->iter_prev_count);
}
// Parameterized iterator fixture.
// Test param:
//   bool: whether NewIterator() passes a DummyReadCallback to
//         DBImpl::NewIteratorImpl().
class DBIteratorTest : public DBIteratorBaseTest,
                       public testing::WithParamInterface<bool> {
 public:
  DBIteratorTest() = default;

  // Builds an iterator through DBImpl::NewIteratorImpl() on `column_family`
  // (the default column family when nullptr). Reads are bounded by the
  // snapshot's sequence number if read_options.snapshot is set, otherwise by
  // the DB's latest sequence number. When the test parameter is true, a
  // DummyReadCallback capped at that sequence number is attached; the fixture
  // keeps ownership of it for the lifetime of the test.
  Iterator* NewIterator(const ReadOptions& read_options,
                        ColumnFamilyHandle* column_family = nullptr) {
    ColumnFamilyHandle* const target =
        column_family != nullptr ? column_family : db_->DefaultColumnFamily();
    auto* cf_impl = static_cast_with_check<ColumnFamilyHandleImpl>(target);
    auto* cf_data = cf_impl->cfd();

    SequenceNumber visible_seq;
    if (read_options.snapshot == nullptr) {
      visible_seq = db_->GetLatestSequenceNumber();
    } else {
      visible_seq = read_options.snapshot->GetSequenceNumber();
    }

    DummyReadCallback* callback = nullptr;
    if (GetParam()) {
      auto owned_callback = std::make_unique<DummyReadCallback>();
      owned_callback->SetSnapshot(visible_seq);
      callback = owned_callback.get();
      // read_callbacks_ is shared by every NewIterator() call on this fixture.
      InstrumentedMutexLock guard(&mutex_);
      read_callbacks_.push_back(std::move(owned_callback));
    }

    DBImpl* impl = dbfull();
    SuperVersion* sv = cf_data->GetReferencedSuperVersion(impl);
    return impl->NewIteratorImpl(read_options, cf_impl, sv, visible_seq,
                                 callback);
  }

 private:
  InstrumentedMutex mutex_;
  std::vector<std::unique_ptr<DummyReadCallback>> read_callbacks_;
};
// Checks Iterator::GetProperty() on column family 1, which holds a live key
// "1" and a tombstone for "2":
// - an unknown property name fails;
// - with pin_data = false, is-key-pinned / is-value-pinned read "0" while the
//   iterator is valid and "Iterator is not valid." once it is exhausted;
// - "rocksdb.iterator.internal-key" reads "1" on the live entry and "2" (the
//   tombstone) after Next();
// - "rocksdb.iterator.write-time" decodes as a fixed64 equal to the uint64
//   max.
TEST_P(DBIteratorTest, IteratorProperty) {
  // Column family 1: Put "1" -> "2", then Delete "2".
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put(1, "1", "2"));
  ASSERT_OK(Delete(1, "2"));
  ReadOptions ropt;
  ropt.pin_data = false;
  {
    std::unique_ptr<Iterator> iter(NewIterator(ropt, handles_[1]));
    iter->SeekToFirst();
    std::string prop_value;
    ASSERT_NOK(iter->GetProperty("non_existing.value", &prop_value));
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
    ASSERT_EQ("0", prop_value);
    ASSERT_OK(
        iter->GetProperty("rocksdb.iterator.is-value-pinned", &prop_value));
    ASSERT_EQ("0", prop_value);
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.internal-key", &prop_value));
    ASSERT_EQ("1", prop_value);
    iter->Next();
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
    ASSERT_EQ("Iterator is not valid.", prop_value);
    ASSERT_OK(
        iter->GetProperty("rocksdb.iterator.is-value-pinned", &prop_value));
    ASSERT_EQ("Iterator is not valid.", prop_value);
    // Get internal key at which the iteration stopped (tombstone in this case).
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.internal-key", &prop_value));
    ASSERT_EQ("2", prop_value);
    prop_value.clear();
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.write-time", &prop_value));
    uint64_t write_time;
    Slice prop_slice = prop_value;
    ASSERT_TRUE(GetFixed64(&prop_slice, &write_time));
    // NOTE(review): the max uint64 value presumably means "write time
    // unknown" for this entry; confirm against the property's documentation.
    ASSERT_EQ(std::numeric_limits<uint64_t>::max(), write_time);
  }
  Close();
}
// kPersistedTier is not supported for iterators: both NewIterator() and
// NewIterators() must report NotSupported.
TEST_P(DBIteratorTest, PersistedTierOnIterator) {
  // The test needs to be changed if kPersistedTier is supported in iterator.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);
  ReadOptions ropt;
  ropt.read_tier = kPersistedTier;
  {
    // Owned by a unique_ptr so it is released even if the ASSERT fails.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ropt, handles_[1]));
    ASSERT_TRUE(iter->status().IsNotSupported());
  }
  std::vector<Iterator*> iters;
  Status s = db_->NewIterators(ropt, {handles_[1]}, &iters);
  // Release anything NewIterators() handed back before asserting, so a
  // failing assertion cannot leak iterators.
  for (Iterator* it : iters) {
    delete it;
  }
  iters.clear();
  ASSERT_TRUE(s.IsNotSupported());
  Close();
}
// Exercises iterators with read_tier = kBlockCacheTier (non-blocking):
// - a key still in the memtable is found;
// - after a flush, the key is not found, the status is Incomplete, and
//   neither NO_FILE_OPENS nor BLOCK_CACHE_ADD changes;
// - after a regular Get() has read the block, the non-blocking scan finds the
//   key again, still without opening files or adding blocks to the cache.
// Repeated for every option config accepted by ChangeOptions() below.
TEST_P(DBIteratorTest, NonBlockingIteration) {
  do {
    // NOTE(review): regular_opts is declared but never used.
    ReadOptions non_blocking_opts, regular_opts;
    anon::OptionsOverride options_override;
    options_override.full_block_cache = true;
    Options options = CurrentOptions(options_override);
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    non_blocking_opts.read_tier = kBlockCacheTier;
    CreateAndReopenWithCF({"pikachu"}, options);
    // write one kv to the database.
    ASSERT_OK(Put(1, "a", "b"));
    // scan using non-blocking iterator. We should find it because
    // it is in memtable.
    Iterator* iter = NewIterator(non_blocking_opts, handles_[1]);
    int count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      count++;
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(count, 1);
    delete iter;
    // flush memtable to storage. Now, the key should be neither in the
    // memtable nor in the block cache.
    ASSERT_OK(Flush(1));
    // verify that a non-blocking iterator does not find any
    // kvs. Neither does it do any IOs to storage.
    uint64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS);
    uint64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
    iter = NewIterator(non_blocking_opts, handles_[1]);
    count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      count++;
    }
    ASSERT_EQ(count, 0);
    ASSERT_TRUE(iter->status().IsIncomplete());
    ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS));
    ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD));
    delete iter;
    // read in the specified block via a regular get
    ASSERT_EQ(Get(1, "a"), "b");
    // verify that we can find it via a non-blocking scan
    numopen = TestGetTickerCount(options, NO_FILE_OPENS);
    cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
    iter = NewIterator(non_blocking_opts, handles_[1]);
    count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      count++;
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(count, 1);
    ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS));
    ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD));
    delete iter;
    // This test verifies block cache behaviors, which is not used by plain
    // table format.
  } while (ChangeOptions(kSkipPlainTable | kSkipNoSeekToLast | kSkipMmapReads));
}
// Prev() immediately after Seek() must land on the preceding key. The data is
// spread over two flushed files and the memtable.
// Sorted key order: "0" < "1" < "2" < "a" < "c".
TEST_P(DBIteratorTest, IterSeekBeforePrev) {
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("0", "f"));
  ASSERT_OK(Put("1", "h"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("2", "j"));
  // Owned by a unique_ptr so it is released even if an ASSERT_* fails.
  std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
  iter->Seek(Slice("c"));
  ASSERT_EQ(IterStatus(iter.get()), "c->d");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "a->b");
  iter->Seek(Slice("a"));
  ASSERT_EQ(IterStatus(iter.get()), "a->b");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "2->j");
  ASSERT_OK(iter->status());
}
// Changing the Slice that ReadOptions::iterate_upper_bound points to between
// two Seek() calls on the same iterator must take effect. The bound starts at
// "aa" for the first Seek("a"), is then moved to "b", and Seek("aabc") must
// find "aaef".
TEST_P(DBIteratorTest, IterReseekNewUpperBound) {
  Random rnd(301);
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  // Small blocks and uncompressed 400-byte values, presumably so the keys
  // end up in different data blocks.
  table_options.block_size = 1024;
  table_options.block_size_deviation = 50;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.compression = kNoCompression;
  Reopen(options);
  ASSERT_OK(Put("a", rnd.RandomString(400)));
  ASSERT_OK(Put("aabb", rnd.RandomString(400)));
  ASSERT_OK(Put("aaef", rnd.RandomString(400)));
  ASSERT_OK(Put("b", rnd.RandomString(400)));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ReadOptions opts;
  Slice ub = Slice("aa");
  opts.iterate_upper_bound = &ub;
  auto iter = NewIterator(opts);
  iter->Seek(Slice("a"));
  // Mutate the bound in place; the iterator holds a pointer to `ub`.
  ub = Slice("b");
  iter->Seek(Slice("aabc"));
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->key().ToString(), "aaef");
  delete iter;
}
// Next() immediately after SeekForPrev() must land on the following key. The
// data is spread over two flushed files and the memtable.
// Sorted key order: "0" < "1" < "2" < "a" < "c".
TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) {
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("0", "f"));
  ASSERT_OK(Put("1", "h"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("2", "j"));
  // Owned by a unique_ptr so it is released even if an ASSERT_* fails.
  std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
  iter->SeekForPrev(Slice("0"));
  ASSERT_EQ(IterStatus(iter.get()), "0->f");
  iter->Next();
  ASSERT_EQ(IterStatus(iter.get()), "1->h");
  iter->SeekForPrev(Slice("1"));
  ASSERT_EQ(IterStatus(iter.get()), "1->h");
  iter->Next();
  ASSERT_EQ(IterStatus(iter.get()), "2->j");
  ASSERT_OK(iter->status());
}
namespace {
// Returns a key consisting of `length` copies of the byte `c`.
std::string MakeLongKey(size_t length, char c) {
  std::string key;
  key.append(length, c);
  return key;
}
}  // anonymous namespace
// Keys of very different lengths (20 to 127 bytes), each made of one repeated
// byte (0x00 to 0x04), are spread across a flushed file and the memtable.
// Forward and backward iteration must return them ordered by that byte value,
// not by key length.
TEST_P(DBIteratorTest, IterLongKeys) {
  ASSERT_OK(Put(MakeLongKey(20, 0), "0"));
  ASSERT_OK(Put(MakeLongKey(32, 2), "2"));
  ASSERT_OK(Put("a", "b"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put(MakeLongKey(50, 1), "1"));
  ASSERT_OK(Put(MakeLongKey(127, 3), "3"));
  ASSERT_OK(Put(MakeLongKey(64, 4), "4"));
  auto iter = NewIterator(ReadOptions());
  // Forward scan interleaves keys from the flushed file and the memtable.
  iter->Seek(MakeLongKey(20, 0));
  ASSERT_EQ(IterStatus(iter), MakeLongKey(20, 0) + "->0");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(50, 1) + "->1");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(32, 2) + "->2");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(127, 3) + "->3");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(64, 4) + "->4");
  // Backward scan from the middle of the key range.
  iter->SeekForPrev(MakeLongKey(127, 3));
  ASSERT_EQ(IterStatus(iter), MakeLongKey(127, 3) + "->3");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(32, 2) + "->2");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(50, 1) + "->1");
  delete iter;
  // A fresh iterator seeking into the middle continues in the same order.
  iter = NewIterator(ReadOptions());
  iter->Seek(MakeLongKey(50, 1));
  ASSERT_EQ(IterStatus(iter), MakeLongKey(50, 1) + "->1");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(32, 2) + "->2");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), MakeLongKey(127, 3) + "->3");
  delete iter;
}
// Forward iteration (after Seek() and after SeekForPrev()) must skip a run of
// "b" entries written after the iterator was created. The run is one longer
// than max_sequential_skip_in_iterations.
TEST_P(DBIteratorTest, IterNextWithNewerSeq) {
  ASSERT_OK(Put("0", "0"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  ASSERT_OK(Put("d", "e"));
  auto iter = NewIterator(ReadOptions());
  // Create a key that needs to be skipped for Seq too new
  for (uint64_t i = 0; i < last_options_.max_sequential_skip_in_iterations + 1;
       i++) {
    ASSERT_OK(Put("b", "f"));
  }
  iter->Seek(Slice("a"));
  ASSERT_EQ(IterStatus(iter), "a->b");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), "c->d");
  iter->SeekForPrev(Slice("b"));
  ASSERT_EQ(IterStatus(iter), "a->b");
  iter->Next();
  ASSERT_EQ(IterStatus(iter), "c->d");
  delete iter;
}
// Reverse iteration (after Seek() and after SeekForPrev()) must skip a run of
// "b" entries written after the iterator was created. The run is one longer
// than max_sequential_skip_in_iterations. The iterator only sees "0" (flushed
// file) and "a", "c", "d" (memtable).
TEST_P(DBIteratorTest, IterPrevWithNewerSeq) {
  ASSERT_OK(Put("0", "0"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  ASSERT_OK(Put("d", "e"));
  // Owned by a unique_ptr so it is released even if an ASSERT_* fails.
  std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
  // Create a key that needs to be skipped for Seq too new
  for (uint64_t i = 0; i < last_options_.max_sequential_skip_in_iterations + 1;
       i++) {
    ASSERT_OK(Put("b", "f"));
  }
  iter->Seek(Slice("d"));
  ASSERT_EQ(IterStatus(iter.get()), "d->e");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "c->d");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "a->b");
  iter->Prev();
  // Only the flushed "0" precedes "a" in this iterator's view.
  ASSERT_EQ(IterStatus(iter.get()), "0->0");
  iter->SeekForPrev(Slice("d"));
  ASSERT_EQ(IterStatus(iter.get()), "d->e");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "c->d");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "a->b");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "0->0");
  ASSERT_OK(iter->status());
}
// Two iterators are already positioned on "c" (one via Seek(), one via
// SeekForPrev()) before a run of newer "b" entries is written. Prev() on each
// must skip the too-new "b" entries, reach "a", and then the flushed "0".
TEST_P(DBIteratorTest, IterPrevWithNewerSeq2) {
  ASSERT_OK(Put("0", "0"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("a", "b"));
  ASSERT_OK(Put("c", "d"));
  ASSERT_OK(Put("e", "f"));
  // Owned by unique_ptrs so they are released even if an ASSERT_* fails.
  std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
  std::unique_ptr<Iterator> iter2(NewIterator(ReadOptions()));
  iter->Seek(Slice("c"));
  iter2->SeekForPrev(Slice("d"));
  ASSERT_EQ(IterStatus(iter.get()), "c->d");
  ASSERT_EQ(IterStatus(iter2.get()), "c->d");
  // Create a key that needs to be skipped for Seq too new
  for (uint64_t i = 0; i < last_options_.max_sequential_skip_in_iterations + 1;
       i++) {
    ASSERT_OK(Put("b", "f"));
  }
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "a->b");
  iter->Prev();
  ASSERT_EQ(IterStatus(iter.get()), "0->0");
  iter2->Prev();
  ASSERT_EQ(IterStatus(iter2.get()), "a->b");
  iter2->Prev();
  ASSERT_EQ(IterStatus(iter2.get()), "0->0");
  ASSERT_OK(iter->status());
  ASSERT_OK(iter2->status());
}
// On an empty column family, every positioning call (SeekToFirst, SeekToLast,
// Seek, SeekForPrev) leaves the iterator invalid, and its status stays OK.
// Repeated for each ChangeCompactOptions() configuration.
TEST_P(DBIteratorTest, IterEmpty) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
    iter->SeekToFirst();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekToLast();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->Seek("foo");
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekForPrev("foo");
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    ASSERT_OK(iter->status());
    delete iter;
  } while (ChangeCompactOptions());
}
// With a single key "a", every positioning call either lands on "a" or leaves
// the iterator invalid, and stepping off "a" in either direction invalidates
// it. Repeated for each ChangeCompactOptions() configuration.
TEST_P(DBIteratorTest, IterSingle) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "a", "va"));
    Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
    // SeekToFirst / SeekToLast followed by a step in each direction.
    iter->SeekToFirst();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekToFirst();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekToLast();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekToLast();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    // Seek / SeekForPrev targets before, at and after "a".
    iter->Seek("");
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekForPrev("");
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->Seek("a");
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekForPrev("a");
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->Seek("b");
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekForPrev("b");
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    delete iter;
  } while (ChangeCompactOptions());
}
// Covers full forward and backward scans over "a", "b", "c"; Seek /
// SeekForPrev to present, absent and out-of-range targets; direction switches;
// and the guarantee that writes made after the iterator was created
// (overwrites, a new key "a2", and a delete of "b") stay invisible to it.
// Repeated for each ChangeCompactOptions() configuration.
TEST_P(DBIteratorTest, IterMulti) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "a", "va"));
    ASSERT_OK(Put(1, "b", "vb"));
    ASSERT_OK(Put(1, "c", "vc"));
    Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
    // Full forward scan.
    iter->SeekToFirst();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekToFirst();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    // Full backward scan.
    iter->SeekToLast();
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekToLast();
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    // Seek / SeekForPrev to exact, in-between and out-of-range targets.
    iter->Seek("");
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Seek("a");
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Seek("ax");
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->SeekForPrev("d");
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->SeekForPrev("c");
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->SeekForPrev("bx");
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->Seek("b");
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->Seek("z");
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekForPrev("b");
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->SeekForPrev("");
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    // Switch from reverse to forward
    iter->SeekToLast();
    iter->Prev();
    iter->Prev();
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "b->vb");
    // Switch from forward to reverse
    iter->SeekToFirst();
    iter->Next();
    iter->Next();
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "b->vb");
    // Make sure iter stays at snapshot
    ASSERT_OK(Put(1, "a", "va2"));
    ASSERT_OK(Put(1, "a2", "va3"));
    ASSERT_OK(Put(1, "b", "vb2"));
    ASSERT_OK(Put(1, "c", "vc2"));
    ASSERT_OK(Delete(1, "b"));
    iter->SeekToFirst();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    iter->SeekToLast();
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "b->vb");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    delete iter;
  } while (ChangeCompactOptions());
}
// Check that we can skip over a run of user keys
// by using reseek rather than sequential scan.
// With max_sequential_skip_in_iterations = 3, NUMBER_OF_RESEEKS_IN_ITERATION
// is asserted to stay unchanged while at most 3 older versions must be skipped
// and to increase once 4 must be skipped.
TEST_P(DBIteratorTest, IterReseek) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  Options options = CurrentOptions(options_override);
  options.max_sequential_skip_in_iterations = 3;
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  // insert three keys with same userkey and verify that
  // reseek is not invoked. For each of these test cases,
  // verify that we can find the next key "b".
  ASSERT_OK(Put(1, "a", "zero"));
  ASSERT_OK(Put(1, "a", "one"));
  ASSERT_OK(Put(1, "a", "two"));
  ASSERT_OK(Put(1, "b", "bone"));
  Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
  iter->SeekToFirst();
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
  ASSERT_EQ(IterStatus(iter), "a->two");
  iter->Next();
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
  ASSERT_EQ(IterStatus(iter), "b->bone");
  delete iter;
  // Add a fourth version of "a" (three older versions to skip on Next()) and
  // verify that reseek is still not invoked.
  ASSERT_OK(Put(1, "a", "three"));
  iter = NewIterator(ReadOptions(), handles_[1]);
  iter->SeekToFirst();
  ASSERT_EQ(IterStatus(iter), "a->three");
  iter->Next();
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
  ASSERT_EQ(IterStatus(iter), "b->bone");
  delete iter;
  // Add a fifth version of "a" (four older versions to skip, more than
  // max_sequential_skip_in_iterations) and verify that reseek is invoked.
  ASSERT_OK(Put(1, "a", "four"));
  iter = NewIterator(ReadOptions(), handles_[1]);
  iter->SeekToFirst();
  ASSERT_EQ(IterStatus(iter), "a->four");
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
  iter->Next();
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
  ASSERT_EQ(IterStatus(iter), "b->bone");
  delete iter;
  // Testing reverse iterator
  // At this point, we have five versions of "a" and one version of "b".
  // The reseek statistics is already at 1.
  int num_reseeks = static_cast<int>(
      TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION));
  // Insert another version of b. SeekToLast does not reseek, but the
  // following Prev() into the five versions of "a" does.
  ASSERT_OK(Put(1, "b", "btwo"));
  iter = NewIterator(ReadOptions(), handles_[1]);
  iter->SeekToLast();
  ASSERT_EQ(IterStatus(iter), "b->btwo");
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION),
            num_reseeks);
  iter->Prev();
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION),
            num_reseeks + 1);
  ASSERT_EQ(IterStatus(iter), "a->four");
  delete iter;
  // insert two more versions of b. This makes a total of 4 versions
  // of b and 5 versions of a. Here SeekToLast itself is asserted to reseek
  // once.
  ASSERT_OK(Put(1, "b", "bthree"));
  ASSERT_OK(Put(1, "b", "bfour"));
  iter = NewIterator(ReadOptions(), handles_[1]);
  iter->SeekToLast();
  ASSERT_EQ(IterStatus(iter), "b->bfour");
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION),
            num_reseeks + 2);
  iter->Prev();
  // the previous Prev call should have invoked reseek
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION),
            num_reseeks + 3);
  ASSERT_EQ(IterStatus(iter), "a->four");
  delete iter;
}
// Checks NUMBER_OF_RESEEKS_IN_ITERATION when the iteration direction changes:
// - the SeekToLast/Prev/Next sequence over two plain keys records exactly one
//   reseek;
// - Prev() from a key that has a merge operand records one more.
// This is TEST_F (not TEST_P), so the body uses db_->NewIterator() directly
// rather than the fixture's NewIterator(), which depends on GetParam().
TEST_F(DBIteratorTest, ReseekUponDirectionChange) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.merge_operator.reset(
      new StringAppendTESTOperator(/*delim_char=*/' '));
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "value"));
  ASSERT_OK(Put("bar", "value"));
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->SeekToLast();
    it->Prev();
    it->Next();
  }
  ASSERT_EQ(1,
            options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
  const std::string merge_key("good");
  ASSERT_OK(Put(merge_key, "orig"));
  ASSERT_OK(Merge(merge_key, "suffix"));
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
    it->Seek(merge_key);
    ASSERT_TRUE(it->Valid());
    const uint64_t prev_reseek_count =
        options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
    it->Prev();
    ASSERT_EQ(prev_reseek_count + 1, options.statistics->getTickerCount(
                                         NUMBER_OF_RESEEKS_IN_ITERATION));
  }
}
// Interleaves small values with 100000-byte values and checks that forward
// and backward scans return every value intact. Repeated for each
// ChangeCompactOptions() configuration.
TEST_P(DBIteratorTest, IterSmallAndLargeMix) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "a", "va"));
    ASSERT_OK(Put(1, "b", std::string(100000, 'b')));
    ASSERT_OK(Put(1, "c", "vc"));
    ASSERT_OK(Put(1, "d", std::string(100000, 'd')));
    ASSERT_OK(Put(1, "e", std::string(100000, 'e')));
    Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
    // Forward scan.
    iter->SeekToFirst();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
    iter->Next();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    // Backward scan.
    iter->SeekToLast();
    ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "c->vc");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "a->va");
    iter->Prev();
    ASSERT_EQ(IterStatus(iter), "(invalid)");
    delete iter;
  } while (ChangeCompactOptions());
}
// A deleted key ("kb") is NOT_FOUND via Get(). Where backward iteration is
// exercised, Prev() from "kc" skips it and lands on "ka". Repeated for each
// ChangeOptions() configuration.
TEST_P(DBIteratorTest, IterMultiWithDelete) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "ka", "va"));
    ASSERT_OK(Put(1, "kb", "vb"));
    ASSERT_OK(Put(1, "kc", "vc"));
    ASSERT_OK(Delete(1, "kb"));
    ASSERT_EQ("NOT_FOUND", Get(1, "kb"));
    Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
    iter->Seek("kc");
    ASSERT_EQ(IterStatus(iter), "kc->vc");
    if (!CurrentOptions().merge_operator) {
      // TODO: merge operator does not support backward iteration yet
      // The backward step is also skipped for the configs listed below.
      if (kPlainTableAllBytesPrefix != option_config_ &&
          kBlockBasedTableWithWholeKeyHashIndex != option_config_ &&
          kHashLinkList != option_config_ &&
          kHashSkipList != option_config_) {  // doesn't support SeekToLast
        iter->Prev();
        ASSERT_EQ(IterStatus(iter), "ka->va");
      }
    }
    delete iter;
  } while (ChangeOptions());
}
// Writes key1..key5 twice, so each user key has two versions. Then removes the
// keys from the tail one at a time, checking with VerifyIterLast() that the
// expected entry is reported as last before each delete, and that nothing is
// left at the end. Repeated for every ChangeOptions() configuration that keeps
// SeekToLast and skips merge-put.
TEST_P(DBIteratorTest, IterPrevMaxSkip) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    for (int round = 0; round < 2; round++) {
      for (int k = 1; k <= 5; k++) {
        const std::string suffix = std::to_string(k);
        ASSERT_OK(Put(1, "key" + suffix, "v" + suffix));
      }
    }
    // Peel keys off the end: key5, key4, ..., key1.
    for (int k = 5; k >= 1; k--) {
      const std::string suffix = std::to_string(k);
      VerifyIterLast("key" + suffix + "->v" + suffix, 1);
      ASSERT_OK(Delete(1, "key" + suffix));
    }
    VerifyIterLast("(invalid)", 1);
  } while (ChangeOptions(kSkipMergePut | kSkipNoSeekToLast));
}
// An iterator reading at a snapshot taken after key1..key5 were written must
// not see keys written afterwards ("key0", "key100", "key101"). This is checked
// in both directions wherever the option config allows backward iteration.
// Repeated for each ChangeOptions() configuration.
TEST_P(DBIteratorTest, IterWithSnapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    ASSERT_OK(Put(1, "key1", "val1"));
    ASSERT_OK(Put(1, "key2", "val2"));
    ASSERT_OK(Put(1, "key3", "val3"));
    ASSERT_OK(Put(1, "key4", "val4"));
    ASSERT_OK(Put(1, "key5", "val5"));
    const Snapshot* snapshot = db_->GetSnapshot();
    ReadOptions options;
    options.snapshot = snapshot;
    Iterator* iter = NewIterator(options, handles_[1]);
    ASSERT_OK(Put(1, "key0", "val0"));
    // Put more values after the snapshot
    ASSERT_OK(Put(1, "key100", "val100"));
    ASSERT_OK(Put(1, "key101", "val101"));
    iter->Seek("key5");
    ASSERT_EQ(IterStatus(iter), "key5->val5");
    if (!CurrentOptions().merge_operator) {
      // TODO: merge operator does not support backward iteration yet
      if (kPlainTableAllBytesPrefix != option_config_ &&
          kBlockBasedTableWithWholeKeyHashIndex != option_config_ &&
          kHashLinkList != option_config_ && kHashSkipList != option_config_) {
        iter->Prev();
        ASSERT_EQ(IterStatus(iter), "key4->val4");
        iter->Prev();
        ASSERT_EQ(IterStatus(iter), "key3->val3");
        iter->Next();
        ASSERT_EQ(IterStatus(iter), "key4->val4");
        iter->Next();
        ASSERT_EQ(IterStatus(iter), "key5->val5");
      }
      // "key100" / "key101" sort after "key5" but are newer than the snapshot.
      iter->Next();
      ASSERT_TRUE(!iter->Valid());
    }
    if (!CurrentOptions().merge_operator) {
      // TODO(gzh): merge operator does not support backward iteration yet
      if (kPlainTableAllBytesPrefix != option_config_ &&
          kBlockBasedTableWithWholeKeyHashIndex != option_config_ &&
          kHashLinkList != option_config_ && kHashSkipList != option_config_) {
        iter->SeekForPrev("key1");
        ASSERT_EQ(IterStatus(iter), "key1->val1");
        iter->Next();
        ASSERT_EQ(IterStatus(iter), "key2->val2");
        iter->Next();
        ASSERT_EQ(IterStatus(iter), "key3->val3");
        iter->Prev();
        ASSERT_EQ(IterStatus(iter), "key2->val2");
        iter->Prev();
        ASSERT_EQ(IterStatus(iter), "key1->val1");
        // "key0" sorts before "key1" but is newer than the snapshot.
        iter->Prev();
        ASSERT_TRUE(!iter->Valid());
      }
    }
    db_->ReleaseSnapshot(snapshot);
    ASSERT_OK(iter->status());
    delete iter;
  } while (ChangeOptions());
}
// An iterator created before a large burst of writes must keep returning the
// data as of its creation ("foo" -> "hello" and nothing else).
// Repeated for each ChangeCompactOptions() configuration.
TEST_P(DBIteratorTest, IteratorPinsRef) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "hello"));
    // Get iterator that will yield the current contents of the DB.
    Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
    // Write to force compactions. Presumably the ~10MB written below
    // (100 values of ~100KB each) triggers flushes/compactions while the
    // iterator is alive.
    ASSERT_OK(Put(1, "foo", "newvalue1"));
    for (int i = 0; i < 100; i++) {
      // ~100KB value per key
      ASSERT_OK(Put(1, Key(i), Key(i) + std::string(100000, 'v')));
    }
    ASSERT_OK(Put(1, "foo", "newvalue2"));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("foo", iter->key().ToString());
    ASSERT_EQ("hello", iter->value().ToString());
    iter->Next();
    ASSERT_TRUE(!iter->Valid());
    delete iter;
  } while (ChangeCompactOptions());
}
// An iterator stays usable (Next() returns the following key) and can be
// deleted after its column family handle has been destroyed. The column family
// itself is not dropped.
TEST_P(DBIteratorTest, IteratorDeleteAfterCfDelete) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  ASSERT_OK(Put(1, "foo", "delete-cf-then-delete-iter"));
  ASSERT_OK(Put(1, "hello", "value2"));
  ColumnFamilyHandle* cf = handles_[1];
  ReadOptions ro;
  auto* iter = db_->NewIterator(ro, cf);
  iter->SeekToFirst();
  ASSERT_EQ(IterStatus(iter), "foo->delete-cf-then-delete-iter");
  // delete CF handle
  EXPECT_OK(db_->DestroyColumnFamilyHandle(cf));
  handles_.erase(std::begin(handles_) + 1);
  // keep using the iterator after the CF handle is destroyed, then delete it
  iter->Next();
  ASSERT_EQ(IterStatus(iter), "hello->value2");
  delete iter;
}
// An iterator can be deleted after its column family has been dropped and its
// handle destroyed.
TEST_P(DBIteratorTest, IteratorDeleteAfterCfDrop) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  ASSERT_OK(Put(1, "foo", "drop-cf-then-delete-iter"));
  ReadOptions ro;
  ColumnFamilyHandle* cf = handles_[1];
  auto* iter = db_->NewIterator(ro, cf);
  iter->SeekToFirst();
  ASSERT_EQ(IterStatus(iter), "foo->drop-cf-then-delete-iter");
  // drop and delete CF
  EXPECT_OK(db_->DropColumnFamily(cf));
  EXPECT_OK(db_->DestroyColumnFamilyHandle(cf));
  handles_.erase(std::begin(handles_) + 1);
  // delete Iterator after CF handle is dropped
  delete iter;
}
// Note: this test relies on DB::SetOptions(), which was historically
// unavailable in ROCKSDB_LITE builds.
// Verifies that ReadOptions::iterate_upper_bound (an exclusive bound) is
// honored by forward iteration and SeekToLast(), with and without a prefix
// extractor. Also checks that an iterator stops at the bound instead of
// skipping tombstones that lie beyond it.
TEST_P(DBIteratorTest, DBIteratorBoundTest) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.prefix_extractor = nullptr;
  DestroyAndReopen(options);
  ASSERT_OK(Put("a", "0"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("foo1", "bar1"));
  ASSERT_OK(Put("g1", "0"));

  // testing basic case with no iterate_upper_bound and no prefix_extractor
  {
    ReadOptions ro;
    ro.iterate_upper_bound = nullptr;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo")), 0);
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo1")), 0);
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("g1")), 0);
    iter->SeekForPrev("g1");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("g1")), 0);
    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo1")), 0);
    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo")), 0);
  }

  // testing iterate_upper_bound and forward iterator
  // to make sure it stops at bound
  {
    ReadOptions ro;
    // iterate_upper_bound points beyond the last expected entry
    Slice upper_bound("foo2");
    ro.iterate_upper_bound = &upper_bound;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo")), 0);
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo1")), 0);
    iter->Next();
    // should stop here, because of the bound and not because of an error
    ASSERT_TRUE(!iter->Valid());
    ASSERT_OK(iter->status());
  }

  // Testing SeekToLast with iterate_upper_bound set: the last visible key is
  // the largest one strictly below the bound.
  {
    ReadOptions ro;
    Slice upper_bound("foo");
    ro.iterate_upper_bound = &upper_bound;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    iter->SeekToLast();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("a")), 0);
  }

  // prefix is the first letter of the key
  ASSERT_OK(dbfull()->SetOptions({{"prefix_extractor", "fixed:1"}}));
  ASSERT_OK(Put("a", "0"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("foo1", "bar1"));
  ASSERT_OK(Put("g1", "0"));

  // testing with iterate_upper_bound and prefix_extractor.
  // The Seek target ("foo", prefix "f") and iterate_upper_bound ("g") are not
  // in the same prefix. This is still a valid configuration: iteration must
  // return every key below the bound and then end cleanly.
  {
    ReadOptions ro;
    Slice upper_bound("g");
    ro.iterate_upper_bound = &upper_bound;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("foo", iter->key().ToString());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("foo1", iter->key().ToString());
    iter->Next();
    ASSERT_TRUE(!iter->Valid());
    ASSERT_OK(iter->status());
  }

  // testing that iterate_upper_bound prevents iterating over deleted items
  // once the bound has been reached
  {
    options.prefix_extractor = nullptr;
    DestroyAndReopen(options);
    ASSERT_OK(Put("a", "0"));
    ASSERT_OK(Put("b", "0"));
    ASSERT_OK(Put("b1", "0"));
    ASSERT_OK(Put("c", "0"));
    ASSERT_OK(Put("d", "0"));
    ASSERT_OK(Put("e", "0"));
    ASSERT_OK(Delete("c"));
    ASSERT_OK(Delete("d"));

    // Declared before any iterator so the bound outlives every iterator
    // that references it.
    Slice upper_bound("c");

    // base case with no bound
    ReadOptions ro;
    ro.iterate_upper_bound = nullptr;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    iter->Seek("b");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("b")), 0);
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("b1")), 0);
    get_perf_context()->Reset();
    iter->Next();
    // Without a bound, the tombstones for "c" and "d" are skipped to reach "e".
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("e")), 0);
    ASSERT_EQ(
        static_cast<int>(get_perf_context()->internal_delete_skipped_count), 2);

    // now testing with iterate_bound
    ro.iterate_upper_bound = &upper_bound;
    iter.reset(NewIterator(ro));
    get_perf_context()->Reset();
    iter->Seek("b");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("b")), 0);
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("b1")), 0);
    iter->Next();
    // the iteration should stop as soon as the bound key is reached
    // even though the key is deleted
    // hence internal_delete_skipped_count should be 0
    ASSERT_TRUE(!iter->Valid());
    ASSERT_EQ(
        static_cast<int>(get_perf_context()->internal_delete_skipped_count), 0);
    ASSERT_OK(iter->status());
  }
}
// Re-seeking an iterator that has iterate_upper_bound set, to targets that stay
// inside the bound, must not generate any new block cache lookups: after the
// first Seek(), BLOCK_CACHE_HIT and BLOCK_CACHE_MISS must remain unchanged.
TEST_P(DBIteratorTest, DBIteratorBoundMultiSeek) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
// Statistics are required for the BLOCK_CACHE_* tickers checked below.
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.prefix_extractor = nullptr;
DestroyAndReopen(options);
// "a" and "z" are flushed into an SST file; the "foo*" keys are written after
// that Flush() and this test does not flush them.
ASSERT_OK(Put("a", "0"));
ASSERT_OK(Put("z", "0"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo1", "bar1"));
ASSERT_OK(Put("foo2", "bar2"));
ASSERT_OK(Put("foo3", "bar3"));
ASSERT_OK(Put("foo4", "bar4"));
{
// `up` refers to the bytes of up_str; both live until the end of this scope,
// i.e. for the whole lifetime of the iterator.
std::string up_str = "foo5";
Slice up(up_str);
ReadOptions ro;
ro.iterate_upper_bound = &up;
std::unique_ptr<Iterator> iter(NewIterator(ro));
iter->Seek("foo1");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("foo1")), 0);
// Snapshot the tickers after the first Seek(), which must have accessed the
// block cache at least once.
uint64_t prev_block_cache_hit =
TestGetTickerCount(options, BLOCK_CACHE_HIT);
uint64_t prev_block_cache_miss =
TestGetTickerCount(options, BLOCK_CACHE_MISS);
ASSERT_GT(prev_block_cache_hit + prev_block_cache_miss, 0);
// Re-seek forward inside the bound: no new block cache hits or misses.
iter->Seek("foo4");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("foo4")), 0);
ASSERT_EQ(prev_block_cache_hit,
TestGetTickerCount(options, BLOCK_CACHE_HIT));
ASSERT_EQ(prev_block_cache_miss,
TestGetTickerCount(options, BLOCK_CACHE_MISS));
// Re-seek backward inside the bound, then step forward: still no new block
// cache hits or misses.
iter->Seek("foo2");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("foo2")), 0);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("foo3")), 0);
ASSERT_EQ(prev_block_cache_hit,
TestGetTickerCount(options, BLOCK_CACHE_HIT));
ASSERT_EQ(prev_block_cache_miss,
TestGetTickerCount(options, BLOCK_CACHE_MISS));
}
}
// For each block-based table format_version, checks the
// "BlockBasedTableIterator:out_of_bound" sync point for an iterator with
// iterate_upper_bound set. It must fire exactly once, when Next() steps past
// the bound, and never while in-bound keys are returned.
TEST_P(DBIteratorTest, DBIteratorBoundOptimizationTest) {
  // Counter shared with the sync point callback. It lives at function scope
  // so the callback, registered once, never refers to a variable whose
  // lifetime has ended. It is reset at the start of every iteration.
  int upper_bound_hits = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableIterator:out_of_bound",
      [&upper_bound_hits](void*) { upper_bound_hits++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  for (auto format_version : {2, 3, 4}) {
    upper_bound_hits = 0;
    Options options = CurrentOptions();
    options.env = env_;
    options.create_if_missing = true;
    options.prefix_extractor = nullptr;
    BlockBasedTableOptions table_options;
    table_options.format_version = format_version;
    table_options.flush_block_policy_factory =
        std::make_shared<FlushBlockEveryKeyPolicyFactory>();
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    DestroyAndReopen(options);

    ASSERT_OK(Put("foo1", "bar1"));
    ASSERT_OK(Put("foo2", "bar2"));
    ASSERT_OK(Put("foo4", "bar4"));
    ASSERT_OK(Flush());

    Slice ub("foo3");
    ReadOptions ro;
    ro.iterate_upper_bound = &ub;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo1")), 0);
    ASSERT_EQ(upper_bound_hits, 0);
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo2")), 0);
    ASSERT_EQ(upper_bound_hits, 0);
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ(upper_bound_hits, 1);
  }

  // Do not leave a callback that captures a local by reference registered
  // (and processing enabled) once this test body returns.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Enable kBinarySearchWithFirstKey, do some iterator operations and check that
// they don't do unnecessary block reads.
// Runs twice: with a regular iterator (tailing = 0) and with a tailing iterator
// (tailing = 1). Data-block accesses are counted through the
// BLOCK_CACHE_DATA_MISS / BLOCK_CACHE_DATA_HIT tickers, which accumulate across
// the assertions below, so their exact order matters.
TEST_P(DBIteratorTest, IndexWithFirstKey) {
for (int tailing = 0; tailing < 2; ++tailing) {
SCOPED_TRACE("tailing = " + std::to_string(tailing));
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.prefix_extractor = nullptr;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
Statistics* stats = options.statistics.get();
BlockBasedTableOptions table_options;
table_options.index_type =
BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey;
table_options.index_shortening =
BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
// Presumably one key per data block (FlushBlockEveryKeyPolicy).
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();
table_options.block_cache =
NewLRUCache(8000); // fits all blocks and their cache metadata overhead
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
// Three flushed files. "c0" gets a merge operand in each of the first two
// files, so reading it yields both operands ("z1,z2").
ASSERT_OK(Merge("a1", "x1"));
ASSERT_OK(Merge("b1", "y1"));
ASSERT_OK(Merge("c0", "z1"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("a2", "x2"));
ASSERT_OK(Merge("b2", "y2"));
ASSERT_OK(Merge("c0", "z2"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("a3", "x3"));
ASSERT_OK(Merge("b3", "y3"));
ASSERT_OK(Merge("c3", "z3"));
ASSERT_OK(Flush());
// Block cache is not important for this test.
// We use BLOCK_CACHE_DATA_* counters just because they're the most readily
// available way of counting block accesses.
ReadOptions ropt;
ropt.tailing = tailing;
std::unique_ptr<Iterator> iter(NewIterator(ropt));
// `iter` was created before read_tier was changed, so only nonblocking_iter
// is restricted to the block cache (kBlockCacheTier).
ropt.read_tier = ReadTier::kBlockCacheTier;
std::unique_ptr<Iterator> nonblocking_iter(NewIterator(ropt));
iter->Seek("b10");
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("b2", iter->key().ToString());
EXPECT_EQ("y2", iter->value().ToString());
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
// The cache-only iterator should succeed too, using the blocks pulled into
// the cache by the previous iterator.
nonblocking_iter->Seek("b10");
ASSERT_TRUE(nonblocking_iter->Valid());
EXPECT_EQ("b2", nonblocking_iter->key().ToString());
EXPECT_EQ("y2", nonblocking_iter->value().ToString());
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
// ... but it shouldn't be able to step forward since the next block is
// not in cache yet.
nonblocking_iter->Next();
ASSERT_FALSE(nonblocking_iter->Valid());
ASSERT_TRUE(nonblocking_iter->status().IsIncomplete());
// ... nor should a seek to the next key succeed.
nonblocking_iter->Seek("b20");
ASSERT_FALSE(nonblocking_iter->Valid());
ASSERT_TRUE(nonblocking_iter->status().IsIncomplete());
iter->Next();
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("b3", iter->key().ToString());
EXPECT_EQ("y3", iter->value().ToString());
EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
// After the blocking iterator loaded the next block, the nonblocking
// iterator's seek should succeed.
nonblocking_iter->Seek("b20");
ASSERT_TRUE(nonblocking_iter->Valid());
EXPECT_EQ("b3", nonblocking_iter->key().ToString());
EXPECT_EQ("y3", nonblocking_iter->value().ToString());
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
// "c0" needs its operands from two files; "c3" then costs one more miss.
iter->Seek("c0");
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("c0", iter->key().ToString());
EXPECT_EQ("z1,z2", iter->value().ToString());
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(6, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
iter->Next();
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("c3", iter->key().ToString());
EXPECT_EQ("z3", iter->value().ToString());
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
iter.reset();
// Enable iterate_upper_bound and check that iterator is not trying to read
// blocks that are fully above upper bound.
// NOTE: ropt.read_tier is still kBlockCacheTier (set above and never reset),
// so this new iterator is cache-only as well; it must make do with blocks that
// are already cached (no new misses below).
std::string ub = "b3";
Slice ub_slice(ub);
ropt.iterate_upper_bound = &ub_slice;
iter.reset(NewIterator(ropt));
iter->Seek("b2");
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("b2", iter->key().ToString());
EXPECT_EQ("y2", iter->value().ToString());
EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
}
}
// Point lookups (Get/MultiGet) with a kBinarySearchWithFirstKey index: checks
// via BLOCK_CACHE_DATA_MISS / BLOCK_CACHE_DATA_HIT that Get() reads only the
// data blocks it needs. The ticker values accumulate, so assertion order
// matters.
TEST_P(DBIteratorTest, IndexWithFirstKeyGet) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.prefix_extractor = nullptr;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
Statistics* stats = options.statistics.get();
BlockBasedTableOptions table_options;
table_options.index_type =
BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey;
table_options.index_shortening =
BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();
table_options.block_cache = NewLRUCache(1000); // fits all blocks
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
// Two flushed files: the first holds a, c, e; the second holds c, e. "c" and
// "e" therefore each have one merge operand per file.
ASSERT_OK(Merge("a", "x1"));
ASSERT_OK(Merge("c", "y1"));
ASSERT_OK(Merge("e", "z1"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("c", "y2"));
ASSERT_OK(Merge("e", "z2"));
ASSERT_OK(Flush());
// Get() between blocks shouldn't read any blocks.
ASSERT_EQ("NOT_FOUND", Get("b"));
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
// Get() of an existing key shouldn't read any unnecessary blocks when there's
// only one key per block.
// "c" has an entry in both files, hence two misses.
ASSERT_EQ("y1,y2", Get("c"));
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
// "a" exists only in the first file: exactly one more miss.
ASSERT_EQ("x1", Get("a"));
EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
// MultiGet: only the results are checked here, not the block counters.
EXPECT_EQ(std::vector<std::string>({"NOT_FOUND", "z1,z2"}),
MultiGet({"b", "e"}));
}
// TODO(3.13): fix the issue that Seek() + Prev() might not necessarily
// return the biggest key that is smaller than the seek key.
// Direction changes across keys that were written only through Merge():
// Seek() followed by Prev(), and SeekForPrev() followed by Next().
TEST_P(DBIteratorTest, PrevAfterAndNextAfterMerge) {
  Options opts;
  opts.create_if_missing = true;
  opts.merge_operator = MergeOperators::CreatePutOperator();
  opts.env = env_;
  DestroyAndReopen(opts);

  // write three entries with different keys using Merge()
  WriteOptions write_opts;
  ASSERT_OK(db_->Merge(write_opts, "1", "data1"));
  ASSERT_OK(db_->Merge(write_opts, "2", "data2"));
  ASSERT_OK(db_->Merge(write_opts, "3", "data3"));

  std::unique_ptr<Iterator> cursor(NewIterator(ReadOptions()));

  // Forward seek, then step backwards.
  cursor->Seek("2");
  ASSERT_TRUE(cursor->Valid());
  ASSERT_EQ("2", cursor->key().ToString());
  cursor->Prev();
  ASSERT_TRUE(cursor->Valid());
  ASSERT_EQ("1", cursor->key().ToString());

  // Backward seek, then step forwards.
  cursor->SeekForPrev("1");
  ASSERT_TRUE(cursor->Valid());
  ASSERT_EQ("1", cursor->key().ToString());
  cursor->Next();
  ASSERT_TRUE(cursor->Valid());
  ASSERT_EQ("2", cursor->key().ToString());
}
class DBIteratorTestForPinnedData : public DBIteratorTest {
public:
enum TestConfig {
NORMAL,
CLOSE_AND_OPEN,
COMPACT_BEFORE_READ,
FLUSH_EVERY_1000,
MAX
};
DBIteratorTestForPinnedData() : DBIteratorTest() {}
void PinnedDataIteratorRandomized(TestConfig run_config) {
// Generate Random data
Random rnd(301);
int puts = 100000;
int key_pool = static_cast<int>(puts * 0.7);
int key_size = 100;
int val_size = 1000;
int seeks_percentage = 20; // 20% of keys will be used to test seek()
int delete_percentage = 20; // 20% of keys will be deleted
int merge_percentage = 20; // 20% of keys will be added using Merge()
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.merge_operator = MergeOperators::CreatePutOperator();
DestroyAndReopen(options);
std::vector<std::string> generated_keys(key_pool);
for (int i = 0; i < key_pool; i++) {
generated_keys[i] = rnd.RandomString(key_size);
}
std::map<std::string, std::string> true_data;
std::vector<std::string> random_keys;
std::vector<std::string> deleted_keys;
for (int i = 0; i < puts; i++) {
auto& k = generated_keys[rnd.Next() % key_pool];
auto v = rnd.RandomString(val_size);
// Insert data to true_data map and to DB
true_data[k] = v;
if (rnd.PercentTrue(merge_percentage)) {
ASSERT_OK(db_->Merge(WriteOptions(), k, v));
} else {
ASSERT_OK(Put(k, v));
}
// Pick random keys to be used to test Seek()
if (rnd.PercentTrue(seeks_percentage)) {
random_keys.push_back(k);
}
// Delete some random keys
if (rnd.PercentTrue(delete_percentage)) {
deleted_keys.push_back(k);
true_data.erase(k);
ASSERT_OK(Delete(k));
}
if (run_config == TestConfig::FLUSH_EVERY_1000) {
if (i && i % 1000 == 0) {
ASSERT_OK(Flush());
}
}
}
if (run_config == TestConfig::CLOSE_AND_OPEN) {
Close();
Reopen(options);
} else if (run_config == TestConfig::COMPACT_BEFORE_READ) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
ReadOptions ro;
ro.pin_data = true;
auto iter = NewIterator(ro);
{
// Test Seek to random keys
std::vector<Slice> keys_slices;
std::vector<std::string> true_keys;
for (auto& k : random_keys) {
iter->Seek(k);
if (!iter->Valid()) {
ASSERT_EQ(true_data.lower_bound(k), true_data.end());
continue;
}
std::string prop_value;
ASSERT_OK(
iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
ASSERT_EQ("1", prop_value);
keys_slices.push_back(iter->key());
true_keys.push_back(true_data.lower_bound(k)->first);
}
for (size_t i = 0; i < keys_slices.size(); i++) {
ASSERT_EQ(keys_slices[i].ToString(), true_keys[i]);
}
}
{
// Test SeekForPrev to random keys
std::vector<Slice> keys_slices;
std::vector<std::string> true_keys;
for (auto& k : random_keys) {
iter->SeekForPrev(k);
if (!iter->Valid()) {
ASSERT_EQ(true_data.upper_bound(k), true_data.begin());
continue;
}
std::string prop_value;
ASSERT_OK(
iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
ASSERT_EQ("1", prop_value);
keys_slices.push_back(iter->key());
true_keys.push_back((--true_data.upper_bound(k))->first);
}
for (size_t i = 0; i < keys_slices.size(); i++) {
ASSERT_EQ(keys_slices[i].ToString(), true_keys[i]);
}
}
{
// Test iterating all data forward
std::vector<Slice> all_keys;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
std::string prop_value;
ASSERT_OK(
iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
ASSERT_EQ("1", prop_value);
all_keys.push_back(iter->key());
}
ASSERT_EQ(all_keys.size(), true_data.size());
// Verify that all keys slices are valid
auto data_iter = true_data.begin();
for (size_t i = 0; i < all_keys.size(); i++) {
ASSERT_EQ(all_keys[i].ToString(), data_iter->first);
data_iter++;
}
}
{
// Test iterating all data backward
std::vector<Slice> all_keys;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
std::string prop_value;
ASSERT_OK(
iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
ASSERT_EQ("1", prop_value);
all_keys.push_back(iter->key());
}
ASSERT_OK(iter->status());
ASSERT_EQ(all_keys.size(), true_data.size());
// Verify that all keys slices are valid (backward)
auto data_iter = true_data.rbegin();
for (size_t i = 0; i < all_keys.size(); i++) {
ASSERT_EQ(all_keys[i].ToString(), data_iter->first);
data_iter++;
}
}
delete iter;
}
};
#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Reads right after the write phase. Compiled out of ROCKSDB_VALGRIND_RUN
// builds unless ROCKSDB_FULL_VALGRIND_RUN is also defined.
TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedNormal) {
  const TestConfig config = TestConfig::NORMAL;
  PinnedDataIteratorRandomized(config);
}
#endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Closes and reopens the DB between the write and read phases.
// (The "CLose" spelling is part of the registered test name and is kept.)
TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedCLoseAndOpen) {
  const TestConfig config = TestConfig::CLOSE_AND_OPEN;
  PinnedDataIteratorRandomized(config);
}
// Runs a full CompactRange() between the write and read phases.
TEST_P(DBIteratorTestForPinnedData,
       PinnedDataIteratorRandomizedCompactBeforeRead) {
  const TestConfig config = TestConfig::COMPACT_BEFORE_READ;
  PinnedDataIteratorRandomized(config);
}
// Flushes every 1000 writes during the write phase.
TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedFlush) {
  const TestConfig config = TestConfig::FLUSH_EVERY_1000;
  PinnedDataIteratorRandomized(config);
}
// Instantiates every DBIteratorTestForPinnedData case once per bool parameter
// value, true first and then false. The meaning of the bool comes from the
// DBIteratorTest base fixture.
INSTANTIATE_TEST_CASE_P(DBIteratorTestForPinnedDataInstance,
                        DBIteratorTestForPinnedData,
                        ::testing::Values(true, false));
// Keys spread across L1 SST files, L0 SST files and the memtable must all be
// returned by a pin_data iterator with both key and value pinned. The Slices
// must stay valid until the iterator is destroyed.
TEST_P(DBIteratorTest, PinnedDataIteratorMultipleFiles) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.use_delta_encoding = false;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.disable_auto_compactions = true;
  options.write_buffer_size = 1024 * 1024 * 10;  // 10 MB
  DestroyAndReopen(options);

  std::map<std::string, std::string> true_data;

  // Generate 4 sst files in L0 (one Flush() per 250 keys), then compact them
  // down into L1.
  Random rnd(301);
  for (int i = 1; i <= 1000; i++) {
    std::string k = Key(i * 3);
    std::string v = rnd.RandomString(100);
    ASSERT_OK(Put(k, v));
    true_data[k] = v;
    if (i % 250 == 0) {
      ASSERT_OK(Flush());
    }
  }
  ASSERT_EQ(FilesPerLevel(0), "4");
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(FilesPerLevel(0), "0,4");

  // Generate 4 sst files in L0
  for (int i = 1; i <= 1000; i++) {
    std::string k = Key(i * 2);
    std::string v = rnd.RandomString(100);
    ASSERT_OK(Put(k, v));
    true_data[k] = v;
    if (i % 250 == 0) {
      ASSERT_OK(Flush());
    }
  }
  ASSERT_EQ(FilesPerLevel(0), "4,4");

  // Add some keys/values in memtables
  for (int i = 1; i <= 1000; i++) {
    std::string k = Key(i);
    std::string v = rnd.RandomString(100);
    ASSERT_OK(Put(k, v));
    true_data[k] = v;
  }
  ASSERT_EQ(FilesPerLevel(0), "4,4");

  ReadOptions ro;
  ro.pin_data = true;
  // Owned by unique_ptr so it is released even if an ASSERT returns early.
  // Declared before `results` so it outlives the pinned Slices stored there.
  std::unique_ptr<Iterator> iter(NewIterator(ro));
  std::vector<std::pair<Slice, Slice>> results;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    std::string prop_value;
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
    ASSERT_EQ("1", prop_value);
    ASSERT_OK(
        iter->GetProperty("rocksdb.iterator.is-value-pinned", &prop_value));
    ASSERT_EQ("1", prop_value);
    results.emplace_back(iter->key(), iter->value());
  }
  // The scan must have ended at end-of-data, not on an error.
  ASSERT_OK(iter->status());

  ASSERT_EQ(results.size(), true_data.size());
  auto data_iter = true_data.begin();
  for (size_t i = 0; i < results.size(); i++, data_iter++) {
    auto& kv = results[i];
    ASSERT_EQ(kv.first, data_iter->first);
    ASSERT_EQ(kv.second, data_iter->second);
  }
}
// With a merge operator, a pin_data iterator pins keys but not merged values
// ("rocksdb.iterator.is-value-pinned" == "0"). So keys are kept as Slices
// while values are copied into std::string.
TEST_P(DBIteratorTest, PinnedDataIteratorMergeOperator) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.use_delta_encoding = false;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateUInt64AddOperator();
  DestroyAndReopen(options);

  // numbers[v] is the fixed64 encoding of v, as consumed by the add operator.
  std::string numbers[7];
  for (int val = 0; val <= 6; val++) {
    PutFixed64(numbers + val, val);
  }

  WriteOptions wo;
  // +1 all keys in range [ 0 => 999]
  for (int i = 0; i < 1000; i++) {
    ASSERT_OK(db_->Merge(wo, Key(i), numbers[1]));
  }

  // +2 all keys divisible by 2 in range [ 0 => 999]
  for (int i = 0; i < 1000; i += 2) {
    ASSERT_OK(db_->Merge(wo, Key(i), numbers[2]));
  }

  // +3 all keys divisible by 5 in range [ 0 => 999]
  for (int i = 0; i < 1000; i += 5) {
    ASSERT_OK(db_->Merge(wo, Key(i), numbers[3]));
  }

  ReadOptions ro;
  ro.pin_data = true;
  // Owned by unique_ptr so it is released even if an ASSERT returns early.
  // Declared before `results` so it outlives the pinned key Slices.
  std::unique_ptr<Iterator> iter(NewIterator(ro));
  std::vector<std::pair<Slice, std::string>> results;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    std::string prop_value;
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
    ASSERT_EQ("1", prop_value);
    ASSERT_OK(
        iter->GetProperty("rocksdb.iterator.is-value-pinned", &prop_value));
    ASSERT_EQ("0", prop_value);
    results.emplace_back(iter->key(), iter->value().ToString());
  }
  ASSERT_OK(iter->status());

  ASSERT_EQ(results.size(), size_t{1000});
  for (size_t i = 0; i < results.size(); i++) {
    auto& kv = results[i];
    ASSERT_EQ(kv.first, Key(static_cast<int>(i)));
    int expected_val = 1;
    if (i % 2 == 0) {
      expected_val += 2;
    }
    if (i % 5 == 0) {
      expected_val += 3;
    }
    ASSERT_EQ(kv.second, numbers[expected_val]);
  }
}
// An iterator created before keys are deleted or overwritten must keep
// returning exactly the data that existed at its creation, with keys and
// values pinned.
TEST_P(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.use_delta_encoding = false;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.write_buffer_size = 100000;
  DestroyAndReopen(options);

  Random rnd(301);

  std::map<std::string, std::string> true_data;
  for (int i = 0; i < 1000; i++) {
    std::string k = rnd.RandomString(10);
    std::string v = rnd.RandomString(1000);
    ASSERT_OK(Put(k, v));
    true_data[k] = v;
  }

  ReadOptions ro;
  ro.pin_data = true;
  // Owned by unique_ptr so it is released even if an ASSERT returns early.
  std::unique_ptr<Iterator> iter(NewIterator(ro));

  // Delete roughly half of the keys and overwrite the rest
  for (auto& kv : true_data) {
    if (rnd.OneIn(2)) {
      ASSERT_OK(Delete(kv.first));
    } else {
      std::string new_val = rnd.RandomString(1000);
      ASSERT_OK(Put(kv.first, new_val));
    }
  }

  std::vector<std::pair<Slice, Slice>> results;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    std::string prop_value;
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.is-key-pinned", &prop_value));
    ASSERT_EQ("1", prop_value);
    ASSERT_OK(
        iter->GetProperty("rocksdb.iterator.is-value-pinned", &prop_value));
    ASSERT_EQ("1", prop_value);
    results.emplace_back(iter->key(), iter->value());
  }
  ASSERT_OK(iter->status());

  // The iterator must see every pre-update entry. This check also keeps
  // `data_iter` below from advancing past true_data.end().
  ASSERT_EQ(results.size(), true_data.size());
  auto data_iter = true_data.begin();
  for (size_t i = 0; i < results.size(); i++, data_iter++) {
    auto& kv = results[i];
    ASSERT_EQ(kv.first, data_iter->first);
    ASSERT_EQ(kv.second, data_iter->second);
  }
}
class SliceTransformLimitedDomainGeneric : public SliceTransform {
const char* Name() const override {
return "SliceTransformLimitedDomainGeneric";
}
Slice Transform(const Slice& src) const override {
return Slice(src.data(), 1);
}
bool InDomain(const Slice& src) const override {
// prefix will be x????
return src.size() >= 1;
}
bool InRange(const Slice& dst) const override {
// prefix will be x????
return dst.size() == 1;
}
};
// SeekForPrev() across several L1 files with a fixed 1-byte prefix extractor
// and SST bloom filters enabled.
TEST_P(DBIteratorTest, IterSeekForPrevCrossingFiles) {
  Options options = CurrentOptions();
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
  options.disable_auto_compactions = true;
  // Enable prefix bloom for SST files
  BlockBasedTableOptions table_options;
  table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  DestroyAndReopen(options);

  ASSERT_OK(Put("a1", "va1"));
  ASSERT_OK(Put("a2", "va2"));
  ASSERT_OK(Put("a3", "va3"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("b1", "vb1"));
  ASSERT_OK(Put("b2", "vb2"));
  ASSERT_OK(Put("b3", "vb3"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("b4", "vb4"));
  ASSERT_OK(Put("d1", "vd1"));
  ASSERT_OK(Put("d2", "vd2"));
  ASSERT_OK(Put("d4", "vd4"));
  ASSERT_OK(Flush());

  MoveFilesToLevel(1);
  {
    ReadOptions ro;
    std::unique_ptr<Iterator> iter(NewIterator(ro));

    iter->SeekForPrev("a4");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "a3");
    ASSERT_EQ(iter->value().ToString(), "va3");

    // No key has prefix "c". The expected result is "b3", not the
    // total-order answer "b4"; presumably the file holding "b4" is skipped by
    // its prefix bloom filter. NOTE(review): confirm.
    iter->SeekForPrev("c2");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "b3");

    iter->SeekForPrev("d3");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "d2");

    iter->SeekForPrev("b5");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "b4");
    ASSERT_OK(iter->status());
  }

  {
    ReadOptions ro;
    ro.prefix_same_as_start = true;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    // No key shares the "c" prefix of the target.
    iter->SeekForPrev("c2");
    ASSERT_TRUE(!iter->Valid());
    ASSERT_OK(iter->status());
  }
}
// Same scenario as IterSeekForPrevCrossingFiles, but with the custom
// SliceTransformLimitedDomainGeneric prefix extractor (first byte of the key).
TEST_P(DBIteratorTest, IterSeekForPrevCrossingFilesCustomPrefixExtractor) {
  Options options = CurrentOptions();
  options.prefix_extractor =
      std::make_shared<SliceTransformLimitedDomainGeneric>();
  options.disable_auto_compactions = true;
  // Enable prefix bloom for SST files
  BlockBasedTableOptions table_options;
  table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  DestroyAndReopen(options);

  ASSERT_OK(Put("a1", "va1"));
  ASSERT_OK(Put("a2", "va2"));
  ASSERT_OK(Put("a3", "va3"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("b1", "vb1"));
  ASSERT_OK(Put("b2", "vb2"));
  ASSERT_OK(Put("b3", "vb3"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("b4", "vb4"));
  ASSERT_OK(Put("d1", "vd1"));
  ASSERT_OK(Put("d2", "vd2"));
  ASSERT_OK(Put("d4", "vd4"));
  ASSERT_OK(Flush());

  MoveFilesToLevel(1);
  {
    ReadOptions ro;
    std::unique_ptr<Iterator> iter(NewIterator(ro));

    iter->SeekForPrev("a4");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "a3");
    ASSERT_EQ(iter->value().ToString(), "va3");

    // No key has prefix "c". The expected result is "b3", not the
    // total-order answer "b4"; presumably the file holding "b4" is skipped by
    // its prefix bloom filter. NOTE(review): confirm.
    iter->SeekForPrev("c2");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "b3");

    iter->SeekForPrev("d3");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "d2");

    iter->SeekForPrev("b5");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "b4");
    ASSERT_OK(iter->status());
  }

  {
    ReadOptions ro;
    ro.prefix_same_as_start = true;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    // No key shares the "c" prefix of the target.
    iter->SeekForPrev("c2");
    ASSERT_TRUE(!iter->Valid());
    ASSERT_OK(iter->status());
  }
}
// DBIter::Prev() must return fully merged values when a key's base value and
// its merge operands live in different files and every entry sits in its own
// block. This includes the case where many tombstones force Prev() to fall
// back to a Seek().
TEST_P(DBIteratorTest, IterPrevKeyCrossingBlocks) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;  // every block will contain one entry
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  options.disable_auto_compactions = true;
  options.max_sequential_skip_in_iterations = 8;

  DestroyAndReopen(options);

  // Putting such deletes will force DBIter::Prev() to fallback to a Seek
  for (int file_num = 0; file_num < 10; file_num++) {
    ASSERT_OK(Delete("key4"));
    ASSERT_OK(Flush());
  }

  // File containing 5 blocks of puts
  ASSERT_OK(Put("key1", "val1.0"));
  ASSERT_OK(Put("key2", "val2.0"));
  ASSERT_OK(Put("key3", "val3.0"));
  ASSERT_OK(Put("key4", "val4.0"));
  ASSERT_OK(Put("key5", "val5.0"));
  ASSERT_OK(Flush());

  // File containing 9 blocks of merge operands
  ASSERT_OK(db_->Merge(WriteOptions(), "key1", "val1.1"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key1", "val1.2"));

  ASSERT_OK(db_->Merge(WriteOptions(), "key2", "val2.1"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key2", "val2.2"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key2", "val2.3"));

  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.1"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.2"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.3"));
  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.4"));
  ASSERT_OK(Flush());

  {
    ReadOptions ro;
    ro.fill_cache = false;
    std::unique_ptr<Iterator> iter(NewIterator(ro));

    iter->SeekToLast();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "key5");
    ASSERT_EQ(iter->value().ToString(), "val5.0");

    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "key4");
    ASSERT_EQ(iter->value().ToString(), "val4.0");

    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "key3");
    ASSERT_EQ(iter->value().ToString(), "val3.0,val3.1,val3.2,val3.3,val3.4");

    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "key2");
    ASSERT_EQ(iter->value().ToString(), "val2.0,val2.1,val2.2,val2.3");

    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().ToString(), "key1");
    ASSERT_EQ(iter->value().ToString(), "val1.0,val1.1,val1.2");

    ASSERT_OK(iter->status());
  }
}
// Randomized version of IterPrevKeyCrossingBlocks. Values and merge operands
// are in separate files with roughly one key per block, and tombstones on half
// of the key range force DBIter::Prev() to fall back to Seek(). Backward scans,
// interleaved with Next()/Seek()/Prev() position restores, must match an
// in-memory model.
TEST_P(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  options.disable_auto_compactions = true;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.max_sequential_skip_in_iterations = 8;
  DestroyAndReopen(options);

  const int kNumKeys = 500;
  // Small number of merge operands to make sure that DBIter::Prev() doesn't
  // fall back to Seek()
  const int kNumMergeOperands = 3;
  // Use value size that will make sure that every block contains 1 key
  const int kValSize =
      static_cast<int>(BlockBasedTableOptions().block_size) * 4;
  // Percentage of keys that won't get merge operations
  const int kNoMergeOpPercentage = 20;
  // Percentage of keys that will be deleted
  const int kDeletePercentage = 10;

  // For half of the key range we will write multiple deletes first to
  // force DBIter::Prev() to fall back to Seek()
  for (int file_num = 0; file_num < 10; file_num++) {
    for (int i = 0; i < kNumKeys; i += 2) {
      ASSERT_OK(Delete(Key(i)));
    }
    ASSERT_OK(Flush());
  }

  Random rnd(301);
  std::map<std::string, std::string> true_data;
  std::string gen_key;
  std::string gen_val;

  for (int i = 0; i < kNumKeys; i++) {
    gen_key = Key(i);
    gen_val = rnd.RandomString(kValSize);

    ASSERT_OK(Put(gen_key, gen_val));
    true_data[gen_key] = gen_val;
  }
  ASSERT_OK(Flush());

  // Separate values and merge operands in different files so that we
  // make sure that we don't merge them while flushing but actually
  // merge them in the read path
  for (int i = 0; i < kNumKeys; i++) {
    if (rnd.PercentTrue(kNoMergeOpPercentage)) {
      // Don't give merge operations for some keys
      continue;
    }

    for (int j = 0; j < kNumMergeOperands; j++) {
      gen_key = Key(i);
      gen_val = rnd.RandomString(kValSize);

      ASSERT_OK(db_->Merge(WriteOptions(), gen_key, gen_val));
      true_data[gen_key] += "," + gen_val;
    }
  }
  ASSERT_OK(Flush());

  for (int i = 0; i < kNumKeys; i++) {
    if (rnd.PercentTrue(kDeletePercentage)) {
      gen_key = Key(i);

      ASSERT_OK(Delete(gen_key));
      true_data.erase(gen_key);
    }
  }
  ASSERT_OK(Flush());

  {
    ReadOptions ro;
    ro.fill_cache = false;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    auto data_iter = true_data.rbegin();

    for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
      // Guard against the DB returning more entries than the model has.
      ASSERT_NE(data_iter, true_data.rend());
      ASSERT_EQ(iter->key().ToString(), data_iter->first);
      ASSERT_EQ(iter->value().ToString(), data_iter->second);
      data_iter++;
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(data_iter, true_data.rend());
  }

  {
    ReadOptions ro;
    ro.fill_cache = false;
    std::unique_ptr<Iterator> iter(NewIterator(ro));
    auto data_iter = true_data.rbegin();

    // Number of entries already visited, i.e. to the right of the current
    // position; bounds how far Next() may move below.
    int entries_right = 0;
    std::string seek_key;
    for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
      // Guard against the DB returning more entries than the model has.
      ASSERT_NE(data_iter, true_data.rend());
      // Verify key/value of current position
      ASSERT_EQ(iter->key().ToString(), data_iter->first);
      ASSERT_EQ(iter->value().ToString(), data_iter->second);

      bool restore_position_with_seek = rnd.Uniform(2);
      if (restore_position_with_seek) {
        seek_key = iter->key().ToString();
      }

      // Do some Next() operations then restore the iterator to its original
      // position
      int next_count =
          entries_right > 0 ? rnd.Uniform(std::min(entries_right, 10)) : 0;
      for (int i = 0; i < next_count; i++) {
        iter->Next();
        data_iter--;

        ASSERT_TRUE(iter->Valid());
        ASSERT_EQ(iter->key().ToString(), data_iter->first);
        ASSERT_EQ(iter->value().ToString(), data_iter->second);
      }

      if (restore_position_with_seek) {
        // Restore original position using Seek()
        iter->Seek(seek_key);
        for (int i = 0; i < next_count; i++) {
          data_iter++;
        }

        ASSERT_TRUE(iter->Valid());
        ASSERT_EQ(iter->key().ToString(), data_iter->first);
        ASSERT_EQ(iter->value().ToString(), data_iter->second);
      } else {
        // Restore original position using Prev()
        for (int i = 0; i < next_count; i++) {
          iter->Prev();
          data_iter++;

          ASSERT_TRUE(iter->Valid());
          ASSERT_EQ(iter->key().ToString(), data_iter->first);
          ASSERT_EQ(iter->value().ToString(), data_iter->second);
        }
      }

      entries_right++;
      data_iter++;
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(data_iter, true_data.rend());
  }
}
// Runs concurrent forward and backward full scans, each checking its own
// thread-local perf context, and verifies that the DB-wide iterator tickers
// equal the sum of what the individual threads observed.
TEST_P(DBIteratorTest, IteratorWithLocalStatistics) {
  Options options = CurrentOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);
  Random rnd(301);
  for (int i = 0; i < 1000; i++) {
    // Key 10 bytes / Value 10 bytes
    ASSERT_OK(Put(rnd.RandomString(10), rnd.RandomString(10)));
  }

  std::atomic<uint64_t> total_next(0);
  std::atomic<uint64_t> total_next_found(0);
  std::atomic<uint64_t> total_prev(0);
  std::atomic<uint64_t> total_prev_found(0);
  std::atomic<uint64_t> total_bytes(0);

  // Scans the whole DB in one direction. It counts every Next()/Prev() call,
  // every call that lands on a valid entry, and every key/value byte read.
  // The byte total is cross-checked against this thread's perf context.
  auto scan = [&](bool forward) {
    std::atomic<uint64_t>& steps = forward ? total_next : total_prev;
    std::atomic<uint64_t>& found =
        forward ? total_next_found : total_prev_found;
    SetPerfLevel(kEnableCount);
    // Reset in both directions so iter_read_bytes reflects only this scan.
    get_perf_context()->Reset();
    std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
    if (forward) {
      iter->SeekToFirst();
    } else {
      iter->SeekToLast();
    }
    ASSERT_TRUE(iter->Valid());
    // The initial seek also bumps ITER_BYTES_READ.
    uint64_t bytes = iter->key().size() + iter->value().size();
    while (true) {
      if (forward) {
        iter->Next();
      } else {
        iter->Prev();
      }
      steps++;
      if (!iter->Valid()) {
        EXPECT_OK(iter->status());
        break;
      }
      found++;
      bytes += iter->key().size() + iter->value().size();
    }
    iter.reset();
    ASSERT_EQ(bytes, get_perf_context()->iter_read_bytes);
    SetPerfLevel(kDisable);
    total_bytes += bytes;
  };

  std::vector<port::Thread> threads;
  for (int i = 0; i < 10; i++) {
    threads.emplace_back([&] { scan(/*forward=*/true); });
  }
  for (int i = 0; i < 15; i++) {
    threads.emplace_back([&] { scan(/*forward=*/false); });
  }
  for (auto& t : threads) {
    t.join();
  }

  ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT), total_next.load());
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT_FOUND),
            total_next_found.load());
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV), total_prev.load());
  ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV_FOUND),
            total_prev_found.load());
  ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes.load());
}
// Verifies that ReadOptions::readahead_size makes a SeekToFirst over three
// single-file levels read more bytes without opening extra files. It also
// checks that reads through a readahead-enabled iterator return correct data.
TEST_P(DBIteratorTest, ReadAhead) {
  Options options;
  env_->count_random_reads_ = true;
  options.env = env_;
  options.disable_auto_compactions = true;
  options.write_buffer_size = 4 << 20;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.block_size = 1024;
  table_options.no_block_cache = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);

  // Write the same 100 keys into L2, L1 and L0 (one file per level).
  std::string value(1024, 'a');
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(i), value));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(2);
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(i), value));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(i), value));
  }
  ASSERT_OK(Flush());
  ASSERT_EQ("1,1,1", FilesPerLevel());

  // Baseline: SeekToFirst without readahead. The iterator is owned by a
  // unique_ptr so a failing assertion cannot leak it.
  env_->random_read_bytes_counter_ = 0;
  options.statistics->setTickerCount(NO_FILE_OPENS, 0);
  ReadOptions read_options;
  std::unique_ptr<Iterator> iter(NewIterator(read_options));
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  const uint64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS);
  const size_t bytes_read = env_->random_read_bytes_counter_;
  iter.reset();

  // Same seek with readahead enabled.
  env_->random_read_bytes_counter_ = 0;
  options.statistics->setTickerCount(NO_FILE_OPENS, 0);
  read_options.readahead_size = 1024 * 10;
  iter.reset(NewIterator(read_options));
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  const uint64_t num_file_opens_readahead =
      TestGetTickerCount(options, NO_FILE_OPENS);
  const size_t bytes_read_readahead = env_->random_read_bytes_counter_;
  iter.reset();
  ASSERT_EQ(num_file_opens, num_file_opens_readahead);
  ASSERT_GT(bytes_read_readahead, bytes_read);
  // Three files, so expect more than three readahead windows' worth of bytes.
  ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3);

  // Verify correctness.
  iter.reset(NewIterator(read_options));
  int count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_EQ(value, iter->value());
    count++;
  }
  ASSERT_OK(iter->status());
  ASSERT_EQ(100, count);
  for (int i = 0; i < 100; i++) {
    iter->Seek(Key(i));
    // Check validity before touching key()/value() of the positioned entry.
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(Key(i), iter->key());
    ASSERT_EQ(value, iter->value());
  }
  ASSERT_OK(iter->status());
}
// Insert a key, create an iterator, overwrite the key many times, then seek
// to a smaller key. Expect DBIter to fall back to a reseek instead of
// stepping through all the newer overwrites linearly.
TEST_P(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.max_sequential_skip_in_iterations = 3;
  options.prefix_extractor = nullptr;
  options.write_buffer_size = 1 << 27;  // big enough to avoid flush
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);

  // Insert.
  ASSERT_OK(Put("b", "0"));

  // Create iterator before the overwrites; it is expected to keep seeing "0".
  ReadOptions ro;
  std::unique_ptr<Iterator> iter(NewIterator(ro));

  // Insert a lot. std::string converts to Slice directly; no .c_str() needed.
  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(Put("b", std::to_string(i + 1)));
  }

  // Check that memtable wasn't flushed.
  std::string val;
  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &val));
  EXPECT_EQ("0", val);

  // Seek iterator to a smaller key.
  get_perf_context()->Reset();
  iter->Seek("a");
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  EXPECT_EQ("b", iter->key().ToString());
  EXPECT_EQ("0", iter->value().ToString());

  const auto* perf = get_perf_context();
  // Check that the seek didn't do too much work.
  // Checks are not tight, just make sure that everything is well below 100.
  EXPECT_LT(perf->internal_key_skipped_count, 4);
  EXPECT_LT(perf->internal_recent_skipped_count, 8);
  EXPECT_LT(perf->seek_on_memtable_count, 10);
  EXPECT_LT(perf->next_on_memtable_count, 10);
  EXPECT_LT(perf->prev_on_memtable_count, 10);

  // Check that iterator did something like what we expect.
  EXPECT_EQ(perf->internal_delete_skipped_count, 0);
  EXPECT_EQ(perf->internal_merge_count, 0);
  EXPECT_GE(perf->internal_recent_skipped_count, 2);
  EXPECT_GE(perf->seek_on_memtable_count, 2);
  EXPECT_EQ(1,
            options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
}
// Checks that an iterator keeps its original view of the DB across new writes
// and a flush, and only observes them after Refresh().
TEST_P(DBIteratorTest, Refresh) {
  ASSERT_OK(Put("x", "y"));
  std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
  ASSERT_OK(iter->status());

  // Seeks to "a" and asserts that a forward scan yields exactly
  // `expected_keys` in order, then becomes invalid with an OK status.
  auto expect_keys = [&](const std::vector<std::string>& expected_keys) {
    iter->Seek(Slice("a"));
    for (const auto& key : expected_keys) {
      ASSERT_TRUE(iter->Valid());
      ASSERT_EQ(key, iter->key().ToString());
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  };

  // ASSERT_NO_FATAL_FAILURE makes a failure inside the helper stop the test,
  // instead of only returning from the lambda.
  ASSERT_NO_FATAL_FAILURE(expect_keys({"x"}));

  // A new memtable write is invisible until Refresh().
  ASSERT_OK(Put("c", "d"));
  ASSERT_NO_FATAL_FAILURE(expect_keys({"x"}));
  ASSERT_OK(iter->Refresh());
  ASSERT_NO_FATAL_FAILURE(expect_keys({"c", "x"}));

  // Neither a flush nor a later write is visible until the next Refresh().
  ASSERT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("m", "n"));
  ASSERT_NO_FATAL_FAILURE(expect_keys({"c", "x"}));
  ASSERT_OK(iter->Refresh());
  ASSERT_NO_FATAL_FAILURE(expect_keys({"c", "m", "x"}));
}
// Exercises Iterator::Refresh(const Snapshot*) over memtable, L0 and L1 data.
// Refreshing to the same snapshot, a newer one, an older one, or none must
// each expose the data visible at that point, including after a flush
// changes the LSM shape.
TEST_P(DBIteratorTest, RefreshWithSnapshot) {
  // L1 file, uses LevelIterator internally
  ASSERT_OK(Put(Key(0), "val0"));
  ASSERT_OK(Put(Key(5), "val5"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);
  // L0 file, uses table iterator internally
  ASSERT_OK(Put(Key(1), "val1"));
  ASSERT_OK(Put(Key(4), "val4"));
  ASSERT_OK(Flush());
  // Memtable
  ASSERT_OK(Put(Key(2), "val2"));
  ASSERT_OK(Put(Key(3), "val3"));
  const Snapshot* snapshot = db_->GetSnapshot();
  // Visible to `snapshot2` but not `snapshot`: a new value for Key(2) and a
  // range deletion of [Key(4), Key(7)).
  ASSERT_OK(Put(Key(2), "new val"));
  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
                             Key(7)));
  const Snapshot* snapshot2 = db_->GetSnapshot();
  ASSERT_EQ(1, NumTableFilesAtLevel(1));
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  ReadOptions options;
  options.snapshot = snapshot;
  // Owned by a unique_ptr so it is destroyed even if an assertion fails.
  std::unique_ptr<Iterator> iter(NewIterator(options));
  // Written after both snapshots; only visible once refreshed without one.
  ASSERT_OK(Put(Key(6), "val6"));
  ASSERT_OK(iter->status());

  // Asserts that the iterator yields Key(start) .. Key(end - 1) in order,
  // calling Next() after each. Key(2) must hold "new val" when `new_key2` is
  // set; every other Key(i) must hold "val<i>".
  auto verify_iter = [&](int start, int end, bool new_key2 = false) {
    for (int i = start; i < end; ++i) {
      ASSERT_OK(iter->status());
      ASSERT_TRUE(iter->Valid());
      ASSERT_EQ(iter->key(), Key(i));
      if (i == 2 && new_key2) {
        ASSERT_EQ(iter->value(), "new val");
      } else {
        ASSERT_EQ(iter->value(), "val" + std::to_string(i));
      }
      iter->Next();
    }
  };

  // An ASSERT_* inside verify_iter only returns from the lambda. The
  // ASSERT_NO_FATAL_FAILURE wrapper stops the test there instead of letting
  // it keep driving an iterator already known to be wrong.
  for (int j = 0; j < 2; j++) {
    iter->Seek(Key(1));
    ASSERT_NO_FATAL_FAILURE(verify_iter(1, 3));
    // Refresh to same snapshot
    ASSERT_OK(iter->Refresh(snapshot));
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    iter->Seek(Key(3));
    ASSERT_NO_FATAL_FAILURE(verify_iter(3, 6));
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    // Refresh to a newer snapshot
    ASSERT_OK(iter->Refresh(snapshot2));
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    iter->SeekToFirst();
    ASSERT_NO_FATAL_FAILURE(verify_iter(0, 4, /*new_key2=*/true));
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    // Refresh to an older snapshot
    ASSERT_OK(iter->Refresh(snapshot));
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    iter->Seek(Key(3));
    ASSERT_NO_FATAL_FAILURE(verify_iter(3, 6));
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    // Refresh to no snapshot
    ASSERT_OK(iter->Refresh());
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    iter->Seek(Key(2));
    ASSERT_NO_FATAL_FAILURE(verify_iter(2, 4, /*new_key2=*/true));
    ASSERT_NO_FATAL_FAILURE(verify_iter(6, 7));
    ASSERT_TRUE(!iter->Valid() && iter->status().ok());
    // Change LSM shape, new SuperVersion is created.
    ASSERT_OK(Flush());
    // Refresh back to original snapshot
    ASSERT_OK(iter->Refresh(snapshot));
  }

  // The iterator must be gone before its snapshots are released and the DB
  // is closed.
  iter.reset();
  db_->ReleaseSnapshot(snapshot);
  db_->ReleaseSnapshot(snapshot2);
  ASSERT_OK(db_->Close());
}
// Scans a multi-level LSM while a manual flush + full compaction replaces its
// files halfway through.
// - With auto_refresh_iterator_with_snapshot and an explicit snapshot, the
//   iterator is expected to pick up the new SuperVersion, releasing the old
//   memtable and files.
// - In every other combination it keeps its original SuperVersion.
// In all cases each key is visited exactly once, in order, with the value
// written before the iterator was created.
TEST_P(DBIteratorTest, AutoRefreshIterator) {
  constexpr int kNumKeys = 1000;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  for (const DBIter::Direction direction :
       {DBIter::kForward, DBIter::kReverse}) {
    for (const bool auto_refresh_enabled : {false, true}) {
      for (const bool explicit_snapshot : {false, true}) {
        DestroyAndReopen(options);
        // Multi dimensional iterator:
        //
        // L0 (table iterator): [key000001]
        // L1 (level iterator): [key000000]
        // Memtable           : [key000000, key000999]
        for (int i = 0; i < kNumKeys + 2; i++) {
          ASSERT_OK(Put(Key(i % kNumKeys), "val" + std::to_string(i)));
          if (i <= 1) {
            ASSERT_OK(Flush());
          }
          if (i == 0) {
            MoveFilesToLevel(1);
          }
        }

        ReadOptions read_options;
        std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
        if (explicit_snapshot) {
          snapshot = std::make_unique<ManagedSnapshot>(db_);
        }
        read_options.snapshot =
            explicit_snapshot ? snapshot->snapshot() : nullptr;
        read_options.auto_refresh_iterator_with_snapshot = auto_refresh_enabled;
        std::unique_ptr<Iterator> iter(NewIterator(read_options));
        const int trigger_compact_on_it = kNumKeys / 2;
        // This update should NOT be visible from the iterator.
        ASSERT_OK(Put(Key(trigger_compact_on_it + 1), "new val"));
        ASSERT_EQ(1, NumTableFilesAtLevel(1));
        ASSERT_EQ(1, NumTableFilesAtLevel(0));

        // Initialized, and the property reads below are asserted, so the
        // comparisons never operate on indeterminate values.
        uint64_t all_memtables_size_before_refresh = 0;
        uint64_t all_memtables_size_after_refresh = 0;
        std::string prop_value;
        ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                    &prop_value));
        // Parsed as unsigned 64-bit so it compares like-for-like with the
        // number read after the refresh point.
        const uint64_t superversion_number = std::stoull(prop_value);
        std::vector<LiveFileMetaData> old_files;
        db_->GetLiveFilesMetaData(&old_files);

        int expected_next_key_int;
        if (direction == DBIter::kForward) {
          expected_next_key_int = 0;
          iter->SeekToFirst();
        } else {  // DBIter::kReverse
          expected_next_key_int = kNumKeys - 1;
          iter->SeekToLast();
        }
        int it_num = 0;
        std::unordered_map<std::string, std::string> kvs;
        while (iter->Valid()) {
          ASSERT_OK(iter->status());
          it_num++;
          if (it_num == trigger_compact_on_it) {
            // Bump the superversion by manually scheduling flush + compaction.
            ASSERT_OK(Flush());
            ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr,
                                             nullptr));
            ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
            // For accuracy, capture the memtables size right before the next
            // Next()/Prev() call, which may update the iterator's stale
            // superversion ref.
            ASSERT_TRUE(dbfull()->GetIntProperty(
                "rocksdb.size-all-mem-tables",
                &all_memtables_size_before_refresh));
          }
          if (it_num == trigger_compact_on_it + 1) {
            ASSERT_TRUE(dbfull()->GetIntProperty(
                "rocksdb.size-all-mem-tables",
                &all_memtables_size_after_refresh));
            ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                        &prop_value));
            const uint64_t new_superversion_number = std::stoull(prop_value);
            Status expected_status_for_preexisting_files;
            if (auto_refresh_enabled && explicit_snapshot) {
              // Iterator is expected to detect its superversion staleness.
              ASSERT_LT(superversion_number, new_superversion_number);
              // ... and since our iterator was the only reference to that very
              // superversion, we expect most of the active memory to be
              // returned upon automatic iterator refresh.
              ASSERT_GT(all_memtables_size_before_refresh,
                        all_memtables_size_after_refresh);
              expected_status_for_preexisting_files = Status::NotFound();
            } else {
              ASSERT_EQ(superversion_number, new_superversion_number);
              ASSERT_EQ(all_memtables_size_after_refresh,
                        all_memtables_size_before_refresh);
              expected_status_for_preexisting_files = Status::OK();
            }
            for (const auto& file : old_files) {
              ASSERT_EQ(env_->FileExists(file.db_path + "/" + file.name),
                        expected_status_for_preexisting_files);
            }
          }
          // Ensure we're visiting the keys in desired order and at most once!
          ASSERT_EQ(IdFromKey(iter->key().ToString()), expected_next_key_int);
          kvs[iter->key().ToString()] = iter->value().ToString();
          if (direction == DBIter::kForward) {
            iter->Next();
            expected_next_key_int++;
          } else {
            iter->Prev();
            expected_next_key_int--;
          }
        }
        ASSERT_OK(iter->status());
        // Data validation. Keys 0 and 1 were last written by the final two
        // iterations of the fill loop, hence the kNumKeys offset.
        ASSERT_EQ(kvs.size(), static_cast<size_t>(kNumKeys));
        for (int i = 0; i < kNumKeys; i++) {
          auto kv = kvs.find(Key(i));
          ASSERT_TRUE(kv != kvs.end());
          int val = i;
          if (i <= 1) {
            val += kNumKeys;
          }
          ASSERT_EQ(kv->second, "val" + std::to_string(val));
        }
      }
    }
  }
}
// An error injected while building the internal iterator must surface as an
// invalid iterator that carries that status.
TEST_P(DBIteratorTest, CreationFailure) {
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::NewInternalIterator:StatusCallback", [](void* arg) {
        *(static_cast<Status*>(arg)) = Status::Corruption("test status");
      });
  SyncPoint::GetInstance()->EnableProcessing();
  std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
  // Stop injecting as soon as the iterator exists. Even if an assertion below
  // fails, no other iterator created later in this process is corrupted.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearCallBack(
      "DBImpl::NewInternalIterator:StatusCallback");
  ASSERT_FALSE(iter->Valid());
  ASSERT_TRUE(iter->status().IsCorruption());
}
// Seek() under an upper bound, then switch direction with Prev(). The iterator
// must land on the visible key just before the seek target.
TEST_P(DBIteratorTest, UpperBoundWithChangeDirection) {
  Options options = CurrentOptions();
  options.max_sequential_skip_in_iterations = 3;
  DestroyAndReopen(options);

  // First batch is flushed to an SST file; several keys sort above "x".
  for (const char* key : {"a", "y", "y1", "y2", "y3", "z"}) {
    ASSERT_OK(Put(key, "1"));
  }
  ASSERT_OK(Flush());
  // Second batch stays in the memtable.
  for (const char* key : {"a", "z", "bar", "foo"}) {
    ASSERT_OK(Put(key, "1"));
  }

  std::string bound_storage = "x";
  Slice upper(bound_storage);
  ReadOptions read_opts;
  read_opts.iterate_upper_bound = &upper;
  read_opts.max_skippable_internal_keys = 1000;

  Iterator* const it = NewIterator(read_opts);
  it->Seek("foo");
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ("foo", it->key().ToString());

  it->Prev();
  ASSERT_TRUE(it->Valid());
  ASSERT_OK(it->status());
  ASSERT_EQ("bar", it->key().ToString());
  delete it;
}
// ReadOptions::table_filter is consulted for every table. A table it rejects
// contributes no keys to the iteration.
TEST_P(DBIteratorTest, TableFilter) {
  // Three flushes produce tables holding 1, 2 and 3 entries respectively.
  ASSERT_OK(Put("a", "1"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("b", "2"));
  ASSERT_OK(Put("c", "3"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(Put("d", "4"));
  ASSERT_OK(Put("e", "5"));
  ASSERT_OK(Put("f", "6"));
  EXPECT_OK(dbfull()->Flush(FlushOptions()));

  // Accept every table. Each expected entry count must be reported exactly
  // once, and all keys are visible.
  {
    std::set<uint64_t> pending_counts{1, 2, 3};
    ReadOptions opts;
    opts.table_filter = [&](const TableProperties& props) {
      auto hit = pending_counts.find(props.num_entries);
      if (hit != pending_counts.end()) {
        pending_counts.erase(hit);
      } else {
        ADD_FAILURE() << "saw table properties with an unexpected "
                      << props.num_entries << " entries";
      }
      return true;
    };
    auto iter = NewIterator(opts);
    iter->SeekToFirst();
    for (const char* expected :
         {"a->1", "b->2", "c->3", "d->4", "e->5", "f->6"}) {
      ASSERT_EQ(IterStatus(iter), expected);
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_TRUE(pending_counts.empty());
    delete iter;
  }

  // Rejecting the two-entry table hides "b" and "c".
  {
    ReadOptions opts;
    opts.table_filter = [](const TableProperties& props) {
      return props.num_entries != 2;
    };
    auto iter = NewIterator(opts);
    iter->SeekToFirst();
    for (const char* expected : {"a->1", "d->4", "e->5", "f->6"}) {
      ASSERT_EQ(IterStatus(iter), expected);
      iter->Next();
    }
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
    delete iter;
  }
}
// With an upper bound and a snapshot that hides the newest write of "foo",
// Prev() from "foo" must land on "bar". "foo" has more versions than
// max_sequential_skip_in_iterations.
TEST_P(DBIteratorTest, UpperBoundWithPrevReseek) {
  Options options = CurrentOptions();
  options.max_sequential_skip_in_iterations = 3;
  DestroyAndReopen(options);
  // write a bunch of kvs to the database.
  ASSERT_OK(Put("a", "1"));
  ASSERT_OK(Put("y", "1"));
  ASSERT_OK(Put("z", "1"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("a", "1"));
  ASSERT_OK(Put("z", "1"));
  ASSERT_OK(Put("bar", "1"));
  ASSERT_OK(Put("foo", "1"));
  ASSERT_OK(Put("foo", "2"));
  ASSERT_OK(Put("foo", "3"));
  ASSERT_OK(Put("foo", "4"));
  ASSERT_OK(Put("foo", "5"));
  // RAII snapshot: released on every exit path, including failed assertions.
  // It is declared before the iterator, so it is released after the
  // iterator is destroyed.
  ManagedSnapshot snapshot(db_);
  ASSERT_OK(Put("foo", "6"));
  std::string upper_bound = "x";
  Slice ub_slice(upper_bound);
  ReadOptions ro;
  ro.snapshot = snapshot.snapshot();
  ro.iterate_upper_bound = &ub_slice;
  std::unique_ptr<Iterator> iter(NewIterator(ro));
  iter->SeekForPrev("goo");
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("foo", iter->key().ToString());
  iter->Prev();
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  ASSERT_EQ("bar", iter->key().ToString());
}
// Checks the NUMBER_ITER_SKIP ticker across forward, reverse and bounded
// scans. Judging by the per-step comments below, skipped entries include
// tombstones, the puts they shadow, and older versions of live keys.
// skip_count accumulates the expected cumulative ticker value.
TEST_P(DBIteratorTest, SkipStatistics) {
Options options = CurrentOptions();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
DestroyAndReopen(options);
int skip_count = 0;
// Write a bunch of kvs to the database: a and b get a second version,
// and d, e, f are written then deleted.
ASSERT_OK(Put("a", "1"));
ASSERT_OK(Put("b", "1"));
ASSERT_OK(Put("c", "1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("d", "1"));
ASSERT_OK(Put("e", "1"));
ASSERT_OK(Put("f", "1"));
ASSERT_OK(Put("a", "2"));
ASSERT_OK(Put("b", "2"));
ASSERT_OK(Flush());
ASSERT_OK(Delete("d"));
ASSERT_OK(Delete("e"));
ASSERT_OK(Delete("f"));
// Forward full scan: only a, b and c are visible.
Iterator* iter = NewIterator(ReadOptions());
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
count++;
}
ASSERT_EQ(count, 3);
delete iter;
skip_count += 8; // 3 deletes + 3 original keys + 2 lower in sequence
ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
// Reverse full scan over the same data.
iter = NewIterator(ReadOptions());
count = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_OK(iter->status());
count++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(count, 3);
delete iter;
skip_count += 8; // Same as above, but in reverse order
ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
// Add aa..ad and flush them, then delete ab, ac and ad so only aa survives.
ASSERT_OK(Put("aa", "1"));
ASSERT_OK(Put("ab", "1"));
ASSERT_OK(Put("ac", "1"));
ASSERT_OK(Put("ad", "1"));
ASSERT_OK(Flush());
ASSERT_OK(Delete("ab"));
ASSERT_OK(Delete("ac"));
ASSERT_OK(Delete("ad"));
ReadOptions ro;
// Despite its name, `prefix` is used as the exclusive upper bound "b".
Slice prefix("b");
ro.iterate_upper_bound = &prefix;
// Forward from "aa" up to the bound: only "aa" is visible.
iter = NewIterator(ro);
count = 0;
for (iter->Seek("aa"); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
count++;
}
ASSERT_EQ(count, 1);
delete iter;
skip_count += 6; // 3 deletes + 3 original keys
ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
// Reverse from the bound: "aa" and "a" are visible.
iter = NewIterator(ro);
count = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_OK(iter->status());
count++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(count, 2);
delete iter;
// 3 deletes + 3 original keys + lower sequence of "a"
skip_count += 7;
ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
}
// If Next() gives up with Incomplete after max_skippable_internal_keys, the
// "rocksdb.iterator.internal-key" property reports where it stopped. A fresh
// iterator seeking from that key can then continue past the tombstones.
TEST_P(DBIteratorTest, SeekAfterHittingManyInternalKeys) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  ReadOptions read_opts;
  read_opts.max_skippable_internal_keys = 2;

  ASSERT_OK(Put("1", "val_1"));
  // More tombstones than max_skippable_internal_keys, so Next() from "1"
  // cannot reach "6" in one go.
  for (const char* deleted : {"2", "3", "4", "5"}) {
    ASSERT_OK(Delete(deleted));
  }
  ASSERT_OK(Put("6", "val_6"));

  std::unique_ptr<Iterator> first(NewIterator(read_opts));
  first->SeekToFirst();
  ASSERT_TRUE(first->Valid());
  ASSERT_EQ(first->key().ToString(), "1");
  ASSERT_EQ(first->value().ToString(), "val_1");

  // Too many invisible internal keys before the next user key: Incomplete.
  first->Next();
  ASSERT_TRUE(!first->Valid());
  ASSERT_TRUE(first->status().IsIncomplete());

  // The internal key at which Next() stopped.
  std::string stopped_at;
  ASSERT_OK(first->GetProperty("rocksdb.iterator.internal-key", &stopped_at));
  ASSERT_EQ("4", stopped_at);

  // A new iterator seeking from that key reaches "6".
  std::unique_ptr<Iterator> second(NewIterator(read_opts));
  second->Seek(stopped_at);
  ASSERT_TRUE(second->Valid());
  ASSERT_OK(second->status());
  ASSERT_EQ(second->key().ToString(), "6");
  ASSERT_EQ(second->value().ToString(), "val_6");
}
// Reproduces a former bug where the iterator would skip some records when
// DBIter re-seeks a subiterator that has Incomplete status.
TEST_P(DBIteratorTest, NonBlockingIterationBugRepro) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  // Make sure the sst file has more than one block.
  table_options.flush_block_policy_factory =
      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  DestroyAndReopen(options);

  // Two records in sst file, each in its own block.
  ASSERT_OK(Put("b", ""));
  ASSERT_OK(Put("d", ""));
  ASSERT_OK(Flush());

  // Create a nonblocking iterator before writing to memtable.
  ReadOptions ropt;
  ropt.read_tier = kBlockCacheTier;
  std::unique_ptr<Iterator> iter(NewIterator(ropt));

  // Overwrite a key in memtable many times to hit
  // max_sequential_skip_in_iterations (which is 8 by default).
  for (int i = 0; i < 20; ++i) {
    ASSERT_OK(Put("c", ""));
  }

  // Load the second block in sst file into the block cache. The repro depends
  // on that block being cached, so verify the load actually succeeded.
  {
    std::unique_ptr<Iterator> iter2(NewIterator(ReadOptions()));
    iter2->Seek("d");
    ASSERT_TRUE(iter2->Valid());
    ASSERT_OK(iter2->status());
    ASSERT_EQ("d", iter2->key().ToString());
  }

  // Finally seek the nonblocking iterator.
  iter->Seek("a");
  // With the bug, the status used to be OK, and the iterator used to point to
  // "d". A non-OK status must come with an invalid iterator.
  EXPECT_FALSE(iter->Valid());
  EXPECT_TRUE(iter->status().IsIncomplete());
}
// After Next() runs past the upper bound, SeekForPrev() must position the
// iterator on a valid key again.
TEST_P(DBIteratorTest, SeekBackwardAfterOutOfUpperBound) {
  ASSERT_OK(Put("a", ""));
  ASSERT_OK(Put("b", ""));
  ASSERT_OK(Flush());

  ReadOptions ropt;
  Slice ub = "b";
  ropt.iterate_upper_bound = &ub;

  // Build the iterator through the fixture's NewIterator(), like every other
  // test in this suite, rather than dbfull()->NewIterator(). The fixture
  // helper is presumably what applies the DBIteratorTest parameter, so both
  // parameter values now exercise their own code path.
  std::unique_ptr<Iterator> it(NewIterator(ropt));
  it->SeekForPrev("a");
  ASSERT_TRUE(it->Valid());
  ASSERT_OK(it->status());
  ASSERT_EQ("a", it->key().ToString());
  it->Next();
  ASSERT_FALSE(it->Valid());
  ASSERT_OK(it->status());
  it->SeekForPrev("a");
  ASSERT_OK(it->status());
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ("a", it->key().ToString());
}
// Counts LevelIterator file lookups and index-block seeks across a fixed
// sequence of Seek()/Next() calls on a compacted DB. It pins exactly when a
// forward seek reuses the current file or data block instead of looking it
// up again.
TEST_P(DBIteratorTest, AvoidReseekLevelIterator) {
  Options options = CurrentOptions();
  options.compression = CompressionType::kNoCompression;
  BlockBasedTableOptions table_options;
  table_options.block_size = 800;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);

  Random rnd(301);
  std::string random_str = rnd.RandomString(180);

  ASSERT_OK(Put("1", random_str));
  ASSERT_OK(Put("2", random_str));
  ASSERT_OK(Put("3", random_str));
  ASSERT_OK(Put("4", random_str));
  // A new block
  ASSERT_OK(Put("5", random_str));
  ASSERT_OK(Put("6", random_str));
  ASSERT_OK(Put("7", random_str));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("8", random_str));
  ASSERT_OK(Put("9", random_str));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  int num_find_file_in_level = 0;
  int num_idx_blk_seek = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "LevelIterator::Seek:BeforeFindFile",
      [&](void* /*arg*/) { num_find_file_in_level++; });
  SyncPoint::GetInstance()->SetCallBack(
      "IndexBlockIter::Seek:0", [&](void* /*arg*/) { num_idx_blk_seek++; });
  SyncPoint::GetInstance()->EnableProcessing();

  {
    std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
    iter->Seek("1");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(1, num_find_file_in_level);
    ASSERT_EQ(1, num_idx_blk_seek);

    iter->Seek("2");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(1, num_find_file_in_level);
    ASSERT_EQ(1, num_idx_blk_seek);

    iter->Seek("3");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(1, num_find_file_in_level);
    ASSERT_EQ(1, num_idx_blk_seek);

    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(1, num_find_file_in_level);
    ASSERT_EQ(1, num_idx_blk_seek);

    iter->Seek("5");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(1, num_find_file_in_level);
    ASSERT_EQ(2, num_idx_blk_seek);

    iter->Seek("6");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(1, num_find_file_in_level);
    ASSERT_EQ(2, num_idx_blk_seek);

    iter->Seek("7");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(1, num_find_file_in_level);
    ASSERT_EQ(3, num_idx_blk_seek);

    iter->Seek("8");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(2, num_find_file_in_level);
    // Still re-seek because "8" is the boundary key, which has
    // the same user key as the seek key.
    ASSERT_EQ(4, num_idx_blk_seek);

    iter->Seek("5");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(3, num_find_file_in_level);
    ASSERT_EQ(5, num_idx_blk_seek);

    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(3, num_find_file_in_level);
    ASSERT_EQ(5, num_idx_blk_seek);

    // Seek backward never triggers the index block seek to be skipped
    iter->Seek("5");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(3, num_find_file_in_level);
    ASSERT_EQ(6, num_idx_blk_seek);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  // The callbacks capture this test's stack locals by reference; drop them so
  // they can never fire after those locals are gone.
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// MyRocks may change the iterate bounds of an existing iterator before calling
// Seek() again. Make sure such usage doesn't break the iterator.
TEST_P(DBIteratorTest, IterateBoundChangedBeforeSeek) {
  Options options = CurrentOptions();
  options.compression = CompressionType::kNoCompression;
  BlockBasedTableOptions table_options;
  table_options.block_size = 100;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  std::string value(50, 'v');
  Reopen(options);
  ASSERT_OK(Put("aaa", value));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("bbb", "v"));
  ASSERT_OK(Put("ccc", "v"));
  ASSERT_OK(Put("ddd", "v"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("eee", "v"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Upper bound: seek under "e", then tighten the bound to "c" and re-seek.
  // The iterator is a scoped unique_ptr, so it is destroyed even when an
  // assertion fails, and always before the bound it points to.
  {
    std::string ub1 = "e";
    std::string ub2 = "c";
    Slice ub(ub1);
    ReadOptions read_opts1;
    read_opts1.iterate_upper_bound = &ub;
    std::unique_ptr<Iterator> iter(NewIterator(read_opts1));
    // Seek and iterate across block boundary.
    iter->Seek("b");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("bbb", iter->key());
    ub = Slice(ub2);
    iter->Seek("b");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("bbb", iter->key());
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  }

  // Lower bound: SeekForPrev above "a", then tighten the bound to "c" and
  // re-seek.
  {
    std::string lb1 = "a";
    std::string lb2 = "c";
    Slice lb(lb1);
    ReadOptions read_opts2;
    read_opts2.iterate_lower_bound = &lb;
    std::unique_ptr<Iterator> iter(NewIterator(read_opts2));
    iter->SeekForPrev("d");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("ccc", iter->key());
    lb = Slice(lb2);
    iter->SeekForPrev("d");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("ccc", iter->key());
    iter->Prev();
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  }
}
// Reverse iteration from "d" with lower bound "b" over the bottom level must
// yield "ccc", then "bbb", and then stop at the bound with an OK status.
TEST_P(DBIteratorTest, IterateWithLowerBoundAcrossFileBoundary) {
  // Written as two flushes: {aaa, bbb}, then {ccc, ddd}.
  ASSERT_OK(Put("aaa", "v"));
  ASSERT_OK(Put("bbb", "v"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("ccc", "v"));
  ASSERT_OK(Put("ddd", "v"));
  ASSERT_OK(Flush());
  // Move both files to bottom level.
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  Slice lb("b");
  ReadOptions ro;
  ro.iterate_lower_bound = &lb;
  std::unique_ptr<Iterator> it(NewIterator(ro));

  it->SeekForPrev("d");
  for (const char* expected : {"ccc", "bbb"}) {
    ASSERT_TRUE(it->Valid());
    ASSERT_OK(it->status());
    ASSERT_EQ(expected, it->key());
    it->Prev();
  }
  // "aaa" sorts below the lower bound, so the iterator is now exhausted.
  ASSERT_FALSE(it->Valid());
  ASSERT_OK(it->status());
}
// Exercises forward/backward iteration, Seek() and SeekForPrev() over a DB with
// blob files enabled. After every positioning call it checks both the iterator
// position and the cumulative NUMBER_OF_RESEEKS_IN_ITERATION ticker.
TEST_P(DBIteratorTest, Blob) {
Options options = CurrentOptions();
options.enable_blob_files = true;
options.max_sequential_skip_in_iterations = 2;
options.statistics = CreateDBStatistics();
Reopen(options);
// Note: we have 4 KVs (3 of which are hidden) for key "b" and
// max_sequential_skip_in_iterations is set to 2. Thus, we need to do a reseek
// anytime we move from "b" to "c" or vice versa.
// Every write below is flushed on its own.
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb0"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("c", "vc"));
ASSERT_OK(Flush());
// iter_guard owns the iterator; iter is a non-owning alias used below.
std::unique_ptr<Iterator> iter_guard(NewIterator(ReadOptions()));
Iterator* const iter = iter_guard.get();
// Forward pass: only the "b" -> "c" step reseeks.
iter->SeekToFirst();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Prev() from the first key invalidates the iterator without a reseek.
iter->SeekToFirst();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Backward pass: only the "c" -> "b" step reseeks.
iter->SeekToLast();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Next() from the last key invalidates the iterator without a reseek.
iter->SeekToLast();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Targeted seeks: none of the Seek() calls below reseek, while each
// SeekForPrev() that lands on "b" adds one reseek.
iter->Seek("");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("a");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("ax");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->SeekForPrev("d");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->SeekForPrev("c");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->SeekForPrev("bx");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 3);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Seek("b");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 3);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Seek("z");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 3);
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekForPrev("b");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 4);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->SeekForPrev("");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 4);
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Switch from reverse to forward
// c -> b reseeks, b -> a does not, and the final Next() (a -> b) reseeks.
iter->SeekToLast();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 4);
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 5);
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 5);
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 6);
ASSERT_EQ(IterStatus(iter), "b->vb3");
// Switch from forward to reverse
// a -> b does not reseek, b -> c does, and the final Prev() (c -> b) reseeks.
iter->SeekToFirst();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 6);
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 6);
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 7);
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 8);
ASSERT_EQ(IterStatus(iter), "b->vb3");
}
// Runs every TEST_P of DBIteratorTest twice: once with parameter `true`, then
// once with `false`.
INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest,
testing::Values(true, false));
// Tests how DBIter works with ReadCallback.
// Fixture that reuses DBIteratorTest unchanged under a separate name.
class DBIteratorWithReadCallbackTest : public DBIteratorTest {};
TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
  // ReadCallback that treats every entry with seq <= max_visible_seq_ as
  // visible, regardless of the iterator's own snapshot sequence.
  class TestReadCallback : public ReadCallback {
   public:
    explicit TestReadCallback(SequenceNumber _max_visible_seq)
        : ReadCallback(_max_visible_seq) {}
    bool IsVisibleFullCheck(SequenceNumber seq) override {
      return seq <= max_visible_seq_;
    }
  };

  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("foo", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  ASSERT_OK(Put("a", "va"));
  ASSERT_OK(Put("z", "vz"));
  SequenceNumber seq1 = db_->GetLatestSequenceNumber();
  TestReadCallback callback1(seq1);
  ASSERT_OK(Put("foo", "v4"));
  ASSERT_OK(Put("foo", "v5"));
  ASSERT_OK(Put("bar", "v7"));

  SequenceNumber seq2 = db_->GetLatestSequenceNumber();
  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
      db_->DefaultColumnFamily());
  auto* cfd = cfh->cfd();
  DBImpl* db_impl = dbfull();
  {
    // The iterator reads at seq2, but callback1 restricts it to data written
    // up to seq1. The unique_ptr releases the iterator even when an ASSERT
    // below returns early; the scope ends before callback1 is destroyed.
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl);
    std::unique_ptr<Iterator> iter(db_impl->NewIteratorImpl(
        ReadOptions(), cfh, super_version, seq2, &callback1));

    // Seek
    // The latest value of "foo" before seq1 is "v3"
    iter->Seek("foo");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key());
    ASSERT_EQ("v3", iter->value());
    // "bar" is not visible to the iterator. It will move on to the next key
    // "foo".
    iter->Seek("bar");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key());
    ASSERT_EQ("v3", iter->value());

    // Next
    // Seek to "a"
    iter->Seek("a");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("va", iter->value());
    // "bar" is not visible to the iterator. It will move on to the next key
    // "foo".
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key());
    ASSERT_EQ("v3", iter->value());

    // Prev
    // Seek to "z"
    iter->Seek("z");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("vz", iter->value());
    // The previous key is "foo", which is visible to the iterator.
    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key());
    ASSERT_EQ("v3", iter->value());
    // "bar" is not visible to the iterator. It will move on to the next key
    // "a".
    iter->Prev();  // skipping "bar"
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("a", iter->key());
    ASSERT_EQ("va", iter->value());

    // SeekForPrev
    // The previous key is "foo", which is visible to the iterator.
    iter->SeekForPrev("y");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key());
    ASSERT_EQ("v3", iter->value());
    // "bar" is not visible to the iterator. It will move on to the next key
    // "a".
    iter->SeekForPrev("bar");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("a", iter->key());
    ASSERT_EQ("va", iter->value());
  }

  // Prev beyond max_sequential_skip_in_iterations
  uint64_t num_versions =
      CurrentOptions().max_sequential_skip_in_iterations + 10;
  for (uint64_t i = 0; i < num_versions; i++) {
    ASSERT_OK(Put("bar", std::to_string(i)));
  }
  SequenceNumber seq3 = db_->GetLatestSequenceNumber();
  TestReadCallback callback2(seq3);
  ASSERT_OK(Put("bar", "v8"));
  SequenceNumber seq4 = db_->GetLatestSequenceNumber();
  {
    // The iterator reads at seq4, but callback2 restricts it to data written
    // up to seq3, so "v8" is hidden.
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl);
    std::unique_ptr<Iterator> iter(db_impl->NewIteratorImpl(
        ReadOptions(), cfh, super_version, seq4, &callback2));
    // Seek to "z", which is visible.
    iter->Seek("z");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("vz", iter->value());
    // Previous key is "foo" and the last value "v5" is visible.
    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("foo", iter->key());
    ASSERT_EQ("v5", iter->value());
    // Since the number of values of "bar" is more than
    // max_sequential_skip_in_iterations, Prev() will ultimately fallback to
    // seek in forward direction. Here we test the fallback seek is correct.
    // The last visible value should be (num_versions - 1), as "v8" is not
    // visible.
    iter->Prev();
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ("bar", iter->key());
    ASSERT_EQ(std::to_string(num_versions - 1), iter->value());
  }
}
TEST_F(DBIteratorTest, BackwardIterationOnInplaceUpdateMemtable) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.inplace_update_support = false;
  options.env = env_;
  DestroyAndReopen(options);

  // Write kNumKeys keys; with no flush they live in the memtable and the WAL.
  constexpr int kNumKeys = 10;
  for (int k = 0; k < kNumKeys; ++k) {
    ASSERT_OK(Put(Key(k), "val"));
  }

  ReadOptions ro;
  ro.total_order_seek = true;

  // With inplace_update_support disabled, reverse iteration sees every key.
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ro));
    int visited = 0;
    it->SeekToLast();
    while (it->Valid()) {
      ++visited;
      it->Prev();
    }
    ASSERT_OK(it->status());
    ASSERT_EQ(kNumKeys, visited);
  }

  // Reopen with inplace_update_support enabled, replaying the WAL into a new
  // memtable instead of flushing during recovery.
  options.create_if_missing = false;
  options.avoid_flush_during_recovery = true;
  options.inplace_update_support = true;
  options.allow_concurrent_memtable_write = false;
  Reopen(options);

  // Now backward iteration must be rejected with NotSupported.
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(ro));
    it->SeekToLast();
    ASSERT_TRUE(it->status().IsNotSupported());
    ASSERT_FALSE(it->Valid());
  }
}
TEST_F(DBIteratorTest, IteratorRefreshReturnSV) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  DestroyAndReopen(opts);
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  std::unique_ptr<Iterator> it{db_->NewIterator(ReadOptions())};

  // When Refresh() reaches its "Refresh:SV" sync point, write and flush so a
  // newer SuperVersion gets installed. The callback disables sync point
  // processing itself, so it runs at most once.
  SyncPoint* const sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack(
      "ArenaWrappedDBIter::Refresh:SV", [this, sync_point](void*) {
        ASSERT_OK(db_->Put(WriteOptions(), "dummy", "new SV"));
        // This makes the local SV obsolete.
        ASSERT_OK(Flush());
        sync_point->DisableProcessing();
      });
  sync_point->EnableProcessing();

  ASSERT_OK(it->Refresh());
  it.reset();
  // The iterator used to not clean up its SV, which made Close() hit an
  // assertion failure.
  Close();
}
TEST_F(DBIteratorTest, ErrorWhenReadFile) {
  // This is to test a bug that is fixed in
  // https://github.com/facebook/rocksdb/pull/11782.
  //
  // Inject an error when reading from a file, and
  // see if Iterator handles it correctly.
  Options opts = CurrentOptions();
  opts.num_levels = 7;
  opts.compression = kNoCompression;
  BlockBasedTableOptions bbto;
  // Always do I/O
  bbto.no_block_cache = true;
  opts.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(opts);

  // Set up LSM
  // L5: F1 [key0, key99], F2 [key100, key199]
  // L6: F3 [key50, key149]
  Random rnd(301);
  const int kValLen = 100;
  for (int i = 50; i < 150; ++i) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(kValLen)));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(6);

  // values[i] is the newest value written for Key(i), i in [0, 200).
  std::vector<std::string> values;
  for (int i = 0; i < 100; ++i) {
    values.emplace_back(rnd.RandomString(kValLen));
    ASSERT_OK(Put(Key(i), values.back()));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(5);
  for (int i = 100; i < 200; ++i) {
    values.emplace_back(rnd.RandomString(kValLen));
    ASSERT_OK(Put(Key(i), values.back()));
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(5);
  ASSERT_EQ(2, NumTableFilesAtLevel(5));
  ASSERT_EQ(1, NumTableFilesAtLevel(6));

  std::vector<LiveFileMetaData> files;
  db_->GetLiveFilesMetaData(&files);
  // Get file names for F1, F2 and F3.
  // These are file names, not full paths.
  std::string f1, f2, f3;
  for (auto& file_meta : files) {
    if (file_meta.level == 6) {
      f3 = file_meta.name;
    } else {
      if (file_meta.smallestkey == Key(0)) {
        f1 = file_meta.name;
      } else {
        f2 = file_meta.name;
      }
    }
  }
  ASSERT_TRUE(!f1.empty());
  ASSERT_TRUE(!f2.empty());
  ASSERT_TRUE(!f3.empty());

  // At the "Read::BeforeReturn" sync point, turn the result of any read from
  // a file whose path contains `error_file` into a retryable IOError.
  std::string error_file;
  SyncPoint::GetInstance()->SetCallBack(
      "RandomAccessFileReader::Read::BeforeReturn",
      [&error_file](void* io_s_ptr) {
        auto p = static_cast<std::pair<std::string*, IOStatus*>*>(io_s_ptr);
        if (p->first->find(error_file) != std::string::npos) {
          *p->second = IOStatus::IOError();
          p->second->SetRetryable(true);
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Error reading F1
  error_file = f1;
  std::unique_ptr<Iterator> iter{db_->NewIterator(ReadOptions())};
  iter->SeekToFirst();
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());
  // This does not require reading the first block.
  iter->Seek(Key(90));
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->value(), values[90]);
  // iter has ok status before this Seek.
  iter->Seek(Key(1));
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());

  // Error reading F2
  error_file = f2;
  iter.reset(db_->NewIterator(ReadOptions()));
  iter->Seek(Key(99));
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->value(), values[99]);
  // Need to read from F2.
  iter->Next();
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());
  iter->Seek(Key(190));
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->value(), values[190]);
  // Seek for first key of F2.
  iter->Seek(Key(100));
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());
  iter->SeekToLast();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->value(), values[199]);
  // SeekForPrev for first key of F2.
  iter->SeekForPrev(Key(100));
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());
  // Does not read first block (offset 0).
  iter->SeekForPrev(Key(98));
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->value(), values[98]);

  // Error reading F3
  error_file = f3;
  iter.reset(db_->NewIterator(ReadOptions()));
  iter->SeekToFirst();
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());
  iter->Seek(Key(50));
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());
  iter->SeekForPrev(Key(50));
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());
  // Does not read file 3
  iter->Seek(Key(150));
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->value(), values[150]);

  // Test when file read error occurs during Prev().
  // This requires returning an error when reading near the end of a file
  // instead of offset 0.
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "RandomAccessFileReader::Read::AnyOffset", [&f1](void* pair_ptr) {
        auto p = static_cast<std::pair<std::string*, IOStatus*>*>(pair_ptr);
        if (p->first->find(f1) != std::string::npos) {
          *p->second = IOStatus::IOError();
          p->second->SetRetryable(true);
        }
      });
  iter->SeekForPrev(Key(101));
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ(iter->value(), values[101]);
  // DBIter will not stop at Key(100) since it needs
  // to make sure the key it returns has the max sequence number for Key(100).
  // So it will call MergingIterator::Prev() which will read F1.
  iter->Prev();
  ASSERT_NOK(iter->status());
  ASSERT_TRUE(iter->status().IsIOError());

  SyncPoint::GetInstance()->DisableProcessing();
  // The registered callback captures the local `f1` by reference; unregister
  // it so nothing in the global SyncPoint refers to this frame afterwards.
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // Release the iterator (unique_ptr::reset) rather than calling a member
  // Reset() on the iterator object itself.
  iter.reset();
}
// Checks that NewIterators() without an explicit snapshot gives all column
// families a consistent view even when writes and a flush happen right after
// the first CF's SuperVersion is referenced: every iterator must observe the
// "_val_new" values.
TEST_F(DBIteratorTest, IteratorsConsistentViewImplicitSnapshot) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
// Make the snapshot check inside NewIterators() wait for a background flush to
// have finished.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}});
// Guards the callback so the overwrite + flush happens only once.
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
// After SV is obtained for the first CF, flush for the second CF
ASSERT_OK(Flush(1));
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ReadOptions read_options;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
for (int i = 0; i < 3; ++i) {
auto iter = iters[i];
ASSERT_OK(iter->status());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
std::to_string(i) + "_val_new");
}
// The iterators must be deleted before the thread-local SV checks below.
for (auto* iter : iters) {
delete iter;
}
// Thread-local SVs are no longer obsolete nor in use
for (int i = 0; i < 3; ++i) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
}
}
// Same setup as the implicit-snapshot test, but with atomic_flush and an
// explicit snapshot taken before NewIterators(). The iterators are expected to
// return the snapshot's original "_val" values even though new values are
// written and atomically flushed after the first CF's SuperVersion is
// referenced.
TEST_F(DBIteratorTest, IteratorsConsistentViewExplicitSnapshot) {
Options options = GetDefaultOptions();
options.atomic_flush = true;
CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
// Make the snapshot check inside NewIterators() wait for a background flush to
// have finished.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}});
// Guards the callback so the overwrite + flush happens only once.
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
// After SV is obtained for the first CF, do the atomic flush()
ASSERT_OK(Flush());
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Explicit snapshot wouldn't force reloading all svs. We should expect old
// values
const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions read_options;
read_options.snapshot = snapshot;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
for (int i = 0; i < 3; ++i) {
auto iter = iters[i];
ASSERT_OK(iter->status());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
std::to_string(i) + "_val");
}
db_->ReleaseSnapshot(snapshot);
// The iterators must be deleted before the thread-local SV checks below.
for (auto* iter : iters) {
delete iter;
}
// Thread-local SV for cf_0 is obsolete (atomic flush happened after the first
// SV Ref)
auto* cfd0 =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[0])->cfd();
ASSERT_EQ(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
// Rest are not InUse nor Obsolete
for (int i = 1; i < 3; ++i) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
}
}
TEST_P(DBIteratorTest, MemtableOpsScanFlushTriggerWithSeek) {
// Tests that option memtable_op_scan_flush_trigger works when the limit
// is reached during a Seek() operation.
// The single Seek() below skips exactly kTrigger memtable entries. With
// trigger == kTrigger the memtable is expected to be flushed; with
// trigger == kTrigger + 1 it is not.
const int kTrigger = 10;
Random* r = Random::GetTLSInstance();
for (int trigger : {kTrigger, kTrigger + 1}) {
for (bool delete_only : {false, true}) {
Options options;
options.create_if_missing = true;
options.memtable_op_scan_flush_trigger = trigger;
options.level_compaction_dynamic_level_bytes = true;
DestroyAndReopen(options);
// Base data that will be covered by a consecutive sequence of tombstones.
// Chosen so the memtable ends up with kTrigger hidden entries: kTrigger
// single deletes, or kTrigger / 2 puts plus kTrigger / 2 deletes.
int kNumKeys = delete_only ? kTrigger : kTrigger / 2;
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_OK(Put(Key(i), r->RandomString(100)));
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
ASSERT_EQ(1, NumTableFilesAtLevel(6));
if (delete_only) {
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_OK(SingleDelete(Key(i)));
}
} else {
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_OK(Put(Key(i), r->RandomString(100)));
}
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_OK(Delete(Key(i)));
}
}
// Perf counters are needed to read next_on_memtable_count.
SetPerfLevel(PerfLevel::kEnableCount);
get_perf_context()->Reset();
ReadOptions ro;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
// Seek to the first key, this will scan through all the tombstones and
// hidden puts
iter->Seek(Key(0));
ASSERT_FALSE(
iter->Valid()); // All keys are deleted, so iterator is not valid
ASSERT_OK(iter->status());
ASSERT_EQ(get_perf_context()->next_on_memtable_count, kTrigger);
// Skipping kTrigger memtable entries in a single iterator operation
// should mark the memtable for flush (when trigger <= kTrigger).
//
// At the end of a write, we check and update memtable to request a flush
ASSERT_OK(Put(Key(11), "val"));
// Before a write, we schedule memtables for flush if requested.
ASSERT_OK(Put(Key(12), "val"));
ASSERT_OK(db_->WaitForCompact({}));
if (trigger <= kTrigger) {
// Check if memtable was flushed due to scan trigger
ASSERT_EQ(1, NumTableFilesAtLevel(0));
uint64_t val = 0;
ASSERT_TRUE(
db_->GetIntProperty("rocksdb.num-deletes-active-mem-table", &val));
ASSERT_EQ(0, val);
} else {
// No flush: the tombstones are all still in the active memtable.
ASSERT_EQ(0, NumTableFilesAtLevel(0));
uint64_t val = 0;
ASSERT_TRUE(
db_->GetIntProperty("rocksdb.num-deletes-active-mem-table", &val));
ASSERT_EQ(kNumKeys, val);
}
}
}
}
TEST_P(DBIteratorTest, MemtableOpsScanFlushTriggerWithNext) {
  // Tests that option memtable_op_scan_flush_trigger works when the limit
  // is reached during a single Next() operation, and does not trigger a flush
  // when the limit is only reached across multiple Next() operations.
  const int kTrigger = 10;
  Random* r = Random::GetTLSInstance();
  for (int trigger : {kTrigger, kTrigger + 1}) {
    for (bool delete_only : {false, true}) {
      Options options;
      options.create_if_missing = true;
      options.memtable_op_scan_flush_trigger = trigger;
      options.level_compaction_dynamic_level_bytes = true;
      DestroyAndReopen(options);
      // Base data that will be covered by a consecutive sequence of
      // tombstones. Key(1)..Key(kNumKeys) end up hidden behind kTrigger
      // memtable entries: kTrigger single deletes, or kTrigger / 2 puts plus
      // kTrigger / 2 deletes.
      int kNumKeys = delete_only ? kTrigger : kTrigger / 2;
      for (int i = 0; i <= kNumKeys; ++i) {
        ASSERT_OK(Put(Key(i), r->RandomString(100)));
      }
      ASSERT_OK(Flush());
      ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
      ASSERT_EQ(1, NumTableFilesAtLevel(6));
      // Key(0) stays visible and is where the iterator starts.
      ASSERT_OK(Put(Key(0), "val"));
      if (delete_only) {
        for (int i = 1; i <= kNumKeys; ++i) {
          ASSERT_OK(SingleDelete(Key(i)));
        }
      } else {
        for (int i = 1; i <= kNumKeys; ++i) {
          ASSERT_OK(Put(Key(i), r->RandomString(100)));
        }
        for (int i = 1; i <= kNumKeys; ++i) {
          ASSERT_OK(Delete(Key(i)));
        }
      }
      // Total number of tombstones and hidden puts scanned across multiple
      // Next() operations below will be kTrigger, and it should not trigger a
      // flush when the limit is kTrigger + 1.
      ASSERT_OK(Put(Key(kNumKeys + 1), "v1"));
      ASSERT_OK(Delete(Key(kNumKeys + 2)));
      ASSERT_OK(Put(Key(kNumKeys + 3), "v3"));
      // Perf counters are needed to read next_on_memtable_count.
      SetPerfLevel(PerfLevel::kEnableCount);
      get_perf_context()->Reset();
      ReadOptions ro;
      std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
      iter->Seek(Key(0));
      ASSERT_TRUE(iter->Valid());
      ASSERT_EQ(iter->value(), "val");
      ASSERT_OK(iter->status());
      ASSERT_EQ(get_perf_context()->next_on_memtable_count, 0);
      iter->Next();
      // kTrigger tombstones and invisible puts and 1 for the visible put
      ASSERT_EQ(get_perf_context()->next_on_memtable_count, kTrigger + 1);
      iter->Next();
      ASSERT_EQ(get_perf_context()->next_on_memtable_count, kTrigger + 3);
      // Skipping kTrigger memtable entries in a single iterator operation
      // should mark the memtable for flush (when trigger <= kTrigger).
      //
      // At the end of a write, we check and update memtable to request a flush
      ASSERT_OK(Put(Key(11), "val"));
      // Before a write, we schedule memtables for flush if requested.
      ASSERT_OK(Put(Key(12), "val"));
      ASSERT_OK(db_->WaitForCompact({}));
      if (trigger <= kTrigger) {
        // Check if memtable was flushed due to scan trigger
        ASSERT_EQ(1, NumTableFilesAtLevel(0));
        uint64_t val = 0;
        ASSERT_TRUE(
            db_->GetIntProperty("rocksdb.num-deletes-active-mem-table", &val));
        ASSERT_EQ(0, val);
      } else {
        // No flush: L0 stays empty and every tombstone (kNumKeys plus the
        // delete of Key(kNumKeys + 2)) is still in the active memtable.
        // Mirrors the corresponding check in the Seek() variant of this test.
        ASSERT_EQ(0, NumTableFilesAtLevel(0));
        uint64_t val = 0;
        ASSERT_TRUE(
            db_->GetIntProperty("rocksdb.num-deletes-active-mem-table", &val));
        ASSERT_EQ(kNumKeys + 1, val);
      }
    }
  }
}
TEST_P(DBIteratorTest, AverageMemtableOpsScanFlushTrigger) {
// Tests option memtable_avg_op_scan_flush_trigger with
// long tombstone sequences.
// Both conditions are exercised: a short scan (one Seek) must not flush, and
// a full scan whose total skipped entries exceed kMaxTrigger with an average
// above kAvgTrigger per operation must flush.
Random* r = Random::GetTLSInstance();
const int kAvgTrigger = 10;
const int kMaxTrigger = 500;
Options options;
options.create_if_missing = true;
options.memtable_op_scan_flush_trigger = kMaxTrigger;
options.memtable_avg_op_scan_flush_trigger = kAvgTrigger;
options.level_compaction_dynamic_level_bytes = true;
DestroyAndReopen(options);
const int kNumKeys = 1000;
// Base data that will be covered by a consecutive sequence of tombstones.
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_OK(Put(Key(i), r->RandomString(50)));
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
ASSERT_EQ(1, NumTableFilesAtLevel(6));
for (int i = 0; i < kNumKeys; ++i) {
// We issue slightly more deletions than kAvgTrigger between visible keys
// to ensure avg skipped entries exceed kAvgTrigger.
// Keys with i % (kAvgTrigger + 2) == 0 stay visible.
if (i % (kAvgTrigger + 2) != 0) {
ASSERT_OK(SingleDelete(Key(i)));
}
}
// Each operation, except the first Seek, is expected to see kAvgTrigger + 1
// tombstones (from the active memtable) before it finds the next visible key.
SetPerfLevel(PerfLevel::kEnableCount);
get_perf_context()->Reset();
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->Seek(Key(1));
ASSERT_EQ(get_perf_context()->next_on_memtable_count, kAvgTrigger + 1);
iter.reset();
// Should not flush since total entries skipped is below
// memtable_op_scan_flush_trigger
ASSERT_OK(Put(Key(0), "dummy write"));
ASSERT_OK(Put(Key(0), "dummy write"));
ASSERT_OK(db_->WaitForCompact({}));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
get_perf_context()->Reset();
iter.reset(db_->NewIterator(ReadOptions()));
// num_ops counts the Seek() plus every Next(), including the final one that
// leaves the iterator invalid.
int num_ops = 1;
uint64_t num_skipped = 0;
iter->Seek(Key(0));
ASSERT_EQ(iter->key(), Key(0));
uint64_t last_memtable_next_count =
get_perf_context()->next_on_memtable_count;
iter->Next();
num_ops++;
while (iter->Valid()) {
ASSERT_OK(iter->status());
uint64_t num_skipped_in_op =
get_perf_context()->next_on_memtable_count - last_memtable_next_count;
ASSERT_GE(num_skipped_in_op, kAvgTrigger + 1);
last_memtable_next_count = get_perf_context()->next_on_memtable_count;
num_skipped += num_skipped_in_op;
iter->Next();
num_ops++;
}
// During iterator destruction we mark memtable for flush
iter.reset();
// avg trigger
ASSERT_GE(num_skipped, kAvgTrigger * num_ops);
// memtable_op_scan_flush_trigger
ASSERT_GE(num_skipped, kMaxTrigger);
// Average hidden entries scanned from memtable per operation is more than
// kAvgTrigger and the total skipped is more than
// memtable_op_scan_flush_trigger, the current memtable should be marked for
// flush. The following two writes will trigger the flush.
ASSERT_OK(Put(Key(0), "dummy write"));
// Before a write, we schedule memtables for flush if requested.
ASSERT_OK(Put(Key(0), "dummy write"));
ASSERT_OK(db_->WaitForCompact({}));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
TEST_P(DBIteratorTest, AverageMemtableOpsScanFlushTriggerByOverwrites) {
  // Tests option memtable_avg_op_scan_flush_trigger when the hidden memtable
  // entries come from overwrites and deletes of the same user keys.
  Random* r = Random::GetTLSInstance();
  const int kAvgTrigger = 25;
  Options options;
  options.create_if_missing = true;
  options.memtable_op_scan_flush_trigger = 250;
  options.memtable_avg_op_scan_flush_trigger = kAvgTrigger;
  options.level_compaction_dynamic_level_bytes = true;
  DestroyAndReopen(options);
  const int kNumKeys = 100;
  // Base data in the bottommost level, later shadowed by memtable entries.
  for (int i = 0; i < kNumKeys; ++i) {
    ASSERT_OK(Put(Key(i), r->RandomString(50)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
  ASSERT_EQ(1, NumTableFilesAtLevel(6));
  // One visible key every 10 keys.
  // Each non-visible user key has 3 non-visible entries in the active memtable
  // (two puts and a delete), so 9 * 3 = 27 hidden entries separate
  // consecutive visible keys -- more than kAvgTrigger.
  for (int i = 0; i < kNumKeys; ++i) {
    if (i % 10 != 0) {
      ASSERT_OK(Put(Key(i), r->RandomString(50)));
      ASSERT_OK(Put(Key(i), r->RandomString(50)));
      ASSERT_OK(Delete(Key(i)));
    }
  }
  auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                  db_->DefaultColumnFamily())
                  ->cfd();
  SetPerfLevel(PerfLevel::kEnableCount);
  get_perf_context()->Reset();
  ReadOptions ro;
  std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
  iter->Seek(Key(1));
  ASSERT_GT(get_perf_context()->next_on_memtable_count, kAvgTrigger);
  // Re-seek to trigger check for flush trigger
  iter->Seek(Key(1));
  // Should not flush since total entries skipped is below
  // memtable_op_scan_flush_trigger
  ASSERT_FALSE(cfd->mem()->IsMarkedForFlush());
  ASSERT_OK(Put(Key(0), "dummy write"));
  ASSERT_OK(Put(Key(0), "dummy write"));
  ASSERT_OK(db_->WaitForCompact({}));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  get_perf_context()->Reset();
  // num_ops counts the Seek() plus every Next().
  int num_ops = 1;
  iter->Seek(Key(1));
  while (iter->Valid()) {
    num_ops++;
    iter->Next();
  }
  ASSERT_GT(get_perf_context()->next_on_memtable_count, num_ops * kAvgTrigger);
  // Re-seek should check conditions for marking memtable for flush
  iter->Seek(Key(80));
  // Average hidden entries scanned from memtable per operation is more than
  // kAvgTrigger (asserted above), and with about ten operations the total also
  // exceeds memtable_op_scan_flush_trigger, so the memtable should now be
  // marked for flush. The following two writes will trigger the flush.
  ASSERT_OK(Put(Key(0), "dummy write"));
  // Before a write, we schedule memtables for flush if requested.
  ASSERT_OK(Put(Key(0), "dummy write"));
  ASSERT_OK(db_->WaitForCompact({}));
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
// Fixture for MultiScan tests. Its bool test parameter is used by the tests as
// ReadOptions::fill_cache.
class DBMultiScanIteratorTest : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
DBMultiScanIteratorTest()
: DBTestBase("db_multi_scan_iterator_test", /*env_do_fsync=*/true) {}
};
// Param 0: ReadOptions::fill_cache
// ::testing::Bool() runs every test with fill_cache = false and = true.
INSTANTIATE_TEST_CASE_P(DBMultiScanIteratorTest, DBMultiScanIteratorTest,
::testing::Bool());
TEST_P(DBMultiScanIteratorTest, BasicTest) {
  auto options = CurrentOptions();
  DestroyAndReopen(options);

  // A single file holding k00..k99 mapped to val00..val99.
  for (int i = 0; i < 100; ++i) {
    const std::string suffix = (i < 10 ? "0" : "") + std::to_string(i);
    ASSERT_OK(Put("k" + suffix, "val" + suffix));
  }
  ASSERT_OK(Flush());

  // key_ranges[2n] and key_ranges[2n + 1] are the start and limit of scan n:
  // [k03, k10) and [k25, k50).
  std::vector<std::string> key_ranges({"k03", "k10", "k25", "k50"});
  ReadOptions ro;
  ro.fill_cache = GetParam();
  MultiScanArgs scan_options(BytewiseComparator());
  for (size_t r = 0; r + 1 < key_ranges.size(); r += 2) {
    scan_options.insert(key_ranges[r], key_ranges[r + 1]);
  }

  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  std::unique_ptr<MultiScan> iter =
      dbfull()->NewMultiScan(ro, cfh, scan_options);
  try {
    int scan_no = 0;
    int total = 0;
    for (auto range : *iter) {
      for (auto it : range) {
        const std::string key = it.first.ToString();
        ASSERT_GE(key.compare(key_ranges[2 * scan_no]), 0);
        ASSERT_LT(key.compare(key_ranges[2 * scan_no + 1]), 0);
        ++total;
      }
      ++scan_no;
    }
    // 7 keys from the first scan plus 25 from the second.
    ASSERT_EQ(total, 32);
  } catch (MultiScanException& ex) {
    // The exception must carry the (non-OK) status that caused it.
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  iter.reset();
}
TEST_P(DBMultiScanIteratorTest, MixedBoundsTest) {
  auto options = CurrentOptions();
  DestroyAndReopen(options);
  // Create a single file holding keys k00..k99.
  for (int i = 0; i < 100; ++i) {
    std::stringstream ss;
    ss << std::setw(2) << std::setfill('0') << i;
    ASSERT_OK(Put("k" + ss.str(), "val" + ss.str()));
  }
  ASSERT_OK(Flush());
  std::vector<std::string> key_ranges(
      {"k03", "k10", "k25", "k50", "k75", "k90"});
  ReadOptions ro;
  ro.fill_cache = GetParam();
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();

  // Runs a MultiScan over `args`. Checks that every key returned for scan idx
  // is >= that scan's start and, when the scan has a limit, < its limit.
  // Stores the total number of keys returned in `*count`. Any exception
  // thrown by the MultiScan is reported and aborts the process. Call it under
  // ASSERT_NO_FATAL_FAILURE so a failed assertion inside also ends the test.
  auto scan_and_count = [&](MultiScanArgs& args, int* count) {
    *count = 0;
    std::unique_ptr<MultiScan> iter = dbfull()->NewMultiScan(ro, cfh, args);
    try {
      int idx = 0;
      for (auto range : *iter) {
        for (auto it : range) {
          ASSERT_GE(it.first.ToString().compare(
                        args.GetScanRanges()[idx].range.start->ToString()),
                    0);
          if (args.GetScanRanges()[idx].range.limit) {
            ASSERT_LT(it.first.ToString().compare(
                          args.GetScanRanges()[idx].range.limit->ToString()),
                      0);
          }
          (*count)++;
        }
        idx++;
      }
    } catch (MultiScanException& ex) {
      // Make sure exception contains the status
      ASSERT_NOK(ex.status());
      std::cerr << "Iterator returned status " << ex.what();
      abort();
    } catch (std::logic_error& ex) {
      std::cerr << "Iterator returned logic error " << ex.what();
      abort();
    }
    iter.reset();
  };

  int count = 0;

  // Scans [k03, k10), [k25, <no limit>) and [k75, k90): the expected total
  // of 97 keys is 7 + 75 (k25..k99) + 15.
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2]);
  scan_options.insert(key_ranges[4], key_ranges[5]);
  ASSERT_NO_FATAL_FAILURE(scan_and_count(scan_options, &count));
  ASSERT_EQ(count, 97);

  // Scans [k03, <no limit>), [k25, k50) and [k75, <no limit>): the expected
  // total of 147 keys is 97 (k03..k99) + 25 + 25 (k75..k99).
  scan_options = MultiScanArgs(BytewiseComparator());
  scan_options.insert(key_ranges[0]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  scan_options.insert(key_ranges[4]);
  ASSERT_NO_FATAL_FAILURE(scan_and_count(scan_options, &count));
  ASSERT_EQ(count, 147);
}
// Scans one range whose keys are spread over the two bottommost-level files
// produced by the compaction below, and checks that every key in the range is
// returned exactly once, in order.
TEST_P(DBMultiScanIteratorTest, RangeAcrossFiles) {
  auto opts = CurrentOptions();
  // Target ~100KB per output file; the test expects the ~200KB written below
  // to compact into exactly two files at level 49.
  opts.target_file_size_base = 100 << 10;
  opts.compaction_style = kCompactionStyleUniversal;
  opts.num_levels = 50;
  opts.compression = kNoCompression;
  DestroyAndReopen(opts);

  constexpr int kNumKeys = 100;
  constexpr int kScanBegin = 10;
  constexpr int kScanEnd = 90;

  // kNumKeys keys with 2KB values (~200KB total).
  auto* random = Random::GetTLSInstance();
  for (int k = 0; k < kNumKeys; ++k) {
    ASSERT_OK(Put(Key(k), random->RandomString(2 << 10)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
  ASSERT_EQ(2, NumTableFilesAtLevel(49));

  const std::string scan_begin = Key(kScanBegin);
  const std::string scan_end = Key(kScanEnd);
  ReadOptions read_opts;
  read_opts.fill_cache = GetParam();
  MultiScanArgs scan_args(BytewiseComparator());
  scan_args.insert(scan_begin, scan_end);
  ColumnFamilyHandle* default_cf = dbfull()->DefaultColumnFamily();
  std::unique_ptr<MultiScan> multi_scan =
      dbfull()->NewMultiScan(read_opts, default_cf, scan_args);

  try {
    // Keys must come back as Key(kScanBegin), Key(kScanBegin + 1), ...
    int expected = kScanBegin;
    for (auto range : *multi_scan) {
      for (auto entry : range) {
        ASSERT_EQ(entry.first.ToString(), Key(expected));
        ++expected;
      }
    }
    // The limit is exclusive, so iteration stops right before kScanEnd.
    ASSERT_EQ(expected, kScanEnd);
  } catch (MultiScanException& ex) {
    // The exception is expected to carry the failing status.
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  multi_scan.reset();
}
// Verifies that scans driven by Iterator::Prepare() surface errors through
// status() in two situations:
//  1. Seeking to the second prepared range while max_prefetch_size is small.
//  2. Seeking directly to the second prepared range without first visiting
//     the first one (an "unexpected Seek key").
TEST_P(DBMultiScanIteratorTest, FailureTest) {
  auto options = CurrentOptions();
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  Random rnd(301);
  // Create a file with keys "k00" ~ "k99" and 1KB random values.
  for (int i = 0; i < 100; ++i) {
    std::stringstream ss;
    ss << std::setw(2) << std::setfill('0') << i;
    ASSERT_OK(Put("k" + ss.str(), rnd.RandomString(1024)));
  }
  ASSERT_OK(Flush());
  std::vector<std::string> key_ranges({"k04", "k06", "k12", "k14"});
  ReadOptions ro;
  // ro keeps a pointer to ub, so reassigning ub before each Seek moves the
  // iterator's upper bound to the current range's limit.
  Slice ub;
  ro.iterate_upper_bound = &ub;
  ro.fill_cache = GetParam();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  scan_options.max_prefetch_size = 4500;
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ro, cfh));
  ASSERT_NE(iter, nullptr);
  iter->Prepare(scan_options);
  int count = 0;
  // First range [k04, k06) is expected to succeed: k04 and k05.
  ub = key_ranges[1];
  iter->Seek(key_ranges[0]);
  while (iter->status().ok() && iter->Valid()) {
    ASSERT_GE(iter->key().compare(key_ranges[0]), 0);
    ASSERT_LT(iter->key().compare(key_ranges[1]), 0);
    count++;
    iter->Next();
  }
  ASSERT_OK(iter->status()) << iter->status().ToString();
  ASSERT_EQ(count, 2);
  // Second seek should hit the max_prefetch_size limit
  ub = key_ranges[3];
  iter->Seek(key_ranges[2]);
  ASSERT_NOK(iter->status());
  iter.reset();
  // Test the case of unexpected Seek key: a fresh iterator is prepared with
  // both ranges, but the first Seek targets the second range (k12).
  iter.reset(dbfull()->NewIterator(ro, cfh));
  ASSERT_NE(iter, nullptr);
  // NOTE(review): presumably 0 disables the prefetch limit, so that the error
  // below comes from the unexpected seek key rather than the limit; confirm.
  scan_options.max_prefetch_size = 0;
  iter->Prepare(scan_options);
  ub = key_ranges[3];
  iter->Seek(key_ranges[2]);
  ASSERT_NOK(iter->status());
  iter.reset();
}
TEST_P(DBMultiScanIteratorTest, OutOfL0FileRange) {
  // Test that prepare does not fail scan when a scan range
  // is outside of a L0 file's key range.
  auto options = CurrentOptions();
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  Random rnd(301);
  // Create a Lmax file
  // k00 ~ k99
  for (int i = 0; i < 100; ++i) {
    std::stringstream ss;
    ss << std::setw(2) << std::setfill('0') << i;
    ASSERT_OK(Put("k" + ss.str(), rnd.RandomString(1024)));
  }
  ASSERT_OK(Flush());
  // Compact with a forced bottommost compaction so this data becomes the
  // "Lmax file" described above.
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  // Create a L0 file
  // k00 ~ k09
  for (int i = 0; i < 10; ++i) {
    std::stringstream ss;
    ss << std::setw(2) << std::setfill('0') << i;
    ASSERT_OK(Put("k" + ss.str(), rnd.RandomString(1024)));
  }
  ASSERT_OK(Flush());
  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
  // The second range is outside of L0 file's key range
  std::vector<std::string> key_ranges({"k04", "k06", "k12", "k14"});
  ReadOptions ro;
  // ro keeps a pointer to ub, so reassigning ub before each Seek moves the
  // iterator's upper bound to the current range's limit.
  Slice ub;
  ro.iterate_upper_bound = &ub;
  ro.fill_cache = GetParam();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ro, cfh));
  ASSERT_NE(iter, nullptr);
  iter->Prepare(scan_options);
  int count = 0;
  ub = key_ranges[1];
  iter->Seek(key_ranges[0]);
  while (iter->status().ok() && iter->Valid()) {
    ASSERT_GE(iter->key().compare(key_ranges[0]), 0);
    ASSERT_LT(iter->key().compare(key_ranges[1]), 0);
    count++;
    iter->Next();
  }
  ASSERT_OK(iter->status()) << iter->status().ToString();
  // [k04, k06): k04 and k05, which exist in both the L0 and Lmax files.
  ASSERT_EQ(count, 2);
  ub = key_ranges[3];
  count = 0;
  iter->Seek(key_ranges[2]);
  while (iter->status().ok() && iter->Valid()) {
    ASSERT_GE(iter->key().compare(key_ranges[2]), 0);
    ASSERT_LT(iter->key().compare(key_ranges[3]), 0);
    count++;
    iter->Next();
  }
  ASSERT_OK(iter->status()) << iter->status().ToString();
  // [k12, k14): k12 and k13, which exist only in the Lmax file.
  ASSERT_EQ(count, 2);
}
// Verifies MultiScan when scan-range bounds fall in the gap between two
// adjacent bottommost-level files. Each pass only checks that the scan
// completes without throwing and that no key is below the first range start.
TEST_P(DBMultiScanIteratorTest, RangeBetweenFiles) {
  auto options = CurrentOptions();
  options.target_file_size_base = 100 << 10;  // 100KB
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 50;
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  auto rnd = Random::GetTLSInstance();
  // Write ~200KB data
  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(Put(Key(i), rnd->RandomString(2 << 10)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
  ASSERT_EQ(2, NumTableFilesAtLevel(49));
  // Test with a scan range that overlaps an entire file, with upper bound
  // between 2 files
  std::vector<LiveFileMetaData> file_meta;
  dbfull()->GetLiveFilesMetaData(&file_meta);
  ASSERT_EQ(file_meta.size(), 2);
  // NOTE(review): the bounds below assume file_meta[0] holds the smaller keys
  // and file_meta[1] the larger ones; confirm GetLiveFilesMetaData ordering.
  // Appending a character to a key produces a key that sorts after it, which
  // is used to place bounds just past a file's largest/smallest key.
  std::vector<std::string> key_ranges(4);
  key_ranges[0] = file_meta[0].smallestkey;
  key_ranges[1] = file_meta[0].largestkey + "0";
  key_ranges[2] = file_meta[1].smallestkey + "0";
  key_ranges[3] = file_meta[1].largestkey;
  ReadOptions ro;
  ro.fill_cache = GetParam();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  std::unique_ptr<MultiScan> iter =
      dbfull()->NewMultiScan(ro, cfh, scan_options);
  try {
    for (auto range : *iter) {
      for (auto it : range) {
        ASSERT_GE(it.first.ToString(), key_ranges[0]);
      }
    }
  } catch (MultiScanException& ex) {
    // Make sure exception contains the status
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  iter.reset();
  // Test multiscan with a range entirely between adjacent files
  key_ranges[0] = file_meta[0].largestkey + "0";
  key_ranges[1] = file_meta[0].largestkey + "1";
  key_ranges[2] = file_meta[1].smallestkey + "0";
  key_ranges[3] = file_meta[1].largestkey;
  // Reuse the same MultiScanArgs after dropping the previous ranges.
  (*scan_options).clear();
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  iter = dbfull()->NewMultiScan(ro, cfh, scan_options);
  try {
    for (auto range : *iter) {
      for (auto it : range) {
        ASSERT_GE(it.first.ToString(), key_ranges[0]);
      }
    }
  } catch (MultiScanException& ex) {
    // Make sure exception contains the status
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  iter.reset();
}
// This test case tests multiscan in the presence of fragmented range
// tombstones in the LSM.
TEST_P(DBMultiScanIteratorTest, FragmentedRangeTombstones) {
  auto options = CurrentOptions();
  // Compaction may create files 2x the target_file_size_base,
  // so set this to 50KB so we at least end up with 2 files of
  // 100KB
  options.target_file_size_base = 50 << 10;  // 50KB
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 50;
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  // Setup the LSM as follows -
  // 1. Ingest a file with 100 keys
  // 2. Ingest a file with one overlapping key
  // 3. Do a Put and flush a file to L0 with one overlapping key
  // 4. Ingest a standalone delete range file that covers the full key space
  //    and a file with the same 100 keys with new values. This will ingest
  //    into L0 due to the presence of an existing file in L0
  // The final LSM will have an SST in Lmax with 100 keys, and 2 SST files
  // in Lmax-1 with half the keys each and completely overlapping delete ranges
  // kvs tracks the latest value written for each key through the ingestion
  // helper; scanned values are checked against it.
  std::unordered_map<std::string, std::string> kvs;
  auto rnd = Random::GetTLSInstance();
  // Writes keys Key(start_key) .. Key(end_key - 1) with fresh 2KB random
  // values to an SST file at `filename`, recording each value in kvs.
  auto create_ingestion_data_file_and_update_key_value =
      [&](const std::string& filename, int start_key, int end_key) {
        std::unique_ptr<SstFileWriter> writer;
        writer.reset(new SstFileWriter(EnvOptions(), options));
        ASSERT_OK(writer->Open(filename));
        for (int i = start_key; i < end_key; ++i) {
          auto kiter = kvs.find(Key(i));
          if (kiter != kvs.end()) {
            kvs.erase(kiter);
          }
          auto res =
              kvs.emplace(std::make_pair(Key(i), rnd->RandomString(2 << 10)));
          ASSERT_OK(writer->Put(res.first->first, res.first->second));
        }
        ASSERT_OK(writer->Finish());
        writer.reset();
      };
  CreateColumnFamilies({"new_cf"}, options);
  std::string ingest_file = dbname_ + "test.sst";
  // Write ~200KB data
  create_ingestion_data_file_and_update_key_value(ingest_file + "_0", 0, 100);
  create_ingestion_data_file_and_update_key_value(ingest_file + "_1", 50, 51);
  ColumnFamilyHandle* cfh = handles_[0];
  IngestExternalFileOptions ifo;
  Status s = dbfull()->IngestExternalFile(
      cfh, {ingest_file + "_0", ingest_file + "_1"}, ifo);
  ASSERT_OK(s);
  // NOTE(review): cfh is handles_[0] (the "new_cf" family) and Put(0, ...)
  // writes there; confirm that the argument-less Flush() below flushes that
  // family rather than the default one.
  ASSERT_OK(Put(0, Key(50), rnd->RandomString(2 << 10)));
  ASSERT_OK(Flush());
  {
    // Standalone file holding only a range deletion over ["a", "z").
    std::unique_ptr<SstFileWriter> writer;
    writer.reset(new SstFileWriter(EnvOptions(), options));
    ASSERT_OK(writer->Open(ingest_file + "_2"));
    ASSERT_OK(writer->DeleteRange("a", "z"));
    ASSERT_OK(writer->Finish());
    writer.reset();
  }
  create_ingestion_data_file_and_update_key_value(ingest_file + "_3", 0, 100);
  s = dbfull()->IngestExternalFile(
      cfh, {ingest_file + "_2", ingest_file + "_3"}, ifo);
  ASSERT_OK(s);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(cfh, &cf_meta);
  // Only the L0 with range deletion is compacted.
  ASSERT_EQ(1, cf_meta.levels[0].files.size());
  ASSERT_EQ(0, cf_meta.levels[0].files[0].num_deletions);
  // The first scan range overlaps the DB key range, while the second extends
  // beyond but overlaps the delete range
  std::vector<std::string> key_ranges({"key000085", "key000090", "l", "n"});
  ReadOptions ro;
  ro.fill_cache = GetParam();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  std::unique_ptr<MultiScan> iter =
      dbfull()->NewMultiScan(ro, cfh, scan_options);
  try {
    int i = 0;
    int count = 0;
    for (auto range : *iter) {
      for (auto it : range) {
        ASSERT_GE(it.first.ToString(), key_ranges[i]);
        ASSERT_LT(it.first.ToString(), key_ranges[i + 1]);
        // Values must match the latest ingested value for the key.
        auto kiter = kvs.find(it.first.ToString());
        ASSERT_NE(kiter, kvs.end());
        ASSERT_EQ(kiter->second, it.second.ToString());
        count++;
      }
      i += 2;
    }
    // Both ranges are visited; only [key000085, key000090) yields keys (5).
    ASSERT_EQ(i, 4);
    ASSERT_EQ(count, 5);
  } catch (MultiScanException& ex) {
    // Fails the test with the exception's status.
    ASSERT_OK(ex.status());
  }
  iter.reset();
  // The second scan range start overlaps the delete range in the first file
  // in Lmax-1, while the end overlaps the keys in the second file
  (*scan_options).clear();
  key_ranges[0] = "key000010";
  key_ranges[1] = "key000020";
  key_ranges[2] = "key0000500";
  key_ranges[3] = "key000060";
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  iter = dbfull()->NewMultiScan(ro, cfh, scan_options);
  try {
    int i = 0;
    int count = 0;
    for (auto range : *iter) {
      for (auto it : range) {
        ASSERT_GE(it.first.ToString(), key_ranges[i]);
        ASSERT_LT(it.first.ToString(), key_ranges[i + 1]);
        auto kiter = kvs.find(it.first.ToString());
        ASSERT_NE(kiter, kvs.end());
        ASSERT_EQ(kiter->second, it.second.ToString());
        count++;
      }
      i += 2;
    }
    // 10 (key000010..key000019) + 9 (key000051..key000059; "key000050" sorts
    // before "key0000500" and is therefore excluded).
    ASSERT_EQ(i, 4);
    ASSERT_EQ(count, 19);
  } catch (MultiScanException& ex) {
    // Fails the test with the exception's status.
    ASSERT_OK(ex.status());
  }
  iter.reset();
}
TEST_P(DBMultiScanIteratorTest, ReseekAcrossBlocksSameUserKey) {
  // This test exposes a bug where multiscan reseeks backwards when
  // max_sequential_skip_in_iterations is triggered with the same user key
  // spanning multiple data blocks.
  auto options = CurrentOptions();
  // Low threshold so that skipping the older versions of key_a (written
  // below) exceeds it.
  options.max_sequential_skip_in_iterations = 3;
  options.compression = kNoCompression;
  // Force each internal key into its own block
  BlockBasedTableOptions table_options;
  table_options.flush_block_policy_factory =
      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  DestroyAndReopen(options);
  // Taking a snapshot after each Put to preserve all versions during flush.
  std::vector<const Snapshot*> snapshots;
  for (int i = 0; i < 7; ++i) {
    ASSERT_OK(Put("key_a", "value_" + std::to_string(i)));
    snapshots.push_back(db_->GetSnapshot());
  }
  ASSERT_OK(Put("key_b", "value_b"));
  ASSERT_OK(Flush());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  // Setup multiscan range covering both keys
  std::vector<std::string> key_ranges({"key_a", "key_c"});
  ReadOptions ro;
  Slice ub = key_ranges[1];
  ro.iterate_upper_bound = &ub;
  ro.fill_cache = GetParam();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.insert(key_ranges[0], key_ranges[1]);
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ro, cfh));
  ASSERT_NE(iter, nullptr);
  iter->Prepare(scan_options);
  std::vector<std::string> seen_keys;
  std::vector<std::string> seen_values;
  iter->Seek(key_ranges[0]);
  while (iter->status().ok() && iter->Valid()) {
    seen_keys.push_back(iter->key().ToString());
    seen_values.push_back(iter->value().ToString());
    iter->Next();
  }
  ASSERT_OK(iter->status()) << iter->status().ToString();
  // Each user key must appear exactly once, with its newest value
  // (value_6 was the last Put to key_a).
  ASSERT_EQ(seen_keys.size(), 2) << "Should see key_a and key_b";
  ASSERT_EQ(seen_keys[0], "key_a");
  ASSERT_EQ(seen_keys[1], "key_b");
  ASSERT_EQ(seen_values[0], "value_6");
  ASSERT_EQ(seen_values[1], "value_b");
  for (auto* snapshot : snapshots) {
    db_->ReleaseSnapshot(snapshot);
  }
}
TEST_P(DBMultiScanIteratorTest, AsyncPrefetchAcrossMultipleFiles) {
  // Test async prefetch with multiple scan ranges spread across several
  // bottommost-level files.
  auto options = CurrentOptions();
  options.target_file_size_base = 1 << 15;  // 32KiB
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 50;
  options.compression = kNoCompression;
  options.statistics = CreateDBStatistics();
  DestroyAndReopen(options);
  Random rnd(303);
  // Write 1000 keys ("k00000" ~ "k00999") with 1KiB values (~1MiB of data).
  // With a 32KiB target file size, the compaction below splits this into
  // many files.
  for (int i = 0; i < 1000; ++i) {
    std::stringstream ss;
    ss << "k" << std::setw(5) << std::setfill('0') << i;
    // 1KiB values
    ASSERT_OK(Put(ss.str(), rnd.RandomString(1 << 10)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(49), 3);
  // Three non-overlapping ranges of 100 keys each. With roughly 32 keys per
  // file, each range is expected to span several files.
  std::vector<std::string> key_ranges(
      {"k00000", "k00100", "k00500", "k00600", "k00800", "k00900"});
  ReadOptions ro;
  ro.fill_cache = GetParam();
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.use_async_io = true;
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  scan_options.insert(key_ranges[4], key_ranges[5]);
  auto read_count_before =
      options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
  std::unique_ptr<MultiScan> iter =
      dbfull()->NewMultiScan(ro, cfh, scan_options);
  ASSERT_NE(iter, nullptr);
  auto read_count_after =
      options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
  // NOTE(review): all data lives in the last level (L49 with num_levels = 50),
  // so this ticker may stay unchanged regardless of prefetch behavior; confirm
  // whether LAST_LEVEL_READ_COUNT was intended.
  ASSERT_EQ(read_count_after, read_count_before);
  // Verify every key of all three ranges is returned, each within the bounds
  // of the range currently being iterated.
  int total_keys = 0;
  size_t range_idx = 0;
  try {
    for (auto range : *iter) {
      for (auto it : range) {
        const std::string key = it.first.ToString();
        ASSERT_GE(key, key_ranges[range_idx]);
        ASSERT_LT(key, key_ranges[range_idx + 1]);
        total_keys++;
      }
      range_idx += 2;
    }
  } catch (MultiScanException& ex) {
    // Make sure exception contains the status
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  // All three ranges visited, 100 keys each (limits are exclusive).
  ASSERT_EQ(range_idx, key_ranges.size());
  ASSERT_EQ(total_keys, 300);
  iter.reset();
}
TEST_P(DBMultiScanIteratorTest, AsyncPrefetchMultipleLevels) {
  // Test async prefetch with files in L0 and non-L0 levels
  // Similar setup to AsyncPrefetchAcrossMultipleFiles but with L0 files
  auto options = CurrentOptions();
  options.target_file_size_base = 1 << 15;  // 32KiB
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 50;
  options.compression = kNoCompression;
  options.statistics = CreateDBStatistics();
  DestroyAndReopen(options);
  Random rnd(304);
  // Create base files and compact to bottom level - ~500KiB of data
  // (keys "k00000" ~ "k00499").
  for (int i = 0; i < 500; ++i) {
    std::stringstream ss;
    ss << "k" << std::setw(5) << std::setfill('0') << i;
    ASSERT_OK(Put(ss.str(), rnd.RandomString(1 << 10)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
  // Verify we have files at bottom level
  ASSERT_GT(NumTableFilesAtLevel(49), 0);
  // Create an additional L0 file overwriting "k00100" ~ "k00149", which
  // overlaps the bottom-level key range.
  for (int i = 100; i < 150; ++i) {
    std::stringstream ss;
    ss << "k" << std::setw(5) << std::setfill('0') << i;
    ASSERT_OK(Put(ss.str(), rnd.RandomString(1 << 10)));
  }
  ASSERT_OK(Flush());
  // Verify we now have files in both L0 and bottom level
  ASSERT_GT(NumTableFilesAtLevel(0), 0);
  ASSERT_GT(NumTableFilesAtLevel(49), 0);
  // Set up multiple non-overlapping ranges of 100 keys each.
  // NOTE(review): none of these ranges contains the overwritten keys
  // k00100..k00149; if the intent is to read L0 data through the scan,
  // consider a range covering them.
  std::vector<std::string> key_ranges(
      {"k00000", "k00100", "k00200", "k00300", "k00400", "k00500"});
  ReadOptions ro;
  ro.fill_cache = GetParam();
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.use_async_io = true;
  scan_options.insert(key_ranges[0], key_ranges[1]);
  scan_options.insert(key_ranges[2], key_ranges[3]);
  scan_options.insert(key_ranges[4], key_ranges[5]);
  std::unique_ptr<MultiScan> iter =
      dbfull()->NewMultiScan(ro, cfh, scan_options);
  ASSERT_NE(iter, nullptr);
  // Verify all three ranges are scanned completely, each key within the
  // bounds of the range currently being iterated.
  int total_keys = 0;
  size_t range_idx = 0;
  try {
    for (auto range : *iter) {
      for (auto it : range) {
        const std::string key = it.first.ToString();
        ASSERT_GE(key, key_ranges[range_idx]);
        ASSERT_LT(key, key_ranges[range_idx + 1]);
        total_keys++;
      }
      range_idx += 2;
    }
  } catch (MultiScanException& ex) {
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  // All three ranges visited, 100 keys each (limits are exclusive).
  ASSERT_EQ(range_idx, key_ranges.size());
  ASSERT_EQ(total_keys, 300);
  iter.reset();
}
// Verifies an async-prefetching MultiScan over data partly covered by a range
// tombstone: no key from the deleted interval may be returned, and every
// other key in the scan range must be.
TEST_P(DBMultiScanIteratorTest, AsyncPrefetchWithDeleteRange) {
  auto opts = CurrentOptions();
  opts.target_file_size_base = 1 << 15;  // 32KiB
  opts.compaction_style = kCompactionStyleUniversal;
  opts.num_levels = 50;
  opts.compression = kNoCompression;
  DestroyAndReopen(opts);
  ColumnFamilyHandle* default_cf = dbfull()->DefaultColumnFamily();

  // Zero-padded user key, e.g. 7 -> "k00007".
  auto make_key = [](int n) {
    std::stringstream ss;
    ss << "k" << std::setw(5) << std::setfill('0') << n;
    return ss.str();
  };

  // 500 keys with 1KiB values (~500KiB).
  Random value_gen(305);
  for (int n = 0; n < 500; ++n) {
    ASSERT_OK(Put(make_key(n), value_gen.RandomString(1 << 10)));
  }
  ASSERT_OK(Flush());

  // Tombstone over [k00100, k00200), i.e. 100 of the 500 keys.
  const std::string deleted_begin = "k00100";
  const std::string deleted_end = "k00200";
  ASSERT_OK(db_->DeleteRange(WriteOptions(), default_cf, deleted_begin,
                             deleted_end));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(49), 0);

  // A single range spanning every written key, including the deleted ones.
  const std::string scan_begin = "k00000";
  const std::string scan_end = "k00500";
  ReadOptions read_opts;
  read_opts.fill_cache = GetParam();
  MultiScanArgs scan_args(BytewiseComparator());
  scan_args.use_async_io = true;
  scan_args.insert(scan_begin, scan_end);
  std::unique_ptr<MultiScan> multi_scan =
      dbfull()->NewMultiScan(read_opts, default_cf, scan_args);
  ASSERT_NE(multi_scan, nullptr);

  int seen = 0;
  try {
    for (auto range : *multi_scan) {
      for (auto entry : range) {
        const std::string user_key = entry.first.ToString();
        // Keys inside the tombstone must be hidden.
        ASSERT_TRUE(user_key < deleted_begin || user_key >= deleted_end);
        ++seen;
      }
    }
  } catch (MultiScanException& ex) {
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  // 500 written keys minus the 100 covered by the tombstone.
  ASSERT_EQ(seen, 400);
  multi_scan.reset();
}
TEST_P(DBMultiScanIteratorTest, AsyncPrefetchWithExternalFileIngestion) {
  // Test async prefetch with externally ingested files
  auto options = CurrentOptions();
  options.target_file_size_base = 1 << 15;  // 32KiB
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 50;
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  Random rnd(306);
  // Create base data - ~200KiB (keys "k00000" ~ "k00199")
  for (int i = 0; i < 200; ++i) {
    std::stringstream ss;
    ss << "k" << std::setw(5) << std::setfill('0') << i;
    ASSERT_OK(Put(ss.str(), rnd.RandomString(1 << 10)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
  // Create and ingest external SST file with new data
  // (keys "k00300" ~ "k00499", disjoint from the base data).
  std::string ingest_file = dbname_ + "/test_ingest.sst";
  {
    std::unique_ptr<SstFileWriter> writer;
    writer.reset(new SstFileWriter(EnvOptions(), options));
    ASSERT_OK(writer->Open(ingest_file));
    for (int i = 300; i < 500; ++i) {
      std::stringstream ss;
      ss << "k" << std::setw(5) << std::setfill('0') << i;
      ASSERT_OK(writer->Put(ss.str(), rnd.RandomString(1 << 10)));
    }
    ASSERT_OK(writer->Finish());
  }
  IngestExternalFileOptions ifo;
  ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
  ASSERT_OK(dbfull()->IngestExternalFile(cfh, {ingest_file}, ifo));
  // Set up a single scan range that spans both regular and ingested files
  std::vector<std::string> key_ranges({"k00000", "k00500"});
  ReadOptions ro;
  ro.fill_cache = GetParam();
  MultiScanArgs scan_options(BytewiseComparator());
  scan_options.use_async_io = true;
  scan_options.insert(key_ranges[0], key_ranges[1]);
  std::unique_ptr<MultiScan> iter =
      dbfull()->NewMultiScan(ro, cfh, scan_options);
  ASSERT_NE(iter, nullptr);
  // Verify the range can be scanned successfully
  int total_keys = 0;
  try {
    for (auto range : *iter) {
      for (auto it : range) {
        it.first.ToString();
        total_keys++;
      }
    }
  } catch (MultiScanException& ex) {
    ASSERT_NOK(ex.status());
    std::cerr << "Iterator returned status " << ex.what();
    abort();
  } catch (std::logic_error& ex) {
    std::cerr << "Iterator returned logic error " << ex.what();
    abort();
  }
  // 200 base keys + 200 ingested keys.
  ASSERT_EQ(total_keys, 400);
  iter.reset();
}
} // namespace ROCKSDB_NAMESPACE
// Test binary entry point: installs the port-layer stack trace handler,
// initializes GoogleTest with the command-line flags, and returns the
// aggregate test result.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int test_result = RUN_ALL_TESTS();
  return test_result;
}