libuv系列文章
libuv事件循环
事件循环是 libuv 功能的核心部分。它的主要职责是对 I/O 进行轮询然后基于不同的事件源执行它们的回调函数。
在上一篇文章我们也讲解了它的整个运作框架,虽然代码还未讲解,但是还是要看看怎么去写代码的。
uv_loop_t
在libuv中,事件循环的声明是这样子的:
typedef struct uv_loop_s uv_loop_t;
它是一个句柄handle
类型,它管理了同一事件循环的所有资源,并且在整个事件循环的生命周期内都是可用的。
其实到后面你就会发现,实际上它是事件循环所有资源的统一入口,所有在事件循环上运行的各类 Handle/Request
实例都被注册到 uv_loop_t
内部定义的结构中,反正知道它是可以管理事件循环的所有资源就行了。
这里再补充一个知识点的说明吧:
- IO 观察者(io_watcher):在 libuv 内部,对所有 I/O 操作进行了统一的抽象,在底层的操作系统 I/O 操作基础上,结合事件循环机制,实现了 IO 观察者,对应结构体 uv__io_s,通过它可以知道 I/O 相关的信息,比如可读、可写等,handle 通过内嵌 IO 观察者的方式获得 IO 的监测能力。
其实大家可以直接理解为,我要监测一个TCP连接,那么TCP handle就算是一个 IO 观察者,其实它是一个句柄的同时又是一个 IO 观察者。
可以看它的成员变量有非常多的东西(此处是对linux平台的讲解,Windows的不在讨论范围内,不过都差不多):
struct uv_loop_s {
/* 用户数据-可以用于任何用途,libuv是不会触碰这个字段的数据的。 */
void* data;
/* 事件循环中要判断是否有活跃状态的句柄,其实这就是活跃状态句柄计数器。 */
unsigned int active_handles;
/* handle队列是一个双向链表,而数组中这两个元素则分别指向next和prev。 */
void* handle_queue[2];
union {
/* 这是未使用的东西,主要是防止uv_loop_s结构体大小被改变了 */
void* unused[2];
/* 这才是真正使用的东西,用来对在线程池中调用的异步I/O进行计数 */
unsigned int count;
} active_reqs;
/* 内部标志,用于信号循环停止。*/
unsigned int stop_flag;
/* 这个宏定义在不同的平台有不一样的处理,具体看下面的定义 */
UV_LOOP_PRIVATE_FIELDS
};
#define UV_LOOP_PRIVATE_FIELDS \
unsigned long flags; \
int backend_fd; \
void* pending_queue[2]; \
void* watcher_queue[2]; \
uv__io_t** watchers; \
unsigned int nwatchers; \
unsigned int nfds; \
void* wq[2]; \
uv_mutex_t wq_mutex; \
uv_async_t wq_async; \
uv_rwlock_t cloexec_lock; \
uv_handle_t* closing_handles; \
void* process_handles[2]; \
void* prepare_handles[2]; \
void* check_handles[2]; \
void* idle_handles[2]; \
void* async_handles[2]; \
void (*async_unused)(void); /* TODO(bnoordhuis) Remove in libuv v2. */ \
uv__io_t async_io_watcher; \
int async_wfd; \
struct { \
void* min; \
unsigned int nelts; \
} timer_heap; \
uint64_t timer_counter; \
uint64_t time; \
int signal_pipefd[2]; \
uv__io_t signal_io_watcher; \
uv_signal_t child_watcher; \
int emfile_fd; \
UV_PLATFORM_LOOP_FIELDS \
这个UV_LOOP_PRIVATE_FIELDS宏跟平台相关,在这里不做过多介绍,就简单说几点:
watcher_queue
是uv__io_t
的观察者队列,其中保存的是uv__io_t
的结构体void* wq[2];
表述的是work queue,是工作队列;timer_heap
是timer
的二叉堆,它还使用了二叉树来提高遍历的效率。
demo
写个demo来讲解整个循环事件的过程吧:
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
int main()
{
uv_loop_t *loop = malloc(sizeof(uv_loop_t));
uv_loop_init(loop);
uv_run(loop, UV_RUN_DEFAULT);
printf("quit...\n");
uv_loop_close(loop);
free(loop);
return 0;
}
这个demo是非常简单的,一般来说一个句柄都会经历 初始化、运行、停止、关闭 等过程。
uv_loop_init()
这个函数就是将uv_loop_t
初始化,给这个loop对象初始化一些默认的成员变量,比如初始化定时器、工作队列、观察者队列等。
int uv_loop_init(uv_loop_t* loop) {
void* saved_data;
int err;
/* 清空数据 */
saved_data = loop->data;
memset(loop, 0, sizeof(*loop));
loop->data = saved_data;
/* 初始化定时器堆,初始化工作队列、空闲队列、各种队列 */
heap_init((struct heap*) &loop->timer_heap);
QUEUE_INIT(&loop->wq);
QUEUE_INIT(&loop->idle_handles);
QUEUE_INIT(&loop->async_handles);
QUEUE_INIT(&loop->check_handles);
QUEUE_INIT(&loop->prepare_handles);
/* 这个队列很重要,对于libuv中其他的 handle 在初始化后都会被放到此队列中 */
QUEUE_INIT(&loop->handle_queue);
/* 初始化I/O观察者相关的内容,初始化处于活跃状态的观察者句柄计数、请求计数、文件描述符等为0 */
loop->active_handles = 0;
loop->active_reqs.count = 0;
loop->nfds = 0;
loop->watchers = NULL;
loop->nwatchers = 0;
/* 初始化挂起的I/O观察者队列,挂起的I/O观察者会被插入此队列延迟处理 */
QUEUE_INIT(&loop->pending_queue);
/* 初始化 I/O观察者队列,所有初始化后的I/O观察者都会被插入此队列 */
QUEUE_INIT(&loop->watcher_queue);
loop->closing_handles = NULL;
/* 初始化时间,获取系统当前的时间 */
uv__update_time(loop);
loop->async_io_watcher.fd = -1;
loop->async_wfd = -1;
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
loop->backend_fd = -1;
loop->emfile_fd = -1;
loop->timer_counter = 0;
loop->stop_flag = 0;
/* 初始化平台、linux Windows等 */
err = uv__platform_loop_init(loop);
if (err)
return err;
/* 初始化信号 */
uv__signal_global_once_init();
err = uv_signal_init(loop, &loop->child_watcher);
if (err)
goto fail_signal_init;
uv__handle_unref(&loop->child_watcher);
loop->child_watcher.flags |= UV_HANDLE_INTERNAL;
QUEUE_INIT(&loop->process_handles);
/* 初始化线程读写锁 */
err = uv_rwlock_init(&loop->cloexec_lock);
if (err)
goto fail_rwlock_init;
/* 初始化线程互斥锁 */
err = uv_mutex_init(&loop->wq_mutex);
if (err)
goto fail_mutex_init;
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
if (err)
goto fail_async_init;
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV_HANDLE_INTERNAL;
return 0;
/* 各种出错的处理 */
fail_async_init:
uv_mutex_destroy(&loop->wq_mutex);
fail_mutex_init:
uv_rwlock_destroy(&loop->cloexec_lock);
fail_rwlock_init:
uv__signal_loop_cleanup(loop);
fail_signal_init:
uv__platform_loop_delete(loop);
return err;
}
其实在libuv有一个全局的、静态的uv_loop_t
实例default_loop_struct
,与他对应的指针default_loop_ptr
,这个东西在后续的使用是经常会被用到,它在uv_default_loop()
函数第一次被调用的时候就会通过uv_loop_init()
函数进行初始化操作,这就保证了无论使用什么样的handle
,它都有一个统一的事件循环入口。
static uv_loop_t default_loop_struct;
static uv_loop_t* default_loop_ptr;
uv_loop_t* uv_default_loop(void) {
if (default_loop_ptr != NULL)
return default_loop_ptr;
/* 初始化default_loop_struct实例 */
if (uv_loop_init(&default_loop_struct))
return NULL;
default_loop_ptr = &default_loop_struct;
return default_loop_ptr;
}
uv_run()
首先讲解一下他的API吧,传入uv_loop_t
事假循环的句柄,还有一个运行的模式,它的模式有3种,分别为默认模式、单次模式、不等待模式。
默认模式UV_RUN_DEFAULT:运行事件循环,直到不再有活动的和引用的句柄或请求为止。
单次模式UV_RUN_ONCE:轮询一次I/O,如果没有待处理的回调,则进入阻塞状态,完成处理后返回零,不继续运行事件循环。
不等待模式UV_RUN_NOWAIT:对I/O进行一次轮询,但如果没有待处理的回调,则不会阻塞。
注意,这个函数不是线程安全的。
typedef enum {
UV_RUN_DEFAULT = 0,
UV_RUN_ONCE,
UV_RUN_NOWAIT
} uv_run_mode;
从代码看uv_run()
函数做了什么事情,可以参考上一篇文章中的图,并结合源码来学习:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
/* 这是一个while循环 */
while (r != 0 && loop->stop_flag == 0) {
/* 更新时间并开始倒计时 */
uv__update_time(loop);
uv__run_timers(loop);
/* 处理挂起的handle */
ran_pending = uv__run_pending(loop);
/* 运行idle handle */
uv__run_idle(loop);
/* 运行prepare handle */
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
/* 计算要阻塞的时间,开始阻塞 */
uv__io_poll(loop, timeout);
/* 程序执行到这里表示被唤醒了,被唤醒的原因可能是I/O可读可写、或者超时了,检查handle是否可以操作 */
uv__run_check(loop);
/* 看看是否有close的handle */
uv__run_closing_handles(loop);
/* 单次模式 */
if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}
/* handle保活处理 */
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
大家可以看看相关的函数是这样子调用的(大家只看左下角部分即可):
uv_loop_close()
释放所有内部循环资源。仅当循环完成执行并且所有打开的句柄和请求已关闭时才调用此函数,否则它将返回UV_EBUSY
。此函数返回后,用户可以释放为循环分配的内存。
注意,这个函数也不是线程安全的。
int uv_loop_close(uv_loop_t* loop) {
QUEUE* q;
uv_handle_t* h;
#ifndef NDEBUG
void* saved_data;
#endif
/* 如果存在处于活跃状态的请求,则返回UV_EBUSY */
if (uv__has_active_reqs(loop))
return UV_EBUSY;
/* 如果存在处于活跃状态的handle,则返回UV_EBUSY */
QUEUE_FOREACH(q, &loop->handle_queue) {
h = QUEUE_DATA(q, uv_handle_t, handle_queue);
if (!(h->flags & UV_HANDLE_INTERNAL))
return UV_EBUSY;
}
/* 关闭事件循环 */
uv__loop_close(loop);
#ifndef NDEBUG
saved_data = loop->data;
memset(loop, -1, sizeof(*loop));
loop->data = saved_data;
#endif
if (loop == default_loop_ptr)
default_loop_ptr = NULL;
return 0;
}