3.同步

我们已经了解如何通过 “不可优化、保证顺序” 的原子指令实现自旋锁,以及借助操作系统 (系统调用) 实现线程的睡眠的互斥锁。

  • 自旋锁,想保护临界区,创建一把唯一的钥匙,用硬件帮助的原子的指令交换,只有得到钥匙的人才能进入临界区。但是,自旋锁的问题是,有线程会空转,浪费资源。

  • 互斥锁,锁的代码移到内核里,操作系统来管理,把空转换成睡眠。

然而,互斥并不总是能满足多个并发线程协作完成任务的需求,如何能便捷地让共享内存的线程协作以共同完成计算任务?这部分内容,如何真正的写一个并发程序。如何在多处理器上协同多个线程完成真正的任务。

  • 生产者-消费者问题

  • 条件变量

同步问题

什么是同步?

两个或以上的随时间变化的量在变化过程中保持一定的相对关系(太抽象)。

  • 云端同步(手机;电脑;云端)

  • 变速箱同步

  • 同步电机(转子与瓷砖转速一致)

  • 同步电路(所有触发器边沿同步触发)

相应地,有异步。

并发程序的步调很难保持“完全一致”。线程同步:在某个时间点共同达到互相已知的状态。即:共同约定先到的人要等,人到齐了一起前进

  • “先到先等”,在条件达成的瞬间再次恢复并行

  • 同时开始出去玩\吃饭\讨论

哈工大操作系统:

计算机里的线程就是走走停停,走的快的停下来等慢的,共同向前推进。

走的快的停下来等这个行为就是同步。

一个典型的就是线程的 join() ,主线程等待线程执行完成,在向前走,这就完成了一个线程的同步。

典型同步问题

最经典的同步问题,生产者-消费者问题。99% 的实际并发问题都可以套用生产者-消费者模型。

再简化一下这个问题,我们有两种类型的线程,一种生产者,打印 (,一种消费者,打印 ),输出的括号序列要满足条件:

  • 合法

  • 嵌套深度不超过 n

void Tproduce() { while (1) printf("("); }
void Tconsume() { while (1) printf(")"); }

即一个线程打印 ( ,另一个线程打印 ),要让打印出来的序列合法,并且深度确定。

抽象出来,( 生产者,) 为消费者。我们需要在 printf 前后加一些代码,让线程满足条件,即考虑同步问题:

  • 有空位(嵌套深度<n)的时候才打印左括号

  • 可以配对的时候打印右括号

