cpp-doc-线程与进程

概念

进程与线程

  • 进程时资源竞争的基本单位
  • 线程是程序执行的最小单位
  • 线程共享进程数据,但也拥有自己的一部分数据
    • 线程ID
    • 一组寄存器
    • errno
    • 信号状态
    • 优先级

fork和创建新线程的区别

  • 当一个进程执行一个fork调用的时候,会创建出进程的一个新的拷贝,新进程将拥有它自己的变量和它自己的PID。这个新进程的运行时间独立的,它在执行时几乎完全独立于创建它的进程
  • 在进程里面创建一个新进程的时候,新的执行线程会拥有自己的堆栈(因此也就有自己的局部变量),但要与它的创建者共享全局变量、文件描述符、信号处理器和当前工作目录状态。

线程

什么是线程

  • 在一个程序里的一个执行线路就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
  • 一切进程至少都有一个执行线程

线程的优点

  • 创建一个新的线程的代价要比创建一个新的进程小得多
  • 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少得多
  • 线程占用的资源要比进程少得多
  • 能充分利用多处理器的可并行数量
  • 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
  • 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
  • I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。

线程的缺点

  • 性能损失
    • 一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调用开销,而可用的资源不变。
  • 健壮性降低
    • 编写多线程需要更全面深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
  • 缺乏访问控制
    • 进程时访问控制的基本粒度,在一个线程中调用某些OS函数会对正规进程造成影响
  • 编程难度提高
    • 编写与调试一个多线程程序比单线程程序困难得多

线程调度竞争范围

  • 操作系统提供了各种模型,用来调度应用程序创建的线程。这些模型之间的主要不同是:在竞争系统资源(特别是CPU时间)时,线程调度竞争范围(thread-scheduling contention scope)不一样
  • 进程竞争范围(process contention scope):各个线程在同一个进程竞争“被调度的CPU时间”(但不直接和其他进程中的线程竞争)
  • 系统竞争范围(system contention scope):线程之间和“系统范围”内的其他线程竞争。

线程模型

  • N:1用户线程模型
    • 切换开销比较小
    • 内核不干涉线程的任何生命活动,也不干涉同意进程中的线程环境切换。
    • 一个进程中的多个线程只能调度到一个CPU,这种约束限制了可用的并行总量。
    • 如果某个线程执行一个“阻塞式”操作(如read),那么,进程中的所有线程都会阻塞,直至那个操作结束。为此,一些线程的实现是为这些阻塞式函数提供包装器,用非阻塞版本替换这些系统调用,以消除这种限制。
  • 1:1核心线程模型
    • 应用程序创建的每一个线程都由一个核心线程直接管理
    • OS内核将每一个核心线程都调到系统CPU上,因此,所有线程都工作在“系统竞争范围”
    • 这种线程的创建与调度由内核完成,因此这种线程的系统开销比较大(但一般来说,比进程开销小)
  • N:M混合线程模型
    • 提高两级控制,将用户线程映射为系统的可调度体以实现并行,这个可调度体称为轻量级进程

线程重入

在单线程程序中,整个程序都是顺序执行的,一个函数在同一时刻只能被一个函数调用,但在多线程中,由于并发性,一个函数可能同时被多个函数调用,此时这个函数就成了临界资源,很容易造成调用函数处理结果的相互影响,如果一个函数在多线程并发的环境中每次被调用产生的结果是不确定的,我们就说这个函数是”不可重入的”/“线程不安全”的。

