1.使用st-thread
我们已一个简单的demo研究一下st框架。
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include "st.h" static void *_thread(void *arg) { printf("thread: %lu\n", pthread_self()); return NULL; } int main(int argc, char *argv[]) { if (st_init() < 0) { perror("st_init"); exit(1); } for (unsigned int i=0; i<10; i++) { if (st_thread_create(_thread, NULL, 0, 0) == NULL) { perror("st_thread_create"); exit(1); } } st_thread_exit(NULL); return 0; }
out:
thread: 140727908825184 thread: 140727908825184 thread: 140727908825184 thread: 140727908825184 thread: 140727908825184 thread: 140727908825184 thread: 140727908825184 thread: 140727908825184 thread: 140727908825184 thread: 140727908825184
2.初始化st_init
这里主要做了几件事:
int st_init(void) { _st_thread_t *thread; if (_st_active_count) { /* Already initialized */ return 0; }
ST_INIT_CLIST(&_st_free_stacks); /* We can ignore return value here */ // 选择使用哪种IO多路复用函数,eg:_st_select_eventsys st_set_eventsys(ST_EVENTSYS_DEFAULT); // 屏蔽SIGPIPE信号和调整文件描述符限制 pthread_once(&io_once_control, (void (*)(void))_st_io_init); memset(&_st_this_vp, 0, sizeof(_st_vp_t)); // 线程队列初始化 ST_INIT_CLIST(&_ST_RUNQ); ST_INIT_CLIST(&_ST_IOQ); ST_INIT_CLIST(&_ST_ZOMBIEQ); #ifdef DEBUG ST_INIT_CLIST(&_ST_THREADQ); #endif // io多路复用函数对应的初始化,eg:_st_select_init if ((*_st_eventsys->init)() < 0) return -1; // 获取内存页大小,一般4096 _st_this_vp.pagesize = getpagesize(); // 获取时间,微妙 _st_this_vp.last_clock = st_utime(); // 创建ide线程:主要用来处理io事件和定时器 _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0); if (!_st_this_vp.idle_thread) return -1; _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; _st_active_count--; // 从run队列移除ide线程:因为只有没有线程调用时,才会调用ide线程 _ST_DEL_RUNQ(_st_this_vp.idle_thread); // 初始化primorial线程,primorial线程用来标记系统进程 thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *))); if (!thread) return -1; _st_this_vp.primorial_thread = thread; thread->private_data = (void **) (thread + 1); thread->state = _ST_ST_RUNNING; thread->flags = _ST_FL_PRIMORDIAL; // 当前运行的是primorial线程,当primorial线程退出时,整个进程也会终止。 _ST_SET_CURRENT_THREAD(thread); _st_active_count++; #ifdef DEBUG _ST_ADD_THREADQ(thread); #endif return 0; }
2.1 全局结构 _st_this_vp
全局的相关信息都保存在_st_this_vp,主要保存了:
typedef struct _st_vp { _st_thread_t *primorial_thread; _st_thread_t *idle_thread; /* Idle thread for this vp */ st_utime_t last_clock; /* The last time we went into vp_check_clock() */ _st_clist_t run_q; /* run queue for this vp */ _st_clist_t io_q; /* io queue for this vp */ _st_clist_t zombie_q; /* zombie queue for this vp */ #ifdef DEBUG _st_clist_t thread_q; /* all threads of this vp */ #endif int pagesize; _st_thread_t *sleep_q; /* sleep queue for this vp */ int sleepq_size; /* number of threads on sleep queue */ #ifdef ST_SWITCH_CB st_switch_cb_t switch_out_cb; /* called when a thread is switched out */ st_switch_cb_t switch_in_cb; /* called when a thread is switched in */ #endif } _st_vp_t;
2.2 线程结构 _st_thread_t
线程的相关信息都保存在_st_thread,主要保存了:
struct _st_thread { int state; /* 线程状态 */ int flags; /* Thread‘s flags */ void *(*start)(void *arg); /* 线程启动函数 */ void *arg; /* 线程启动参数 */ void *retval; /* 线程启动函数返回值 */ _st_stack_t *stack; /* 记录堆栈信息 */ _st_clist_t links; /* 用于插入到 run/sleep/zombie 线程队列 */ _st_clist_t wait_links; /* 用于插入到 mutex/condvar 等待队列 */ #ifdef DEBUG _st_clist_t tlink; /* For putting on thread queue */ #endif st_utime_t due; /* Wakeup time when thread is sleeping */ _st_thread_t *left; /* 记录sleep定时器二叉树左节点 */ _st_thread_t *right; /* 记录sleep定时器二叉树右节点 */ int heap_index; /* 堆节点 */ void **private_data; /* 线程私有数据 */ _st_cond_t *term; /* joinable类型的线程 */ jmp_buf context; /* 线程上下文:线程的跳转主要通过setjmp/longjmp跳转,jmp_buf保存了线程的上下文信息 */ };
2.3 线程状态
// 线程状态 #define _ST_ST_RUNNING 0 // 执行中 #define _ST_ST_RUNNABLE 1 // 可执行状态,等待调度 #define _ST_ST_IO_WAIT 2 // 等待IO事件 #define _ST_ST_LOCK_WAIT 3 // 等待互斥锁 #define _ST_ST_COND_WAIT 4 // 等待条件变量 #define _ST_ST_SLEEPING 5 // sleep #define _ST_ST_ZOMBIE 6 // 线程已结束,待其它线程调用st_thread_join收尸 #define _ST_ST_SUSPENDED 7 // 暂停,只能调用st_thread_interrupt唤醒 // 线程flag #define _ST_FL_PRIMORDIAL 0x01 // 原生线程,非创建的,没有分配私有栈 #define _ST_FL_IDLE_THREAD 0x02 // 空闲线程,用于epoll,处理定时器 #define _ST_FL_ON_SLEEPQ 0x04 // 线程sleep中,如调用st_usleep、st_cond_timedwait等 #define _ST_FL_INTERRUPT 0x08 // 线程被st_thread_interrupt()中断 #define _ST_FL_TIMEDOUT 0x10 // 定时器超时
线程状态转换:
3.创建线程 st_thread_create
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size) { _st_thread_t *thread; _st_stack_t *stack; void **ptds; char *sp; // 1.创建线程栈 if (stk_size == 0) stk_size = ST_DEFAULT_STACK_SIZE; // 页对齐 eg: 64 * 1024 = 65536 stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE; stack = _st_stack_new(stk_size); if (!stack) return NULL; /* Allocate thread object and per-thread data off the stack */ // 2.分配线程对象 sp = stack->stk_top; sp = sp - (ST_KEYS_MAX * sizeof(void *)); ptds = (void **) sp; sp = sp - sizeof(_st_thread_t); thread = (_st_thread_t *) sp; // 栈指针64位对齐 if ((unsigned long)sp & 0x3f) sp = sp - ((unsigned long)sp & 0x3f); stack->sp = sp - _ST_STACK_PAD_SIZE; memset(thread, 0, sizeof(_st_thread_t)); memset(ptds, 0, ST_KEYS_MAX * sizeof(void *)); // 3.初始化线程 thread->private_data = ptds; thread->stack = stack; thread->start = start; thread->arg = arg; // 4.初始化线程上下文 #ifndef __ia64__ _ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main); #else _ST_INIT_CONTEXT(thread, stack->sp, stack->bsp, _st_thread_main); #endif /* If thread is joinable, allocate a termination condition variable */ if (joinable) { thread->term = st_cond_new(); if (thread->term == NULL) { _st_stack_free(thread->stack); return NULL; } } // 5.设置线程状态 thread->state = _ST_ST_RUNNABLE; _st_active_count++; _ST_ADD_RUNQ(thread); #ifdef DEBUG _ST_ADD_THREADQ(thread); #endif #ifndef NVALGRIND thread->stack->valgrind_stack_id = VALGRIND_STACK_REGISTER(thread->stack->stk_top, thread->stack->stk_bottom); #endif return thread; }
3.1线程堆栈 _st_stack_t
typedef struct _st_stack { _st_clist_t links; /* 空闲栈链表 */ char *vaddr; /* 内存分配的起始位置 */ int vaddr_size; /* 栈 总大小 */ int stk_size; /* 栈 可用部分大小 */ char *stk_bottom; /* 私有栈 结束位置 */ char *stk_top; /* 私有栈 起始位置 */ void *sp; /* 栈指针 */ #ifdef __ia64__ void *bsp; /* Register stack backing store pointer */ #endif #ifndef NVALGRIND /* id returned by VALGRIND_STACK_REGISTER */ /* http://valgrind.org/docs/manual/manual-core-adv.html */ unsigned int valgrind_stack_id; #endif } _st_stack_t;
线程上下文 jmp_buf
/* Calling environment, plus possibly a saved signal mask. */ struct __jmp_buf_tag { /* NOTE: The machine-dependent definitions of `__sigsetjmp‘ assume that a `jmp_buf‘ begins with a `__jmp_buf‘ and that `__mask_was_saved‘ follows it. Do not move these members or add others before it. */ __jmp_buf __jmpbuf; /* Calling environment. */ int __mask_was_saved; /* Saved the signal mask? */ __sigset_t __saved_mask; /* Saved signal mask. */ }; __BEGIN_NAMESPACE_STD typedef struct __jmp_buf_tag jmp_buf[1];
栈创建
_st_stack_t *_st_stack_new(int stack_size) { _st_clist_t *qp; _st_stack_t *ts; int extra; // 如果_st_free_stacks非空,则从里面取 for (qp = _st_free_stacks.next; qp != &_st_free_stacks; qp = qp->next) { ts = _ST_THREAD_STACK_PTR(qp); if (ts->stk_size >= stack_size) { /* Found a stack that is big enough */ ST_REMOVE_LINK(&ts->links); _st_num_free_stacks--; ts->links.next = NULL; ts->links.prev = NULL; return ts; } } /* Make a new thread stack object. */ if ((ts = (_st_stack_t *)calloc(1, sizeof(_st_stack_t))) == NULL) return NULL; // eg: _st_randomize_stacks:0 extra:0 extra = _st_randomize_stacks ? _ST_PAGE_SIZE : 0; // eg: stack_size:65536 REDZONE:pagesize(4096)*2=8192 // vaddr_size = 16 * 4096 + 2*4096 = 18 *4096 ts->vaddr_size = stack_size + 2*REDZONE + extra; // 申请内存,返回起始地址 ts->vaddr = _st_new_stk_segment(ts->vaddr_size); if (!ts->vaddr) { free(ts); return NULL; } // 栈总大小 ts->stk_size = stack_size; // 栈 ts->stk_bottom = ts->vaddr + REDZONE; ts->stk_top = ts->stk_bottom + stack_size; #ifdef DEBUG mprotect(ts->vaddr, REDZONE, PROT_NONE); mprotect(ts->stk_top + extra, REDZONE, PROT_NONE); #endif if (extra) { long offset = (random() % extra) & ~0xf; ts->stk_bottom += offset; ts->stk_top += offset; } return ts; }
到此,我们可以分析出,栈空间布局:
4.线程调度
4.1 setjmp和longjmp函数
这里线程的调度是基于 setjmp和longjmp,和goto类似,不过goto是函数内部跳转,这个跳转范围更大。所以讲解一下用法,
#include <setjmp.h>
int setjmp(jmp_buf env);
返回值:若直接调用则返回0,若从longjmp调用返回则返回非0值的longjmp中的val值
void longjmp(jmp_buf env,int val);
调用此函数则返回到语句setjmp所在的地方,其中env 就是setjmp中的 env,而val 则是使setjmp的返回值变为val。
当检查到一个错误时,则以两个参数调用longjmp函数,第一个就是在调用setjmp时所用的env,第二个参数是具有非0值的val,它将成为从setjmp处返回的值。
使用第二个参数的原因是对于一个setjmp可以有多个longjmp。
我们看个demo
#include <stdio.h> #include <setjmp.h> static jmp_buf buf; void second(void) { printf("second\n"); // 打印 longjmp(buf,1); // 跳回setjmp的调用处 - 使得setjmp返回值为1 } void first(void) { second(); printf("first\n"); // 不可能执行到此行 } int main() { if ( ! setjmp(buf) ) { first(); // 进入此行前,setjmp返回0 } else { // 当longjmp跳转回,setjmp返回1,因此进入此行 printf("main\n"); // 打印 } return 0; }
out:
second
main
注意到虽然first()子程序被调用,"first"不可能被打印。"main"被打印,因为条件语句if ( ! setjmp(buf) )被执行第二次。
使用setjmp和longjmp要注意以下几点:
1、setjmp与longjmp结合使用时,它们必须有严格的先后执行顺序,也即先调用setjmp函数,之后再调用longjmp函数,以恢复到先前被保存的“程序执行点”。否则,如果在setjmp调用之前,执行longjmp函数,将导致程序的执行流变的不可预测,很容易导致程序崩溃而退出
2. longjmp必须在setjmp调用之后,而且longjmp必须在setjmp的作用域之内。具体来说,在一个函数中使用setjmp来初始化一个全局标号,然后只要该函数未曾返回,那么在其它任何地方都可以通过longjmp调用来跳转到 setjmp的下一条语句执行。实际上setjmp函数将发生调用处的局部环境保存在了一个jmp_buf的结构当中,只要主调函数中对应的内存未曾释放 (函数返回时局部内存就失效了),那么在调用longjmp的时候就可以根据已保存的jmp_buf参数恢复到setjmp的地方执行。
4.2 线程上下文初始化
可以看到线程上下文对象就是jmp_buf,我们看看怎么初始化的。
_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
展开如下:
{ if (setjmp(thread->context)) _st_thread_main(); thread->context[0].__jmpbuf[JB_RSP] }
和上面的demo是不是很像,这里主要设置了setjmp返回0,那什么时候进入_st_thread_main呢?答案就在st_thread_exit
4.3 线程退出st_thread_exit
void st_thread_exit(void *retval) { _st_thread_t *thread = _ST_CURRENT_THREAD(); printf("st_thread_exit current thread: %p\n", thread); thread->retval = retval; _st_thread_cleanup(thread); _st_active_count--; if (thread->term) { /* Put thread on the zombie queue */ thread->state = _ST_ST_ZOMBIE; _ST_ADD_ZOMBIEQ(thread); /* Notify on our termination condition variable */ st_cond_signal(thread->term); /* Switch context and come back later */ _ST_SWITCH_CONTEXT(thread); /* Continue the cleanup */ st_cond_destroy(thread->term); thread->term = NULL; } #ifdef DEBUG _ST_DEL_THREADQ(thread); #endif #ifndef NVALGRIND if (!(thread->flags & _ST_FL_PRIMORDIAL)) { VALGRIND_STACK_DEREGISTER(thread->stack->valgrind_stack_id); } #endif if (!(thread->flags & _ST_FL_PRIMORDIAL)) { _st_stack_free(thread->stack); } // 启动线程调度 _ST_SWITCH_CONTEXT(thread); free(thread); (*_st_eventsys->free)(); }
这里主要就是干了两件件事:
我们展开看一下_ST_SWITCH_CONTEXT,代码如下:
if (!setjmp(_thread->context)) { _st_vp_schedule(); }
这里设置了当前线程的jmp_buf ,返回0,启动调度_st_vp_schedule
void _st_vp_schedule(void) { _st_thread_t *thread; // 查找RUN队列,空就调度idle_thread if (_ST_RUNQ.next != &_ST_RUNQ) { // 从run队列取出next线程 thread = _ST_THREAD_PTR(_ST_RUNQ.next); _ST_DEL_RUNQ(thread); } else { // 如果空就切换到idle线程 thread = _st_this_vp.idle_thread; } ST_ASSERT(thread->state == _ST_ST_RUNNABLE); // 切换到thread线程 thread->state = _ST_ST_RUNNING; _ST_RESTORE_CONTEXT(thread); }
这里展开一下_ST_RESTORE_CONTEXT
_st_this_thread = (_thread) longjmp((_thread)->context, 1)
就是设置了一下当前线程,然后通过longjmp跳转到_st_thread_main,执行线程的启动函数,到这里调度的基本原理就清楚了。
总结一下:
参考:
1.https://cloud.tencent.com/developer/article/1197338
原文:https://www.cnblogs.com/vczf/p/14556736.html