Project 3: Concurrency Control
这个 Project 的难度实际上感觉略高于上一个,但是复杂度略低,调试所花的时间并没有很长。在这个 Project 中,我们需要自己定义所需要的数据结构和相关操作,在已有接口的基础上完成并发控制。因为时间紧张,对于 C++ 的了解也不算深入,所以写出的代码质量比较差,仅仅能跑通基本的测试,可能还有很多 BUG。
TASK 1 – LOCK MANAGER
在第一部分中,我们需要控制事务对数据项的访问。我们设计的 Lock Manager 需要有数据结构保存事务所持有的锁及类型,当有访问请求到来时,Lock Manager 决定将锁授予该事务,阻塞该事务或放弃该事务。
我们的系统中有一个全局的 Lock Manager。当有事务需要访问或修改元组时,TableHeap 类将会用我们的 Lock Manager 来获得对于记录的锁。我们要实现的是两阶段锁 (2PL) 和严格两阶段锁 (S2PL),以及死锁的避免和检测。
两段锁的基本思想就是对事务划分了几个状态,只有在 GROWING 状态下,事务可以获得锁,而在 SHRINKING 状态下,事务只能释放锁。为了保证 serializability, 一个事务先经历 GROWING PHASE,然后经历 SHRINKING PHASE,所以称之为两段锁。而严格两段锁需要额外确保事务获得的所有互斥锁直到提交时才能释放。
1 2 3 4 5 6 7 8 9 10 |
/** * Transaction states: * * _________________________ * | v * GROWING -> SHRINKING -> COMMITTED ABORTED * |__________|________________________^ * **/ |
我们需要完成的函数如下:
- LockShared(Transaction, RID): 尝试获得共享锁,成功授予返回
true
,事务被终止返回false
- LockExclusive(Transaction, RID): 尝试获得互斥锁,成功授予返回
true
,事务被终止返回false
- LockUpgrade(Transaction, RID): 尝试将共享锁升级为互斥锁,成功授予返回
true
,事务被终止返回false
。如果另外一个事务已经在等待升级锁时需要将该事务终止并返回false
- Unlock(Transaction, RID): 解除
Transaction
对RID
的锁
在书中,我们有一节对于 Lock Manager 的实现,大致思路如下
总的来说,我们需要有一个 lock table 存储数据项到请求记录之间的映射。这些请求被串接成了一个链表。当有新的请求到来时,我们查看数据项对应的链表是否为空,如果为空就创建链表,将请求记录加入该链表并将锁授予该事务;否则我们将请求记录加入链表,判断请求是否可以满足,执行授予或者等待操作。在解锁时,除了在链表中删除对应的请求记录,还应该判断其他请求是否可以满足。需要注意的是,当一个事务被终止时,我们需要删除其所有正在等待的请求。
基于以上思路,我设计了如下的数据结构。我使用 unodered_map
存储数据项到请求链表之间的的映射。TransactionList
用于存储这个链表,TransactionItem
为每一个请求,包含事务号,锁类型,是否授予等信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
enum class LockType { SHARED = 0, EXCLUSIVE, UPGRADING }; class TransactionItem { public: TransactionItem(txn_id_t txn_id, LockType lock_type, bool is_granted): txn_id_(txn_id), lock_type_(lock_type), is_granted_(is_granted) {} txn_id_t txn_id_; std::mutex mutex_; std::condition_variable cv; LockType lock_type_; bool is_granted_; void Wait() { std::unique_lock<std::mutex> lock(mutex_); cv.wait(lock, [this]{return this->is_granted_;}); } void Notify() { std::unique_lock<std::mutex> lock(mutex_); cv.notify_one(); } }; class TransactionList { public: std::list<TransactionItem> transaction_list_; std::mutex mutex_; bool AllowGranted(LockType lock_type) { if(transaction_list_.empty()) return true; auto &last = transaction_list_.back(); if (lock_type == LockType::UPGRADING) { if(transaction_list_.size() == 1 && last.lock_type_ == LockType::SHARED) return true; else return false; } else if (last.is_granted_ && last.lock_type_ == LockType::SHARED && lock_type == LockType::SHARED) return true; else return false; } void InsertItem(Transaction *txn, const RID &rid, LockType lock_type, std::unique_lock<std::mutex> &lock) { bool granted = AllowGranted(lock_type); if (lock_type == LockType::UPGRADING && granted) lock_type = LockType::EXCLUSIVE; transaction_list_.emplace_back(txn->GetTransactionId(), lock_type, granted); auto & new_item = transaction_list_.back(); lock.unlock(); if(!granted) { new_item.Wait(); } if(lock_type == LockType::SHARED) txn->GetSharedLockSet()->insert(rid); else txn->GetExclusiveLockSet()->insert(rid); } }; bool strict_2PL_; std::unordered_map<RID, TransactionList> lock_table_; std::mutex mutex_; |
在此基础之上,LockManager的代码如下。在这里我们通过 SetState
隐式地终止事务,还需要 TransactionManager
的 Abort
函数显式地终止事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
bool LockManager::LockShared(Transaction *txn, const RID &rid) { if(txn->GetState() != TransactionState::GROWING) { txn->SetState(TransactionState::ABORTED); return false; } std::unique_lock<std::mutex> table_lock(mutex_); auto& txn_list = lock_table_[rid]; std::unique_lock<std::mutex> list_lock(txn_list.mutex_); table_lock.unlock(); txn_list.InsertItem(txn, rid, LockType::SHARED, list_lock); return true; } bool LockManager::LockExclusive(Transaction *txn, const RID &rid) { if(txn->GetState() != TransactionState::GROWING) { txn->SetState(TransactionState::ABORTED); return false; } std::unique_lock<std::mutex> table_lock(mutex_); auto& txn_list = lock_table_[rid]; std::unique_lock<std::mutex> list_lock(txn_list.mutex_); table_lock.unlock(); txn_list.InsertItem(txn, rid, LockType::EXCLUSIVE, list_lock); return true; } bool LockManager::LockUpgrade(Transaction *txn, const RID &rid) { if(txn->GetState() != TransactionState::GROWING) { txn->SetState(TransactionState::ABORTED); return false; } std::unique_lock<std::mutex> table_lock(mutex_); auto& txn_list = lock_table_[rid]; std::unique_lock<std::mutex> list_lock(txn_list.mutex_); table_lock.unlock(); auto itr = std::find_if(txn_list.transaction_list_.begin(), txn_list.transaction_list_.end(), [txn](const TransactionItem &item){return txn->GetTransactionId() == item.txn_id_;}); if(itr == txn_list.transaction_list_.end() || itr->lock_type_ == LockType::EXCLUSIVE || !itr->is_granted_) { txn->SetState(TransactionState::ABORTED); return false; } txn_list.transaction_list_.erase(itr); txn_list.InsertItem(txn, rid, LockType::UPGRADING, list_lock); return true; } bool LockManager::Unlock(Transaction *txn, const RID &rid) { std::unique_lock<std::mutex> table_lock(mutex_); auto& txn_list = lock_table_[rid]; std::unique_lock<std::mutex> list_lock(txn_list.mutex_); if(strict_2PL_) { if(txn->GetState() != TransactionState::COMMITTED && txn->GetState() != TransactionState::ABORTED) { if(std::find_if(txn_list.transaction_list_.begin(), txn_list.transaction_list_.end(), [txn](const TransactionItem &item){return txn->GetTransactionId() == item.txn_id_ && item.lock_type_ == LockType::EXCLUSIVE;}) != txn_list.transaction_list_.end()) { txn->SetState(TransactionState::ABORTED); return false; } } } if(txn->GetState() == TransactionState::GROWING) txn->SetState(TransactionState::SHRINKING); auto itr = std::find_if(txn_list.transaction_list_.begin(), txn_list.transaction_list_.end(), [txn](const TransactionItem &item){return txn->GetTransactionId() == item.txn_id_;}); if(itr->lock_type_ == LockType::EXCLUSIVE) txn->GetExclusiveLockSet()->erase(rid); else txn->GetSharedLockSet()->erase(rid); txn_list.transaction_list_.erase(itr); if(txn_list.transaction_list_.empty()) { lock_table_.erase(rid); return true; } table_lock.unlock(); for(auto &item: txn_list.transaction_list_) { if(item.is_granted_) break; item.Notify(); if(item.lock_type_ != LockType::SHARED) break; } return true; } |
我们对 lock_table_
有一个全局的锁,对每个 TransactionList
有一个局部的锁。当有请求到来时,我么给 lock_table_
加锁,找到对应的链表,然后为 lock_table_
解锁,为 TransactionList
加锁,然后我们调用 InsertItem
函数将其加入到该链表中(若已有相同事务的请求,则不加入),如果可以授予相应权限,就将 RID
添加到事务对应的表中,否则,调用 Wait
函数等待。在 Unlock
函数中,对于 Strict 2PL,我们需要确保所有的互斥锁在 Transaction
的状态变为 COMMITED
后才可以释放。对于一般的 2PL,我们只需要找到对应的请求项,将其删除(同时从 Transaction
的 LockSet 删除)即可,之后我们通知等待的请求,使其不再处于等待状态。
需要特别注意的是对于锁的升级,在此我们假定只有获得了共享锁后才能对锁进行升级(可以通过返回值判断是否成功获得)。当其他事务拥有对数据项的共享锁时,我们需要等待。
以上的代码严格来说还是有问题的,一个是当我们返回 true
时 txn
的状态可能已经变成 ABORTED
,因此会产生错误的返回结果,另外就是当我们某个请求处于等待时,该请求对应的事务可能已经被 TransactionManager
终止,因而该请求可能长时处于错误的等待状态,Lock 函数无法返回。另外一个可能存在的问题是当我们发出 UPGRADE 请求且该请求无法得到满足时,该 Transaction
的 shared_lock_set_
中已经删除了 RID
,但 exclusive_lock_set_
中还没有该 RID
。仔细思考了一下,我认为这似乎并无大碍。如果一个线程尝试升级某个事务对某个数据项的锁,而另一个进程提交该事务,有可能出现提交后该事务才获得升级后的互斥锁,而这之后该锁无法被解。不过一般情况下,我们在操作完成后,即请求返回后才会进行提交。当然,也可以额外增加判断解决该问题。其余问题会在后面进行解决。
1 2 3 |
if(txn->GetState() == TransactionState::COMMITTED) Unlock(txn, rid); |
TASK 2 – DEADLOCK PREVENTION
在第二和第三部分中,我们需要实现死锁的预防和死锁的检测。前者确保没有死锁的产生,后者单独用一个线程检测死锁,当产生死锁时选择将一个或多个事务终止。在死锁的预防中,我们采用的是 wound-wait 方法,即发生冲突时,抢占时间戳更大的事务(更年轻的)获得锁。书中对该算法描述如下
我首先加入了枚举类型,修改了 Transaction
类,增加了私有成员变量 timestamp_
,在构造函数中完成初始化,加入 GetTimestamp
函数,用于获取该私有成员。
1 2 |
enum class DeadlockMode { PREVENTION, DETECTION }; |
1 2 3 4 5 6 7 8 |
Transaction(txn_id_t txn_id) : state_(TransactionState::GROWING), thread_id_(std::this_thread::get_id()), txn_id_(txn_id), prev_lsn_(INVALID_LSN), shared_lock_set_{new std::unordered_set<RID>}, exclusive_lock_set_{new std::unordered_set<RID>}, timestamp_(std::chrono::system_clock::now()){} inline std::chrono::system_clock::time_point GetTimeStamp() {return timestamp_;} |
为了方便,我将 TransactionItem
中的 txn_id_
修改为了 Transaction
的指针(后来发现可以在 TransactionManager
中加入静态成员函数 GetTransaction(txn_id_t)
,后来没有进行修改)。将原来的 AllowGranted
修改如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
bool AllowGranted(LockType lock_type, Transaction *txn, Transaction* &victim) { if(transaction_list_.empty()) { victim = nullptr; return true; } auto &last = transaction_list_.back(); if (lock_type == LockType::UPGRADING) { std::list<TransactionItem>::iterator itr; if((itr = std::find_if(transaction_list_.begin(), transaction_list_.end(), [](const TransactionItem &item){return item.is_granted_ == true;})) == transaction_list_.end()) { victim = nullptr; return true; } else if (itr->transaction_->GetTimeStamp() > txn->GetTimeStamp()) { victim = itr->transaction_; return true; } else { victim = itr->transaction_; return false; } } else if (last.is_granted_ && last.lock_type_ == LockType::SHARED && lock_type == LockType::SHARED) { victim = nullptr; return true; } else { std::list<TransactionItem>::iterator itr = std::find_if(transaction_list_.begin(), transaction_list_.end(), [](const TransactionItem &item){return item.is_granted_ == true;}); victim = itr->transaction_; if(itr->transaction_->GetTimeStamp() > txn->GetTimeStamp()) { return true; } else { return false; } } } |
当然,这部分代码可以修改得更加精简一些。我的核心思路就是传进来一个 Transaction
的指针引用 victim
,通过调用 GetTimestamp
函数决定是否进行牺牲。如果不需要牺牲且可以授予权限,则返回 true
,victim
为 nullptr
;如果需要牺牲且可以授予权限,返回 true
,victim
为要牺牲的事务的指针。原 InsertItem
函数的部分代码修改如下
1 2 3 4 5 6 7 8 9 10 |
bool granted = txn_list.AllowGranted(lock_type, txn, victim); txn_list.transaction_list_.emplace_back(txn, lock_type, granted); auto & new_item = txn_list.transaction_list_.back(); lock.unlock(); if(victim != nullptr && granted) victim->SetState(TransactionState::ABORTED); if(!granted) new_item.Wait(); |
在这里,我把 InsertItem
函数放到了 LockManager
类中,主要原因是在该函数中,我们需要根据不同的死锁模式进行不同的操作,而死锁模式存储在外部类中,如果拷贝到每个 TransactionList
会增加很多冗余项。
实际上,这段代码也可能会产生问题。比如我们选择了牺牲的事务并将其状态设置为了 ABORTED
,但这个事务的某些请求可能依然处于等待状态,相应的线程会被阻塞。即使我们之后调用 TransactionManager
的 Abort
函数也无法解决该问题,因为我们在 Abort
中会调用 Unlock
函数,但因为处于等待状态的请求还没有加入到对应的 lockset
,所以无法删去。
TASK 3 – DEADLOCK DETECTION
这一部分的代码相对来说更难实现一些,首先在构造函数中,如果我们是死锁检测模式,需要单独开启一个线程周期性地进行死锁的检测(每隔500ms)
1 2 3 4 5 6 7 8 9 10 11 12 |
LockManager(TwoPLMode two_pl_mode = TwoPLMode::REGULAR, DeadlockMode deadlock_mode = DeadlockMode::PREVENTION) : two_pl_mode_(two_pl_mode), deadlock_mode_(deadlock_mode) { if (Detection()) { enable_cycle_detection_ = true; cycle_detection_thread_ = new std::thread(&LockManager::RunCycleDetection, this); LOG_INFO("Cycle detection thread launched"); } } |
为了检测死锁,我们需要构建 wait-for 图,检测图中是否有环存在。我们需要根据当前的状态增加构建图,实现增加、删除边等基本操作。在这之前,我先在 LockManager
中定义如下成员变量用于存储整张图。由于在要求中提到我们的搜索是确定性的搜索,需要从事务号最小的顶点出发找到第一个环,然后牺牲该环中最年轻的(在这里我的实现是事务号最大的,也可以是时间戳最大的),所以我使用了 std::map
而非 std::unordered_map
。事务号实际为32位的整数,所以我们使用迭代器遍历时默认从事务号最小的结点开始。
1 2 |
std::map<txn_id_t, std::set<txn_id_t>> graph_; |
之后,我实现了一些上面提到的基本操作。在 HasCycle
中,我额外定义了 path
,用于记录整个环路,从中挑选出事务号最大的,存在 txn_id
中。这里面的 new_id
是冗余定义,之后忘记删了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
void LockManager::AddEdge(txn_id_t t1, txn_id_t t2) { assert(Detection()); std::unique_lock<std::mutex> graph_lock(latch_); graph_[t1].insert(t2); return; } s void LockManager::RemoveEdge(txn_id_t t1, txn_id_t t2) { assert(Detection()); std::unique_lock<std::mutex> graph_lock(latch_); graph_[t1].erase(t2); return; } bool LockManager::HasCycleUtil(txn_id_t txn_id, std::unordered_map<txn_id_t, bool> &visited, std::unordered_map<txn_id_t, bool> &rec_stack, std::vector<txn_id_t> &path) { if(visited[txn_id] == false) { visited[txn_id] = true; rec_stack[txn_id] = true; path.push_back(txn_id); for(auto i: graph_[txn_id]) { if(!visited[i] && HasCycleUtil(i, visited, rec_stack, path)) return true; else if(rec_stack[i]){ if(txn_id > newid) newid = txn_id; return true; } } } rec_stack[txn_id] = false; path.pop_back(); return false; } bool LockManager::HasCycle(txn_id_t &txn_id) { assert(Detection()); std::unique_lock<std::mutex> graph_lock(latch_); std::unordered_map<txn_id_t, bool> visited; std::unordered_map<txn_id_t, bool> rec_stack; std::vector<txn_id_t> path; for(auto i: graph_) { if(HasCycleUtil(i.first, visited, rec_stack, path)) { txn_id = *std::max_element(path.begin(), path.end()); return true; } } return false; } std::vector<std::pair<txn_id_t, txn_id_t>> LockManager::GetEdgeList() { assert(Detection()); std::unique_lock<std::mutex> graph_lock(latch_); std::vector<std::pair<txn_id_t, txn_id_t>> edge_list; for(auto i: graph_) for(auto j: graph_[i.first]) edge_list.push_back({i.first, j}); return edge_list; } |
我们不能一直保存该图,需要周期性的建立和销毁。我使用 GenerateGraph
函数建立图,在 RunCycleDetection
函数中调用并检测是否存在环路,在这里,我额外调用了一个 NotifyAll
函数,该函数意在告知所有在 lock_table_
中属于被终止的事务的请求需要放弃等待,这样做可以避免线程被阻塞,从而使所有 Lock
的请求函数可以返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
void LockManager::GenerateGraph() { newid = INVALID_TXN_ID; std::unique_lock<std::mutex> table_lock(mutex_); for(auto &i: lock_table_){ std::set<txn_id_t> start; std::set<txn_id_t> end; std::unique_lock<std::mutex> list_lock(i.second.mutex_); for(auto &j: lock_table_[i.first].transaction_list_){ if(j.is_granted_ && j.transaction_->GetState() != TransactionState::ABORTED) start.insert(j.transaction_->GetTransactionId()); else if(!j.is_granted_ && j.transaction_->GetState() != TransactionState::ABORTED) end.insert(j.transaction_->GetTransactionId()); } for(auto end_id: end) for(auto start_id: start) AddEdge(end_id, start_id); } } void LockManager::RunCycleDetection() { assert(Detection()); while (enable_cycle_detection_) { std::this_thread::sleep_for(CYCLE_DETECTION_INTERVAL); { graph_.clear(); GenerateGraph(); std::unique_lock<std::mutex> l(mutex_); txn_id_t txn_id; if(HasCycle(txn_id)) { LOG_INFO("Has cycle: %d", txn_id); TransactionManager::GetTransaction(txn_id)->SetState(TransactionState::ABORTED); NotifyAll(TransactionManager::GetTransaction(txn_id)); } } } } |
我们还需要修改之前的请求锁的函数,把所有的 return true
改为 return txn->GetState() != TransactionState::ABORTED
,而在所有 txn->SetState(TransactionState::ABORTED)
后增加 NotifyAll(txn)
,这样我刚刚提到了所有问题都可以解决。唯一值得担心的就是 NotifyAll
的实现略显愚蠢,复杂性可能比较高,但后来想了以下,这个函数本来就是在异常情况下才会被调用,调用的频率不会非常高,对整个系统产生的影响不会很大。
在其他实现中,我也看到有把 condition_variable 放在 LockManager
类里,而在 wait
的 pred
字段使用较长的判断函数。这样当满足要求时会导致竞争情况(用的是 notify_all
),而如果用 notify_one
也会有问题,因为挑选唤醒哪个线程是随机的,无法保证先来先服务的顺序,可能会有“饿死”情况出现。在死锁的检测和预防中,这种 condition_variable 就更难精细化地实现针对事务所属请求的线程的唤醒了。
以上就是 Project 3 的全部内容,因为总是想到可能存在的新的问题,所以修改了很多次,在这之学到了很多知识,对于 C++ 并发的控制也更加熟悉。最后的代码可能有些丑,一些潜在的问题可能也没有解决,不过基本的功能以及我能想到了所有问题算是全部实现和解决了。在此还要推荐一下这篇对 C++ 并发的讲解