Summary: Add `SstFileReader::Get` (single-key) and `SstFileReader::MultiGet` (PinnableSlice) overloads to enable zero-copy point lookups directly from SST files. The existing `MultiGet(std::string*)` is refactored to delegate to the new `MultiGet(PinnableSlice*)`, which writes results directly into caller-provided `PinnableSlice` values instead of copying through an intermediate buffer. The single-key `Get` uses `TableReader::Get` with a `GetContext` for efficient single-key lookups without the overhead of MultiGet's sorting and batching machinery. Pull Request resolved: https://github.com/facebook/rocksdb/pull/14500 Test Plan: - New unit tests Reviewed By: xingbowang Differential Revision: D97825648 Pulled By: joshkang97 fbshipit-source-id: 17f3edd59bbf4747d17309c44ef12f0d952ea4eb
320 lines
11 KiB
C++
320 lines
11 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "rocksdb/sst_file_reader.h"
|
|
|
|
#include "db/arena_wrapped_db_iter.h"
|
|
#include "db/db_iter.h"
|
|
#include "db/dbformat.h"
|
|
#include "file/random_access_file_reader.h"
|
|
#include "options/cf_options.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/file_checksum.h"
|
|
#include "rocksdb/file_system.h"
|
|
#include "table/get_context.h"
|
|
#include "table/table_builder.h"
|
|
#include "table/table_iterator.h"
|
|
#include "table/table_reader.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// Private state behind SstFileReader: the caller's options, the option views
// derived from them, and the table reader that Open() populates.
struct SstFileReader::Rep {
  Options options;
  EnvOptions soptions;
  ImmutableOptions ioptions;
  MutableCFOptions moptions;
  // Keep a member variable for this, since `NewIterator()` uses a const
  // reference of `ReadOptions`.
  ReadOptions roptions_for_table_iter;

  // Filled in by Open() through the configured table factory.
  std::unique_ptr<TableReader> table_reader;

  // `explicit` so an Options object is never implicitly converted to a Rep.
  // The derived members are built from the stored `options` copy rather than
  // from `opts`, so `options` must stay the first declared member.
  explicit Rep(const Options& opts)
      : options(opts),
        soptions(options),
        ioptions(options),
        moptions(ColumnFamilyOptions(options)),
        roptions_for_table_iter(/*_verify_checksums=*/true,
                                /*_fill_cache=*/false) {}
};
|
|
|
|
// Copies `options` into a freshly allocated Rep. The table reader itself is
// only created later, by Open().
SstFileReader::SstFileReader(const Options& options)
    : rep_(std::make_unique<Rep>(options)) {}
