Summary:
**Context:**
https://github.com/facebook/rocksdb/pull/12838 allows a write thread that encounters certain injected errors to release the lock and sleep before retrying the write, in order to reduce the performance cost. This requires adding checks like [this](b26b395e0a/db_stress_tool/expected_value.cc (L29-L31)
) to prevent another thread from writing to the same key.
The added check causes a false-positive failure when delete range + file ingestion + backup is used. Consider the following scenario:
(1) Issue a delete range covering some keys that do not exist and one key that does exist (named k1). k1 will have a "pending delete" state, while the keys that do not exist will keep whatever state they already have, since we don't delete a key that does not already exist.
(2) After https://github.com/facebook/rocksdb/pull/12838, `PrepareDeleteRange(... &prepared)` will return `prepared = false`. So the logic below will be executed, and k1's "pending delete" state will be neither rolled back nor committed.
```
std::vector<PendingExpectedValue> pending_expected_values =
shared->PrepareDeleteRange(rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width,
&prepared);
if (!prepared) {
for (PendingExpectedValue& pending_expected_value :
pending_expected_values) {
pending_expected_value.PermitUnclosedPendingState();
}
return s;
}
```
(3) Issue a file ingestion covering k1 and another key k2. Similar to (2), `shared->PreparePut(column_family, key, &prepared)` will return `prepared = false` for k1, while k2 will have a "pending put" state. So the logic below will be executed, and k2's "pending put" state will be neither rolled back nor committed.
```
for (int64_t key = key_base;
s.ok() && key < shared->GetMaxKey() &&
static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width;
++key)
PendingExpectedValue pending_expected_value =
shared->PreparePut(column_family, key, &prepared);
if (!prepared) {
pending_expected_value.PermitUnclosedPendingState();
for (PendingExpectedValue& pev : pending_expected_values) {
pev.PermitUnclosedPendingState();
}
return;
}
}
```
(4) Issue a backup and verify k2. The logic below decides that k2 should exist in the restored DB since it has a pending write state, even though k2 was never ingested into the original DB because (3) returned early.
```
bool Exists() const { return PendingPut() || !IsDeleted(); }
TestBackupRestore() {
...
Status get_status = restored_db->Get(
read_opts, restored_cf_handles[rand_column_families[i]], key,
&restored_value);
bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
if (get_status.ok()) {
if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
std::ostringstream oss;
oss << "0x" << key.ToString(true)
<< " exists in restore but not in original db";
s = Status::Corruption(oss.str());
}
} else if (get_status.IsNotFound()) {
if (exists && from_latest && ShouldAcquireMutexOnKey()) {
std::ostringstream oss;
oss << "0x" << key.ToString(true)
<< " exists in original db but not in restore";
s = Status::Corruption(oss.str());
}
}
...
}
```
So we see false-positive corruption like `Failure in a backup/restore operation with: Corruption: 0x000000000000017B0000000000000073787878 exists in original db but not in restore`
A simple fix is to remove `PendingPut()` from `bool Exists()`, since it is called under a lock and should never see a pending write. However, for "under a lock and should never see a pending write" to hold, we need to remove the logic that releases the lock during sleep in the write thread, which exposes pending writes to other threads that can call Exists(), such as the backup thread.
The downside of holding the lock during sleep is that it blocks other write threads targeting the same key, because they need to wait for the lock. This should happen rarely, as the key of a thread is selected randomly in the crash test, as shown below.
```
void StressTest::OperateDb(ThreadState* thread) {
for (uint64_t i = 0; i < ops_per_open; i++) {
...
int64_t rand_key = GenerateOneKey(thread, i);
...
}
}
```
**Summary:**
- Removed the "lock release" part and related checks
- Printed recovery time if the write thread waited more than 10 seconds
- Reverted regression in testing coverage when deleting a non-existent key
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12917
Test Plan:
Below command repro-ed frequently before the fix and not after.
```
./db_stress --WAL_size_limit_MB=1 --WAL_ttl_seconds=60 --acquire_snapshot_one_in=0 --adaptive_readahead=0 --adm_policy=1 --advise_random_on_open=1 --allow_concurrent_memtable_write=0 --allow_data_in_errors=True --allow_fallocate=0 --allow_setting_blob_options_dynamically=1 --async_io=0 --auto_readahead_size=1 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=0 --bgerror_resume_retry_interval=100 --blob_cache_size=8388608 --blob_compaction_readahead_size=1048576 --blob_compression_type=none --blob_file_size=1073741824 --blob_file_starting_level=1 --blob_garbage_collection_age_cutoff=0.0 --blob_garbage_collection_force_threshold=0.75 --block_align=0 --block_protection_bytes_per_key=8 --block_size=16384 --bloom_before_level=2147483647 --bloom_bits=16.216959977115277 --bottommost_compression_type=xpress --bottommost_file_compaction_delay=600 --bytes_per_sync=262144 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=1 --cache_size=8388608 --cache_type=lru_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=0 --charge_table_reader=1 --check_multiget_consistency=0 --check_multiget_entity_consistency=0 --checkpoint_one_in=1000000 --checksum_type=kXXH3 --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=1000 --compact_range_one_in=0 --compaction_pri=3 --compaction_readahead_size=0 --compaction_ttl=10 --compress_format_version=2 --compressed_secondary_cache_size=8388608 --compression_checksum=0 --compression_max_dict_buffer_bytes=2097151 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=zlib --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --daily_offpeak_time_utc=04:00-08:00 --data_block_index_type=0 
--db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --db_write_buffer_size=0 --default_temperature=kUnknown --default_write_temperature=kWarm --delete_obsolete_files_period_micros=21600000000 --delpercent=0 --delrangepercent=5 --destroy_db_initially=0 --detect_filter_construct_corruption=1 --disable_file_deletions_one_in=10000 --disable_manual_compaction_one_in=1000000 --disable_wal=0 --dump_malloc_stats=0 --enable_blob_files=0 --enable_blob_garbage_collection=1 --enable_checksum_handoff=1 --enable_compaction_filter=1 --enable_custom_split_merge=1 --enable_do_not_compress_roles=0 --enable_index_compression=1 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=1 --enable_sst_partitioner_factory=1 --enable_thread_tracking=0 --enable_write_thread_adaptive_yield=0 --error_recovery_with_no_fault_injection=1 --exclude_wal_from_write_fault_injection=1 --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --fail_if_options_file_error=0 --fifo_allow_compaction=1 --file_checksum_impl=big --fill_cache=1 --flush_one_in=1000000 --format_version=2 --get_all_column_family_metadata_one_in=10000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=1000000 --get_properties_of_all_tables_one_in=100000 --get_property_one_in=100000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=2097152 --high_pri_pool_ratio=0.5 --index_block_restart_interval=1 --index_shortening=2 --index_type=0 --ingest_external_file_one_in=1000 --initial_auto_readahead_size=0 --inplace_update_support=0 --iterpercent=0 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=100 --last_level_temperature=kUnknown --level_compaction_dynamic_level_bytes=0 --lock_wal_one_in=10000 --log2_keys_per_lock=10 --log_file_time_to_roll=0 --log_readahead_size=0 --long_running_snapshots=1 --low_pri_pool_ratio=0.5 --lowest_used_cache_tier=1 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=10 
--max_auto_readahead_size=16384 --max_background_compactions=1 --max_bytes_for_level_base=67108864 --max_key=100000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=16 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16 --max_write_buffer_number=10 --max_write_buffer_size_to_maintain=8388608 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=1000 --memtable_prefix_bloom_size_ratio=0.001 --memtable_protection_bytes_per_key=4 --memtable_whole_key_filtering=1 --memtablerep=skip_list --metadata_charge_policy=1 --metadata_read_fault_one_in=0 --metadata_write_fault_one_in=0 --min_blob_size=16 --min_write_buffer_number_to_merge=2 --mmap_read=0 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=-1 --open_metadata_read_fault_one_in=0 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=20000000 --optimize_filters_for_hits=1 --optimize_filters_for_memory=0 --optimize_multiget_for_io=1 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=1 --pause_background_one_in=10000 --periodic_compaction_seconds=10 --prefix_size=8 --prefixpercent=0 --prepopulate_blob_cache=1 --prepopulate_block_cache=1 --preserve_internal_time_seconds=0 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=0 --readahead_size=524288 --readpercent=60 --recycle_log_file_num=1 --reopen=20 --report_bg_io_stats=0 --reset_stats_one_in=1000000 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --secondary_cache_uri= --skip_stats_update_on_db_open=1 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=foo --sqfc_version=1 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --stats_dump_period_sec=10 --stats_history_buffer_size=1048576 --strict_bytes_per_sync=1 --subcompactions=2 --sync=0 
--sync_fault_injection=0 --table_cache_numshardbits=0 --target_file_size_base=16777216 --target_file_size_multiplier=1 --test_batches_snapshots=0 --top_level_index_pinning=3 --uncache_aggressiveness=118 --universal_max_read_amp=-1 --unpartitioned_pinning=0 --use_adaptive_mutex=0 --use_adaptive_mutex_lru=1 --use_attribute_group=0 --use_blob_cache=0 --use_delta_encoding=1 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=0 --use_multi_cf_iterator=0 --use_multi_get_entity=0 --use_multiget=1 --use_put_entity_one_in=0 --use_shared_block_and_blob_cache=1 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_compression=0 --verify_db_one_in=10000 --verify_file_checksums_one_in=1000000 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none --write_buffer_size=33554432 --write_dbid_to_manifest=0 --write_fault_one_in=0 --writepercent=35
```
Reviewed By: cbi42
Differential Revision: D60890580
Pulled By: hx235
fbshipit-source-id: 401f90d6d351c7ee11088cad06fb00e54062d416
810 lines
30 KiB
C++
810 lines
30 KiB
C++
// Copyright (c) 2021-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include <atomic>
|
|
#ifdef GFLAGS
|
|
|
|
#include "db/wide/wide_column_serialization.h"
|
|
#include "db/wide/wide_columns_helper.h"
|
|
#include "db_stress_tool/db_stress_common.h"
|
|
#include "db_stress_tool/db_stress_shared_state.h"
|
|
#include "db_stress_tool/expected_state.h"
|
|
#include "rocksdb/trace_reader_writer.h"
|
|
#include "rocksdb/trace_record_result.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Records the dimensions of the key space. `values_` starts out null; the
// `Open()` implementations in this file are what point it at real storage.
ExpectedState::ExpectedState(size_t max_key, size_t num_column_families)
    : max_key_(max_key),
      num_column_families_(num_column_families),
      values_(nullptr) {}
