Tuesday, November 5, 2019

从前文我们知道,我们在并发编程中,希望能够原子性的去执行一系列指令,但是单纯硬件上很难做到,所以我们需要引入锁的机制,让临界区像单条指令一样执行。

一段简单的锁代码如下所示:

lock_t mutex;
...
lock(&mutex);
a = a + 1;
unlock(&mutex);

我们声明了一个锁变量,用来保存锁在某一时刻的状态。状态要么是可用的,表明没有线程占用锁,或者不可用的。

lock 和 unlock 就是为了尝试获取锁和释放锁的操作。一旦获取了锁,其他线程就无法进入该临界区,除非等当前线程调用了 unlock。然后其他线程的其中一个就会获取到锁,并进入到临界区。

锁是程序员的最小程度的调度控制。

Pthread 锁

POSIX 库将锁称为互斥量,可以给线程之间提供互斥功能。并且lock 和 unlock 函数还可以传入参数,用来区分不同的锁,这样可以增加并发量。用不同的锁来保护不同的数据和结构,从而可以让更多线程进入临界区。

实现一个锁

上面已经讲清了锁的工作原理,那我们如何实现一个锁,这是现在需要讨论的。

实现一个锁,需要考虑三个要点:

  • 提供互斥
  • 公平性(是否会有竞争的线程饿死)
  • 性能

控制中断

最简单的方案就是在临界区里面执行关闭中断的方法,禁止程序切换线程。这是在单处理器上可用的方案。但是这个方案非常多问题。

  • 可能不安全,让恶意程序滥用,一开始就调用 lock,从而独占 CPU。
  • 不支持多处理器
  • 关闭中断导致中断丢失,比如丢失了磁盘设备完成了读取请求,CPU 错失了这个事件
  • 效率低,CPU 开关中断的代码执行的很慢

测试并设置指令(原子交换)

最简单的硬件支持是测试并设置指令,也叫原子交换。 如果我们不依赖硬件,只是用标记位来标记锁的状态来达到锁的效果的话,是会有异常的。

OS28.6

实现可用的锁

通过上面的图片我们可以知道,没有硬件的支持,是无法实现可用的自旋锁的。 在硬件的支持下,提供了测试设置指令,我们来看看测试设置指令做了什么,值得注意的是,这些代码是原子地执行的。

int TestAndSet(int *old_str, int new) {
    int old = *old_ptr;
    *old_ptr = new;
    return old;
}

所以我们的 lock 和 unlock 就可以这样写:

void lock(lock_t *lock) {
    while(TestAndSet(&lock->flag, 1) == 1)
        ; // spin--do nothing
}

void unlock(lock_t *lock) {
    lock->flag = 0;
}

上述的自旋锁,能保证我们需要的锁的三个要点吗? 首先互斥是支持的,但是不保证公平性,因为自旋的线程在竞态下可能永远自旋。有的线程可能会饿死。另外性能上,在单处理器上开销很大,因为在竞争的线程,都会在 CPU 放弃之前,执行自旋的操作。在多CPU 上,自旋锁性能不错。

比较并交换

硬件的另一种指令,比较并交换

int CompareAndSwap(int *ptr, int expected, int new) {
    int actual = *ptr;
    if (actual == expected) {
        *ptr = new
    }
    return actual
}

比较并交换指令比测试并设置指令更强大,在无等待同步会的情况中会体现出来。

链式加载和条件式存储指令(LL/SC)

类似典型的加载指令,都是从内存中取出值放入寄存器中,关健在于条件式存储指令,只有上次加载的地址在期间都没更新才会成功。成功时会更新值,失败会返回0,并不更新值。

获取并增加

它原子地返回特定地址的旧值,并且让该值自增一。

int FetchAndAdd(int *ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old
}

typedef struct lock_t {
    int ticket;
    int turn;
}

void lock_init(lockt_t *lock) {
    lock->ticket = 0;
    lock->turn = 0;
}

void lock(lock_t *lock) {
    int myturn = FetchAndAdd(&lock->ticket);
    while(lock->turn != myturn)
        ;  //spin
}

void unlock(lock_t *lock) {
    FetchAndAdd(lock->turn);
}

不同于上面的方法,这个方法能保证所有线程都能抢到锁。只要有一个线程获得了 ticket 值,最终就会被调度。之前的则不会有保证。

自旋过多怎么办

前面我们讨论到的基于硬件的锁,有个问题就是,等待的线程会一直在自旋,而浪费掉整个时间片。N 个线程去竞争一个锁,则会浪费 N-1个时间片。所以只有硬件支持是不够的,我们还需要操作系统来解决这个问题。

放弃 CPU

最简单的方法就是,在自旋的时候,主动放弃 CPU。这样在简单的情况下,还是非常有效的。 但我们考虑如果100个线程竞争一个锁,一个线程持有了锁,另外99个线程会一直处于运行->让出这个模式,虽然性能表现会比一直自旋好,但上下文切换还是有成本的,所以成本还是很高。

使用队列:休眠代替自旋

前面的方法,或多或少有些问题,取决于调度程序的合理性,如果调度不合理,线程或者一直自旋,或者立刻让出 CPU,都会造成浪费。

所以我们需要做到,决定释放的时候,谁能抢到锁。我们用一个队列,来保存等待锁的线程。

void lock(lock_t *m) {
    while(TestAndSet(&m->guard, 1) == 1)
        ;
    if (m->flag = 0) {
        m->flag = 1;
        m->guard = 0;
    } else {
        queue_add(m->q, gettid());
        setpark(); //防止线程切换的竞态条件,导致线程永远睡死。
        m->guard = 0;
        park(); //休眠
    }
}


void unlock(lock_t *m) {
    while(TestAndSet(&m->guard, 1) == 1)
        ;
    if(queue_empty(m->q)) {
        m->flag = 0;
    }else {
        unpark(queue_remove(m->q)); //唤醒
    }
    m->guard = 0;
}

两阶段锁

先自旋一定次数来等待获取锁,然后再使用上面的队列睡眠方案。

OS

基于锁的并发数据结构

并发:介绍