编译时,定义宏_REENTRANT 即: gcc -D_REENTRANT (#define REENTRANT)

相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 创建一个新的线程
// @thread:指向新线程的标识符。通过该指针返回所创建线程的标识符
// @attr:设置线程的属性,attr为NULL标识使用默认属性
// @start_routine:是个函数地址,线程启动后要执行的函数
// @arg:传给线程启动函数的参数
// return:成功返回0;失败返回错误码
int pthread_create(pthread_t* thread, const pthread_attr_t* attr, void*(*start_routine)(void*), void* arg);

// 等待线程结束,避免僵线程的产生
// @thread:线程id
// @retval:它指向一个指针,后者指向线程的返回值
// return:成功返回0;失败返回错误码
int pthread_join(pthread_t thread, void **retval);

// 线程终止
// @value_ptr:不要指向一个局部变量
// return:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
void pthread_exit(void* value_ptr);

// 将一个线程分离,避免僵线程的产生
// @thread:线程id
// return:成功返回0;失败返回错误码
int pthread_detach(pthread_t thread);

// 返回当前线程ID
// return:成功返回0
pthread_t pthread_self(void);

// 取消一个执行中的线程
// @thread:线程id
// return:成功返回0;失败返回错误码
int pthread_cancel(pthread_t thread)

// 初始化与销毁属性
int pthread_attr_init(pthread_attr_t* attr);
int pthread_attr_destroy(pthread_attr_t* attr);

// 获取与设置分离属性
int pthread_attr_getdetachstate(const pthread_attr_t* attr, int* detachstate);
int pthread_attr_setdetachstate(pthread_attr_t* attr, int detachstate);

// 获取与设置栈大小
int pthread_attr_setstacksize(pthread_attr_t* attr, size_t stacksize)
int pthread_attr_getstacksize(pthread_attr_t* attr, size_t* stacksize);

// 获取与设置栈溢出保护区大小
int pthread_attr_setguardsize(pthread_attr_t* attr, size_t guardsize)
int pthread_attr_getguardsize(pthread_attr_t* attr, size_t* guardsize)

// 获取与设置线程竞争范围
int pthread_attr_getscope(const pthread_attr_t* attr, int* contentionscope)
int pthread_attr_setscope(pthread_attr_t* attr, int contentionscope)

// 获取与设置调度策略
int pthread_attr_getschedpolicy(const pthread_attr_t* attr, int* policy)
int pthread_attr_setschedpolicy(pthread_attr_t* attr, int policy)

// 获取与设置继承的调度策略
int pthread_attr_getinheritsched(const pthread_attr_t* attr, int* inheritsched);
int pthread_attr_setinheritsched(pthread_attr_t* attr, int inheritsched)

// 获取与设置调度参数
int pthread_attr_getschedparam(const pthread_attr_t* attr, struct sched_param* param)
int pthread_attr_setschedparam(pthread_attr_t* attr, const struct sched_param* param);

// 获取与设置并发级别
// 仅在N:M线程模型中有效,设置并发级别,给内核一个提示:表示提供给定级别数量的核心线程来映射用户线程是高效的。
int pthread_setconcurrency(int new_level)
int pthread_getconcurrency(void)

// 线程特定数据
int pthread_key_create(pthread_key_t* key, void(*destructor)(void*))
int pthread_key_delete(pthread_key_t key)
void* pthread_getspecific(pthread_key_t key)
int pthread_setspecific(pthread_key_t key, const void* value)
int pthread_once(pthread_once_t* once_conntrol, void (*init_routine)(void))
pthread_once_t once_control = PTHREAD_ONCE_INIT

例子:简单线程使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int my_global;

void* my_thread_handle(void *arg)
{
int val;

val = *((int*)arg);

printf("new thread begin, arg=%d\n", val);
my_global += val;

sleep(3);

pthread_exit(&my_global);

// 不再执行
printf("new thread end\n");
}

int main(void)
{
pthread_t mythread;
int arg;
int ret;
void *thread_return;

arg = 100;
my_global = 1000;

printf("my_global=%d\n", my_global);
printf("ready create thread...\n");

ret = pthread_create(&mythread, 0, my_thread_handle, &arg);
if (ret != 0) {
printf("create thread failed!\n");
exit(1);
}

printf("wait thread finished...\n");
ret = pthread_join(mythread, &thread_return);
if (ret != 0) {
printf("pthread_join failed!\n");
exit(1);
}
printf("wait thread end, return value is %d\n", *((int*)thread_return));
printf("my_global=%d\n", my_global);
printf("create thread finished!\n");
}

线程同步

  • 信号量
  • 互斥量
互斥量

什么是互斥量?
效果上等同于初值为1的信号量

posix互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
// 互斥量的初始化
// @mutex:指向被初始化的互斥量
// @attr:指向互斥量的属性,一般取默认属性(当一个线程已获取互斥量后,该线程再次获取该信号量,将导致死锁!)
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr);

