上一节实现的查找表中,桶类 bucket_type
使用了C++标准库提供的存储结构链表 list 作为数据存储容器,但通过C++标准库提供的链表 list
存储键值对虽然对并发读不影响(共享锁保证多个线程可以线程安全的读共享数据),但是不能并发写,在同一时间有且仅有一个线程可以修改共享数据(因为C++标准库提供的链表的增删改查是通过同一个互斥量实现的,锁粒度不够精细),我们需要像实现锁粒度足够小的线程安全的队列一样,自定义一个链表结构,通过多个锁来为增删查改进行保护,每个操作有自己的锁,实现写操作的并发。
参考:
单向链表示意图如下图所示:
代码表示如下:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
双向链表中同样有数据域和指针域。不同之处在于,指针域有左右(或上一个、下一个)之分,用来连接上一个结点、当前结点、下一个结点。
双向链表示意图如下图所示:
代码表示如下:
struct Node {
std::shared_ptr<T> data;
std::unique_ptr<node> left;
std::unique_ptr<node> right;
};
为了线程安全,一般将节点使用智能指针通过模板的方式封装,完整代码如下:
template <typename T>
struct node {
std::shared_ptr<T> data;
std::unique_ptr<node> next;
// 构造函数:初始化数据和指针
node(const T& value) : data(std::make_shared<T>(value)), next(nullptr) {}
// 构造函数:允许外部传入shared_ptr作为数据
node(std::shared_ptr<T> value_ptr) : data(std::move(value_ptr)), next(nullptr) {}
// 禁用拷贝构造函数和赋值运算符
node(const node&) = delete;
node& operator=(const node&) = delete;
// 默认移动构造函数、赋值运算符和析构函数
node(node&&) = default;
node& operator=(node&&) = default;
~node() = default;
};
上面的两种链表显而易见可以通过一个互斥量控制整个链表的增删查改,进而实现线程安全的访问、修改。但是,因为使用一个互斥量保护整个链表的锁粒度太大,如果我们想要实现支持多线程高并发访问、修改链表,那么性能会很有限(没办法同时使用多个线程修改链表,同一时间有且仅有一个线程可以修改,其他线程需等待,虽然我们通过条件变量挂起了这些线程,避免了算力资源的浪费,但是我们没有办法实现修改函数同时调用),我们可以对每个节点都使用一个互斥量,这样能保证多个线程操作不同节点时加不同的锁,减少耦合性。
此外,我们尽量使用智能指针存储数据,使用智能指针存储数据不仅能防止内存不够抛出异常,而且能够有效节省资源,智能指针间的赋值不涉及内存的分配(但是创建一个指针会占8字节)。
#pragma once
#include <mutex>
#include <memory>
template<typename T>
class threadsafe_list // 单向链表
{
struct node
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node() :next() {}
node(const T& value) :
data(std::make_shared<T>(value)){}
};
node head;
public:
threadsafe_list()
{}
~threadsafe_list()
{
remove_if([](const node&) {return true; });
}
threadsafe_list(threadsafe_list const& other) = delete;
threadsafe_list& operator=(threadsafe_list const& other) = delete;
void push_front(T const& value)
{
std::unique_ptr<node> new_node(new node(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
}
template<typename Function>
void for_each(Function f)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current = next;
lk = std::move(next_lk);
}
}
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data))
{
return next->data;
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}
template<typename Predicate>
void remove_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node> old_next = std::move(current->next);
current->next = std::move(next->next);
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
};
template<typename Predicate>
void remove_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (const node* next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node> old_next = std::move(current->next);
current->next = std::move(next->next);
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
该函数通过模板来接受一个谓词函数(Predicate p),然后遍历链表并根据谓词判断删除满足条件的节点:
[](const node&) {return true; }
作为谓词函数传入,该函数总会返回true,所以链表的所有元素均满足谓词函数的条件,删除所有元素current
用于指向链表中的每个元素,从头节点 head
开始备遍历;
在 while 循环判断条件中,调用智能指针的 get 函数获取当前节点的下一个节点包装的原始指针,如果原始指针不为空,则进入循环:
std::unique_ptr
锁住当前节点的下一个节点中的互斥量,因为是从虚拟头节点的下一个节点是真正头节点,所以相当于从头遍历链表nullptr
,所以if-else
中走的是else
分支,这里会将虚拟头节点解锁,并且将最后一个节点赋值给current
,当整个函数作用域结束后,current
调用析构函数释放最后一个节点。而虚拟头节点始终存在。get()
获取到最后一个节点的next
指针时,因为指向nullptr
,循环条件不满足,退出循环我们传给模板函数 remove_if 的谓词函数是一个lmbda函数,它的参数类型是 const node&,但我们调用谓词函数p(*next->data)时,其中 *next->data 的类型是 T&,而 T& 和 const node& 类型明显不匹配(尽管const node&既可以接受node又可以接受const node),那么为什么能编译通过?
其实这里编译器隐式调用了 node 的构造函数,将
T&
类型的对象(*next->data
,data中的数据是T类型但是解引用后成了T&)隐式转换为了node
类型。因为node的构造函数只接受一个参数,形参类型是const T&,既能接受const T&也能接受T&,所以这里T&通过隐式调用构造函数被转换为了node。而 const node&可以接受node类型的参数。
void push_front(T const& value)
{
std::unique_ptr<node> new_node(new node(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
}
next
指向链表的第一个真实节点(虚拟头节点的 next
指向的元素)next
指向新节点,即可实现新元素的插入请注意,插入和删除的加锁顺序需要保持一致,都需要从虚拟头节点开始逐个往后加,这样可以避免死锁
template<typename Function>
void for_each(Function f)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current = next;
lk = std::move(next_lk);
}
}
for_each
算法作用相同,用于遍历容器中的所有元素,并对每个元素执行指定的函数current
存储虚拟头节点的地址,并对虚拟头节点的互斥加锁,包装访问 虚拟头节点和其 next
指针是安全的current
的 next
指针存储的节点原始地址,如果不为空,则进入循环
f
,用于对元素修改current
更新为下一个节点,并将下一个节点的互斥转移到 lk
中,准备进入下一个循环按这样的顺序直至最后一个空指针,然后退出循环
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data))
{
return next->data;
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}
std::find_if
算法作用相同,用于在容器中查找第一个满足指定条件的元素,并返回其数据;如果没有找到,则返回空的智能指针我们实现该链表的目的是为了采用精细锁度的锁操作,摆脱单一的全局互斥,从而增加并发操作的机会。那么上面这个仅支持前插的单向链表实现这一点了嘛?
我们实现了,不同的线程可以同时在不同的节点上工作(每个节点都有自己的互斥进行线程安全的维护),无论具体的操作是利用
for_each()
处理数据,还是利用find_first_if()
进行查找,还是通过remove_if()
删除节点。此外,因为这些函数内部互斥的加锁顺序是按照链表中的节点前后顺序进行的,故多线程不可能出现 “超越他人的处理节点”,若一个线程需要在某个节点上耗费特别长的时间,那么其他线程在处理完该节点之前的数据抵达该节点时,必须等待处理线程结束释放锁才能继续往后走。
如果我们想要从链表尾部开始插入一个新节点呢,我们应该如何操作?
那么现在有一个问题,我们需要专门给尾部指针增加一个锁吗?
实际上,给尾部指针单独增加一个锁并没有必要,尾部指针本身指向尾部节点,因此可以完全依赖尾部节点内部的锁来保证线程安全;同样,头节点的操作也可以依赖节点内部的锁来实现保护。
至于尾部和头部插入竞争的情形,其实根本不会发生,在分析头尾插入操作之间可能的竞争条件时,可以重点考虑链表为空的情况。此时,头节点和尾节点是同一个节点(即虚拟头节点),由于头尾节点共用一个内部的
mutex
,因此无论是执行push_front()
还是push_back()
,实际上都在竞争同一个锁。这个锁能够自然地保证这两种操作串行化,从而避免竞争条件。同理,在同时考虑头部增删和尾部增删的竞争条件时,都可以考虑头尾节点合二为一的情形,此时头尾节点内部是同一个锁,自动保证了线程安全,避免了竞争条件以上条件均是在满足加锁顺序是从头到尾依次加时才可行的,如果我们是双向链表插入时(这时锁即可能从后向前加也可能从前向后加),这时候就必须使用一个额外的锁保护尾指针,否则会造成死锁。
完整代码:
#pragma once
#include <mutex>
#include <memory>
template<typename T>
class double_push_list
{
struct node_d
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node_d> next;
node_d() :
next()
{}
node_d(T const& value) :
data(std::make_shared<T>(value))
{}
};
node_d head;
node_d * last_node_ptr;
public:
double_push_list()
{
last_node_ptr = &head;
}
~double_push_list()
{
remove_if([](node_d const&) {return true; });
}
double_push_list(double_push_list const& other) = delete;
double_push_list& operator=(double_push_list const& other) = delete;
void push_front(T const& value)
{
std::unique_ptr<node_d> new_node(new node_d(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
// 更新最后一个节点
if (head.next->next == nullptr) {
last_node_ptr = head.next.get();
}
}
void push_back(T const& value) {
std::unique_ptr<node_d> new_node(new node_d(value));
std::unique_lock<std::mutex> lk(last_node_ptr->m);
last_node_ptr->next = std::move(new_node);
last_node_ptr = last_node_ptr->next.get(); // 更新最后节点
}
template<typename Function>
void for_each(Function f)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current = next;
lk = std::move(next_lk);
}
}
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data))
{
return next->data;
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}
template<typename Predicate>
void remove_if(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current; // 将尾指针 `last_node_ptr` 指向虚拟头节点
}
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
template<typename Predicate>
bool remove_first(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current;
}
next_lk.unlock();
return true;
}
lk.unlock();
current = next;
lk = std::move(next_lk);
}
return false;
}
template<typename Predicate>
void insert_if(Predicate p, T const & value)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while(node_d * const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if(p(*(next->data))) // 如果找到满足条件的节点
{
std::unique_ptr<node_d> new_node(new node_d(value));
// 将新节点的 next 指向当前节点的下一个节点
auto old_next = std::move(current->next);
// 将当前节点的 next 更新为新节点
new_node->next = std::move(old_next);
//当前节点的下一个节点更新为新节点
current->next = std::move(new_node);
return;
}
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
void remove_last() {
if (last_node_ptr == &head) {
std::cerr << "Error: Attempt to remove from an empty list!" << std::endl;
return;
}
node_d* current = &head;
std::unique_lock<std::mutex> current_lock(current->m); // 锁住虚拟头节点
// 遍历到最后一个节点的前一个节点
while (current->next.get() != last_node_ptr) {
current_lock.unlock();
current = current->next.get();
current_lock = std::unique_lock<std::mutex>(current->m);
}
// 删除最后一个节点
std::unique_ptr<node_d> old_next = std::move(current->next);
last_node_ptr = current; // 更新尾指针
}
};
void push_front(T const& value)
{
std::unique_ptr<node_d> new_node(new node_d(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
// 更新最后一个节点
if (head.next->next == nullptr) {
last_node_ptr = head.next.get();
}
}
void push_back(T const& value) {
std::unique_ptr<node_d> new_node(new node_d(value));
std::unique_lock<std::mutex> lk(last_node_ptr->m);
last_node_ptr->next = std::move(new_node);
last_node_ptr = last_node_ptr->next.get(); // 更新最后节点
}
注意,因为在单向链表中我们是按从头到尾的顺序依次加锁的,所以我们这里先通过尾指针直接调用其指向节点内部的锁,也可以保护尾指针的修改,因为抵达最后一个节点(未更新前的最后一个节点)的其他线程均会被阻挡在外,直至最后一个节点(未更新前的最后一个节点)修改完毕。
析构函数调用的 remove_if 函数也有一些改变,需要判断删除的节点是否为最后一个节点
template<typename Predicate>
void remove_if(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current; // 将尾指针 `last_node_ptr` 指向虚拟头节点
}
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
last_node_ptr
指向虚拟头节点template<typename Predicate>
bool remove_first(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current;
}
next_lk.unlock();
return true;
}
lk.unlock();
current = next;
lk = std::move(next_lk);
}
return false;
}
remove_if
相同,只不过如何找到满足谓词函数的节点,该函数会返回 true,否则返回 falsetemplate<typename Predicate>
void insert_if(Predicate p, T const & value)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while(node_d * const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if(p(*(next->data))) // 如果找到满足条件的节点
{
std::unique_ptr<node_d> new_node(new node_d(value));
// 将新节点的 next 指向当前节点的下一个节点
auto old_next = std::move(current->next);
// 将当前节点的 next 更新为新节点
new_node->next = std::move(old_next);
//当前节点的下一个节点更新为新节点
current->next = std::move(new_node);
return;
}
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
p(*(next->data))
是否满足条件,如果满足条件:
value
。next
指针指向当前节点的下一个节点(旧的 next
)。next
更新为新节点。其实这里还有一个小问题需要我们思考一下:
虽然我们在 2.2 的开头说了尾部和头部不可能竞争的情形(只适用于单向链表且加锁从头到尾依次加的情况),但我们仍需要举一个例子说明一下。
假如我们现在有两个线程 t1 和 t2,线程 t1 用于执行 push_back 函数在链表的尾部新加一个节点,而线程 t2 想要删除最后一个节点。那么,有没有可能发生以下情况:
假设线程2先执行删除操作,节点更新并且更新last_node_ptr的值,而此时线程1因为之前无法抢占最后一个节点(last_node_ptr)自带的互斥量所以挂起,当线程2执行完后,线程1才开始继续执行,但是此时last_node_ptr已经变化了,而线程1可能还用的是旧的last_node_ptr的值,导致插入数据失败(很可能崩溃或者插入到一个分叉的链表)。
其实如果加锁顺序是一致的,这种情况不可能发生,因为尾部指针指向的节点内部锁已经足够使用,不可能发生线程t1修改后,线程t2还持有更新前值的情况,比如下述代码:
void push_back(T const& value) {
std::unique_ptr<node_d> new_node(new node_d(value));
std::unique_lock<std::mutex> lk(last_node_ptr->m); // 锁住当前尾节点
last_node_ptr->next = std::move(new_node);
last_node_ptr = last_node_ptr->next.get(); // 更新最后节点
}
// 错误版本
void remove_last() {
std::unique_lock<std::mutex> tail_lock(last_node_ptr->m); // 锁住当前尾节点
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m); // 错误的,不能已经对尾节点加锁的情况下从后往前加锁
while (current->next.get() != last_node_ptr) {
std::unique_lock<std::mutex> current_lock(current->m); // 锁住当前节点
current = current->next.get();
}
current->next.reset();
last_node_ptr = current;
}
// 正确版本
void remove_last() {
if (last_node_ptr == &head) {
std::cerr << "Error: Attempt to remove from an empty list!" << std::endl;
return;
}
node_d* current = &head;
std::unique_lock<std::mutex> current_lock(current->m); // 锁住虚拟头节点
// 遍历到最后一个节点的前一个节点
while (current->next.get() != last_node_ptr) {
current_lock.unlock();
current = current->next.get();
current_lock = std::unique_lock<std::mutex>(current->m);
}
// 删除最后一个节点
std::unique_ptr<node_d> old_next = std::move(current->next);
last_node_ptr = current; // 更新尾指针
}
在push_back函数中,直接将尾指针指向节点的互斥锁住;在remove_last函数中,先锁住虚拟头节点的互斥,然后依次往后逐个加解锁,直至找到最后一个节点的上一个节点。这两个函数的加锁流程其实都遵循了从前往后依次加锁,确保尾部状态的一致性。
请注意,在 push_back 函数中,我们如果直接将尾指针指向节点的互斥锁住,那么后续就不能对其他节点的互斥进行加解锁,否则就会出错;我们要么从头到尾依次加解锁,要么直接从中间某一个部分加锁,但是不能对之前的部分进行加锁,然后继续往后加。总之,对于单向链表的加锁,要保证从前往后的顺序。
而last_node_ptr
的更新也在锁的保护范围内,不存在线程1使用**“旧值”**的情况。无论是 push_back
还是删除尾部节点,last_node_ptr
的修改在锁保护下完成,保证它指向的是当前尾部状态。
简单对该例子做个测试:
void TestTailPushAndDelete() {
double_push_list<int> list;
// 创建线程执行 push_back 和 remove_last
std::thread t1([&list]() {
for (int i = 1; i <= 10; ++i) {
list.push_back(i);
std::cout << "push back " << i << " success" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
std::thread t2([&list]() {
for (int i = 0; i < 10; ++i) {
list.remove_last();
std::cout << "remove last success" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(15));
}
});
t1.join();
t2.join();
// 打印最终链表状态
std::cout << "Final list: ";
list.print_list();
}
void print_list() {
node_d* current = head.next.get();
while (current) {
std::cout << current->data << " -> ";
current = current->next.get();
}
std::cout << "NULL\n";
}
函数 print_list 是链表类的成员函数,方便观察数据变化,结果如下:
push back 1 success
remove last success
Error: Attempt to remove from an empty list!push back 2 success
remove last success
push back 3 success
remove last successpush back 4 success
push back 5 success
push back 6 success
remove last success
push back 7 success
push back remove last success
8 success
push back 9 success
push back 10 success
remove last success
remove last success
remove last success
remove last success
remove last success
Final list: 0000021DAC5F26C0 -> NULL
因为我们没有加内存序保证顺序执行,所以输出是乱序的,但是可以看出来,只要按顺序加锁,就不会发生数据竞争问题。
创建三个线程进行测试
void MultiThreadPush()
{
double_push_list<MyClass> thread_safe_list;
std::thread t1([&]()
{
for (int i = 0; i < 20000; i++)
{
MyClass mc(i);
thread_safe_list.push_front(mc);
std::cout << "push front " << i << " success" << std::endl;
}
});
std::thread t2([&]()
{
for (int i = 20000; i < 40000; i++)
{
MyClass mc(i);
thread_safe_list.push_back(mc);
std::cout << "push back " << i << " success" << std::endl;
}
});
std::thread t3([&]()
{
for (int i = 0; i < 40000; )
{
bool rmv_res = thread_safe_list.remove_first([&](const MyClass& mc)
{
return mc.GetData() == i;
});
if (!rmv_res)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
i++;
}
});
t1.join();
t2.join();
t3.join();
std::cout << "begin for each print...." << std::endl;
thread_safe_list.for_each([](const MyClass& mc)
{
std::cout << "for each print " << mc << std::endl;
});
std::cout << "end for each print...." << std::endl;
}
输出结果为:
因为没有使用内存序保证顺序执行,所以输出是乱序的,而且因为不同线程会竞争资源,t2和t3线程明显提前结束,而t1线程最后结束。但足以保证该模板类的有效性。
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- huatuo8.com 版权所有 湘ICP备2023022238号-1
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务