libuv系列文章
stream handle
stream handle
可以被译为流句柄,它在 libuv
中是一个抽象的数据类型,为 libuv
提供了全双工的通信方式,可以说它只是一个父类,通过它派生出 uv_tcp_t、uv_pipe_t、uv_tty_t
这 3 个子类,在这些handle
中,都使用了stream handle
的成员变量及处理方法。
数据结构
通过 uv_stream_t 可以定义一个 stream handle
的实例。
typedef struct uv_stream_s uv_stream_t;
struct uv_stream_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
};
#define UV_STREAM_FIELDS \
/* 等待写的字节数 */ \
size_t write_queue_size; \
/* 分配内存的函数 */ \
uv_alloc_cb alloc_cb; \
/* 读取完成时候执行的回调函数 */ \
uv_read_cb read_cb; \
/* private */ \
UV_STREAM_PRIVATE_FIELDS
其实 stream handle
是属于handle
的子类,因此它的数据结构中包含了handle的成员变量,还包含它自身的一个成员变量 UV_STREAM_FIELDS
,它分为公有字段与私有字段,公有字段只有 write_queue_size、 alloc_cb 、 read_cb
,私有字段就是 UV_STREAM_PRIVATE_FIELDS
,它是分为Windows
平台与linux
平台的,此处以linux为例。
#define UV_STREAM_PRIVATE_FIELDS \
uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \
uv__io_t io_watcher; \
void* write_queue[2]; \
void* write_completed_queue[2]; \
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
void* queued_fds; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
connect_req:其实
uv_connect_t
是一个请求,从前面的文章我们也知道,在libuv
存在handle
与request
,很明显connect_req
就是一个请求,它的作用就是请求建立连接,比如类似建立tcp连接。shutdown_req:
uv_shutdown_t
也是一个请求,它的作用与uv_connect_t
刚好相反,关闭一个连接。io_watcher:抽象出来的
io观察者
。write_queue:写数据队列。
write_completed_queue:完成的写数据队列。
connection_cb:有新连接时的回调函数。
delayed_error:延时的错误代码。
accepted_fd:接受连接的描述符
fd
。queued_fds:
fd
队列,可能有多个fd
在排队。UV_STREAM_PRIVATE_PLATFORM_FIELDS:目前为空。
其实现在不太了解无所谓,就先看下去,我在写文章的时候其实也没有完全理解透彻。
总结一下它的框架示意图,如下:
内部API
stream handle
其实并未提供用户的API
接口,但提供了内部的API
接口,供子类使用,比如在创建一个tcp
的时候,就会通过uv__stream_init()
函数去初始化一个 stream handle
,又比如在读写流操作的时候肯定是通过stream handle
去操作的,因此它又需要实现内部的读写操作接口,相关的函数如下:
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type);
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
uv__stream_init()
初始化一个stream handle
,设置
函数原型:
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type);
参数:
- loop:传入了事件循环的句柄。
- stream:指定初始化的
stream handle
。 - type:指定
stream handle
的类型,注意看它的类型参数是handle
类型的,而handle
类型有很多,但是对与这个stream handle
来说可选的值基本上只有UV_TCP、UV_TTY、UV_PIPE
。
源码的实现:
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;
if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)
/* In the rare case that "/dev/null" isn't mounted open "/"
* instead.
*/
err = uv__open_cloexec("/", O_RDONLY);
if (err >= 0)
loop->emfile_fd = err;
}
#if defined(__APPLE__)
stream->select = NULL;
#endif /* defined(__APPLE_) */
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
说说处理的逻辑:
调用
uv__handle_init()
函数将stream handle
初始化,主要设置loop
、类型
、以及UV_HANDLE_REF
标记。初始化
stream handle
中的成员变量。初始化
write_queue
与write_completed_queue
队列,可能有人有疑问了,为啥要写队列还要 写完成 两个队列,因为啊libuv是为了实现异步,写操作为了实现异步非阻塞,你不能直接写,你得通过写队列去操作,它会首先将数据丢到队列中,下层 io 观察者触发可写事件时才去写入,当写完了就告诉你。最后调用uv__io_init()函数去初始化
io观察者
,并设置stream
的回调处理函数uv__stream_io()
,这个处理回调函数后续慢慢讲解吧,先来看看stream handle
的读写操作。
uv__read()
当io观察者
发现stream handle
有可读事件时,uv__read()
函数会被调用,其实是被uv__stream_io()
函数调用,因为io观察者
发现了底层有数据可读。所以该函数是用于从底层读取数据,这也是stream handle
的读取操作。
uv__read()函数是通过 read() 函数从底层文件描述符读取数据,读取的数据写入由 stream->alloc_cb
分配到内存块中,并在完成读取后由 stream->read_cb
回调函数传递到用户。因为数据已经由底层准备好,直接读取即可,效率非常高,是不需要等待的。而当底层没有数据的情况时,read() 系统调用也会阻塞,而是直接返回,因为文件描述符工作在非阻塞模式下,即使底层还没有数据,它也不会阻塞的,而真正阻塞的地方是在io循环中。
简单看看函数源码吧:
static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
int count;
int err;
int is_ipc;
stream->flags &= ~UV_HANDLE_READ_PARTIAL;
/* Prevent loop starvation when the data comes in as fast as (or faster than)
* we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
*/
count = 32;
/* 看看是不是管道,IPC通讯机制 */
is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
/* XXX: Maybe instead of having UV_HANDLE_READING we just test if
* tcp->read_cb is NULL or not?
*/
while (stream->read_cb
&& (stream->flags & UV_HANDLE_READING)
&& (count-- > 0)) {
assert(stream->alloc_cb != NULL);
buf = uv_buf_init(NULL, 0);
/* 分配内存 */
stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
if (buf.base == NULL || buf.len == 0) {
/* 如果内存分配失败或者buffer的长度是0则无法读取 */
stream->read_cb(stream, UV_ENOBUFS, &buf);
return;
}
/* 断言,保证有内存空间与stream handle的文件描述符是存在的 */
assert(buf.base != NULL);
assert(uv__stream_fd(stream) >= 0);
/* 如果不是pipe */
if (!is_ipc) {
do {
/* 通过read()去读取底层数据 */
nread = read(uv__stream_fd(stream), buf.base, buf.len);
}
/* 直到读取完毕 */
while (nread < 0 && errno == EINTR);
} else {
/* ipc 需要使用 recvmsg() 函数去读取 */
msg.msg_flags = 0;
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
/* Set up to receive a descriptor even if one isn't in the message */
msg.msg_controllen = sizeof(cmsg_space);
msg.msg_control = cmsg_space;
do {
/* 读取ipc机制的数据 */
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
/* 读取数据错误 */
if (nread < 0) {
/* Error */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* 开始下一次的等待. */
if (stream->flags & UV_HANDLE_READING) {
/* 重新设置io观察者活跃 */
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__stream_osx_interrupt_select(stream);
}
stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
} else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
uv__stream_eof(stream, &buf);
return;
#endif
} else {
/* 错误,用户应该调用uv_close()关闭 */
stream->read_cb(stream, UV__ERR(errno), &buf);
if (stream->flags & UV_HANDLE_READING) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
}
return;
} else if (nread == 0) {
uv__stream_eof(stream, &buf);
return;
} else {
/* 成功读取到数据 */
ssize_t buflen = buf.len;
/* ipc就这样子读取 */
if (is_ipc) {
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
return;
}
}
#if defined(__MVS__)
if (is_ipc && msg.msg_controllen > 0) {
uv_buf_t blankbuf;
int nread;
struct iovec *old;
blankbuf.base = 0;
blankbuf.len = 0;
old = msg.msg_iov;
msg.msg_iov = (struct iovec*) &blankbuf;
nread = 0;
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
msg.msg_iov = old;
return;
}
} while (nread == 0 && msg.msg_controllen > 0);
msg.msg_iov = old;
}
#endif
/* 通过回调函数告诉应用层读取完成 */
stream->read_cb(stream, nread, &buf);
/* 如果没有填满缓冲区,则返回没有更多数据可读取。 */
if (nread < buflen) {
stream->flags |= UV_HANDLE_READ_PARTIAL;
return;
}
}
}
}
uv__write()
同理地,当 io 观察者
发现要写入数据的时候,它也会去将数据写入到底层,函数 uv__write()
会被调用,那什么时候才是可写呢,回顾 stream handle
的成员变量,它有两个队列,当 stream->write_queue
队列存在数据时,表示可以写入,如果队列为空则表示没有数据可以写。
libuv的异步处理都是差不多的,都是通过io观察者
去发现是否有可读可写,写数据的过程大致如下:用户将数据丢到写队列中就直接返回了,io观察者
发现队列有数据,stream handle
的处理 uv__stream_io()
函数被调用,开始写入操作,这个写入的操作是依赖系统的函数接口的,比如write()
等,等写完了就通知用户即可。
源码的实现:
static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
int err;
start:
/* 健壮性的处理,断言,确保存在stream handle的fd、队列存在等 */
assert(uv__stream_fd(stream) >= 0);
if (QUEUE_EMPTY(&stream->write_queue))
return;
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
assert(req->handle == stream);
/* 转换为iovec。我们必须拥有自己的uv_buf_t而不是iovec,因为Windows的WSABUF不是iovec。 */
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->nbufs - req->write_index;
iovmax = uv__getiovmax();
/* 限制iov计数以避免来自writev()的EINVAL */
if (iovcnt > iovmax)
iovcnt = iovmax;
if (req->send_handle) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
union {
char data[64];
struct cmsghdr alias;
} scratch;
if (uv__is_closing(req->send_handle)) {
err = UV_EBADF;
goto error;
}
fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
memset(&scratch, 0, sizeof(scratch));
assert(fd_to_send >= 0);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;
msg.msg_control = &scratch.alias;
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
/* silence aliasing warning */
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}
do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0)
req->send_handle = NULL;
} else {
do
/* 写操作 */
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
}
if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
err = UV__ERR(errno);
goto error;
}
if (n >= 0 && uv__write_req_update(stream, req, n)) {
uv__write_req_finish(req);
return; /* TODO(bnoordhuis) Start trying to write the next request. */
}
/* If this is a blocking stream, try again. */
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
goto start;
/* 重新启动io观察者. */
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
/* Notify select() thread about state change */
uv__stream_osx_interrupt_select(stream);
return;
error:
req->error = err;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
if (!uv__io_active(&stream->io_watcher, POLLIN))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
uv__stream_io()
uv__stream_io()
函数是 stream handle
的事件处理函数,它在uv__io_init()
函数就被注册了,在调用 uv__stream_io()
函数时,传递了事件循环对象、io 观察者
对象、事件类型等信息。
我们来看看stream handle
是如何处理可读写事件的:
通过
container_of()
函数获取stream handle
的实例,其实是计算出来的。如果
stream->connect_req
存在,说明 该stream handle
需要进行连接,于是调用uv__stream_connect()
函数请求建立连接。满足可读取数据的条件,调用
uv__read()
函数进行数据读取如果满足流结束条件 调用
uv__stream_eof()
进行相关处理。如果满足可写条件,调用
uv__write()
函数去写入数据,当然,数据会被放在stream->write_queue
队列中。在写完数据后,调用
uv__write_callbacks()
函数去清除队列的数据,并通知应用层已经写完了。
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
/* 获取 stream handle 的实例 */
stream = container_of(w, uv_stream_t, io_watcher);
/* 断言,判断是否满足类型 */
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_HANDLE_CLOSING));
if (stream->connect_req) {
/* 如果需要建立连接,则请求建立连接 */
uv__stream_connect(stream);
return;
}
/* 断言 */
assert(uv__stream_fd(stream) >= 0);
/* 满足读数据条件,进行数据读取 */
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);
/* read_cb 可能会关闭 stream,此处判断一下是否需要关闭fd */
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
/* 如果满足流结束条件 调用 uv__stream_eof() 进行相关处理。 */
if ((events & POLLHUP) &&
(stream->flags & UV_HANDLE_READING) &&
(stream->flags & UV_HANDLE_READ_PARTIAL) &&
!(stream->flags & UV_HANDLE_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
/* 如果有数据要写入,则调用uv__write()去写数据,写完了调用uv__write_callbacks()函数 */
if (events & (POLLOUT | POLLERR | POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);
/* Write queue drained. */
if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
}
uv__write_callbacks()
清理 stream->write_completed_queue
已完成写请求的队列,清理空间,并调用回调函数。
static void uv__write_callbacks(uv_stream_t* stream) {
uv_write_t* req;
QUEUE* q;
QUEUE pq;
if (QUEUE_EMPTY(&stream->write_completed_queue))
return;
/* 从写完成队列获取队列 */
QUEUE_MOVE(&stream->write_completed_queue, &pq);
/* 队列不为空 */
while (!QUEUE_EMPTY(&pq)) {
/* 获取队列头部节点 */
q = QUEUE_HEAD(&pq);
req = QUEUE_DATA(q, uv_write_t, queue);
QUEUE_REMOVE(q);
uv__req_unregister(stream->loop, req);
/* 清除并释放内存 */
if (req->bufs != NULL) {
stream->write_queue_size -= uv__write_req_size(req);
if (req->bufs != req->bufsml)
uv__free(req->bufs);
req->bufs = NULL;
}
/* 通知应用层 */
if (req->cb)
req->cb(req, req->error);
}
}
外部API
内容较多,在下一章讲解吧。