Summary:
This PR adds support for reusing the file system provided buffer to avoid an extra `memcpy` into RockDB's buffer. This optimization has already been implemented for point lookups, as well as compaction and scan reads _when prefetching is disabled_.
This PR extends this optimization to work with synchronous prefetching (`num_buffers == 1`). Asynchronous prefetching can be addressed in a future PR (and probably should be to keep this PR from growing too large).
Remarks
- To handle the case where the main buffer only has part of the requested data, I used the existing `overlap_buf_` (currently used in the async prefetching case) instead of defining a separate buffer. This was discussed in https://github.com/facebook/rocksdb/pull/13118#discussion_r1842839360.
- We use `MultiRead` with a single request to take advantage of the file system buffer. This is consistent with previous work (e.g. https://github.com/facebook/rocksdb/pull/12266).
- Even without the tests I added, there was some code coverage inside in at least `DBIOCorruptionTest.IterReadCorruptionRetry`, since those tests were failing before I addressed a bug in my code for this PR. [Run with failed test](3261150881
).
- This prefetching code is not too easy to follow, so I added quite a bit of comments to both the code and test case to try to make it easier to understand the exact internal state of the prefetch buffer at every point in time.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13118
Test Plan:
I wrote pretty thorough unit tests that cover synchronous prefetching with file system buffer reuse. The flows for partial hits, complete hits, and complete misses are tested. I also parametrized the test to make sure the async prefetching (without file system buffer reuse) still work as expected.
Once we agree on the changes, I will run a long stress test before merging.
Reviewed By: anand1976
Differential Revision: D65559101
Pulled By: archang19
fbshipit-source-id: 1a56d846e918c20a009b83f1371c1791f69849ae
197 lines
7.2 KiB
C++
197 lines
7.2 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#pragma once
|
|
#include <atomic>
|
|
#include <sstream>
|
|
#include <string>
|
|
|
|
#include "env/file_system_tracer.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/file_system.h"
|
|
#include "rocksdb/listener.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/rate_limiter.h"
|
|
#include "util/aligned_buffer.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
class Statistics;
|
|
class HistogramImpl;
|
|
class SystemClock;
|
|
|
|
using AlignedBuf = FSAllocationPtr;
|
|
|
|
// Align the request r according to alignment and return the aligned result.
|
|
FSReadRequest Align(const FSReadRequest& r, size_t alignment);
|
|
|
|
// Try to merge src to dest if they have overlap.
|
|
//
|
|
// Each request represents an inclusive interval [offset, offset + len].
|
|
// If the intervals have overlap, update offset and len to represent the
|
|
// merged interval, and return true.
|
|
// Otherwise, do nothing and return false.
|
|
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src);
|
|
|
|
// RandomAccessFileReader is a wrapper on top of FSRandomAccessFile. It is
|
|
// responsible for:
|
|
// - Handling Buffered and Direct reads appropriately.
|
|
// - Rate limiting compaction reads.
|
|
// - Notifying any interested listeners on the completion of a read.
|
|
// - Updating IO stats.
|
|
class RandomAccessFileReader {
|
|
private:
|
|
void NotifyOnFileReadFinish(
|
|
uint64_t offset, size_t length,
|
|
const FileOperationInfo::StartTimePoint& start_ts,
|
|
const FileOperationInfo::FinishTimePoint& finish_ts,
|
|
const Status& status) const {
|
|
FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
|
|
finish_ts, status, file_temperature_);
|
|
info.offset = offset;
|
|
info.length = length;
|
|
|
|
for (auto& listener : listeners_) {
|
|
listener->OnFileReadFinish(info);
|
|
}
|
|
info.status.PermitUncheckedError();
|
|
}
|
|
|
|
void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation,
|
|
const std::string& file_path, size_t length,
|
|
uint64_t offset) const {
|
|
if (listeners_.empty()) {
|
|
return;
|
|
}
|
|
IOErrorInfo io_error_info(io_status, operation, file_path, length, offset);
|
|
|
|
for (auto& listener : listeners_) {
|
|
listener->OnIOError(io_error_info);
|
|
}
|
|
io_status.PermitUncheckedError();
|
|
}
|
|
|
|
|
|
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
|
|
|
|
FSRandomAccessFilePtr file_;
|
|
std::string file_name_;
|
|
SystemClock* clock_;
|
|
Statistics* stats_;
|
|
uint32_t hist_type_;
|
|
HistogramImpl* file_read_hist_;
|
|
RateLimiter* rate_limiter_;
|
|
std::vector<std::shared_ptr<EventListener>> listeners_;
|
|
const Temperature file_temperature_;
|
|
const bool is_last_level_;
|
|
|
|
struct ReadAsyncInfo {
|
|
ReadAsyncInfo(std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
|
|
uint64_t start_time)
|
|
: cb_(cb),
|
|
cb_arg_(cb_arg),
|
|
start_time_(start_time),
|
|
user_scratch_(nullptr),
|
|
user_aligned_buf_(nullptr),
|
|
user_offset_(0),
|
|
user_len_(0),
|
|
is_aligned_(false) {}
|
|
|
|
std::function<void(FSReadRequest&, void*)> cb_;
|
|
void* cb_arg_;
|
|
uint64_t start_time_;
|
|
FileOperationInfo::StartTimePoint fs_start_ts_;
|
|
// Below fields stores the parameters passed by caller in case of direct_io.
|
|
char* user_scratch_;
|
|
AlignedBuf* user_aligned_buf_;
|
|
uint64_t user_offset_;
|
|
size_t user_len_;
|
|
Slice user_result_;
|
|
// Used in case of direct_io
|
|
AlignedBuffer buf_;
|
|
bool is_aligned_;
|
|
};
|
|
|
|
public:
|
|
explicit RandomAccessFileReader(
|
|
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
|
|
SystemClock* clock = nullptr,
|
|
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
|
|
Statistics* stats = nullptr,
|
|
uint32_t hist_type = Histograms::HISTOGRAM_ENUM_MAX,
|
|
HistogramImpl* file_read_hist = nullptr,
|
|
RateLimiter* rate_limiter = nullptr,
|
|
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
|
|
Temperature file_temperature = Temperature::kUnknown,
|
|
bool is_last_level = false)
|
|
: file_(std::move(raf), io_tracer, _file_name),
|
|
file_name_(std::move(_file_name)),
|
|
clock_(clock),
|
|
stats_(stats),
|
|
hist_type_(hist_type),
|
|
file_read_hist_(file_read_hist),
|
|
rate_limiter_(rate_limiter),
|
|
listeners_(),
|
|
file_temperature_(file_temperature),
|
|
is_last_level_(is_last_level) {
|
|
std::for_each(listeners.begin(), listeners.end(),
|
|
[this](const std::shared_ptr<EventListener>& e) {
|
|
if (e->ShouldBeNotifiedOnFileIO()) {
|
|
listeners_.emplace_back(e);
|
|
}
|
|
});
|
|
}
|
|
|
|
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
|
|
const std::string& fname, const FileOptions& file_opts,
|
|
std::unique_ptr<RandomAccessFileReader>* reader,
|
|
IODebugContext* dbg);
|
|
RandomAccessFileReader(const RandomAccessFileReader&) = delete;
|
|
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
|
|
|
|
// In non-direct IO mode,
|
|
// 1. if using mmap, result is stored in a buffer other than scratch;
|
|
// 2. if not using mmap, result is stored in the buffer starting from scratch.
|
|
//
|
|
// In direct IO mode, an aligned buffer is allocated internally.
|
|
// 1. If aligned_buf is null, then results are copied to the buffer
|
|
// starting from scratch;
|
|
// 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
|
|
// the internally allocated buffer on return, and the result refers to a
|
|
// region in aligned_buf.
|
|
IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
|
|
char* scratch, AlignedBuf* aligned_buf) const;
|
|
|
|
// REQUIRES:
|
|
// num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
|
|
// In non-direct IO mode, aligned_buf should be null;
|
|
// In direct IO mode, aligned_buf stores the aligned buffer allocated inside
|
|
// MultiRead, the result Slices in reqs refer to aligned_buf.
|
|
IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs,
|
|
size_t num_reqs, AlignedBuf* aligned_buf) const;
|
|
|
|
IOStatus Prefetch(const IOOptions& opts, uint64_t offset, size_t n) const {
|
|
return file_->Prefetch(offset, n, opts, nullptr);
|
|
}
|
|
|
|
FSRandomAccessFile* file() { return file_.get(); }
|
|
|
|
const std::string& file_name() const { return file_name_; }
|
|
|
|
bool use_direct_io() const { return file_->use_direct_io(); }
|
|
|
|
IOStatus PrepareIOOptions(const ReadOptions& ro, IOOptions& opts) const;
|
|
|
|
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
|
|
std::function<void(FSReadRequest&, void*)> cb,
|
|
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
|
|
AlignedBuf* aligned_buf);
|
|
|
|
void ReadAsyncCallback(FSReadRequest& req, void* cb_arg);
|
|
};
|
|
} // namespace ROCKSDB_NAMESPACE
|