为了解决上述问题,线程必须使用一个锁,该所仅仅允许同时只能有一个线程访问变量,图11.8展示了这一同步机制,如果想要读取变量,线程B就需要获取锁,类似地,当线程A更新变量的时候,它也会获取到相同的锁,这样线程B就不能读取变量的值了,需要等待线程A释放锁。
我们也需要同步两个或者多个线程同时修改同一变量的问题,考虑我们需要对变量执行自增操作的情况(如图11.9所示),整个自增操作通常会被分成三步完成:- 将内存中的变量读取到寄存器。
- 对寄存器中的数值进行自增处理。
- 将新的数值写会到内存中; 如果两个线程几乎同时对同一变量进行自增操作,并且没有进行同步,结果可能就会出现不一致,在两个线程完成相应的自增操作以后,实际变量的值可能增加了1,也可能增加了2,这与第二个线程相对第一个线程开始操作的时间有关。如果第二个线程在第一个线程执行第三步之前执行操作1,那么第二个线程将会读取到与第一个线程完全一样的初始值,自增以后,然后写会,并没有达到想要的效果。 如果操作是原子的,那么就不会出现竞态条件,在前一个例子中,如果自增仅仅花费一个内存周期就可以完成,那么就不会出现竞态条件。
11.6.1 互斥
我们可以利用pthreads的互斥接口来保护数据,确保同时只能有一个线程可以访问数据,互斥(mutex)是基于锁的,在我们访问共享数据之前需要先设置锁,在完成数据访问之后需要释放锁,当锁被设定的时候,其他尝试去设置锁的线程将进入阻塞,直到当前获取到锁的线程释放掉锁。在解锁的时候,如果有多个线程同时被阻塞,所有阻塞在该锁上的线程都将变成可运行的,其中第一个运行的线程将能够设置锁,其他线程将会看到互斥锁仍然处于锁定状态,然后又愉快地开始了等待锁在一次变成可用的,采取这种方式就只有一个线程能够进行处理。
互斥机制当且仅当我们按照统一的数据访问规则设计线程的情况下才会工作,操作系统并不会为我们设置串行数据访问。如果我们允许一个线程在访问数据之前不需要进行获取锁,那么及时其余的所有线程都在尝试访问数据之前获取锁,不一致性也可能会出现。 互斥变量使用数据类型pthread_mutex_t表示,在我们使用一个互斥变量之前,我们必须对其进行初始化工作,要么将其初始化为常量PTHREAD_MUTEX_INITIALIZER(for statically allocated mutexes only),或者是调用函数pthread_mutex_init对其进行初始化。如果我们动态地进行互斥锁分配(比如说,调用函数malloc),我们我们需要在释放掉内存之前调用函数pthread_mutex_destroy.#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
Both: return 0 if OK,error number on failure.
当设置attr为NULL的时候,函数使用默认属性初始化互斥锁,我们将在12.4节中讨论互斥锁属性。
为了锁定一个互斥锁,我们需要调用函数pthread_mutex_lock,如果锁已经被锁定了,调用线程将会被阻塞,直到互斥锁被接触为止,为了接触一个互斥锁,我们需要调用函数pthread_mutex_unblock.#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
All return: 0 if OK, error number on failure.
如果一个线程不想要被阻塞的话,那么它可以调用函数pthread_mutex_trylock,从而实现条件锁定互斥锁。如果互斥锁在函数pthread_mutex_trylock被调用的时候是未锁定的,那么函数pthread_mutex_trylock将不会阻塞并且锁定互斥锁,最后返回0.否则,函数pthread_mutex_trylock将会失败,返回EBUSY并且并不会锁定互斥锁。
Example
图11.10中使用了互斥锁来保护一个数据结构。当多个线程需要同时访问一个动态 分配的对象的时候,我们可以嵌入一个引用计数器,以确保所有线程完成使用之前不会对其进行释放。
#include <stdlib.h>
#include <pthread.h>
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
/* ...more stuff here... */
};
struct foo *foo_alloc(int id) /*allocate the object */
{
struct foo *fp;
if((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return NULL;
}
/* ...continue initialization... */
}
return fp;
}
void foo_hold(struct foo *fp) /*add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
void release(struct foo *fp) /*release a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
if(--fp->f_count == 0) /*last reference*/
{
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
pthread_mutex_unlock(&fp->f_lock);
}
}
图11.10 使用互斥锁来保护一个数据结构
我们在增加引用计数,减小引用计数,检查引用计数是否为0的时候锁定f_lock.在首次分配空间以及初始化引用计数为1的时候并没有必要获取锁,因为只有分配线程在引用正在分配的数据结构实体。但是如果我们将新分配的数据结构放到一个链表中,那么就可能被其他的线程访问到,所以在这种情况下就需要首先锁定。 在使用对象之前,线程需要通过调用函数foo_hold来增加一次引用,在完成对象使用以后,必须通过函数foo_release函数释放引用,当最后一次引用被释放的时候,对象存储空间被释放。 在这个例子中,我们忽略了线程在调用函数foo_hold之前是如何找到对象的。即使引用计数为0,如果另外一个线程在调用函数foo_hold时候被阻塞在互斥锁上时使用函数foo_release释放对象内存将会是一个错误???.我们可以通过确保在释放其内存之前保证该对象不被发现来避免问题,我们将在后续的例子中看到如何实现这一想法。11.6.2 避免死锁
如果一个线程尝试锁定相同的互斥锁两次的话,该线程就陷入了死锁状态,除此之外,还有一些不那么明显的方法会产生死锁。举例来说,当我们在我们的程序中使用超过一个互斥锁的时候,如果我们允许一个线程锁定一个互斥锁的同时获取第二个互斥锁,同时另一个线程锁定了第二个互斥锁并尝试获取第一个互斥锁,两个线程都将无法运行,因为每一个线程都需要被对方锁定的资源,于是死锁就出现了。
死锁可以通过仔细控制互斥锁获取的顺序来避免,举例来说,假设你有两个互斥锁,A以及B.如果所有线程总是在获取互斥锁B之前获取互斥锁A,那么这两个互斥锁就不会产生死锁问题(但是其他资源仍然可能产生死锁问题)。类似地,如果所有线程都总是在锁定互斥锁A之前锁定互斥锁B,那么也不会出现死锁:仅仅当一个线程尝试以与另一个线程相反的互斥锁获取顺序进行获取的时候才会产生潜在的死锁风险。 有些时候,一些应用程序架构很难保证互斥锁的获取顺序,如果在你写的函数中设计到了很多的锁以及数据结构,并且你不能将他们分解成简单的架构,那么你就必须尝试一些其他方法,在这种情况下,你可以尝试在获取锁失败的情况下释放已经获取到的锁,并稍后在获取一次。你可以使用函数pthread_mutex_trylock接口来避免死锁,如果你已经获取到了一个锁,并且函数pthread_mutex_trylock成功了,那么你就可以继续往后处理。但是如果不能获取到锁,那么你可能需要释放掉你已经获取到的锁,并在一段时间之后再行尝试。Example
在这个例子中,我们更新了图11.10中的程序,使用了两个互斥锁,我们通过保证相同的锁定顺序来确保死锁问题不会出现。其中第二个互斥锁用于锁定一个跟踪foo数据结构的哈希表,hashlock不仅保护了哈希列表,还保护了foo结构中的f_next数据域。锁f_lock保护了foo结构中的其他数据域。
#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here */
};
struct foo *foo_alloc(int id)
{
struct foo *fp;
int idx;
if((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return (NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ... continue initialization... */
pthread_mutex_unlock(&fp->f_lock);
}
return (fp);
}
void foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo *foo_find(int id) /* find an exsting object */
{
struct foo *fp;
pthread_mutex_lock(&hashlock);
for(fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next)
{
if(fp->f_id == id)
{
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return(fp);
}
void foo_rele(struct foo *fp) /*release a reference to the object */
{
struct foo *tfp;
int idx;
pthread_mutex_lock(&fp->f_lock);
if(fp->f_count == 1)
{
/* last reference */
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_lock(&hashlock);
pthread_mutex_lock(&fp->f_lock);
/*need to recheck the condition */
if(fp->f_count != 1)
{
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
return;
}
/* remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if(tfp == fp)
{
fh[idx] = fp->f_next;
}
else
{
while(tfp->f_next != fp)
{
tfp = tfp->f_next;
}
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
}
}
图11.11 使用两个互斥锁
比较图11.11与11.10中的程序,可以看出,现在在分配函数中首先锁定hash表,增加一个数据结构到哈希数组中,在解锁hash锁之前,获取新分配结构中的互斥锁,因为将新分配的结构放到了一个全局列表中,因此在初始化结构内容的过程中,需要防止其他线程对新分配的结构进行访问。 函数foo_find首先锁定hash表,然后搜索指定的数据结构,如果找到指定的数据结构,那么就增加一次引用计数,并返回一个结构指针,注意,在顺序上,保证了先获取hash锁在获取f_lock互斥锁的顺序。 使用两个互斥锁以后,函数foo_release更加复杂了,如果是最后一次引用,我们需要首先解锁结构锁,以致于我们可以获取到哈西表结构,因为我们需要从hash表中移除一个指定结构。然后我们再次获取结构锁,在此,需要再次检查是否需要释放掉指定结构,因为在上一次释放掉结构锁的时候可能其他线程被线程锁阻塞,当释放掉锁的时候,其他线程会对其执行操作,如果其他线程对该结构增加了一次引用,那么我们就只是简单地将引用技术减少一次即可。解锁掉两个互斥锁,然后直接返回即可。 可以看出上述锁方法实现比较复杂,所以我们需要再次进行简化设计,考虑使用hash表增加对引用计数的保护。 图11.12反映了这一该改变:#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo
{
int f_count; /* protected by hashlock */
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here */
};
struct foo *foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
int idx;
if((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return (NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ... continue initialization... */
pthread_mutex_unlock(&fp->f_lock);
}
return (fp);
}
void foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo *foo_find(int id) /* find an exsting object */
{
struct foo *fp;
pthread_mutex_lock(&hashlock);
for(fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next)
{
if(fp->f_id == id)
{
fp->f_count++;
break;
}
}
pthread_mutex_unlock(&hashlock);
return(fp);
}
void foo_rele(struct foo *fp) /*release a reference to the object */
{
struct foo *tfp;
int idx;
pthread_mutex_lock(&fp->f_lock);
if(--fp->f_count == 0)
{
/* last reference ,remove from list*/
/* remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if(tfp == fp)
{
fh[idx] = fp->f_next;
}
else
{
while(tfp->f_next != fp)
{
tfp = tfp->f_next;
}
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
pthread_mutex_unlock(&hashlock);
}
}
图11.12 简化的锁示例
注意图11.12中的程序对比图11.11中的程序而言变得更加简单了,围绕hash列表以及引用计数的锁定顺序问题已经没有了,因为我们对于上述两个资源使用了同一个锁。多线程软件设计涉及到了像这样的一些折中问题。如果你使用的锁粒度太大,那么就会由许多线程阻塞到同一个锁上的问题。如果锁的粒度太小,你可能会遇到性能问题,因为过多的锁定过程导致性能下降,并且代码设计会比较复杂。作为一个程序员,你需要在代码复杂性与性能之间找到一个平衡,并且仍然保证你的锁定满足要求。11.6.3 pthread_mutex_timedlock 函数
如下函数允许我们将获取锁的操作与时间进行绑定,函数pthread_mutex_timedlock基本上与函数pthread_mutex_lock相同,但是当其设定的超时时间到达的时候,函数pthread_mutex_timedlock将会返回错误码ETIMEDOUT.并且不会成功获取到互斥锁。
#include <pthread.h>
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);
Returns: 0 if OK, error number on failure.
参数timeout指定了该函数的最长等待时间,指定的形式是使用绝对时间,即是说是一直等待直到时间X,而不是等待时长X.timeout使用timespec进行存储,该结构以秒以及毫秒的形式进行时间的存储。
Example
在图11.13中,我们将看到如何使用函数pthread_mutex_timedlock来避免永久性的锁定。
#include "apue.h"
#include <pthread.h>
int main(void)
{
int err;
struct timespec tout;
struct tm *tmp;
char buf[64];
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock);
printf("mutex is locked\n");
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("current time is %s\n", buf);
tout.tv_sec += 10; /* 10 seconds from now */
/*caution: this could lead to deadlock */
err = pthread_mutex_timedlock(&lock, &tout);
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("the time is now %s\n", buf);
if(err == 0)
{
printf("mutex locked again!\n");
}
else
{
printf("can't lock mutex again: %s\n", strerror(err));
}
exit(0);
}
图11.13 函数pthread_mutex_timedlock的使用
如下是程序11.13的输出:$./a.out
mutex is locked
current time is 11:41:58 AM
the time is now 11:42:08 AM
can't lock mutex again:Connection timed out
上述程序故意尝试去锁定一个自身线程已经拥有的互斥锁,以便于阐述函数pthread_mutex_timedlock如何工作的。这种方式在实际应用中并不推荐,因为这可能导致死锁的出现。
注意被被阻塞的时间可能会发生变化,原因由如下几点:- 起始时间位于一秒钟的中间;
- 系统始终解析度不够精确来支持我们设置的timerout时间;
- 调度策略导致延迟时间被延长;
Mac OS X 10.6.8并不支持函数pthread_mutex_timedlock.但是FreeBSD8.0以及Linux 3.2.0 和 Solaris 10支持该函数。但是Solaris仍然将其绑定到了实时库librt中,Solaris 10也提供了另外一个使用相对时间的函数。注意,在最新的gcc中,编译是不再支持这个函数,估计是被放弃了。
11.6.4Reader-Writer Locks
读写锁与互斥锁比较类似,但是读写锁允许更高的并行程度。使用互斥锁,状态要么是锁定的要么是未锁定的,在任意时间,最多只能由一个线程可以锁定他。但是对于读写锁而言,可能存在三种不同的情况:锁定在读模式,锁定在写模式i,未锁定状态。在写模式下,仅仅只能由一个线程可以拥有锁,但是在读模式下,可以由多个线程可以同时拥有一个读写锁。
当一个读写锁处于写锁定状态,那么所有尝试对其进行访问的线程都将被阻塞。当一个读写锁处于读锁定状态时,所有尝试以读模式获取锁的线程都将可以成功获取到锁,但是所有尝试以写模式获取锁的线程都将被阻塞,直到读线程完成访问。虽然各个平台实现存在差异,但是通常来说,如果一个锁处于读锁定模式,并且一个线程尝试以写模式获取锁,那么这个时候其他尝试以读方式获取锁的线程都将被阻塞。这样做的目的是预防出现常量读模式访问的线程流导致写模式访问的线程出现饥饿的情况。 读写锁也适用与如下情况:数据结构以读模式进行访问的次数比被修改的次数多很多。当一个读写锁被保持在写模式的时候,他保护的数据结构可以被安全第写入。当读写锁被锁定在读模式的时候,他保护的数据结构可以被多个线程安全地读取,就像这些线程是第一个获取到读写锁的线程一样。 读写锁又被称为共享互斥锁,当读写锁被锁定在读模式的时候,就称为被锁定在共享读模式。当他处于写锁定的时候,它被锁定在互斥模式。 正如互斥锁一样,读写锁在使用之前必须被初始化,并且在释放它们的存储空间之前需要destroy.#include <pthread.h>
int pthread_rwlock_init(pthread_relock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
Both return: 0 if OK, error number on failure.
读写锁通过调用函数pthread_rwlock_init函数进行初始化,我们可以传递一个空指针给参数attr,如果我们想要设置读写锁为默认属性的话。我们将在12.4.2节中讨论读写锁属性。
The Single Unix Specification标准在XSI选项中定义了常量PTHREAD_RWLOCK_INITIALIZER.该常量在设置未默认属性的情况下可以用于初始化一个静态分配的读写锁。 在释放读写锁的内存之前,我们需要调用函数pthread_rwlock_destroy来对读写锁进行清理工作,pthread_relock_init在初始化读写锁是会分配一些资源,需要使用函数pthread_rwlock_destroy在不再使用的时候对这些资源进行释放。如果不这样做,那么之前分配给锁的资源就将会丢失。 为了将读写锁锁定在读模式,我们需要调用函数pthread_rwlock_rdlock.为了锁定读写锁到写模式,需要调用函数pthread_rwlock_wrlock.无论我们是如何锁定读写锁的,我们都可以调用函数pthread_rwlock_unlock来释放锁。#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
All return: 0 if OK, error number on failure.
实现可能会对读写锁处于共享读模式时候的访问线程数量进行限制,所以我们在调用函数pthread_rwlock_rdlock的时候需要检查返回值。即使是函数pthread_rwlock_wrlock以及pthread_rwlock_unlock都可能产生错误的返回值,逻辑上来来说,对于任何可能出现错误的函数我们都需要对其执行结果进行检查,但是如果我们正确设计和实现了它们,其实可以不用执行检查,唯一出现错误的情况就是我们没有正确地使用它们,比如说使用一个未经初始化的锁,或者是尝试获取一个我们已经拥有的锁,无论如何,需要注意的是:要知道不同的实现可能定义一些其他的错误返回。
The Single Unix Specification也定义了读写锁的尝试执行版本。
#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
Both return: 0 if OK, error number on failure.
当这些锁可以被锁定的时候,上述函数会返回0.否则,它们就会返回错误状态EBUSY,这些函数可以用于避免死锁的出现,尤其是锁的层次结构很难分清的情况下尤其有用,正如我们在前面所讨论的那样。
Example
图11.14中的程序阐述了读写锁的使用,一个请求任务队列通过一个读写锁进行保护。
#include <stdlib.h>
#include <pthread.h>
struct job
{
struct job *j_next;
struct job *j_prev;
pthread_t j_id; /*tells which thread handles this job */
/* ... more stuff here ... */
};
struct queue
{
struct job *q_head;
struct job *q_tail;
pthread_rwlock_t q_lock;
};
/* Initialize a queue */
int queue_init(struct queue *qp)
{
int err;
qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qp->q_lock, NULL);
if(err != 0)
return err;
/* ...continue initalization ... */
return 0;
}
/* Insert a job at the head of the queue */
void job_insert(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_prev = NULL;
jp->j_next = qp->q_head;
qp->q_head = jp;
if(qp == NULL)
{
qp->q_tail = jp;
}
else
{
qp->q_head->j_prev = jp;
}
pthread_rwlock_unlock(&qp->q_lock);
}
/* Append a job on the tail of the queue */
void job_append(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_prev = qp->q_tail;
jp->j_next = NULL;
if(qp->q_tail != NULL)
{
qp->q_tail->j_next = jp;
}
qp->q_tail = jp;
if(qp->q_head == NULL)
{
qp->q_head = jp;
}
pthread_rwlock_unlock(&qp->q_lock);
}
/* remove the given job from a queue */
void job_remove(struct queue *qp, struct job * jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
if(jp == qp->q_head)
{
qp->q_head = jp->j_next;
if(qp->q_tail == jp)
{
qp->q_tail = NULL;
}
else
{
jp->j_next->j_prev = NULL;
}
}
else if(jp == qp->q_tail)
{
jp->j_prev->j_next = NULL;
qp->q_tail = jp->j_prev;
}
else
{
jp->j_next->j_prev = jp->j_prev;
jp->j_prev->j_next = jp->j_next;
}
pthread_rwlock_unlock(&qp->q_lock);
}
/* Find a job for the given thread ID */
struct job * job_find(struct queue *qp, pthread_t id)
{
struct job *jp;
if(pthread_rwlock_rdlock(&qp->q_lock) != 0)
return NULL;
jp = qp->q_head;
while((jp != NULL) && (!pthread_equal(jp->j_id, id)))
{
jp = jp->j_next;
}
pthread_rwlock_unlock(&qp->q_lock);
return jp;
}
图11.14 使用读写锁
在上述例子中,我们在需要增加一个任务到队列或者从队列中移除一个任务的时候会将读写锁锁定在写模式,在搜索队列的时候,将读写锁锁定在读模式,从而允许所有的工作者线程并行搜索队列,本例中使用读写锁,在线程对队列执行的搜索动作远比增加和移除操作频繁的时候对性能改善明显。 工作者线程仅仅取走线程ID匹配的任务,因为一个任务仅仅被一个线程使用一次,因此并不需要额外的锁进行保护。11.6.5 带有超时功能的读写锁
正如互斥锁一样,The Single Unix Specification 提供了函数来实现读写锁的超时锁定,从而避免在获取读写锁的时候不会出现永久等待的情况,函数生命如下:
#include <pthread.h>
#include <time.h>
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
Both return:0 if OK, error number on failure.
上述函数表现与它们对应的无超时功能的函数比较类似,参数tsptr参数指针指向了一个timespec结构,用于指定线程最多阻塞到什么时间停止,如果函数一直不能获取到指定的读写锁,那么在超时时间到达的时候就会返回错误数值ETIMEDOUT,正如函数pthread_mutex_timedlock一样,超时时间参数指定的是一个绝对时间,而不是一个相对时间。
11.6.6 条件变量
条件变量是另外一个可以用于线程的同步机制,这些同步对象提供了一个位置用于用于线程同步。当与互斥锁一起使用的时候,条件变量允许线程以一种无竞争的方式等待人任意条件的出现。
条件变量本身也是被互斥锁保护的,线程必须要先锁定互斥锁然后才能改变条件变量。其他线程在它们获取到互斥锁之前并不能知道条件变量的改变,因为为了获取条件变量的状态必须先获取到互斥锁。 在条件变量被使用之前,它首先需要初始化,条件变量使用类型pthread_cond_t数据类型进行存储,可以采用如下两种方式进行初始化,我们可以赋值常量PTHREAD_COND_INITIALIZER给静态分配的变量,但是如果条件变量是动态分配的,我们可以使用函数pthread_cond_init进行初始化工作. 在释放掉条件变量内部使用的内存之前,必须首先调用函数pthread_cond_destroy来对其内部使用的变量进行释放。#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_conattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t * cond);
Both return: 0 if OK,error number on failure.
除非需要创建一个非默认属性的条件变量,否则attr参数可以被设置未NULL,我们将在12.4.3中对条件变量属性进行讨论。
我们使用函数pthread_cond_wait函数来等待条件变量为真,该函数的一个变体用于在指定的时间内条件变量不能够得到满足的情况下返回一个错误。
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);
Both return: 0 if OK, error number on failure.
传递给函数pthread_cond_wait的互斥锁用于保护条件变量。调用函数会需要传递一个锁定的互斥锁变量给到该函数,该函数会自动将调用线程放置到条件变量的等待线程列表中,并且会自动解除互斥锁的锁定状态。这样做可以保证从条件变量被检查到线程为了等待条件变量满足而进入睡眠状态的时间窗口被关闭,因此线程并不会丢失条件变量的任何改变,当函数pthread_cond_wait返回的时候,互斥锁再一次被锁定。
函数pthread_cond_timedwait提供了与函数pthread_cond_wait函数相似的功能,但是增加了一个额外的参数tsptr,该时间指定了我们最多期望等待的时间长度。 正如我们在图11.13中看到的那样,我们需要指定的时间是一个绝对时间,而不是相对时间,比如说,我们想要等待3分钟,并不是简单地将3分钟传递进入timespec结构,我们需要装换当前时间+3分钟为一个timespec变量传入。 我们可以采用函数clock_gettime来获取当前以结构timespec表达的时间,然而到目前未知,该函数并不被所有平台支持,另外一种方法是,我们可以使用函数gettimeofday来获取当前时间,以类型timeval结构表示,然后将其转换成为timespec结构,为了获取超时时间的绝对时间值,我们可以使用如下所示的函数(假设阻塞时间单位为分钟):#include <sys/time.h>
#incude <stdlib.h>
void maketimeout(struct timespec *tsp, long minutes)
{
struct timeval now;
/*get the current time */
gettimeofday(&now, NULL);
tsp->tv_sec = now.tv_sec;
tsp->tv_nsec = now.tv_usec * 1000; /* usec to nsec */
/* add the offset to get timeout value */
tsp->tv_sec += minutes * 60;
}
如果超时时间到达并且条件变量并没有出现,那么pthread_cond_timedwait函数将会重新获取配置锁并且返回错误值ETIMEOUT.
当从函数pthread_cond_wait或者是pthread_cond_timedwait成功返回的时候,因为其他线程可能已经运行并且修改了环境变量。 有两个函数可以通知线程条件变量已经满足了,函数pthread_cond_signal用于至少唤醒一个等待条件变量的线程,同时函数pthread_cond_broadcast将会唤醒等待一个环境变量的所有线程。POSIX Specification允许函数pthread_cond_signal的实现唤醒多于一个线程,从而使得实现更加简单。
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t * cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
Both return: 0 if OK, error number on failure.
当我们调用函数pthread_cond_signal或者是函数pthread_cond_broadcast的时候,我们称之为发送信号到线程或者是条件变量,我们必须细心保证仅仅在条件状态改变以后才发出信号。
Example
图11.15展示了一个如何使用条件变量和互斥锁来同步线程的例子。
#include <pthread.h>
struct msg
{
struct msg *m_next;
/* ... more stuff here ... */
};
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void process_msg(void)
{
struct msg *mp;
for(; ; )
{
pthread_mutex_lock(&qlock);
while(workq == NULL)
pthread_cond_wait(&qready, &qlock);
mp = workq;
workq = workq->m_next;
pthread_mutex_unlock(&qlock);
/* now pocess the message mp */
}
}
void enqueue_msg(struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
pthrad_cond_signal(&qready);
}
图11.15 使用条件变量
上述例子中的条件变量是工作队列的状态,我们使用一个条件变量以及一个互斥锁在一个while循环中查询条件变量的状态。当我们放置一个消息到队列中的时候,我们首先需要锁定互斥锁,当时当我们向等待线程发送信号的时候并不需要锁定互斥锁。前提是在我们调用函数pthread_cond_signal之前从队列中取出消息是可以的,因为我们使用while循环来检查条件,这并不会出现问题,因为即是一个线程被唤醒,但是却发现队列是空的时候,将会再一次进入等待状态。如果代码不能接受这一竞态条件,那么在发送信号到等待线程之前就必须一直保持互斥锁的锁定状态。
11.6.7 自旋锁
自旋锁与互斥锁比较类似,但是自旋锁并不是采用使进程进入睡眠状态的方式来实现阻塞,而是通过忙等待的方式来实现等待,知道可以获取到互次锁为止,自旋锁通常用于锁仅仅会被锁定很短一段时间,并且线程不希望被调度而出现比较大的开销的情况下。
自旋锁通常是实现其他类型的锁的比较低级的实现。更具系统架构,自旋锁可能有效地通过test-and-set指令进行实现。谁然有效,但是会浪费CPU资源,当线程处于自旋状态等待锁可以获取的时候,CPU不能做其他事情,这也就是自旋锁只能被保持一小段时间的原因。 自旋锁对于非抢占式内核是非常有用的:除了提供互斥机制之外,自旋锁也可以阻塞中断,以致于中断处理函数不会因为尝试获取已经锁定的自旋锁而导致死锁情况的出现。在这种类型的内核中,中断处理函数不能进入睡眠,因此可以使用的同步原语只有自旋锁。 然而,在用户层,自旋锁并没有什么用处,除非你正运行在一个不允许抢占的实时调度系统上,运行在分时调度型系统上的用户级线程可能会在其时间片结束或者是更高优先级的进程变得可用的时候被抢占,在这种情况下,如果一个线程持有了一个自旋锁,当该线程进入睡眠状态的时候,其他阻塞在这个锁上的线程将会自旋得比想要的时间更长。 许多互斥锁实现是非常高效的,使用互斥锁的性能几乎与使用自旋锁的性能一致,事实上,许多互斥锁的实现将会首先自旋一段时间,在这段时间内尝试获取互斥锁,仅仅在自旋时间达到阈值以后才会进入睡眠状态。这些因素,外加上现代操作系统越来越高级,切换上下文环境的速度越来越快,从而使得自旋锁仅仅在资源受限的环境下才有用。 对于自旋锁的接口与互斥锁的接口比较类似,因此采用其中一个替换其他的锁是相对容易的,我们可以使用函数pthread_spin_init初始化一个自旋锁,为了释放一个自旋锁,我们可以调用函数pthread_spin_destroy.#include <pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
Both return: 0 if OK, error number on failure.
自旋锁的属性只有一个变量,该变量仅仅在支持Thread Process-Shared Synchronization选项(现在在Single Unix Specification中是作强制要求的)的平台上有意义,参数pshared表示process-shared的属性,对应了自旋锁将被如何获取。如果被设置为PTHREAD_PROCESS_SHARED,那么这个自旋锁就可以被能够访问锁内存的任何线程访问,既是有一些线程源于不同的进程。否则,参数pshared就只能设置为PTHREAD__PROCESS_PRIVATE,并且自旋锁仅仅可以被初始化该自旋锁的进程内的线程访问。
为了锁定自旋锁,我们可以调用函数pthread_spin_lock,该函数将一直处于自旋状态直到锁定成功,或者是调用函数pthread_spin_try_lock,该函数在锁不可用的时候会立即返回一个错误EBUSY,需要注意的是函数pthread_spin_trylock并不会自旋。不管是采用何种方式锁定,自旋锁都可以调用函数pthread_spin_unlock进行解锁。#include <pthread.h>
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
All return: 0 if OK, error number on failure.
当一个自旋锁处于未锁定状态的时候,函数pthread_spin_lock并不需要自旋就可以锁定自旋锁了,如果一个线程已经锁定了一个自旋锁,那么再次调用函数pthread_spin_lock的结果是未定义的,可能会失败并返回错误编号EDEADLK,或者是其他错误,也有可能线程会一直处于自旋状态无法退出。这种情况下程序的行为是与实现相关的,如果我们尝试对一个未锁定的自旋锁进行解锁操作,其结果也是未定义的。
只要调用了函数pthread_spin_lock或者是函数pthread_spin_trylock,一旦调用锁定成功以后,对于接下来的函数调用就需要非常小心了,不能调用可能进入睡眠的函数,如果出现这样的情况的话,我们将极大地浪费了其他想要锁定自旋锁的线程由于自旋增加的CPU资源.11.6.8 Barriers
Barrier是可以用于协调并行工作的多线程的一种同步机制,Barrier可以实现每一个线程都处于等待状态,直到所有协作线程达到某一个指定的点,然后从停下来的位置继续向后运行,我们已经看到过一种形式的barrier了—-函数pthread_join就是一个barrier,可以实现一个线程等待另一个线程退出。
Barrier的概念比pthread_join的概念更加通用,Barrier允许任意数量的线程处于等待状态,知道所有线程完成处理,并且线程并不要求退出,在所有线程到达Barrier以后,所有线程继续往后运行。 我们可以使用函数pthread_barrier_ini来初始化一个barrier变量,我们可以使用函数pthread_barrier_destroy来释放一个barrier变量。#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
Both return: 0 if OK, error number on failure.
在我们初始化barrier的时候,我们使用参数count来指定在线程继续往下运行之前需要到达barrier位置的线程数量,我们可以使用参数attr来指定barrier对象的属性,在下一章中会由更加详细的介绍,就目前来说,我们可以设置attr参数为NULL,即是使用模式属性初始化carrier对象。如果函数pthread_barrier_init函数为barrier分配了所有资源,这些资源将在我们调用函数pthread_barrier_destroy的时候进行释放。
我们使用函数pthread_barrier_wait来表示一个线程已经完成了任务,接下来就是需要等待其他线程完成任务。#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);
Returns: 0 or PTHREAD_BARRIER_SERIAL_THREAD if OK, error number on failure.
调用函数pthread_barrier_wait的线程如果barrier计数仍然未满足的话线程将会被切换到睡眠状态,如果线程是最后一个调用函数pthread_barrier_wait,即是说满足了barrier计数,那么所有线程都将被唤醒。
对于一个任意线程,函数pthread_barrier_wait可能将会返回数值PTHREAD_BARRIER_SERIAL_THREAD,其余线程将会全部返回数值0,这与需一个线程继续作为主线程处理所有其他线程完成的工作。 一旦barrier计数达到,并且线程不再处于阻塞状态,那么barrier就可以再一次使用了,无论如何barrier计数器都不能被修改,除非我们使用函数pthread_barrier_destroy之后在使用pthread_barrier_init并传参一个不同的count进行初始化。Example
图11.16 显示了barrier如何让协调控制处理同一个任务的多个线程。
#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>
#define NTHR 8 /* number of threads */
#define NUMNUM 8000000L /* number of numbers to sort */
#define TNUM (NUMNUM / NTHR) /* number of sort per threads */
long nums[NUMNUM];
long snums[NUMNUM];
pthread_barrier_t b;
#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t, int (*)(const void *, const void *));
#endif
/* Compare two long integers (helper function for heapsort) */
int complong(const void *arg1, const void *arg2)
{
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;
if(l1 == l2)
return 0;
else if(l1 < l2)
return -1;
else
return 1;
}
/* Worker thread to sort a portion of the set of the set of numbers */
void *thr_fn(void *arg)
{
long idx = (long)arg;
heapsort(&nums[idx], TNUM, sizeof(long), complong);
pthread_barrier_wait(&b);
/* Go off and perform more work ... */
return ((void *)0);
}
/* Merge the results of the individual sorted ranges. */
void merge()
{
long idx[NTHR];
long i, minidx, sidx, num;
for(i = 0; i < NTHR; i++)
idx[i] = i * NTHR;
for(sidx = 0; sidx < NUMNUM; sidx++)
{
num = LONG_MAX;
for(i = 0; i < NTHR; i++)
{
if((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num))
{
num = nums[idx[i]];
minidx = i;
}
}
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}
int main()
{
unsigned long i;
struct timeval start,end;
long long startusec, endusec;
double elapsed;
int err;
pthread_t tid;
/* Create the initial set of the number to sort */
srandom(1);
for(i = 0; i < NUMNUM; i++)
{
nums[i] = random();
}
/* Create 8 threads to sort the numbers */
gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR+1);
for(i = 0; i < NTHR; i++)
{
err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
if(err != 0)
err_exit(err, "can't create thread");
}
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);
/* Print the sorted list. */
startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;
elapsed = (double) (endusec - startusec) / 1000000.0;
printf("sort took %.4f seconds\n", elapsed);
for(i = 0; i < NUMNUM; i++)
{
printf("%ld\n", snums[i]);
}
exit(0);
}
图11.16 barrier使用 该例子展示了barrier的使用方法,采用的是一种比较简单的情景,在更加复杂的情况下,工作者线程将会在函数pthread_barrier_wait返回之后执行其他动作。 在上述例子中,我们使用了8个线程来完成排序8million数字的任务,每个线程使用对排序算法排序1million个数值,最后主线程调用函数合并结果。 我们并没有使用返回值PTHREAD_BARRIER_SERIAL_THREAD来决定哪一个线程合并结果,我们使用主线程来完成这一任务。 如果我们写一个程序仅仅使用一个线程来排序8百万数字,在同样使用对排序的情况下,我们将会看到性能比上述使用多线程的程序性能明显变差,在一个8核系统上,单线程程序执行时间为12.14秒,在相同的系统上,使用8线程进行排序的话,仅仅使用1.91秒,快了6倍。