为什么要使用条件变量?

前言

最近看了很多与线程有关的 C++ 新特性,条件变量是见的比较多的一个特性。

看的时候我发现,想要理解一个新的特性,关键的要看它的引入到底解决了哪些问题,没有什么特性我们要实现相同的功能要怎么做?

以我的理解来看,条件变量是一个线程间互相同步与通知的手段,他通过主动唤醒的方式减小了各个线程的开销,取代了简单但是消耗较大的一直被动循环检验与等待

没有条件变量我们如何实现相同的需求?

这里采用现代C++教程1 中关于条件变量的一个例子作为基础:

不使用条件变量版本

#include <queue>
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>


int main() {
    std::queue<int> produced_nums;
    std::mutex mtx;

    // 生产者
    auto producer = [&]() {
        for (int i = 0; ; i++) {
            std::this_thread::sleep_for(std::chrono::milliseconds(900));
            std::unique_lock<std::mutex> lock(mtx);
            std::cout << "producing " << i << std::endl;
            produced_nums.push(i);
        }
    };
    // 消费者
    auto consumer = [&]() {
        while (true) {
            {
              std::unique_lock<std::mutex> lock(mtx);
              if(produced_nums.empty()) continue;
            }
            std::unique_lock<std::mutex> lock(mtx);
            // 短暂取消锁,使得生产者有机会在消费者消费空前继续生产
            lock.unlock();
            // 消费者慢于生产者
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            lock.lock();
            while (!produced_nums.empty()) {
                std::cout << "consuming " << produced_nums.front() << std::endl;
                produced_nums.pop();
            }
        }
    };

    // 分别在不同的线程中运行
    std::thread p(producer);
    std::thread cs[2];
    for (int i = 0; i < 2; ++i) {
        cs[i] = std::thread(consumer);
    }
    p.join();
    for (int i = 0; i < 2; ++i) {
        cs[i].join();
    }
    return 0;
}

使用条件变量版本

#include <queue>
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>


int main() {
    std::queue<int> produced_nums;
    std::mutex mtx;
    std::condition_variable cv;
    bool notified = false;  // 通知信号

    // 生产者
    auto producer = [&]() {
        for (int i = 0; ; i++) {
            std::this_thread::sleep_for(std::chrono::milliseconds(900));
            std::unique_lock<std::mutex> lock(mtx);
            std::cout << "producing " << i << std::endl;
            produced_nums.push(i);
            notified = true;
            cv.notify_all(); // 此处也可以使用 notify_one
        }
    };
    // 消费者
    auto consumer = [&]() {
        while (true) {
            std::unique_lock<std::mutex> lock(mtx);
            while (!notified) {  // 避免虚假唤醒
                cv.wait(lock);
            }
            // 短暂取消锁,使得生产者有机会在消费者消费空前继续生产
            lock.unlock();
            // 消费者慢于生产者
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            lock.lock();
            while (!produced_nums.empty()) {
                std::cout << "consuming " << produced_nums.front() << std::endl;
                produced_nums.pop();
            }
            notified = false;
        }
    };

    // 分别在不同的线程中运行
    std::thread p(producer);
    std::thread cs[2];
    for (int i = 0; i < 2; ++i) {
        cs[i] = std::thread(consumer);
    }
    p.join();
    for (int i = 0; i < 2; ++i) {
        cs[i].join();
    }
    return 0;
}

这两段代码在效果上是等效的,都是一个生产者两个消费者。

前者使用了 while 循环来一直检查是否可以消费,后者使用了 cv.wait(lock) 条件变量来实现阻塞等待可消费的提醒。

可以发现在这个例子里,前者是主动的去检查是否可以消费,后者是被动的被提醒可以消费,而主动则代表着需要一直询问查询,主动的 while 循环检查始终在重复如下流程:

而这个上锁、检查、释放锁的过程就是非常冗余、消耗资源、效率低下的,而条件变量解决了这个问题,条件变量做的事:

这里需要注意的一点是,在条件变量调用 wait() 时,做了两件事,一个是阻塞线程等待其他线程唤醒、另一个是释放锁,只有这样才会让别的线程有机会获得锁,而被唤醒后又会自动上锁。

为什么要和 mutex 与 lock 一起用?

我们常见的搭配就是 condition_variable、mutex、unique_lock 一起使用,那么为什么要这么做呢?

一个比较常见的说法是,在调用 wait() 函数与线程真正的阻塞等待状态是存在一定时间差的,那么就会存在唤醒丢失的问题,一种情况如下:

线程A: ------- 调用 wait() 函数 ------- 进入等待状态 ------

线程B: -------------------------唤醒A-------------------

而我们希望的是:

线程A: ------- 调用 wait() 函数 ------- 进入等待状态 ------

线程B: -------------------------------------------唤醒A-

我们总是要确保,在线程 A 真正进入等待状态后再进行唤醒,因此这里需要一个 lock 来保证我们对于一个线程空闲/等待的改变是原子性的,也就是不应该被其他线程中途干扰。

为什么使用 unique_lock 而不使用 scoped_lock 或 lock_guard

首先三个函数都是 RAII 的锁管理函数,可以有效解决 lock 后忘记 unlock 的情形,而目前在 C++17 后推荐统一使用 scoped_lock 而不是 lock_guard

而使用 unique_lock 是因为我们在使用条件变量时,需要在条件变量 wait 时解除 lock,只有 unique_lock 能够满足这个条件,实现自己更细粒度的锁区间的划分。

什么是虚假唤醒?

可以注意到在第一部分代码中有这么一段:

auto consumer = [&]() {
        while (true) {
            std::unique_lock<std::mutex> lock(mtx);
            while (!notified) {  // 避免虚假唤醒
                cv.wait(lock);
            }
            // ...
            notified = false;
        }
    };

避免虚假唤醒,那么什么是虚假唤醒呢?

虚假唤醒简而言之就是,没有满足消费的条件却被唤醒后进行了消费。

这个发生的可能多是在系统调度层面,具体的可以参考此知乎问题:为什么条件锁会产生虚假唤醒现象(spurious wakeup)?

而解决的方案就是在消费之前再检查一下是否满足消费的条件,而这个消费条件多是用一个形如 notified 的 bool 变量来标识是否可以消费。

如果不检查,带条件变量的执行流程就会像如下这样:

其中少了一步检查是否可以消费的过程。

后记

本文记录了我在学习条件变量过程中的一些疑问,如有错误之处,敬请交流指正。