* Check that NewWritableFile succeeded when copying over backup files (#13734) Summary: I am seeing crashes during backups. The stack trace points back to `WritableFileWriter` creation inside `BackupEngineImpl::CopyOrCreateFile`. I believe the issue is that we are calling `writable_file_->GetRequiredBufferAlignment()` with a `null` `writable_file`. https://github.com/facebook/rocksdb/blob/v10.2.1/utilities/backup/backup_engine.cc#L2396-L2397 https://github.com/facebook/rocksdb/blob/v10.2.1/file/writable_file_writer.h#L210 Here's how I think the flow is: ```cpp io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options, &dst_file, nullptr); // say there was some issue and dst_file is nullptr // evaluates to false if (io_s.ok() && !src.empty()) { // we don't go down this branch auto src_file_options = FileOptions(src_env_options); src_file_options.temperature = *src_temperature; io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options, &src_file, nullptr); } // say this evaluates to true if (io_s.IsPathNotFound() && *src_temperature != Temperature::kUnknown) { // Retry without temperature hint in case the FileSystem is strict with // non-kUnknown temperature option io_s = src_env->GetFileSystem()->NewSequentialFile( src, FileOptions(src_env_options), &src_file, nullptr); } // this is now from the NewSequentialFile call, not NewWritableFile if (!io_s.ok()) { return io_s; } // dst_file is still nullptr ``` If the first `NewWritableFile` fails and `IsPathNotFound Tests: existing unit tests Pull Request resolved: https://github.com/facebook/rocksdb/pull/13734 Reviewed By: pdillinger Differential Revision: D77390694 Pulled By: archang19 fbshipit-source-id: 865a3a646079ae2349a3b6f25e53ae85df8e4985 * Add a new periodic task to trigger compactions (#13736) Summary: address an existing limitation on compaction triggering mechanism that relies on events like flush/compaction/SetOptions. 
This is important for periodic compactions where files can become eligible without any of these events. The periodic task now runs every 12 hours and checks CFs that enable `periodic_compaction_seconds` (TBD if we want to expand to all CFs) for eligible compactions. Some of the periodic tasks probably don't need to run immediately after Register(). I'm keeping the existing behavior for now for the patch release and to keep tests happy. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13736 Test Plan: - new unit test that fails before this change. - ran crash test for hours with the periodic task running every 5 seconds: `python3 ./tools/db_crashtest.py blackbox --test_batches_snapshot=0 --periodic_compaction_seconds=10` Reviewed By: pdillinger Differential Revision: D77460715 Pulled By: cbi42 fbshipit-source-id: 00f61502753185e76830c9ed44c5ccc4f4f16bfa * update history and version for 10.4.1 * Update HISTORY.md Co-authored-by: Andrew Chang <39173193+archang19@users.noreply.github.com> --------- Co-authored-by: Andrew Chang <andrewrchang@meta.com> Co-authored-by: Andrew Chang <39173193+archang19@users.noreply.github.com>
118 lines
4.3 KiB
C++
118 lines
4.3 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
//
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/periodic_task_scheduler.h"
|
|
|
|
#include "rocksdb/system_clock.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// `timer_mutex` is a global mutex serves 3 purposes currently:
|
|
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
|
|
// they are currently not implemented in a thread-safe way; and
|
|
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
|
|
// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically.
|
|
// (3) protect tasks_map_ in PeriodicTaskScheduler
|
|
// Note: It's not efficient to have a static global mutex, for
|
|
// PeriodicTaskScheduler it should be okay, as the operations are called
|
|
// infrequently.
|
|
static port::Mutex timer_mutex;
|
|
|
|
static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
|
|
{PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
|
|
{PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
|
|
{PeriodicTaskType::kFlushInfoLog, 10},
|
|
{PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
|
|
{PeriodicTaskType::kTriggerCompaction, 12 * 60 * 60} // 12 hours
|
|
};
|
|
|
|
static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
|
|
{PeriodicTaskType::kDumpStats, "dump_st"},
|
|
{PeriodicTaskType::kPersistStats, "pst_st"},
|
|
{PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
|
|
{PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
|
|
{PeriodicTaskType::kTriggerCompaction, "trigger_compaction"},
|
|
};
|
|
|
|
// Registers `fn` for `task_type` using the default repeat period listed for
// that type in kDefaultPeriodSeconds, then forwards to the overload that takes
// an explicit period. The returned Status is whatever that overload returns.
// Note: the lookup uses std::map::at(), so a task type missing from
// kDefaultPeriodSeconds raises std::out_of_range.
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn,
                                       bool run_immediately) {
  const uint64_t default_period_seconds = kDefaultPeriodSeconds.at(task_type);
  return Register(task_type, fn, default_period_seconds, run_immediately);
}
|
|
|
|
// Registers (or re-registers) the periodic task `fn` for `task_type`, to be
// repeated every `repeat_period_seconds` seconds. The whole operation runs
// under `timer_mutex`.
//
// If `task_type` is already registered with the same period, nothing changes.
// If it is registered with a different period, the existing timer task is
// cancelled and replaced.
//
// The delay before the first run is (counter % repeat_period_seconds) seconds,
// where `counter` is a function-local static that is incremented on every
// registration attempt that reaches this point. When `run_immediately` is
// false, one full period is added on top of that offset.
//
// Returns:
//   - Status::InvalidArgument if `repeat_period_seconds` is kInvalidPeriodSec;
//   - Status::OK if the task is already registered with the same period, or
//     if the registration succeeded;
//   - Status::Aborted if the timer refused the task or the bookkeeping entry
//     could not be inserted into `tasks_map_`.
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn,
                                       uint64_t repeat_period_seconds,
                                       bool run_immediately) {
  MutexLock lock_guard(&timer_mutex);
  // Shared by every call to this function; feeds the first-run offset below.
  static std::atomic<uint64_t> initial_delay(0);

  if (repeat_period_seconds == kInvalidPeriodSec) {
    return Status::InvalidArgument("Invalid task repeat period");
  }

  auto existing = tasks_map_.find(task_type);
  if (existing != tasks_map_.end()) {
    const bool same_period =
        (existing->second.repeat_every_sec == repeat_period_seconds);
    if (same_period) {
      // Already registered with identical settings: nothing to update.
      return Status::OK();
    }
    // The period changed: drop the old timer task before adding the new one.
    timer_->Cancel(existing->second.name);
    tasks_map_.erase(existing);
  }

  timer_->Start();

  // Prefix the id with the task type name to make debugging easier.
  std::string unique_id =
      kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++);

  const uint64_t period_micros = repeat_period_seconds * kMicrosInSecond;
  uint64_t first_run_delay_micros =
      (initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond;
  if (!run_immediately) {
    // Defer the first run by one full period in addition to the offset.
    first_run_delay_micros += period_micros;
  }

  if (!timer_->Add(fn, unique_id, first_run_delay_micros, period_micros)) {
    return Status::Aborted("Failed to register periodic task");
  }

  const bool inserted =
      tasks_map_
          .try_emplace(task_type, TaskInfo{unique_id, repeat_period_seconds})
          .second;
  if (!inserted) {
    return Status::Aborted("Failed to add periodic task");
  }
  return Status::OK();
}
|
|
|
|
// Removes the periodic task registered for `task_type`, if there is one: its
// timer task is cancelled and its entry is erased from `tasks_map_`.
// Afterwards, if the timer reports no pending task, the timer is shut down.
// The whole operation runs under `timer_mutex`.
// Always returns Status::OK, including when `task_type` was not registered.
Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) {
  MutexLock lock_guard(&timer_mutex);

  auto entry = tasks_map_.find(task_type);
  const bool is_registered = (entry != tasks_map_.end());
  if (is_registered) {
    timer_->Cancel(entry->second.name);
    tasks_map_.erase(entry);
  }

  const bool timer_has_work = timer_->HasPendingTask();
  if (!timer_has_work) {
    timer_->Shutdown();
  }
  return Status::OK();
}
|
|
|
|
// Returns a pointer to a Timer constructed from the default SystemClock.
// NOTE(review): STATIC_AVOID_DESTRUCTION is defined outside this file; judging
// by its name it declares a function-local static `timer` that is never
// destroyed, so every call would return the same instance - confirm against
// the macro definition.
Timer* PeriodicTaskScheduler::Default() {
  STATIC_AVOID_DESTRUCTION(Timer, timer)(SystemClock::Default().get());
  Timer* const default_timer = &timer;
  return default_timer;
}
|
|
|
|
#ifndef NDEBUG
|
|
// Test-only hook (compiled only when NDEBUG is not defined): points this
// scheduler at a dedicated test Timer driven by `clock`.
// `test_timer` is a function-local static, so it is constructed only on the
// first call, using that call's `clock`; every call (including the first)
// then forwards `clock` to Timer::TEST_OverrideTimer(). `timer_` is switched
// to the test timer while holding `timer_mutex`.
void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
  static Timer test_timer(clock);
  test_timer.TEST_OverrideTimer(clock);

  MutexLock lock_guard(&timer_mutex);
  timer_ = &test_timer;
}
|
|
#endif // NDEBUG
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|