rocksdb/utilities/transactions/lock/point/point_lock_manager_test.cc
Xingbo Wang f46242cef6 Fix uninitialized value complaint in valgrind (#13934)
Summary:
Fix uninitialized value complaint in valgrind due to gtest print padded struct.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13934

Test Plan: CI. Verified that valgrind no longer complains about it.

Reviewed By: pdillinger

Differential Revision: D82124983

Pulled By: xingbowang

fbshipit-source-id: 99eb7bab99726c45affe0a231777e5951844d73b
2025-09-10 10:42:07 -07:00

1435 lines
46 KiB
C++

// Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/transactions/lock/point/point_lock_manager_test.h"
#include "utilities/transactions/lock/point/any_lock_manager_test.h"
namespace ROCKSDB_NAMESPACE {
struct SpotLockManagerTestParam {
bool use_per_key_point_lock_manager;
int deadlock_timeout_us;
};
// Define operator<< for SpotLockManagerTestParam to stop valgrind from
// complaining uinitialized value when printing SpotLockManagerTestParam.
std::ostream& operator<<(std::ostream& os,
const SpotLockManagerTestParam& param) {
os << "use_per_key_point_lock_manager: "
<< param.use_per_key_point_lock_manager
<< ", deadlock_timeout_us: " << param.deadlock_timeout_us;
return os;
}
// including test for both PointLockManager and PerKeyPointLockManager
class SpotLockManagerTest
: public PointLockManagerTest,
public testing::WithParamInterface<SpotLockManagerTestParam> {
public:
void SetUp() override {
init();
// If a custom setup function was provided, use it. Otherwise, use what we
// have inherited.
auto param = GetParam();
if (param.use_per_key_point_lock_manager) {
locker_.reset(new PerKeyPointLockManager(
static_cast<PessimisticTransactionDB*>(db_), txndb_opt_));
} else {
locker_.reset(new PointLockManager(
static_cast<PessimisticTransactionDB*>(db_), txndb_opt_));
}
deadlock_timeout_us = param.deadlock_timeout_us;
}
};
// This test is not applicable for Range Lock manager as Range Lock Manager
// operates on Column Families, not their ids.
TEST_P(SpotLockManagerTest, LockNonExistingColumnFamily) {
MockColumnFamilyHandle cf(1024);
locker_->RemoveColumnFamily(&cf);
auto txn = NewTxn();
auto s = locker_->TryLock(txn, 1024, "k", env_, true);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STREQ(s.getState(), "Column family id not found: 1024");
delete txn;
}
TEST_P(SpotLockManagerTest, LockStatus) {
MockColumnFamilyHandle cf1(1024), cf2(2048);
locker_->AddColumnFamily(&cf1);
locker_->AddColumnFamily(&cf2);
auto txn1 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1024, "k1", env_, true));
ASSERT_OK(locker_->TryLock(txn1, 2048, "k1", env_, true));
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn2, 1024, "k2", env_, false));
ASSERT_OK(locker_->TryLock(txn2, 2048, "k2", env_, false));
auto s = locker_->GetPointLockStatus();
ASSERT_EQ(s.size(), 4u);
for (uint32_t cf_id : {1024, 2048}) {
ASSERT_EQ(s.count(cf_id), 2u);
auto range = s.equal_range(cf_id);
for (auto it = range.first; it != range.second; it++) {
ASSERT_TRUE(it->second.key == "k1" || it->second.key == "k2");
if (it->second.key == "k1") {
ASSERT_EQ(it->second.exclusive, true);
ASSERT_EQ(it->second.ids.size(), 1u);
ASSERT_EQ(it->second.ids[0], txn1->GetID());
} else if (it->second.key == "k2") {
ASSERT_EQ(it->second.exclusive, false);
ASSERT_EQ(it->second.ids.size(), 1u);
ASSERT_EQ(it->second.ids[0], txn2->GetID());
}
}
}
// Cleanup
locker_->UnLock(txn1, 1024, "k1", env_);
locker_->UnLock(txn1, 2048, "k1", env_);
locker_->UnLock(txn2, 1024, "k2", env_);
locker_->UnLock(txn2, 2048, "k2", env_);
delete txn1;
delete txn2;
}
TEST_P(SpotLockManagerTest, UnlockExclusive) {
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, true));
locker_->UnLock(txn1, 1, "k", env_);
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
// Cleanup
locker_->UnLock(txn2, 1, "k", env_);
delete txn1;
delete txn2;
}
TEST_P(SpotLockManagerTest, UnlockShared) {
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
locker_->UnLock(txn1, 1, "k", env_);
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
// Cleanup
locker_->UnLock(txn2, 1, "k", env_);
delete txn1;
delete txn2;
}
// This test doesn't work with Range Lock Manager, because Range Lock Manager
// doesn't support deadlock_detect_depth.
TEST_P(SpotLockManagerTest, DeadlockDepthExceeded) {
// Tests that when detecting deadlock, if the detection depth is exceeded,
// it's also viewed as deadlock.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.deadlock_detect_depth = 1;
txn_opt.lock_timeout = kLongTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
auto txn3 = NewTxn(txn_opt);
auto txn4 = NewTxn(txn_opt);
// "a ->(k) b" means transaction a is waiting for transaction b to release
// the held lock on key k.
// txn4 ->(k3) -> txn3 ->(k2) txn2 ->(k1) txn1
// txn3's deadlock detection will exceed the detection depth 1,
// which will be viewed as a deadlock.
// NOTE:
// txn4 ->(k3) -> txn3 must be set up before
// txn3 ->(k2) -> txn2, because to trigger deadlock detection for txn3,
// it must have another txn waiting on it, which is txn4 in this case.
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
port::Thread t1;
BlockUntilWaitingTxn(wait_sync_point_name_, t1, [&]() {
ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true));
// block because txn1 is holding a lock on k1.
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
});
ASSERT_OK(locker_->TryLock(txn3, 1, "k3", env_, true));
port::Thread t2;
BlockUntilWaitingTxn(wait_sync_point_name_, t2, [&]() {
// block because txn3 is holding a lock on k1.
ASSERT_OK(locker_->TryLock(txn4, 1, "k3", env_, true));
});
auto s = locker_->TryLock(txn3, 1, "k2", env_, true);
ASSERT_TRUE(s.IsBusy());
ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
ASSERT_EQ(deadlock_paths.size(), 1u);
ASSERT_TRUE(deadlock_paths[0].limit_exceeded);
locker_->UnLock(txn1, 1, "k1", env_);
locker_->UnLock(txn3, 1, "k3", env_);
t1.join();
t2.join();
locker_->UnLock(txn2, 1, "k2", env_);
locker_->UnLock(txn2, 1, "k1", env_);
locker_->UnLock(txn4, 1, "k3", env_);
delete txn4;
delete txn3;
delete txn2;
delete txn1;
}
TEST_P(SpotLockManagerTest, PrioritizedLockUpgradeWithExclusiveLock) {
// Tests that a lock upgrade request is prioritized over other lock requests.
// txn1 acquires shared lock on k1.
// txn2 acquires exclusive lock on k1.
// txn1 acquires exclusive locks k1 successfully
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = kLongTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
// txn2 tries to lock k1 exclusively, will be blocked.
port::Thread t;
BlockUntilWaitingTxn(wait_sync_point_name_, t, [this, &txn2]() {
// block because txn1 is holding a shared lock on k1.
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
});
// verify lock upgrade successfully
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
// unlock txn1, so txn2 could proceed
locker_->UnLock(txn1, 1, "k1", env_);
// Cleanup
t.join();
// Cleanup
locker_->UnLock(txn2, 1, "k1", env_);
delete txn2;
delete txn1;
}
TEST_P(SpotLockManagerTest,
PrioritizedLockUpgradeWithExclusiveLockAndSharedLock) {
// Tests that lock upgrade is prioritized when mixed with shared and exclusive
// locks requests
// txn1 acquires shared lock on k1.
// txn2 acquires shared lock on k1.
// txn3 acquires exclusive lock on k1.
// txn1 acquires exclusive locks k1 <- request granted after txn2 release the
// lock
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = kLongTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
auto txn3 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
// txn3 tries to lock k1 exclusively, will be blocked.
port::Thread txn3_thread;
BlockUntilWaitingTxn(wait_sync_point_name_, txn3_thread, [this, &txn3]() {
// block because txn1 and txn2 are holding a shared lock on k1.
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
});
// Verify txn3 is blocked
ASSERT_TRUE(txn3_thread.joinable());
// txn1 tries to lock k1 exclusively, will be blocked.
port::Thread txn1_thread;
BlockUntilWaitingTxn(wait_sync_point_name_, txn1_thread, [this, &txn1]() {
// block because txn1 and txn2 are holding a shared lock on k1.
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
});
// Verify txn1 is blocked
ASSERT_TRUE(txn1_thread.joinable());
// Unlock txn2, so txn1 could proceed
locker_->UnLock(txn2, 1, "k1", env_);
txn1_thread.join();
// Unlock txn1, so txn3 could proceed
locker_->UnLock(txn1, 1, "k1", env_);
txn3_thread.join();
// Cleanup
locker_->UnLock(txn3, 1, "k1", env_);
delete txn3;
delete txn2;
delete txn1;
}
TEST_P(SpotLockManagerTest, Deadlock_MultipleUpgrade) {
// Tests that deadlock can be detected for shared locks and exclusive locks
// mixed Deadlock scenario:
// txn1 acquires shared lock on k1.
// txn2 acquires shared lock on k1.
// txn1 acquires exclusive locks k1
// txn2 acquires exclusive locks k1 <- dead lock detected
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = kLongTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
// txn1 tries to lock k1 exclusively, will be blocked.
port::Thread t;
BlockUntilWaitingTxn(wait_sync_point_name_, t, [this, &txn1]() {
// block because txn2 is holding a shared lock on k1.
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
});
auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
ASSERT_TRUE(s.IsBusy());
ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
ASSERT_EQ(deadlock_paths.size(), 1u);
ASSERT_FALSE(deadlock_paths[0].limit_exceeded);
std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path;
ASSERT_EQ(deadlocks.size(), 2u);
ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID());
ASSERT_EQ(deadlocks[0].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[0].m_exclusive);
ASSERT_EQ(deadlocks[0].m_waiting_key, "k1");
ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID());
ASSERT_EQ(deadlocks[1].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[1].m_exclusive);
ASSERT_EQ(deadlocks[1].m_waiting_key, "k1");
locker_->UnLock(txn2, 1, "k1", env_);
t.join();
// Cleanup
locker_->UnLock(txn1, 1, "k1", env_);
delete txn2;
delete txn1;
}
TEST_P(SpotLockManagerTest, Deadlock_MultipleUpgradeInterleaveExclusive) {
// Tests that deadlock can be detected for shared locks and exclusive locks
// mixed Deadlock scenario:
// txn1 acquires shared lock on k1.
// txn2 acquires shared lock on k1.
// txn3 acquires exclusive lock on k1.
// txn1 acquires exclusive locks k1 <- request granted after txn2 release the
// lock.
// txn2 acquires exclusive locks k1 <- dead lock detected
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = kLongTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
auto txn3 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
// txn3 tries to lock k1 exclusively, will be blocked.
port::Thread txn3_thread;
BlockUntilWaitingTxn(wait_sync_point_name_, txn3_thread, [this, &txn3]() {
// block because txn1 and txn2 are holding a shared lock on k1.
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
});
// Verify txn3 is blocked
ASSERT_TRUE(txn3_thread.joinable());
// txn1 tries to lock k1 exclusively, will be blocked.
port::Thread txn1_thread;
BlockUntilWaitingTxn(wait_sync_point_name_, txn1_thread, [this, &txn1]() {
// block because txn1 and txn2 are holding a shared lock on k1.
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
});
// Verify txn1 is blocked
ASSERT_TRUE(txn1_thread.joinable());
auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
ASSERT_TRUE(s.IsBusy());
ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
ASSERT_EQ(deadlock_paths.size(), 1u);
ASSERT_FALSE(deadlock_paths[0].limit_exceeded);
std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path;
ASSERT_EQ(deadlocks.size(), 2u);
ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID());
ASSERT_EQ(deadlocks[0].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[0].m_exclusive);
ASSERT_EQ(deadlocks[0].m_waiting_key, "k1");
ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID());
ASSERT_EQ(deadlocks[1].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[1].m_exclusive);
ASSERT_EQ(deadlocks[1].m_waiting_key, "k1");
// Unlock txn2, so txn1 could proceed
locker_->UnLock(txn2, 1, "k1", env_);
txn1_thread.join();
// Unlock txn1, so txn3 could proceed
locker_->UnLock(txn1, 1, "k1", env_);
txn3_thread.join();
// Cleanup
locker_->UnLock(txn3, 1, "k1", env_);
delete txn3;
delete txn2;
delete txn1;
}
class PerKeyPointLockManagerTest : public PointLockManagerTest {
public:
void SetUp() override {
init();
cf_ = std::make_unique<MockColumnFamilyHandle>(1);
txn_opt_.deadlock_detect = true;
// by default use long timeout and disable expiration
txn_opt_.lock_timeout = kLongTxnTimeoutMs;
txn_opt_.expiration = -1;
// CAUTION: This test creates a separate lock manager object (right, NOT
// the one that the TransactionDB is using!), and runs tests on it.
locker_.reset(new PerKeyPointLockManager(
static_cast<PessimisticTransactionDB*>(db_), txndb_opt_));
locker_->AddColumnFamily(cf_.get());
}
TransactionOptions txn_opt_;
std::unique_ptr<MockColumnFamilyHandle> cf_;
};
TEST_F(PerKeyPointLockManagerTest, LockEfficiency) {
// Create multiple transactions, each acquire exclusive lock on the same key
std::vector<PessimisticTransaction*> txns;
std::vector<port::Thread> blockingThreads;
// Count the total number of wait sync point calls
std::atomic_int wait_sync_point_times = 0;
SyncPoint::GetInstance()->SetCallBack(
wait_sync_point_name_,
[&wait_sync_point_times](void* /*arg*/) { wait_sync_point_times++; });
SyncPoint::GetInstance()->EnableProcessing();
constexpr auto num_of_txn = 10;
// create 10 transactions, each of them try to acquire exclusive lock on the
// same key
for (int i = 0; i < num_of_txn; i++) {
auto txn = NewTxn(txn_opt_);
txns.push_back(txn);
if (i == 0) {
// txn0 acquires the lock, so the rest of the transactions could block
ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, true));
} else {
blockingThreads.emplace_back([this, txn]() {
// block because first txn is holding an exclusive lock on k1.
ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, true));
});
}
// wait for transaction i to be blocked
while (wait_sync_point_times.load() < i) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
// unlock the key, so next transaction could take the lock.
locker_->UnLock(txns[0], 1, "k1", env_);
auto num_of_blocking_thread = num_of_txn - 1;
for (int i = 0; i < num_of_blocking_thread; i++) {
// validate the thread is finished
blockingThreads[i].join();
auto num_of_threads_completed = i + 1;
for (int j = 0; j < num_of_blocking_thread; j++) {
if (j < num_of_threads_completed) {
// validate the thread is no longer joinable
ASSERT_FALSE(blockingThreads[j].joinable());
} else {
// validate the rest of the threads are still joinable
ASSERT_TRUE(blockingThreads[j].joinable());
}
}
// unlock the key, so next transaction could take the lock.
locker_->UnLock(txns[i + 1], 1, "k1", env_);
}
ASSERT_EQ(wait_sync_point_times.load(), num_of_blocking_thread);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
for (int i = 0; i < num_of_txn; i++) {
delete txns[num_of_txn - i - 1];
}
}
TEST_F(PerKeyPointLockManagerTest, LockFairness) {
// Create multiple transactions requesting locks on the same key, validate
// that they are executed in FIFO order
// txn0 acquires exclusive lock on k1.
// txn1 acquires shared lock on k1.
// txn2 acquires shared lock on k1.
// txn3 acquires exclusive lock on k1.
// txn4 acquires shared lock on k1.
// txn5 acquires exclusive lock on k1.
// txn6 acquires exclusive lock on k1.
// txn7 acquires shared lock on k1.
// txn8 acquires shared lock on k1.
// txn9 acquires exclusive lock on k1.
std::vector<PessimisticTransaction*> txns;
std::vector<port::Thread> blockingThreads;
// Count the total number of wait sync point calls
std::atomic_int wait_sync_point_times = 0;
SyncPoint::GetInstance()->SetCallBack(
wait_sync_point_name_,
[&wait_sync_point_times](void* /*arg*/) { wait_sync_point_times++; });
SyncPoint::GetInstance()->EnableProcessing();
constexpr auto num_of_txn = 10;
std::vector<bool> txn_lock_types = {true, false, false, true, false,
true, true, false, false, true};
// create 10 transactions, each of them try to acquire exclusive lock on the
// same key
for (int i = 0; i < num_of_txn; i++) {
auto txn = NewTxn(txn_opt_);
txns.push_back(txn);
if (i == 0) {
// txn0 acquires the lock, so the rest of the transactions would block
ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, txn_lock_types[0]));
} else {
blockingThreads.emplace_back([this, txn, type = txn_lock_types[i]]() {
ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, type));
});
}
// wait for transaction i to be blocked
while (wait_sync_point_times.load() < i) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
auto num_of_blocking_thread = num_of_txn - 1;
auto thread_idx = 0;
auto txn_idx = 0;
auto unlockTxn = [&]() {
// unlock the key in transaction.
locker_->UnLock(txns[txn_idx++], 1, "k1", env_);
};
auto validateLockTakenByNextTxn = [&]() {
// validate the thread is finished
blockingThreads[thread_idx++].join();
};
auto stillWaitingForLock = [&]() {
// validate the thread is no longer joinable
ASSERT_TRUE(blockingThreads[thread_idx].joinable());
};
// unlock the key, so next group of transactions could take the lock.
unlockTxn();
// txn1 acquires shared lock on k1.
// txn2 acquires shared lock on k1.
validateLockTakenByNextTxn();
validateLockTakenByNextTxn();
// txn3 acquires exclusive lock on k1.
stillWaitingForLock();
unlockTxn();
unlockTxn();
validateLockTakenByNextTxn();
// txn4 acquires shared lock on k1.
stillWaitingForLock();
unlockTxn();
validateLockTakenByNextTxn();
// txn5 acquires exclusive lock on k1.
stillWaitingForLock();
unlockTxn();
validateLockTakenByNextTxn();
// txn6 acquires exclusive lock on k1.
stillWaitingForLock();
unlockTxn();
validateLockTakenByNextTxn();
// txn7 acquires shared lock on k1.
// txn8 acquires shared lock on k1.
stillWaitingForLock();
unlockTxn();
validateLockTakenByNextTxn();
validateLockTakenByNextTxn();
// txn9 acquires exclusive lock on k1.
stillWaitingForLock();
unlockTxn();
unlockTxn();
validateLockTakenByNextTxn();
// clean up
unlockTxn();
ASSERT_EQ(wait_sync_point_times.load(), num_of_blocking_thread);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
for (int i = 0; i < num_of_txn; i++) {
delete txns[num_of_txn - i - 1];
}
}
TEST_F(PerKeyPointLockManagerTest, FIFO) {
// validate S, X, S lock order would be executed in FIFO order
// txn1 acquires shared lock on k1.
// txn2 acquires exclusive lock on k1.
// txn3 acquires shared lock on k1.
std::vector<PessimisticTransaction*> txns;
std::vector<port::Thread> blockingThreads;
// Count the total number of wait sync point calls
std::atomic_int wait_sync_point_times = 0;
SyncPoint::GetInstance()->SetCallBack(
wait_sync_point_name_,
[&wait_sync_point_times](void* /*arg*/) { wait_sync_point_times++; });
SyncPoint::GetInstance()->EnableProcessing();
constexpr auto num_of_txn = 3;
std::vector<bool> txn_lock_types = {false, true, false};
// create 3 transactions, each of them try to acquire exclusive lock on the
// same key
for (int i = 0; i < num_of_txn; i++) {
auto txn = NewTxn(txn_opt_);
txns.push_back(txn);
if (i == 0) {
// txn0 acquires the lock, so the rest of the transactions would block
ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, txn_lock_types[0]));
} else {
blockingThreads.emplace_back([this, txn, type = txn_lock_types[i]]() {
ASSERT_OK(locker_->TryLock(txn, 1, "k1", env_, type));
});
}
// wait for transaction i to be blocked
while (wait_sync_point_times.load() < i) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
auto num_of_blocking_thread = num_of_txn - 1;
auto thread_idx = 0;
auto txn_idx = 0;
auto unlockTxn = [&]() {
// unlock the key in transaction.
locker_->UnLock(txns[txn_idx++], 1, "k1", env_);
};
auto validateLockTakenByNextTxn = [&]() {
// validate the thread is finished
blockingThreads[thread_idx++].join();
};
auto stillWaitingForLock = [&]() {
// validate the thread is no longer joinable
ASSERT_TRUE(blockingThreads[thread_idx].joinable());
};
// unlock the key, so next group of transactions could take the lock.
stillWaitingForLock();
unlockTxn();
// txn1 acquires exclusive lock on k1.
validateLockTakenByNextTxn();
// txn2 acquires shared lock on k1.
stillWaitingForLock();
unlockTxn();
validateLockTakenByNextTxn();
// clean up
unlockTxn();
ASSERT_EQ(wait_sync_point_times.load(), num_of_blocking_thread);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
for (int i = 0; i < num_of_txn; i++) {
delete txns[num_of_txn - i - 1];
}
}
TEST_P(SpotLockManagerTest, LockDownGradeWithOtherLockRequests) {
// Test lock down grade always succeeds, even if there are other lock requests
// waiting for the same lock.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = kLongTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
for (bool exclusive : {true, false}) {
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
port::Thread t;
BlockUntilWaitingTxn(wait_sync_point_name_, t, [this, &txn2, exclusive]() {
// block because txn1 is holding a exclusive lock on k1.
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, exclusive));
});
// txn1 downgrades the lock to shared lock, so txn2 could proceed
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
locker_->UnLock(txn1, 1, "k1", env_);
t.join();
locker_->UnLock(txn2, 1, "k1", env_);
}
// clean up
delete txn2;
delete txn1;
}
TEST_P(SpotLockManagerTest, LockTimeout) {
// Test lock timeout
// txn1 acquires an exclusive lock on k1 successfully.
// txn2 try to acquire a lock on k1, but timedout.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = kShortTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
for (bool exclusive : {true, false}) {
auto ret = locker_->TryLock(txn2, 1, "k1", env_, exclusive);
ASSERT_TRUE(ret.IsTimedOut());
}
// clean up
locker_->UnLock(txn1, 1, "k1", env_);
delete txn2;
delete txn1;
}
TEST_P(SpotLockManagerTest, ExpiredLockStolenAfterTimeout) {
// validate an expired lock can be stolen by another transaction that timed
// out on the lock.
// txn1 acquires an exclusive lock on k1 successfully with a short expiration
// time.
// txn2 try to acquire a shared lock on k1 with timeout that is slightly
// longer than the txn1 expiration.
// Validate txn2 will take the lock.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.expiration = 1000;
txn_opt.lock_timeout = 1000 * 2;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
port::Thread t1;
BlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
// block because txn1 is holding an exclusive lock on k1.
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
});
t1.join();
// clean up
locker_->UnLock(txn2, 1, "k1", env_);
locker_->UnLock(txn1, 1, "k1", env_);
delete txn2;
delete txn1;
}
// Try to block until transaction enters waiting state.
// However due to timing, it could fail, so return true if succeeded, false
// otherwise.
bool TryBlockUntilWaitingTxn(const char* sync_point_name, port::Thread& t,
std::function<void()> function) {
std::atomic<bool> reached(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
sync_point_name, [&](void* /*arg*/) { reached.store(true); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// As the lifetime of the complete variable could go beyond the scope of this
// function, so we wrap it in a shared_ptr, and copy it into the lambda
std::shared_ptr<std::atomic<bool>> complete =
std::make_shared<std::atomic<bool>>(false);
t = port::Thread([complete, &function]() {
function();
complete->store(true);
});
auto ret = false;
while (true) {
if (complete->load()) {
// function completed, before sync point was reached, return false
t.join();
ret = false;
break;
}
if (reached.load()) {
// sync point was reached before function completed, return true
ret = true;
break;
}
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
return ret;
}
TEST_F(PerKeyPointLockManagerTest, LockStealAfterExpirationExclusive) {
// There are multiple transactions waiting for the same lock.
// txn1 acquires an exclusive lock on k1 successfully with a short expiration
// time.
// txn2 try to acquire an exclusive lock on k1, before expiration time,
// so it is blocked and waits for txn1 lock expired.
// txn3 try to acquire an exclusive lock on k1 after txn1 lock expires, FIFO
// order is respected.
// txn2 is woken up and takes the lock. unlock txn2, txn3 should proceed.
txn_opt_.expiration = 1000;
auto txn1 = NewTxn(txn_opt_);
txn_opt_.expiration = -1;
auto txn2 = NewTxn(txn_opt_);
auto txn3 = NewTxn(txn_opt_);
port::Thread t1;
auto retry_times = 10;
// Use a loop to reduce test flakiness.
// that the test is flaky because the txn2 thread start could be delayed until
// txn1 lock expired. In that case, txn2 will not enter into wait state, which
// will defeat the test purpose. Use a loop to retry a few times, until it is
// able to enter into wait state.
while (retry_times--) {
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
if (TryBlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
// block because txn1 is holding a shared lock on k1.
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
})) {
break;
}
// failed, retry again
locker_->UnLock(txn1, 1, "k1", env_);
locker_->UnLock(txn2, 1, "k1", env_);
}
// make sure txn2 is able to reach the wait state before proceed
ASSERT_GT(retry_times, 0);
// txn3 try to acquire an exclusive lock on k1, FIFO order is respected.
port::Thread t2;
BlockUntilWaitingTxn(wait_sync_point_name_, t2, [this, &txn3]() {
// block because txn1 is holding an exclusive lock on k1.
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
});
// validate txn2 is woken up and takes the lock
t1.join();
// unlock txn2, txn3 should proceed
locker_->UnLock(txn2, 1, "k1", env_);
t2.join();
// clean up
locker_->UnLock(txn3, 1, "k1", env_);
delete txn3;
delete txn2;
delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, LockStealAfterExpirationShared) {
// There are multiple transactions waiting for the same lock.
// txn1 acquires a shared lock on k1 successfully with a short expiration
// time.
// txn2 try to acquire an exclusive lock on k1, before expiration time,
// so it is blocked and waits for txn1 lock expired.
// txn3 try to acquire a shared lock on k1 after txn1 lock expires, FIFO
// order is respected.
// txn2 is woken up and takes the lock. unlock txn2, txn3 should proceed.
txn_opt_.expiration = 1000;
auto txn1 = NewTxn(txn_opt_);
txn_opt_.expiration = -1;
auto txn2 = NewTxn(txn_opt_);
auto txn3 = NewTxn(txn_opt_);
port::Thread t1;
auto retry_times = 10;
// Use a loop to reduce test flakiness.
// that the test is flaky because the txn2 thread start could be delayed until
// txn1 lock expired. In that case, txn2 will not enter into wait state, which
// will defeat the test purpose. Use a loop to retry a few times, until it is
// able to enter into wait state.
while (retry_times--) {
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
if (TryBlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
// block because txn1 is holding an exclusive lock on k1.
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
})) {
break;
}
// failed, retry again
locker_->UnLock(txn1, 1, "k1", env_);
locker_->UnLock(txn2, 1, "k1", env_);
}
// make sure txn2 is able to reach the wait state before proceed
ASSERT_GT(retry_times, 0);
// txn3 try to acquire an exclusive lock on k1, FIFO order is respected.
port::Thread t2;
BlockUntilWaitingTxn(wait_sync_point_name_, t2, [this, &txn3]() {
// block because txn1 is holding an exclusive lock on k1.
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
});
// validate txn2 is woken up and takes the lock
t1.join();
// unlock txn2, txn3 should proceed
locker_->UnLock(txn2, 1, "k1", env_);
t2.join();
// clean up
locker_->UnLock(txn3, 1, "k1", env_);
delete txn3;
delete txn2;
delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, DeadLockOnWaiter) {
// Txn1 acquires exclusive lock on k1
// Txn3 acquires shared lock on k2
// Txn2 tries to acquire exclusive lock on k1, waiting in the waiter queue.
// Txn3 tries to acquire exclusive lock on k1, waiting in the waiter queue.
// Txn3 depends on both Txn1 and Txn2. Txn1 unlocks k1.
// Txn2 takes the lock k1, and tries to acquire lock k2.
// Now Txn2 depends on Txn3.
// Deadlock is detected, and Txn2 is aborted.
auto txn1 = NewTxn(txn_opt_);
auto txn2 = NewTxn(txn_opt_);
auto txn3 = NewTxn(txn_opt_);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
ASSERT_OK(locker_->TryLock(txn3, 1, "k2", env_, false));
port::Thread t1;
BlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
auto s = locker_->TryLock(txn2, 1, "k2", env_, true);
ASSERT_TRUE(s.IsDeadlock());
});
port::Thread t2;
BlockUntilWaitingTxn(wait_sync_point_name_, t2, [this, &txn3]() {
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
});
locker_->UnLock(txn1, 1, "k1", env_);
t1.join();
locker_->UnLock(txn2, 1, "k1", env_);
t2.join();
// clean up
locker_->UnLock(txn3, 1, "k1", env_);
locker_->UnLock(txn3, 1, "k2", env_);
delete txn3;
delete txn2;
delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, SharedLockRaceCondition) {
// Verify a shared lock race condition is handled properly.
// When there are waiters in the queue, and all of them are shared waiters,
// and no one has taken the lock and all of them just got woken up and not
// yet taken the lock yet. A new shared lock request should be granted
// directly, without wait in the queue. If it did, It would not be woken up
// until the last shared lock is released.
// Disable deadlock detection timeout to prevent test flakyness.
deadlock_timeout_us = 0;
auto txn1 = NewTxn(txn_opt_);
auto txn2 = NewTxn(txn_opt_);
auto txn3 = NewTxn(txn_opt_);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"PerKeyPointLockManager::AcquireWithTimeout:AfterWokenUp",
"PerKeyPointLockManagerTest::SharedLockRaceCondition:"
"BeforeNewSharedLockRequest"},
{"PerKeyPointLockManagerTest::SharedLockRaceCondition:"
"AfterNewSharedLockRequest",
"PerKeyPointLockManager::AcquireWithTimeout:BeforeTakeLock"}});
std::atomic<bool> reached(false);
SyncPoint::GetInstance()->SetCallBack(
wait_sync_point_name_,
[&reached](void* /*arg*/) { reached.store(true); });
SyncPoint::GetInstance()->EnableProcessing();
// txn1 acquires an exclusive lock on k1, so that the following shared lock
// request would be blocked
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
// txn2 try to acquire a shared lock on k1, and get blocked
auto t1 = port::Thread([this, &txn2]() {
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
});
while (!reached.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// unlock txn1, txn2 should be woken up, but txn2 stops on the sync point
locker_->UnLock(txn1, 1, "k1", env_);
// Use sync point to simulate the race condition.
// txn3 tries to take the lock right after txn2 is woken up, but before it
// takes the lock
TEST_SYNC_POINT(
"PerKeyPointLockManagerTest::SharedLockRaceCondition:"
"BeforeNewSharedLockRequest");
// txn3 try to acquire a shared lock on k1, and get granted immediately
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
TEST_SYNC_POINT(
"PerKeyPointLockManagerTest::SharedLockRaceCondition:"
"AfterNewSharedLockRequest");
// validate txn2 is woken up and takes the lock
t1.join();
// cleanup
locker_->UnLock(txn2, 1, "k1", env_);
locker_->UnLock(txn3, 1, "k1", env_);
delete txn3;
delete txn2;
delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, UpgradeLockRaceCondition) {
// Verify an upgrade lock race condition is handled properly.
// When a key is locked in exlusive mode, shared lock waiters will be enqueued
// as waiters.
// When the exclusive lock holder release the lock. The shared lock waiters
// are woken up to take the lock. At this point, when a new shared lock
// requester comes in, it will take the lock directly without waiting or
// queueing. This requester then immediately upgrade the lock to exclusive
// lock. This request will be prioritized to the head of the queue.
// Meantime, it should also depend on the shared lock waiters which are still
// in the queue that are ready to take the lock. Later, when one of the reader
// lock want to also upgrade its lock, it will detect a dead lock and abort.
auto txn1 = NewTxn(txn_opt_);
auto txn2 = NewTxn(txn_opt_);
auto txn3 = NewTxn(txn_opt_);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"PerKeyPointLockManager::AcquireWithTimeout:AfterWokenUp",
"PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
"BeforeNewSharedLockRequest"},
{"PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
"AfterNewSharedLockRequest",
"PerKeyPointLockManager::AcquireWithTimeout:BeforeTakeLock"}});
std::atomic<bool> reached(false);
SyncPoint::GetInstance()->SetCallBack(
wait_sync_point_name_,
[&reached](void* /*arg*/) { reached.store(true); });
SyncPoint::GetInstance()->EnableProcessing();
// txn1 acquires an exclusive lock on k1, so that the following shared lock
// request would be blocked
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
auto t1 = port::Thread([this, &txn2]() {
// txn2 try to acquire a shared lock on k1, and get blocked
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
});
while (!reached.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// unlock txn1, txn2 should be woken up, but txn2 stops on the sync point
locker_->UnLock(txn1, 1, "k1", env_);
// Use sync point to simulate the race condition.
// txn3 tries to take the lock right after txn2 is woken up, but before it
// takes the lock
TEST_SYNC_POINT(
"PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
"BeforeNewSharedLockRequest");
// txn3 try to acquire a shared lock on k1, and get granted immediately
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
// txn3 try to upgrade its lock to exclusive lock and get blocked.
reached = false;
auto t2 = port::Thread([this, &txn3]() {
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, true));
});
while (!reached.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
TEST_SYNC_POINT(
"PerKeyPointLockManagerTest::UpgradeLockRaceCondition:"
"AfterNewSharedLockRequest");
// validate txn2 is woken up and takes the shared lock
t1.join();
// validate txn2 would get deadlock when it try to upgrade its lock to
// exclusive
auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
ASSERT_TRUE(s.IsDeadlock());
// cleanup
locker_->UnLock(txn2, 1, "k1", env_);
t2.join();
locker_->UnLock(txn3, 1, "k1", env_);
delete txn3;
delete txn2;
delete txn1;
}
TEST_P(SpotLockManagerTest, Catch22) {
// Benchmark the overhead of one transaction depends on another in a circle
// repeatedly
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = kLongTxnTimeoutMs;
txn_opt.expiration = kLongTxnTimeoutMs;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
// use a wait count to count the number of times the lock is waited inside
// transaction lock
std::atomic_int wait_count(0);
SyncPoint::GetInstance()->DisableProcessing();
if (GetParam().use_per_key_point_lock_manager &&
GetParam().deadlock_timeout_us != 0) {
// Use special sync point when deadlock timeout is enabled, so the test run
// faster
SyncPoint::GetInstance()->SetCallBack(
"PerKeyPointLockManager::AcquireWithTimeout:"
"WaitingTxnBeforeDeadLockDetection",
[&wait_count](void* /*arg*/) { wait_count++; });
} else {
// PointLockManager
SyncPoint::GetInstance()->SetCallBack(
wait_sync_point_name_, [&wait_count](void* /*arg*/) { wait_count++; });
}
SyncPoint::GetInstance()->EnableProcessing();
// txn1 X lock
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
std::mutex coordinator_mutex;
int iteration_count = 10000;
// txn1 try to lock X lock in a loop
auto t1 = port::Thread(
[this, &txn1, &wait_count, &coordinator_mutex, &iteration_count]() {
while (wait_count.load() < iteration_count) {
// spin wait until the other thread enters the lock waiter queue.
while (wait_count.load() % 2 == 0);
// unlock the lock, so that the other thread can acquire the lock
locker_->UnLock(txn1, 1, "k1", env_);
{
// Use the coordinator mutex to make sure the other thread has been
// waked up and acquired the lock, before this thread try to acquire
// the lock again.
std::scoped_lock<std::mutex> lock(coordinator_mutex);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
}
}
locker_->UnLock(txn1, 1, "k1", env_);
});
// txn2 try to lock X lock in a loop
auto t2 = port::Thread(
[this, &txn2, &wait_count, &coordinator_mutex, &iteration_count]() {
while (wait_count.load() < iteration_count) {
{
// Use the coordinator mutex to make sure the other thread has been
// waked up and acquired the lock, before this thread try to acquire
// the lock again.
std::scoped_lock<std::mutex> lock(coordinator_mutex);
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
}
// spin wait until the other thread enters the lock waiter queue.
while (wait_count.load() % 2 == 1);
// unlock the lock, so that the other thread can acquire the lock
locker_->UnLock(txn2, 1, "k1", env_);
}
});
// clean up
t1.join();
t2.join();
delete txn2;
delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, LockUpgradeOrdering) {
// When lock is upgraded, verify that it will only upgrade its lock after all
// the shared lock that are before the first exclusive lock in the lock wait
// queue.
auto txn1 = NewTxn(txn_opt_);
auto txn2 = NewTxn(txn_opt_);
auto txn3 = NewTxn(txn_opt_);
auto txn4 = NewTxn(txn_opt_);
std::mutex txn4_mutex;
std::unique_lock<std::mutex> txn4_lock(txn4_mutex);
std::atomic_bool txn4_waked_up(false);
std::atomic_int wait_count(0);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->SetCallBack(
wait_sync_point_name_, [&wait_count](void* /*arg*/) { wait_count++; });
SyncPoint::GetInstance()->SetCallBack(
"PerKeyPointLockManager::AcquireWithTimeout:AfterWokenUp",
[&txn4, &txn4_mutex, &txn4_waked_up](void* arg) {
auto transaction_id = *(static_cast<TransactionID*>(arg));
if (transaction_id == txn4->GetID()) {
txn4_waked_up.store(true);
{
// wait for txn4 mutex to be released, so that this thread will be
// blocked.
std::scoped_lock<std::mutex> lock(txn4_mutex);
}
}
});
SyncPoint::GetInstance()->EnableProcessing();
// Txn1 X lock
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
// Txn2,3,4 try S lock
port::Thread t1([this, &txn2]() {
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
});
port::Thread t2([this, &txn3]() {
ASSERT_OK(locker_->TryLock(txn3, 1, "k1", env_, false));
});
port::Thread t3([this, &txn4]() {
ASSERT_OK(locker_->TryLock(txn4, 1, "k1", env_, false));
});
// wait for all 3 transactions to enter wait state
while (wait_count.load() < 3) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Txn1 unlock
locker_->UnLock(txn1, 1, "k1", env_);
// Txn2,3 take S lock
t1.join();
t2.join();
// wait for txn4 to be woken up, otherwise txn2 will get deadlock
while (!txn4_waked_up.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Txn2 try X lock
std::atomic_bool txn2_exclusive_lock_acquired(false);
port::Thread t4([this, &txn2, &txn2_exclusive_lock_acquired]() {
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, true));
txn2_exclusive_lock_acquired.store(true);
});
// wait for txn2 to enter wait state
while (wait_count.load() < 4) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Txn3 release S lock
locker_->UnLock(txn3, 1, "k1", env_);
// Validate Txn2 has not acquired the lock yet
ASSERT_FALSE(txn2_exclusive_lock_acquired.load());
// Txn4 take S lock
txn4_lock.unlock();
t3.join();
// Txn4 release S lock Txn2 upgraded to X lock Txn2
locker_->UnLock(txn4, 1, "k1", env_);
t4.join();
ASSERT_TRUE(txn2_exclusive_lock_acquired.load());
// release lock clean up
locker_->UnLock(txn2, 1, "k1", env_);
delete txn4;
delete txn3;
delete txn2;
delete txn1;
}
TEST_F(PerKeyPointLockManagerTest, LockDownGradeRaceCondition) {
// When a lock is downgraded, it should notify all the shared waiters in the
// queue to take the lock.
auto txn1 = NewTxn(txn_opt_);
auto txn2 = NewTxn(txn_opt_);
// Txn1 X lock
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
// Txn2 try S lock
port::Thread t1;
BlockUntilWaitingTxn(wait_sync_point_name_, t1, [this, &txn2]() {
ASSERT_OK(locker_->TryLock(txn2, 1, "k1", env_, false));
});
// Txn1 downgrade to S lock
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, false));
// Txn2 take S lock
t1.join();
// clean up
locker_->UnLock(txn1, 1, "k1", env_);
locker_->UnLock(txn2, 1, "k1", env_);
delete txn2;
delete txn1;
}
// Run AnyLockManagerTest with PointLockManager
INSTANTIATE_TEST_CASE_P(PointLockManager, AnyLockManagerTest,
::testing::Values(nullptr));
// Run AnyLockManagerTest with PerKeyPointLockManager
template <int64_t N>
void PerKeyPointLockManagerTestSetup(PointLockManagerTest* self) {
self->init();
self->deadlock_timeout_us = N;
self->UsePerKeyPointLockManager();
}
INSTANTIATE_TEST_CASE_P(
PerLockPointLockManager, AnyLockManagerTest,
::testing::Values(PerKeyPointLockManagerTestSetup<0>,
PerKeyPointLockManagerTestSetup<100>,
PerKeyPointLockManagerTestSetup<1000>));
// Run PointLockManagerTest with PerLockPointLockManager and PointLockManager
INSTANTIATE_TEST_CASE_P(
PointLockCorrectnessCheckTestSuite, SpotLockManagerTest,
::testing::ValuesIn(std::vector<SpotLockManagerTestParam>{
{true, 0}, {true, 100}, {true, 1000}, {false, 0}}));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}