比如,((()))((( 是合法的,()()(()()) 也没问题。如何判断序列是否正确呢?也很容易,用栈来处理。

计算图、调度器和生产者-消费者问题

为什么叫 “生产者-消费者” 而不是 “括号问题”?

  • 左括号:生产资源 (任务)、放入队列

  • 右括号:从队列取出资源 (任务) 执行

如何判断括号序列合不合法?用栈来判断,左括号用push,右括号pop,所以合法的意思是看起来是一个合法的push和pop指令,未必是栈,可以是一个资源的池子,或者任务的池子。只要一个任务的计算时间足够长,放球拿球的时间忽略不计,我们总是可以用这种方式实现并行计算。

生产者-消费者问题的互斥实现

用互斥锁保护 printf ,如果条件成立,带着锁打印,如果条件不成立,释放锁等待条件满足。

#include "thread.h"
#include "thread-sync.h"

int n, count = 0;
mutex_t lk = MUTEX_INIT();

#define CAN_PRODUCE (count < n)
#define CAN_CONSUME (count > 0)

void Tproduce() {
  while (1) {
retry:
    mutex_lock(&lk);
    if (!CAN_PRODUCE) {
      mutex_unlock(&lk);
      goto retry;
    }
    count++;
    printf("(");  // Push an element into buffer
    mutex_unlock(&lk);
  }
}

void Tconsume() {
  while (1) {
retry:
    mutex_lock(&lk);
    if (!CAN_CONSUME) {
      mutex_unlock(&lk);
      goto retry;
    }
    count--;
    printf(")");  // Pop an element from buffer
    mutex_unlock(&lk);
  }
}

int main(int argc, char *argv[]) {
  assert(argc == 2);
  n = atoi(argv[1]);
  setbuf(stdout, NULL);
  for (int i = 0; i < 8; i++) {
    create(Tproduce);
    create(Tconsume);
  }
}

虽然 goto 是 harmful ,但是 linux 内核里还是有很多这样的写法,而且这种用法很有效,可读性也不错。

如何验证自己写出来的程序是正确的?这时候如果会一个脚本语言就好了。就算使用C++写,也是要比脚本语言复杂一点的。

import sys

BATCH_SIZE = 100000

limit = int(sys.argv[1])
count, checked = 0, 0

while True:
    for ch in sys.stdin.read(BATCH_SIZE):
        match ch:
            case '(': count += 1
            case ')': count -= 1
            case _: assert 0
        assert 0 <= count <= limit
    checked += BATCH_SIZE
    print(f'{checked} Ok.')

注意线程那边创建了16个,8个生产者,8个消费者在同时跑。

条件变量:万能同步方法

上面这个方法是正确的,但是还是有 spin 的过程。上面的代码有个缺陷:不管空闲位置是空是满,每个线程都要拿过来看看。互斥锁本来就是独占的,一个上锁,别人就不能拿了。虽然锁争抢的时候没有自旋,但是还是效率比较低。

解决自旋锁的问题,我们让他睡眠,所以同步问题也可以类似的使用睡眠来解决。

这又回到了什么是同步的问题上。我们等左右括号的时候,我们是等待条件成立。

  • 线程 join

    • Tmain 同步条件:nexit == T

    • Tmain 达成同步:最后一个线程退出 nexit++

  • 生产者/消费者问题

    • Tproduce 同步条件:CAN_PRODUCE (count < n)

    • Tproduce 达成同步:Tconsume count--

    • Tconsume 同步条件:CAN_CONSUME (count > 0)

    • Tconsume 达成同步:Tproduce count++

实际上,sum.c里面就有同步。那个 join() 就是同步。等待一个线程结束。生产者-消费者里也有个等待的条件。第一个等待的条件是 count 的数值。

我们想做的是把自旋换成sleep。即 mutex_unlonk 换成 mutex_unlock_and_sleep

理想的 API

wait_until(CAN_PRODUCE) {
  count++;
  printf("(");
}

wait_until(CAN_CONSUME) {
  count--;
  printf(")");
}

但是很许多困难。

  • 正确性

    • 大括号内代码执行时,其他线程不得破坏等待的条件

  • 性能

    • 不能 spin check 条件达成

    • 已经在等待的线程怎么知道条件被满足?

这样的实现很困难,可读性很好,期待未来可以,现在有一套折衷的 API,也即理想与现实之间的折衷:条件变量,实现了手工唤醒。条件变量的 API ,

  • wait(cv,mutex)

    • 调用时必须保证已经持有锁

    • wait 释放锁,进入睡眠状态,此过程是原子的

    • 被唤醒后需要重新执行 lock(mutex)

  • signal/notify(cv)

    • 如果有线程正在等待 cv,则唤醒其中一个线程

  • roadcast/notifyAll(cv)

    • 唤醒全部正在等待 cv 的线程

一个错误的实现

用条件变量实现生产者消费者问题,16个线程。会发现出问题了。

调试思路:隔离出 bug 出发的最小条件,一个生产者,两个消费者。

发生了同类唤醒。消费者唤醒了消费者。正确的解决方法是,用两个条件变量。

以及 assert 的意义

正确的实现,万能同步

#include "thread.h"
#include "thread-sync.h"

int n, count = 0;
mutex_t lk = MUTEX_INIT();
cond_t cv = COND_INIT();
 
#define CAN_PRODUCE (count < n)
#define CAN_CONSUME (count > 0)

void Tproduce() {
  while (1) {
    mutex_lock(&lk);
    while (!CAN_PRODUCE) {
      cond_wait(&cv, &lk);
    }
    assert(CAN_PRODUCE);
    printf("("); count++;
    cond_broadcast(&cv);
    mutex_unlock(&lk);
  }
}

void Tconsume() {
  while (1) {
    mutex_lock(&lk);
    while (!CAN_CONSUME) {
      cond_wait(&cv, &lk);
    }
    assert(CAN_CONSUME);
    printf(")"); count--;
    cond_broadcast(&cv);
    mutex_unlock(&lk);
  }
}


int main(int argc, char *argv[]) {
  assert(argc == 3);
  n = atoi(argv[1]);
  int T = atoi(argv[2]);
  setbuf(stdout, NULL);
  for (int i = 0; i < T; i++) {
    create(Tproduce);
    create(Tconsume);
  }
}

万能并行计算框架

struct work {
  void (*run)(void *arg);
  void *arg;
}

void Tworker() {
  while (1) {
    struct work *work;
    wait_until(has_new_work() || all_done) {
      work = get_work();
    }
    if (!work) break;
    else {
      work->run(work->arg); // 允许生成新的 work (注意互斥)
      release(work);  // 注意回收 work 分配的资源
    }
  }
}

来解决一些好玩的问题,leetcode 上的习题。

(...)

最后更新于