(learn&think)

不浮躁,不自傲,学习,思考,总结

浅谈C++ Multithreading Programming

| Comments

Overview

随着多核 CPU 随处可见,多线程(multithreading)可以被用来实现并行,提高 CPU 的利用率和性能显著的提高.掌握多线程编程也成为现代实现软件的基本要求技能之一.Introduction to Parallel Computing详细的介绍了 Parallel Computing; 为什么使用它;Parallel Computing 的分类;Parallel Computing 的 limits 和 costs; Parallel Computing 的程序模型;如何设计 Parallel 程序等.

这里先介绍多线程的概念,多线程中涉及的基本概念,然后用实例介绍 Pthread 库的使用,并介绍 Google Code 中如何把它封装成 C++类,最后介绍可移植并大量使用的 Boost Thread 库.

还有一些其他的 Thread 库:

Thread

定义

A thread is defined as an independent stream of instructions that can be scheduled to run as such by the operating system.所以它是在程序中独立于其他代码可由操作系统调度的一段指令.

那么是操作系统是如何具体实现这一独立性呢?

要理解 thread,必须先明白 process.进程由操作系统创建来运行相应的程序,进程包含程序资源和程序执行状态的信息.以 Linux 的进程为例包含:

  • Process ID, process group ID, user ID, and group ID
  • Environment
  • Working directory
  • Program instructions
  • Registers
  • Stack
  • Heap
  • File descriptors
  • Signal actions
  • Shared libraries
  • Inter-process communication tools (such as message queues, pipes, semaphores, or shared memory).

Thread 使用 Process 的资源,并且能成为独立的元件被操作系统调度,是因为它仅重复那些使得它们能成为独立运行代码的必要资源.Thread 维护它自己如下的信息:

  • Stack pointer
  • Registers
  • Scheduling properties (such as policy or priority)
  • Set of pending and blocked signals
  • Thread specific data.

与 Process 比较,Thread 可以总结如下:

  • Thread 相当于一个 lightweight 的 Process,拥有如 ID,properties 等相似信息, 但仅仅包含能使得它独立运行的信息即可.
  • 信息包含的不同,与需要复制大量信息来创建 Process 比,Thread 的创建比较快捷.
  • 独立的 Processes 不共享任何信息.每个 Process 运行在独立的地址空间.Threads 共享所在 Process 的资源,全局变量和文件描述符.
  • Threads 可以直接与所在同一 Process 的 Threads 通信.而 Processes 必须使用 inter-process communication(IPC)来通信.
  • 因为 Threads 的资源共享性,增加并行操作资源的难度,需要引入 Thread 同步机制来达到资源并行性.

一些术语

Posix Thread 基本模型如下图,一些有关其中 Thread 的术语:

  • Lightweight Process(LWP) 可以被认为虚拟的 CPU,在系统中通常 LWP 的个数大于实际 CPU 个数.Thread 库通过与 LWP 沟通来调度 thread.LWP 也通常被称为 kernel threads.
  • Contention Scope 是如何决定哪个线程得到调度.
  • Bound threads 拥有系统级别的 Contention Scope,也就是,它们与其他进程一同竞争.
  • Unbound threads 拥有进程级别的 Contention Scope.
  • Thread-safe 意味共享数据被得到保护,可以同时被多个 thread 调用而安全.
  • Reentrant code 意味程序可以被多个 thread 并行运行.
  • asynchronous-safe function 在 signal handler 下被安全调用并没有任何副作用.
  • Concurrency vs. Parallelism 并不一样.Parallelism 意味同时运行代码.而 Concurrency 意味许多任务可以以任何顺序执行或可以是并行运行.

Amdahl 法则和 Pareto 原则

Threads 能提供益处 对于相适 的应用.所以 thread 的并行性对于应用来说也有它的限制.

Amdahl 法则

Amdahl 法则 陈述到潜在的程序加速由能被并行的代码率 P 定义为:

$$ \begin{align} speedup = \dfrac{1}{1-P} \end{align} $$
  • 如果代码中没有能并行的部分,P=0,那么 speedup=1.
  • 如果所有代码都能并行,P=1,那么 speedup 能达到无限(理论上).
  • 如果 50%的代码能并行,那么最大的 speedup=2,也就是最多 2 倍的加速.

引入能并行的处理器个数,那么进一步可以定义为:

$$ \begin{align} speedup = \dfrac{1}{\dfrac{P}{N} + (1-P)} 其中 P 并行率,N 处理器个数 \end{align} $$

Pareto 原则

Pareto 原则 陈述到 80%的处理器时间花在 20%的代码中.所以仔细分析代码,不要把时间花在并行/优化那部分不重要的代码.

Thread 设计模式1

在程序中有不同的方法使用线程,这里讨论 3 种线程设计模式,没有哪一种模式最好,每种模式都有相应适合的应用场合.

Boss/worker(Thread pool)

如上图,一个 Boss 线程创建其他 Worker 线程,并给它们分配任务,必要的话,并等待其他线程运行结束.通常 Boss 线程会在初始建立 Thread Pool 来为之后分配.尽管线程是轻量级的,但是创建它们仍是有开销的.

Peer(Workcrew)

Peer 模式又叫做 workcrew 模式,一个 thread 创建其他 peer threads 当程序开始,但是如上图,与 Boss/worker 模式不同,这个 thread 之后也变成 peer thread 去处理自己的任务.

Pipeline

Pipeline 模式假定:

  • 一串连续长输入.
  • 每个输入经过一连串的子操作(熟知为 stages 或 fliers).
  • 每个处理 stage 能一次处理个不同的输入.

