rocksdb/util/io_dispatcher_imp.cc
Anand Ananthabhotla b21eaa91c2 Fix memory accounting leak in IODispatcher ReadIndex() (#14569)
Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/14569

ReadSet::ReadIndex() moves block values out of pinned_blocks_ via std::move,
but never releases the associated prefetch memory accounting. This causes
ReleaseBlock() and the destructor to skip ReleaseMemory() since they check
pinned_blocks_.GetValue() which returns null after the move. Over time, the
memory budget is exhausted and no further prefetches can be dispatched when
max_prefetch_memory_bytes is set. The bug was introduced in https://github.com/facebook/rocksdb/pull/14401.

The fix releases memory accounting in ReadIndex() when moving values out
(both for Case 1: block already available, and Case 2: after async IO
polling), and zeros block_sizes_ to prevent double-release.

Also adds multiscan_max_prefetch_memory_bytes option to db_stress/crashtest
for stress testing this code path.

Reviewed By: hx235

Differential Revision: D99488961

fbshipit-source-id: 5ddd1f50e2f6ebb357f86e013d781a790e7e558a
2026-04-09 22:43:39 -07:00

1099 lines
39 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/io_dispatcher_imp.h"
#include <deque>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "file/random_access_file_reader.h"
#include "monitoring/statistics_impl.h"
#include "port/port.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_dispatcher.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/reader_common.h"
#include "table/format.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
// IODispatcherImplData is the abstract base through which a ReadSet hands
// prefetch-memory budget back to its dispatcher when blocks are released.
// It is defined ahead of the ReadSet member functions in this file so that
// they can call ReleaseMemory() before IODispatcherImpl::Impl is declared.
struct IODispatcherImplData {
// Virtual so that an implementation can be destroyed through this base type.
virtual ~IODispatcherImplData() = default;
// Returns `bytes` of previously acquired prefetch budget to the dispatcher.
// Implemented in this file by IODispatcherImpl::Impl::ReleaseMemory().
virtual void ReleaseMemory(size_t bytes) = 0;
};
// Helper that materializes one block out of a (possibly coalesced) read buffer
// and asks the table reader to create and pin it.
// Used by both ReadSet::PollAndProcessAsyncIO and IODispatcherImpl::Impl.
//
//   job                 - supplies the table reader and the read options
//   block               - handle of the block to extract
//   buffer_start_offset - file offset that buffer_data[0] corresponds to
//   buffer_data         - the bytes actually returned by the read
//   pinned_block_entry  - receives the pinned block on success
//
// Returns the dictionary lookup error if that lookup fails, Corruption if the
// block (including its trailer) is not fully contained in `buffer_data`, and
// otherwise the status of CreateAndPinBlockInCache().
static Status CreateAndPinBlockFromBuffer(
    const std::shared_ptr<IOJob>& job, const BlockHandle& block,
    uint64_t buffer_start_offset, const Slice& buffer_data,
    CachableEntry<Block>& pinned_block_entry) {
  auto* rep = job->table->get_rep();

  // Start with the table-wide decompressor; switch to the dictionary-aware
  // one when the table has a compression dictionary.
  UnownedPtr<Decompressor> decompressor = rep->decompressor.get();
  CachableEntry<DecompressorDict> cached_dict;
  if (rep->uncompression_dict_reader) {
    Status s = rep->uncompression_dict_reader->GetOrReadUncompressionDictionary(
        nullptr, job->job_options.read_options, nullptr, nullptr, &cached_dict);
    if (!s.ok()) {
      return s;
    }
    if (cached_dict.GetValue()) {
      decompressor = cached_dict.GetValue()->decompressor_.get();
    }
  }

  const auto block_size_with_trailer =
      BlockBasedTable::BlockSizeWithTrailer(block);

  // Validate that [block, block + trailer) lies inside the buffer before
  // copying. A short read (or a handle that starts before the buffer) would
  // otherwise make the memcpy below read past the end of `buffer_data`.
  if (block.offset() < buffer_start_offset) {
    return Status::Corruption("block starts before the read buffer");
  }
  const auto block_offset_in_buffer = block.offset() - buffer_start_offset;
  const uint64_t buffer_size = buffer_data.size();
  if (block_offset_in_buffer > buffer_size ||
      block_size_with_trailer > buffer_size - block_offset_in_buffer) {
    return Status::Corruption("truncated block read");
  }

  // Copy the block (with trailer) into its own allocation so its lifetime is
  // independent of the shared read buffer.
  CacheAllocationPtr data = AllocateBlock(
      block_size_with_trailer, GetMemoryAllocator(rep->table_options));
  memcpy(data.get(), buffer_data.data() + block_offset_in_buffer,
         block_size_with_trailer);
  BlockContents tmp_contents(std::move(data), block.size());
#ifndef NDEBUG
  tmp_contents.has_trailer = rep->footer.GetBlockTrailerSize() > 0;
#endif
  return job->table->CreateAndPinBlockInCache<Block_kData>(
      job->job_options.read_options, block, decompressor, &tmp_contents,
      &pinned_block_entry.As<Block_kData>());
}
// Bookkeeping for one submitted (possibly coalesced) async read. This is an
// implementation detail of this file; instances are move-only.
struct AsyncIOState {
  AsyncIOState() = default;
  // The request status may legitimately never be inspected (e.g. the read is
  // abandoned), so mark it as checked on destruction.
  ~AsyncIOState() { read_req.status.PermitUncheckedError(); }

  AsyncIOState(AsyncIOState&&) = default;
  AsyncIOState& operator=(AsyncIOState&&) = default;
  AsyncIOState(const AsyncIOState&) = delete;
  AsyncIOState& operator=(const AsyncIOState&) = delete;

  // Scratch buffer owned by this request; set up by the submitter when the
  // read is not direct IO.
  std::unique_ptr<char[]> buf;
  // NOTE(review): presumably receives the aligned buffer for direct IO reads;
  // confirm against the submission code.
  AlignedBuf aligned_buf;
  // Opaque handle of the in-flight request and the function that frees it.
  void* io_handle = nullptr;
  IOHandleDeleter del_fn;
  // File offset of the first byte of the request; all-ones until assigned.
  uint64_t offset = static_cast<uint64_t>(-1);
  // Parallel vectors: ReadSet indices and handles of the blocks covered.
  std::vector<size_t> block_indices;
  std::vector<BlockHandle> blocks;
  // The request itself; its result slice and status are read after polling.
  FSReadRequest read_req;
};
// ReadSet destructor - returns outstanding prefetch budget and cleans up IO
// handles.
// Must call AbortIO before deleting handles to avoid use-after-free when
// io_uring completions arrive for deleted handles.
ReadSet::~ReadSet() {
  // Return the prefetch budget still held by this ReadSet.
  //
  // block_sizes_[i] is non-zero for every block that was not found in cache at
  // submit time and is zeroed once its budget has been handed back. Budget is
  // held for a block that is either
  //   - pinned here (prefetched but never handed to the caller), or
  //   - still registered in async_io_map_ (read dispatched, never polled).
  // The second case must be covered as well: async reads are only dispatched
  // after TryAcquireMemory() succeeded, so skipping them would leak budget
  // every time a ReadSet is destroyed with unconsumed async prefetches.
  // Blocks that are neither pinned nor in flight never acquired budget (or
  // already returned it) and are skipped.
  if (auto dispatcher_data = dispatcher_data_.lock()) {
    for (size_t i = 0; i < block_sizes_.size(); ++i) {
      if (block_sizes_[i] == 0) {
        continue;
      }
      const bool pinned = pinned_blocks_[i].GetValue() != nullptr;
      const bool in_flight = async_io_map_.find(i) != async_io_map_.end();
      if (pinned || in_flight) {
        dispatcher_data->ReleaseMemory(block_sizes_[i]);
      }
    }
  }

  if (async_io_map_.empty()) {
    return;
  }

  // Collect unique pending IO handles (multiple block indices may share the
  // same async_state due to coalescing)
  std::vector<void*> pending_handles;
  std::unordered_set<void*> seen_handles;
  for (auto& pair : async_io_map_) {
    auto& async_state = pair.second;
    if (async_state->io_handle != nullptr &&
        seen_handles.find(async_state->io_handle) == seen_handles.end()) {
      pending_handles.push_back(async_state->io_handle);
      seen_handles.insert(async_state->io_handle);
    }
  }

  // Abort all pending IO operations before deleting handles
  if (!pending_handles.empty() && fs_) {
    // AbortIO cancels pending requests and waits for completions
    IOStatus s = fs_->AbortIO(pending_handles);
    (void)s;  // Ignore errors in destructor
  }

  // Now safe to delete the handles. Entries sharing one state see a null
  // handle after the first deletion, so each handle is freed exactly once.
  for (auto& pair : async_io_map_) {
    auto& async_state = pair.second;
    if (async_state->io_handle != nullptr && async_state->del_fn != nullptr) {
      async_state->del_fn(async_state->io_handle);
      async_state->io_handle = nullptr;
    }
  }
}
// Main read entry point - transparently serves a block from the prefetched
// state (cache hit / completed IO), from an in-flight async read, or via a
// synchronous read.
//
// On success the pin is moved into `*out`; this ReadSet no longer holds the
// block afterwards. Returns InvalidArgument for a null `out` or an
// out-of-range index, otherwise the status of the underlying IO.
Status ReadSet::ReadIndex(size_t block_index, CachableEntry<Block>* out) {
  // Validate the output pointer, as SubmitJob does for its out-parameter.
  if (out == nullptr) {
    return Status::InvalidArgument("out parameter is null");
  }
  // Bounds check
  if (block_index >= pinned_blocks_.size()) {
    return Status::InvalidArgument("Block index out of range");
  }

  // Moves the pinned block to the caller and returns its prefetch budget.
  // After the move, ReleaseBlock() and the destructor see a null value in
  // pinned_blocks_, so the budget must be released here; block_sizes_ is
  // zeroed so that it can never be released a second time.
  auto hand_off_prefetched = [this, block_index, out]() {
    *out = std::move(pinned_blocks_[block_index]);
    if (block_index < block_sizes_.size() && block_sizes_[block_index] > 0) {
      if (auto dispatcher_data = dispatcher_data_.lock()) {
        dispatcher_data->ReleaseMemory(block_sizes_[block_index]);
      }
      block_sizes_[block_index] = 0;
    }
  };

  // Case 1: Block is already available (from cache or sync read during
  // SubmitJob). Statistics for it were already counted during SubmitJob.
  if (pinned_blocks_[block_index].GetValue()) {
    hand_off_prefetched();
    return Status::OK();
  }

  // Case 2: Block has async IO in progress - poll and process
  if (job_->job_options.read_options.async_io) {
    auto it = async_io_map_.find(block_index);
    if (it != async_io_map_.end()) {
      // Get the number of blocks in this coalesced async request BEFORE
      // polling (PollAndProcessAsyncIO removes the entries from the map).
      const size_t num_blocks_in_request = it->second->block_indices.size();
      if (Status s = PollAndProcessAsyncIO(it->second); !s.ok()) {
        return s;
      }
      // Count all blocks that were read in this async request
      num_async_reads_ += num_blocks_in_request;
      // After polling, the block should be in pinned_blocks_
      if (pinned_blocks_[block_index].GetValue()) {
        hand_off_prefetched();
        return Status::OK();
      }
      return Status::IOError("Failed to process async IO result");
    }
  }

  // Case 3: Block needs synchronous read (pending or never-dispatched blocks).
  // No ReleaseMemory() needed here because blocks reaching this path never had
  // TryAcquireMemory() called — they were either pending prefetch or skipped
  // during SubmitJob. block_sizes_[block_index] may be > 0 (set during
  // SubmitJob for all uncached blocks) but that does not imply memory was
  // acquired.
  RemoveFromPending(block_index);
  Status s = SyncRead(block_index);
  if (s.ok()) {
    *out = std::move(pinned_blocks_[block_index]);
    num_sync_reads_++;
  }
  return s;
}
// Reads the block whose byte range [offset(), offset() + size()) contains the
// given file offset and forwards to ReadIndex(). Returns InvalidArgument when
// the ReadSet has no blocks or no block covers `offset`.
Status ReadSet::ReadOffset(size_t offset, CachableEntry<Block>* out) {
  if (sorted_block_indices_.empty()) {
    return Status::InvalidArgument("ReadSet not initialized");
  }

  // sorted_block_indices_ holds the original block indices ordered by block
  // offset, which allows a binary search for the first block starting after
  // `offset`.
  const auto& handles = job_->block_handles;
  const auto first_after = std::upper_bound(
      sorted_block_indices_.begin(), sorted_block_indices_.end(), offset,
      [&handles](size_t target, size_t idx) {
        return target < handles[idx].offset();
      });

  // Unless `offset` precedes every block, the only possible match is the
  // block immediately before `first_after` (largest start <= offset).
  if (first_after != sorted_block_indices_.begin()) {
    const size_t candidate_idx = *(first_after - 1);
    const auto& handle = handles[candidate_idx];
    const bool starts_at_or_before = offset >= handle.offset();
    const bool ends_after = offset < (handle.offset() + handle.size());
    if (starts_at_or_before && ends_after) {
      return ReadIndex(candidate_idx, out);
    }
  }
  return Status::InvalidArgument("Offset not found in any block");
}
// Drops this ReadSet's interest in a block: cancels a queued prefetch,
// returns any prefetch budget held for it, unpins it and forgets its async
// IO registration. Out-of-range indices are ignored.
void ReadSet::ReleaseBlock(size_t block_index) {
  if (block_index >= pinned_blocks_.size()) {
    return;
  }

  // Remove from pending if applicable. This must run before block_sizes_ is
  // zeroed below because it uses the size to adjust the pending byte count.
  RemoveFromPending(block_index);

  // Release memory BEFORE unpinning.
  // Budget is held for this block if it is pinned after a prefetch, or if its
  // async read was dispatched (which only happens after TryAcquireMemory()
  // succeeded) but has not been polled yet. Checking only the pinned case
  // would leak the budget of blocks released while their read is in flight.
  // block_sizes_ is 0 for cache hits and for blocks whose budget was already
  // returned, which makes this a no-op for them.
  const bool pinned = pinned_blocks_[block_index].GetValue() != nullptr;
  const bool in_flight =
      async_io_map_.find(block_index) != async_io_map_.end();
  if ((pinned || in_flight) && block_index < block_sizes_.size() &&
      block_sizes_[block_index] > 0) {
    if (auto dispatcher_data = dispatcher_data_.lock()) {
      dispatcher_data->ReleaseMemory(block_sizes_[block_index]);
    }
    block_sizes_[block_index] = 0;  // Prevent double-release
  }

  // Unpin the block from cache
  pinned_blocks_[block_index].Reset();

  // Clean up any pending async IO for this block.
  // NOTE(review): if this was the last map entry referencing the shared
  // AsyncIOState, erasing it destroys the state and its buffer without the
  // AbortIO that the destructor performs - confirm the read cannot still be
  // in flight at this point.
  async_io_map_.erase(block_index);
}
// Reports whether the block can still be obtained from this ReadSet without
// a fresh read: it is either pinned here or has an async read registered.
// Out-of-range indices are never available.
bool ReadSet::IsBlockAvailable(size_t block_index) const {
  if (block_index < pinned_blocks_.size()) {
    if (pinned_blocks_[block_index].GetValue() != nullptr) {
      return true;
    }
    return async_io_map_.find(block_index) != async_io_map_.end();
  }
  return false;
}
// Polls one async request and, on success, pins every block it covers and
// unregisters those blocks from async_io_map_.
//
// On any failure (poll error, read error, block creation error) the function
// returns early and leaves the IO handle and the map entries untouched.
Status ReadSet::PollAndProcessAsyncIO(
    const std::shared_ptr<AsyncIOState>& async_state) {
  auto* rep = job_->table->get_rep();

  // Poll for IO completion using the FileSystem Poll API.
  std::vector<void*> io_handles = {async_state->io_handle};
  IOStatus io_s = rep->ioptions.env->GetFileSystem()->Poll(io_handles, 1);
  if (!io_s.ok()) {
    return io_s;
  }

  // Surface the outcome of the read itself.
  if (!async_state->read_req.status.ok()) {
    return async_state->read_req.status;
  }

  // The result slice was filled in by the completion callback and already
  // accounts for any alignment adjustment made for direct IO.
  const Slice& buffer_data = async_state->read_req.result;

  // block_indices and blocks are parallel vectors: entry `pos` of each
  // describes the same block of this coalesced request.
  const size_t num_blocks = async_state->block_indices.size();
  for (size_t pos = 0; pos < num_blocks; ++pos) {
    const size_t target = async_state->block_indices[pos];
    Status s = CreateAndPinBlockFromBuffer(
        job_, async_state->blocks[pos], async_state->offset, buffer_data,
        pinned_blocks_[target]);
    if (!s.ok()) {
      return s;
    }
  }

  // The request is fully consumed; free its handle.
  if (async_state->io_handle != nullptr && async_state->del_fn != nullptr) {
    async_state->del_fn(async_state->io_handle);
    async_state->io_handle = nullptr;
  }

  // Unregister every block of this request. The indices are copied first
  // because `async_state` may refer to the shared_ptr stored inside
  // async_io_map_ (ReadIndex passes the map element directly); erasing that
  // element invalidates the reference, so it must not be used in the loop.
  const std::vector<size_t> indices_to_remove = async_state->block_indices;
  for (const size_t idx : indices_to_remove) {
    async_io_map_.erase(idx);
  }
  return Status::OK();
}
// Synchronously retrieves one block through the table reader and stores the
// pinned result in pinned_blocks_[block_index]. The call goes through the
// block cache (use_cache and use_block_cache_for_lookup are both true).
// The caller is responsible for passing a valid index.
Status ReadSet::SyncRead(size_t block_index) {
  auto* rep = job_->table->get_rep();
  const auto& read_options = job_->job_options.read_options;

  // Prefer the dictionary-aware decompressor when the table has a dictionary.
  UnownedPtr<Decompressor> decompressor = rep->decompressor.get();
  CachableEntry<DecompressorDict> cached_dict;
  if (rep->uncompression_dict_reader) {
    Status dict_status =
        rep->uncompression_dict_reader->GetOrReadUncompressionDictionary(
            nullptr, read_options, nullptr, nullptr, &cached_dict);
    if (!dict_status.ok()) {
      return dict_status;
    }
    if (cached_dict.GetValue()) {
      decompressor = cached_dict.GetValue()->decompressor_.get();
    }
  }

  auto& destination = pinned_blocks_[block_index].As<Block_kData>();
  return job_->table->RetrieveBlock<Block_kData>(
      /*prefetch_buffer=*/nullptr, read_options,
      job_->block_handles[block_index], decompressor, &destination,
      /*get_context=*/nullptr, /*lookup_context=*/nullptr,
      /*for_compaction=*/false, /*use_cache=*/true,
      /*async_read=*/false, /*use_block_cache_for_lookup=*/true);
}
// A pre-coalesced group of blocks for prefetching. Groups are built by
// IODispatcherImpl::Impl::PreCoalesceBlocks() and are acquired against the
// memory budget and dispatched as a unit.
struct CoalescedPrefetchGroup {
std::vector<size_t> block_indices; // Blocks in this group (sorted by offset)
// Sum of the sizes (ReadSet::block_sizes_) of the blocks in block_indices.
size_t total_bytes = 0; // Total bytes for this IO
};
// State for a pending memory request waiting to be granted
// Groups are pre-coalesced at queue time for efficient dispatch
struct PendingPrefetchRequest {
// Weak so that a queued request does not keep its ReadSet alive; the
// dispatcher skips requests whose ReadSet has been destroyed.
std::weak_ptr<ReadSet> read_set;
// Job the blocks belong to; needed to issue the IO at dispatch time.
std::shared_ptr<IOJob> job;
// Pre-coalesced groups ready for dispatch, in the order produced by
// PreCoalesceBlocks() (which sorts blocks by file offset)
std::deque<CoalescedPrefetchGroup> coalesced_groups;
// Individual block indices still pending (for RemoveFromPending lookup)
std::unordered_set<size_t> block_indices_to_prefetch;
std::atomic<size_t> pending_bytes_{0}; // Track remaining bytes
mutable port::Mutex groups_mutex_; // Protects groups and set modifications
};
// Withdraws a block from the queued (not yet dispatched) prefetch of this
// ReadSet. Called when the block is read or released before its prefetch was
// dispatched. A no-op when nothing was queued or the block is not pending.
void ReadSet::RemoveFromPending(size_t block_index) {
  const bool tracked = pending_prefetch_flags_ != nullptr &&
                       block_index < pending_prefetch_flags_size_;
  if (!tracked) {
    return;
  }

  // The atomic exchange lets exactly one caller observe the true -> false
  // transition, so the bookkeeping below is done at most once per block.
  const bool was_pending = pending_prefetch_flags_[block_index].exchange(false);
  if (was_pending && pending_request_) {
    MutexLock lock(&pending_request_->groups_mutex_);
    pending_request_->block_indices_to_prefetch.erase(block_index);
    pending_request_->pending_bytes_ -= block_sizes_[block_index];
  }
}
// IODispatcherImpl::Impl inherits from IODispatcherImplData so that ReadSets
// can return prefetch budget through the base interface. It also derives from
// enable_shared_from_this because SubmitJob() stores a reference to this
// object in every ReadSet it creates.
struct IODispatcherImpl::Impl : public IODispatcherImplData,
public std::enable_shared_from_this<Impl> {
explicit Impl(const IODispatcherOptions& options);
~Impl() override;
// Non-copyable and non-movable
Impl(const Impl&) = delete;
Impl& operator=(const Impl&) = delete;
Impl(Impl&&) = delete;
Impl& operator=(Impl&&) = delete;
// Creates a ReadSet for `job`: pins blocks already in cache, dispatches
// prefetch IO for the rest as far as the memory budget allows and queues the
// remainder. Returns InvalidArgument if `read_set` is null.
Status SubmitJob(const std::shared_ptr<IOJob>& job,
std::shared_ptr<ReadSet>* read_set);
// Memory management methods - non-blocking
// TryAcquireMemory returns false instead of waiting when the budget is
// exhausted; ReleaseMemory may dispatch queued prefetches.
bool TryAcquireMemory(size_t bytes);
void ReleaseMemory(size_t bytes) override;
// Memory limiting state
// A limit of 0 means no limit is configured.
size_t max_prefetch_memory_bytes_ = 0;
std::atomic<size_t> memory_used_{0}; // Atomic for lock-free accounting
std::atomic<bool> has_pending_requests_{false}; // Fast-path check
port::Mutex memory_mutex_; // Only for pending_prefetch_queue_ access
std::deque<std::shared_ptr<PendingPrefetchRequest>> pending_prefetch_queue_;
// Optional sink for the prefetch memory tickers; may be null.
Statistics* statistics_ = nullptr;
private:
// Sorts the given blocks by offset, merges neighbours whose gap is within
// the job's io_coalesce_threshold, and emits one read request per merged
// group. Both output vectors are expected to be empty on entry.
void PrepareIORequests(
const std::shared_ptr<IOJob>& job,
const std::vector<size_t>& block_indices_to_read,
const std::vector<BlockHandle>& block_handles,
std::vector<FSReadRequest>* read_reqs,
std::vector<std::vector<size_t>>* coalesced_block_indices);
// Surface actual async IO errors to caller, but allow fallback for
// unsupported cases. Returns block indices that need sync fallback.
std::vector<size_t> ExecuteAsyncIO(
const std::shared_ptr<IOJob>& job,
const std::shared_ptr<ReadSet>& read_set,
std::vector<FSReadRequest>& read_reqs,
const std::vector<std::vector<size_t>>& coalesced_block_indices,
Status* out_status);
// Executes the given read requests synchronously for `read_set`.
Status ExecuteSyncIO(
const std::shared_ptr<IOJob>& job,
const std::shared_ptr<ReadSet>& read_set,
std::vector<FSReadRequest>& read_reqs,
const std::vector<std::vector<size_t>>& coalesced_block_indices);
// Try to dispatch pending prefetch requests when memory becomes available
void TryDispatchPendingPrefetches();
// Dispatch prefetch for a specific ReadSet (called when memory is available)
void DispatchPrefetch(const std::shared_ptr<ReadSet>& read_set,
const std::shared_ptr<IOJob>& job,
const std::vector<size_t>& block_indices);
// Pre-coalesce blocks into groups, respecting max_group_bytes size limit.
// Blocks are sorted by file offset first, so the returned groups are ordered
// by offset (earlier blocks first).
std::vector<CoalescedPrefetchGroup> PreCoalesceBlocks(
const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& rs,
const std::vector<size_t>& block_indices, size_t max_group_bytes);
};
// Captures the memory limit and the statistics sink from the options; every
// other member keeps its in-class default.
IODispatcherImpl::Impl::Impl(const IODispatcherOptions& options)
: max_prefetch_memory_bytes_(options.max_prefetch_memory_bytes),
statistics_(options.statistics) {}
// Nothing to release explicitly; members clean up after themselves.
IODispatcherImpl::Impl::~Impl() = default;
// Attempts to reserve `bytes` of prefetch budget without blocking.
// Returns true when the reservation succeeded or when no limit is configured
// (max_prefetch_memory_bytes_ == 0); returns false when granting the request
// would exceed the limit, in which case the caller should queue the work.
bool IODispatcherImpl::Impl::TryAcquireMemory(size_t bytes) {
  if (max_prefetch_memory_bytes_ == 0) {
    return true;  // No limit configured
  }
  // Lock-free memory acquisition using compare-exchange
  size_t current = memory_used_.load(std::memory_order_relaxed);
  while (true) {
    // Overflow-safe form of `current + bytes > max`: the sum could wrap
    // around for a very large request and then be granted by mistake.
    if (bytes > max_prefetch_memory_bytes_ ||
        current > max_prefetch_memory_bytes_ - bytes) {
      // Not enough memory - caller should queue for later
      RecordTick(statistics_, PREFETCH_MEMORY_REQUESTS_BLOCKED);
      return false;
    }
    if (memory_used_.compare_exchange_weak(current, current + bytes,
                                           std::memory_order_release,
                                           std::memory_order_relaxed)) {
      RecordTick(statistics_, PREFETCH_MEMORY_BYTES_GRANTED, bytes);
      return true;
    }
    // current is updated by compare_exchange_weak on failure, retry
  }
}
// Returns `bytes` of prefetch budget and, if prefetches are queued, tries to
// dispatch them with the memory that just became available. A no-op when no
// limit is configured.
void IODispatcherImpl::Impl::ReleaseMemory(size_t bytes) {
  const bool limit_configured = max_prefetch_memory_bytes_ != 0;
  if (!limit_configured) {
    return;
  }

  // Lock-free release; the previous value is only needed for the sanity
  // check that no more is released than was acquired.
  const size_t before = memory_used_.fetch_sub(bytes, std::memory_order_release);
  assert(before >= bytes);
  (void)before;  // Suppress unused warning in release builds
  RecordTick(statistics_, PREFETCH_MEMORY_BYTES_RELEASED, bytes);

  // Only take the dispatch path (and its mutex) when something is queued;
  // this keeps the common single-threaded iterator case cheap.
  if (has_pending_requests_.load(std::memory_order_acquire)) {
    TryDispatchPendingPrefetches();
  }
}
// Drains the pending prefetch queue as far as the memory budget allows.
//
// Requests are taken from the front of the queue. For each one, whole
// coalesced groups are acquired against the budget and dispatched; a request
// whose next group does not fit is put back at the front and the function
// stops. Requests whose ReadSet is gone, or whose blocks have all been read
// in the meantime, are dropped and the next request is examined.
void IODispatcherImpl::Impl::TryDispatchPendingPrefetches() {
  while (true) {
    std::shared_ptr<PendingPrefetchRequest> pending;
    {
      MutexLock lock(&memory_mutex_);
      if (pending_prefetch_queue_.empty()) {
        has_pending_requests_.store(false, std::memory_order_release);
        return;
      }
      // Get the next pending request
      pending = std::move(pending_prefetch_queue_.front());
      pending_prefetch_queue_.pop_front();
    }

    // Check if the ReadSet is still alive
    auto read_set = pending->read_set.lock();
    if (!read_set) {
      continue;  // ReadSet was destroyed, skip this request
    }

    // Try to acquire memory for coalesced groups (entire groups at a time)
    std::vector<size_t> blocks_to_dispatch;
    bool has_remaining_groups = false;
    {
      MutexLock lock(&pending->groups_mutex_);
      while (!pending->coalesced_groups.empty()) {
        auto& group = pending->coalesced_groups.front();

        // Filter out blocks that were already read (not in pending set
        // anymore)
        std::vector<size_t> remaining_blocks;
        size_t remaining_bytes = 0;
        for (size_t idx : group.block_indices) {
          if (pending->block_indices_to_prefetch.count(idx) > 0) {
            remaining_blocks.push_back(idx);
            remaining_bytes += read_set->block_sizes_[idx];
          }
        }

        // Skip empty groups (all blocks were already read)
        if (remaining_blocks.empty()) {
          pending->coalesced_groups.pop_front();
          continue;
        }

        // Try to acquire memory for remaining blocks only
        if (TryAcquireMemory(remaining_bytes)) {
          // Add all remaining blocks from this group to dispatch
          for (size_t idx : remaining_blocks) {
            blocks_to_dispatch.push_back(idx);
            pending->block_indices_to_prefetch.erase(idx);
          }
          pending->pending_bytes_ -= remaining_bytes;
          pending->coalesced_groups.pop_front();
        } else {
          // Not enough memory for this group - update with remaining blocks
          group.block_indices = std::move(remaining_blocks);
          group.total_bytes = remaining_bytes;
          has_remaining_groups = true;
          break;
        }
      }
    }

    // Save job before potential move of pending
    auto job = pending->job;

    // Requeue if groups remain
    if (has_remaining_groups) {
      MutexLock lock(&memory_mutex_);
      pending_prefetch_queue_.push_front(std::move(pending));
    } else {
      // All groups dispatched, clear pending state
      read_set->pending_request_.reset();
    }

    // Clear pending flags for dispatched blocks
    if (read_set->pending_prefetch_flags_) {
      for (size_t idx : blocks_to_dispatch) {
        if (idx < read_set->pending_prefetch_flags_size_) {
          read_set->pending_prefetch_flags_[idx].store(false);
        }
      }
    }

    // Dispatch acquired blocks, then look at the queue again.
    if (!blocks_to_dispatch.empty()) {
      DispatchPrefetch(read_set, job, blocks_to_dispatch);
      continue;
    }

    // Nothing was dispatched. Stop only if that is because a group was
    // blocked on memory (the request is back at the front of the queue).
    // If instead the request was simply drained - every block had already
    // been read - memory may well be available, so keep going; returning here
    // would strand the requests queued behind it until a later release.
    if (has_remaining_groups) {
      return;
    }
  }
}
void IODispatcherImpl::Impl::DispatchPrefetch(
const std::shared_ptr<ReadSet>& read_set, const std::shared_ptr<IOJob>& job,
const std::vector<size_t>& block_indices) {
// Sync point for testing partial prefetch - passes number of blocks being
// dispatched
TEST_SYNC_POINT_CALLBACK("IODispatcherImpl::DispatchPrefetch:BlockCount",
const_cast<std::vector<size_t>*>(&block_indices));
// Prepare and execute IO for the given blocks
std::vector<FSReadRequest> read_reqs;
std::vector<std::vector<size_t>> coalesced_block_indices;
PrepareIORequests(job, block_indices, job->block_handles, &read_reqs,
&coalesced_block_indices);
if (job->job_options.read_options.async_io) {
Status async_status;
std::vector<size_t> fallback_indices = ExecuteAsyncIO(
job, read_set, read_reqs, coalesced_block_indices, &async_status);
// For blocks where async is not supported, do sync IO
if (!fallback_indices.empty()) {
std::vector<FSReadRequest> sync_read_reqs;
std::vector<std::vector<size_t>> sync_coalesced_indices;
PrepareIORequests(job, fallback_indices, job->block_handles,
&sync_read_reqs, &sync_coalesced_indices);
// Prefetch errors are ignored - user will get the error when reading
Status s =
ExecuteSyncIO(job, read_set, sync_read_reqs, sync_coalesced_indices);
s.PermitUncheckedError();
read_set->num_sync_reads_ += fallback_indices.size();
}
// Async errors are also ignored - user will get the error when reading
async_status.PermitUncheckedError();
} else {
// Prefetch errors are ignored - user will get the error when reading
Status s = ExecuteSyncIO(job, read_set, read_reqs, coalesced_block_indices);
s.PermitUncheckedError();
read_set->num_sync_reads_ += block_indices.size();
}
}
// Builds a ReadSet for `job`.
//
//   1. Pins every block that is already in the block cache.
//   2. Groups the remaining blocks into coalesced IOs, acquires prefetch
//      budget per group and dispatches the groups that fit immediately.
//   3. Queues the groups that did not fit; they are dispatched later from
//      ReleaseMemory() when budget is returned.
//
// Returns InvalidArgument if `read_set` or `job` is null, otherwise OK (IO
// errors during prefetch are reported when the affected block is read).
Status IODispatcherImpl::Impl::SubmitJob(const std::shared_ptr<IOJob>& job,
                                         std::shared_ptr<ReadSet>* read_set) {
  if (!read_set) {
    return Status::InvalidArgument("read_set output parameter is null");
  }
  if (!job) {
    return Status::InvalidArgument("job is null");
  }

  auto rs = std::make_shared<ReadSet>();

  // Initialize ReadSet
  rs->job_ = job;
  rs->fs_ = job->table->get_rep()->ioptions.env->GetFileSystem();
  rs->pinned_blocks_.resize(job->block_handles.size());
  rs->block_sizes_.resize(job->block_handles.size(), 0);

  // Build sorted index for O(log n) ReadOffset lookups via binary search.
  // sorted_block_indices_[i] = original index of i-th smallest block by offset.
  rs->sorted_block_indices_.resize(job->block_handles.size());
  for (size_t i = 0; i < job->block_handles.size(); ++i) {
    rs->sorted_block_indices_[i] = i;
  }
  std::sort(rs->sorted_block_indices_.begin(), rs->sorted_block_indices_.end(),
            [&job](size_t a, size_t b) {
              return job->block_handles[a].offset() <
                     job->block_handles[b].offset();
            });

  // Step 1: Check cache and pin cached blocks.
  // Cache hits are counted directly. Deriving the count by subtraction would
  // also count blocks whose lookup failed, which are neither pinned nor
  // prefetched (they are read synchronously on demand) and would then be
  // reported both as a cache hit and as a sync read.
  std::vector<size_t> block_indices_to_read;
  size_t cache_hits = 0;
  for (size_t i = 0; i < job->block_handles.size(); ++i) {
    const auto& data_block_handle = job->block_handles[i];
    // Lookup and pin block in cache
    Status s = job->table->LookupAndPinBlocksInCache<Block_kData>(
        job->job_options.read_options, data_block_handle,
        &(rs->pinned_blocks_)[i].As<Block_kData>());
    if (!s.ok()) {
      continue;
    }
    if ((rs->pinned_blocks_)[i].GetValue()) {
      ++cache_hits;
    } else {
      // Block not in cache - needs to be read from disk
      block_indices_to_read.emplace_back(i);
    }
  }
  rs->num_cache_hits_ = cache_hits;

  // Step 2: Prepare IO requests for blocks not in cache
  if (block_indices_to_read.empty()) {
    // Nothing to prefetch
    *read_set = std::move(rs);
    return Status::OK();
  }

  // Calculate block sizes for uncached blocks
  for (const auto& idx : block_indices_to_read) {
    size_t block_size =
        BlockBasedTable::BlockSizeWithTrailer(job->block_handles[idx]);
    rs->block_sizes_[idx] = block_size;
  }

  // Store dispatcher reference for release callbacks
  rs->dispatcher_data_ = shared_from_this();

  // Pre-coalesce blocks into groups, respecting memory budget per group
  // This ensures we dispatch meaningful IO sizes, not tiny single-block IOs
  // Both memory-limited and non-memory-limited paths use the same coalescing
  auto coalesced_groups = PreCoalesceBlocks(job, rs, block_indices_to_read,
                                            max_prefetch_memory_bytes_);

  std::vector<size_t> blocks_to_dispatch;
  std::deque<CoalescedPrefetchGroup> groups_to_queue;

  // Try to acquire memory for entire coalesced groups
  for (auto& group : coalesced_groups) {
    if (TryAcquireMemory(group.total_bytes)) {
      // Add all blocks from this group to dispatch
      for (size_t idx : group.block_indices) {
        blocks_to_dispatch.push_back(idx);
      }
    } else {
      // Queue this group for later
      groups_to_queue.push_back(std::move(group));
    }
  }

  // Dispatch acquired blocks immediately
  if (!blocks_to_dispatch.empty()) {
    DispatchPrefetch(rs, job, blocks_to_dispatch);
  }

  // Queue remaining groups for later (only applies when memory limiting)
  if (!groups_to_queue.empty()) {
    auto pending = std::make_shared<PendingPrefetchRequest>();
    pending->read_set = rs;
    pending->job = job;
    size_t pending_bytes = 0;
    for (const auto& group : groups_to_queue) {
      for (size_t idx : group.block_indices) {
        pending->block_indices_to_prefetch.insert(idx);
      }
      pending_bytes += group.total_bytes;
    }
    pending->coalesced_groups = std::move(groups_to_queue);
    pending->pending_bytes_ = pending_bytes;

    // Set up pending flags for queued blocks only. The array is
    // value-initialized, so every flag starts out false.
    size_t num_blocks = job->block_handles.size();
    rs->pending_prefetch_flags_ =
        std::make_unique<std::atomic<bool>[]>(num_blocks);
    rs->pending_prefetch_flags_size_ = num_blocks;
    for (size_t idx : pending->block_indices_to_prefetch) {
      rs->pending_prefetch_flags_[idx].store(true);
    }
    rs->pending_request_ = pending;
    {
      MutexLock lock(&memory_mutex_);
      pending_prefetch_queue_.push_back(std::move(pending));
      has_pending_requests_.store(true, std::memory_order_release);
    }
  }

  *read_set = std::move(rs);
  return Status::OK();
}
// Turns a set of block indices into coalesced read requests.
//
// The blocks are sorted by file offset; consecutive blocks are merged into
// the same request as long as the gap between them does not exceed the job's
// io_coalesce_threshold. For every merged group one FSReadRequest spanning
// the first block's start to the last block's end (including trailer) is
// appended to `read_reqs`, and the group's block indices are appended to
// `coalesced_block_indices` at the same position.
//
// Both output vectors must be empty on entry. An empty input produces empty
// outputs.
void IODispatcherImpl::Impl::PrepareIORequests(
    const std::shared_ptr<IOJob>& job,
    const std::vector<size_t>& block_indices_to_read,
    const std::vector<BlockHandle>& block_handles,
    std::vector<FSReadRequest>* read_reqs,
    std::vector<std::vector<size_t>>* coalesced_block_indices) {
  assert(coalesced_block_indices->empty());
  assert(read_reqs->empty());

  // Without this guard an empty input would leave a single empty group behind
  // and the request-building loop below would index into that empty group.
  if (block_indices_to_read.empty()) {
    return;
  }

  // This is necessary because block handles may not be in sorted order
  std::vector<size_t> sorted_block_indices = block_indices_to_read;
  std::sort(sorted_block_indices.begin(), sorted_block_indices.end(),
            [&block_handles](size_t a, size_t b) {
              return block_handles[a].offset() < block_handles[b].offset();
            });

  coalesced_block_indices->resize(1);
  for (const auto& block_idx : sorted_block_indices) {
    if (!coalesced_block_indices->back().empty()) {
      // Check if we can coalesce with previous block
      const auto& last_block_handle =
          block_handles[coalesced_block_indices->back().back()];
      uint64_t last_block_end =
          last_block_handle.offset() +
          BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
      uint64_t current_start = block_handles[block_idx].offset();
      if (current_start >
          last_block_end + job->job_options.io_coalesce_threshold) {
        // Gap too large - start new IO request
        coalesced_block_indices->emplace_back();
      }
    }
    coalesced_block_indices->back().emplace_back(block_idx);
  }

  // Create FSReadRequest for each coalesced group
  read_reqs->reserve(coalesced_block_indices->size());
  for (const auto& block_indices : *coalesced_block_indices) {
    assert(!block_indices.empty());
    // Since blocks are sorted, the first has the min offset and the last the
    // max offset of the group.
    const auto& first_block_handle = block_handles[block_indices[0]];
    const auto& last_block_handle = block_handles[block_indices.back()];
    const auto start_offset = first_block_handle.offset();
    const auto end_offset =
        last_block_handle.offset() +
        BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
    assert(end_offset > start_offset);
    read_reqs->emplace_back();
    read_reqs->back().offset = start_offset;
    read_reqs->back().len = end_offset - start_offset;
    // The scratch buffer is assigned later by whoever executes the request.
    read_reqs->back().scratch = nullptr;
  }
}
std::vector<CoalescedPrefetchGroup> IODispatcherImpl::Impl::PreCoalesceBlocks(
const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& rs,
const std::vector<size_t>& block_indices, size_t max_group_bytes) {
std::vector<CoalescedPrefetchGroup> groups;
if (block_indices.empty()) {
return groups;
}
const auto& block_handles = job->block_handles;
const uint64_t coalesce_threshold = job->job_options.io_coalesce_threshold;
// Sort block indices by offset for coalescing
std::vector<size_t> sorted_indices = block_indices;
std::sort(sorted_indices.begin(), sorted_indices.end(),
[&block_handles](size_t a, size_t b) {
return block_handles[a].offset() < block_handles[b].offset();
});
// Build coalesced groups respecting max_group_bytes
groups.emplace_back();
for (size_t idx : sorted_indices) {
size_t block_size = rs->block_sizes_[idx];
// Skip blocks that are individually larger than the memory budget
// These will be read synchronously when needed (via ReadIndex fallback)
if (max_group_bytes > 0 && block_size > max_group_bytes) {
continue;
}
// Check if we need to start a new group
bool start_new_group = false;
if (!groups.back().block_indices.empty()) {
// Check gap with previous block
size_t last_idx = groups.back().block_indices.back();
const auto& last_handle = block_handles[last_idx];
uint64_t last_end = last_handle.offset() +
BlockBasedTable::BlockSizeWithTrailer(last_handle);
uint64_t current_start = block_handles[idx].offset();
if (current_start > last_end + coalesce_threshold) {
start_new_group = true; // Gap too large
} else if (max_group_bytes > 0 &&
groups.back().total_bytes + block_size > max_group_bytes) {
start_new_group = true; // Would exceed size limit
}
}
if (start_new_group) {
groups.emplace_back();
}
groups.back().block_indices.push_back(idx);
groups.back().total_bytes += block_size;
}
return groups;
}
std::vector<size_t> IODispatcherImpl::Impl::ExecuteAsyncIO(
const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& read_set,
std::vector<FSReadRequest>& read_reqs,
const std::vector<std::vector<size_t>>& coalesced_block_indices,
Status* out_status) {
std::vector<size_t> fallback_block_indices;
*out_status = Status::OK();
// Get file and IO options
auto* rep = job->table->get_rep();
IOOptions io_opts;
Status s =
rep->file->PrepareIOOptions(job->job_options.read_options, io_opts);
if (!s.ok()) {
*out_status = s;
return fallback_block_indices;
}
const bool direct_io = rep->file->use_direct_io();
// Submit async read requests and store them in the ReadSet
for (size_t i = 0; i < read_reqs.size(); ++i) {
auto async_state = std::make_shared<AsyncIOState>();
async_state->offset = read_reqs[i].offset;
async_state->block_indices = coalesced_block_indices[i];
async_state->read_req = std::move(read_reqs[i]);
for (const auto idx : coalesced_block_indices[i]) {
async_state->blocks.emplace_back(job->block_handles[idx]);
}
if (direct_io) {
async_state->read_req.scratch = nullptr;
} else {
async_state->buf.reset(new char[async_state->read_req.len]);
async_state->read_req.scratch = async_state->buf.get();
}
// Callback for async read completion
// Store the result slice and status back into async_state so we can access
// them after Poll() completes.
auto cb = [](const FSReadRequest& req, void* cb_arg) {
auto* state = static_cast<AsyncIOState*>(cb_arg);
state->read_req.result = req.result;
state->read_req.status = req.status;
};
s = rep->file->ReadAsync(async_state->read_req, io_opts, cb,
async_state.get(), &async_state->io_handle,
&async_state->del_fn,
direct_io ? &async_state->aligned_buf : nullptr);
if (!s.ok()) {
// Actual error - surface to caller
*out_status = s;
return fallback_block_indices;
}
if (async_state->io_handle == nullptr) {
// Async IO not supported - add to fallback list for sync IO
for (const auto idx : coalesced_block_indices[i]) {
fallback_block_indices.push_back(idx);
}
continue;
}
// Add async state to map for all blocks in this request
for (const auto idx : async_state->block_indices) {
read_set->async_io_map_[idx] = async_state;
}
}
return fallback_block_indices;
}
// Reads all of `read_reqs` with a single MultiRead call and builds a pinned
// block for every block index in `coalesced_block_indices`.
//
// read_reqs[i] is expected to cover the blocks listed in
// coalesced_block_indices[i]; each block is created from read_reqs[i].result
// and stored in read_set->pinned_blocks_[block_idx].
//
// Returns the first failure encountered, checked in this order:
// PrepareIOOptions, MultiRead itself, the per-request statuses, and then
// CreateAndPinBlockFromBuffer for each block. Returns OK if nothing fails.
Status IODispatcherImpl::Impl::ExecuteSyncIO(
    const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& read_set,
    std::vector<FSReadRequest>& read_reqs,
    const std::vector<std::vector<size_t>>& coalesced_block_indices) {
  // Resolve the file and its IO options.
  auto* table_rep = job->table->get_rep();
  IOOptions io_options;
  Status prepare_status = table_rep->file->PrepareIOOptions(
      job->job_options.read_options, io_options);
  if (!prepare_status.ok()) {
    return prepare_status;
  }
  const bool use_direct_io = table_rep->file->use_direct_io();

  // Scratch setup. Without direct IO, one contiguous allocation is carved
  // into back-to-back slices, one per request. With direct IO no scratch is
  // supplied and an AlignedBuf is handed to MultiRead instead.
  std::unique_ptr<char[]> scratch_storage;
  if (!use_direct_io) {
    size_t bytes_needed = 0;
    for (const auto& req : read_reqs) {
      bytes_needed += req.len;
    }
    scratch_storage.reset(new char[bytes_needed]);
    char* cursor = scratch_storage.get();
    for (auto& req : read_reqs) {
      req.scratch = cursor;
      cursor += req.len;
    }
  } else {
    for (auto& req : read_reqs) {
      req.scratch = nullptr;
    }
  }

  // Issue the batched read.
  AlignedBuf direct_io_buf;
  Status read_status = table_rep->file->MultiRead(
      io_options, read_reqs.data(), read_reqs.size(),
      use_direct_io ? &direct_io_buf : nullptr);
  if (!read_status.ok()) {
    return read_status;
  }

  // MultiRead can succeed as a whole while individual requests fail.
  for (const auto& req : read_reqs) {
    if (!req.status.ok()) {
      return req.status;
    }
  }

  // Turn the bytes of each request into its constituent blocks.
  for (size_t group = 0; group < coalesced_block_indices.size(); ++group) {
    const auto& completed = read_reqs[group];
    for (const size_t block_idx : coalesced_block_indices[group]) {
      Status pin_status = CreateAndPinBlockFromBuffer(
          job, job->block_handles[block_idx], completed.offset,
          completed.result, read_set->pinned_blocks_[block_idx]);
      if (!pin_status.ok()) {
        return pin_status;
      }
    }
  }
  return Status::OK();
}
// Default constructor: creates the shared Impl from a default-constructed
// IODispatcherOptions.
IODispatcherImpl::IODispatcherImpl()
: impl_(std::make_shared<Impl>(IODispatcherOptions())) {}
// Creates the shared Impl from the caller-supplied `options`.
IODispatcherImpl::IODispatcherImpl(const IODispatcherOptions& options)
: impl_(std::make_shared<Impl>(options)) {}
// Defaulted destructor, defined out of line in this translation unit.
IODispatcherImpl::~IODispatcherImpl() = default;
// Forwards `job` and `read_set` unchanged to Impl::SubmitJob and returns the
// Status it produces.
Status IODispatcherImpl::SubmitJob(const std::shared_ptr<IOJob>& job,
std::shared_ptr<ReadSet>* read_set) {
return impl_->SubmitJob(job, read_set);
}
// Factory: returns a default-constructed IODispatcherImpl allocated with
// `new`. Nothing in this function retains the pointer.
IODispatcher* NewIODispatcher() { return new IODispatcherImpl(); }
// Factory: returns an IODispatcherImpl constructed from `options`, allocated
// with `new`. Nothing in this function retains the pointer.
IODispatcher* NewIODispatcher(const IODispatcherOptions& options) {
return new IODispatcherImpl(options);
}
} // namespace ROCKSDB_NAMESPACE