// 互斥量的获取
int pthread_mutex_lock (pthread_mutex_t *mutex);

// 互斥量的释放
int pthread_mutex_unlock (pthread_mutex_t *mutex);

// 互斥量的删除
int pthread_mutex_destroy (pthread_mutex_t *mutex);
main3.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>

void *thread_function(void *arg);
pthread_mutex_t work_mutex;

#define WORK_SIZE 1024
char work_area[WORK_SIZE];
int time_to_exit = 0;

int main() {
int res;
pthread_t a_thread;
void *thread_result;
res = pthread_mutex_init(&work_mutex, NULL);
if (res != 0) {
perror("Mutex initialization failed");
exit(EXIT_FAILURE);
}
res = pthread_create(&a_thread, NULL, thread_function, NULL);
if (res != 0) {
perror("Thread creation failed");
exit(EXIT_FAILURE);
}
pthread_mutex_lock(&work_mutex);
printf("Input some text. Enter 'end' to finish\n");
while(!time_to_exit) {
fgets(work_area, WORK_SIZE, stdin);
pthread_mutex_unlock(&work_mutex);
while(1) {
pthread_mutex_lock(&work_mutex);
if (work_area[0] != '\0') {
pthread_mutex_unlock(&work_mutex);
sleep(1);
}
else {
break;
}
}
}
pthread_mutex_unlock(&work_mutex);
printf("\nWaiting for thread to finish...\n");
res = pthread_join(a_thread, &thread_result);
if (res != 0) {
perror("Thread join failed");
exit(EXIT_FAILURE);
}
printf("Thread joined\n");
pthread_mutex_destroy(&work_mutex);
exit(EXIT_SUCCESS);
}

void *thread_function(void *arg) {
sleep(1);
pthread_mutex_lock(&work_mutex);
while(strncmp("end", work_area, 3) != 0) {
printf("You input %d characters\n", strlen(work_area) -1);
work_area[0] = '\0';
pthread_mutex_unlock(&work_mutex);
sleep(1);
pthread_mutex_lock(&work_mutex);
while (work_area[0] == '\0' ) {
pthread_mutex_unlock(&work_mutex);
sleep(1);
pthread_mutex_lock(&work_mutex);
}
}
time_to_exit = 1;
work_area[0] = '\0';
pthread_mutex_unlock(&work_mutex);
pthread_exit(0);
}

编译指令 g++ main3.cpp -o main3 -D_REENTRANT -lpthread

posix自旋锁
1
2
3
4
pthread_spin_init
pthread_spin_destroy
pthread_spin_lock
pthread_spin_unlock
posix读写锁
1
2
3
4
5
pthread_rwlock_init
pthread_rwlock_destroy
int pthread_rwlock_rdlock
int pthread_rwlock_wrlock
int pthread_rwlock_unlock
条件变量

与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。

条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(给出条件成立信号)。

条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 条件变量初始化 
// @cond:条件变量指针
// @attr:条件变量高级属性
int pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr);

// 通知条件变量,唤醒一个等待者
// @cond:条件变量指针
int pthread_cond_signal (pthread_cond_t *cond);

// 唤醒所有等待该条件变量的线程
// @cond:条件变量指针
int pthread_cond_broadcast (pthread_cond_t *cond);

// 等待条件变量/超时被唤醒
// 等待条件变量cond被唤醒,直到由一个信号或广播,或绝对时间abstime到 * 才唤醒该线程
// @cond:条件变量指针
// @mutex:互斥量
// @abstime:等待被唤醒的绝对超时时间
int pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

// 等待条件变量被唤醒
// 等待条件变量cond被唤醒(由一个信号或者广播)
// @cond:条件变量指针
// @mutex:互斥量
// return:
// [EINVAL] cond或mutex无效,
// [EINVAL] 同时等待不同的互斥量
// [EINVAL] 主调线程没有占有互斥量
int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex);

// 释放/销毁条件变量
// @cond:条件变量指针
int pthread_cond_destroy (pthread_cond_t *cond);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex;
pthread_cond_t cond;

void *thread1(void *arg)
{
while (1) {

printf("thread1 is running\n");
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
printf("thread1 applied the condition\n");
pthread_mutex_unlock(&mutex);

sleep(4);
}
}


