Lighttpd 1.5的多线程实现
lighttpd 1.4.x是一个典型的多进程linux程序,在单个进程内部没有使用多线程,同一时刻只有一个线程在运行。
而到了lighttpd 1.5版本,也使用了多线程来完成某些工作。lighttpd通过GAsyncQueue(GLIB异步队列)的方式实现了一个线程池,从而完成了多线程的运作(需要安装glibc-dev库)。
GAsyncQueue类型的异步消息队列的定义在base.h中
- GAsyncQueue *stat_queue; /* send a stat_job into this queue and joblist_queue will get a wakeup when the stat is finished */
- GAsyncQueue *joblist_queue;
- GAsyncQueue *aio_write_queue;
在sever.c文件中,对其进行初始化:
- srv->stat_queue = g_async_queue_new();
- srv->joblist_queue = g_async_queue_new();
- srv->aio_write_queue = g_async_queue_new();
同时,在server.c中,也定义了线程:
- #ifdef USE_GTHREAD
- GThread **stat_cache_threads; //定义了一个指针的指针,也就是一个数组。GThread由g_thread-2.0库提供,需要安装这个库
- GThread **aio_write_threads = NULL;
- #ifdef USE_LINUX_AIO_SENDFILE
- GThread *linux_aio_read_thread_id = NULL;
- #endif
- GError *gerr = NULL;
- #endif
下面以stat_cache_threads为例,说明GAsyncQueue实现线程池的方式,这个线程的功能是处理页面的stat_cache信息:
首先在lighttpd的main函数中,为线程申请空间、创建线程,其中srv->srvconf.max_stat_threads即stat_cache线程数量,是从配置文件中读取到的:
- stat_cache_threads = calloc(srv->srvconf.max_stat_threads, sizeof(*stat_cache_threads));
- for (i = 0; i < srv->srvconf.max_stat_threads; i++) {
- stat_cache_threads[i] = g_thread_create(stat_cache_thread, srv, 1, &gerr);
- if (gerr) {
- ERROR("g_thread_create failed: %s", gerr->message);
- return -1;
- }
- }
于是stat_cache_thread函数(注意,结尾没有“s”)就被作为线程,执行起来了,其句柄保存在stat_cache_threads数组中。
stat_cache_thread函数在stat_cache.c文件中,为:
- gpointer stat_cache_thread(gpointer _srv) {
- server *srv = (server *)_srv;
- stat_job *sj = NULL;
- /* take the stat-job-queue */
- GAsyncQueue * inq;
- g_async_queue_ref(srv->stat_queue);
- inq = srv->stat_queue;
- /* */
- while (!srv->is_shutdown) {
- /* let's see what we have to stat */
- struct stat st;
- if ((sj = g_async_queue_pop(inq))) {
- if(sj == (stat_job *) 1)
- continue; /* just notifying us that srv->is_shutdown changed */
- /* don't care about the return code for now */
- stat(sj->name->ptr, &st);
- joblist_async_append(srv, sj->con);
- stat_job_free(sj);
- }
- }
- g_async_queue_unref(srv->stat_queue);
- return NULL;
- }
在这里,又看到了g_async_queue的身影,g_async_queue_ref函数的作用是在不加锁的情况下,获得对stat_gueue异步队列的引用。在个在线程启动的时候调用,之后,就会进入while循环,这个while循环的条件是!srv->is_shutdown,也就是程序只要不退出,循环一直进行。在循环内部,可以看到一条if判断语句:
- if ((sj = g_async_queue_pop(inq)))
g_async_queue_pop也是线程池的关键,glibc中对这个函数的注释为:
Pops data from the queue. This function blocks until data become available
当异步队列中有可用数据时,将其弹出,否则将一直阻塞在这里。
接下来还要判断从异步队列中取得的数据是否为1,这是因为lighttpd将1设为线程退出的指令,当异步队列中pop出1时,线程就要退出。如果不为1说明是正常的数据(任务),需要处理。处理完成后,继续循环等待新的任务到来。
那又是在哪把任务、消息加入到stat_cache队列中的呢?主要有两个地方,一个是在stat_cache.c中的stat_cache_get_entry_internal()函数内部,调用了g_async_queue_push(srv->stat_queue, sj);将sj推入了stat_queue队列;另一处在server.c的main函数的最后几行,
- for (i = 0; i < srv->srvconf.max_stat_threads; i++) {
- g_async_queue_push(srv->stat_queue, (void *) 1);
- }
注意,这里向stat_cache队列中加入了多次1,目的通知每个线程,程序即将结束,需要线程退出。
整个脉络现在基本清晰了,也就是lighttpd程序维护了异步消息队列stat_queue,然后启动了几个线程stat_cache_thread,每个线程都在循环等待stat_queue中是否有新加入的数据(任务)。当新的数据(任务)来临时,某一个线程的g_async_queue_pop函数就会返回这个数据的指针,然后线程就将处理这次的任务。处理完成后,stat_cache_thread线程就会运行到g_async_queue_pop函数处,并阻塞在这里,等待下次任务的到来。
Lighttpd 的详细介绍:请点这里
Lighttpd 的下载地址:请点这里
相关阅读: