rocksdb/db/periodic_task_scheduler_test.cc
Changyu Bi 3df9173a8c
10.4.1 Patch Release (#13748)
* Check that NewWritableFile succeeded when copying over backup files (#13734)

Summary:
I am seeing crashes during backups. The stack trace points back to `WritableFileWriter` creation inside `BackupEngineImpl::CopyOrCreateFile`. I believe the issue is that we are calling `writable_file_->GetRequiredBufferAlignment()` with a `null` `writable_file`.

https://github.com/facebook/rocksdb/blob/v10.2.1/utilities/backup/backup_engine.cc#L2396-L2397

https://github.com/facebook/rocksdb/blob/v10.2.1/file/writable_file_writer.h#L210

Here's how I think the flow is:

```cpp
  io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
                                                   &dst_file, nullptr);
// say there was some issue and dst_file is nullptr
// evaluates to false
if (io_s.ok() && !src.empty()) {
   // we don't go down this branch
    auto src_file_options = FileOptions(src_env_options);
    src_file_options.temperature = *src_temperature;
    io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options,
                                                       &src_file, nullptr);
  }
  // say this evaluates to true
  if (io_s.IsPathNotFound() && *src_temperature != Temperature::kUnknown) {
    // Retry without temperature hint in case the FileSystem is strict with
    // non-kUnknown temperature option
    io_s = src_env->GetFileSystem()->NewSequentialFile(
        src, FileOptions(src_env_options), &src_file, nullptr);
  }
// this is now from the NewSequentialFile call, not NewWritableFile
  if (!io_s.ok()) {
    return io_s;
  }
// dst_file is still nullptr
```

If the first `NewWritableFile` fails and `IsPathNotFound

Tests: existing unit tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13734

Reviewed By: pdillinger

Differential Revision: D77390694

Pulled By: archang19

fbshipit-source-id: 865a3a646079ae2349a3b6f25e53ae85df8e4985

* Add a new periodic task to trigger compactions (#13736)

Summary:
address an existing limitation on compaction triggering mechanism that relies on events like flush/compaction/SetOptions. This is important for periodic compactions where files can become eligible without any of these events. The periodic task now runs every 12 hours and check CFs that enables `periodic_compaction_second` (TBD if we want to expand to all CFs) for eligible compactions.

Some of the periodic tasks probably don't need to run immediately after Register(). I'm keeping the existing behavior for now for patch release and to makes tests happy.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13736

Test Plan:
- new unit test that fails before this change.
- ran crash test for hours with the periodic task running every 5 seconds: `python3 ./tools/db_crashtest.py blackbox --test_batches_snapshot=0 --periodic_compaction_seconds=10`

Reviewed By: pdillinger

Differential Revision: D77460715

Pulled By: cbi42

fbshipit-source-id: 00f61502753185e76830c9ed44c5ccc4f4f16bfa

* update history and version for 10.4.1

* Update HISTORY.md

Co-authored-by: Andrew Chang <39173193+archang19@users.noreply.github.com>

---------

Co-authored-by: Andrew Chang <andrewrchang@meta.com>
Co-authored-by: Andrew Chang <39173193+archang19@users.noreply.github.com>
2025-07-01 12:52:30 -07:00

247 lines
8.1 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/periodic_task_scheduler.h"
#include "db/db_test_util.h"
#include "env/composite_env_wrapper.h"
#include "test_util/mock_time_env.h"
namespace ROCKSDB_NAMESPACE {
class PeriodicTaskSchedulerTest : public DBTestBase {
public:
PeriodicTaskSchedulerTest()
: DBTestBase("periodic_task_scheduler_test", /*env_do_fsync=*/true) {
mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
mock_env_.reset(new CompositeEnvWrapper(env_, mock_clock_));
}
protected:
std::unique_ptr<Env> mock_env_;
std::shared_ptr<MockSystemClock> mock_clock_;
void SetUp() override {
mock_clock_->InstallTimedWaitFixCallback();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) {
auto periodic_task_scheduler_ptr =
static_cast<PeriodicTaskScheduler*>(arg);
periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get());
});
}
};
TEST_F(PeriodicTaskSchedulerTest, Basic) {
constexpr unsigned int kPeriodSec = 10;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:StartRunning",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
int flush_info_log_counter = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushInfoLog:StartRunning",
[&](void*) { flush_info_log_counter++; });
int trigger_compaction_counter = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::TriggerPeriodicCompaction:StartRunning",
[&](void*) { trigger_compaction_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec);
ASSERT_GT(kPeriodSec, 1u);
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec) - 1);
});
const PeriodicTaskScheduler& scheduler =
dbfull()->TEST_GetPeriodicTaskScheduler();
ASSERT_EQ((int)PeriodicTaskType::kMax - 1, scheduler.TEST_GetValidTaskNum());
ASSERT_EQ(1, dump_st_counter);
ASSERT_EQ(1, pst_st_counter);
ASSERT_EQ(1, flush_info_log_counter);
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(2, dump_st_counter);
ASSERT_EQ(2, pst_st_counter);
ASSERT_EQ(2, flush_info_log_counter);
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(3, flush_info_log_counter);
// Disable scheduler with SetOption
ASSERT_OK(dbfull()->SetDBOptions(
{{"stats_dump_period_sec", "0"}, {"stats_persist_period_sec", "0"}}));
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
// Info log flush should still run.
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(4, flush_info_log_counter);
ASSERT_EQ(2u, scheduler.TEST_GetValidTaskNum());
// Re-enable one task
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}}));
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
ASSERT_EQ(3, scheduler.TEST_GetValidTaskNum());
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(4, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(5, flush_info_log_counter);
ASSERT_EQ(0, trigger_compaction_counter);
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(12 * 60 * 60));
});
ASSERT_EQ(1, trigger_compaction_counter);
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(12 * 60 * 60));
});
ASSERT_EQ(2, trigger_compaction_counter);
Close();
}
TEST_F(PeriodicTaskSchedulerTest, MultiInstances) {
constexpr int kPeriodSec = 5;
const int kInstanceNum = 10;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:2",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
auto dbs = std::vector<DB*>(kInstanceNum);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(
DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i])));
}
auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]);
const PeriodicTaskScheduler& scheduler = dbi->TEST_GetPeriodicTaskScheduler();
// kRecordSeqnoTime is not registered since the feature is not enabled
ASSERT_EQ(kInstanceNum * ((int)PeriodicTaskType::kMax - 1),
scheduler.TEST_GetValidTaskNum());
int expected_run = kInstanceNum;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
int half = kInstanceNum / 2;
for (int i = 0; i < half; i++) {
delete dbs[i];
}
expected_run += (kInstanceNum - half) * 2;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
for (int i = half; i < kInstanceNum; i++) {
ASSERT_OK(dbs[i]->Close());
delete dbs[i];
}
}
TEST_F(PeriodicTaskSchedulerTest, MultiEnv) {
constexpr int kDumpPeriodSec = 5;
constexpr int kPersistPeriodSec = 10;
Close();
Options options1;
options1.stats_dump_period_sec = kDumpPeriodSec;
options1.stats_persist_period_sec = kPersistPeriodSec;
options1.create_if_missing = true;
options1.env = mock_env_.get();
Reopen(options1);
std::unique_ptr<Env> mock_env2(
new CompositeEnvWrapper(Env::Default(), mock_clock_));
Options options2;
options2.stats_dump_period_sec = kDumpPeriodSec;
options2.stats_persist_period_sec = kPersistPeriodSec;
options2.create_if_missing = true;
options1.env = mock_env2.get();
std::string dbname = test::PerThreadDBPath("multi_env_test");
DB* db;
ASSERT_OK(DB::Open(options2, dbname, &db));
ASSERT_OK(db->Close());
delete db;
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}