Occlum 异步网络框架
Table of Contents
1. io_uring_callback
这是 libos 中对 occlum io_uring 的进一步封装。对于原始的 occlum io_uring,他最大的缺点是,使用者必须手动的从 CQ 中获得 CQE,然后映射到对应的 SQE 的请求进行处理,这使得整个使用流程略显繁琐。io_uring_callback 提供了用户更加友好的 io_uring api,其特点主要表现在下面三个方面:
- callback_based,在一个 IO 请求被完成后,对应的用户注册的回调函数会被调用,而不需要手动分配 IO 的完成事件。
- Async/await,在提交完 io 请求之后,用户会获得一个代表这个正在进行的 io 请求的 handle,可以 await 这个 handle,类似 future。
- polling_based IO,无论是 IO 的提交还是完成都便于使用轮询模式。
下面先介绍 io_uring 的总体设计,然后从上面三个方面详细介绍具体实现原理。
这里介绍一下各个模块的作用:
- Builder:直接套壳的 io_uring 中的 builder,也是 occlum 中创建 IoUring 的唯一方式,可以在创建前配置对应的参数,以便在内核实例化创建的 io_uring 具有不同的特性。
- IoUring:核心结构体,主要包括的字段有:
- ring:io_uring 实例,调用 crate 进行具体的操作,
- fd_map:当前 io_uring 中所有处理过的 IO 事件的 fd 以及 io 事件数,除非 unregister 否则一直存在
- sq_lock:为向 SQ 提交 SQE 加锁,防止同一时间操作同一个 SQ,线程安全
- token_table:标识 SQE 和 CQE 的一一映射,是回调和 handle 实现的核心
- 在 IoUring 中的方法主要分为两类:
- op_method:主要是针对特定的 opcode 的对 socket 的封装,比如 accept 等,创建出 SQE,随后通过调用 push_entry,传递进 SQ
- 其他方法,这里特别指出 poll_completions,此方法包含了 IoUring 对 CQ 操作的封装
解下来分析其中几个重点方法:push_entry
,poll_completions
,cancel
。
io_uring_callback 每次创建 SQE 时会生成一个对应的 IoHandle,这个 Iohandle 的结构大致如下所示:
IoHandle
-> IoToken
->Inner
-> state
completion_callback
token_key
在注册 SQE 时,传入 callback 参数,然后在提交 SQE 时,通过生成一个 token_key,注册在 IoUring 的 token_table 中,不仅关联了 SQE 和 IoHandle,通过将 userdata 设置成 tokenkey 还可以将 SQE 和 CQE 对应起来,大致流程如下所示:
回调机制的实现流程:
- 用户通过 fd、callback 调用对应的 op method 创建 sqe
- 在 IoUring 的 token_table 中生成一个 token_key,利用 token_key 和 callback 生成 token,将 token_key 和 token 关联注册到 table 中
- 在 sqe 的 userdata 中写入 token_key,push 到 SQ 中
- 在 CQ 中对应的 cqe 从 ring 中出来后根据 key 的值找到 token
- 将 token 从 table 中注销
- 调用 token 中的 callback 实现回调
这只是单个 sqe 的同步流程,对于已经在运行的异步 IoUring 的宏观形势如下:
token_table 注册大量已经运行,或者正在被取消的 IO 事件,通过cancel
方法取消一个已经提交的事件,同时 handle 的存在还可以实时维护在内核运行中的 IO 事件的状态,具体表现为:
- 一开始创建的 sqe,在 push_entry 后提交到内核后为 submitted 状态
- 如果想要取消 IO,则会向内核发起取消请求,此时 Token 标记为 cacelling
即:在提交到内核 IO 事件处理完成前,Token 只有两个状态,即 submitted 和 cancelling
- 在
poll_completions
后,cqe 出现,利用 key 找到对应的 Token- Token 为 submitted,一个正常的 IO 事件完成,调用回调,标记为 processed
- Token 为 cacelling
- 如果返回值为 CANCEL_RETVAL,则表示已取消,标记为 cacelled
- 如果返回值不是,则表示取消失败,标记为 processed,仍旧回调
整个大致的基于回调和 handle 的机制大概就是这样。
2. Libos UringSet 单例
在 Occlum libos 中整个网络的 socket 层使用单例的 UringSet 进行 IO 操作,结构为:
主要对外暴露两个接口:
- get_uring:分为初始阶段和非初始阶段
- 初始阶段:依据配置创建足够数量的 IoUring 实例
- 非初始阶段:不创建,直接找工作负载最少的 IoUring 实例(两个方面,注册的 socketfd 以及 op num)
- disattach_uring
- 从 IoUring 中取消注册一个 socketfd
3. Event 模型
occlum 中事件机制共有三种:
- Waiter、Waker 和 Waiter Queue,和线程休眠唤醒相关
- Event、Observer 和 Notifier 用来处理和广播事件
- WaiterQueueObserver 实现了当事件发生唤醒对应线程的机制
这里主要介绍如何处理和广播事件,其机制大致如下图所示:
对于一个 Notifier,其内部动态维护者一个 subscriber 队列,可以通过 register/unregister 从 notifier 中添加和删除 subscriber,Notifiter 每次向队列中的每一个 subscriber 广播事件。而每一个 subscriber 中由一个 observer 和 filter 组成,filter 过滤当前 subscriber 不感兴趣的事件,在 Notifiter 广播事件时,不做处理,observer 则在得到事件时执行对应的 on_event 行为。
Pollee-Poller 模型:
Pollee 作为 Notifier,Poller 作为 subscriber,在 Poller 通过 connect 和 Pollee 关联后,每次有 add_event 事件添加时,都会向所有的 subscriber 广播,Poller 会执行对应的线程唤醒操作,其他的 Observer 则会执行对应的 on_event。
4. Socket file
在 Linux 中将 socket 看做一个文件,在 Occlum 中也是如此。对于所有使用 io_uring 实现的异步 socket,其对应的系统调用经过 Occlum 包装后返回的都是 SocketFile 类型,不同的 socket 类型在 SocketFile 中的 AnySocket 枚举中实现,其对外的接口由 SocketFile 暴露,总体架构如下:
当对 Occlum libos 发起一个 Socket 系统调用时,首先会判断 socket 的类型。代码示例如下(这里节选自 bind 系统调用:
let file_ref = current!().file(fd as FileDesc)?;
if let Ok(socket) = file_ref.as_host_socket() {
let raw_addr = addr.to_raw();
socket.bind(&raw_addr)?;
} else if let Ok(unix_socket) = file_ref.as_unix_socket() {
let unix_addr = addr.to_unix()?;
unix_socket.bind(unix_addr)?;
} else if let Ok(uring_socket) = file_ref.as_uring_socket() {
uring_socket.bind(&addr)?;
} else {
return_errno!(ENOTSOCK, "not a socket");
}
这里的第一行current!().file(fd as FileDesc)?
如果成功的话,会返回一个 FileRef 类型,在 socket_file 中,有如下实现:
pub trait UringSocketType {
fn as_uring_socket(&self) -> Result<&SocketFile>;
}
impl UringSocketType for FileRef {
fn as_uring_socket(&self) -> Result<&SocketFile> {
self.as_any()
.downcast_ref::<SocketFile>()
.ok_or_else(|| errno!(ENOTSOCK, "not a uring socket"))
}
}
因此对于一个 uring socket,可以通过普通的 fd 和 as_uring_socket 方法拿到 SocketFile,而 SocketFile 的实现很简单:
pub struct SocketFile {
socket: AnySocket,
}
enum AnySocket {
Ipv4Stream(Ipv4Stream),
Ipv6Stream(Ipv6Stream),
Ipv4Datagram(Ipv4Datagram),
Ipv6Datagram(Ipv6Datagram),
}
SocketFile 中拥有 syscall 对应的接口,对于 Socket 中比较通用的操作,比如 write、read,采用 apply_fn_on_any_socket! 宏实现,而其它不同 Socket 实现逻辑不同的接口则通过 AnySocket 调用具体枚举类实现,比如 bind、listen、accept 等。
5. Common
common 中集合了不同状态下 Stream Socket 中的一些通用的属性和方法,比如是否阻塞、是否已经关闭、绑定的 io_uring 实例等等,其定义如下:
// A 泛型标识地址类型,比如 IPv4、IPv6,R 泛型表示运行时,这里主要用于 io_uring
pub struct Common<A: Addr + 'static, R: Runtime> {
host_fd: FileDesc, // host_fd:主机上的文件描述符
type_: SocketType, // socket 类型
nonblocking: AtomicBool, // 是否是阻塞类型
is_closed: AtomicBool, // socket 是否关闭
pollee: Pollee, // 用于 Pollee-Poller 机制,实现异步 IO
inner: Mutex<Inner<A>>, // 保存了一个 Stream Socket 连接(bind)的 C/S 地址
timeout: Mutex<Timeout>, // 等待时延
errno: Mutex<Option<Errno>>, // 错误码
io_uring: Arc<IoUring>, // socket 绑定的 IoUring 实例
phantom_data: PhantomData<(A, R)>, // 标识 Common 结构体拥有和泛型一样的生命周期
}
common 中主要就是维护一些 socket 共有的属性和方法,同时也封装了除去 IO 相关的系统调用,例如 do_bind、do_close 等。
6. Datagram Socket
在 Occlum 中 Datagram 类型的 socket 实现如下:
// 泛型类型和 Common 类似
pub struct DatagramSocket<A: Addr + 'static, R: Runtime> {
common: Arc<Common<A, R>>, // 持有一个 common 结构
state: RwLock<State>, // 当前 DG socket 的状态,包含绑定状态和是否连接
sender: Arc<Sender<A, R>>, // socket 的发送端,封装发送行为,对应 output 事件
receiver: Arc<Receiver<A, R>>, // socket 的接受端,封装接收行为,对应 input 事件
}
struct State {
bind_state: BindState, // 这里的绑定状态有三种:没有绑定、显式和隐式
is_connected: bool, // 是否建立连接
}
6.1 State 的转化
没有绑定的状态:
- new 方法:在初次创建 socket 时,BoundState 为 unbound,is_connected 为 false
- new_connection方法:BoundState 为 unbound,is_connected 为 true
new_connection 方法主要是为了创建一个具有连接的 socket 对,此时这个连接就应该是固定的,不需要 bind 一个地址,有任何其他的 socket 向这个 socket 对的任何一个发送数据。
当创建一个 Datagram socket 以后,我们可以通过 bind,将 socket 和一个 Addr 进行绑定,之后其他的 socket 就可以向这个地址发送数据,绑定 addr 的 socket 负责接收数据。在这里这种绑定分为显式和隐式两种。一个 Datagram socket 只能绑定一次,在没有显式绑定的情况下,一些方法的调用会触发隐式绑定,在这之后,再进行显式绑定返回错误。
显式绑定:
通过 bind 函数,显式设置当前 socket 绑定的地址,并修改 state:
pub fn bind(&self, addr: &A) -> Result<()> {
let mut state = self.state.write().unwrap();
if state.is_bound() {
return_errno!(EINVAL, "The socket is already bound to an address");
}
do_bind(self.host_fd(), addr)?;
self.common.set_addr(addr);
state.mark_explicit_bind();
// Start async recv after explicit binding or implicit binding
self.receiver.initiate_async_recv();
Ok(())
}
所谓绑定,就是设置 common inner 中的 addr 的值。
隐式绑定:
当调用 connect、sendto、sendmsg 会触发隐式绑定。直接将 BoundState 设置成隐式绑定,具体的端口分配应该是有 host os 完成,在这之后 socket 就不能再使用 bind 绑定显式的地址。
6.2 Sender && Receiver
数据接收:
整个数据的接收流程如下:
- 在显式绑定或者隐式绑定之后,Datagram socket 会对内核发送一个 recv 请求,也就是说在没有进行 recvmsg 调用之前,IoUring 已经开始接收网卡数据,并且保留在缓冲区中。
- 之后当应用真的调用 recvmsg 之后,会通过 try_recvmsg 方法尝试从 recv_buf 中拷贝数据
- 如果没有拷贝到数据,则判断是否出错或特殊处理,如果都不是则继续调用 do_recv
对于整个请求的构造,其结构如图:
在 Receiver 的 Inner 中包含了数据缓冲区和控制信息缓冲区,构造一条 request msg 到内核中的方式如下:
- 初始化 recv_buf,msg_control
- 构造一个 RecvReq,包含 msghdr、iovec、addr,其中 iovec 指向 recv_buf,缓存接收的数据,addr 保存原地址信息,msghdr 用于提交到 host os,传递 recv 的数据
- 在 iouring recv 事件完成后,msghdr 中 msg_iovec(指向 recv_buf)保存了数据,msg_name(指向 addr)保存了数据的源地址、msg_control(指向 inner 的 msg_control)保存控制信息,后续可以直接通过 Inner 获取到对应的数据,完成 recv。
数据发送:
整个数据结构的组织如下:
对于应用来说,当 sendmsg 将数据写到 DataMsg 中的 send_buf 之后,会有 io_uring 将 msg 给到 host os 的内核进行实际的 send 系统调用。所有的发送数据被组织到一个 MsgQueue 中,队列先进先出,io_uring 中每完成一次 send 操作会 pop 出当前的队头,然后继续执行队列中后续数据的发送的操作。
7. Stream Socket
StreamSocket 的实现是面向连接的,因此,这里实现了一个状态机,按照连接中的 socket 的不同状态将 socket 分为:
pub struct StreamSocket<A: Addr + 'static, R: Runtime> {
state: RwLock<State<A, R>>,
common: Arc<Common<A, R>>,
}
enum State<A: Addr + 'static, R: Runtime> {
// Start state 初始状态
Init(Arc<InitStream<A, R>>),
// Intermediate state 瞬时状态
Connect(Arc<ConnectingStream<A, R>>),
// Final state 1 终态 1
Connected(Arc<ConnectedStream<A, R>>),
// Final state 2 终态 2
Listen(Arc<ListenerStream<A, R>>),
}
下面先介绍各个状态的实现和方法:
Init Stream 状态:
pub struct InitStream<A: Addr + 'static, R: Runtime> {
common: Arc<Common<A, R>>,
inner: Mutex<Inner>,
}
struct Inner {
has_bound: bool,
}
在调用了构造方法之后,一个 Stream Socket 的初始状态为 InitStream,InitStream 只有是否 bind 的标识。
Connecting Stream 状态:
pub struct ConnectingStream<A: Addr + 'static, R: Runtime> {
common: Arc<Common<A, R>>,
peer_addr: A, // 连接对侧地址
req: Mutex<ConnectReq<A>>, // 连接请求
connected: AtomicBool, // Mainly use for nonblocking socket to update status asynchronously
}
struct ConnectReq<A: Addr> {
io_handle: Option<IoHandle>, // 请求的 io_handle
c_addr: UntrustedBox<libc::sockaddr_storage>, // socket 地址
c_addr_len: usize, // 地址长度
errno: Option<Errno>,
phantom_data: PhantomData<A>,
}
这是一个瞬时状态,会在建立连接的请求后发生改变,所以整个状态过程中就是发送建立连接的请求。在传入 peer_addr 创建 Connecting Stream 之后,可以调用 connect 方法发送连接请求,其具体实现在 initiate_async_connect 中:
- 根据传入的 Addr 翻译为 C 的 addr,然后向 io_uring 注册一个 connect IO 请求。
- 请求的 callback 为:
- 正常,则将 connected 参数设置为 true 标识连接建立成功
- 有错误码,根据具体的错误码,向 common 中的 pollee 添加事件,比如重置连接
Connected Stream 状态:
pub struct ConnectedStream<A: Addr + 'static, R: Runtime> {
common: Arc<Common<A, R>>,
sender: Sender,
receiver: Receiver,
}
在建立连接之后,进入 ConnectStream 状态,通过 sender 和 receiver 进行数据的发送和接收,这个在之后详细解释。
Listener Stream 状态:
pub struct ListenerStream<A: Addr + 'static, R: Runtime> {
common: Arc<Common<A, R>>,
inner: Mutex<Inner<A>>,
}
struct Inner<A: Addr> {
backlog: Backlog<A>,
fatal: Option<Errno>,
}
struct Backlog<A: Addr> {
// The entries in the backlog.
entries: Box<[Entry]>,
// Arguments of the io_uring requests submitted for the entries in the backlog.
reqs: UntrustedBox<[AcceptReq]>,
// The indexes of completed entries.
completed: VecDeque<usize>,
// The number of free entries.
num_free: usize,
phantom_data: PhantomData<A>,
}
进入 Listener Stream 状态的 socket 等待即将到来的 connect,主要维护了一组 backlog,即还未 connect 成功的 socket。
7.1 状态转化
状态转化如下图所示:
重点解析 connect 方法,首先将 connect 方法分成两部分来看,第一部分,转变瞬时状态 connecting:
let (init_stream, connecting_stream) = {
let mut state = self.state.write().unwrap();
match &*state {
State::Init(init_stream) => {
let connecting_stream = {
let common = init_stream.common().clone();
ConnectingStream::new(peer_addr, common)?
};
let init_stream = init_stream.clone();
*state = State::Connect(connecting_stream.clone());
(init_stream, connecting_stream)
}
State::Connect(connecting_stream) => {
if let Some(connected_stream) =
Self::try_switch_to_connected_state(connecting_stream)
{
*state = State::Connected(connected_stream);
return_errno!(EISCONN, "the socket is already connected");
} else {
// Not connected, keep the connecting state and try connect
let init_stream =
InitStream::new_with_common(connecting_stream.common().clone())?;
(init_stream, connecting_stream.clone())
}
}
State::Connected(_) => {
return_errno!(EISCONN, "the socket is already connected");
}
State::Listen(_) => {
return_errno!(EINVAL, "the socket is listening");
}
}
};
由于 Stream Socket 的 connect 方法是一个耗时任务,需要连接到远程的 socket 地址,所以创建了 connecting 表示一个连接的请求发出后 socket 所处的状态。对于一个 Stream Socket 对于 connect 的调用,将情况分为三种:
- Init 状态下:创建出新的瞬时状态 connecting
- Connecting 状态:已经有 connect 请求发出,这里先检查是否已经建立连接,若有则直接返回,没有则创建出一个 Init 状态,为防止之后连接失败状态回退
- 其他:返回不同的错误
在创建完瞬时状态后,进入第二部分,发起 connect 的异步请求:
let res = connecting_stream.connect();
// If success, then the state is switched to connected; otherwise, for blocking socket
// the state is restored to the init state, and for non-blocking socket, the state
// keeps in connecting state.
match &res {
Ok(()) => {
let connected_stream = {
let common = init_stream.common().clone();
common.set_peer_addr(peer_addr);
ConnectedStream::new(common)
};
let mut state = self.state.write().unwrap();
*state = State::Connected(connected_stream);
}
Err(_) => {
if !connecting_stream.common().nonblocking() {
let mut state = self.state.write().unwrap();
*state = State::Init(init_stream);
}
}
}
如果请求成功,则进入 connected 状态可以进行收发数据。对于阻塞式的 socket,不成功会进入 Init 状态,非阻塞则进入 connecting。
7.2 ListenerStream
从构造函数开始:
pub fn new(backlog: u32, common: Arc<Common<A, R>>) -> Result<Arc<Self>> {
// Here we use different variables for the backlog. For the libos, as we will issue async accept request
// ahead of time, and cacel when the socket closes, we set the libos backlog to a reasonable value which
// is no greater than the max value we set to save resources and make it more efficient. For the host,
// we just use the backlog value for maximum connection.
let libos_backlog = std::cmp::min(backlog, PENDING_ASYNC_ACCEPT_NUM_MAX as u32);
let host_backlog = backlog;
let inner = Inner::new(libos_backlog)?;
Self::do_listen(common.host_fd(), host_backlog)?;
common.pollee().reset_events();
let new_self = Arc::new(Self {
common,
inner: Mutex::new(inner),
});
// Start async accept requests right as early as possible to improve performance
{
let inner = new_self.inner.lock();
new_self.initiate_async_accepts(inner);
}
Ok(new_self)
}
这里的 backlog 有两个,一个是 libos_backlog,用于系统调用,并且在 Listener Stream 的 inner 中作为监听队列的长度,模拟内核,存储还未被 accept 的连接,一个是 host_backlog,表示在 listen 时传入的 backlog 参数。在创建本状态时,就会调用 listen 接收连接,并且调用 initiate_async_accepts 将所有处在空闲状态的 Entry 都进行 accept 请求,以便之后的 accept 调用,和之前的 Datagram 的 recv 类似,都是预先操作。
在 new 之后,所有的 Backlog entry 都会发送 accept 请求到 IoUring,然后转化为 pending 状态,在连接进入之后,其回调如下:
// 部分
let host_fd = retval as FileDesc;
inner.backlog.entries[entry_idx] = Entry::Completed { host_fd };
inner.backlog.completed.push_back(entry_idx);
stream.common.pollee().add_events(IoEvents::IN);
stream.initiate_async_accepts(inner);
pending 转化为 completed,并且继续让所有的 free entry 向 IoUring 提交 accept 请求。对于外部调用的 accept 方法,并不发起请求,而是直接向已完成的 compelted queue 中拿到 fd 和 addr,如果没有则阻塞等待,知道有连接到达,在 callback 中添加唤醒事件。
7.3 ConnectedStream
在这个状态下进行数据的收发,主要实现和 DataGram 类似,不同的是在组装 request 时,不是简单的将缓冲区的地址写到 msg,而是需要在缓冲区记录位置偏移量,且在必要的时候更新缓冲区。