支持1000K数量级stackful模式的协程原型库ezCoroutine

ezCoroutine协程原型库只是个原型库,但是已经能够支持1000K以上数量的协程运行,而且是stackful模式。基本的对外接口有两类,一类是类似Posix线程的接口:协程创建threadCreat,协程回收threadJoin,调度函数switch_to;另外一类是类似lua协程接口,但是有所不同,我们的返回规则更加简单,所以功能也有所限制:挂起协程yield,恢复协程resume;然后是一些辅助函数接口:通过协程号寻找到协程结构体函数findThread,初始化协程函数init,销毁协程函数destroy,destroyAll函数,以及printThread,printLoop,printSharedStack等打印函数。

具体代码放在GitHub上:https://github.com/Yuandong-Chen/ezCoroutine

注意,该库只能在X86/X64 Linux环境下用gnu/gcc集成的编译工具(不能用llvm/clang),代码只支持32位,得用-m32参数,而且得加上-fno-stack-protector参数关闭栈检测机制,否则我不能保证不出现bus IO error 和 segment  fault。因为具体实现我用了一些特殊的手段,smash了栈空间,以后会逐步改善。

下面通过几个问题来介绍: 

1.怎么用这个协程库呢?

这里给出经典的生产者-消费者模式的例子:

1 /************************* Test *************************/
 2 
 3 #include "ezco.h"
 4 #include <stdio.h>
 5 #include <stdlib.h>
 6 
 7 #define NUM 2
 8 #define CAL 2
 9 Thread_t *tid[2];
10 int *res1[2];
11 
12 void *produceFunc(void *d)
13 {
14     int j=0;
15     while(1)
16     {
17     j = (int)yield(tid[1],233);
18     printf("produce get: %d\n", j);
19     }
20     
21     return ((void *)j);
22 }
23 
24 void *consumeFunc(void *d)
25 {
26     int j=0;
27 
28     while(1)
29     {
30     j = resume(d,332);
31     printf("consume get: %d\n", j);
32     }
33     
34     return ((void *)j);
35 }
36 
37 int main(int argc, char const *argv[])
38 {
39     int i = 0;
40     init();
41 
42     threadCreat(&tid[1],produceFunc,NULL);
43     threadCreat(&tid[0],consumeFunc,tid[1]);
44     
45     while (1){
46         printf("main is grunting... %d\n", i++); 
47         sleep(1);
48         switch_to(0); //Give up control to next thread
49     }
50 
51     for(i = 0; i<NUM; i++){
52     threadJoin(tid[i], &res1[i]); //Collect and Release the resourse of tid1
53     }
54     
55     printLoop(&dead);printLoop(&live);
56     destroyAll();
57     return 0;
58 }
59

测试结果如下:

支持1000K数量级stackful模式的协程原型库ezCoroutine

2.设计与实现原理是什么?为何能够支持1000K数量级别的协程?平均每个协程内存占用多大?

设计与实现的基本原理请参考ucontext.h实现协程(用ucontext组件,所以代码量很小),或者参考longjmp实现协程,或者用内联汇编实现协程,在这里不累述。这里说下为何能支持大量协程。其实我们用了共享栈技术,每个协程共享一个4K或者更大的空间(你可以用堆,用匿名内存映射或者直接开个数组也都是可以的,总之得保证4K页对齐的空间),每个协程自己有私有栈空间指针privateptr,每个时刻只有一个协程在运行,此时栈空间在这个4K共享空间中(当然除了main以外),当切换协程时,动态分配一个堆内存,大小为此时协程栈实际大小(一般都很小,小的只有几十个Bytes, 大的有几百个Bytes,完全不用4KB),然后用privateptr指向它,把现在的栈里头的值复制进去。切换到下一个协程,当然下一个协程和上一个切出的一样,已经有privateptr指向了自己的私有栈空间,而这私有栈空间保存在堆内存中(有点拗口),我们只要用memcpy把这个堆内存中的栈复制回到4K共享空间中,再恢复些寄存器的值(保存在任务控制块结构体中),然后用该协程私有的寄存器的值跳转即可,这样一套操作就完成了运行时上下文切换工作。这样实现最大的开销就是不停的copy进copy出,不停地malloc和不停地free栈空间,这样的代价换来的是空间的节约。有多节约?我写了个测试用例如下:

