Summary:
With WP/WUP, we can deadlock on db mutex here: 8dc3d77b59/db/db_impl/db_impl_compaction_flush.cc (L4626)
. Here we release a snapshot (which will acquire db mutex) while already holding the mutex. This caused some transaction lock timeout error in crash test. This PR fixes this by refactoring snapshot related context into JobContext and only allow snapshot related context to be initialized once. This also reduces the number of parameters being passed around.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13632
Test Plan:
- existing tests
- this fails with timeout before this fix
```
./db_stress --WAL_size_limit_MB=0 --WAL_ttl_seconds=60 --acquire_snapshot_one_in=10000 --adaptive_readahead=1 --adm_policy=2 --advise_random_on_open=1 --allow_data_in_errors=True --allow_fallocate=0 --allow_unprepared_value=1 --async_io=0 --auto_readahead_size=0 --auto_refresh_iterator_with_snapshot=0 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=1 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=8 --bgerror_resume_retry_interval=1000000 --block_align=0 --block_protection_bytes_per_key=4 --block_size=16384 --bloom_before_level=6 --bloom_bits=2 --bottommost_compression_type=lz4 --bottommost_file_compaction_delay=0 --bytes_per_sync=0 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=0 --cache_size=33554432 --cache_type=fixed_hyper_clock_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=1 --charge_table_reader=0 --check_multiget_consistency=1 --check_multiget_entity_consistency=0 --checkpoint_one_in=0 --checksum_type=kxxHash --clear_column_family_one_in=0 --commit_bypass_memtable_one_in=0 --compact_files_one_in=1000 --compact_range_one_in=1000000 --compaction_pri=1 --compaction_readahead_size=0 --compaction_style=1 --compaction_ttl=0 --compress_format_version=2 --compressed_secondary_cache_ratio=0.0 --compressed_secondary_cache_size=0 --compression_checksum=1 --compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 --compression_type=zlib --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --create_timestamped_snapshot_one_in=0 --daily_offpeak_time_utc= --data_block_index_type=1 --db_write_buffer_size=0 --decouple_partitioned_filters=1 --default_temperature=kWarm --default_write_temperature=kHot --delete_obsolete_files_period_micros=30000000 --delpercent=5 --delrangepercent=0 --destroy_db_initially=1 --detect_filter_construct_corruption=1 --disable_file_deletions_one_in=1000000 --disable_manual_compaction_one_in=1000000 --disable_wal=0 --dump_malloc_stats=1 --enable_checksum_handoff=0 --enable_compaction_filter=0 --enable_custom_split_merge=0 --enable_do_not_compress_roles=0 --enable_index_compression=0 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=0 --enable_remote_compaction=0 --enable_sst_partitioner_factory=0 --enable_thread_tracking=0 --enable_write_thread_adaptive_yield=1 --error_recovery_with_no_fault_injection=0 --exclude_wal_from_write_fault_injection=1 --fifo_allow_compaction=1 --file_checksum_impl=none --file_temperature_age_thresholds= --fill_cache=1 --flush_one_in=1000 --format_version=3 --get_all_column_family_metadata_one_in=10000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=10000 --get_properties_of_all_tables_one_in=100000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0 --index_block_restart_interval=2 --index_shortening=1 --index_type=0 --ingest_external_file_one_in=0 --ingest_wbwi_one_in=0 --initial_auto_readahead_size=16384 --inplace_update_support=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=100 --last_level_temperature=kWarm --level_compaction_dynamic_level_bytes=0 --lock_wal_one_in=1000000 --log2_keys_per_lock=10 --log_file_time_to_roll=60 --log_readahead_size=16777216 --long_running_snapshots=1 --low_pri_pool_ratio=0 --lowest_used_cache_tier=1 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=100000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=1 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16777216 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=8388608 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=1000 --memtable_op_scan_flush_trigger=100 --memtable_prefix_bloom_size_ratio=0 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=0 --memtablerep=skip_list --metadata_charge_policy=0 --metadata_read_fault_one_in=0 --metadata_write_fault_one_in=0 --min_write_buffer_number_to_merge=2 --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=500000 --open_metadata_read_fault_one_in=8 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=32 --open_write_fault_one_in=0 --ops_per_thread=200000 --optimize_filters_for_hits=1 --optimize_filters_for_memory=1 --optimize_multiget_for_io=1 --paranoid_file_checks=1 --paranoid_memory_checks=0 --partition_filters=0 --partition_pinning=0 --pause_background_one_in=10000 --periodic_compaction_seconds=0 --prefix_size=-1 --prefixpercent=0 --prepopulate_block_cache=0 --preserve_internal_time_seconds=60 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=32 --readahead_size=524288 --readpercent=50 --recycle_log_file_num=0 --reopen=20 --report_bg_io_stats=1 --reset_stats_one_in=1000000 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=bar --sqfc_version=0 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --stats_dump_period_sec=10 --stats_history_buffer_size=0 --strict_bytes_per_sync=0 --subcompactions=4 --sync=0 --sync_fault_injection=0 --table_cache_numshardbits=6 --target_file_size_base=524288 --target_file_size_multiplier=2 --test_batches_snapshots=0 --test_ingest_standalone_range_deletion_one_in=0 --top_level_index_pinning=0 --two_write_queues=1 --txn_write_policy=2 --uncache_aggressiveness=2 --universal_max_read_amp=10 --unordered_write=0 --unpartitioned_pinning=1 --use_adaptive_mutex=1 --use_adaptive_mutex_lru=0 --use_attribute_group=0 --use_delta_encoding=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=1 --use_multi_cf_iterator=0 --use_multi_get_entity=0 --use_multiget=1 --use_optimistic_txn=0 --use_put_entity_one_in=0 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_txn=1 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000 --verify_compression=1 --verify_db_one_in=10000 --verify_file_checksums_one_in=0 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=zstd --write_buffer_size=4194304 --write_dbid_to_manifest=1 --write_fault_one_in=0 --write_identity_file=0 --writepercent=35 --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox
```
Reviewed By: hx235
Differential Revision: D75173149
Pulled By: cbi42
fbshipit-source-id: ec68cadc78469730dfe26824e20b8ca4ab993101
293 lines
9.7 KiB
C++
293 lines
9.7 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#pragma once
|
|
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include "db/column_family.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/version_set.h"
|
|
#include "util/autovector.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class MemTable;
|
|
struct SuperVersion;
|
|
|
|
// The purpose of this struct is to simplify pushing work such as
|
|
// allocation/construction, de-allocation/destruction, and notifications to
|
|
// outside of holding the DB mutex.
|
|
struct SuperVersionContext {
|
|
struct WriteStallNotification {
|
|
WriteStallInfo write_stall_info;
|
|
const ImmutableOptions* immutable_options;
|
|
};
|
|
|
|
autovector<SuperVersion*> superversions_to_free;
|
|
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
|
|
autovector<WriteStallNotification> write_stall_notifications;
|
|
#endif
|
|
std::unique_ptr<SuperVersion>
|
|
new_superversion; // if nullptr no new superversion
|
|
|
|
explicit SuperVersionContext(bool create_superversion = false)
|
|
: new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
|
|
|
|
explicit SuperVersionContext(SuperVersionContext&& other) noexcept
|
|
: superversions_to_free(std::move(other.superversions_to_free)),
|
|
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
|
|
write_stall_notifications(std::move(other.write_stall_notifications)),
|
|
#endif
|
|
new_superversion(std::move(other.new_superversion)) {
|
|
}
|
|
// No copies
|
|
SuperVersionContext(const SuperVersionContext& other) = delete;
|
|
void operator=(const SuperVersionContext& other) = delete;
|
|
|
|
void NewSuperVersion() {
|
|
new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion());
|
|
}
|
|
|
|
inline bool HaveSomethingToDelete() const {
|
|
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
|
|
return !superversions_to_free.empty() || !write_stall_notifications.empty();
|
|
#else
|
|
return !superversions_to_free.empty();
|
|
#endif
|
|
}
|
|
|
|
void PushWriteStallNotification(WriteStallCondition old_cond,
|
|
WriteStallCondition new_cond,
|
|
const std::string& name,
|
|
const ImmutableOptions* ioptions) {
|
|
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
|
|
WriteStallNotification notif;
|
|
notif.write_stall_info.cf_name = name;
|
|
notif.write_stall_info.condition.prev = old_cond;
|
|
notif.write_stall_info.condition.cur = new_cond;
|
|
notif.immutable_options = ioptions;
|
|
write_stall_notifications.push_back(notif);
|
|
#else
|
|
(void)old_cond;
|
|
(void)new_cond;
|
|
(void)name;
|
|
(void)ioptions;
|
|
#endif // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
|
|
}
|
|
|
|
void Clean() {
|
|
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
|
|
// notify listeners on changed write stall conditions
|
|
for (auto& notif : write_stall_notifications) {
|
|
for (auto& listener : notif.immutable_options->listeners) {
|
|
listener->OnStallConditionsChanged(notif.write_stall_info);
|
|
}
|
|
}
|
|
write_stall_notifications.clear();
|
|
#endif
|
|
// free superversions
|
|
for (auto s : superversions_to_free) {
|
|
delete s;
|
|
}
|
|
superversions_to_free.clear();
|
|
}
|
|
|
|
~SuperVersionContext() {
|
|
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
|
|
assert(write_stall_notifications.empty());
|
|
#endif
|
|
assert(superversions_to_free.empty());
|
|
}
|
|
};
|
|
|
|
struct JobContext {
|
|
inline bool HaveSomethingToDelete() const {
|
|
return !(full_scan_candidate_files.empty() && sst_delete_files.empty() &&
|
|
blob_delete_files.empty() && log_delete_files.empty() &&
|
|
manifest_delete_files.empty());
|
|
}
|
|
|
|
inline bool HaveSomethingToClean() const {
|
|
bool sv_have_sth = false;
|
|
for (const auto& sv_ctx : superversion_contexts) {
|
|
if (sv_ctx.HaveSomethingToDelete()) {
|
|
sv_have_sth = true;
|
|
break;
|
|
}
|
|
}
|
|
return memtables_to_free.size() > 0 || wals_to_free.size() > 0 ||
|
|
job_snapshot != nullptr || sv_have_sth;
|
|
}
|
|
|
|
SequenceNumber GetJobSnapshotSequence() const {
|
|
if (job_snapshot) {
|
|
assert(job_snapshot->snapshot());
|
|
return job_snapshot->snapshot()->GetSequenceNumber();
|
|
}
|
|
return kMaxSequenceNumber;
|
|
}
|
|
|
|
SequenceNumber GetLatestSnapshotSequence() const {
|
|
assert(snapshot_context_initialized);
|
|
if (snapshot_seqs.empty()) {
|
|
return 0;
|
|
}
|
|
return snapshot_seqs.back();
|
|
}
|
|
|
|
SequenceNumber GetEarliestSnapshotSequence() const {
|
|
assert(snapshot_context_initialized);
|
|
if (snapshot_seqs.empty()) {
|
|
return kMaxSequenceNumber;
|
|
}
|
|
return snapshot_seqs.front();
|
|
}
|
|
|
|
void InitSnapshotContext(SnapshotChecker* checker,
|
|
std::unique_ptr<ManagedSnapshot> managed_snapshot,
|
|
SequenceNumber earliest_write_conflict,
|
|
std::vector<SequenceNumber>&& snapshots) {
|
|
if (snapshot_context_initialized) {
|
|
return;
|
|
}
|
|
snapshot_context_initialized = true;
|
|
snapshot_checker = checker;
|
|
assert(!job_snapshot);
|
|
job_snapshot = std::move(managed_snapshot);
|
|
earliest_write_conflict_snapshot = earliest_write_conflict;
|
|
snapshot_seqs = std::move(snapshots);
|
|
}
|
|
|
|
// Structure to store information for candidate files to delete.
|
|
struct CandidateFileInfo {
|
|
std::string file_name;
|
|
std::string file_path;
|
|
CandidateFileInfo(std::string name, std::string path)
|
|
: file_name(std::move(name)), file_path(std::move(path)) {}
|
|
bool operator==(const CandidateFileInfo& other) const {
|
|
return file_name == other.file_name && file_path == other.file_path;
|
|
}
|
|
};
|
|
|
|
// a list of all files that we'll consider deleting
|
|
// (every once in a while this is filled up with all files
|
|
// in the DB directory)
|
|
// (filled only if we're doing full scan)
|
|
std::vector<CandidateFileInfo> full_scan_candidate_files;
|
|
|
|
// the list of all live sst files that cannot be deleted
|
|
std::vector<uint64_t> sst_live;
|
|
|
|
// the list of sst files that we need to delete
|
|
std::vector<ObsoleteFileInfo> sst_delete_files;
|
|
|
|
// the list of all live blob files that cannot be deleted
|
|
std::vector<uint64_t> blob_live;
|
|
|
|
// the list of blob files that we need to delete
|
|
std::vector<ObsoleteBlobFileInfo> blob_delete_files;
|
|
|
|
// a list of log files that we need to delete
|
|
std::vector<uint64_t> log_delete_files;
|
|
|
|
// a list of log files that we need to preserve during full purge since they
|
|
// will be reused later
|
|
std::vector<uint64_t> log_recycle_files;
|
|
|
|
// Files quarantined from deletion. This list contains file numbers for files
|
|
// that are in an ambiguous states. This includes newly generated SST files
|
|
// and blob files from flush and compaction job whose VersionEdits' persist
|
|
// state in Manifest are unclear. An old manifest file whose immediately
|
|
// following new manifest file's CURRENT file creation is in an unclear state.
|
|
// WAL logs don't have this premature deletion risk since
|
|
// min_log_number_to_keep is only updated after successful manifest commits.
|
|
// So this data structure doesn't track log files.
|
|
autovector<uint64_t> files_to_quarantine;
|
|
|
|
// a list of manifest files that we need to delete
|
|
std::vector<std::string> manifest_delete_files;
|
|
|
|
// a list of memtables to be free
|
|
autovector<ReadOnlyMemTable*> memtables_to_free;
|
|
|
|
// contexts for installing superversions for multiple column families
|
|
std::vector<SuperVersionContext> superversion_contexts;
|
|
|
|
autovector<log::Writer*> wals_to_free;
|
|
|
|
// the current manifest_file_number, log_number and prev_log_number
|
|
// that corresponds to the set of files in 'live'.
|
|
uint64_t manifest_file_number = 0;
|
|
uint64_t pending_manifest_file_number = 0;
|
|
|
|
// Used for remote compaction. To prevent OPTIONS files from getting
|
|
// purged by PurgeObsoleteFiles() of the primary host
|
|
uint64_t min_options_file_number;
|
|
uint64_t log_number = 0;
|
|
uint64_t prev_log_number = 0;
|
|
|
|
uint64_t min_pending_output = 0;
|
|
uint64_t prev_wals_total_size = 0;
|
|
size_t num_alive_wal_files = 0;
|
|
uint64_t size_log_to_delete = 0;
|
|
|
|
// Snapshot taken before flush/compaction job.
|
|
std::unique_ptr<ManagedSnapshot> job_snapshot;
|
|
SnapshotChecker* snapshot_checker = nullptr;
|
|
std::vector<SequenceNumber> snapshot_seqs;
|
|
// This is the earliest snapshot that could be used for write-conflict
|
|
// checking by a transaction. For any user-key newer than this snapshot, we
|
|
// should make sure not to remove evidence that a write occurred.
|
|
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber;
|
|
|
|
// Unique job id
|
|
int job_id;
|
|
|
|
bool snapshot_context_initialized = false;
|
|
|
|
explicit JobContext(int _job_id, bool create_superversion = false) {
|
|
job_id = _job_id;
|
|
superversion_contexts.emplace_back(
|
|
SuperVersionContext(create_superversion));
|
|
}
|
|
|
|
// Delete the default constructor
|
|
JobContext() = delete;
|
|
|
|
// For non-empty JobContext Clean() has to be called at least once before
|
|
// before destruction (see asserts in ~JobContext()). Should be called with
|
|
// unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
|
|
// doing potentially slow Clean() with locked DB mutex.
|
|
void Clean() {
|
|
// free superversions
|
|
for (auto& sv_context : superversion_contexts) {
|
|
sv_context.Clean();
|
|
}
|
|
// free pending memtables
|
|
for (auto m : memtables_to_free) {
|
|
delete m;
|
|
}
|
|
for (auto l : wals_to_free) {
|
|
delete l;
|
|
}
|
|
|
|
memtables_to_free.clear();
|
|
wals_to_free.clear();
|
|
job_snapshot.reset();
|
|
}
|
|
|
|
~JobContext() {
|
|
assert(memtables_to_free.size() == 0);
|
|
assert(wals_to_free.size() == 0);
|
|
}
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|