forked from continuwuation/rocksdb
Summary: **Context/Summary:** - Add resumable compaction to stress test with adaptive progress cancellation - Add fault injection to remote compaction - Fix a real minor bug in a couple testing framework bugs with remote compaction Pull Request resolved: https://github.com/facebook/rocksdb/pull/14041 Test Plan: - Rehearsal stress test, finding bugs for https://github.com/facebook/rocksdb/pull/13984 effectively and did not create new failures. Reviewed By: jaykorean Differential Revision: D84524194 Pulled By: hx235 fbshipit-source-id: 42b4264e428c6739631ed9aa5eb02723367510bc
93 lines
3.4 KiB
C++
93 lines
3.4 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#ifdef GFLAGS
|
|
#pragma once
|
|
|
|
#include "db/compaction/compaction_job.h"
|
|
#include "db_stress_shared_state.h"
|
|
#include "rocksdb/options.h"
|
|
#include "utilities/fault_injection_fs.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// Service to simulate Remote Compaction in Stress Test
|
|
class DbStressCompactionService : public CompactionService {
|
|
public:
|
|
explicit DbStressCompactionService(SharedState* shared,
|
|
bool failure_should_fall_back_to_local)
|
|
: shared_(shared),
|
|
aborted_(false),
|
|
failure_should_fall_back_to_local_(failure_should_fall_back_to_local) {}
|
|
|
|
static const char* kClassName() { return "DbStressCompactionService"; }
|
|
|
|
const char* Name() const override { return kClassName(); }
|
|
|
|
static constexpr uint64_t kWaitIntervalInMicros = 10 * 1000; // 10ms
|
|
|
|
static constexpr const char* kTempOutputDirectoryPrefix = "tmp_output_";
|
|
|
|
CompactionServiceScheduleResponse Schedule(
|
|
const CompactionServiceJobInfo& info,
|
|
const std::string& compaction_service_input) override {
|
|
std::string job_id = info.db_id + "_" + info.db_session_id + "_" +
|
|
std::to_string(info.job_id);
|
|
|
|
if (aborted_.load()) {
|
|
return CompactionServiceScheduleResponse(
|
|
job_id, CompactionServiceJobStatus::kUseLocal);
|
|
}
|
|
std::string output_directory = info.db_name + "/" +
|
|
kTempOutputDirectoryPrefix +
|
|
Env::Default()->GenerateUniqueId();
|
|
|
|
shared_->EnqueueRemoteCompaction(
|
|
job_id, info, compaction_service_input, output_directory,
|
|
false /* was_cancelled */); // Not canceled initially
|
|
CompactionServiceScheduleResponse response(
|
|
job_id, CompactionServiceJobStatus::kSuccess);
|
|
return response;
|
|
}
|
|
|
|
CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
|
|
std::string* result) override;
|
|
|
|
void OnInstallation(const std::string& scheduled_job_id,
|
|
CompactionServiceJobStatus /*status*/) override {
|
|
// Clean up tmp directory
|
|
std::string serialized;
|
|
CompactionServiceResult result;
|
|
if (shared_->GetRemoteCompactionResult(scheduled_job_id, &serialized)
|
|
.has_value()) {
|
|
if (CompactionServiceResult::Read(serialized, &result).ok()) {
|
|
std::vector<std::string> filenames;
|
|
Status s = Env::Default()->GetChildren(result.output_path, &filenames);
|
|
for (size_t i = 0; s.ok() && i < filenames.size(); ++i) {
|
|
s = Env::Default()->DeleteFile(result.output_path + "/" +
|
|
filenames[i]);
|
|
if (!s.ok()) {
|
|
// TODO - Handle clean up failure?
|
|
break;
|
|
}
|
|
}
|
|
if (s.ok()) {
|
|
Env::Default()->DeleteDir(result.output_path).PermitUncheckedError();
|
|
}
|
|
}
|
|
shared_->RemoveRemoteCompactionResult(scheduled_job_id);
|
|
}
|
|
}
|
|
|
|
void CancelAwaitingJobs() override { aborted_.store(true); }
|
|
|
|
private:
|
|
SharedState* shared_;
|
|
std::atomic_bool aborted_{false};
|
|
bool failure_should_fall_back_to_local_;
|
|
};
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
#endif // GFLAGS
|