支持1000K数量级stackful模式的协程原型库ezCoroutine支持1000K数量级stackful模式的协程原型库ezCoroutine

1 /************************* Test *************************/
 2 
 3 #include "ezthread.h"
 4 #include <stdio.h>
 5 #include <stdlib.h>
 6 
 7 #define NUM 1000000
 8 #define CAL 2
 9 
10 void *sum1tod(void *d)
11 {
12     int i, j=0;
13 
14     for (i = 1; i <= d; ++i)
15     {
16         j += i;
17         printf("thread %d is grunting... %d\n",live.current->data->tid , i);
18         switch_to(0); // Give up control to next thread
19     }
20     
21     return ((void *)j);
22 }
23 
24 void *hello(void *d)
25 {
26     int i, j=0;
27 
28     for (i = 1; i <= d; ++i)
29     {
30         printf("hello\n");
31         switch_to(0); // Give up control to next thread
32     }
33     
34     return ((void *)j);
35 }
36 
37 int main(int argc, char const *argv[])
38 {
39     int res = 0;
40     int i;
41     init();
42     Thread_t *tid1[NUM];
43     int *res1[NUM];
44     for(i = 0; i<NUM; i++){
45     threadCreat(&tid1[i], (i%2)?sum1tod:hello, CAL);
46     }
47     
48     
49 
50     for (i = 1; i <= CAL; ++i){
51         res+=i;
52         printf("main is grunting... %d\n", i); 
53         switch_to(0); //Give up control to next thread
54     }
55 
56     for(i = 0; i<NUM; i++){
57     threadJoin(tid1[i], &res1[i]); //Collect and Release the resourse of tid1
58         res += (int)res1[i];
59     }
60     
61     printf("parallel compute: %d\n", (int)res);
62     printLoop(&dead);printLoop(&live);
63     destroyAll();
64     return 0;
65 }
Test

测试结果如下:

支持1000K数量级stackful模式的协程原型库ezCoroutine

简单地解释一下:50W个协程做1+2,并返回计算结果3,另外50W个打印hello,用top指令看了下内存占用率,基本是1022404KB*27.2%/1000000 = 285Bytes/Coroutine, 即峰值是平均每个协程占用285Bytes左右。1500003这个结果是3*500000+3(main也在无聊的计算1+2)得来的。无独有偶,我们这种共享栈实现的原理和云风的协程库是一样的。

3.大量的内存分配和释放难道不会造成内存碎片化么?

对于这个问题,我的理解是,把任务交给编写标准库的人去做吧,当然你也可以实现自己的malloc,free内存分配器,用上slab等流行的方式分配内存,但是这不是我们所要考虑的任务。

4.可移植性问题以及能否和其他语言配合使用?

理论上,可以拓展成X64,ARM,MIPS,SPARC等等版本,并且能够配合C++使用,但是其中共享栈实现依赖了编译器对volatile关键字处理的行为,能否实现还得看这类CPU的操作系统有没有提供gnu/gcc编译工具。

5.TODO list:

(a)不依赖编译器行为,重构共享栈的实现。(难度大,本人不希望用ucontext.h组件实现,但以后stackless模式会用setjmp,longjmp实现)

(b)用hash表结合原有的循环链表使得搜索速度达到O(1)。(几乎没难度)

(d)拓展到X64版本等版本。(难度一般)

(e)提供多种实战用例,在实际开发项目中用上。(可行性未知,后果未知)

相关推荐