如上图, Pipeline 就像流水线一般,每个 thread 是一个长链中的一部分.每个 thread 处理由之前 thread 过的数据.

线程同步原语

如上线程中的定义,线程们共享进程中的全局变量或资源,它们可以并行同时对这些数据和资源操作,如果没有一定的机制协调它们,那么数据或资源将处于一个不安全状态,引起诸如如下的一些问题:

  • Race condition发生于不能决定行为的结果因为线程们操作共享数据或资源没有遵循一定的同步规则.
  • ABA problem发生于一个地方被读取两次,都读到相同的值,’值是相同的’被用来说明’没有东西被改变’.但是,另外一个线程能在这两次读取中间执行操作并修改这个位置的值,然后做一些其他操作,最后把这个值改回去,以致愚弄第一个线程让它认为’没有东西被改变’,即使第二个线程的操作已经破坏了这个假设.

所以我们需要如下的一些线程同步原语满足不同的线程间同步需求.

Mutex

Mutex 又被称为 Lock,所以它就像一把 Lock,一个线程 Lock 住一段资源,那么其他线程就不能去访问那段资源,只有等到第一个线程 Unlock 那么资源,它才能访问.

在 Lock 和 Unlock 之间的代码,一般被称为 critical section.

Mutex 也包含一些复杂的类型,如下:

  • Recursive: 允许占有锁的那一个线程再次获取同样的锁,对递归算法是必要的.
  • Queuing: 使得 公平 的获取锁,通过 FIFO 排序锁的请求.
  • Reader/Writer(rwlock): 允许多个 reader 同时获取锁,如果有 reader 占用锁,writer 只有等到 reader 释放锁.
  • Scoped: RAII 类型定义的锁获取和解锁.

但 Mutex 也会引入其他一些问题,如deadlockpriority inversion.

在 Blog 中之前浅谈 Mutex (Lock)中可以看到更多有关 Mutex 的性能和开销分析,并如何实现一个轻量级的 Mutex.

Join

线程 join 机制能让一个线程 join 到另外一个线程中.比如一个子线程 join 回主线程,那么主线程就会等待子线程运行结束.从而达到线程间等待的同步机制.

Condition Variable

Condition variable 允许线程同步到某个共享资源的某个值.

比如,程序有一个计数器,当计数器达到某一个值时去激活某个线程运行.把计数器当成一个 Condition variable.这个线程可以等待这个 Condition variable,其他 active 线程操作完这个 Condition variable,可以通过 signal/broadcast 去唤醒那些等待这个 Condition variable 睡眠的线程.

Barrier

Barrier 是一种能让一系列线程在某个点得到同步的方法,通过让参与 barrier 的线程等待直到所有参与线程都调用了这个 barrier 函数.本质上就是,阻塞所有参与 barrier 的线程直到最慢的那个参与线程调用 barrier.

Spinlock

Spinlock 与 mutex 类似,是种锁,但当获取锁失败时,spinlock 不会让线程进入睡眠,而是不断 poll 去获取这个锁直到获取成功.更多Mutex 与 Spinlock 的区别.

Semaphore

当某些资源具有多个时,简单的 Mutex 不能满足,引入 Semphore,Semphore 可以根据资源个数初始化为任意值.当线程们占有所有资源,使得 Semphore 为 0,那么其他线程再获取资源只有等待.当 Semphore 值只能是 1 或 0 时,它相当于简单的 Mutex.

Pthread

Overview

原始的 Pthread API 由 ANSI/IEEE POSIX 1003.1 - 1995 standard 定义.POSIX 标准也随着时间不断改进.

接下来主要把 Pthread API 分成如下主要 5 部分:

  1. Thread Management
  2. Mutex Variables
  3. Condition Variables
  4. Synchronization
  5. Miscellaneous

如果想把 Pthread 封装成类对象或 Scoped Lock,可以参考之后 Google wrap the Pthread,或直接使用之后介绍的Boost thread library.

如果更全面的 API 参考文章最后的Pthread Library Routines Reference.更多有关资料参考文章后的其他资料.

编译 Pthread 程序

include

对于 POSIX 系统,包含头文件 pthread.h. 如果使用 semaphore, 包含 semaphore.h.

#include <pthread.h>
#include <semaphore.h>
compile

对于 Gcc 编译器,使用选项 -l,如下:

gcc Program.o -o Program -lpthread

Thread Management

Creating and Terminating Threads

APIs
int pthread_create(pthread_t *thread,
              const pthread_attr_t *attr,
              void *(*start_routine)(void*), void *arg);
void pthread_exit(void *value_ptr);
int pthread_cancel(pthread_t thread);
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);
Creating Threads

pthread_create 创建一个新的线程并运行它.它能在代码的任何处被多次调用.

pthread_create 的参数:

  • thread:返回新 thread 程的唯一标识.
  • attr:设置 thread 的性质.NULL 为默认性质.
  • start_routine: 新 thread 运行的函数指针.
  • arg:传给 start_routine 的参数,必须强制转换成 void *.NULL 为没有参数传入.

Process 能创建的最大 thread 个数由系统配置决定.如下 Ubuntu 打印出的结果:

$ limit
cputime         unlimited
filesize        unlimited
datasize        unlimited
stacksize       8MB
coredumpsize    0kB
memoryuse       unlimited
maxproc         62694
descriptors     1024
memorylocked    64kB
addressspace    unlimited
maxfilelocks    unlimited
sigpending      62694
msgqueue        819200
nice            0
rt_priority     0
rt_time         unlimited
Thread Attributes