void *thread2(void *arg)
{
while (1) {

printf("thread2 is running\n");
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
printf("thread2 applied the condition\n");
pthread_mutex_unlock(&mutex);

sleep(2);
}
}

int main()
{
pthread_t thid1, thid2;

printf("condition variable study!\n");

pthread_mutex_init(&mutex, NULL);

pthread_cond_init(&cond, NULL);

pthread_create(&thid1, NULL, thread1, NULL);

pthread_create(&thid2, NULL, thread2, NULL);

do {
pthread_cond_signal(&cond);
sleep(1);

} while (1);

return 0;
}

编译命令 g++ main4.cpp -o main4 -D_REENTRANT -lpthread

线程池

由一个任务队列和一组处理队列的线程组成。一旦工作进程需要处理某个可能“阻塞”的操作,不用自己操作,将其作为一个任务放到线程池的队列,接着会被某个空闲线程提取处理。

线程池图示

为什么需要线程池?

创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多

活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或“切换过度”而导致系统资源不足

线程池实现核心组建

  • 任务 - 待处理的工作,通常由标识、上下文和处理函数组成。
  • 任务队列 - 按顺序保存待处理的任务序列,等待线程中的线程组处理。
  • 线程池 - 由多个已启动的一组线程组成。
  • 条件变量 - 一种同步机制,允许线程挂起,直到共享数据上的某些条件得到满足。
  • 互斥锁 - 保证在任一时刻,只能有一个线程访问该对象。
nginx线程池
thread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#ifndef _THREAD_H_INCLUDED_
#define _THREAD_H_INCLUDED_

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>

typedef intptr_t int_t;
typedef uintptr_t uint_t;

#define OK 0
#define ERROR -1

int thread_mutex_create(pthread_mutex_t *mtx);
int thread_mutex_destroy(pthread_mutex_t *mtx);
int thread_mutex_lock(pthread_mutex_t *mtx);
int thread_mutex_unlock(pthread_mutex_t *mtx);


int thread_cond_create(pthread_cond_t *cond);
int thread_cond_destroy(pthread_cond_t *cond);
int thread_cond_signal(pthread_cond_t *cond);
int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx);

#endif /* _THREAD_H_INCLUDED_ */
thread_mutex.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

#include "thread.h"

int
thread_mutex_create(pthread_mutex_t *mtx)
{
int err;
pthread_mutexattr_t attr;

err = pthread_mutexattr_init(&attr);
if (err != 0) {
fprintf(stderr, "pthread_mutexattr_init() failed, reason: %s\n",strerror(errno));
return ERROR;
}

err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if (err != 0) {
fprintf(stderr, "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n",strerror(errno));
return ERROR;
}

err = pthread_mutex_init(mtx, &attr);
if (err != 0) {
fprintf(stderr,"pthread_mutex_init() failed, reason: %s\n",strerror(errno));
return ERROR;
}

err = pthread_mutexattr_destroy(&attr);
if (err != 0) {
fprintf(stderr,"pthread_mutexattr_destroy() failed, reason: %s\n",strerror(errno));
}

return OK;
}

int
thread_mutex_destroy(pthread_mutex_t *mtx)
{
int err;

err = pthread_mutex_destroy(mtx);
if (err != 0) {
fprintf(stderr,"pthread_mutex_destroy() failed, reason: %s\n",strerror(errno));
return ERROR;
}

return OK;
}

int
thread_mutex_lock(pthread_mutex_t *mtx)
{
int err;

err = pthread_mutex_lock(mtx);
if (err == 0) {
return OK;
}
fprintf(stderr,"pthread_mutex_lock() failed, reason: %s\n",strerror(errno));

return ERROR;
}

int
thread_mutex_unlock(pthread_mutex_t *mtx)
{
int err;

err = pthread_mutex_unlock(mtx);

#if 0
ngx_time_update();
#endif

if (err == 0) {
return OK;
}

fprintf(stderr,"pthread_mutex_unlock() failed, reason: %s\n",strerror(errno));
return ERROR;
}
thread_cond.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include "thread.h"

