rocksdb/table/block_based/block_based_table_iterator.h
anand76 5d0cf98e6c Surface MultiScan async read failure instead of asserting (#14171)
Summary:
Crash tests have been failing of late with this assertion failure from db_stress: `./table/block_based/block_based_table_iterator.h:656: void rocksdb::BlockBasedTableIterator::PrepareReadAsyncCallBack(rocksdb::FSReadRequest &, void *): Assertion `async_state->status.IsAborted()' failed.` Instead of asserting, surface the failure status so we can troubleshoot.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/14171

Reviewed By: xingbowang

Differential Revision: D88396654

Pulled By: anand1976

fbshipit-source-id: 8d59d7ace0c522c17b7af17c50e16af876911bad
2025-12-05 10:45:26 -08:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <deque>
#include "db/seqno_to_time_mapping.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_based_table_reader_impl.h"
#include "table/block_based/block_prefetcher.h"
#include "table/block_based/reader_common.h"
namespace ROCKSDB_NAMESPACE {
// Iterates over the contents of BlockBasedTable.
class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
// compaction_readahead_size: its value will only be used if for_compaction =
// true
// @param read_options Must outlive this iterator.
public:
BlockBasedTableIterator(
const BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator& icomp,
std::unique_ptr<InternalIteratorBase<IndexValue>>&& index_iter,
bool check_filter, bool need_upper_bound_check,
const SliceTransform* prefix_extractor, TableReaderCaller caller,
size_t compaction_readahead_size = 0, bool allow_unprepared_value = false)
: index_iter_(std::move(index_iter)),
table_(table),
read_options_(read_options),
icomp_(icomp),
user_comparator_(icomp.user_comparator()),
pinned_iters_mgr_(nullptr),
prefix_extractor_(prefix_extractor),
lookup_context_(caller),
block_prefetcher_(
compaction_readahead_size,
table_->get_rep()->table_options.initial_auto_readahead_size),
allow_unprepared_value_(allow_unprepared_value),
check_filter_(check_filter),
need_upper_bound_check_(need_upper_bound_check),
async_read_in_progress_(false),
is_last_level_(table->IsLastLevel()),
block_iter_points_to_real_block_(false) {
multi_scan_status_.PermitUncheckedError();
}
~BlockBasedTableIterator() override { ClearBlockHandles(); }
void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
void Next() final override;
bool NextAndGetResult(IterateResult* result) override;
void Prev() override;
bool Valid() const override {
return !is_out_of_bound_ && multi_scan_status_.ok() &&
(is_at_first_key_from_index_ ||
(block_iter_points_to_real_block_ && block_iter_.Valid()));
}
// For the block cache readahead lookup scenario -
// If is_at_first_key_from_index_ is true, InitDataBlock hasn't been
// called. It means block_handles_ is empty and index_iter_ points to the
// current block, so index_iter_ can be accessed directly.
Slice key() const override {
assert(Valid());
if (is_at_first_key_from_index_) {
assert(!multi_scan_);
return index_iter_->value().first_internal_key;
} else {
return block_iter_.key();
}
}
Slice user_key() const override {
assert(Valid());
if (is_at_first_key_from_index_) {
return ExtractUserKey(index_iter_->value().first_internal_key);
} else {
return block_iter_.user_key();
}
}
bool PrepareValue() override {
assert(Valid());
if (!is_at_first_key_from_index_) {
return true;
}
return const_cast<BlockBasedTableIterator*>(this)
->MaterializeCurrentBlock();
}
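// Illustrative usage (a minimal sketch, not part of this header; `it` and
// `Use` are hypothetical): when the iterator is constructed with
// allow_unprepared_value=true, callers must invoke PrepareValue() before
// value():
//
//   for (it->Seek(target); it->Valid(); it->Next()) {
//     if (!it->PrepareValue()) {
//       break;  // loading the data block failed; check it->status()
//     }
//     Use(it->key(), it->value());
//   }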
uint64_t write_unix_time() const override {
assert(Valid());
ParsedInternalKey pikey;
SequenceNumber seqno;
const SeqnoToTimeMapping& seqno_to_time_mapping =
table_->GetSeqnoToTimeMapping();
Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
if (!s.ok()) {
return std::numeric_limits<uint64_t>::max();
} else if (kUnknownSeqnoBeforeAll == pikey.sequence) {
return kUnknownTimeBeforeAll;
} else if (seqno_to_time_mapping.Empty()) {
return std::numeric_limits<uint64_t>::max();
} else if (kTypeValuePreferredSeqno == pikey.type) {
seqno = ParsePackedValueForSeqno(value());
} else {
seqno = pikey.sequence;
}
return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno);
}
Slice value() const override {
// PrepareValue() must have been called.
assert(!is_at_first_key_from_index_);
assert(Valid());
if (seek_stat_state_ & kReportOnUseful) {
bool filter_used = (seek_stat_state_ & kFilterUsed) != 0;
RecordTick(
table_->GetStatistics(),
filter_used
? (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH
: NON_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH)
: (is_last_level_ ? LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER
: NON_LAST_LEVEL_SEEK_DATA_USEFUL_NO_FILTER));
seek_stat_state_ = kDataBlockReadSinceLastSeek;
}
return block_iter_.value();
}
Status status() const override {
if (!multi_scan_status_.ok()) {
return multi_scan_status_;
}
// In the case of block cache readahead lookup, a block won't be added to
// block_handles_ if its index entry is invalid, so the index_iter_->status()
// check can be skipped.
// The prefix index sets status to NotFound when the prefix does not exist.
if (IsIndexAtCurr() && !index_iter_->status().ok() &&
!index_iter_->status().IsNotFound()) {
assert(!multi_scan_);
return index_iter_->status();
} else if (block_iter_points_to_real_block_) {
// This is the common case.
return block_iter_.status();
} else if (async_read_in_progress_) {
assert(!multi_scan_);
return Status::TryAgain("Async read in progress");
} else if (multi_scan_) {
return multi_scan_status_;
} else {
return Status::OK();
}
}
inline IterBoundCheck UpperBoundCheckResult() override {
if (is_out_of_bound_) {
return IterBoundCheck::kOutOfBound;
} else if (block_upper_bound_check_ ==
BlockUpperBound::kUpperBoundBeyondCurBlock) {
assert(!is_out_of_bound_);
// MultiScan does not do block level upper bound check yet.
assert(!multi_scan_);
return IterBoundCheck::kInbound;
} else {
return IterBoundCheck::kUnknown;
}
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
}
bool IsKeyPinned() const override {
// Our key comes either from block_iter_'s current key
// or index_iter_'s current *value*.
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) ||
(block_iter_points_to_real_block_ && block_iter_.IsKeyPinned()));
}
bool IsValuePinned() const override {
assert(!is_at_first_key_from_index_);
assert(Valid());
// BlockIter::IsValuePinned() is always true, so there is no need to check it.
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
block_iter_points_to_real_block_;
}
void ResetDataIter() {
if (block_iter_points_to_real_block_) {
if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
}
block_iter_.Invalidate(Status::OK());
block_iter_points_to_real_block_ = false;
}
block_upper_bound_check_ = BlockUpperBound::kUnknown;
}
void SavePrevIndexValue() {
if (block_iter_points_to_real_block_ && IsIndexAtCurr()) {
// Reseek. If the reseek lands on the same data block, we shouldn't
// re-fetch it.
prev_block_offset_ = index_iter_->value().handle.offset();
}
}
void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
if (block_prefetcher_.prefetch_buffer() != nullptr &&
read_options_.adaptive_readahead) {
block_prefetcher_.prefetch_buffer()->GetReadaheadState(
&(readahead_file_info->data_block_readahead_info));
if (index_iter_) {
index_iter_->GetReadaheadState(readahead_file_info);
}
}
}
void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
if (read_options_.adaptive_readahead) {
block_prefetcher_.SetReadaheadState(
&(readahead_file_info->data_block_readahead_info));
if (index_iter_) {
index_iter_->SetReadaheadState(readahead_file_info);
}
}
}
void Prepare(const MultiScanArgs* scan_opts) override;
FilePrefetchBuffer* prefetch_buffer() {
return block_prefetcher_.prefetch_buffer();
}
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;
bool TEST_IsBlockPinnedByMultiScan(size_t block_idx) {
if (!multi_scan_) {
return false;
}
if (block_idx >= multi_scan_->pinned_data_blocks.size()) {
return false;
}
return !multi_scan_->pinned_data_blocks[block_idx].IsEmpty();
}
private:
enum class IterDirection {
kForward,
kBackward,
};
// This enum indicates whether the upper bound falls into current block
// or beyond.
// +-------------+
// | cur block | <-- (1)
// +-------------+
// <-- (2)
// --- <boundary key> ---
// <-- (3)
// +-------------+
// | next block | <-- (4)
// ......
//
// When the upper bound is smaller than the <boundary key>,
// kUpperBoundInCurBlock is the value to use. The examples are (1) and (2)
// in the graph. It means all keys in the next block and beyond will be out
// of bound, while keys within the current block may or may not be out of
// bound.
// When the upper bound is larger than or equal to the <boundary key>,
// kUpperBoundBeyondCurBlock is to be used. The examples are (3) and (4)
// in the graph. It means that all keys in the current block are within the
// upper bound, and keys in the next block may or may not be within the
// upper bound.
// If the boundary key hasn't been checked against the upper bound,
// kUnknown can be used.
enum class BlockUpperBound : uint8_t {
kUpperBoundInCurBlock,
kUpperBoundBeyondCurBlock,
kUnknown,
};
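// Worked example (illustrative keys, following the semantics above): suppose
// the boundary key between the current and next blocks is "k40".
//   iterate_upper_bound = "k30" (< "k40") -> kUpperBoundInCurBlock:
//     the next block is entirely out of bound.
//   iterate_upper_bound = "k50" (> "k40") -> kUpperBoundBeyondCurBlock:
//     every key in the current block is within the bound.
//   bound not yet compared against "k40"  -> kUnknown.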
// State bits for collecting stats on seeks and whether they returned useful
// results.
enum SeekStatState : uint8_t {
kNone = 0,
// Most recent seek checked prefix filter (or similar future feature)
kFilterUsed = 1 << 0,
// Already recorded that a data block was accessed since the last seek.
kDataBlockReadSinceLastSeek = 1 << 1,
// Have not yet recorded that a value() was accessed.
kReportOnUseful = 1 << 2,
};
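// Example state flow (a sketch of the accounting above): a Seek that
// consulted the prefix filter leaves seek_stat_state_ =
// kFilterUsed | kReportOnUseful. The first subsequent value() call sees
// kReportOnUseful, records a (NON_)LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH
// tick, and collapses the state to kDataBlockReadSinceLastSeek so that
// later value() calls don't double count.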
// BlockHandleInfo stores the info needed when block cache lookahead is
// enabled to tune readahead_size.
struct BlockHandleInfo {
void SetFirstInternalKey(const Slice& key) {
if (key.empty()) {
return;
}
size_t size = key.size();
buf_ = std::unique_ptr<char[]>(new char[size]);
memcpy(buf_.get(), key.data(), size);
first_internal_key_ = Slice(buf_.get(), size);
}
BlockHandle handle_;
bool is_cache_hit_ = false;
CachableEntry<Block> cachable_entry_;
Slice first_internal_key_;
std::unique_ptr<char[]> buf_;
};
bool IsIndexAtCurr() const { return is_index_at_curr_block_; }
const BlockBasedTable* table_;
const ReadOptions& read_options_;
const InternalKeyComparator& icomp_;
UserComparatorWrapper user_comparator_;
PinnedIteratorsManager* pinned_iters_mgr_;
DataBlockIter block_iter_;
const SliceTransform* prefix_extractor_;
uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
BlockCacheLookupContext lookup_context_;
BlockPrefetcher block_prefetcher_;
// Stores all the block handles that are looked up ahead in the block cache
// when BlockCacheLookupForReadAheadSize is called. Since index_iter_ may
// point to different blocks while readahead_size is calculated in
// BlockCacheLookupForReadAheadSize, block_handles_ is used to avoid
// reseeking index_iter_.
// `block_handles_` is lazily constructed to save CPU when it is unused
std::unique_ptr<std::deque<BlockHandleInfo>> block_handles_;
// The prefix of the most recent key passed to SeekImpl().
// This is for readahead trimming, so no data blocks containing keys of a
// different prefix are prefetched
std::string seek_key_prefix_for_readahead_trimming_ = "";
const bool allow_unprepared_value_;
// How current data block's boundary key with the next block is compared with
// iterate upper bound.
BlockUpperBound block_upper_bound_check_ = BlockUpperBound::kUnknown;
// True if we're standing at the first key of a block, and we haven't loaded
// that block yet. A call to PrepareValue() will trigger loading the block.
bool is_at_first_key_from_index_ = false;
bool check_filter_;
// TODO(Zhongyi): pick a better name
bool need_upper_bound_check_;
bool async_read_in_progress_;
mutable SeekStatState seek_stat_state_ = SeekStatState::kNone;
bool is_last_level_;
// If set to true, the iterator will look up blocks ahead in the cache to
// estimate the readahead size based on cache hits and misses.
bool readahead_cache_lookup_ = false;
bool is_index_out_of_bound_ = false;
// Used with auto_readahead_size to disable the block cache lookup if the
// direction is reversed from forward to backward. In the backward
// direction, SeekForPrev or Prev might call Seek from db_iter, so
// direction_ is used to disable the lookup.
IterDirection direction_ = IterDirection::kForward;
//*** BEGIN States used by both regular scan and multiscan
// True if block_iter_ is initialized and points to the same block
// as index iterator.
bool block_iter_points_to_real_block_;
// See InternalIteratorBase::IsOutOfBound().
bool is_out_of_bound_ = false;
// Marks the current prepared range as exhausted for multiscan.
void MarkPreparedRangeExhausted() {
assert(multi_scan_ != nullptr);
if (multi_scan_->next_scan_idx <
multi_scan_->block_index_ranges_per_scan.size()) {
// If there are more prepared ranges, we don't ResetDataIter() here,
// because next scan might be reading from the same block. ResetDataIter()
// will free the underlying block cache handle and we don't want the
// block to be unpinned.
// Set out of bound to mark the current prepared range as exhausted.
is_out_of_bound_ = true;
} else {
// This is the last prepared range of this file, there might be more
// data on next file. Reset data iterator to indicate the iterator is
// no longer valid on this file. Let LevelIter advance to the next file
// instead of ending the scan.
ResetDataIter();
}
}
// Whether the index iterator is expected to be at the same block as the
// current data block iterator.
// During cache lookup to find the readahead size, index_iter_ is iterated
// and can point to a different block.
// If Prepare() is called, index_iter_ is used to prefetch data blocks for
// the multiscan, so is_index_at_curr_block_ will be false.
bool is_index_at_curr_block_ = true;
// *** END States used by both regular scan and multiscan
// *** BEGIN MultiScan related states ***
struct AsyncReadState {
std::unique_ptr<char[]> buf{nullptr};
// Indices into pinned_data_blocks that this request reads.
std::vector<size_t> block_indices;
// BlockHandle for each block in block_indices.
std::vector<BlockHandle> blocks;
void* io_handle{nullptr};
IOHandleDeleter del_fn{nullptr};
// offset for this async read request.
uint64_t offset{0};
// These two states are populated from the FSReadRequest
// by ReadAsync callback
Status status;
Slice result;
// For direct I/O support
AlignedBuf aligned_buf{nullptr};
bool finished{false};
AsyncReadState() = default;
DECLARE_DEFAULT_MOVES(AsyncReadState);
// Delete copy operations
AsyncReadState(const AsyncReadState&) = delete;
AsyncReadState& operator=(const AsyncReadState&) = delete;
void CleanUpIOHandle() {
if (io_handle != nullptr) {
assert(del_fn);
del_fn(io_handle);
io_handle = nullptr;
}
finished = true;
}
~AsyncReadState() {
// Should be cleaned up before destruction.
assert(io_handle == nullptr);
}
};
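// Lifecycle sketch (inferred from the fields above, not a specification):
// Prepare() issues an async read with PrepareReadAsyncCallBack() as the
// completion callback and the AsyncReadState as cb_arg; the callback copies
// req.status and req.result into the state. PollForBlock() then waits on
// io_handle, after which CleanUpIOHandle() must release io_handle via
// del_fn before destruction (the destructor asserts io_handle == nullptr).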
struct MultiScanState {
// For Aborting async I/Os in destructor.
const std::shared_ptr<FileSystem> fs;
const MultiScanArgs* scan_opts;
std::vector<CachableEntry<Block>> pinned_data_blocks;
// The separator for each data block in the pinned_data_blocks vector above.
// Its size is the same as pinned_data_blocks.
// Each separator is greater than or equal to the last key in the
// corresponding data block.
std::vector<std::string> data_block_separators;
// Tracks the previous seek key in multi-scan.
// This is used to ensure that seek keys keep moving forward, as blocks
// whose keys are smaller than the seek key are unpinned from memory.
std::string prev_seek_key_;
// Indices into pinned_data_blocks for the data blocks of each scan range:
// inclusive start, exclusive end.
std::vector<std::tuple<size_t, size_t>> block_index_ranges_per_scan;
size_t next_scan_idx;
size_t cur_data_block_idx;
// States for async reads.
//
// Each async state corresponds to an async read request. Each async read
// request may read content for multiple blocks (potentially coalesced).
// In PollForBlock(idx), we poll for the completion of the async read
// request responsible for pinned_data_blocks[idx], and populate
// `pinned_data_blocks` with all the blocks read. To find the async read
// request responsible for pinned_data_blocks[idx], we store the mapping in
// block_idx_to_readreq_idx: index i is in block_idx_to_readreq_idx and
// block_idx_to_readreq_idx[i] = j iff pinned_data_blocks[i] is read by
// async_states[j].
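// Example (hypothetical layout): if coalesced request async_states[0]
// reads pinned_data_blocks[2..4] and async_states[1] reads
// pinned_data_blocks[7], then block_idx_to_readreq_idx is
// {2: 0, 3: 0, 4: 0, 7: 1}. Blocks 0 and 1 (e.g. block cache hits that
// needed no read) have no entry.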
std::vector<AsyncReadState> async_states;
UnorderedMap<size_t, size_t> block_idx_to_readreq_idx;
size_t prefetch_max_idx;
MultiScanState(
const std::shared_ptr<FileSystem>& _fs, const MultiScanArgs* _scan_opts,
std::vector<CachableEntry<Block>>&& _pinned_data_blocks,
std::vector<std::string>&& _data_block_separators,
std::vector<std::tuple<size_t, size_t>>&& _block_index_ranges_per_scan,
UnorderedMap<size_t, size_t>&& _block_idx_to_readreq_idx,
std::vector<AsyncReadState>&& _async_states, size_t _prefetch_max_idx)
: fs(_fs),
scan_opts(_scan_opts),
pinned_data_blocks(std::move(_pinned_data_blocks)),
data_block_separators(std::move(_data_block_separators)),
block_index_ranges_per_scan(std::move(_block_index_ranges_per_scan)),
next_scan_idx(0),
cur_data_block_idx(0),
async_states(std::move(_async_states)),
block_idx_to_readreq_idx(std::move(_block_idx_to_readreq_idx)),
prefetch_max_idx(_prefetch_max_idx) {}
~MultiScanState();
};
Status multi_scan_status_;
std::unique_ptr<MultiScanState> multi_scan_;
// *** END MultiScan related APIs and states ***
void SeekSecondPass(const Slice* target);
// If `target` is null, seek to first.
void SeekImpl(const Slice* target, bool async_prefetch);
void InitDataBlock();
void AsyncInitDataBlock(bool is_first_pass);
bool MaterializeCurrentBlock();
void FindKeyForward();
void FindBlockForward();
void FindKeyBackward();
void CheckOutOfBound();
// Check if the data block is fully within iterate_upper_bound.
//
// Note MyRocks may update the iterate bounds between seeks. To work around
// this, we need to check and update data_block_within_upper_bound_
// accordingly.
void CheckDataBlockWithinUpperBound();
bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction,
bool* filter_checked) {
if (need_upper_bound_check_ && direction == IterDirection::kBackward) {
// Upper bound check isn't sufficient for backward direction to
// guarantee the same result as total order, so disable prefix
// check.
return true;
}
if (check_filter_ &&
!table_->PrefixRangeMayMatch(ikey, read_options_, prefix_extractor_,
need_upper_bound_check_, &lookup_context_,
filter_checked)) {
// TODO: remember that the iterator was invalidated because the prefix may
// not match. This can prevent the upper-level file iterator from falsely
// believing the position is the end of the SST file and moving to the
// first key of the next file.
ResetDataIter();
return false;
}
return true;
}
// *** BEGIN APIs relevant to auto tuning of readahead_size ***
// This API is called to look up the data blocks ahead in the cache and
// tune the start and end offsets passed in.
void BlockCacheLookupForReadAheadSize(bool read_curr_block,
uint64_t& start_offset,
uint64_t& end_offset);
void ResetBlockCacheLookupVar() {
is_index_out_of_bound_ = false;
readahead_cache_lookup_ = false;
ClearBlockHandles();
}
bool IsNextBlockOutOfReadaheadBound() {
const Slice& index_iter_user_key = index_iter_->user_key();
// If the current block's index key >= iterate_upper_bound, all the keys
// in the next block and beyond are out of bound.
bool out_of_upper_bound =
    read_options_.iterate_upper_bound != nullptr &&
    user_comparator_.CompareWithoutTimestamp(
        index_iter_user_key,
        /*a_has_ts=*/true, *read_options_.iterate_upper_bound,
        /*b_has_ts=*/false) >= 0;
if (out_of_upper_bound) {
return true;
}
// If the current block's index key has a different prefix from the seek
// key's, all the keys in the next block and beyond have a different prefix
// from the seek key's.
bool out_of_prefix_bound =
(read_options_.prefix_same_as_start &&
!seek_key_prefix_for_readahead_trimming_.empty() &&
(prefix_extractor_->InDomain(index_iter_user_key)
? (prefix_extractor_->Transform(index_iter_user_key)
.compare(seek_key_prefix_for_readahead_trimming_) != 0)
: user_comparator_.CompareWithoutTimestamp(
index_iter_user_key,
/*a_has_ts=*/true, seek_key_prefix_for_readahead_trimming_,
/*b_has_ts=*/false) > 0));
if (out_of_prefix_bound) {
return true;
}
return false;
}
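// Concrete illustration (hypothetical keys, a fixed-length prefix extractor
// of 5 bytes): after Seek("user1-c") with prefix_same_as_start=true,
// seek_key_prefix_for_readahead_trimming_ is "user1". If index_iter_'s user
// key is "user2-a", Transform() yields "user2" != "user1", so readahead is
// trimmed at this block. Likewise, with iterate_upper_bound = "user1-x" and
// an index key of "user1-z", the next block is out of the upper bound.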
void ClearBlockHandles() {
if (block_handles_ != nullptr) {
block_handles_->clear();
}
}
// Reset prev_block_offset_. If index_iter_ has moved ahead,
// prev_block_offset_ would no longer be accurate.
void ResetPreviousBlockOffset() {
prev_block_offset_ = std::numeric_limits<uint64_t>::max();
}
bool DoesContainBlockHandles() {
return block_handles_ != nullptr && !block_handles_->empty();
}
void InitializeStartAndEndOffsets(bool read_curr_block,
bool& found_first_miss_block,
uint64_t& start_updated_offset,
uint64_t& end_updated_offset,
size_t& prev_handles_size);
// *** END APIs relevant to auto tuning of readahead_size ***
// *** BEGIN APIs relevant to multiscan ***
void SeekMultiScan(const Slice* target);
void FindBlockForwardInMultiScan();
void PrepareReadAsyncCallBack(FSReadRequest& req, void* cb_arg) {
// Record the status and result from `req`, and sanity-check the offset.
AsyncReadState* async_state = static_cast<AsyncReadState*>(cb_arg);
async_state->status = req.status;
async_state->result = req.result;
if (async_state->status.ok()) {
assert(async_state->offset == req.offset);
if (async_state->offset != req.offset) {
async_state->status = Status::InvalidArgument(
"offset mismatch between async read request " +
std::to_string(async_state->offset) + " and async callback " +
std::to_string(req.offset));
}
}
}
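// Wiring sketch (assumed call-site shape; the actual ReadAsync call lives
// in the implementation file): each AsyncReadState is passed as cb_arg to
// FSRandomAccessFile::ReadAsync(), roughly:
//
//   file->ReadAsync(req, io_opts,
//                   [this](FSReadRequest& r, void* arg) {
//                     PrepareReadAsyncCallBack(r, arg);
//                   },
//                   &async_state, &async_state.io_handle,
//                   &async_state.del_fn, /*dbg=*/nullptr);
//
// On completion, the callback records req.status and req.result; an offset
// mismatch is surfaced as Status::InvalidArgument rather than asserted.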
void MultiScanSeekTargetFromBlock(const Slice* seek_target, size_t block_idx);
void MultiScanUnexpectedSeekTarget(const Slice* seek_target,
const Slice* user_seek_target);
// Returns true if there is an error or the end of file is reached.
bool MultiScanLoadDataBlock(size_t idx) {
if (idx >= multi_scan_->prefetch_max_idx) {
// TODO: Fix the max_prefetch_size support for multiple files.
// The goal is to limit the memory usage, prefetch could be done
// incrementally.
if (multi_scan_->scan_opts->max_prefetch_size == 0) {
// If max_prefetch_size is not set, treat this as end of file.
ResetDataIter();
assert(!is_out_of_bound_);
assert(!Valid());
} else {
// If max_prefetch_size is set, treat this as error.
multi_scan_status_ = Status::PrefetchLimitReached();
}
return true;
}
if (!multi_scan_->async_states.empty()) {
multi_scan_status_ = PollForBlock(idx);
if (!multi_scan_status_.ok()) {
return true;
}
}
// This block should have been initialized
assert(multi_scan_->pinned_data_blocks[idx].GetValue());
// Note that block_iter_ takes ownership of the pinned data block.
// TODO: we can delegate the cleanup like with pinned_iters_mgr_ if we
// need to pin blocks longer.
table_->NewDataBlockIterator<DataBlockIter>(
read_options_, multi_scan_->pinned_data_blocks[idx], &block_iter_,
Status::OK());
return false;
}
// After PollForBlock(idx), the async request that contains
// pinned_data_blocks[idx] should be done, and all blocks contained in that
// read request will be initialized in pinned_data_blocks and pinned in the
// block cache.
Status PollForBlock(size_t idx);
// Helper function to create and pin a block in cache from buffer data.
// Handles decompressor setup with dictionary loading and block
// creation/pinning. The buffer_start_offset is the file offset where
// buffer_data starts.
Status CreateAndPinBlockFromBuffer(const BlockHandle& block,
uint64_t buffer_start_offset,
const Slice& buffer_data,
CachableEntry<Block>& pinned_block_entry);
Status CollectBlockHandles(
const std::vector<ScanOptions>& scan_opts,
std::vector<BlockHandle>* scan_block_handles,
std::vector<std::tuple<size_t, size_t>>* block_index_ranges_per_scan,
std::vector<std::string>* data_block_boundary_keys);
Status FilterAndPinCachedBlocks(
const std::vector<BlockHandle>& scan_block_handles,
const MultiScanArgs* multiscan_opts,
std::vector<size_t>* block_indices_to_read,
std::vector<CachableEntry<Block>>* pinned_data_blocks_guard,
size_t* prefetched_max_idx);
void PrepareIORequests(
const std::vector<size_t>& block_indices_to_read,
const std::vector<BlockHandle>& scan_block_handles,
const MultiScanArgs* multiscan_opts,
std::vector<FSReadRequest>* read_reqs,
UnorderedMap<size_t, size_t>* block_idx_to_readreq_idx,
std::vector<std::vector<size_t>>* coalesced_block_indices);
Status ExecuteIO(
const std::vector<BlockHandle>& scan_block_handles,
const MultiScanArgs* multiscan_opts,
const std::vector<std::vector<size_t>>& coalesced_block_indices,
std::vector<FSReadRequest>* read_reqs,
std::vector<AsyncReadState>* async_states,
std::vector<CachableEntry<Block>>* pinned_data_blocks_guard);
// *** END APIs relevant to multiscan ***
};
} // namespace ROCKSDB_NAMESPACE