条件变量

Thursday, November 7, 2019

条件变量

很多情况下,我们不仅仅需要锁,有时候线程还需要检查某一条件变量满足后,才开始运行。

比如父进程检查子进程是否执行完毕。

当然我们可以通过一个共享的变量,然后父进程通过自旋等待变量被修改,再执行下去的方案来实现,但是这样是效率低下的。

定义和程序

线程可以使用条件变量来等待条件变真,条件变量是一个显示的队列,当条件不满足时,线程将自己加入队列,另一个线程当它改变了上述状态后,就可以去队列里唤醒匹配的等待线程(通过在该条件发信号)

void thr_exit() {
    lock(&m);
    done = 1;
    signal(&c);
    unlock(&m);
}

void thr_join() {
    lock(&m);
    while(done == 0)
       wait(&c, &m);
    unlock(&m);
}

void *child(void *arg) {
    printf("child\n");
    thr_exit();
    return NULL;
}
int main(int argc, char *argv[]) {
    printf("parent: begin\n");
    pthread_t p;
    Pthread_create(&p, NULL, child, NULL);
    thr_join();
    printf("parent: end\n");
    return 0;
}

上面的伪代码有几点是要说明的,wait 是睡眠,signal 是唤醒。并且exit 和 join 函数的上锁和状态变量都是必要的,缺少了这个会在一些情况下出现问题。

生产者/消费者问题

生产者/消费者有一个共享的有界缓冲区,让生产者放入数据,消费者取出数据。

我们先简单描述一种方案:


void *producer(void *arg) {
    int i;
    for(i = 0; i < loops; i++) {
       Pthread_mutex_lock(&mutex);
       if (count == 1)
           Pthread_mutex_wait(&cond, &mutex);
           put(i);
           Pthread_mutex_signal(&cond);
           Pthread_mutex_unlock(&mutex);
    }
}


void *consumer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);
        if (count == 0)
            Pthread_mutex_wait(&cond, &mutex);
        int tmp = get();
        Pthread_mutex_signal(&cond);
        Pthread_mutex_unlock(&mutex);
        printf("%d\n", tmp);
    }

这就是一个简单的生产者消费者示例,用了锁和条件变量。这个方案是在一个生产者一个消费者的情况下是没问题的,但是如果有多个消费者或者生产者,则会有问题了。

问题就在于,会被抢夺消费。比如一个生产者两个消费者的情况下,消费者1等待生产者生产完毕后,被消费者2消费了该数据,这时消费者1的数据就没了。

OS30.1

使用 wherer 代替 if(但仍有问题)

我们把上述代码的 判断 count 的部分,改成 while 的方式。

这样的话,消费者1被唤醒后,会立刻重新判断 count 是否为1,如果不是,则继续睡眠,避免了上述的被唤醒后拿不到数据的问题。

但是这里又有另一个问题还没解决了,就是该唤醒谁的问题。比如消费者1和消费者2一开始都在睡眠状态,生产者1生产好了数据,假定唤醒了消费者1,并且因为缓冲区满了,自己也睡眠。消费者1消费完毕后,会执行signal 方法,那么问题来了,该唤醒谁,目前这个方法,唤醒消费者2和生产者1都是有可能的。如果唤醒了消费者2,就出问题了。

所以,信号量需要有指向性。

解决方案其实也很简单,用两个条件变量,分别对应生产者和消费者,而不是共同用一个。

最终的方案

结合了上述所说的改进,我们再把缓冲区变大,可以放入多个数据,就变成了最终的生产者/消费者方案了。

int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;

void put(int value) {
    buffler[fill] = value;
    fill = (fill + 1) % MAX;
    count ++;
}


void get() {
    int tmp = buffer[use];
    use = (use + 1) % MAX;
    count--;
    return tmp;
}

cond_t empty, fill;
mutex_t mutex;

void *producer(void *arg) {
    int i;
    for(i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);
        while(count == MAX)
           Pthread_cond_wait(&empty, &mutex);
           put(i);
           Pthread_cond_signal(&fill);
           Pthread_mutex_unlock(&mutex);
    }
}

void *consumer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);
        while(count == 0)
          Pthread_mutex_wait(&fill, &mutex);
        int tmp = get();
        Pthread_cond_signal(&empty);
        Pthread_mutex_unlock(&mutex);
        printf("%d\n",tmp);
    }
}

OS

信号量

基于锁的并发数据结构