并发编程 - CS:APP 第十二章

并发编程 - CS:APP 第十二章

本章主要内容:

  • 实现并发程序的三种方法:
    1. fork() 进程
    2. I/O 多路复用
    3. 使用线程
  • 使用信号量同步线程
  • 线程安全问题

使用进程进行并发编程

fork() & execve

优点:进程模型清晰,有独立的地址空间

缺点:不方便进程之间共享信息

基于进程的并发echo服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <csapp.h>

void echo(int connfd)
{
size_t n;
char buf[MAXLINE];
rio_t rio;

Rio_readinitb(&rio, connfd);
while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
printf("server received %d bytes\n", (int)n);
Rio_writen(connfd, buf, n);
}
}

void sigchld_handler(int sig)
{
while (waitpid(-1, 0, WNOHANG) > 0);
return ;
}

int main(int argc, char **argv)
{
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;

if (argc != 2) {
fprintf(stderr, "args error");
exit(0);
}

Signal(SIGCHLD, sigchld_handler);
listenfd = Open_listenfd(argv[1]);
while (1) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA*)&clientaddr, &clientlen);
if (Fork() == 0) {
printf("process:%d connected\n", getpid());
Close(listenfd);
echo(connfd);
printf("process:%d close connection\n", getpid());
Close(connfd);
exit(0);
}
Close(connfd);
}
}

IO多路复用

可以使用select() 函数显示等待一个进程有一个IO事件发生。

例如我们有一个监听描述符和很多链接描述符,如果有一个描述符准备好读,我们就相应它,这样也可以实现并发。

只要有一个IO事件发生,程序的逻辑流就会改变。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <sys/select.h>

/*
如果fdset中的描述符准备好读或者写,就返回
返回值:返回已准备好的描述符数目,若出错返回-1
*/
int select(int n, fd_set *fdset, NULL, NULL, NULL);

FD_ZERO(fd_set *fdset);
FD_CLR(int fd, fd_set *fdset);
FD_SET(int fd, fd_set *fdset);
FD_ISSET(int fd, fd_set *fdset);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <csapp.h>

typedef struct {
int maxfd;
fd_set read_set;
fd_set ready_set;
int nready;
int maxi;
int clientfd[FD_SETSIZE];
rio_t clientrio[FD_SETSIZE];
}pool;

int byte_cnt = 0;

void init_pool(int listenfd, pool *p);
void add_client(int connfd, pool *p);
void check_clients(pool *p);

int main(int argc, char **argv)
{
int listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;

static pool pool;

if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}

listenfd = Open_listenfd(argv[1]);
init_pool(listenfd, &pool);

while (1) {
pool.ready_set = pool.read_set;
pool.nready = Select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);

printf("%d", pool.nready);
if (FD_ISSET(listenfd, &pool.ready_set)) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA*)&clientaddr, &clientlen);
add_client(connfd, &pool);
}

check_clients(&pool);
}
}

void init_pool(int listenfd, pool *p)
{
int i;
p->maxi = -1;
for (i = 0; i < FD_SETSIZE; i++)
p->clientfd[i] = -1;

p->maxfd = listenfd;
FD_ZERO(&p->read_set);
FD_SET(listenfd, &p->read_set);
}

void add_client(int connfd, pool *p)
{
int i;
p->nready--;
for (i = 0; i < FD_SETSIZE; i++) {
if (p->clientfd[i] < 0) {
p->clientfd[i] = connfd;
Rio_readinitb(&p->clientrio[i], connfd);

FD_SET(connfd, &p->read_set);

if(connfd > p->maxfd)
p->maxfd = connfd;
if (i > p->maxi)
p->maxi = i;
break;
}
}
if (i == FD_SETSIZE)
app_error("add_client error: Too many clients");
}