int
thread_cond_create(pthread_cond_t *cond)
{
int err;

err = pthread_cond_init(cond, NULL);
if (err == 0) {
return OK;
}

fprintf(stderr, "pthread_cond_init() failed, reason: %s\n",strerror(errno));
return ERROR;
}


int
thread_cond_destroy(pthread_cond_t *cond)
{
int err;

err = pthread_cond_destroy(cond);
if (err == 0) {
return OK;
}

fprintf(stderr, "pthread_cond_destroy() failed, reason: %s\n",strerror(errno));
return ERROR;
}


int
thread_cond_signal(pthread_cond_t *cond)
{
int err;

err = pthread_cond_signal(cond);
if (err == 0) {
return OK;
}

fprintf(stderr, "pthread_cond_signal() failed, reason: %s\n",strerror(errno));
return ERROR;
}


int
thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx)
{
int err;

err = pthread_cond_wait(cond, mtx);


if (err == 0) {
return OK;
}

fprintf(stderr, "pthread_cond_wait() failed, reason: %s\n",strerror(errno));
return ERROR;
}
thread_pool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#ifndef _THREAD_POOL_H_INCLUDED_
#define _THREAD_POOL_H_INCLUDED_


#include "thread.h"

#define DEFAULT_THREADS_NUM 4
#define DEFAULT_QUEUE_NUM 65535


typedef unsigned long atomic_uint_t;
typedef struct thread_task_s thread_task_t;
typedef struct thread_pool_s thread_pool_t;


struct thread_task_s {
thread_task_t *next;
uint_t id;
void *ctx;
void (*handler)(void *data);
};

typedef struct {
thread_task_t *first;
thread_task_t **last;
} thread_pool_queue_t;

#define thread_pool_queue_init(q) \
(q)->first = NULL; \
(q)->last = &(q)->first


struct thread_pool_s {
pthread_mutex_t mtx;
thread_pool_queue_t queue;
int_t waiting;
pthread_cond_t cond;

char *name;
uint_t threads;
int_t max_queue;
};

thread_task_t *thread_task_alloc(size_t size);
int_t thread_task_post(thread_pool_t *tp, thread_task_t *task);
thread_pool_t* thread_pool_init();
void thread_pool_destroy(thread_pool_t *tp);


#endif /* _THREAD_POOL_H_INCLUDED_ */
thread_pool.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#include "thread_pool.h"


static void thread_pool_exit_handler(void *data);
static void *thread_pool_cycle(void *data);
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name);

static uint_t thread_pool_task_id;

static int debug = 0;

thread_pool_t* thread_pool_init()
{
int err;
pthread_t tid;
uint_t n;
pthread_attr_t attr;
thread_pool_t *tp=NULL;

tp = calloc(1,sizeof(thread_pool_t));

if(tp == NULL){
fprintf(stderr, "thread_pool_init: calloc failed!\n");
}

thread_pool_init_default(tp, NULL);

thread_pool_queue_init(&tp->queue);

if (thread_mutex_create(&tp->mtx) != OK) {
free(tp);
return NULL;
}

if (thread_cond_create(&tp->cond) != OK) {
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
return NULL;
}

err = pthread_attr_init(&attr);
if (err) {
fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}

err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (err) {
fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}


for (n = 0; n < tp->threads; n++) {
err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
if (err) {
fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
}

(void) pthread_attr_destroy(&attr);

return tp;
}


void thread_pool_destroy(thread_pool_t *tp)
{
uint_t n;
thread_task_t task;
volatile uint_t lock;

memset(&task,'\0', sizeof(thread_task_t));

task.handler = thread_pool_exit_handler;
task.ctx = (void *) &lock;

for (n = 0; n < tp->threads; n++) {
lock = 1;

if (thread_task_post(tp, &task) != OK) {
return;
}

while (lock) {
sched_yield();
}

//task.event.active = 0;
}

(void) thread_cond_destroy(&tp->cond);
(void) thread_mutex_destroy(&tp->mtx);

free(tp);
}


static void
thread_pool_exit_handler(void *data)
{
uint_t *lock = data;

*lock = 0;

pthread_exit(0);
}


thread_task_t *
thread_task_alloc(size_t size)
{
thread_task_t *task;

task = calloc(1,sizeof(thread_task_t) + size);
if (task == NULL) {
return NULL;
}

task->ctx = task + 1;

return task;
}


int_t
thread_task_post(thread_pool_t *tp, thread_task_t *task)
{
if (thread_mutex_lock(&tp->mtx) != OK) {
return ERROR;
}

if (tp->waiting >= tp->max_queue) {
(void) thread_mutex_unlock(&tp->mtx);

fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
tp->name, tp->waiting);
return ERROR;
}

//task->event.active = 1;

task->id = thread_pool_task_id++;
task->next = NULL;

if (thread_cond_signal(&tp->cond) != OK) {
(void) thread_mutex_unlock(&tp->mtx);
return ERROR;
}

*tp->queue.last = task;
tp->queue.last = &task->next;

tp->waiting++;

(void) thread_mutex_unlock(&tp->mtx);

if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
task->id, tp->name);

