基于锁的并发数据结构

Wednesday, November 6, 2019

并发计数器

并发计数器的一个简单实现就是,在increment,decrement,get 方法里面都加上锁。这样就完成了一个可以并发使用的计数器了。

但是上述实现在多线程的情况下性能非常之低,可能单线程更新100万次只要0.03s,但是双线程并发执行,每个都执行100w 次,时间需要5s 左右。

我们需要的理想状况是,多线程像单线程一样快,达到一种完美扩展的状态。

可扩展的计数器

为了解决上述的性能问题,我们引入了一种称为懒惰计数器的方法。

核心要点就是通过多个局部计数器和一个全局计数器来实现一个逻辑计数器。比如每个 CPU 上面都有一个局部计数器,每个局部计数器有一个锁,全局计数器有一个锁。

通过对应的局部锁来访问对应的局部计数器,不同的 CPU 上的线程不会竞争。所以计数器的更新扩展器扩展性好。

为了保持全局计数器的更新,局部计数器通过一个阈值,定期去通过全局锁更新全局计数器。,然后将局部计数器置零。

所以我们也可以知道,阈值越高,性能越好,但是数据会有延时

并发链表

我们只讨论插入的情况,其他删除之类的也大体类似。

很简单的方案就是,在插入函数的头部和尾部加上lock 和 unlock 函数,并在 malloc 失败的地方加上 unlock。

但是这样的异常控制容易产生问题,异常控制流总是容易出错的地方。我们需要缩小加锁的范围,避开在失败的情况下也要释放锁。

所以我们可以只在真正增加链表节点的代码里加入锁,比如:

lock(&L-lock)
new->next = L->head;
L->head = new;
unlock(&L-lock);

并发队列

有了上面的基础,并发队列的加锁方式也是类似,只在关键地方加锁,并且由于队列的特性,我们可以用两把锁,一个用于队头,一个用于队尾。并且这里有一个技巧,用了一个假节点来维护head 和 tail

void enqueue(queue_t *q, int value) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);
    tmp->value = value;
    tmp->next = NULL;
    lock(&q->tailLock);
    q->tail->next = tmp;
    q->tail = tmp;
    unlock(&q->tailLock);
}


void dequeue(queue_t *q, int *value) {
    lock(&q->headLock);
    node_t *tmp = q->haed;
    node_t *newHead = tmp->next;
    if(newHead == NULL) {
        unlock(&q->headLock);
        return -1;
    }
    *value = newHead->value;
    q->head = newHead;
    unlock(&q->headLock)
    free(tmp);
    return -;
}

并发散列表

假设是一个固定大小比较简单的散列表,我们用前面所说的并发链表来实现。

int Hash_Insert(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return List_Insert(&H->lists[bucket], key);
}

int Hash_Lookup(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return list_Lookup(&H->lists[bucket], key);
}

OS29.1

图中我们可以看出,并发的散列表扩展性极好,普通链表则不是。

OS

条件变量