原文地址:https://www.douyacun.com/article/0cd356011af37460645425702b5e35f8
疑问:
先说一下线程出现的背景,也就是有了进程为什么还需要多线程的原因,首先在许多应用同时发生着多种活动(并行),某种活动会随着时间推移会被阻塞,将这些应用程序分解成可以准并行运行的多个顺序线程,程序设计模型会变得更简单。其次由于线程比进程更轻量级,它们比进程更容易创建和销毁。第三个就是性能的比较,如果是CPU密集型,多线程并不能获得性能上的提升,但如果是I/O密集型或者计算密集型的,多个线程会允许彼此重叠运行,加快应用程序执行的速度
POSIX(pthread)线程库,C/C++标准线程API,线程比fork新的进程开销更少,是因为系统不会为进程初始化新的虚拟内存空间和环境,进程中所有线程共享相同的地址空间,通过定义一个函数及其将在线程中处理的参数来生成一个线程。在软件中使用POSIX线程库的目的是更快地执行软件。
基础知识:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void *print_message_func(void *ptr);
int main()
{
pthread_t thread1, thread2;//线程id
int iret1, iret2;
char *message1 = "Thread 1";
char *message2 = "Thread 2";
iret1 = pthread_create(&thread1, NULL, print_message_func, (void *) message1);
iret2 = pthread_create(&thread2, NULL, print_message_func, (void *) message2);
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
printf("Thread 1 return %d\n", iret1);
printf("Thread 2 return %d\n", iret2);
exit(0);
}
void *print_message_func(void *ptr)
{
char *message;
message = (char *)ptr;
printf("%s\n", message);
}
// 输出
// Thread 2
// Thread 1
// Thread 1 return 0
// Thread 2 return 0
// gdb 调试打印一下thread1、thread2
// (gdb) p thread2
// $5 = 140737337296640
// (gdb) p thread1
// $6 = 140737345689344
// (gdb)
int pthread_create(pthread_t * thread,
const pthread_attr_t * attr,
void * (*start_routine)(void *),
void *arg);
/usr/include/bits/pthreadtypes.h
中定义typedef unsigned long int pthread_t;
属性 | 值 | 结果 |
---|---|---|
__scope | PTHREAD_SCOPE_SYSTEM : 与系统中所有线程一起竞争CPU时间PTHREAD_SCOPE_PROCESS: 仅与同进程中的线程竞争CPU |
竞争CPU的范围,缺省值PTHREAD_SCOPE_SYSTEM |
__detachstate | PTHREAD_CREATE_DETACHED : 新线程与其他线程脱离同步,不能使用pthread_join来同步PTHREAD_CREATE_JOINABLE : 新线程与其他线程同步,新线程创建后可以使用pthread_detached来脱离同步 |
新线程是否与进程中其他线程脱离同步,缺省值为PTHREAD_CREATE_JOINABLE |
__stackaddr | NULL |
新线程具有系统分配的栈地址。 |
__stacksize | 新线程具有系统定义的栈大小。默认2M | |
__schedpolicy | SCHED_OTHER :正常、非实时 SCHED_RR : 实时、轮转法SCHED_FIFO : 实时、先入先出 |
线程的调度策略,缺省值为SCHED_OTHER |
__inheritsched | PTHREAD_EXPLICIT_SCHED : 显示式指定调度策略和调度参数PTHREAD_INHERIT_SCHED : 继承调用者线程的值 |
新线程是否继承父线程调度优先级。 |
void * (*start_routine)
指向线程运行的函数,函数有一个参数执行void *
void *arg
执行函数参数的指针,如果传递多个参数,发送指向结构体的指针void pthread_exit(void *retval);
一般情况下,进程中各个线程的运行都是相互独立的,线程的终止并不会通知,也不会影响其他线程,终止线程所占用的资源也并不会随着线程的终止而释放,正如进程之间可以用wait()系统调用来同步终止并释放资源一样,线程就是pthread_join等待并释放
注意pthread_exit只会让主线程自己退出不会让产生的子线程退出;用return则所有的线程退出
使线程退出的方法有:
pthread_exit
如何让子线程完整执行?
int pthread_detach(pthread_t thread);
pthread_t thread
线程id线程创建默认是joinable,但如果一个线程结束但没有被join,则他的状态类似于僵尸进程(Zombie Process),还有一部分资源没有被回收,所以创建线程者应该调用pthread_join来等待线程退出,并可得到线程的退出代码,回收其资源,但是调用pthread_join后,线程没有结束,调用者会被阻塞,有些情况我们并不希望如此,比如在web服务器创建一个字线程来处理每一个请求,主线程并不希望因为调用pthread_join而阻塞,这时可以在子线程代码中加入pthread_detach(pthread_self())或是在父线程调用pthread_detach(pthread_t thread_id)非阻塞立即返回,这将使子线程的状态设置为detached,该线程结束后会自动释放所有资源
int pthread_join(pthread_t thread, void **retval);
pthread_t thread
线程idretval
返回值调用pthread_join的线程会阻塞,直到指定的线程返回或调用了pthread_exit,如果线程简单的返回,retval会被设置成函数的返回值;如果调用了pthread_exit,那个可以传参给pthread_exit("hello world")当作线程的返回值;如果线程被取消,retval被设置成PTHREAD_CANCELED
,注意一个线程只允许唯一的一个线程使用pthread_join()等待它的终止,并且被等待线程必须是joinable状态,非Detached状态
问题:
int bar = 0;
void foo()
{
if (bar == 0)
bar = 1; /* This must only be done once. */
}
这里有一个问题:
Thread 1 Thread 2
if (bar == 0)
if (bar == 0)
bar = 1;
bar = 1;
线程1、2会同时设置bar
线程库提供了三种线程同步机制:
#include <pthread.h>
#include <stdio.h>
void *foo(void *ptr);
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int bar = 0;
int main()
{
pthread_t t1, t2;
int rc1, rc2;
if ((rc1 = pthread_create(&t1, NULL, foo, NULL))) {
printf("Thread creation failed: %d\n", rc1);
}
if ((rc2 = pthread_create(&t2, NULL, foo, NULL))) {
printf("Thread creation failed: %d\n", rc2);
}
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("Final counter bar is %d\n", bar);
}
void *foo(void *ptr)
{
printf("Thread number %llu\n", pthread_self());
pthread_mutex_lock(&mutex1);
bar++;
pthread_mutex_unlock(&mutex1);
}
// cc -g -lpthread main.c -o thread
// -g 允许gdb调试
通过pthread_join可以等待多个线程执行完成并获取执行结果
#include <pthread.h>
#include <stdio.h>
#define NUM_OF_THREAD 10
void *foo(void *ptr);
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int bar = 0;
int main()
{
pthread_t ts[NUM_OF_THREAD];
int i,j;
for (i = 0; i < NUM_OF_THREAD; i++) {
pthread_create(&ts[i], NULL, foo, NULL);
}
for (j = 0; j < NUM_OF_THREAD; j++) {
pthread_join(ts[j], NULL);
}
printf("Final counter bar is %d\n", bar);
}
void *foo(void *ptr)
{
printf("Thread number %llu\n", pthread_self());
pthread_mutex_lock(&mutex1);
bar++;
pthread_mutex_unlock(&mutex1);
}
互斥锁,平时接触的比较多,那条件变量是干啥的?为什么条件变量总是和互斥锁搭配来使用
条件变量,线程可以原子方式阻塞,直到满足某个条件为止,比如:线程A和B存在依赖关系,B要在某个条件发生之后才能继续运行,而这个条件只有A才能完成,这个时候就可以用条件变量来完成
create/destory:
int pthread_condattr_init(pthread_condattr_t *cattr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
int pthread_condattr_destroy(pthread_condattr_t *cattr);
PTHREAD_COND_INITIALIZER
给静态分配的条件变量waiting on condition:
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
这两个函数分别是阻塞等待和超时等待
waking thread based on condition:
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
这两个函数通知线程条件已经满足
以下代码就是典型的生产者消费者模型
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#define NUM_OF_THREAD 5
#define QUEUE_CAPACITY 5
struct Queue{
int ppos;
int cpos;
char *data;
int capacity;
int len;
}q;
pthread_cond_t cd = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
void *start(void *args);
void producer();
void consumer();
int count = 0;
int main()
{
pthread_t threads[NUM_OF_THREAD];
int i, j;
memset(&q, 0, sizeof(struct Queue));
q.capacity = QUEUE_CAPACITY;
q.data = (char *)malloc(sizeof(char) * QUEUE_CAPACITY);
for(i=0; i<NUM_OF_THREAD;i++) {
int *n = (int *)malloc(sizeof(int) * 1);
*n = i;
pthread_create(&threads[i], NULL, start, n);
}
for(j=0;j<NUM_OF_THREAD;j++){
pthread_join(threads[j], NULL);
}
exit(0);
}
void *start(void *args)
{
int n = *(int *)args;
if (n==0) {
producer();// 1个生产者
} else {
consumer();// 4个消费者
}
}
void producer()
{
char c;
while(1)
{
if ((c = getchar()) == '\n'){
continue;
}
pthread_mutex_lock(&mtx);
if (q.len >= QUEUE_CAPACITY) {
printf("Queue is full\n");
} else {
q.len++;
q.ppos = (q.ppos + 1) % q.capacity;
q.data[q.ppos] = c;
printf("get a task\n");
}
pthread_cond_broadcast(&cd); // 通知消费线程 来活了
pthread_mutex_unlock(&mtx);
}
return;
}
void consumer()
{
int len, cpos;
char c;
while(1)
{
pthread_mutex_lock(&mtx);
while(q.len <= 0) {
printf("waitting ...\n");
pthread_cond_wait(&cd, &mtx);
}
q.len--;
len = q.len;
q.cpos =(q.cpos + 1) % q.capacity;
cpos = q.cpos;
c = q.data[q.cpos];
pthread_mutex_unlock(&mtx); // 这里生产任务已经接到没必要在占用锁了,否则就没有并发了
sleep(3);
printf("completed this task from queue: 【%c】, len 【%d】, cpos 【%d】\n", c, len, cpos);
}
return;
}
void * function1()
{
...
pthread_mutex_lock(&lock1); - 执行步骤1
pthread_mutex_lock(&lock2); - 执行步骤3 DEADLOCK !!!
...
...
pthread_mutex_lock(&lock2);
pthread_mutex_lock(&lock1);
...
}
void * function2()
{
...
pthread_mutex_lock(&lock2); - 执行步骤2
pthread_mutex_lock(&lock1);
...
...
pthread_mutex_lock(&lock1);
pthread_mutex_lock(&lock2);
...
}
int main()
{
...
pthread_create(&thread1,NULL,function1,NULL);
pthread_create(&thread2,NULL,function1,NULL);
...
}
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{} // 这里waitgroup类似于pthread_join,等待线程运行完成
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
}
func worker(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("Hello world")
}
Grouting是go语言特有的并发体,是一种轻量级的线程,由go关键字启动,goroutine和系统线程也不是等价的,两者的区 别实际上只是一个量的区别,正是这个量变引发了质变
__stacksize
的设定,大小难以确定,系统线程默认是2Mgo采用了用户层轻量级thread,goroutine的stack size非常小(默认2k),goroutine调度的切换不会陷入(trap)操作系统内核层完成,代价很低
一个go程序对于一个操作系统来说知识一个用户层程序,对于操作系统而言只有线程没有goroutine,goroutine的调度全靠go自己完成,一个go程序中,除了用户代码,剩下的就是go runtime了;
线程竞争的是真实物理CPU,go程序是用户层程序,本身整体运行在一个或多个操作系统线程,因此goroutine竞争的 CPU资源 实际指的是系统线程
原子操作,并发编程中“最小的且不可并行化”的操作。从线程角度说,在当前线程修改共享资源期间,其他的线程时不能访问改资源的。go语言是使用sync.Mutex来实现,类似于线程的pthread_mutex_t
var total struct {
sync.Mutex
value int
}
func worker(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i <= 100; i++ {
total.Lock()
total.value += i
total.Unlock()
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
fmt.Println(total.value)
}
使用互斥锁保护一个数值型的共享资源,效率比较低,有点大才小用的感觉,标准库sync/atomic提供了丰富的支持
var total uint64
func worker(wg *sync.WaitGroup) {
defer wg.Done()
var i uint64
for i = 0; i <= 100; i++ {
atomic.AddUint64(&total, i)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
}
无缓存的发送操作总在对应的接收操作完成之前发生
var message string
var done = make(chan struct{})
func main() {
go worker()
<-done// 优先执行接收操作,但会被阻塞直到channel被写入,message被正常赋值
fmt.Println(message)
}
func worker() {
message = "hello world"
done <- struct{}{}
}
对于无缓存channel进程的接收,发生在对该channel进行的发送完成之前
var message string
var done = make(chan struct{})
func main() {
go worker()
done <- struct{}{}//
fmt.Println(message)
}
func worker() {
message = "hello world"
<-done
}
对于channle的第K个接收操作发生在第K+C个操作完成之前,其中c是channle的capacity