return OK;
}


static void *
thread_pool_cycle(void *data)
{
thread_pool_t *tp = data;

int err;
thread_task_t *task;


if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);



for ( ;; ) {
if (thread_mutex_lock(&tp->mtx) != OK) {
return NULL;
}


tp->waiting--;

while (tp->queue.first == NULL) {
if (thread_cond_wait(&tp->cond, &tp->mtx)
!= OK)
{
(void) thread_mutex_unlock(&tp->mtx);
return NULL;
}
}

task = tp->queue.first;
tp->queue.first = task->next;

if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first;
}

if (thread_mutex_unlock(&tp->mtx) != OK) {
return NULL;
}



if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
task->id, tp->name);

task->handler(task->ctx);

if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);

task->next = NULL;

//notify
}
}


static int_t
thread_pool_init_default(thread_pool_t *tpp, char *name)
{
if(tpp)
{
tpp->threads = DEFAULT_THREADS_NUM;
tpp->max_queue = DEFAULT_QUEUE_NUM;


tpp->name = strdup(name?name:"default");
if(debug)fprintf(stderr,
"thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n",
tpp->name, tpp->threads, tpp->max_queue);

return OK;
}

return ERROR;
}
main.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include "thread_pool.h"

struct test{
int arg1;
int arg2;
};

void task_handler1(void* data){
static int index = 0;
printf("Hello, this is 1th test.index=%d\r\n", index++);

}

void task_handler2(void* data){
static int index = 0;
printf("Hello, this is 2th test.index=%d\r\n", index++);

}

void task_handler3(void* data){
static int index = 0;
struct test *t = (struct test *) data;

printf("Hello, this is 3th test.index=%d\r\n", index++);
printf("arg1: %d, arg2: %d\n", t->arg1, t->arg2);

}

int
main(int argc, char **argv)
{
thread_pool_t* tp = NULL;
int i = 0;

tp = thread_pool_init();
//sleep(1);
thread_task_t * test1 = thread_task_alloc(0);
thread_task_t * test2 = thread_task_alloc(0);
thread_task_t * test3 = thread_task_alloc(sizeof(struct test));
test1->handler = task_handler1;
test2->handler = task_handler2;
test3->handler = task_handler3;
((struct test*)test3->ctx)->arg1 = 666;
((struct test*)test3->ctx)->arg2 = 888;
//for(i=0; i<10;i++){
thread_task_post(tp, test1);
thread_task_post(tp, test2);
thread_task_post(tp, test3);
//}
sleep(10);
thread_pool_destroy(tp);
}

进程

什么是进程

我们经常谈论程序,实际上就是一堆指令和数据的集合,这个集合反映在了一个静态可执行文件和相关的配置文件等。

从操作系统上看上面提到的运行程序就是指一个进程,因为存在切换,所以进程管理了很多资源(如打开的文件、挂起的信号、进程状态、内存地址空间等等),也就是说进程参与了CPU的调度,和管理了所有资源,哦,这句话,不是很正确,实际上现代CPU的执行非常非常快,而且操作系统有多个CPU,使用一个进程参与调度时,频繁地从CPU的寄存器和进程堆栈的保存运行状态和对应的信息都很耗时,所以现代CPU将进程仅仅作为一个资源管理的东东,而引入了线程作为CPU调度的基本单位,多个线程可以共享同一进程的所有资源(后面会讲线程)。

