概念 进程与线程
进程时资源竞争的基本单位
线程是程序执行的最小单位
线程共享进程数据,但也拥有自己的一部分数据
线程ID
一组寄存器
栈
errno
信号状态
优先级
fork和创建新线程的区别
当一个进程执行一个fork调用的时候,会创建出进程的一个新的拷贝,新进程将拥有它自己的变量和它自己的PID。这个新进程的运行时间独立的,它在执行时几乎完全独立于创建它的进程
在进程里面创建一个新进程的时候,新的执行线程会拥有自己的堆栈(因此也就有自己的局部变量),但要与它的创建者共享全局变量、文件描述符、信号处理器和当前工作目录状态。
线程 什么是线程
在一个程序里的一个执行线路就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
一切进程至少都有一个执行线程
线程的优点
创建一个新的线程的代价要比创建一个新的进程小得多
与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少得多
线程占用的资源要比进程少得多
能充分利用多处理器的可并行数量
在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
线程的缺点
性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调用开销,而可用的资源不变。
健壮性降低
编写多线程需要更全面深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
缺乏访问控制
进程时访问控制的基本粒度,在一个线程中调用某些OS函数会对正规进程造成影响
编程难度提高
线程调度竞争范围
操作系统提供了各种模型,用来调度应用程序创建的线程。这些模型之间的主要不同是:在竞争系统资源(特别是CPU时间)时,线程调度竞争范围(thread-scheduling contention scope)不一样
进程竞争范围(process contention scope):各个线程在同一个进程竞争“被调度的CPU时间”(但不直接和其他进程中的线程竞争)
系统竞争范围(system contention scope):线程之间和“系统范围”内的其他线程竞争。
线程模型
N:1用户线程模型
切换开销比较小
内核不干涉线程的任何生命活动,也不干涉同意进程中的线程环境切换。
一个进程中的多个线程只能调度到一个CPU,这种约束限制了可用的并行总量。
如果某个线程执行一个“阻塞式”操作(如read),那么,进程中的所有线程都会阻塞,直至那个操作结束。为此,一些线程的实现是为这些阻塞式函数提供包装器,用非阻塞版本替换这些系统调用,以消除这种限制。
1:1核心线程模型
应用程序创建的每一个线程都由一个核心线程直接管理
OS内核将每一个核心线程都调到系统CPU上,因此,所有线程都工作在“系统竞争范围”
这种线程的创建与调度由内核完成,因此这种线程的系统开销比较大(但一般来说,比进程开销小)
N:M混合线程模型
提高两级控制,将用户线程映射为系统的可调度体以实现并行,这个可调度体称为轻量级进程
线程重入 在单线程程序中,整个程序都是顺序执行的,一个函数在同一时刻只能被一个函数调用,但在多线程中,由于并发性,一个函数可能同时被多个函数调用,此时这个函数就成了临界资源,很容易造成调用函数处理结果的相互影响,如果一个函数在多线程并发的环境中每次被调用产生的结果是不确定的,我们就说这个函数是”不可重入的”/“线程不安全”的。
编译时,定义宏_REENTRANT 即: gcc -D_REENTRANT (#define REENTRANT)
相关函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 int pthread_create (pthread_t * thread, const pthread_attr_t * attr, void *(*start_routine)(void *), void * arg) ;int pthread_join (pthread_t thread, void **retval) ;void pthread_exit (void * value_ptr) ;int pthread_detach (pthread_t thread) ;pthread_t pthread_self (void ) ;int pthread_cancel (pthread_t thread) int pthread_attr_init (pthread_attr_t * attr) ;int pthread_attr_destroy (pthread_attr_t * attr) ;int pthread_attr_getdetachstate (const pthread_attr_t * attr, int * detachstate) ;int pthread_attr_setdetachstate (pthread_attr_t * attr, int detachstate) ;int pthread_attr_setstacksize (pthread_attr_t * attr, size_t stacksize) int pthread_attr_getstacksize (pthread_attr_t * attr, size_t * stacksize) ;int pthread_attr_setguardsize (pthread_attr_t * attr, size_t guardsize) int pthread_attr_getguardsize (pthread_attr_t * attr, size_t * guardsize) int pthread_attr_getscope (const pthread_attr_t * attr, int * contentionscope) int pthread_attr_setscope (pthread_attr_t * attr, int contentionscope) int pthread_attr_getschedpolicy (const pthread_attr_t * attr, int * policy) int pthread_attr_setschedpolicy (pthread_attr_t * attr, int policy) int pthread_attr_getinheritsched (const pthread_attr_t * attr, int * inheritsched) ;int pthread_attr_setinheritsched (pthread_attr_t * attr, int inheritsched) int pthread_attr_getschedparam (const pthread_attr_t * attr, struct sched_param* param) int pthread_attr_setschedparam (pthread_attr_t * attr, const struct sched_param* param) ;int pthread_setconcurrency (int new_level) int pthread_getconcurrency (void ) int pthread_key_create (pthread_key_t * key, void (*destructor)(void *)) int pthread_key_delete (pthread_key_t key) void * pthread_getspecific (pthread_key_t key) int pthread_setspecific (pthread_key_t key, const void * value) int pthread_once (pthread_once_t * once_conntrol, void (*init_routine)(void )) pthread_once_t once_control = PTHREAD_ONCE_INIT
例子:简单线程使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> int my_global;void * my_thread_handle (void *arg) { int val; val = *((int *)arg); printf ("new thread begin, arg=%d\n" , val); my_global += val; sleep(3 ); pthread_exit(&my_global); printf ("new thread end\n" ); } int main (void ) { pthread_t mythread; int arg; int ret; void *thread_return; arg = 100 ; my_global = 1000 ; printf ("my_global=%d\n" , my_global); printf ("ready create thread...\n" ); ret = pthread_create(&mythread, 0 , my_thread_handle, &arg); if (ret != 0 ) { printf ("create thread failed!\n" ); exit (1 ); } printf ("wait thread finished...\n" ); ret = pthread_join(mythread, &thread_return); if (ret != 0 ) { printf ("pthread_join failed!\n" ); exit (1 ); } printf ("wait thread end, return value is %d\n" , *((int *)thread_return)); printf ("my_global=%d\n" , my_global); printf ("create thread finished!\n" ); }
线程同步
互斥量 什么是互斥量? 效果上等同于初值为1的信号量
posix互斥锁 1 2 3 4 5 6 7 8 9 10 11 12 13 int pthread_mutex_init (pthread_mutex_t *mutex, pthread_mutexattr_t *attr) ; int pthread_mutex_lock (pthread_mutex_t *mutex) ; int pthread_mutex_unlock (pthread_mutex_t *mutex) ; int pthread_mutex_destroy (pthread_mutex_t *mutex) ;
main3.cpp 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <semaphore.h> void *thread_function (void *arg) ;pthread_mutex_t work_mutex; #define WORK_SIZE 1024 char work_area[WORK_SIZE];int time_to_exit = 0 ;int main () { int res; pthread_t a_thread; void *thread_result; res = pthread_mutex_init(&work_mutex, NULL ); if (res != 0 ) { perror("Mutex initialization failed" ); exit (EXIT_FAILURE); } res = pthread_create(&a_thread, NULL , thread_function, NULL ); if (res != 0 ) { perror("Thread creation failed" ); exit (EXIT_FAILURE); } pthread_mutex_lock(&work_mutex); printf ("Input some text. Enter 'end' to finish\n" ); while (!time_to_exit) { fgets(work_area, WORK_SIZE, stdin ); pthread_mutex_unlock(&work_mutex); while (1 ) { pthread_mutex_lock(&work_mutex); if (work_area[0 ] != '\0' ) { pthread_mutex_unlock(&work_mutex); sleep(1 ); } else { break ; } } } pthread_mutex_unlock(&work_mutex); printf ("\nWaiting for thread to finish...\n" ); res = pthread_join(a_thread, &thread_result); if (res != 0 ) { perror("Thread join failed" ); exit (EXIT_FAILURE); } printf ("Thread joined\n" ); pthread_mutex_destroy(&work_mutex); exit (EXIT_SUCCESS); } void *thread_function (void *arg) { sleep(1 ); pthread_mutex_lock(&work_mutex); while (strncmp ("end" , work_area, 3 ) != 0 ) { printf ("You input %d characters\n" , strlen (work_area) -1 ); work_area[0 ] = '\0' ; pthread_mutex_unlock(&work_mutex); sleep(1 ); pthread_mutex_lock(&work_mutex); while (work_area[0 ] == '\0' ) { pthread_mutex_unlock(&work_mutex); sleep(1 ); pthread_mutex_lock(&work_mutex); } } time_to_exit = 1 ; work_area[0 ] = '\0' ; pthread_mutex_unlock(&work_mutex); pthread_exit(0 ); }
编译指令 g++ main3.cpp -o main3 -D_REENTRANT -lpthread
posix自旋锁 1 2 3 4 pthread_spin_init pthread_spin_destroy pthread_spin_lock pthread_spin_unlock
posix读写锁 1 2 3 4 5 pthread_rwlock_init pthread_rwlock_destroy int pthread_rwlock_rdlockint pthread_rwlock_wrlockint pthread_rwlock_unlock
条件变量 与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。
条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(给出条件成立信号)。
条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 int pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr) ;int pthread_cond_signal (pthread_cond_t *cond) ; int pthread_cond_broadcast (pthread_cond_t *cond) ;int pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime) ; int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex) ;int pthread_cond_destroy (pthread_cond_t *cond) ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 #include <stdio.h> #include <pthread.h> #include <unistd.h> pthread_mutex_t mutex;pthread_cond_t cond;void *thread1 (void *arg) { while (1 ) { printf ("thread1 is running\n" ); pthread_mutex_lock(&mutex); pthread_cond_wait(&cond, &mutex); printf ("thread1 applied the condition\n" ); pthread_mutex_unlock(&mutex); sleep(4 ); } } void *thread2 (void *arg) { while (1 ) { printf ("thread2 is running\n" ); pthread_mutex_lock(&mutex); pthread_cond_wait(&cond, &mutex); printf ("thread2 applied the condition\n" ); pthread_mutex_unlock(&mutex); sleep(2 ); } } int main () { pthread_t thid1, thid2; printf ("condition variable study!\n" ); pthread_mutex_init(&mutex, NULL ); pthread_cond_init(&cond, NULL ); pthread_create(&thid1, NULL , thread1, NULL ); pthread_create(&thid2, NULL , thread2, NULL ); do { pthread_cond_signal(&cond); sleep(1 ); } while (1 ); return 0 ; }
编译命令 g++ main4.cpp -o main4 -D_REENTRANT -lpthread
线程池 由一个任务队列和一组处理队列的线程组成。一旦工作进程需要处理某个可能“阻塞”的操作,不用自己操作,将其作为一个任务放到线程池的队列,接着会被某个空闲线程提取处理。
线程池图示
为什么需要线程池?
创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多
活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或“切换过度”而导致系统资源不足
线程池实现核心组建
任务 - 待处理的工作,通常由标识、上下文和处理函数组成。
任务队列 - 按顺序保存待处理的任务序列,等待线程中的线程组处理。
线程池 - 由多个已启动的一组线程组成。
条件变量 - 一种同步机制,允许线程挂起,直到共享数据上的某些条件得到满足。
互斥锁 - 保证在任一时刻,只能有一个线程访问该对象。
nginx线程池 thread.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #ifndef _THREAD_H_INCLUDED_ #define _THREAD_H_INCLUDED_ #include <stdio.h> #include <stdint.h> #include <stdlib.h> #include <sys/types.h> #include <pthread.h> #include <errno.h> #include <string.h> typedef intptr_t int_t ;typedef uintptr_t uint_t ;#define OK 0 #define ERROR -1 int thread_mutex_create (pthread_mutex_t *mtx) ;int thread_mutex_destroy (pthread_mutex_t *mtx) ;int thread_mutex_lock (pthread_mutex_t *mtx) ;int thread_mutex_unlock (pthread_mutex_t *mtx) ;int thread_cond_create (pthread_cond_t *cond) ;int thread_cond_destroy (pthread_cond_t *cond) ;int thread_cond_signal (pthread_cond_t *cond) ;int thread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mtx) ;#endif
thread_mutex.c 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #include "thread.h" int thread_mutex_create(pthread_mutex_t *mtx) { int err; pthread_mutexattr_t attr; err = pthread_mutexattr_init(&attr); if (err != 0 ) { fprintf (stderr , "pthread_mutexattr_init() failed, reason: %s\n" ,strerror(errno)); return ERROR; } err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); if (err != 0 ) { fprintf (stderr , "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n" ,strerror(errno)); return ERROR; } err = pthread_mutex_init(mtx, &attr); if (err != 0 ) { fprintf (stderr ,"pthread_mutex_init() failed, reason: %s\n" ,strerror(errno)); return ERROR; } err = pthread_mutexattr_destroy(&attr); if (err != 0 ) { fprintf (stderr ,"pthread_mutexattr_destroy() failed, reason: %s\n" ,strerror(errno)); } return OK; } int thread_mutex_destroy(pthread_mutex_t *mtx) { int err; err = pthread_mutex_destroy(mtx); if (err != 0 ) { fprintf (stderr ,"pthread_mutex_destroy() failed, reason: %s\n" ,strerror(errno)); return ERROR; } return OK; } int thread_mutex_lock(pthread_mutex_t *mtx) { int err; err = pthread_mutex_lock(mtx); if (err == 0 ) { return OK; } fprintf (stderr ,"pthread_mutex_lock() failed, reason: %s\n" ,strerror(errno)); return ERROR; } int thread_mutex_unlock(pthread_mutex_t *mtx) { int err; err = pthread_mutex_unlock(mtx); #if 0 ngx_time_update(); #endif if (err == 0 ) { return OK; } fprintf (stderr ,"pthread_mutex_unlock() failed, reason: %s\n" ,strerror(errno)); return ERROR; }
thread_cond.c 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 #include "thread.h" int thread_cond_create(pthread_cond_t *cond) { int err; err = pthread_cond_init(cond, NULL ); if (err == 0 ) { return OK; } fprintf (stderr , "pthread_cond_init() failed, reason: %s\n" ,strerror(errno)); return ERROR; } int thread_cond_destroy(pthread_cond_t *cond) { int err; err = pthread_cond_destroy(cond); if (err == 0 ) { return OK; } fprintf (stderr , "pthread_cond_destroy() failed, reason: %s\n" ,strerror(errno)); return ERROR; } int thread_cond_signal(pthread_cond_t *cond) { int err; err = pthread_cond_signal(cond); if (err == 0 ) { return OK; } fprintf (stderr , "pthread_cond_signal() failed, reason: %s\n" ,strerror(errno)); return ERROR; } int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx) { int err; err = pthread_cond_wait(cond, mtx); if (err == 0 ) { return OK; } fprintf (stderr , "pthread_cond_wait() failed, reason: %s\n" ,strerror(errno)); return ERROR; }
thread_pool.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 #ifndef _THREAD_POOL_H_INCLUDED_ #define _THREAD_POOL_H_INCLUDED_ #include "thread.h" #define DEFAULT_THREADS_NUM 4 #define DEFAULT_QUEUE_NUM 65535 typedef unsigned long atomic_uint_t ;typedef struct thread_task_s thread_task_t ;typedef struct thread_pool_s thread_pool_t ;struct thread_task_s { thread_task_t *next; uint_t id; void *ctx; void (*handler)(void *data); }; typedef struct { thread_task_t *first; thread_task_t **last; } thread_pool_queue_t ; #define thread_pool_queue_init(q) \ (q)->first = NULL ; \ (q)->last = &(q)->first struct thread_pool_s { pthread_mutex_t mtx; thread_pool_queue_t queue ; int_t waiting; pthread_cond_t cond; char *name; uint_t threads; int_t max_queue; }; thread_task_t *thread_task_alloc (size_t size ) ;int_t thread_task_post (thread_pool_t *tp, thread_task_t *task) ;thread_pool_t * thread_pool_init () ;void thread_pool_destroy (thread_pool_t *tp) ;#endif
thread_pool.c 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 #include "thread_pool.h" static void thread_pool_exit_handler (void *data) ;static void *thread_pool_cycle (void *data) ;static int_t thread_pool_init_default (thread_pool_t *tpp, char *name) ;static uint_t thread_pool_task_id;static int debug = 0 ;thread_pool_t * thread_pool_init () { int err; pthread_t tid; uint_t n; pthread_attr_t attr; thread_pool_t *tp=NULL ; tp = calloc (1 ,sizeof (thread_pool_t )); if (tp == NULL ){ fprintf (stderr , "thread_pool_init: calloc failed!\n" ); } thread_pool_init_default(tp, NULL ); thread_pool_queue_init(&tp->queue ); if (thread_mutex_create(&tp->mtx) != OK) { free (tp); return NULL ; } if (thread_cond_create(&tp->cond) != OK) { (void ) thread_mutex_destroy(&tp->mtx); free (tp); return NULL ; } err = pthread_attr_init(&attr); if (err) { fprintf (stderr , "pthread_attr_init() failed, reason: %s\n" ,strerror(errno)); free (tp); return NULL ; } err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (err) { fprintf (stderr , "pthread_attr_setdetachstate() failed, reason: %s\n" ,strerror(errno)); free (tp); return NULL ; } for (n = 0 ; n < tp->threads; n++) { err = pthread_create(&tid, &attr, thread_pool_cycle, tp); if (err) { fprintf (stderr , "pthread_create() failed, reason: %s\n" ,strerror(errno)); free (tp); return NULL ; } } (void ) pthread_attr_destroy(&attr); return tp; } void thread_pool_destroy (thread_pool_t *tp) { uint_t n; thread_task_t task; volatile uint_t lock; memset (&task,'\0' , sizeof (thread_task_t )); task.handler = thread_pool_exit_handler; task.ctx = (void *) &lock; for (n = 0 ; n < tp->threads; n++) { lock = 1 ; if (thread_task_post(tp, &task) != OK) { return ; } while (lock) { sched_yield(); } } (void ) thread_cond_destroy(&tp->cond); (void ) thread_mutex_destroy(&tp->mtx); free (tp); } static void thread_pool_exit_handler(void *data) { uint_t *lock = data; *lock = 0 ; pthread_exit(0 ); } thread_task_t *thread_task_alloc(size_t size ) { thread_task_t *task; task = calloc (1 ,sizeof (thread_task_t ) + size ); if (task == NULL ) { return NULL ; } task->ctx = task + 1 ; return task; } int_t thread_task_post(thread_pool_t *tp, thread_task_t *task) { if (thread_mutex_lock(&tp->mtx) != OK) { return ERROR; } if (tp->waiting >= tp->max_queue) { (void ) thread_mutex_unlock(&tp->mtx); fprintf (stderr ,"thread pool \"%s\" queue overflow: %ld tasks waiting\n" , tp->name, tp->waiting); return ERROR; } task->id = thread_pool_task_id++; task->next = NULL ; if (thread_cond_signal(&tp->cond) != OK) { (void ) thread_mutex_unlock(&tp->mtx); return ERROR; } *tp->queue .last = task; tp->queue .last = &task->next; tp->waiting++; (void ) thread_mutex_unlock(&tp->mtx); if (debug)fprintf (stderr ,"task #%lu added to thread pool \"%s\"\n" , task->id, tp->name); return OK; } static void *thread_pool_cycle(void *data) { thread_pool_t *tp = data; int err; thread_task_t *task; if (debug)fprintf (stderr ,"thread in pool \"%s\" started\n" , tp->name); for ( ;; ) { if (thread_mutex_lock(&tp->mtx) != OK) { return NULL ; } tp->waiting--; while (tp->queue .first == NULL ) { if (thread_cond_wait(&tp->cond, &tp->mtx) != OK) { (void ) thread_mutex_unlock(&tp->mtx); return NULL ; } } task = tp->queue .first; tp->queue .first = task->next; if (tp->queue .first == NULL ) { tp->queue .last = &tp->queue .first; } if (thread_mutex_unlock(&tp->mtx) != OK) { return NULL ; } if (debug) fprintf (stderr ,"run task #%lu in thread pool \"%s\"\n" , task->id, tp->name); task->handler(task->ctx); if (debug) fprintf (stderr ,"complete task #%lu in thread pool \"%s\"\n" ,task->id, tp->name); task->next = NULL ; } } static int_t thread_pool_init_default(thread_pool_t *tpp, char *name) { if (tpp) { tpp->threads = DEFAULT_THREADS_NUM; tpp->max_queue = DEFAULT_QUEUE_NUM; tpp->name = strdup(name?name:"default" ); if (debug)fprintf (stderr , "thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n" , tpp->name, tpp->threads, tpp->max_queue); return OK; } return ERROR; }
main.c 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 #include "thread_pool.h" struct test { int arg1; int arg2; }; void task_handler1 (void * data) { static int index = 0 ; printf ("Hello, this is 1th test.index=%d\r\n" , index++); } void task_handler2 (void * data) { static int index = 0 ; printf ("Hello, this is 2th test.index=%d\r\n" , index++); } void task_handler3 (void * data) { static int index = 0 ; struct test *t = (struct test *) data ; printf ("Hello, this is 3th test.index=%d\r\n" , index++); printf ("arg1: %d, arg2: %d\n" , t->arg1, t->arg2); } int main(int argc, char **argv) { thread_pool_t * tp = NULL ; int i = 0 ; tp = thread_pool_init(); thread_task_t * test1 = thread_task_alloc(0 ); thread_task_t * test2 = thread_task_alloc(0 ); thread_task_t * test3 = thread_task_alloc(sizeof (struct test)); test1->handler = task_handler1; test2->handler = task_handler2; test3->handler = task_handler3; ((struct test*)test3->ctx)->arg1 = 666 ; ((struct test*)test3->ctx)->arg2 = 888 ; thread_task_post(tp, test1); thread_task_post(tp, test2); thread_task_post(tp, test3); sleep(10 ); thread_pool_destroy(tp); }
进程 什么是进程 我们经常谈论程序,实际上就是一堆指令和数据的集合,这个集合反映在了一个静态可执行文件和相关的配置文件等。
从操作系统上看上面提到的运行程序就是指一个进程,因为存在切换,所以进程管理了很多资源(如打开的文件、挂起的信号、进程状态、内存地址空间等等),也就是说进程参与了CPU的调度,和管理了所有资源,哦,这句话,不是很正确,实际上现代CPU的执行非常非常快,而且操作系统有多个CPU,使用一个进程参与调度时,频繁地从CPU的寄存器和进程堆栈的保存运行状态和对应的信息都很耗时,所以现代CPU将进程仅仅作为一个资源管理的东东,而引入了线程作为CPU调度的基本单位,多个线程可以共享同一进程的所有资源(后面会讲线程)。
注意,程序并不是进程,实际上两个或多个进程不仅有可能执行同一程序,而且还有可能共享地址空间等资源。Linux内核通过一个被称为进程描述符的task_struct结构体来管理进程,这个结构体包含了一个进程所需的所有信息。它定义在include/linux/sched.h文件中。
传统的linux操作系统以统一的方式对待所有的进程:子进程复制父进程所拥有的所有资源,这种方法使得创建进程非常非常非常慢,因为子进程需要拷贝父进程的所有的地址空间,那现代的操作系统,是如何处理的呢?主要有以下三种方式:
产生进程 创建进程很简单,直接调用fork函数。
调用fork函数后,会创建一个子进程,并且父子两个进程都从fork处执行,fork函数有两个返回值,对于父进程会返回子进程的pid,此时pid会大于0,对于子进程来说,pid会等于0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <unistd.h> #include <stdio.h> int main () { pid_t fpid; int count=0 ; fpid=fork(); if (fpid < 0 ) printf ("error in fork!" ); else if (fpid == 0 ) { printf ("i am the child process, my process id is %d\n" ,getpid()); count +=2 ; } else { printf ("i am the parent process, my process id is %d\n" ,getpid()); count++; } printf ("%d 统计结果是: %d\n" , getpid(), count); return 0 ; }
销毁进程 exit - 终止正在执行的进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> int main () { pid_t fpid; int count=0 ; int status = 0 ; fpid=fork(); if (fpid < 0 ) printf ("error in fork!\n" ); else if (fpid == 0 ) { printf ("i am the child process, my process id is %d\n" ,getpid()); count +=2 ; exit (-10 ); } else { printf ("i am the parent process, my process id is %d\n" ,getpid()); count++; } printf ("%d 统计结果是: %d\n" ,getpid(), count); wait(&status); printf ("parent: status: %d\n" , WEXITSTATUS(status)); return 0 ; }
进程间通信分类
文件
文件锁
管道(pipe)和有名管理(FIFO)
信号(signal)
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
套接字(socket)
孤儿进程 一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程所收养,并由init进程对它们完成状态收集工作。
僵尸进程 一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。
怎么产生产生僵尸进程? 一个进程在调用exit命令结束自己的生命的时候,其实它并没有真正的被销毁,而是留下一个称为僵尸进程(Zombie)的数据结构(系统调用 exit,它的作用是使进程退出,但也仅仅限于将一个正常的进程变成一个僵尸进程,并不能将其完全销毁)。
在Linux进程的状态中,僵尸进程是非常特殊的一种,它已经放弃了几乎所有内存空间,没有任何可执行代码,也不能被调度,仅仅在进程列表中保留一个位置,记载该进程的退出状态等信息供其他进程收集,除此之外,僵尸进程不再占有任何内存空间。它需要它的父进程来为它收尸,如果他的父进程没安装 SIGCHLD信号处理函数调用wait或waitpid()等待子进程结束,又没有显式忽略该信号,那么它就一直保持僵尸状态,如果这时父进程结束了, 那么init进程自动会接手这个子进程,为它收尸,它还是能被清除的。但是如果如果父进程是一个循环,不会结束,那么子进程就会一直保持僵尸状态,这就是 为什么系统中有时会有很多的僵尸进程。
如果查看僵尸进程? 利用命令ps,可以看到有标记为<defunct>
的进程就是僵尸进程。
怎样来清除僵尸进程? 改写父进程,在子进程死后要为它收尸。具体做法是接管SIGCHLD信号。子进程死后,会发送SIGCHLD信号给父进程,父进程收到此信号后,执行waitpid()函数为子进程收尸。这是基于这样的原理:就算父进程没有调用 wait,内核也会向它发送SIGCHLD消息,尽管对默认处理是忽略,如果想响应这个消息,可以设置一个处理函数。
把父进程杀掉。父进程死后,僵尸进程成为”孤儿进程”,过继给1号进程init,init始终会负责清理僵尸进程。它产生的所有僵尸进程也跟着消失。
守护进程 不与任何终端关联的进程,通常情况下守护进程在系统启动时就在运行,它们以root用户或者其他特殊用户(apache和postfix)运行,并能处理一些系统级的任务。守护进程脱离于终端,是为了避免进程在执行过程中的信息在任何终端上显示,并且进程也不会被任何终端所产生的终端信息所打断(比如关闭终端等)。
步骤如下:
调用fork(),创建新进程,它会是将来的守护进程.
在父进程中调用exit,保证子进程不是进程组长
调用setsid()创建新的会话区
将当前目录改成根目录(如果把当前目录作为守护进程的目录,当前目录不能被卸载他作为守护进程的工作目录)
将标准输入,标准输出,标准错误重定向到/dev/null.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <fcntl.h> #include <unistd.h> #include <stdio.h> int daemon (int nochdir, int noclose) { int fd; switch (fork()) { case -1 : return (-1 ); case 0 : break ; default : _exit(0 ); } if (setsid() == -1 ) return (-1 ); if (!nochdir) (void )chdir("/" ); if (!noclose && (fd = open("/dev/null" , O_RDWR, 0 )) != -1 ) { (void )dup2(fd, STDIN_FILENO); (void )dup2(fd, STDOUT_FILENO); (void )dup2(fd, STDERR_FILENO); if (fd > 2 ) (void )close (fd); } return (0 ); } int main () { daemon(1 , 0 ); FILE *out = fopen("log.txt" , "a+" ); int i; for (i = 0 ; i < 100 ; i++) { fprintf (out, "log %d\n" , i); fflush(out); sleep(1 ); } return 0 ; }
例子:多进程高并发
fork多个子进程作为work进程
对子进程进行cpu亲缘绑定
可以使用命令 ps -eLo ruser,pid,lwp,psr,args 进行子进程cpu情况查看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #define _GNU_SOURCE #include <sched.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <stdint.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/sysinfo.h> #include <errno.h> #include <string.h> typedef void (*spawn_proc_pt) (void *data) ;static void worker_process_cycle (void *data) ;static void start_worker_processes (int n) ;pid_t spawn_process (spawn_proc_pt proc, void *data, const char *name) ; int main (int argc,char **argv) { start_worker_processes(8 ); wait(NULL ); } void start_worker_processes (int n) { int i=0 ; for (i = n - 1 ; i >= 0 ; i--){ const char *pname = "worker process" ; spawn_process(worker_process_cycle,(void *)(intptr_t ) i, pname); } } pid_t spawn_process (spawn_proc_pt proc, void *data, const char *name) { pid_t pid; pid = fork(); switch (pid){ case -1 : fprintf (stderr ,"fork() failed while spawning \"%s\"\n" ,name); return -1 ; case 0 : proc(data); return 0 ; default : break ; } printf ("start %s %ld\n" ,name,(long int )pid); return pid; } static void worker_process_init (int worker) { cpu_set_t cpu_affinity; int cpu_num = get_nprocs(); int cpu_no = worker % cpu_num; CPU_ZERO(&cpu_affinity); CPU_SET(cpu_no, &cpu_affinity); fprintf (stdout ,"sched_setaffinity cpu_num(%d) cpu_no(%d) mask(%d))\n" , cpu_num, cpu_no, &cpu_affinity); if (sched_setaffinity(0 ,sizeof (cpu_set_t ),&cpu_affinity) == -1 ){ fprintf (stderr ,"sched_setaffinity() failed, err(%s))\n" , strerror(errno)); } } void worker_process_cycle (void *data) { int worker = (intptr_t ) data; worker_process_init(worker); for (;;){ sleep(10 ); printf ("pid %ld ,doing ...\n" ,(long int )getpid()); } }