rocksdb/db/periodic_task_scheduler.cc
Changyu Bi 3df9173a8c
10.4.1 Patch Release (#13748)
* Check that NewWritableFile succeeded when copying over backup files (#13734)

Summary:
I am seeing crashes during backups. The stack trace points back to `WritableFileWriter` creation inside `BackupEngineImpl::CopyOrCreateFile`. I believe the issue is that we are calling `writable_file_->GetRequiredBufferAlignment()` with a `null` `writable_file`.

https://github.com/facebook/rocksdb/blob/v10.2.1/utilities/backup/backup_engine.cc#L2396-L2397

https://github.com/facebook/rocksdb/blob/v10.2.1/file/writable_file_writer.h#L210

Here's how I think the flow is:

```cpp
  io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
                                                   &dst_file, nullptr);
// say there was some issue and dst_file is nullptr
// evaluates to false
if (io_s.ok() && !src.empty()) {
   // we don't go down this branch
    auto src_file_options = FileOptions(src_env_options);
    src_file_options.temperature = *src_temperature;
    io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options,
                                                       &src_file, nullptr);
  }
  // say this evaluates to true
  if (io_s.IsPathNotFound() && *src_temperature != Temperature::kUnknown) {
    // Retry without temperature hint in case the FileSystem is strict with
    // non-kUnknown temperature option
    io_s = src_env->GetFileSystem()->NewSequentialFile(
        src, FileOptions(src_env_options), &src_file, nullptr);
  }
// this is now from the NewSequentialFile call, not NewWritableFile
  if (!io_s.ok()) {
    return io_s;
  }
// dst_file is still nullptr
```

If the first `NewWritableFile` fails and `IsPathNotFound()` returns true (with a non-`kUnknown` source temperature), the `NewSequentialFile` retry can succeed and overwrite `io_s`. The final `!io_s.ok()` check then passes even though `dst_file` is still `nullptr`, and constructing the `WritableFileWriter` ends up dereferencing the null file.

Tests: existing unit tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13734

Reviewed By: pdillinger

Differential Revision: D77390694

Pulled By: archang19

fbshipit-source-id: 865a3a646079ae2349a3b6f25e53ae85df8e4985

* Add a new periodic task to trigger compactions (#13736)

Summary:
Address an existing limitation of the compaction triggering mechanism, which relies on events like flush/compaction/SetOptions. This matters for periodic compactions, where files can become eligible without any of these events occurring. The new periodic task runs every 12 hours and checks CFs that enable `periodic_compaction_seconds` (TBD whether to expand to all CFs) for eligible compactions.

Some of the periodic tasks probably don't need to run immediately after `Register()`. I'm keeping the existing behavior for now for the patch release and to keep tests happy.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13736

Test Plan:
- new unit test that fails before this change.
- ran crash test for hours with the periodic task interval lowered to every 5 seconds: `python3 ./tools/db_crashtest.py blackbox --test_batches_snapshot=0 --periodic_compaction_seconds=10`

Reviewed By: pdillinger

Differential Revision: D77460715

Pulled By: cbi42

fbshipit-source-id: 00f61502753185e76830c9ed44c5ccc4f4f16bfa

* update history and version for 10.4.1

* Update HISTORY.md

Co-authored-by: Andrew Chang <39173193+archang19@users.noreply.github.com>

---------

Co-authored-by: Andrew Chang <andrewrchang@meta.com>
Co-authored-by: Andrew Chang <39173193+archang19@users.noreply.github.com>
2025-07-01 12:52:30 -07:00

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/periodic_task_scheduler.h"

#include "rocksdb/system_clock.h"

namespace ROCKSDB_NAMESPACE {

// `timer_mutex` is a global mutex that currently serves 3 purposes:
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
// they are currently not implemented in a thread-safe way;
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically; and
// (3) to protect `tasks_map_` in PeriodicTaskScheduler.
// Note: it's not efficient to have a static global mutex, but for
// PeriodicTaskScheduler it should be okay, as the operations are called
// infrequently.
static port::Mutex timer_mutex;
static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
    {PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
    {PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
    {PeriodicTaskType::kFlushInfoLog, 10},
    {PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
    {PeriodicTaskType::kTriggerCompaction, 12 * 60 * 60}  // 12 hours
};

static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
    {PeriodicTaskType::kDumpStats, "dump_st"},
    {PeriodicTaskType::kPersistStats, "pst_st"},
    {PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
    {PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
    {PeriodicTaskType::kTriggerCompaction, "trigger_compaction"},
};
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn,
                                       bool run_immediately) {
  return Register(task_type, fn, kDefaultPeriodSeconds.at(task_type),
                  run_immediately);
}

Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn,
                                       uint64_t repeat_period_seconds,
                                       bool run_immediately) {
  MutexLock l(&timer_mutex);
  static std::atomic<uint64_t> initial_delay(0);
  if (repeat_period_seconds == kInvalidPeriodSec) {
    return Status::InvalidArgument("Invalid task repeat period");
  }
  auto it = tasks_map_.find(task_type);
  if (it != tasks_map_.end()) {
    // the task already exists with the same period, no update needed
    if (it->second.repeat_every_sec == repeat_period_seconds) {
      return Status::OK();
    }
    // cancel the existing one before registering a new one
    timer_->Cancel(it->second.name);
    tasks_map_.erase(it);
  }
  timer_->Start();
  // put the task type name as a prefix, for easy debugging
  std::string unique_id =
      kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++);
  uint64_t initial_delay_micros =
      (initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond;
  if (!run_immediately) {
    initial_delay_micros += repeat_period_seconds * kMicrosInSecond;
  }
  bool succeeded = timer_->Add(fn, unique_id, initial_delay_micros,
                               repeat_period_seconds * kMicrosInSecond);
  if (!succeeded) {
    return Status::Aborted("Failed to register periodic task");
  }
  auto result = tasks_map_.try_emplace(
      task_type, TaskInfo{unique_id, repeat_period_seconds});
  if (!result.second) {
    return Status::Aborted("Failed to add periodic task");
  }
  return Status::OK();
}
Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) {
  MutexLock l(&timer_mutex);
  auto it = tasks_map_.find(task_type);
  if (it != tasks_map_.end()) {
    timer_->Cancel(it->second.name);
    tasks_map_.erase(it);
  }
  if (!timer_->HasPendingTask()) {
    timer_->Shutdown();
  }
  return Status::OK();
}

Timer* PeriodicTaskScheduler::Default() {
  STATIC_AVOID_DESTRUCTION(Timer, timer)(SystemClock::Default().get());
  return &timer;
}

#ifndef NDEBUG
void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
  static Timer test_timer(clock);
  test_timer.TEST_OverrideTimer(clock);
  MutexLock l(&timer_mutex);
  timer_ = &test_timer;
}
#endif  // NDEBUG

}  // namespace ROCKSDB_NAMESPACE