rocksdb/db/periodic_task_scheduler_test.cc
Changyu Bi 4f7d3a0cb2 Add a new periodic task to trigger compactions (#13736)
Summary:
address an existing limitation on compaction triggering mechanism that relies on events like flush/compaction/SetOptions. This is important for periodic compactions where files can become eligible without any of these events. The periodic task now runs every 12 hours and check CFs that enables `periodic_compaction_second` (TBD if we want to expand to all CFs) for eligible compactions.

Some of the periodic tasks probably don't need to run immediately after Register(). I'm keeping the existing behavior for now for patch release and to makes tests happy.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13736

Test Plan:
- new unit test that fails before this change.
- ran crash test for hours with the periodic task running every 5 seconds: `python3 ./tools/db_crashtest.py blackbox --test_batches_snapshot=0 --periodic_compaction_seconds=10`

Reviewed By: pdillinger

Differential Revision: D77460715

Pulled By: cbi42

fbshipit-source-id: 00f61502753185e76830c9ed44c5ccc4f4f16bfa
2025-07-01 11:07:51 -07:00

247 lines
8.1 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/periodic_task_scheduler.h"
#include "db/db_test_util.h"
#include "env/composite_env_wrapper.h"
#include "test_util/mock_time_env.h"
namespace ROCKSDB_NAMESPACE {
class PeriodicTaskSchedulerTest : public DBTestBase {
public:
PeriodicTaskSchedulerTest()
: DBTestBase("periodic_task_scheduler_test", /*env_do_fsync=*/true) {
mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
mock_env_.reset(new CompositeEnvWrapper(env_, mock_clock_));
}
protected:
std::unique_ptr<Env> mock_env_;
std::shared_ptr<MockSystemClock> mock_clock_;
void SetUp() override {
mock_clock_->InstallTimedWaitFixCallback();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) {
auto periodic_task_scheduler_ptr =
static_cast<PeriodicTaskScheduler*>(arg);
periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get());
});
}
};
TEST_F(PeriodicTaskSchedulerTest, Basic) {
constexpr unsigned int kPeriodSec = 10;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:StartRunning",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
int flush_info_log_counter = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushInfoLog:StartRunning",
[&](void*) { flush_info_log_counter++; });
int trigger_compaction_counter = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::TriggerPeriodicCompaction:StartRunning",
[&](void*) { trigger_compaction_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec);
ASSERT_GT(kPeriodSec, 1u);
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec) - 1);
});
const PeriodicTaskScheduler& scheduler =
dbfull()->TEST_GetPeriodicTaskScheduler();
ASSERT_EQ((int)PeriodicTaskType::kMax - 1, scheduler.TEST_GetValidTaskNum());
ASSERT_EQ(1, dump_st_counter);
ASSERT_EQ(1, pst_st_counter);
ASSERT_EQ(1, flush_info_log_counter);
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(2, dump_st_counter);
ASSERT_EQ(2, pst_st_counter);
ASSERT_EQ(2, flush_info_log_counter);
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(3, flush_info_log_counter);
// Disable scheduler with SetOption
ASSERT_OK(dbfull()->SetDBOptions(
{{"stats_dump_period_sec", "0"}, {"stats_persist_period_sec", "0"}}));
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
// Info log flush should still run.
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(4, flush_info_log_counter);
ASSERT_EQ(2u, scheduler.TEST_GetValidTaskNum());
// Re-enable one task
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}}));
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
ASSERT_EQ(3, scheduler.TEST_GetValidTaskNum());
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(4, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(5, flush_info_log_counter);
ASSERT_EQ(0, trigger_compaction_counter);
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(12 * 60 * 60));
});
ASSERT_EQ(1, trigger_compaction_counter);
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(12 * 60 * 60));
});
ASSERT_EQ(2, trigger_compaction_counter);
Close();
}
TEST_F(PeriodicTaskSchedulerTest, MultiInstances) {
constexpr int kPeriodSec = 5;
const int kInstanceNum = 10;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:2",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
auto dbs = std::vector<DB*>(kInstanceNum);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(
DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i])));
}
auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]);
const PeriodicTaskScheduler& scheduler = dbi->TEST_GetPeriodicTaskScheduler();
// kRecordSeqnoTime is not registered since the feature is not enabled
ASSERT_EQ(kInstanceNum * ((int)PeriodicTaskType::kMax - 1),
scheduler.TEST_GetValidTaskNum());
int expected_run = kInstanceNum;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
int half = kInstanceNum / 2;
for (int i = 0; i < half; i++) {
delete dbs[i];
}
expected_run += (kInstanceNum - half) * 2;
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
dbi->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
for (int i = half; i < kInstanceNum; i++) {
ASSERT_OK(dbs[i]->Close());
delete dbs[i];
}
}
TEST_F(PeriodicTaskSchedulerTest, MultiEnv) {
constexpr int kDumpPeriodSec = 5;
constexpr int kPersistPeriodSec = 10;
Close();
Options options1;
options1.stats_dump_period_sec = kDumpPeriodSec;
options1.stats_persist_period_sec = kPersistPeriodSec;
options1.create_if_missing = true;
options1.env = mock_env_.get();
Reopen(options1);
std::unique_ptr<Env> mock_env2(
new CompositeEnvWrapper(Env::Default(), mock_clock_));
Options options2;
options2.stats_dump_period_sec = kDumpPeriodSec;
options2.stats_persist_period_sec = kPersistPeriodSec;
options2.create_if_missing = true;
options1.env = mock_env2.get();
std::string dbname = test::PerThreadDBPath("multi_env_test");
DB* db;
ASSERT_OK(DB::Open(options2, dbname, &db));
ASSERT_OK(db->Close());
delete db;
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}