void check_clients(pool *p)
{
int i, connfd, n;
char buf[MAXLINE];
rio_t rio;

for (i = 0; (i <= p->maxi) && (p->nready) > 0; i++) {
connfd = p->clientfd[i];
rio = p->clientrio[i];

if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) {
p->nready--;
if ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
byte_cnt += n;
printf("Server received %d (%d total) bytes on fd %d\n", n, byte_cnt, connfd);
Rio_writen(connfd, buf, n);
}
else {
Close(connfd);
FD_CLR(connfd, &p->read_set);
p->clientfd[i] = -1;
}
}
}
}

缺点:编码特别复杂

基于线程的并发编程

线程是运行在进程上下文中的逻辑流

运行在同一个进程里的线程共享

  • 虚拟地址空间

有自己独立的

  • 栈、栈指针、PC、通用目的寄存器和条件码

线程之间是对等的,没有和进程一样有父子之分

进程有两种状态:可结合的和分离的

  • 可结合的:能被其他线程回收和杀死,但它的内存资源需要被显示回收
  • 分离的:不能被其他线程杀死,内存资源结束时由系统自动释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <pthread.h>
typedef void *(func)(void *);

// 创建线程
int pthread_create(pthread_t *tid, pthread_attr_t * attr, func *f, void *arg);

// 获取自己的tid
pthread_t pthread_self(void);

// 当前线程会显示的终止,如果主调线程是主线程,那么他会等待其他对等线程终止,然后终止这个进程
void pthread_exit(void *thread_return);

// 终止 tid
int pthread_cancel(pthread_t tid);

// 阻塞,等待tid终止
int pthread_join(pthread_t tid, void **thread_return);

// 当这个线程第一次被调用时,执行 init_routine(),用于初始化
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));

用信号量同步线程

如果两个线程交错地调用某个共享变量,如果第一个线程还没有将新值更新,第二个线程就已经取出了共享变量的值,就可能会造成错误

对于线程i,操作共享变量的指令,构成了一个关于共享变量的临界区,这个临界区不应该和其他程序的临界区交错执行。换句话说,我们想要确保每个线程在执行它的临界区中的指令时,拥有对共享变量互斥的访问。

想要实现互斥的访问,我们可以使用信号量机制

信号量:

信号量s是具有非负整数值的全局变量,只能通过两种操作来改变它:

  • P(s) : 如果s是非零的,那么P将s减1,并且立即返回。如果s为零,那么就挂起进程,直到s变为非零,并且该进程被一个V操作重启。在重启之后,P操作将s减1,并将控制返回给调用者。
  • V(S) : V操作将s加1。如果有任何进程阻塞在P操作等待s变成非零,那么V操作会重启这些进程中的一个,然后该进程将s减1,完成它的P操作。

P中测试和加一的操作是不可分割的

V中测试和加以的操作也是不可分割的

如果s的值只能是0或者1,我们就将这个信号量成为互斥锁,它可以提供对共享变量互斥的访问。

互斥锁的使用:当一个线程要使用共享变量时,它对S进行P操作,互斥锁加锁,S变为0,当另一个线程想要使用共享变量时,他也对s进行P操作,因为P已经变为了0,所以这个线程被挂起,等待一个其他线程的V操作将它激活。当第一个线程使用完共享变量,它执行V操作,互斥锁解锁,第二个线程被激活。

生产者消费者问题

生产者线程反复地生成新的项目(item),并把它们插入到缓冲区中。消费者线程不断地从缓冲区中取出这些项目,然后消费它们。

因为插入和取出项目都包括更新共享变量,所以我们必须保证对缓冲区的访问是互斥的。但是只保证互斥访问是不够的,我们还需要调度对缓冲区的访问。如果缓冲区是满的(没有空的槽位),那么生产者必须等待直到有一个槽位变为可用。与之相似,如果缓冲区是空的(没有可取用的项目),那么消费者必须等待直到有一个可用的项目。

基于生产者消费者的sbuf包:

sbuf.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#ifndef H_SBUF
#define H_SBUF
#include <csapp.h>

typedef struct
{
int *buf; // 缓冲区
int n; // 缓冲区长度
int front; // 队列头
int rear; // 队列尾
sem_t mutex; // 对缓冲区的互斥锁
sem_t slots; // 控制生产者的信号量
sem_t items; // 控制消费者的信号量
}sbuf_t;