pthread_attr_initpthread_attr_destroy 被用来初始化/销毁 thread 性质对象.

性质包括:

  • Detached or joinable state
  • Scheduling inheritance
  • Scheduling policy
  • Scheduling parameters
  • Scheduling contention scope
  • Stack size
  • Stack address
  • Stack guard (overflow) size
Thread Binding

Pthread APIs 并没有提供 binding threads 到特定 cpus/cores 的接口.但不同系统可能包含这功能,比如提供非标准的pthread_setaffinity_np 接口.

比如设置两个线程都在 core0 上运行,如下设置:

cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(0, &cpus);
pthread_setaffinity_np(thread[0], sizeof(cpu_set_t), &cpus);
pthread_setaffinity_np(thread[1], sizeof(cpu_set_t), &cpus);
Terminating Threads

一个线程有很多种方法终止:

  • 线程从它的运行中正常放回.它的工作完成.
  • 线程调用 pthread_exit 无论它的工作完成否.
  • 线程被另外一个线程调用 pthread_cancel 来取消.
  • 整个线程终止因为调用 exec()exit().
  • main() 函数先完成,没有调用 pthread_exit.

pthread_exit() 允许指定一个可选的终止 status parameter.这个可选参数一般返回给线程”joining”到这个终止线程.

pthread_exit() 不关闭文件,在线程打开的任何文件将继续打开在线程终止后.

main() 调用 pthread_exit() :

  • 如果 main() 在它创建的 threads 之前终止,并没有显示的调用 pthread_exit(),这将是个问题.所有创建的线程将终止因为 main()结束,不再存在支持这些线程.
  • 通过 main()在最后调用 pthread_exit(), main()将阻塞并保持存活来支持它创建的线程运行直到它们完成.
Example: Pthread Creation and Termination

如果注释掉 main()中最后的 pthread_exit(NULL); ,那么它创建的线程将会完成不了所有的打印而被强制退出.

#include <pthread.h>
#include <cstdio>
#include <cstdlib>

void *ThreadProc(void *param) {
  int id;
  id = *(static_cast<int *>(param));
  for (int i = 0; i < 10; ++i) {
    printf("thread %d: run %d \n", id, i);
  }
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  const int kNumThreads = 4;
  pthread_t threads[kNumThreads];
  int thread_ids[kNumThreads];
  for (int i = 0; i < kNumThreads; ++i) {
    thread_ids[i] = i;
    int rt = pthread_create(&threads[i], NULL, ThreadProc,
                            static_cast<void *>(&thread_ids[i]));
    if (rt) {
      printf("ERROR: pthread_create failed, rt=%d\n", rt);
      exit(1);
    }
  }
  pthread_exit(NULL);
}

Joining and Detaching Threads

APIs
int pthread_join(pthread_t thread, void **value_ptr);
int pthread_detach(pthread_t thread);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
int pthread_attr_getdetachstate(const pthread_attr_t *attr,
              int *detachstate);
Joining

Joining 是同步不同线程的方法之一,原理如下图:

  • pthread_join() 阻塞调用它的线程直到指定的 threadid 的线程终止.
  • 调用的线程能获取目标线程终止返回的 status 只要目标线程调用 pthread_exit().
  • 当一个线程被创建,它的属性之一是它是否可以 join.只有创建的能被 join 的线程才能被 join.如果线程线程以 detached 创建,它永远都不能被 join.
  • 显示的创建一个线程可 join 或 detached,使用在 pthread_create() 中的 attr 参数.典型的步骤是:
    1. 定义 pthread_attr_t 类型的 pthread 属性;
    2. pthread_attr_init() 初始化属性变量;
    3. pthread_attr_setdetachstate() 设置 detached 属性;
    4. pthread_attr_destroy() 释放属性使用的资源.
Detaching
  • pthread_detach() 能显示的 detach 一个线程即使它是以可 join 创建.
  • 没有相反的操作.
Example: Pthread Joining
#include <pthread.h>
#include <cstdio>
#include <cstdlib>

void *ThreadProc(void *param) {
  int id;
  id = *(static_cast<int *>(param));
  for (int i = 0; i < 10; ++i) {
    printf("thread %d: run %d \n", id, i);
  }
  pthread_exit(param);
}

int main(int argc, char *argv[]) {
  const int kNumThreads = 4;
  pthread_t threads[kNumThreads];
  int thread_ids[kNumThreads];
  pthread_attr_t attr;

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

  for (int i = 0; i < kNumThreads; ++i) {
    thread_ids[i] = i;
    int rt = pthread_create(&threads[i], &attr, ThreadProc,
                            static_cast<void *>(&thread_ids[i]));
    if (rt) {
      printf("ERROR: pthread_create failed, rt=%d\n", rt);
      exit(1);
    }
  }
  for (int i = 0; i < kNumThreads; ++i) {
    void *status;
    int rt = pthread_join(threads[i], &status);
    if (rt) {
      printf("ERROR: pthread_join failed, rt=%d\n", rt);
      exit(1);
    }
    printf("completed join with thread %d having a status of %d\n"
           , i, *static_cast<int *>(status));
  }
  pthread_exit(NULL);
}

Stack Management