|
|
|
|
void ExpectedState::ClearColumnFamily(int cf) {
|
|
const uint32_t del_mask = ExpectedValue::GetDelMask();
|
|
std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), del_mask);
|
|
}
|
|
|
|
// Publishes `value` as the expected value of (`cf`, `key`) ahead of the
// corresponding DB write.
void ExpectedState::Precommit(int cf, int64_t key, const ExpectedValue& value) {
  auto& slot = Value(cf, key);
  slot.store(value.Read());
  // The release fence keeps low-level instruction reordering from letting the
  // DB write happen before the pending state is set in the expected value.
  std::atomic_thread_fence(std::memory_order_release);
}
|
|
|
|
// Starts a put on (`cf`, `key`): stores the "pending put" state in the
// expected-value slot and returns a handle carrying both the original value
// and the value the slot should hold once the put is finalized.
PendingExpectedValue ExpectedState::PreparePut(int cf, int64_t key) {
  // Snapshot of the value before this operation.
  const ExpectedValue orig_expected_value = Load(cf, key);

  // Value while the put is in flight.
  ExpectedValue pending_expected_value = orig_expected_value;
  pending_expected_value.Put(true /* pending */);

  // Value after the put completes; derived from the pending value.
  ExpectedValue final_expected_value = pending_expected_value;
  final_expected_value.Put(false /* pending */);

  // Precommit
  Precommit(cf, key, pending_expected_value);
  return PendingExpectedValue(&Value(cf, key), orig_expected_value,
                              final_expected_value);
}
|
|
|
|
// Returns the expected value currently loaded for (`cf`, `key`).
ExpectedValue ExpectedState::Get(int cf, int64_t key) {
  ExpectedValue current = Load(cf, key);
  return current;
}
|
|
|
|
// Starts a delete on (`cf`, `key`). When `ExpectedValue::Delete()` reports
// that nothing was changed, the slot is left untouched and the returned handle
// uses the original value as its final value as well. Otherwise the "pending
// delete" state is stored and the handle carries the post-delete final value.
PendingExpectedValue ExpectedState::PrepareDelete(int cf, int64_t key) {
  // Snapshot of the value before this operation.
  const ExpectedValue orig_expected_value = Load(cf, key);

  // Value while the delete is in flight.
  ExpectedValue pending_expected_value = orig_expected_value;
  const bool delete_applied = pending_expected_value.Delete(true /* pending */);
  if (!delete_applied) {
    return PendingExpectedValue(&Value(cf, key), orig_expected_value,
                                orig_expected_value);
  }

  // Value after the delete completes; derived from the pending value.
  ExpectedValue final_expected_value = pending_expected_value;
  final_expected_value.Delete(false /* pending */);

  // Precommit
  Precommit(cf, key, pending_expected_value);
  return PendingExpectedValue(&Value(cf, key), orig_expected_value,
                              final_expected_value);
}
|
|
|
|
// A single-delete is tracked in the expected state exactly like a regular
// delete, so this simply forwards to `PrepareDelete()`.
PendingExpectedValue ExpectedState::PrepareSingleDelete(int cf, int64_t key) {
  PendingExpectedValue pending = PrepareDelete(cf, key);
  return pending;
}
|
|
|
|
std::vector<PendingExpectedValue> ExpectedState::PrepareDeleteRange(
|
|
int cf, int64_t begin_key, int64_t end_key) {
|
|
std::vector<PendingExpectedValue> pending_expected_values;
|
|
|
|
for (int64_t key = begin_key; key < end_key; ++key) {
|
|
pending_expected_values.push_back(PrepareDelete(cf, key));
|
|
}
|
|
|
|
return pending_expected_values;
|
|
}
|
|
|
|
bool ExpectedState::Exists(int cf, int64_t key) {
|
|
return Load(cf, key).Exists();
|
|
}
|
|
|
|
void ExpectedState::Reset() {
|
|
const uint32_t del_mask = ExpectedValue::GetDelMask();
|
|
for (size_t i = 0; i < num_column_families_; ++i) {
|
|
for (size_t j = 0; j < max_key_; ++j) {
|
|
Value(static_cast<int>(i), j).store(del_mask, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
void ExpectedState::SyncPut(int cf, int64_t key, uint32_t value_base) {
|
|
ExpectedValue expected_value = Load(cf, key);
|
|
expected_value.SyncPut(value_base);
|
|
Value(cf, key).store(expected_value.Read());
|
|
}
|
|
|
|
void ExpectedState::SyncPendingPut(int cf, int64_t key) {
|
|
ExpectedValue expected_value = Load(cf, key);
|
|
expected_value.SyncPendingPut();
|
|
Value(cf, key).store(expected_value.Read());
|
|
}
|
|
|
|
void ExpectedState::SyncDelete(int cf, int64_t key) {
|
|
ExpectedValue expected_value = Load(cf, key);
|
|
expected_value.SyncDelete();
|
|
Value(cf, key).store(expected_value.Read());
|
|
}
|
|
|
|
void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key,
|
|
int64_t end_key) {
|
|
for (int64_t key = begin_key; key < end_key; ++key) {
|
|
SyncDelete(cf, key);
|
|
}
|
|
}
|
|
|
|
// Remembers the path of the backing file; no file is touched until `Open()`.
// `expected_state_file_path` is taken by value as a sink parameter and moved
// into the member, so callers passing an rvalue pay for no string copy.
FileExpectedState::FileExpectedState(std::string expected_state_file_path,
                                     size_t max_key, size_t num_column_families)
    : ExpectedState(max_key, num_column_families),
      expected_state_file_path_(std::move(expected_state_file_path)) {}
|
|
|
|
// Maps the expected-state file into memory and points `values_` at it.
//
// When `create` is true the file is first (re)written as `GetValuesLen()` zero
// bytes, and after mapping every slot is initialized via `Reset()`. Returns
// the first non-OK status encountered; in that case `values_` is expected to
// still be null.
Status FileExpectedState::Open(bool create) {
  const size_t values_len = GetValuesLen();
  Env* const env = Env::Default();

  Status status;
  if (create) {
    std::unique_ptr<WritableFile> file;
    const EnvOptions env_options;
    status =
        env->NewWritableFile(expected_state_file_path_, &file, env_options);
    if (status.ok()) {
      // Zero-fill so the file has the size the mapping below expects.
      const std::string zeros(values_len, '\0');
      status = file->Append(zeros);
    }
  }

  if (status.ok()) {
    status = env->NewMemoryMappedFileBuffer(expected_state_file_path_,
                                            &expected_state_mmap_buffer_);
  }

  if (!status.ok()) {
    assert(values_ == nullptr);
    return status;
  }

  assert(expected_state_mmap_buffer_->GetLen() == values_len);
  values_ = static_cast<std::atomic<uint32_t>*>(
      expected_state_mmap_buffer_->GetBase());
  assert(values_ != nullptr);
  if (create) {
    Reset();
  }
  return status;
}
|
|
|
|
// Only forwards the key-space dimensions to the base class; storage is
// allocated later in `Open()`.
AnonExpectedState::AnonExpectedState(size_t max_key,
                                     size_t num_column_families)
    : ExpectedState(max_key, num_column_families) {}
|
|
|
|
#ifndef NDEBUG
|
|
Status AnonExpectedState::Open(bool create) {
|
|
#else
|
|
Status AnonExpectedState::Open(bool /* create */) {
|
|
#endif
|
|
// AnonExpectedState only supports being freshly created.
|
|
assert(create);
|
|
values_allocation_.reset(
|
|
new std::atomic<uint32_t>[GetValuesLen() /
|
|
sizeof(std::atomic<uint32_t>)]);
|
|
values_ = &values_allocation_[0];
|
|
Reset();
|
|
return Status::OK();
|
|
}
|
|
|
|
// Stores the key-space dimensions. `latest_` starts out null.
ExpectedStateManager::ExpectedStateManager(size_t max_key,
                                           size_t num_column_families)
    : max_key_(max_key),
      num_column_families_(num_column_families),
      latest_(nullptr) {}
|
|
|
|
// Defaulted destructor, defined out of line in this translation unit.
ExpectedStateManager::~ExpectedStateManager() = default;
|
|
|
|
// Filename building blocks used by FileExpectedStateManager.
//
// Basename of the current expected-state file; combined with the state suffix
// it names "LATEST.state".
const std::string FileExpectedStateManager::kLatestBasename = "LATEST";
// Suffix of expected-state files ("LATEST.state", "<seqno>.state").
const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state";
// Suffix of trace files ("<seqno>.trace").
const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace";
// Prefix and suffix for temporary files.
// NOTE(review): presumably consumed by GetTempPathForFilename(); confirm.
const std::string FileExpectedStateManager::kTempFilenamePrefix = ".";
const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp";
|
|
|
|
// Takes ownership of the directory path that holds the expected-state files.
// The path must be non-empty (checked by assertion only).
FileExpectedStateManager::FileExpectedStateManager(
    size_t max_key, size_t num_column_families,
    std::string expected_state_dir_path)
    : ExpectedStateManager(max_key, num_column_families),
      expected_state_dir_path_(std::move(expected_state_dir_path)) {
  assert(!expected_state_dir_path_.empty());
}
|
|
|
|
// Brings the expected-state directory and this manager into agreement, then
// opens "LATEST.state" as `latest_`.
//
// Steps, each skipped once an earlier one has failed:
//   1. Scan the directory for "<seqno>.state" files and remember the largest
//      seqno in `saved_seqno_`.
//   2. If that state file has no matching "<seqno>.trace", create an empty
//      one.
//   3. Run `Clean()`.
//   4. If "LATEST.state" does not exist, create it under a temp name and
//      rename it into place.
//   5. Open "LATEST.state".
// Returns the first non-OK status encountered.
Status FileExpectedStateManager::Open() {
  Env* const env = Env::Default();

  // Before doing anything, sync directory state with ours. That is, determine
  // `saved_seqno_`, and create any necessary missing files.
  std::vector<std::string> dir_entries;
  Status s = env->GetChildren(expected_state_dir_path_, &dir_entries);
  bool found_trace = false;
  if (s.ok()) {
    const size_t suffix_len = kStateFilenameSuffix.size();
    for (const std::string& filename : dir_entries) {
      const bool has_state_suffix =
          filename.size() >= suffix_len &&
          filename.rfind(kStateFilenameSuffix) == filename.size() - suffix_len;
      if (!has_state_suffix) {
        continue;
      }
      // Names starting with the LATEST basename are not seqno snapshots.
      const bool starts_with_latest =
          filename.rfind(kLatestBasename, 0) != std::string::npos;
      if (starts_with_latest) {
        continue;
      }
      const SequenceNumber found_seqno =
          ParseUint64(filename.substr(0, filename.size() - suffix_len));
      if (saved_seqno_ == kMaxSequenceNumber || found_seqno > saved_seqno_) {
        saved_seqno_ = found_seqno;
      }
    }

    // Check if crash happened after creating state file but before creating
    // trace file.
    if (saved_seqno_ != kMaxSequenceNumber) {
      const std::string trace_path = GetPathForFilename(
          std::to_string(saved_seqno_) + kTraceFilenameSuffix);
      const Status exists_status = env->FileExists(trace_path);
      if (exists_status.ok()) {
        found_trace = true;
      } else if (!exists_status.IsNotFound()) {
        // Any error other than "not found" aborts the open.
        s = exists_status;
      }
    }
  }

  if (s.ok() && saved_seqno_ != kMaxSequenceNumber && !found_trace) {
    // Create an empty trace file so later logic does not need to distinguish
    // missing vs. empty trace file.
    std::unique_ptr<WritableFile> trace_file;
    const EnvOptions env_options;
    const std::string trace_path =
        GetPathForFilename(std::to_string(saved_seqno_) + kTraceFilenameSuffix);
    s = env->NewWritableFile(trace_path, &trace_file, env_options);
  }

  if (s.ok()) {
    s = Clean();
  }

  std::string latest_state_path =
      GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
  bool latest_exists = false;
  if (s.ok()) {
    const Status exists_status = env->FileExists(latest_state_path);
    if (exists_status.ok()) {
      latest_exists = true;
    } else if (!exists_status.IsNotFound()) {
      s = exists_status;
    }
  }

  if (s.ok() && !latest_exists) {
    // Initialize the file in a temp path and then rename it. That way, in case
    // this process is killed during setup, `Clean()` will take care of removing
    // the incomplete expected values file.
    const std::string temp_state_path =
        GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
    FileExpectedState temp_state(temp_state_path, max_key_,
                                 num_column_families_);
    s = temp_state.Open(true /* create */);
    if (s.ok()) {
      s = env->RenameFile(temp_state_path, latest_state_path);
    }
  }

  if (s.ok()) {
    latest_.reset(new FileExpectedState(std::move(latest_state_path), max_key_,
                                        num_column_families_));
    s = latest_->Open(false /* create */);
  }
  return s;
}
|
|
|
|
// Snapshots "LATEST.state" as "<seqno>.state" for the DB's latest sequence
// number, starts tracing into "<seqno>.trace", and then removes the previous
// snapshot's state and trace files. Returns the first non-OK status
// encountered; later steps are skipped after a failure.
Status FileExpectedStateManager::SaveAtAndAfter(DB* db) {
  const SequenceNumber seqno = db->GetLatestSequenceNumber();
  const std::string seqno_str = std::to_string(seqno);

  const std::string state_filename = seqno_str + kStateFilenameSuffix;
  const std::string state_file_temp_path =
      GetTempPathForFilename(state_filename);
  const std::string state_file_path = GetPathForFilename(state_filename);
  const std::string latest_file_path =
      GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
  const std::string trace_file_path =
      GetPathForFilename(seqno_str + kTraceFilenameSuffix);

  // Populate a tempfile and then rename it to atomically create "<seqno>.state"
  // with contents from "LATEST.state"
  Status s =
      CopyFile(FileSystem::Default(), latest_file_path, Temperature::kUnknown,
               state_file_temp_path, Temperature::kUnknown, 0 /* size */,
               false /* use_fsync */, nullptr /* io_tracer */);
  if (s.ok()) {
    s = FileSystem::Default()->RenameFile(state_file_temp_path, state_file_path,
                                          IOOptions(), nullptr /* dbg */);
  }
  if (!s.ok()) {
    return s;
  }

  const SequenceNumber old_saved_seqno = saved_seqno_;
  saved_seqno_ = seqno;

  // If there is a crash now, i.e., after "<seqno>.state" was created but before
  // "<seqno>.trace" is created, it will be treated as if "<seqno>.trace" were
  // present but empty.

  // Create "<seqno>.trace" directly. It is initially empty so no need for
  // tempfile.
  std::unique_ptr<TraceWriter> trace_writer;
  {
    EnvOptions env_options;
    // Disable buffering so traces will not get stuck in application buffer.
    env_options.writable_file_max_buffer_size = 0;
    s = NewFileTraceWriter(Env::Default(), env_options, trace_file_path,
                           &trace_writer);
  }
  if (!s.ok()) {
    return s;
  }

  TraceOptions trace_opts;
  trace_opts.filter |= kTraceFilterGet;
  trace_opts.filter |= kTraceFilterMultiGet;
  trace_opts.filter |= kTraceFilterIteratorSeek;
  trace_opts.filter |= kTraceFilterIteratorSeekForPrev;
  trace_opts.preserve_write_order = true;
  s = db->StartTrace(trace_opts, std::move(trace_writer));

  // Delete old state/trace files. Deletion order does not matter since we only
  // delete after successfully saving new files, so old files will never be used
  // again, even if we crash.
  const bool has_stale_files = old_saved_seqno != kMaxSequenceNumber &&
                               old_saved_seqno != saved_seqno_;
  if (s.ok() && has_stale_files) {
    s = Env::Default()->DeleteFile(GetPathForFilename(
        std::to_string(old_saved_seqno) + kStateFilenameSuffix));
  }
  if (s.ok() && has_stale_files) {
    s = Env::Default()->DeleteFile(GetPathForFilename(
        std::to_string(old_saved_seqno) + kTraceFilenameSuffix));
  }
  return s;
}
|
|
|
|
bool FileExpectedStateManager::HasHistory() {
|
|
return saved_seqno_ != kMaxSequenceNumber;
|
|
}
|
|
|
|
namespace {
|
|
|
|
// An `ExpectedStateTraceRecordHandler` applies a configurable number of
|
|
// write operation trace records to the configured expected state. It is used in
|
|
// `FileExpectedStateManager::Restore()` to sync the expected state with the
|
|
// DB's post-recovery state.
|
|
class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
|
|
public WriteBatch::Handler {
|
|
public:
|
|
ExpectedStateTraceRecordHandler(uint64_t max_write_ops, ExpectedState* state)
|
|
: max_write_ops_(max_write_ops),
|
|
state_(state),
|
|
buffered_writes_(nullptr) {}
|
|
|
|
// Asserts (debug builds only) that the write-operation limit was reached
// before the handler is destroyed.
~ExpectedStateTraceRecordHandler() { assert(IsDone()); }
|
|
|
|
// True if we have already reached the limit on write operations to apply.
|
|
bool IsDone() { return num_write_ops_ == max_write_ops_; }
|
|
|
|
// Replays a traced write: rebuilds the write batch from the record and
// iterates it through this object's WriteBatch::Handler overrides. Records
// arriving after the write-operation limit has been reached are ignored.
Status Handle(const WriteQueryTraceRecord& record,
              std::unique_ptr<TraceRecordResult>* /* result */) override {
  if (!IsDone()) {
    WriteBatch batch(record.GetWriteBatchRep().ToString());
    return batch.Iterate(this);
  }
  return Status::OK();
}
|
|
|
|
// Ignore reads.
|
|
Status Handle(const GetQueryTraceRecord& /* record */,
|
|
std::unique_ptr<TraceRecordResult>* /* result */) override {
|
|
return Status::OK();
|
|
}
|
|
|
|
// Ignore reads.
|
|
Status Handle(const IteratorSeekQueryTraceRecord& /* record */,
|
|
std::unique_ptr<TraceRecordResult>* /* result */) override {
|
|
return Status::OK();
|
|
}
|
|
|
|
// Ignore reads.
|
|
Status Handle(const MultiGetQueryTraceRecord& /* record */,
|
|
std::unique_ptr<TraceRecordResult>* /* result */) override {
|
|
return Status::OK();
|
|
}
|
|
|
|
// Below are the WriteBatch::Handler overrides. We could use a separate
|
|
// object, but it's convenient and works to share state with the
|
|
// `TraceRecord::Handler`.
|
|
|
|
// Applies a traced Put. The timestamp is stripped from `key_with_ts` and the
// remainder is parsed as the integer key id; a key that cannot be parsed
// yields Corruption. While a prepared-transaction batch is being buffered the
// put is appended to it; otherwise the expected state is updated and the put
// counts as one write operation.
Status PutCF(uint32_t column_family_id, const Slice& key_with_ts,
             const Slice& value) override {
  const Slice key =
      StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
  uint64_t key_id;
  if (!GetIntVal(key.ToString(), &key_id)) {
    return Status::Corruption("unable to parse key", key.ToString());
  }
  const uint32_t value_base = GetValueBase(value);

  if (buffered_writes_ != nullptr) {
    return WriteBatchInternal::Put(buffered_writes_.get(), column_family_id,
                                   key, value);
  }

  state_->SyncPut(column_family_id, static_cast<int64_t>(key_id), value_base);
  ++num_write_ops_;
  return Status::OK();
}
|
|
|
|
// Applies a traced TimedPut. Handled like `PutCF()`, except that a buffered
// write is recorded as a TimedPut carrying `write_unix_time`; the expected
// state itself does not record the write time.
Status TimedPutCF(uint32_t column_family_id, const Slice& key_with_ts,
                  const Slice& value, uint64_t write_unix_time) override {
  const Slice key =
      StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
  uint64_t key_id;
  if (!GetIntVal(key.ToString(), &key_id)) {
    return Status::Corruption("unable to parse key", key.ToString());
  }
  const uint32_t value_base = GetValueBase(value);

  if (buffered_writes_ != nullptr) {
    return WriteBatchInternal::TimedPut(buffered_writes_.get(),
                                        column_family_id, key, value,
                                        write_unix_time);
  }

  state_->SyncPut(column_family_id, static_cast<int64_t>(key_id), value_base);
  ++num_write_ops_;
  return Status::OK();
}
|
|
|
|
// Applies a traced PutEntity. The key is parsed as in `PutCF()`; the entity
// is deserialized into wide columns and checked with `VerifyWideColumns()`,
// and either failure yields Corruption. While a prepared-transaction batch is
// being buffered the entity is appended to it; otherwise the value base is
// taken from the default column and synced into the expected state.
Status PutEntityCF(uint32_t column_family_id, const Slice& key_with_ts,
                   const Slice& entity) override {
  const Slice key =
      StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);

  uint64_t key_id = 0;
  if (!GetIntVal(key.ToString(), &key_id)) {
    return Status::Corruption("Unable to parse key", key.ToString());
  }

  // Deserialize from a copy so `entity` is still intact for error messages.
  Slice entity_copy = entity;
  WideColumns columns;
  const bool deserialized =
      WideColumnSerialization::Deserialize(entity_copy, columns).ok();
  if (!deserialized) {
    return Status::Corruption("Unable to deserialize entity",
                              entity.ToString(/* hex */ true));
  }

  if (!VerifyWideColumns(columns)) {
    return Status::Corruption("Wide columns in entity inconsistent",
                              entity.ToString(/* hex */ true));
  }

  if (buffered_writes_) {
    return WriteBatchInternal::PutEntity(buffered_writes_.get(),
                                         column_family_id, key, columns);
  }

  const uint32_t value_base =
      GetValueBase(WideColumnsHelper::GetDefaultColumn(columns));
  state_->SyncPut(column_family_id, static_cast<int64_t>(key_id), value_base);
  ++num_write_ops_;
  return Status::OK();
}
|
|
|
|
// Applies a traced Delete. The timestamp is stripped from `key_with_ts` and
// the remainder parsed as the integer key id (Corruption on failure). While a
// prepared-transaction batch is being buffered the delete is appended to it;
// otherwise the expected state is updated and one write operation is counted.
Status DeleteCF(uint32_t column_family_id,
                const Slice& key_with_ts) override {
  const Slice key =
      StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
  uint64_t key_id;
  if (!GetIntVal(key.ToString(), &key_id)) {
    return Status::Corruption("unable to parse key", key.ToString());
  }

  if (buffered_writes_ != nullptr) {
    return WriteBatchInternal::Delete(buffered_writes_.get(),
                                      column_family_id, key);
  }

  state_->SyncDelete(column_family_id, static_cast<int64_t>(key_id));
  ++num_write_ops_;
  return Status::OK();
}
|
|
|
|
// Applies a traced SingleDelete. With no prepared-transaction batch being
// buffered it is handled exactly like `DeleteCF()`. Otherwise it is appended
// to the buffered batch as a SingleDelete whose key is passed as two parts:
// the user key and its timestamp.
Status SingleDeleteCF(uint32_t column_family_id,
                      const Slice& key_with_ts) override {
  if (buffered_writes_ == nullptr) {
    return DeleteCF(column_family_id, key_with_ts);
  }

  const Slice key =
      StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
  const Slice ts =
      ExtractTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
  std::array<Slice, 2> key_with_ts_arr{{key, ts}};
  return WriteBatchInternal::SingleDelete(
      buffered_writes_.get(), column_family_id,
      SliceParts(key_with_ts_arr.data(), 2));
}
|
|
|
|
// Applies a traced DeleteRange. Both bounds have their timestamps stripped
// and are parsed as integer key ids (Corruption on failure). While a
// prepared-transaction batch is being buffered the range delete is appended
// to it; otherwise the range is synced into the expected state and counted
// as a single write operation.
Status DeleteRangeCF(uint32_t column_family_id,
                     const Slice& begin_key_with_ts,
                     const Slice& end_key_with_ts) override {
  const Slice begin_key =
      StripTimestampFromUserKey(begin_key_with_ts, FLAGS_user_timestamp_size);
  const Slice end_key =
      StripTimestampFromUserKey(end_key_with_ts, FLAGS_user_timestamp_size);

  uint64_t begin_key_id;
  if (!GetIntVal(begin_key.ToString(), &begin_key_id)) {
    return Status::Corruption("unable to parse begin key",
                              begin_key.ToString());
  }
  uint64_t end_key_id;
  if (!GetIntVal(end_key.ToString(), &end_key_id)) {
    return Status::Corruption("unable to parse end key", end_key.ToString());
  }

  if (buffered_writes_ != nullptr) {
    return WriteBatchInternal::DeleteRange(
        buffered_writes_.get(), column_family_id, begin_key, end_key);
  }

  state_->SyncDeleteRange(column_family_id,
                          static_cast<int64_t>(begin_key_id),
                          static_cast<int64_t>(end_key_id));
  ++num_write_ops_;
  return Status::OK();
}
|
|
|
|
// Handles a merge record.
//
// If a batch is currently being collected (`buffered_writes_` is non-null),
// the merge is appended to it with the user timestamp stripped from the key.
// Otherwise the record is forwarded to `PutCF()`, i.e. the expected state
// treats it like a put of `value`.
//
// Returns whatever `WriteBatchInternal::Merge()` or `PutCF()` returns.
Status MergeCF(uint32_t column_family_id, const Slice& key_with_ts,
               const Slice& value) override {
  const bool should_buffer_write = buffered_writes_ != nullptr;
  if (should_buffer_write) {
    // NOTE(review): like the other buffering handlers except
    // `SingleDeleteCF()`, this stores the key without its timestamp, so a
    // later replay of the batch hands a timestamp-less key back to this
    // handler. Harmless when `FLAGS_user_timestamp_size` is 0 — confirm for
    // the non-zero case.
    const Slice key =
        StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
    return WriteBatchInternal::Merge(buffered_writes_.get(), column_family_id,
                                     key, value);
  }

  // Forward the key with its timestamp still attached, the same way
  // `SingleDeleteCF()` forwards to `DeleteCF()`. Passing the already-stripped
  // key would strip `FLAGS_user_timestamp_size` bytes a second time.
  // NOTE(review): assumes `PutCF()` strips the timestamp itself, as
  // `DeleteCF()` does — confirm.
  return PutCF(column_family_id, key_with_ts, value);
}
|
|
|
|
// Starts collecting writes into a fresh batch instead of applying them to
// `state_`. No batch may already be in progress. The bool argument is unused.
Status MarkBeginPrepare(bool = false) override {
  assert(buffered_writes_ == nullptr);
  buffered_writes_ = std::make_unique<WriteBatch>();
  return Status::OK();
}
|
|
|
|
// Finishes collecting the current batch and files it under `xid` so that
// `MarkCommit()` or `MarkRollback()` can find it later. A batch must be in
// progress and `xid` must not already have one stored. Afterwards
// `buffered_writes_` is null again.
Status MarkEndPrepare(const Slice& xid) override {
  assert(buffered_writes_ != nullptr);
  const std::string txn_id = xid.ToString();
  assert(xid_to_buffered_writes_.count(txn_id) == 0);

  // Moving out of a `std::unique_ptr` leaves it null, so no explicit reset of
  // `buffered_writes_` is needed.
  xid_to_buffered_writes_[txn_id] = std::move(buffered_writes_);

  return Status::OK();
}
|
|
|
|
// Replays the batch stored under `xid` through this handler, then forgets the
// batch. Returns the status of the replay.
Status MarkCommit(const Slice& xid) override {
  const std::string txn_id = xid.ToString();
  assert(xid_to_buffered_writes_.count(txn_id) != 0);

  const std::unique_ptr<WriteBatch>& prepared_batch =
      xid_to_buffered_writes_.at(txn_id);
  assert(prepared_batch != nullptr);

  Status replay_status = prepared_batch->Iterate(this);
  // Erase by key only after the replay; `prepared_batch` dangles from here on.
  xid_to_buffered_writes_.erase(txn_id);

  return replay_status;
}
|
|
|
|
// Discards the batch stored under `xid` without applying any of its writes.
// A non-null batch must have been stored for `xid`.
Status MarkRollback(const Slice& xid) override {
  const std::string txn_id = xid.ToString();
  assert(xid_to_buffered_writes_.count(txn_id) != 0);
  assert(xid_to_buffered_writes_.at(txn_id) != nullptr);

  xid_to_buffered_writes_.erase(txn_id);
  return Status::OK();
}
|
|
|
|
private:
|
|
// Number of write operations applied directly to `state_` so far; the
// handlers increment it after each unbuffered write.
uint64_t num_write_ops_ = 0;
|
|
// Not initialized here and not read in the visible handlers.
// NOTE(review): presumably set by the constructor from its first argument
// (`Restore()` passes `seqno - saved_seqno_`) — confirm.
uint64_t max_write_ops_;
|
|
// Expected state that unbuffered writes are applied to.
// NOTE(review): looks non-owning (`Restore()` passes `state.get()`) — confirm.
ExpectedState* state_;
|
|
// Batches completed by `MarkEndPrepare()`, keyed by transaction id, until
// `MarkCommit()` replays them or `MarkRollback()` drops them.
std::unordered_map<std::string, std::unique_ptr<WriteBatch>>
|
|
xid_to_buffered_writes_;
|
|
// Batch currently being collected; non-null only between
// `MarkBeginPrepare()` and `MarkEndPrepare()`.
std::unique_ptr<WriteBatch> buffered_writes_;
|
|
};
|
|
|
|
} // anonymous namespace
|
|
|
|
// Rebuilds the latest expected-state file from the saved state file plus the
// saved trace, and makes it the current `latest_`.
//
// Requires `HasHistory()`. Returns `Status::Corruption` if `db`'s latest
// sequence number is older than `saved_seqno_`. Otherwise:
//   1. copies the saved state file to a temp file,
//   2. replays the saved trace onto that copy through an
//      `ExpectedStateTraceRecordHandler` constructed with
//      `seqno - saved_seqno_`,
//   3. renames the temp file over the latest state file and reopens `latest_`,
//   4. deletes the saved state file, then the saved trace file, and sets
//      `saved_seqno_` to `kMaxSequenceNumber`.
// The first failing step's status is returned and later steps are skipped.
Status FileExpectedStateManager::Restore(DB* db) {
  assert(HasHistory());
  const SequenceNumber db_seqno = db->GetLatestSequenceNumber();
  if (db_seqno < saved_seqno_) {
    return Status::Corruption("DB is older than any restorable expected state");
  }

  // Locations of the saved state/trace pair and of the file being rebuilt.
  const std::string saved_basename = std::to_string(saved_seqno_);
  const std::string saved_state_path =
      GetPathForFilename(saved_basename + kStateFilenameSuffix);
  const std::string saved_trace_path =
      GetPathForFilename(saved_basename + kTraceFilenameSuffix);
  const std::string latest_filename = kLatestBasename + kStateFilenameSuffix;
  const std::string latest_temp_path = GetTempPathForFilename(latest_filename);
  const std::string latest_path = GetPathForFilename(latest_filename);

  std::unique_ptr<TraceReader> trace_reader;
  Status s = NewFileTraceReader(Env::Default(), EnvOptions(), saved_trace_path,
                                &trace_reader);

  if (s.ok()) {
    // The replay is applied on top of a copy of "`seqno`.state" to produce a
    // new "LATEST.state". Working in a tempfile lets the new "LATEST.state"
    // appear atomically later via `RenameFile()`.
    s = CopyFile(FileSystem::Default(), saved_state_path, Temperature::kUnknown,
                 latest_temp_path, Temperature::kUnknown, 0 /* size */,
                 false /* use_fsync */, nullptr /* io_tracer */);
  }

  {
    // These objects exist only for the replay and go out of scope before the
    // temp file is renamed below.
    std::unique_ptr<Replayer> replayer;
    std::unique_ptr<ExpectedState> state;
    std::unique_ptr<ExpectedStateTraceRecordHandler> handler;

    if (s.ok()) {
      state.reset(new FileExpectedState(latest_temp_path, max_key_,
                                        num_column_families_));
      s = state->Open(false /* create */);
    }
    if (s.ok()) {
      handler.reset(new ExpectedStateTraceRecordHandler(db_seqno - saved_seqno_,
                                                        state.get()));
      // TODO(ajkr): An API limitation requires we provide `handles` although
      // they will be unused since we only use the replayer for reading records.
      // Just give a default CFH for now to satisfy the requirement.
      s = db->NewDefaultReplayer({db->DefaultColumnFamily()} /* handles */,
                                 std::move(trace_reader), &replayer);
    }
    if (s.ok()) {
      s = replayer->Prepare();
    }

    while (s.ok()) {
      std::unique_ptr<TraceRecord> record;
      s = replayer->Next(&record);
      if (s.ok()) {
        std::unique_ptr<TraceRecordResult> res;
        s = record->Accept(handler.get(), &res);
        continue;
      }

      // Two read failures are benign:
      //  - A corrupt tail record, which can be left behind when `db_stress`
      //    crashes while writing it. That is fine as long as the handler has
      //    already seen all the write ops needed to catch up.
      //  - `Status::Incomplete`, which is the expected result once every
      //    trace record has been consumed.
      const bool benign_end =
          (s.IsCorruption() && handler->IsDone()) || s.IsIncomplete();
      if (benign_end) {
        s = Status::OK();
      }
      break;
    }
  }

  if (!s.ok()) {
    return s;
  }

  s = FileSystem::Default()->RenameFile(latest_temp_path, latest_path,
                                        IOOptions(), nullptr /* dbg */);
  if (!s.ok()) {
    return s;
  }

  latest_.reset(
      new FileExpectedState(latest_path, max_key_, num_column_families_));
  s = latest_->Open(false /* create */);
  if (!s.ok()) {
    return s;
  }

  // Remove the old state/trace pair, state file first. Deleting the trace
  // first would let a crash-recovery right after that deletion leave
  // `Restore()` unable to replay to `seqno`.
  s = Env::Default()->DeleteFile(saved_state_path);
  if (!s.ok()) {
    return s;
  }

  saved_seqno_ = kMaxSequenceNumber;
  return Env::Default()->DeleteFile(saved_trace_path);
}
|
|
|
|
// Removes leftover files from the expected-state directory:
//  - every temp file (temp prefix and temp suffix),
//  - state files, other than those starting with `kLatestBasename`, whose
//    numeric name is below `saved_seqno_`,
//  - trace files whose numeric name is below `saved_seqno_`.
//
// Such files can be left behind by an incomplete `Open()`,
// `SaveAtAndAfter()` (temp files and stale state/trace files) or `Restore()`
// (stale trace files). Stops at, and returns, the first failure.
Status FileExpectedStateManager::Clean() {
  std::vector<std::string> dir_entries;
  Status s = Env::Default()->GetChildren(expected_state_dir_path_,
                                         &dir_entries);

  const auto has_prefix = [](const std::string& name,
                             const std::string& prefix) {
    return name.rfind(prefix, 0 /* pos */) == 0;
  };
  const auto has_suffix = [](const std::string& name,
                             const std::string& suffix) {
    return name.size() >= suffix.size() &&
           name.rfind(suffix) == name.size() - suffix.size();
  };
  // Only meaningful once `has_suffix(name, suffix)` is known to hold.
  const auto number_before_suffix = [](const std::string& name,
                                       const std::string& suffix) {
    return ParseUint64(name.substr(0, name.size() - suffix.size()));
  };

  for (const std::string& filename : dir_entries) {
    if (!s.ok()) {
      break;
    }

    if (has_prefix(filename, kTempFilenamePrefix) &&
        has_suffix(filename, kTempFilenameSuffix)) {
      // Temp files are always removed.
      s = Env::Default()->DeleteFile(GetPathForFilename(filename));
      continue;
    }

    if (has_suffix(filename, kStateFilenameSuffix) &&
        !has_prefix(filename, kLatestBasename) &&
        number_before_suffix(filename, kStateFilenameSuffix) < saved_seqno_) {
      assert(saved_seqno_ != kMaxSequenceNumber);
      // Stale state file.
      s = Env::Default()->DeleteFile(GetPathForFilename(filename));
      continue;
    }

    if (has_suffix(filename, kTraceFilenameSuffix) &&
        number_before_suffix(filename, kTraceFilenameSuffix) < saved_seqno_) {
      // Stale trace file.
      s = Env::Default()->DeleteFile(GetPathForFilename(filename));
    }
  }
  return s;
}
|
|
|
|
std::string FileExpectedStateManager::GetTempPathForFilename(
|
|
const std::string& filename) {
|
|
assert(!expected_state_dir_path_.empty());
|
|
std::string expected_state_dir_path_slash =
|
|
expected_state_dir_path_.back() == '/' ? expected_state_dir_path_
|
|
: expected_state_dir_path_ + "/";
|
|
return expected_state_dir_path_slash + kTempFilenamePrefix + filename +
|
|
kTempFilenameSuffix;
|
|
}
|
|
|
|
std::string FileExpectedStateManager::GetPathForFilename(
|
|
const std::string& filename) {
|
|
assert(!expected_state_dir_path_.empty());
|
|
std::string expected_state_dir_path_slash =
|
|
expected_state_dir_path_.back() == '/' ? expected_state_dir_path_
|
|
: expected_state_dir_path_ + "/";
|
|
return expected_state_dir_path_slash + filename;
|
|
}
|
|
|
|
// Forwards `max_key` and `num_column_families` to the `ExpectedStateManager`
// base; nothing else is initialized here.
AnonExpectedStateManager::AnonExpectedStateManager(
    size_t max_key, size_t num_column_families)
    : ExpectedStateManager(max_key, num_column_families) {
}
|
|
|
|
// Replaces `latest_` with a new `AnonExpectedState` sized by `max_key_` and
// `num_column_families_`, opens it with `create` set to true, and returns the
// status of that open.
Status AnonExpectedStateManager::Open() {
  latest_.reset(new AnonExpectedState(max_key_, num_column_families_));
  Status open_status = latest_->Open(true /* create */);
  return open_status;
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
#endif // GFLAGS
|