void sbuf_init(sbuf_t *sp, int n);
void sbuf_deinit(sbuf_t *sp);
void sbuf_insert(sbuf_t *sp, int item);
int sbuf_remove(sbuf_t *sp);

#endif

sbuf.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "sbuf.h"

// 初始化
void sbuf_init(sbuf_t *sp, int n)
{
sp->buf = Calloc(n, sizeof(int));
sp->n = n;
sp->front = sp->rear = 0;
Sem_init(&sp->mutex, 0, 1);
Sem_init(&sp->slots, 0, n);
Sem_init(&sp->items, 0, 0);
}

// 释放缓冲区空间
void sbuf_deinit(sbuf_t *sp)
{
Free(sp->buf);
}

// 向缓冲区插入数据
void sbuf_insert(sbuf_t *sp, int item)
{
P(&sp->slots); // 如果缓冲区已满,想要插入的线程会阻塞在这里
P(&sp->mutex); // 互斥锁加锁
sp->buf[(++sp->rear) % (sp->n)] = item; // 因为信号量的机制,我们不用担心插入的数据大小超过缓冲区大小
V(&sp->mutex); // 互斥锁解锁
V(&sp->items); // 如果有线程因为缓冲区为空而阻塞在这里,这个操作会激活它们
}

int sbuf_remove(sbuf_t *sp)
{
int item;
P(&sp->items); // 如果缓冲区为空,想要取出的线程会阻塞
P(&sp->mutex);
item = sp->buf[(++sp->front) % sp->n]; // 因为信号量的机制,我们不用担心从空缓冲区里取出数据
V(&sp->mutex);
V(&sp->slots); // 如果有线程因为缓冲区已满而阻塞在这里,这个操作会通知激活他们
return item;
}

基于预线程化的并发服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <csapp.h>
#include "sbuf.h"
#define NTHREADS 4
#define SBUFSIZE 16

void echo_cnt(int connfd);
void *thread(void *vargp);

sbuf_t sbuf;

static int byte_cnt;
static sem_t mutex;

int main(int argc, char **argv)
{
int i, listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
pthread_t tid;

if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}

listenfd = Open_listenfd(argv[1]);

sbuf_init(&sbuf, SBUFSIZE);
for (i = 0; i < NTHREADS; i++)
Pthread_create(&tid, NULL, thread, NULL);

while (1) {
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA*)&clientaddr, &clientlen);
sbuf_insert(&sbuf, connfd);
}
}

void *thread(void *vargp)
{
Pthread_detach(pthread_self());
while (1) {
int connfd = sbuf_remove(&sbuf);
echo_cnt(connfd);
Close(connfd);
}
}

static void init_echo_cnt(void)
{
Sem_init(&mutex, 0, 1);
byte_cnt = 0;
}

void echo_cnt(int connfd)
{
int n;
char buf[MAXLINE];
rio_t rio;
static pthread_once_t once = PTHREAD_ONCE_INIT;

Pthread_once(&once, init_echo_cnt);
Rio_readinitb(&rio, connfd);
while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
P(&mutex);
byte_cnt += n;
printf("server received %d (%d total) bytes on fd %d\n", n, byte_cnt, connfd);
V(&mutex);
Rio_writen(connfd, buf, n);
}
}

线程安全

四个种不安全的函数

  • 不保护共享变量的函数
  • 保持跨越多个调用的状态的函数(依赖前次调用结果的函数)
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数

可重入的函数

通常与线程安全的函数相混淆,但其实可重入的函数是线程安全函数的子集

定义:当它被多个线程使用时,不会引用任何共享数据

竞争

一个程序的正确性依赖于一个线程要在另一个线程到达y点之前到达它控制流种的x点时,就会发生竞争

死锁

deadlock:一个程序被阻塞了,等待一个永远也不会为真的条件

给定所有互斥操作的一个全序,如果每个线程都是以一种顺序获得互斥锁,并以相反的顺序释放,那么这个程序是无死锁的