APIs
int pthread_attr_getstacksize(const pthread_attr_t *restrict attr,
              size_t *restrict stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstackaddr(const pthread_attr_t *restrict attr,
              void **restrict stackaddr);
int pthread_attr_setstackaddr(pthread_attr_t *attr, void *stackaddr);

每个线程都有各自独立的 stack, pthread_attr_getstackaddrpthread_attr_setstackaddr 分别获取和设置线程的 stack 属性.

Example: Stack Management
#include <pthread.h>
#include <cstdio>
#include <cstdlib>

pthread_attr_t attr;

void *ThreadProc(void *param) {
  int id;
  size_t thread_stack_size;
  id = *(static_cast<int *>(param));
  pthread_attr_getstacksize(&attr, &thread_stack_size);
  printf("thread %d: stack size = %d\n", id, thread_stack_size);
  for (int i = 0; i < 10; ++i) {
    printf("thread %d: run %d \n", id, i);
  }
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  const int kNumThreads = 4;
  const int kThround = 1000;
  pthread_t threads[kNumThreads];
  int thread_ids[kNumThreads];
  size_t stack_size;

  pthread_attr_init(&attr);
  pthread_attr_getstacksize(&attr, &stack_size);
  printf("Default stack size = %d\n", stack_size);
  stack_size = sizeof(double) * kThround * kThround;
  printf("Setting stack size = %d\n", stack_size);
  pthread_attr_setstacksize(&attr, stack_size);
  for (int i = 0; i < kNumThreads; ++i) {
    thread_ids[i] = i;
    int rt = pthread_create(&threads[i], &attr, ThreadProc,
                            static_cast<void *>(&thread_ids[i]));
    if (rt) {
      printf("ERROR: pthread_create failed, rt=%d\n", rt);
      exit(1);
    }
  }
  pthread_exit(NULL);
  pthread_attr_destroy(&attr);
  return 0;
}

Miscellaneous

pthread_t pthread_self(void);
int pthread_equal(pthread_t t1, pthread_t t2);
int pthread_once(pthread_once_t *once_control,
              void (*init_routine)(void));
pthread_once_t once_control = PTHREAD_ONCE_INIT;
  • pthread_self 返回调用线程的唯一 thread ID.
  • pthread_equal 比较两个线程 ID 是否相等.
  • pthread_once 只执行 init_routine 仅仅一次在进程中.

Mutex Variables

Overview

Mutex 以”mutual exclusion”(互斥)简称.

Mutex variable 就像一把”锁”一样保护共享数据资源.mutex 的基本概念就是,只有一个线程能 lock 一个 mutex 变量在任何时候.所以,即使很多线程尝试去锁一个 mute,也仅仅只有一个线程能成功.

典型使用 mutex 的顺序如下:

  1. 创建和初始化 mutex 变量;
  2. 许多线程尝试锁住 mutex;
  3. 只有一个线程成功锁住 mutex,其他线程等待;
  4. 拥有 mutex 的线程进行自己的操作;
  5. 拥有线程解锁 mutex;
  6. 其他线程继续获取 mutex 并持续如上步骤;
  7. 最后 mutex 销毁.

Creating and Destroying Mutexes

int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
          const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_mutexattr_init(pthread_mutexattr_t *attr);

Mutex 变量由 pthread_mutex_t 声明定义,而且必须初始化在使用前.两种方法初始:

  1. 静态的,当声明时.如:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  1. 动态的,使用 pthread_mutex_init() 函数,并能设置 mutex 的属性 attr.

attr 用来设置 mutex 变量的属性,必须是 pthread_mutexattr_t 类型.Pthread 标准中定义的 3 种可选 mutex 属性:

  • Protocol: Specifies the protocol used to prevent priority inversions for a mutex.
  • Prioceiling: Specifies the priority ceiling of a mutex.
  • Process-shared: Specifies the process sharing of a mutex.(Pthread mutex 能被 process 间使用).

Locking and Unlocking Mutexes

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex_lock() 函数被用来获取传入的 mutex 变量,如果 mutex 已经被其他线程占用,那么这个调用就阻塞调用线程,使它进入睡眠等待这个 mutex 直到它被释放.

pthread_mutex_trylock() 仅尝试获取锁,若不成功也立即返回’busy’信号.

Example: Using Mutexes

#include <pthread.h>
#include <cstdio>
#include <cstdlib>

struct ThreadData {
  int tid;
  int data;
};

int shared_x;
pthread_mutex_t lock;

void *ThreadProc(void *param) {
  ThreadData *data = static_cast<ThreadData *>(param);
  printf("begin from thread id: %d\n", data->tid);
  pthread_mutex_lock(&lock);
  shared_x += data->data;
  printf("thread %d: x = %d\n", data->tid, shared_x);
  pthread_mutex_unlock(&lock);
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  const int kNumThreads = 4;
  pthread_t threads[kNumThreads];
  ThreadData threads_data[kNumThreads];
  pthread_attr_t attr;

  shared_x = 0;
  pthread_mutex_init(&lock, NULL);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (int i = 0; i < kNumThreads; ++i) {
    threads_data[i].tid = i;
    threads_data[i].data = i * i;
    int rt = pthread_create(&threads[i], &attr, ThreadProc,
                            static_cast<void *>(&threads_data[i]));
    if (rt) {
      printf("ERROR: pthread_create failed, rt=%d\n", rt);
      exit(1);
    }
  }
  for (int i = 0; i < kNumThreads; ++i) {
    void *status;
    pthread_join(threads[i], &status);
  }
  pthread_attr_destroy(&attr);
  pthread_exit(NULL);
  return 0;
}

Condition Variables

Overview

Mutex 变量如锁一般防止多个线程访问共享数据资源,如果某个线程等待某个共享数据达到某个数值才进行相应的操作,那么这个线程需要不断的去 poll,查看是否满足需要的值,这样开销很大,因为线程需要一直处于忙状态.

引入 Condition Variables 来完成这样的同步到某个实际数据值而不要不断 poll.

Condition 变量一般与 mutex 一起使用.锁住查看的共享数据资源.

使用 Condition 的一般步骤如下:

  • 声明和定义需要同步的共享数据;
  • 声明和定义 condition 变量;
  • 声明和定义相对应的 mutex;
  • 创建线程使用 condition 变量同步.

Creating and Destroying Condition Variables

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
                      const pthread_condattr_t *restrict attr);
