Multithreading
- 1、线程的基本操作
- 1.1、创建线程
- 1.2、等待线程和分离线程
- 1.3、获取线程id
- 2、互斥锁
- 3、条件变量
- 4、例程
1、线程的基本操作
从C++11开始推出关于多线程的库和函数,相比于Linux所配套的资源,C++11提供的函数更加容易理解和操作,对于跨平台编程更有优势。
1.1、创建线程
导入线程库,创建一个新线程,让线程执行某个函数。
#include <thread>
using namespace std;
void test_function()
{
cout << "hello, i am a subthread." << endl;
}
int main()
{
thread user_thread(test_function);
user_thread.join();
return 0;
}
可以看出,创建一个线程的时候需要一个函数名(函数地址)作为初始化参数。那么我们使用lambda函数也是可以的。
thread user_thread([]{
cout << "i am a lambda function." <<endl;
});
user_thread.join();
用类似于函数指针的function也是可以的。
function<void()> f = []{
cout << "function test..." << endl;
};
thread user_thread(f);
user_thread.join();
1.2、等待线程和分离线程
user_thread.join()是为让主线程阻塞等待user_thread这个线程执行完毕。如果使用user_thread.detach()那就是让子线程自己在后台执行。
我在打印"function test…"之前延时一秒,就可以清楚的看出两者的区别了。
int main()
{
function<void()> f = []{
this_thread::sleep_for(chrono::seconds(1));
cout << "function test..." << endl;
};
thread user_thread(f);
user_thread.detach(); //user_thread.join();
return 0;
}
1.3、获取线程id
每一个线程都有独一无二的id,以便于用户辨别。
std::this_thread::get_id();
2、互斥锁
互斥锁(mutex:Mutual Exclusion Object)是用于确保数据一致性和解决资源竞争的一种方法。被互斥锁锁住的资源,其它线程将无法访问。值得注意的是,这里锁住的并不是代码块,而是资源,即变量,数组之类的东西。
上锁和解锁是配套的操作,否则将导致某一资源永远无法被访问。
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int temp = 0;
mutex mu;
void thread_1()
{
mu.lock();
temp++;
cout << this_thread::get_id() << " " << temp << endl;
mu.unlock();
}
void thread_2()
{
mu.lock();
temp++;
cout << this_thread::get_id() << " " << temp << endl;
mu.unlock();
}
int main()
{
thread thread1(thread_1);
thread thread2(thread_2);
thread1.join();
thread2.join();
system("pause");
return 0;
}
其实像这样裸照上锁解锁还是比较少见的,更多的是利用uniuqe_locker这个类来上锁。因为使用这个类提供了需要方法,在离开作用域的时候还会自动解锁,并且支持和条件变量配套使。
void thread_1()
{
unique_lock<mutex> locker(mu);
temp++;
cout << this_thread::get_id() << " " << temp << endl;
//不必再解锁了。
}
unique_lock这个类在构造函数中执行上锁操作,在析构函数中执行解锁操作。但它也支持手动上锁和解锁。上锁的方式也有好几种,我们不妨都看一下。
首先要求实例化的时候不要自动上锁。
unique_lock<mutex> locker(mu, std::defer_lock);
- locker.lock():默认模式,阻塞式上锁。
- locker.try_lock():尝试获取资源状态,如果被已经被锁住,则返回true,反之返回false。如果不加判断的使用该方法,则可能导致上锁失败。
- locker.try_lock_for():输入参数是滞留时间,指定时间该资源还是无法访问则返回false。
- locker.try_lock_until():输入参数是目标时间,目标时间内还是无法访问该资源则返回false。
给个例子简单看看怎么用吧。
void thread_1()
{
unique_lock<mutex> locker(mu, defer_lock);
//等待一秒钟,如果还无法访问资源则离开
if(locker.try_lock_for(chrono::seconds(1))){
temp++;
cout << this_thread::get_id() << " " << temp << endl;
}
}
3、条件变量
条件变量(conditional variable)主要用于协调多线程,经常和互斥锁一起使用。其设有通知机制,分为单个通(notify_one)知和全部通知(notify_all)两种模式,前者通知任意一个线程,后者通知所有线程。
#include <mutex>
#include <condition_variable>
//定义一个条件变量
condition_variable cond;
//定义一个互斥锁
mutex mtu;
定义好条件变量和互斥锁之后,线程就等待该条件变量被唤醒即可。
void thread_test()
{
unique_lock<mutex> locker(mtu);
cond.wait(locker);
}
int main()
{
thread myThread(thread_test);
//延迟一下,以便测试条件变量的作用
this_thread::sleep_for(chrono::seconds(2));
//notify_one是通知任意一个线程,notify_all是通知所有线程
cond.notify_one();
myThread.join();
return 0;
}
如果缺乏互斥锁,那就有可能出现竞争等情况。比如该条件变量其实只唤醒了一次,但两个线程却争着被唤醒,以至于两个线程同时被唤醒。
条件变量等待通知(唤醒)的方式也有三种,类比互斥锁的那几种锁方式,便知道如何使用了。这里不再赘述。
- wait():阻塞式等待。
- wait_for():输入参数中有一个滞留时间,滞留时间到达之后即便还没有接到通知也会继续执行下去。
- wait_until():输入参数中有一个目标时间,同理,目标时间到达之后即便还没有接到通知也会继续执行下去。
这里有个点需要知道,如果再通知的那一刻,所有线程都处于忙碌状态,那么该通知就会被忽略。
void thread_test_2()
{
unique_lock<mutex> locker(mtu);
if(cond.wait_for(locker, chrono::seconds(1)) == cv_status::timeout)
cout << "time out" << endl; //会在1s之后执行这个打印语句
else
cout << "i am thread test 2 !" << endl;
}
int main()
{
thread myThread2(thread_test_2);
this_thread::sleep_for(chrono::seconds(5));
cond.notify_all();
myThread2.join();
system("pause");
return 0;
}
如果不仅仅想要判断条件变量,还想同时判断其它条件,可以这样做。
bool isReady = false;
unique_lock<mutex> locker(mtu);
cond.wait(locker, isReady);
当然,传入lambda函数或其它返回值为bool类型的函数也是可以的。
cond.wait(locker, []{
cout << "this is lambda function" << endl;
return isReady;
});
不过有几点比较奇怪的是,你不能在执行cond.wait()语句之前就让函数的返回值是true,否则它会直接终止等待,执行下面的语句。
即你不能这样。
cond.wait(locker, true});
至于wait_for和wait_until如何仿造上面,同时进行多个判断,我也没学明白。。需要时候再学吧。
4、例程
自己写了一个线程池,可能不严谨。
大致思路如下:
1、定义一个任务队列,里面存放N个任务。
2、定义M个线程,存放在vector容器中。让这些线程不断往任务队列中取出任务,并执行之。
3、每添加一个任务进任务队列,就会唤醒任意一个空闲线程去读取任务。
4、用互斥锁去保证任务队列读写任务的顺利执行,并保证条件变量的效果。
5、在析构函数中,即整个程序结束前,不断唤醒空闲线程去取任务,直至任务队列为空。
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>
#include <functional>
#include <condition_variable>
using namespace std;
class ThreadPool
{
public:
ThreadPool(int size) : _size(size), _isStop(false){
//emplace some thread into thread_vector
for(int i = 0; i < _size; i++) thread_vector.emplace_back(
//using lambda function to execute task
[this]{
function<void()> thraed_task;
//all of threads will keep working until receive stop signal.
while(true){
unique_lock<mutex> locker(mtu);
//waiting until queue is not empty
cond.wait(locker, [this]{return _isStop || !task_queue.empty();});
if(_isStop && task_queue.empty()) break;
thraed_task = task_queue.front();
task_queue.pop();
mtu.unlock();
thraed_task();
}
});
}
~ThreadPool(){
while(!task_queue.empty())
cond.notify_all();
_isStop = true;
for(int i = 0; i < _size; i++) thread_vector[i].join();
cout << "deconstructor" << endl;
}
void enqueue_task(function<void()> task){
//we needn't consider mutex in different resource.
unique_lock<mutex> locker(mtu);
task_queue.push(move(task));
cond.notify_one();
mtu.unlock();
}
private:
int _size;
bool _isStop;
mutex mtu;
condition_variable cond;
queue<function<void()>> task_queue;
vector<thread> thread_vector;
};
void HandleTask(int cnt)
{
printf("thread %d handle the %d task!\n", this_thread::get_id(), cnt);
this_thread::sleep_for(chrono::seconds(1)); // Simulate work
}
int main()
{
//limit scope
{
ThreadPool pool(3);
for (int i = 0; i < 9; i++) {
pool.enqueue_task([i] { HandleTask(i + 1); });
}
}
system("pause");
return 0;
}