async_wasi/snapshots/common/net/
mod.rs#[cfg(all(unix, feature = "async_tokio"))]
pub mod async_tokio;
pub use super::vfs::*;
use super::{
error::Errno,
types::{self as wasi_types, __wasi_subscription_t},
};
use std::{
future::Future,
io::{self, Read, Write},
net,
time::{Duration, SystemTime},
};
use wasi_types::{
__wasi_clockid_t::{
__WASI_CLOCKID_MONOTONIC as CLOCKID_MONOTONIC, __WASI_CLOCKID_REALTIME as CLOCKID_REALTIME,
},
__wasi_eventtype_t::{
__WASI_EVENTTYPE_CLOCK as CLOCK, __WASI_EVENTTYPE_FD_READ as RD,
__WASI_EVENTTYPE_FD_WRITE as WR,
},
};
#[derive(Debug, Clone, Copy, Default)]
pub enum AddressFamily {
#[default]
Inet4,
Inet6,
}
#[derive(Debug, Clone, Copy, Default)]
pub enum SocketType {
Datagram,
#[default]
Stream,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectState {
Empty,
Listening,
Connected,
Connecting,
}
impl Default for ConnectState {
fn default() -> Self {
Self::Empty
}
}
#[derive(Debug, Clone, Default)]
pub struct WasiSocketState {
pub sock_type: (AddressFamily, SocketType),
pub local_addr: Option<net::SocketAddr>,
pub peer_addr: Option<net::SocketAddr>,
pub bind_device: Vec<u8>,
pub backlog: u32,
pub shutdown: Option<net::Shutdown>,
pub nonblocking: bool,
pub so_reuseaddr: bool,
pub so_conn_state: ConnectState,
pub so_recv_buf_size: usize,
pub so_send_buf_size: usize,
pub so_recv_timeout: Option<Duration>,
pub so_send_timeout: Option<Duration>,
pub fs_rights: WASIRights,
}
#[derive(Debug, Clone, Copy)]
pub enum SubscriptionFdType {
Read(wasi_types::__wasi_userdata_t),
Write(wasi_types::__wasi_userdata_t),
Both {
read: wasi_types::__wasi_userdata_t,
write: wasi_types::__wasi_userdata_t,
},
}
#[derive(Debug, Clone, Copy)]
pub struct SubscriptionFd {
pub fd: wasi_types::__wasi_fd_t,
pub type_: SubscriptionFdType,
}
impl SubscriptionFd {
pub fn set_write(&mut self, userdata: wasi_types::__wasi_userdata_t) {
let read_userdata = match &mut self.type_ {
SubscriptionFdType::Read(v) => *v,
SubscriptionFdType::Write(v) => {
*v = userdata;
return;
}
SubscriptionFdType::Both { read, write } => {
*write = userdata;
return;
}
};
self.type_ = SubscriptionFdType::Both {
read: read_userdata,
write: userdata,
};
}
pub fn set_read(&mut self, userdata: wasi_types::__wasi_userdata_t) {
let write_userdata = match &mut self.type_ {
SubscriptionFdType::Write(v) => *v,
SubscriptionFdType::Read(v) => {
*v = userdata;
return;
}
SubscriptionFdType::Both { read, write } => {
*read = userdata;
return;
}
};
self.type_ = SubscriptionFdType::Both {
read: userdata,
write: write_userdata,
};
}
}
#[derive(Debug, Clone, Copy)]
pub struct SubscriptionClock {
pub timeout: Option<SystemTime>,
pub userdata: wasi_types::__wasi_userdata_t,
pub err: Option<Errno>,
}
#[derive(Debug, Clone, Copy)]
pub enum Subscription {
FD(SubscriptionFd),
RealClock(SubscriptionClock),
}
impl Subscription {
pub fn from(s: &__wasi_subscription_t) -> Result<Subscription, Errno> {
let userdata = s.userdata;
match s.u.tag {
CLOCK => {
let clock = unsafe { s.u.u.clock };
match clock.id {
CLOCKID_REALTIME | CLOCKID_MONOTONIC => {
if clock.flags == 1 {
if let Some(ddl) = std::time::UNIX_EPOCH
.checked_add(Duration::from_nanos(clock.timeout + clock.precision))
{
Ok(Subscription::RealClock(SubscriptionClock {
timeout: Some(ddl),
userdata,
err: None,
}))
} else {
Ok(Subscription::RealClock(SubscriptionClock {
timeout: None,
userdata,
err: Some(Errno::__WASI_ERRNO_INVAL),
}))
}
} else if clock.timeout == 0 {
Ok(Subscription::RealClock(SubscriptionClock {
timeout: None,
userdata,
err: None,
}))
} else {
let duration = Duration::from_nanos(clock.timeout + clock.precision);
let timeout = std::time::SystemTime::now().checked_add(duration);
Ok(Subscription::RealClock(SubscriptionClock {
timeout,
userdata,
err: None,
}))
}
}
_ => Ok(Subscription::RealClock(SubscriptionClock {
timeout: None,
userdata,
err: Some(Errno::__WASI_ERRNO_NODEV),
})),
}
}
RD => {
let fd_read = unsafe { s.u.u.fd_read };
Ok(Subscription::FD(SubscriptionFd {
fd: fd_read.file_descriptor,
type_: SubscriptionFdType::Read(userdata),
}))
}
WR => {
let fd_read = unsafe { s.u.u.fd_read };
Ok(Subscription::FD(SubscriptionFd {
fd: fd_read.file_descriptor,
type_: SubscriptionFdType::Write(userdata),
}))
}
_ => Err(Errno::__WASI_ERRNO_INVAL),
}
}
}
#[derive(Debug)]
pub enum PrePoll {
OnlyFd(Vec<SubscriptionFd>),
OnlyClock(SubscriptionClock),
ClockAndFd(SubscriptionClock, Vec<SubscriptionFd>),
}
impl PrePoll {
pub fn from_wasi_subscription(
subs: &[wasi_types::__wasi_subscription_t],
) -> Result<Self, Errno> {
use std::collections::HashMap;
let mut fds = HashMap::with_capacity(subs.len());
let mut timeout: Option<SubscriptionClock> = None;
for s in subs {
let s = Subscription::from(s)?;
match s {
Subscription::FD(fd) => {
let type_ = fd.type_;
fds.entry(fd.fd)
.and_modify(|e: &mut SubscriptionFd| match type_ {
SubscriptionFdType::Read(data) => e.set_read(data),
SubscriptionFdType::Write(data) => e.set_write(data),
SubscriptionFdType::Both { read, write } => {
e.type_ = SubscriptionFdType::Both { read, write };
}
})
.or_insert(fd);
}
Subscription::RealClock(clock) => {
if clock.err.is_some() {
return Ok(PrePoll::OnlyClock(clock));
}
if clock.timeout.is_none() {
return Ok(PrePoll::OnlyClock(clock));
}
if let Some(old_clock) = &mut timeout {
let new_timeout = clock.timeout.unwrap();
let old_timeout = old_clock.timeout.unwrap();
if new_timeout < old_timeout {
*old_clock = clock
}
} else {
timeout = Some(clock)
}
}
}
}
let fd_vec: Vec<SubscriptionFd> = fds.into_values().collect();
if let Some(clock) = timeout {
if fd_vec.is_empty() {
Ok(PrePoll::OnlyClock(clock))
} else {
Ok(PrePoll::ClockAndFd(clock, fd_vec))
}
} else {
Ok(PrePoll::OnlyFd(fd_vec))
}
}
}