注意,程序并不是进程,实际上两个或多个进程不仅有可能执行同一程序,而且还有可能共享地址空间等资源。Linux内核通过一个被称为进程描述符的task_struct结构体来管理进程,这个结构体包含了一个进程所需的所有信息。它定义在include/linux/sched.h文件中。

传统的linux操作系统以统一的方式对待所有的进程:子进程复制父进程所拥有的所有资源,这种方法使得创建进程非常非常非常慢,因为子进程需要拷贝父进程的所有的地址空间,那现代的操作系统,是如何处理的呢?主要有以下三种方式:

  • 写时复制

  • 轻量级进程允许父子进程共享每进程在内核的很多数据结构,比如地址空间、打开文件表和信号处理。

  • vfork系统调用创建的进程能共享其父进程的内存地址空间,为了防止父进程重写子进程需要的数据,阻塞父进程的执行,一直到子进程退出为止。

产生进程

创建进程很简单,直接调用fork函数。

调用fork函数后,会创建一个子进程,并且父子两个进程都从fork处执行,fork函数有两个返回值,对于父进程会返回子进程的pid,此时pid会大于0,对于子进程来说,pid会等于0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <unistd.h>
#include <stdio.h>
int main()
{
pid_t fpid;//fpid表示fork函数返回的值
int count=0;
fpid=fork();
if (fpid < 0)
printf("error in fork!");
else if (fpid == 0) {
printf("i am the child process, my process id is %d\n",getpid());
count +=2;
}
else {
printf("i am the parent process, my process id is %d\n",getpid());
count++;
}

printf("%d 统计结果是: %d\n", getpid(), count);
return 0;
}

销毁进程

exit - 终止正在执行的进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

int main()
{
pid_t fpid;//fpid表示fork函数返回的值
int count=0;
int status = 0;

fpid=fork();
if (fpid < 0)
printf("error in fork!\n");
else if (fpid == 0) {
printf("i am the child process, my process id is %d\n",getpid());
count +=2;
exit(-10);
} else {
printf("i am the parent process, my process id is %d\n",getpid());
count++;
}

printf("%d 统计结果是: %d\n",getpid(), count);
// 父进程捕捉子进程的状态
wait(&status);
printf("parent: status: %d\n", WEXITSTATUS(status));
return 0;
}

进程间通信分类

  • 文件
  • 文件锁
  • 管道(pipe)和有名管理(FIFO)
  • 信号(signal)
  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁
  • 套接字(socket)

孤儿进程

一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程所收养,并由init进程对它们完成状态收集工作。

僵尸进程

一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。

怎么产生产生僵尸进程?
一个进程在调用exit命令结束自己的生命的时候,其实它并没有真正的被销毁,而是留下一个称为僵尸进程(Zombie)的数据结构(系统调用 exit,它的作用是使进程退出,但也仅仅限于将一个正常的进程变成一个僵尸进程,并不能将其完全销毁)。

在Linux进程的状态中,僵尸进程是非常特殊的一种,它已经放弃了几乎所有内存空间,没有任何可执行代码,也不能被调度,仅仅在进程列表中保留一个位置,记载该进程的退出状态等信息供其他进程收集,除此之外,僵尸进程不再占有任何内存空间。它需要它的父进程来为它收尸,如果他的父进程没安装 SIGCHLD信号处理函数调用wait或waitpid()等待子进程结束,又没有显式忽略该信号,那么它就一直保持僵尸状态,如果这时父进程结束了, 那么init进程自动会接手这个子进程,为它收尸,它还是能被清除的。但是如果如果父进程是一个循环,不会结束,那么子进程就会一直保持僵尸状态,这就是 为什么系统中有时会有很多的僵尸进程。

如果查看僵尸进程?
利用命令ps,可以看到有标记为<defunct>的进程就是僵尸进程。

怎样来清除僵尸进程?
改写父进程,在子进程死后要为它收尸。具体做法是接管SIGCHLD信号。子进程死后,会发送SIGCHLD信号给父进程,父进程收到此信号后,执行waitpid()函数为子进程收尸。这是基于这样的原理:就算父进程没有调用 wait,内核也会向它发送SIGCHLD消息,尽管对默认处理是忽略,如果想响应这个消息,可以设置一个处理函数。