|
|
|
|
// Defaulted here, after Rep's definition above.
// NOTE(review): presumably this must live in this translation unit because
// rep_ is a smart pointer to a Rep that is only forward-declared in the
// header -- confirm against rocksdb/sst_file_reader.h.
SstFileReader::~SstFileReader() = default;
|
|
|
|
// Opens the SST file at `file_path` and builds the table reader used by every
// other operation on this object.
//
// Returns the first non-OK status produced while querying the file size,
// opening the file for random access, or constructing the table reader;
// otherwise returns the table factory's status.
Status SstFileReader::Open(const std::string& file_path) {
  Rep* const rep = rep_.get();

  FileOptions file_opts(rep->soptions);
  file_opts.file_checksum_func_name = kNoFileChecksumFuncName;
  const auto& fs = rep->options.env->GetFileSystem();

  uint64_t file_size = 0;
  Status s =
      fs->GetFileSize(file_path, file_opts.io_options, &file_size, nullptr);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<FSRandomAccessFile> file;
  s = fs->NewRandomAccessFile(file_path, file_opts, &file, nullptr);
  if (!s.ok()) {
    return s;
  }

  auto file_reader =
      std::make_unique<RandomAccessFileReader>(std::move(file), file_path);

  TableReaderOptions t_opt(
      rep->ioptions, rep->moptions.prefix_extractor,
      rep->moptions.compression_manager.get(), rep->soptions,
      rep->ioptions.internal_comparator,
      rep->moptions.block_protection_bytes_per_key,
      /*skip_filters*/ false, /*immortal*/ false,
      /*force_direct_prefetch*/ false, /*level*/ -1,
      /*block_cache_tracer*/ nullptr,
      /*max_file_size_for_l0_meta_pin*/ 0, /*cur_db_session_id*/ "",
      /*cur_file_num*/ 0,
      /* unique_id */ {}, /* largest_seqno */ 0,
      /* tail_size */ 0, rep->ioptions.persist_user_defined_timestamps);
  // Allow open file with global sequence number for backward compatibility.
  t_opt.largest_seqno = kMaxSequenceNumber;

  return rep->options.table_factory->NewTableReader(
      t_opt, std::move(file_reader), file_size, &rep->table_reader);
}
|
|
|
|
std::vector<Status> SstFileReader::MultiGet(
|
|
const ReadOptions& roptions, const std::vector<Slice>& keys,
|
|
std::vector<PinnableSlice>* values) {
|
|
const auto num_keys = keys.size();
|
|
std::vector<Status> statuses(num_keys, Status::OK());
|
|
values->resize(num_keys);
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
(*values)[i].Reset();
|
|
}
|
|
|
|
auto r = rep_.get();
|
|
const Comparator* user_comparator =
|
|
r->ioptions.internal_comparator.user_comparator();
|
|
Statistics* statistics = r->ioptions.stats;
|
|
|
|
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
|
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
|
autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_ctx;
|
|
autovector<MergeContext, MultiGetContext::MAX_BATCH_SIZE> merge_ctx;
|
|
sorted_keys.resize(num_keys);
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
PinnableSlice* val = &(*values)[i];
|
|
merge_ctx.emplace_back();
|
|
key_context.emplace_back(nullptr, keys[i], val, nullptr,
|
|
nullptr /* timestamp */, &statuses[i]);
|
|
get_ctx.emplace_back(
|
|
user_comparator, r->ioptions.merge_operator.get(), nullptr /* logger */,
|
|
statistics, GetContext::kNotFound, *key_context[i].key, val,
|
|
nullptr /* columns */, nullptr /* timestamp */,
|
|
nullptr /* value_found */, &merge_ctx[i], true,
|
|
&key_context[i].max_covering_tombstone_seq, r->ioptions.clock);
|
|
key_context[i].get_context = &get_ctx[i];
|
|
}
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
sorted_keys[i] = &key_context[i];
|
|
}
|
|
|
|
struct CompareKeyContext {
|
|
explicit CompareKeyContext(const Comparator* comp) : comparator(comp) {}
|
|
inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) const {
|
|
return comparator->CompareWithoutTimestamp(*(lhs->key), false,
|
|
*(rhs->key), false) < 0;
|
|
}
|
|
const Comparator* comparator;
|
|
};
|
|
|
|
std::sort(sorted_keys.begin(), sorted_keys.end(),
|
|
CompareKeyContext(user_comparator));
|
|
const auto sequence = roptions.snapshot != nullptr
|
|
? roptions.snapshot->GetSequenceNumber()
|
|
: kMaxSequenceNumber;
|
|
MultiGetContext ctx(&sorted_keys, 0, num_keys, sequence, roptions,
|
|
r->ioptions.fs.get(), nullptr);
|
|
MultiGetRange range = ctx.GetMultiGetRange();
|
|
r->table_reader->MultiGet(roptions, &range,
|
|
r->moptions.prefix_extractor.get(),
|
|
false /* skip filters */);
|
|
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
get_ctx[i].ReportCounters();
|
|
|
|
if (statuses[i].ok()) {
|
|
switch (get_ctx[i].State()) {
|
|
case GetContext::kFound:
|
|
break;
|
|
case GetContext::kNotFound:
|
|
case GetContext::kDeleted:
|
|
statuses[i] = Status::NotFound();
|
|
break;
|
|
case GetContext::kMerge:
|
|
statuses[i] = Status::MergeInProgress();
|
|
break;
|
|
case GetContext::kCorrupt:
|
|
case GetContext::kUnexpectedBlobIndex:
|
|
case GetContext::kMergeOperatorFailed:
|
|
statuses[i] = Status::Corruption();
|
|
break;
|
|
};
|
|
}
|
|
}
|
|
return statuses;
|
|
}
|
|
|
|
std::vector<Status> SstFileReader::MultiGet(const ReadOptions& roptions,
|
|
const std::vector<Slice>& keys,
|
|
std::vector<std::string>* values) {
|
|
std::vector<PinnableSlice> pin_values;
|
|
std::vector<Status> statuses = MultiGet(roptions, keys, &pin_values);
|
|
values->resize(keys.size());
|
|
for (size_t i = 0; i < keys.size(); ++i) {
|
|
if (statuses[i].ok()) {
|
|
(*values)[i].assign(pin_values[i].data(), pin_values[i].size());
|
|
}
|
|
}
|
|
return statuses;
|
|
}
|
|
|
|
// Looks up a single user key in the opened SST file and, on success, exposes
// the value through `value` (which is Reset() first).
//
// The lookup is performed at the sequence number of `roptions.snapshot` when
// one is set, and at kMaxSequenceNumber otherwise -- the same rule MultiGet()
// and NewIterator() apply -- so a single-key Get and a one-element MultiGet
// observe the same version of the key.
//
// Returns the table reader's status if it is non-OK. Otherwise: OK for the
// kFound state, NotFound for kNotFound/kDeleted, MergeInProgress for kMerge,
// and Corruption for kCorrupt, kUnexpectedBlobIndex and kMergeOperatorFailed.
//
// The table reader is dereferenced unconditionally, so Open() must have
// populated it first.
Status SstFileReader::Get(const ReadOptions& roptions, const Slice& key,
                          PinnableSlice* value) {
  auto r = rep_.get();
  value->Reset();

  const Comparator* user_comparator =
      r->ioptions.internal_comparator.user_comparator();
  Statistics* statistics = r->ioptions.stats;

  const SequenceNumber sequence = roptions.snapshot != nullptr
                                      ? roptions.snapshot->GetSequenceNumber()
                                      : kMaxSequenceNumber;

  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;
  GetContext get_ctx(user_comparator, r->ioptions.merge_operator.get(),
                     nullptr /* logger */, statistics, GetContext::kNotFound,
                     key, value, nullptr /* columns */, nullptr /* timestamp */,
                     nullptr /* value_found */, &merge_context, true,
                     &max_covering_tombstone_seq, r->ioptions.clock);

  // Internal keys sort by descending (sequence, type), so the lookup key must
  // carry kValueTypeForSeek to sort ahead of every entry written at
  // `sequence`, whatever its value type. With kTypeValue, entries at exactly
  // `sequence` with a higher-numbered type would be skipped.
  const InternalKey lookup_key(key, sequence, kValueTypeForSeek);
  Status status = r->table_reader->Get(
      roptions, lookup_key.Encode(), &get_ctx,
      r->moptions.prefix_extractor.get(), false /* skip_filters */);

  get_ctx.ReportCounters();

  if (status.ok()) {
    switch (get_ctx.State()) {
      case GetContext::kFound:
        break;
      case GetContext::kNotFound:
      case GetContext::kDeleted:
        status = Status::NotFound();
        break;
      case GetContext::kMerge:
        status = Status::MergeInProgress();
        break;
      case GetContext::kCorrupt:
      case GetContext::kUnexpectedBlobIndex:
      case GetContext::kMergeOperatorFailed:
        status = Status::Corruption();
        break;
    }
  }
  return status;
}
|
|
|
|
// std::string flavor of Get: delegates to the PinnableSlice overload and
// copies the value into `*value` only when the lookup succeeded. On any
// non-OK status `*value` is left unmodified.
Status SstFileReader::Get(const ReadOptions& roptions, const Slice& key,
                          std::string* value) {
  PinnableSlice pinned;
  Status result = Get(roptions, key, &pinned);
  if (!result.ok()) {
    return result;
  }
  value->assign(pinned.data(), pinned.size());
  return result;
}
|
|
|
|
// Creates an iterator over the opened SST file. Entries are read at the
// sequence number of `roptions.snapshot` when one is set, and at
// kMaxSequenceNumber otherwise.
//
// The returned iterator is allocated with `new`; this function keeps no
// reference to it. `roptions.io_activity` must be kUnknown (asserted).
Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) {
  assert(roptions.io_activity == Env::IOActivity::kUnknown);
  Rep* const rep = rep_.get();

  SequenceNumber visible_seq = kMaxSequenceNumber;
  if (roptions.snapshot != nullptr) {
    visible_seq = roptions.snapshot->GetSequenceNumber();
  }

  ArenaWrappedDBIter* const db_iter = new ArenaWrappedDBIter();
  db_iter->Init(rep->options.env, roptions, rep->ioptions, rep->moptions,
                nullptr /* version */, visible_seq, 0 /* version_number */,
                nullptr /* read_callback */, nullptr /* cfh */,
                true /* expose_blob_index */, false /* allow_refresh */,
                /*active_mem=*/nullptr);

  // The table iterator is created with the wrapper's own ReadOptions and
  // arena, then installed underneath the wrapper.
  InternalIterator* const table_iter = rep->table_reader->NewIterator(
      db_iter->GetReadOptions(), rep->moptions.prefix_extractor.get(),
      db_iter->GetArena(), false /* skip_filters */,
      TableReaderCaller::kSSTFileReader);
  db_iter->SetIterUnderDBIter(table_iter);
  return db_iter;
}
|
|
|
|
std::unique_ptr<Iterator> SstFileReader::NewTableIterator() {
|
|
auto r = rep_.get();
|
|
InternalIterator* internal_iter = r->table_reader->NewIterator(
|
|
r->roptions_for_table_iter, r->moptions.prefix_extractor.get(),
|
|
/*arena*/ nullptr, false /* skip_filters */,
|
|
TableReaderCaller::kSSTFileReader);
|
|
assert(internal_iter);
|
|
if (internal_iter == nullptr) {
|
|
// Do not attempt to create a TableIterator if we cannot get a valid
|
|
// InternalIterator.
|
|
return nullptr;
|
|
}
|
|
return std::make_unique<TableIterator>(internal_iter);
|
|
}
|
|
|
|
std::shared_ptr<const TableProperties> SstFileReader::GetTableProperties()
|
|
const {
|
|
return rep_->table_reader->GetTableProperties();
|
|
}
|
|
|
|
// Forwards checksum verification to the underlying table reader, tagging the
// call as coming from the SST file reader. `read_options.io_activity` must be
// kUnknown (asserted).
Status SstFileReader::VerifyChecksum(const ReadOptions& read_options) {
  assert(read_options.io_activity == Env::IOActivity::kUnknown);
  const auto& reader = rep_->table_reader;
  return reader->VerifyChecksum(read_options,
                                TableReaderCaller::kSSTFileReader);
}
|
|
|
|
// Scans the whole table and checks that the number of entries read matches
// what the table properties advertise (num_entries minus
// num_range_deletions).
//
// Returns the iterator's status if positioning or scanning fails, Corruption
// if the table properties are unavailable or the counts disagree, and OK
// otherwise.
//
// NOTE(review): the iterator returned by the table reader is dereferenced
// without a null check, unlike NewTableIterator() -- confirm it cannot be
// null here.
Status SstFileReader::VerifyNumEntries(const ReadOptions& read_options) {
  Rep* const rep = rep_.get();
  std::unique_ptr<InternalIterator> iter{rep->table_reader->NewIterator(
      read_options, rep->moptions.prefix_extractor.get(), nullptr,
      false /* skip_filters */, TableReaderCaller::kSSTFileReader)};

  iter->SeekToFirst();
  Status s = iter->status();
  if (!s.ok()) {
    return s;
  }

  uint64_t num_read = 0;
  while (iter->Valid()) {
    ++num_read;
    iter->Next();
  }
  // Valid() turning false may mean end-of-table or an error; tell them apart.
  s = iter->status();
  if (!s.ok()) {
    return s;
  }

  const std::shared_ptr<const TableProperties> tp = GetTableProperties();
  if (!tp) {
    return Status::Corruption("table properties not available");
  }

  // TODO: verify num_range_deletions
  const uint64_t expected = tp->num_entries - tp->num_range_deletions;
  if (num_read == expected) {
    return s;
  }

  // Both counts are formatted with std::to_string so they are rendered the
  // same way and no string stream is needed.
  std::string msg = "Table property expects ";
  msg += std::to_string(expected);
  msg += " entries when excluding range deletions,";
  msg += " but scanning the table returned ";
  msg += std::to_string(num_read);
  msg += " entries";
  return Status::Corruption(msg);
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|