int pthread_condattr_destroy(pthread_condattr_t *attr);
int pthread_condattr_init(pthread_condattr_t *attr);

Condition 变量由 pthread_cond_t 声明定义,而且必须初始化在使用前.两种方法初始:

  1. 静态的,当声明时.如:
pthread_cond_t convar = PTHREAD_COND_INITIALIZER;
  1. 动态的,使用 pthread_cond_init() 函数,并能设置 condition 的属性 attr.

attr 用来设置 condition 变量的属性,必须是 pthread_condattr_t 类型.只有一种属性可选:是否进程共享,也就是允许其他进程中的线程也能看到它.

Waiting and Signaling on Condition Variables

int pthread_cond_wait(pthread_cond_t *cond,
                      pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_wait() 阻塞调用它的线程直到其中 cond 被 signal.这个函数需要在占有 mutex 时被调用,而它将 自动释放 mutex 当它等待时.等到 signal 收到,线程被唤醒, mutex自动被占有 .最后当线程完成 condition 的操作,要负责对 mutex 解锁.

pthread_cond_signal() 用来 signal 其他等待这个 cond 的线程.它需要在占有 mutex 时被调用.然后必须对 mutex 解锁来完成 pthread_cond_wait 的等待.

如果有多余一个线程处于等待 cond 而阻塞, 应该用 pthread_cond_broadcast() 替换 pthread_cond_signal().

Example: Using Condition Variables

#include <pthread.h>
#include <cstdio>
#include <cstdlib>
#include <unistd.h>

const int kNumThreads = 3;
const int kLoops = 10;
const int kCountLimit = 15;

int g_count;
pthread_mutex_t count_mutex;
pthread_cond_t count_cv;

void *IncreaseCount(void *param) {
  int id;
  id = *(static_cast<int *>(param));
  for (int i = 0; i < kLoops; ++i) {
    pthread_mutex_lock(&count_mutex);
    g_count++;
    if (g_count == kCountLimit) {
      pthread_cond_signal(&count_cv);
      printf("increse thread %d: count = %d, signal cond\n", id, g_count);
    }
    printf("increse thread %d: count = %d, unlock mutex\n", id, g_count);
    pthread_mutex_unlock(&count_mutex);
    sleep(1);
  }
  pthread_exit(NULL);
}

void *WatchCount(void *param) {
  int id;
  id = *(static_cast<int *>(param));
  pthread_mutex_lock(&count_mutex);
  while (g_count < kCountLimit) {
    pthread_cond_wait(&count_cv, &count_mutex);
    printf("watch thread %d: count = %d, receive signal\n", id, g_count);
  }
  pthread_mutex_unlock(&count_mutex);
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  pthread_t threads[kNumThreads];
  int thread_ids[kNumThreads];
  pthread_attr_t attr;

  pthread_mutex_init(&count_mutex, NULL);
  pthread_cond_init(&count_cv, NULL);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (int i = 0; i < kNumThreads; ++i) {
    thread_ids[i] = i;
  }
  int rt;
  rt = pthread_create(&threads[0], &attr, WatchCount,
                            static_cast<void *>(&thread_ids[0]));
  if (rt) {
    printf("ERROR: pthread_create failed, rt=%d\n", rt);
    exit(1);
  }
  rt = pthread_create(&threads[1], &attr, IncreaseCount,
                            static_cast<void *>(&thread_ids[1]));
  if (rt) {
    printf("ERROR: pthread_create failed, rt=%d\n", rt);
    exit(1);
  }
  rt = pthread_create(&threads[2], &attr, IncreaseCount,
                            static_cast<void *>(&thread_ids[2]));
  if (rt) {
    printf("ERROR: pthread_create failed, rt=%d\n", rt);
    exit(1);
  }
  for (int i = 0; i < kNumThreads; ++i) {
    pthread_join(threads[i], NULL);
  }
  pthread_attr_destroy(&attr);
  pthread_cond_destroy(&count_cv);
  pthread_mutex_destroy(&count_mutex);
  pthread_exit(NULL);
}

Barrier

Overview

Barrier 就是栅栏一样,调用等待 barrier 的线程需要等待直到满足调用 barrier 的线程个数达到要求的 count.

Creating, Destroying and Wait Barrier

int pthread_barrier_init(pthread_barrier_t *barrier,
                const pthread_barrierattr_t *attr, unsigned count);
pthread_barrier_t barrier = PTHREAD_BARRIER_INITIALIZER(count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrierattr_init(pthread_barrierattr_t *attr);
int pthread_barrierattr_destroy(pthread_barrierattr_t *attr);
int pthread_barrier_wait(pthread_barrier_t *barrier);

Barrier 变量由 pthread_barrier_t 声明定义,而且必须初始化在使用前.需要传入满足 barrier 等待的个数 count, 两种方法初始:

  1. 静态的,当声明时.如:
pthread_barrier_t barrier = PTHREAD_BARRIER_INITIALIZER(count);
  1. 动态的,使用 pthread_barrier_init() 函数,并能设置 barrier 的属性 attr.

线程调用 barrier,只需要调用 pthread_barrier_wait 来等待 barrier 达到满足条件.

Google wrap the Pthread

Mutex 类和 CondVar 类

Google api 的 base 包里封装了 Mutex 类和 CondVar 类.

namespace base {
enum LinkerInitialized { LINKER_INITIALIZED };
}

class LOCKABLE PThreadMutex {
 public:
  explicit PThreadMutex(base::LinkerInitialized) {
    pthread_mutex_init(&mutex_, NULL);
  }
  PThreadMutex()   { pthread_mutex_init(&mutex_, NULL); }
  ~PThreadMutex()  { pthread_mutex_destroy(&mutex_); }

  void Lock()     { CHECK_EQ(0, pthread_mutex_lock(&mutex_)); }
  void Unlock()   { CHECK_EQ(0, pthread_mutex_unlock(&mutex_)); }

 private:
  friend class PThreadCondVar;
  pthread_mutex_t mutex_;

  DISALLOW_COPY_AND_ASSIGN(PThreadMutex);
};

class PThreadCondVar {
 public:
  PThreadCondVar()  { pthread_cond_init(&cv_, NULL); }
  ~PThreadCondVar() { CHECK_EQ(0, pthread_cond_destroy(&cv_)); }

  void Signal()        { CHECK_EQ(0, pthread_cond_signal(&cv_)); }
  void SignalAll()     { CHECK_EQ(0, pthread_cond_broadcast(&cv_)); }
  void Wait(PThreadMutex* mu) {
    CHECK_EQ(0, pthread_cond_wait(&cv_, &mu->mutex_));
  }
  bool WaitWithTimeout(PThreadMutex* mu, int64 millis) {
    struct timeval tv;
    struct timespec ts;
    gettimeofday(&tv, NULL);
    ts.tv_sec = tv.tv_sec + millis / 1000;
    ts.tv_nsec = millis % 1000;
    int result = pthread_cond_timedwait(&cv_, &mu->mutex_, &ts);
    if (!result) return true;

    CHECK_EQ(ETIMEDOUT, result);
    return false;
  }

 private:
  pthread_cond_t cv_;
  DISALLOW_COPY_AND_ASSIGN(PThreadCondVar);
};

typedef PThreadCondVar CondVar;
typedef PThreadMutex Mutex;

GoogleOnceInit 类

typedef pthread_once_t GoogleOnceType;
#define GOOGLE_ONCE_INIT PTHREAD_ONCE_INIT

inline void GoogleOnceInit(GoogleOnceType* once, void (*initializer)()) {
  CHECK_EQ(0, pthread_once(once, initializer));
}

Monitoring, Debugging and Performance Analysis Tools for Pthreads

Monitoring

Linux ps command

使用 Linux 自带的 ps 命令查看运行的 thread 情况,ps 的 man 手册.

➜$ ps -Lf
UID        PID  PPID   LWP  C NLWP STIME TTY          TIME CMD
shougang 13103  8814 13103  0    1 23:30 pts/17   00:00:00 /bin/zsh
shougang 13237 13103 13237  0    6 23:30 pts/17   00:00:00 [thread]
shougang 13237 13103 13240  0    6 23:30 pts/17   00:00:00 [thread]
shougang 13237 13103 13241  0    6 23:30 pts/17   00:00:00 [thread]
shougang 13237 13103 13242  0    6 23:30 pts/17   00:00:00 [thread]
shougang 13237 13103 13243  0    6 23:30 pts/17   00:00:00 [thread]
shougang 13237 13103 13244  0    6 23:30 pts/17   00:00:00 [thread]

Linux top command

Linux 的 top 命令加上 -H 参数, process 中的 threads 也能看到. 如下是 top -H 的一个例子:

Performance Analysis Tools2

Boost thread library

Overview

直到 C++11 库才比较好的支持 thread,之前 C++程序使用操作系统支持的 thread 库(如 Pthread).但这样做至少有个主要的问题:(1) 这些库基本是 C 的库,需要很小心的 C++中使用,和(2) 每个操作系统提供自己的一套对 thread 支持的库.以致,编写的代码既不标准又不可移植.

Boost Thread可以解决这两个主要问题. Boost Thread 不是通过继承来使用线程,而是 Boost 的 thread 类使用一个 Callable 的对象创建.

编译 Boost Thread 程序

include

根据使用到的 Boost Thread 中的类型包含不同头文件:

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/locks.hpp> 
#include <boost/thread/once.hpp>
compile

对于 Gcc 编译器,使用选项 -l,如下:

g++ Program.o -o Program -lboost_thread -lboost_system

Thread Management

Thread 类

  #include <boost/thread/thread.hpp>
  class thread
  {
  public:
      thread() noexcept;
      thread(const thread&) = delete;
      thread& operator=(const thread&) = delete;

      thread(thread&&) noexcept;
      thread& operator=(thread&&) noexcept;
      ~thread();

      template <class F>
      explicit thread(F f);
      template <class F>
      thread(F &&f);

      template <class F,class A1,class A2,...>
      thread(F f,A1 a1,A2 a2,...);
      template <class F, class ...Args>
      explicit thread(F&& f, Args&&... args);

      template <class F>
      explicit thread(attributes& attrs, F f); // EXTENSION
      template <class F>
      thread(attributes& attrs, F &&f); // EXTENSION
      template <class F, class ...Args>
      explicit thread(attributes& attrs, F&& f, Args&&... args);
      class id;
      id get_id() const noexcept;

      bool joinable() const noexcept;
      void join();
...
};

整个 thread 类包含 thread 的所有特性,如 thread id, join, detach 等.

Create Thread

Callable 对象既可以是一个函数又可以是类中的 operator() 实现,如下:

void hello() {
  cout << "hello world" << endl;
}

struct Hello {
 public:
  void operator() () {
    cout << "hello world" << endl;
  }
};

Hello h;
boost::thread thread_hello(h);
boost::thread thread_hello(hello);
Passing data

传递参数给线程

  1. thread 创建时附加后面
void hello(const string &str) {
  cout << str << endl;
}
string str = "hello";
boost::thread thrd(hello, str);
  1. 利用 Boost.bind 库接口
void hello(const string &str) {
  cout << str << endl;
}
string str = "hello";
string str = "hello";
boost::thread thrd(bind(hello, str));

Thread group 类

可以使用 thread group 类管理 thread,通过 add_threadcreate_thread 添加线程到管理类中, 可以直接 join_all 将所有管理类中的线程 join.

#include <boost/thread/thread.hpp>

class thread_group
{
public:
    thread_group(const thread_group&) = delete;
    thread_group& operator=(const thread_group&) = delete;

    thread_group();
    ~thread_group();

    template<typename F>
    thread* create_thread(F threadfunc);
    void add_thread(thread* thrd);
    void remove_thread(thread* thrd);
    bool is_this_thread_in();
    bool is_thread_in(thread* thrd);
    void join_all();
    void interrupt_all();
    int size() const;
};

Miscellaneous

Boost Thread 中还有 strict_scoped_thread 类和 scoped_thread 类,提供线程结束不是调用 terminate ,而是调用传入的参数来执行特定行为.

Example: Thread Creation

#include <iostream>
#include <string>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

void hello(const string &str) {
  cout << str << endl;
}

int main(){
  string str = "hello";
  boost::thread thrd(bind(hello, str));
  thrd.join();
  return 0;
}

Mutex Variables

#include <boost/thread/mutex.hpp>

class mutex:
    boost::noncopyable
{
public:
    mutex();
    ~mutex();

    void lock();
    bool try_lock();
    void unlock();

    typedef platform-specific-type native_handle_type;
    native_handle_type native_handle();

    typedef unique_lock<mutex> scoped_lock;
    typedef unspecified-type scoped_try_lock;
};
  • lock() 来获取锁.
  • unlock() 释放锁.
  • typedef unique_lock<mutex> scoped_lock; 定义了 scoped_lock 的类型,通过 boost::mutex::scoped_lock 来定义一个 RAII-style 锁,离开定义区域自动释放锁.

lock_guard

boost::lock_guard 非常简单:

  • 构造时,它获取锁.
  • 析构时,它释放锁.

它提供了一个简单的 RAII-style 锁对象,使得 exception-safe 锁和解锁更容易.

namespace boost
{
  template<typename Lockable>
  class lock_guard
#if ! defined BOOST_THREAD_NO_MAKE_LOCK_GUARD
  template <typename Lockable>
  lock_guard<Lockable> make_lock_guard(Lockable& mtx); // EXTENSION
  template <typename Lockable>
  lock_guard<Lockable> make_lock_guard(Lockable& mtx, adopt_lock_t); // EXTENSION
#endif
}

基本使用,传入可 Lockable 的 mutex 类型:

boost::mutex count_mutex;
boost::lock_guard<mutex> lock(count_mutex) ;

Example: Using Mutexes

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>

using std::cout;
using std::endl;
boost::mutex count_mutex;

struct count {
  count(int id) : id_(id) {
  }
  void operator() () {
    for (int i = 0; i < 10; ++i) {
      boost::mutex::scoped_lock lock(count_mutex);
      cout << id_ << ": " << i << endl;
    }
  }
  int id_;
};

int main(int argc, char *argv[]) {
  boost::thread thread1(count(1));
  boost::thread thread2(count(2));
  thread1.join();
  thread2.join();
  return 0;
}

Condition Variables

与 Pthread, Boost Condition Variable 功能更全面,如不同条件的 wait_until , wait_for 等功能.

namespace boost
{
    class condition_variable
    {
    public:
        condition_variable();
        ~condition_variable();

        void notify_one() noexcept;
        void notify_all() noexcept;

        void wait(boost::unique_lock<boost::mutex>& lock);

        template<typename predicate_type>
        void wait(boost::unique_lock<boost::mutex>& lock,predicate_type predicate);

        template <class Clock, class Duration>
        typename cv_status::type
        wait_until(
            unique_lock<mutex>& lock,
            const chrono::time_point<Clock, Duration>& t);
...
};

Example: Using Condition Variables

利用 Condition Variables 实现一个简单的 read/writer Buffer.

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <iostream>
using std::cout;
using std::endl;
const int kBufSize = 10;
const int kIters = 100;

boost::mutex io_mutex;

class Buffer {
 public:
  typedef boost::mutex::scoped_lock scoped_lock;
  Buffer() : p(0), c(0), full(0) {
  }

  void Put(int m) {
    scoped_lock lock(mutex);
    if (full == kBufSize) {
      {
        scoped_lock lock(io_mutex);
        cout << "Buffer is full." << endl;
      }
      while (full == kBufSize) {
        cond.wait(lock);
      }
    }
    buf[p] = m;
    p = (p + 1) % kBufSize;
    ++full;
    cond.notify_one();
  }

  int Get() {
    scoped_lock lock(mutex);
    if (full == 0) {
      {
        scoped_lock lock(io_mutex);
        cout << "Buffer is empty." << endl;
      }
      while (full == 0) {
        cond.wait(lock);
      }
    }
    int i = buf[c];
    c = (c + 1) % kBufSize;
    --full;
    cond.notify_one();
    return i;
  }
 private:
  boost::mutex mutex;
  boost::condition cond;
  unsigned int p, c, full;
  int buf[kBufSize];
};

Buffer buf;

void writer() {
  for (int i = 0; i < kIters; ++i) {
    {
      boost::mutex::scoped_lock lock(io_mutex);
      cout << "sending: " << i << endl;
    }
    buf.Put(i);
  }
}

void reader() {
  for (int i = 0; i < kIters; ++i) {
    int n = buf.Get();
    {
      boost::mutex::scoped_lock lock(io_mutex);
      cout << "received: " << n << endl;
    }
  }
}

int main(int argc, char *argv[]) {
  boost::thread thread_reader(&reader);
  boost::thread thread_writer(&writer);
  thread_reader.join();
  thread_writer.join();
  return 0;
}

Miscellaneous

Barrier

基本使用:

  1. 定义 barrier,传入参与 thread 个数: barrier b(num_threads).
  2. thread 中等待 barrier: b.wait().
class barrier
{
public:
    barrier(barrier const&) = delete;
    barrier& operator=(barrier const&) = delete;

    barrier(unsigned int count);
    template <typename F>
    barrier(unsigned int count, F&&);

    ~barrier();

    bool wait();
    void count_down_and_wait();
};

Once Routines

#include <boost/thread/once.hpp>

namespace boost
{
  struct once_flag;
  template<typename Function, class ...ArgTypes>
  inline void call_once(once_flag& flag, Function&& f, ArgTypes&&... args);

#if defined BOOST_THREAD_PROVIDES_DEPRECATED_FEATURES_SINCE_V3_0_0
  void call_once(void (*func)(),once_flag& flag);
#endif

}
Example: Call once
#include <boost/thread/thread.hpp>
#include <boost/thread/once.hpp>
#include <iostream>

int i = 0;
boost::once_flag flag = BOOST_ONCE_INIT;

void init()
{
  ++i;
}

void thread()
{
  boost::call_once(&init, flag);
}

int main(int argc, char* argv[])
{
  boost::thread thrd1(&thread);
  boost::thread thrd2(&thread);
  thrd1.join();
  thrd2.join();
  std::cout << i << std::endl;
  return 0;
}

其他资料

Books:

Pthread Library Routines Reference

pthread_atfork
pthread_attr_destroy
pthread_attr_getdetachstate
pthread_attr_getguardsize
pthread_attr_getinheritsched
pthread_attr_getschedparam
pthread_attr_getschedpolicy
pthread_attr_getscope
pthread_attr_getstack
pthread_attr_getstackaddr
pthread_attr_getstacksize
pthread_attr_init
pthread_attr_setdetachstate
pthread_attr_setguardsize
pthread_attr_setinheritsched
pthread_attr_setschedparam
pthread_attr_setschedpolicy
pthread_attr_setscope
pthread_attr_setstack
pthread_attr_setstackaddr
pthread_attr_setstacksize
pthread_barrier_destroy
pthread_barrier_init
pthread_barrier_wait
pthread_barrierattr_destroy
pthread_barrierattr_getpshared
pthread_barrierattr_init
pthread_barrierattr_setpshared
pthread_cancel
pthread_cleanup_pop
pthread_cleanup_push
pthread_cond_broadcast
pthread_cond_destroy
pthread_cond_init
pthread_cond_signal
pthread_cond_timedwait
pthread_cond_wait
pthread_condattr_destroy
pthread_condattr_getclock
pthread_condattr_getpshared
pthread_condattr_init
pthread_condattr_setclock
pthread_condattr_setpshared
pthread_create
pthread_detach
pthread_equal
pthread_exit
pthread_getconcurrency
pthread_getcpuclockid
pthread_getschedparam
pthread_getspecific
pthread_join
pthread_key_create
pthread_key_delete
pthread_kill
pthread_mutex_destroy
pthread_mutex_getprioceiling
pthread_mutex_init
pthread_mutex_lock
pthread_mutex_setprioceiling
pthread_mutex_timedlock
pthread_mutex_trylock
pthread_mutex_unlock
pthread_mutexattr_destroy
pthread_mutexattr_getprioceiling
pthread_mutexattr_getprotocol
pthread_mutexattr_getpshared
pthread_mutexattr_gettype
pthread_mutexattr_init
pthread_mutexattr_setprioceiling
pthread_mutexattr_setprotocol
pthread_mutexattr_setpshared
pthread_mutexattr_settype
pthread_once
pthread_rwlock_destroy
pthread_rwlock_init
pthread_rwlock_rdlock
pthread_rwlock_timedrdlock
pthread_rwlock_timedwrlock
pthread_rwlock_tryrdlock
pthread_rwlock_trywrlock
pthread_rwlock_unlock
pthread_rwlock_wrlock
pthread_rwlockattr_destroy
pthread_rwlockattr_getpshared
pthread_rwlockattr_init
pthread_rwlockattr_setpshared
pthread_self
pthread_setcancelstate
pthread_setcanceltype
pthread_setconcurrency
pthread_setschedparam
pthread_setschedprio
pthread_setspecific
pthread_sigmask
pthread_spin_destroy
pthread_spin_init
pthread_spin_lock
pthread_spin_trylock
pthread_spin_unlock
pthread_testcancel

Comments