把父进程杀掉。父进程死后,僵尸进程成为”孤儿进程”,过继给1号进程init,init始终会负责清理僵尸进程。它产生的所有僵尸进程也跟着消失。

守护进程

不与任何终端关联的进程,通常情况下守护进程在系统启动时就在运行,它们以root用户或者其他特殊用户(apache和postfix)运行,并能处理一些系统级的任务。守护进程脱离于终端,是为了避免进程在执行过程中的信息在任何终端上显示,并且进程也不会被任何终端所产生的终端信息所打断(比如关闭终端等)。

  • 步骤如下:
    1. 调用fork(),创建新进程,它会是将来的守护进程.
    2. 在父进程中调用exit,保证子进程不是进程组长
    3. 调用setsid()创建新的会话区
    4. 将当前目录改成根目录(如果把当前目录作为守护进程的目录,当前目录不能被卸载他作为守护进程的工作目录)
    5. 将标准输入,标准输出,标准错误重定向到/dev/null.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>


int daemon(int nochdir, int noclose)
{
int fd;

switch (fork()) {
case -1:
return (-1);
case 0:
break;
default:
_exit(0);
}

if (setsid() == -1)
return (-1);

if (!nochdir)
(void)chdir("/");

if (!noclose && (fd = open("/dev/null", O_RDWR, 0)) != -1) {
(void)dup2(fd, STDIN_FILENO);
(void)dup2(fd, STDOUT_FILENO);
(void)dup2(fd, STDERR_FILENO);
if (fd > 2)
(void)close (fd);
}
return (0);
}

int main() {
daemon(1, 0);

// FILE *out = freopen("log.txt", "a+", stdout);
FILE *out = fopen("log.txt", "a+");

int i;
for(i = 0; i < 100; i++) {
fprintf(out, "log %d\n", i);
fflush(out);
sleep(1);
}

return 0;
}

例子:多进程高并发

  • fork多个子进程作为work进程
  • 对子进程进行cpu亲缘绑定

可以使用命令 ps -eLo ruser,pid,lwp,psr,args 进行子进程cpu情况查看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/sysinfo.h>

#include <errno.h>
#include <string.h>

typedef void (*spawn_proc_pt) (void *data);
static void worker_process_cycle(void *data);
static void start_worker_processes(int n);
pid_t spawn_process(spawn_proc_pt proc, void *data, const char *name);

int main(int argc,char **argv){
start_worker_processes(8);
// 管理子进程
wait(NULL);
}

void start_worker_processes(int n){
int i=0;
for(i = n - 1; i >= 0; i--){
const char *pname = "worker process";
spawn_process(worker_process_cycle,(void *)(intptr_t) i, pname);
}
}

pid_t spawn_process(spawn_proc_pt proc, void *data, const char *name){

pid_t pid;
pid = fork();

switch(pid){
case -1:
fprintf(stderr,"fork() failed while spawning \"%s\"\n",name);
return -1;
case 0:
proc(data);
return 0;
default:
break;
}
printf("start %s %ld\n",name,(long int)pid);
return pid;
}


static void worker_process_init(int worker){
cpu_set_t cpu_affinity;

// 多核高并发处理 4core 0 - 0 core 1 - 1 2 -2 3 -3
// int cpu_num = CPU_SETSIZE;
int cpu_num = get_nprocs();
int cpu_no = worker % cpu_num;
CPU_ZERO(&cpu_affinity);
CPU_SET(cpu_no, &cpu_affinity);// 0 1 2 3

fprintf(stdout,"sched_setaffinity cpu_num(%d) cpu_no(%d) mask(%d))\n", cpu_num, cpu_no, &cpu_affinity);

if(sched_setaffinity(0,sizeof(cpu_set_t),&cpu_affinity) == -1){
fprintf(stderr,"sched_setaffinity() failed, err(%s))\n", strerror(errno));
}
}

void worker_process_cycle(void *data){
int worker = (intptr_t) data;

// 初始化
worker_process_init(worker);

// 模拟工作
for(;;){
sleep(10);
printf("pid %ld ,doing ...\n",(long int)getpid());
}
}