Summary: Crash tests have been failing of late with this assertion failure:
db_stress: `./table/block_based/block_based_table_iterator.h:656: void rocksdb::BlockBasedTableIterator::PrepareReadAsyncCallBack(rocksdb::FSReadRequest &, void *): Assertion `async_state->status.IsAborted()' failed.`
Instead of asserting, surface the failure status so we can troubleshoot.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14171
Reviewed By: xingbowang
Differential Revision: D88396654
Pulled By: anand1976
fbshipit-source-id: 8d59d7ace0c522c17b7af17c50e16af876911bad
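In other words (a simplified sketch; the actual code is in PrepareReadAsyncCallBack below), the callback now records whatever status the FSReadRequest reports so that PollForBlock() can return it to the caller, rather than asserting on it:

    // Before: the callback asserted on the failed status quoted above:
    //   assert(async_state->status.IsAborted());
    // After: surface the failure instead of asserting.
    async_state->status = req.status;
    async_state->result = req.result;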
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <deque>

#include "db/seqno_to_time_mapping.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_based_table_reader_impl.h"
#include "table/block_based/block_prefetcher.h"
#include "table/block_based/reader_common.h"

namespace ROCKSDB_NAMESPACE {
// Iterates over the contents of BlockBasedTable.
class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
  // compaction_readahead_size: its value will only be used if for_compaction =
  // true
  // @param read_options Must outlive this iterator.
 public:
  BlockBasedTableIterator(
      const BlockBasedTable* table, const ReadOptions& read_options,
      const InternalKeyComparator& icomp,
      std::unique_ptr<InternalIteratorBase<IndexValue>>&& index_iter,
      bool check_filter, bool need_upper_bound_check,
      const SliceTransform* prefix_extractor, TableReaderCaller caller,
      size_t compaction_readahead_size = 0, bool allow_unprepared_value = false)
      : index_iter_(std::move(index_iter)),
        table_(table),
        read_options_(read_options),
        icomp_(icomp),
        user_comparator_(icomp.user_comparator()),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor),
        lookup_context_(caller),
        block_prefetcher_(
            compaction_readahead_size,
            table_->get_rep()->table_options.initial_auto_readahead_size),
        allow_unprepared_value_(allow_unprepared_value),
        check_filter_(check_filter),
        need_upper_bound_check_(need_upper_bound_check),
        async_read_in_progress_(false),
        is_last_level_(table->IsLastLevel()),
        block_iter_points_to_real_block_(false) {
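    // multi_scan_status_ starts out OK and may never be examined if
    // Prepare()/multiscan is not used; mark it as checked so that debug
    // builds do not flag an unchecked Status.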
    multi_scan_status_.PermitUncheckedError();
  }

  ~BlockBasedTableIterator() override { ClearBlockHandles(); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;
  bool Valid() const override {
    return !is_out_of_bound_ && multi_scan_status_.ok() &&
           (is_at_first_key_from_index_ ||
            (block_iter_points_to_real_block_ && block_iter_.Valid()));
  }

  // For block cache readahead lookup scenario -
  // If is_at_first_key_from_index_ is true, InitDataBlock hasn't been
  // called. It means block_handles_ is empty and index_iter_ points to the
  // current block. So index_iter_ can be accessed directly.
  Slice key() const override {
    assert(Valid());
    if (is_at_first_key_from_index_) {
      assert(!multi_scan_);
      return index_iter_->value().first_internal_key;
    } else {
      return block_iter_.key();
    }
  }
  Slice user_key() const override {
    assert(Valid());
    if (is_at_first_key_from_index_) {
      return ExtractUserKey(index_iter_->value().first_internal_key);
    } else {
      return block_iter_.user_key();
    }
  }

  bool PrepareValue() override {
    assert(Valid());

    if (!is_at_first_key_from_index_) {
      return true;
    }

    return const_cast<BlockBasedTableIterator*>(this)
        ->MaterializeCurrentBlock();
  }

  uint64_t write_unix_time() const override {
    assert(Valid());
    ParsedInternalKey pikey;
    SequenceNumber seqno;
    const SeqnoToTimeMapping& seqno_to_time_mapping =
        table_->GetSeqnoToTimeMapping();
    Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
    if (!s.ok()) {
      return std::numeric_limits<uint64_t>::max();
    } else if (kUnknownSeqnoBeforeAll == pikey.sequence) {
      return kUnknownTimeBeforeAll;
    } else if (seqno_to_time_mapping.Empty()) {
      return std::numeric_limits<uint64_t>::max();
    } else if (kTypeValuePreferredSeqno == pikey.type) {
      seqno = ParsePackedValueForSeqno(value());
    } else {
      seqno = pikey.sequence;
    }
    return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno);
  }

  Slice value() const override {
    // PrepareValue() must have been called.
    assert(!is_at_first_key_from_index_);
    assert(Valid());

    if (seek_stat_state_ & kReportOnUseful) {
      bool filter_used = (seek_stat_state_ & kFilterUsed) != 0;
      RecordTick(
          table_->GetStatistics(),
          filter_used
              ? (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH
                                : NON_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH)
              : (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER
                                : NON_LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER));
      seek_stat_state_ = kDataBlockReadSinceLastSeek;
    }

    return block_iter_.value();
  }
  Status status() const override {
    if (!multi_scan_status_.ok()) {
      return multi_scan_status_;
    }
    // In case of block cache readahead lookup, it won't add the block to
    // block_handles if its index is invalid. So the index_iter_->status check
    // can be skipped.
    // Prefix index sets the status to NotFound when the prefix does not exist.
    if (IsIndexAtCurr() && !index_iter_->status().ok() &&
        !index_iter_->status().IsNotFound()) {
      assert(!multi_scan_);
      return index_iter_->status();
    } else if (block_iter_points_to_real_block_) {
      // This is the common case.
      return block_iter_.status();
    } else if (async_read_in_progress_) {
      assert(!multi_scan_);
      return Status::TryAgain("Async read in progress");
    } else if (multi_scan_) {
      return multi_scan_status_;
    } else {
      return Status::OK();
    }
  }

  inline IterBoundCheck UpperBoundCheckResult() override {
    if (is_out_of_bound_) {
      return IterBoundCheck::kOutOfBound;
    } else if (block_upper_bound_check_ ==
               BlockUpperBound::kUpperBoundBeyondCurBlock) {
      assert(!is_out_of_bound_);
      // MultiScan does not do block level upper bound check yet.
      assert(!multi_scan_);
      return IterBoundCheck::kInbound;
    } else {
      return IterBoundCheck::kUnknown;
    }
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
  }
  bool IsKeyPinned() const override {
    // Our key comes either from block_iter_'s current key
    // or index_iter_'s current *value*.
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) ||
            (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned()));
  }
  bool IsValuePinned() const override {
    assert(!is_at_first_key_from_index_);
    assert(Valid());

    // BlockIter::IsValuePinned() is always true. No need to check it.
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           block_iter_points_to_real_block_;
  }

  void ResetDataIter() {
    if (block_iter_points_to_real_block_) {
      if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
        block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
      }
      block_iter_.Invalidate(Status::OK());
      block_iter_points_to_real_block_ = false;
    }
    block_upper_bound_check_ = BlockUpperBound::kUnknown;
  }

  void SavePrevIndexValue() {
    if (block_iter_points_to_real_block_ && IsIndexAtCurr()) {
      // Reseek. If they end up with the same data block, we shouldn't re-fetch
      // the same data block.
      prev_block_offset_ = index_iter_->value().handle.offset();
    }
  }

  void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
    if (block_prefetcher_.prefetch_buffer() != nullptr &&
        read_options_.adaptive_readahead) {
      block_prefetcher_.prefetch_buffer()->GetReadaheadState(
          &(readahead_file_info->data_block_readahead_info));
      if (index_iter_) {
        index_iter_->GetReadaheadState(readahead_file_info);
      }
    }
  }

  void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
    if (read_options_.adaptive_readahead) {
      block_prefetcher_.SetReadaheadState(
          &(readahead_file_info->data_block_readahead_info));
      if (index_iter_) {
        index_iter_->SetReadaheadState(readahead_file_info);
      }
    }
  }

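  // Prepares this iterator for a MultiScan over the given scan ranges:
  // collects the relevant data block handles, pins blocks already in the
  // block cache, and issues (possibly coalesced, possibly async) reads for
  // the rest. See CollectBlockHandles(), FilterAndPinCachedBlocks(),
  // PrepareIORequests() and ExecuteIO() below.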
  void Prepare(const MultiScanArgs* scan_opts) override;

  FilePrefetchBuffer* prefetch_buffer() {
    return block_prefetcher_.prefetch_buffer();
  }

  std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;

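  // Test-only helper: returns true iff the multiscan state still holds a
  // pinned (non-empty) block cache entry for the given block index.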
  bool TEST_IsBlockPinnedByMultiScan(size_t block_idx) {
    if (!multi_scan_) {
      return false;
    }
    if (block_idx >= multi_scan_->pinned_data_blocks.size()) {
      return false;
    }
    return !multi_scan_->pinned_data_blocks[block_idx].IsEmpty();
  }

 private:
  enum class IterDirection {
    kForward,
    kBackward,
  };
  // This enum indicates whether the upper bound falls into the current block
  // or beyond.
  //   +-------------+
  //   |  cur block  | <-- (1)
  //   +-------------+
  //                   <-- (2)
  //  --- <boundary key> ---
  //                   <-- (3)
  //   +-------------+
  //   | next block  | <-- (4)
  //        ......
  //
  // When the upper bound is smaller than <boundary key>, kUpperBoundInCurBlock
  // is the value to use. The examples are (1) or (2) in the graph. It means
  // all keys in the next block or beyond will be out of bound. Keys within
  // the current block may or may not be out of bound.
  // When the upper bound is larger than or equal to <boundary key>,
  // kUpperBoundBeyondCurBlock is to be used. The examples are (3) and (4)
  // in the graph. It means that all keys in the current block are within the
  // upper bound, and keys in the next block may or may not be within the
  // upper bound.
  // If the boundary key hasn't been checked against the upper bound,
  // kUnknown can be used.
  enum class BlockUpperBound : uint8_t {
    kUpperBoundInCurBlock,
    kUpperBoundBeyondCurBlock,
    kUnknown,
  };

  // State bits for collecting stats on seeks and whether they returned useful
  // results.
  enum SeekStatState : uint8_t {
    kNone = 0,
    // Most recent seek checked prefix filter (or similar future feature)
    kFilterUsed = 1 << 0,
    // Already recorded that a data block was accessed since the last seek.
    kDataBlockReadSinceLastSeek = 1 << 1,
    // Have not yet recorded that a value() was accessed.
    kReportOnUseful = 1 << 2,
  };

  // BlockHandleInfo is used to store the info needed when block cache lookup
  // ahead is enabled to tune readahead_size.
  struct BlockHandleInfo {
    void SetFirstInternalKey(const Slice& key) {
      if (key.empty()) {
        return;
      }
      size_t size = key.size();
      buf_ = std::unique_ptr<char[]>(new char[size]);
      memcpy(buf_.get(), key.data(), size);
      first_internal_key_ = Slice(buf_.get(), size);
    }

    BlockHandle handle_;
    bool is_cache_hit_ = false;
    CachableEntry<Block> cachable_entry_;
    Slice first_internal_key_;
    std::unique_ptr<char[]> buf_;
  };

  bool IsIndexAtCurr() const { return is_index_at_curr_block_; }

  const BlockBasedTable* table_;
  const ReadOptions& read_options_;
  const InternalKeyComparator& icomp_;
  UserComparatorWrapper user_comparator_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  DataBlockIter block_iter_;
  const SliceTransform* prefix_extractor_;
  uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
  BlockCacheLookupContext lookup_context_;

  BlockPrefetcher block_prefetcher_;

  // It stores all the block handles that are looked up in the cache ahead of
  // time when BlockCacheLookupForReadAheadSize is called. Since index_iter_
  // may point to different blocks while readahead_size is calculated in
  // BlockCacheLookupForReadAheadSize, block_handles_ is used to avoid an
  // index_iter_ reseek.
  // `block_handles_` is lazily constructed to save CPU when it is unused.
  std::unique_ptr<std::deque<BlockHandleInfo>> block_handles_;

  // The prefix of the key passed to SeekImpl().
  // This is for readahead trimming so no data blocks containing keys of a
  // different prefix are prefetched.
  std::string seek_key_prefix_for_readahead_trimming_ = "";

  const bool allow_unprepared_value_;
  // How the current data block's boundary key (shared with the next block)
  // compares with the iterate upper bound.
  BlockUpperBound block_upper_bound_check_ = BlockUpperBound::kUnknown;
  // True if we're standing at the first key of a block, and we haven't loaded
  // that block yet. A call to PrepareValue() will trigger loading the block.
  bool is_at_first_key_from_index_ = false;
  bool check_filter_;
  // TODO(Zhongyi): pick a better name
  bool need_upper_bound_check_;

  bool async_read_in_progress_;

  mutable SeekStatState seek_stat_state_ = SeekStatState::kNone;
  bool is_last_level_;

  // If set to true, it'll look up in the cache ahead to estimate the readahead
  // size based on cache hits and misses.
  bool readahead_cache_lookup_ = false;

  bool is_index_out_of_bound_ = false;

  // Used in case of auto_readahead_size to disable the block cache lookup if
  // the direction is reversed from forward to backward. In case of backward
  // direction, SeekForPrev or Prev might call Seek from db_iter. So direction_
  // is used to disable the lookup.
  IterDirection direction_ = IterDirection::kForward;

  //*** BEGIN States used by both regular scan and multiscan

  // True if block_iter_ is initialized and points to the same block
  // as the index iterator.
  bool block_iter_points_to_real_block_;
  // See InternalIteratorBase::IsOutOfBound().
  bool is_out_of_bound_ = false;

  // Mark prepared ranges as exhausted for multiscan.
  void MarkPreparedRangeExhausted() {
    assert(multi_scan_ != nullptr);
    if (multi_scan_->next_scan_idx <
        multi_scan_->block_index_ranges_per_scan.size()) {
      // If there are more prepared ranges, we don't ResetDataIter() here,
      // because the next scan might be reading from the same block.
      // ResetDataIter() will free the underlying block cache handle and we
      // don't want the block to be unpinned.
      // Set out of bound to mark the current prepared range as exhausted.
      is_out_of_bound_ = true;
    } else {
      // This is the last prepared range of this file; there might be more
      // data in the next file. Reset the data iterator to indicate the
      // iterator is no longer valid on this file. Let LevelIter advance to
      // the next file instead of ending the scan.
      ResetDataIter();
    }
  }

  // Whether the index is expected to match the current data_block_iter_.
  // During cache lookup to find the readahead size, index_iter_ is iterated
  // and it can point to a different block.
  // If Prepare() is called, index_iter_ is used to prefetch data blocks for
  // the multiscan, so is_index_at_curr_block_ will be false.
  bool is_index_at_curr_block_ = true;

  // *** END States used by both regular scan and multiscan

  // *** BEGIN MultiScan related states ***
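  // State for one (possibly coalesced) async read request issued by
  // Prepare(). Populated by PrepareReadAsyncCallBack() and consumed by
  // PollForBlock().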
  struct AsyncReadState {
    std::unique_ptr<char[]> buf{nullptr};
    // Indices into pinned_data_blocks that this request reads.
    std::vector<size_t> block_indices;
    // BlockHandle for each block in block_indices.
    std::vector<BlockHandle> blocks;
    void* io_handle{nullptr};
    IOHandleDeleter del_fn{nullptr};
    // offset for this async read request.
    uint64_t offset{0};

    // These two states are populated from the FSReadRequest
    // by ReadAsync callback
    Status status;
    Slice result;

    // For direct I/O support
    AlignedBuf aligned_buf{nullptr};

    bool finished{false};

    AsyncReadState() = default;
    DECLARE_DEFAULT_MOVES(AsyncReadState);
    // Delete copy operations
    AsyncReadState(const AsyncReadState&) = delete;
    AsyncReadState& operator=(const AsyncReadState&) = delete;

    void CleanUpIOHandle() {
      if (io_handle != nullptr) {
        assert(del_fn);
        del_fn(io_handle);
        io_handle = nullptr;
      }
      finished = true;
    }

    ~AsyncReadState() {
      // Should be cleaned up before destruction.
      assert(io_handle == nullptr);
    }
  };

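  // All per-file state created by Prepare() for a MultiScan: the pinned data
  // blocks, their separators, the block index range for each scan, and the
  // bookkeeping for any outstanding async reads.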
  struct MultiScanState {
    // For aborting async I/Os in the destructor.
    const std::shared_ptr<FileSystem> fs;
    const MultiScanArgs* scan_opts;
    std::vector<CachableEntry<Block>> pinned_data_blocks;
    // The separator of each data block in the above pinned_data_blocks
    // vector. Its size is the same as pinned_data_blocks.
    // The value of a separator is larger than or equal to the last key in the
    // corresponding data block.
    std::vector<std::string> data_block_separators;
    // Track the previously seeked key in multi-scan.
    // This is used to ensure that the seek key keeps moving forward, as
    // blocks that are smaller than the seek key are unpinned from memory.
    std::string prev_seek_key_;

    // Indices into pinned_data_blocks for the data blocks of each scan range.
    // Inclusive start, exclusive end.
    std::vector<std::tuple<size_t, size_t>> block_index_ranges_per_scan;
    size_t next_scan_idx;
    size_t cur_data_block_idx;

    // States for async reads.
    //
    // Each async state corresponds to an async read request.
    // Each async read request may read content for multiple blocks
    // (potentially coalesced). In PollForBlock(idx), we will poll for the
    // completion of the async read request responsible for
    // pinned_data_blocks[idx], and populate `pinned_data_blocks` with all the
    // blocks read. To find the async read request responsible for
    // pinned_data_blocks[idx], we store the mapping in
    // block_idx_to_readreq_idx: index i is in block_idx_to_readreq_idx and
    // block_idx_to_readreq_idx[i] = j iff pinned_data_blocks[i] is read by
    // async_states[j].
    std::vector<AsyncReadState> async_states;
    UnorderedMap<size_t, size_t> block_idx_to_readreq_idx;
    size_t prefetch_max_idx;

    MultiScanState(
        const std::shared_ptr<FileSystem>& _fs, const MultiScanArgs* _scan_opts,
        std::vector<CachableEntry<Block>>&& _pinned_data_blocks,
        std::vector<std::string>&& _data_block_separators,
        std::vector<std::tuple<size_t, size_t>>&& _block_index_ranges_per_scan,
        UnorderedMap<size_t, size_t>&& _block_idx_to_readreq_idx,
        std::vector<AsyncReadState>&& _async_states, size_t _prefetch_max_idx)
        : fs(_fs),
          scan_opts(_scan_opts),
          pinned_data_blocks(std::move(_pinned_data_blocks)),
          data_block_separators(std::move(_data_block_separators)),
          block_index_ranges_per_scan(std::move(_block_index_ranges_per_scan)),
          next_scan_idx(0),
          cur_data_block_idx(0),
          async_states(std::move(_async_states)),
          block_idx_to_readreq_idx(std::move(_block_idx_to_readreq_idx)),
          prefetch_max_idx(_prefetch_max_idx) {}

    ~MultiScanState();
  };

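  // Overall status of the multiscan. Any failure recorded here (e.g. from
  // Prepare() or PollForBlock()) makes the iterator invalid and is returned
  // by status().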
  Status multi_scan_status_;
  std::unique_ptr<MultiScanState> multi_scan_;
  // *** END MultiScan related APIs and states ***

  void SeekSecondPass(const Slice* target);

  // If `target` is null, seek to first.
  void SeekImpl(const Slice* target, bool async_prefetch);

  void InitDataBlock();
  void AsyncInitDataBlock(bool is_first_pass);
  bool MaterializeCurrentBlock();
  void FindKeyForward();
  void FindBlockForward();
  void FindKeyBackward();
  void CheckOutOfBound();

  // Check if the data block is fully within iterate_upper_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around it,
  // we need to check and update data_block_within_upper_bound_ accordingly.
  void CheckDataBlockWithinUpperBound();

  bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction,
                           bool* filter_checked) {
    if (need_upper_bound_check_ && direction == IterDirection::kBackward) {
      // Upper bound check isn't sufficient for backward direction to
      // guarantee the same result as total order, so disable prefix
      // check.
      return true;
    }
    if (check_filter_ &&
        !table_->PrefixRangeMayMatch(ikey, read_options_, prefix_extractor_,
                                     need_upper_bound_check_, &lookup_context_,
                                     filter_checked)) {
      // TODO: remember that the iterator is invalidated because of a prefix
      // mismatch. This can prevent the upper-level file iterator from falsely
      // believing the position is the end of the SST file and moving to
      // the first key of the next file.
      ResetDataIter();
      return false;
    }
    return true;
  }

  // *** BEGIN APIs relevant to auto tuning of readahead_size ***

  // This API is called to look up the data blocks ahead in the cache to tune
  // the start and end offsets passed in.
  void BlockCacheLookupForReadAheadSize(bool read_curr_block,
                                        uint64_t& start_offset,
                                        uint64_t& end_offset);

  void ResetBlockCacheLookupVar() {
    is_index_out_of_bound_ = false;
    readahead_cache_lookup_ = false;
    ClearBlockHandles();
  }

  bool IsNextBlockOutOfReadaheadBound() {
    const Slice& index_iter_user_key = index_iter_->user_key();
    // If the current block's index key >= iterate_upper_bound, it means all
    // the keys in the next block or beyond are out of bound.
    bool out_of_upper_bound =
        read_options_.iterate_upper_bound != nullptr &&
        (user_comparator_.CompareWithoutTimestamp(
             index_iter_user_key,
             /*a_has_ts=*/true, *read_options_.iterate_upper_bound,
             /*b_has_ts=*/false) >= 0
             ? true
             : false);
    if (out_of_upper_bound) {
      return true;
    }

    // If the current block's index key has a different prefix from the seek
    // key's, it means all the keys in the next block or beyond have a
    // different prefix from the seek key's.
    bool out_of_prefix_bound =
        (read_options_.prefix_same_as_start &&
         !seek_key_prefix_for_readahead_trimming_.empty() &&
         (prefix_extractor_->InDomain(index_iter_user_key)
              ? (prefix_extractor_->Transform(index_iter_user_key)
                     .compare(seek_key_prefix_for_readahead_trimming_) != 0)
              : user_comparator_.CompareWithoutTimestamp(
                    index_iter_user_key,
                    /*a_has_ts=*/true, seek_key_prefix_for_readahead_trimming_,
                    /*b_has_ts=*/false) > 0));

    if (out_of_prefix_bound) {
      return true;
    }

    return false;
  }

  void ClearBlockHandles() {
    if (block_handles_ != nullptr) {
      block_handles_->clear();
    }
  }

  // Reset prev_block_offset_. If index_iter_ has moved ahead, it won't get
  // an accurate prev_block_offset_.
  void ResetPreviousBlockOffset() {
    prev_block_offset_ = std::numeric_limits<uint64_t>::max();
  }

  bool DoesContainBlockHandles() {
    return block_handles_ != nullptr && !block_handles_->empty();
  }

  void InitializeStartAndEndOffsets(bool read_curr_block,
                                    bool& found_first_miss_block,
                                    uint64_t& start_updated_offset,
                                    uint64_t& end_updated_offset,
                                    size_t& prev_handles_size);
  // *** END APIs relevant to auto tuning of readahead_size ***

  // *** BEGIN APIs relevant to multiscan ***

  void SeekMultiScan(const Slice* target);

  void FindBlockForwardInMultiScan();

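  // Completion callback for the async reads issued by Prepare(). Records the
  // request's status and result into the AsyncReadState so that any failure
  // is surfaced to the caller via PollForBlock() rather than asserted on.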
  void PrepareReadAsyncCallBack(FSReadRequest& req, void* cb_arg) {
    // Record status and result from `req`, and sanity check the offset.
    AsyncReadState* async_state = static_cast<AsyncReadState*>(cb_arg);

    async_state->status = req.status;
    async_state->result = req.result;

    if (async_state->status.ok()) {
      assert(async_state->offset == req.offset);
      if (async_state->offset != req.offset) {
        async_state->status = Status::InvalidArgument(
            "offset mismatch between async read request " +
            std::to_string(async_state->offset) + " and async callback " +
            std::to_string(req.offset));
      }
    }
  }

  void MultiScanSeekTargetFromBlock(const Slice* seek_target, size_t block_idx);
  void MultiScanUnexpectedSeekTarget(const Slice* seek_target,
                                     const Slice* user_seek_target);

  // Returns true if there is an error or end of file.
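  // Loads pinned_data_blocks[idx] into block_iter_: polls for any outstanding
  // async read that covers the block, then builds a data block iterator over
  // the pinned block.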
  bool MultiScanLoadDataBlock(size_t idx) {
    if (idx >= multi_scan_->prefetch_max_idx) {
      // TODO: Fix the max_prefetch_size support for multiple files.
      // The goal is to limit the memory usage; prefetching could be done
      // incrementally.
      if (multi_scan_->scan_opts->max_prefetch_size == 0) {
        // If max_prefetch_size is not set, treat this as end of file.
        ResetDataIter();
        assert(!is_out_of_bound_);
        assert(!Valid());
      } else {
        // If max_prefetch_size is set, treat this as an error.
        multi_scan_status_ = Status::PrefetchLimitReached();
      }
      return true;
    }

    if (!multi_scan_->async_states.empty()) {
      multi_scan_status_ = PollForBlock(idx);
      if (!multi_scan_status_.ok()) {
        return true;
      }
    }
    // This block should have been initialized.
    assert(multi_scan_->pinned_data_blocks[idx].GetValue());
    // Note that block_iter_ takes ownership of the pinned data block.
    // TODO: we can delegate the cleanup like with pinned_iters_mgr_ if we
    // need to pin blocks longer.
    table_->NewDataBlockIterator<DataBlockIter>(
        read_options_, multi_scan_->pinned_data_blocks[idx], &block_iter_,
        Status::OK());
    return false;
  }

  // After PollForBlock(idx), the async request that contains
  // pinned_data_blocks[idx] should be done, and all blocks contained in that
  // read request will be initialized in pinned_data_blocks and pinned in the
  // block cache.
  Status PollForBlock(size_t idx);

  // Helper function to create and pin a block in cache from buffer data.
  // Handles decompressor setup with dictionary loading and block
  // creation/pinning. The buffer_start_offset is the file offset where
  // buffer_data starts.
  Status CreateAndPinBlockFromBuffer(const BlockHandle& block,
                                     uint64_t buffer_start_offset,
                                     const Slice& buffer_data,
                                     CachableEntry<Block>& pinned_block_entry);

  Status CollectBlockHandles(
      const std::vector<ScanOptions>& scan_opts,
      std::vector<BlockHandle>* scan_block_handles,
      std::vector<std::tuple<size_t, size_t>>* block_index_ranges_per_scan,
      std::vector<std::string>* data_block_boundary_keys);

  Status FilterAndPinCachedBlocks(
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      std::vector<size_t>* block_indices_to_read,
      std::vector<CachableEntry<Block>>* pinned_data_blocks_guard,
      size_t* prefetched_max_idx);

  void PrepareIORequests(
      const std::vector<size_t>& block_indices_to_read,
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      std::vector<FSReadRequest>* read_reqs,
      UnorderedMap<size_t, size_t>* block_idx_to_readreq_idx,
      std::vector<std::vector<size_t>>* coalesced_block_indices);

  Status ExecuteIO(
      const std::vector<BlockHandle>& scan_block_handles,
      const MultiScanArgs* multiscan_opts,
      const std::vector<std::vector<size_t>>& coalesced_block_indices,
      std::vector<FSReadRequest>* read_reqs,
      std::vector<AsyncReadState>* async_states,
      std::vector<CachableEntry<Block>>* pinned_data_blocks_guard);

  // *** END APIs relevant to multiscan ***
};
}  // namespace ROCKSDB_NAMESPACE