WebSockets 是一种先进的技术。它可以在用户的浏览器和服务器之间打开交互式通信会话。服务端可以主动给客户端推送消息。相比于HTTP,使用Websocket,你可以从服务器主动向客户端发送消息,而无需客户端通过轮询服务器的方式以获得响应。它基于TCP,可以从HTTP升级为Websocket。常见的应用场景就是媒体聊天,弹幕,协同编辑,基于位置的应用,体育实况更新、股票基金报价实时更新,天气预报等。有人会说 TCP 不是有 KeepAlive 机制么,通过这个机制来实现不就可以了吗?但是事实上,TCP KeepAlive 的机制其实并不适用于此。Keep Alive 机制开启后,TCP 层将在定时时间到后发送相应的 KeepAlive 探针以确定连接可用性。一般时间为 7200 s,失败后重试 10 次,每次超时时间 75 s。显然默认值无法满足我们的需求,而修改过设置后就可以满足了吗?答案仍旧是否定的。我们知道 TCP 是一个基于连接的协议,其连接状态是由一个状态机进行维护,连接完毕后,双方都会处于 established 状态,这之后的状态并不会主动进行变化。这意味着如果上层不进行任何调用,一直使 TCP 连接空闲,那么这个连接虽然没有任何数据,但仍是保持连接状态,一天、一星期、甚至一个月,即使在这期间中间路由崩溃重启无数次。举个现实中经常遇到的例子:当我们 ssh 到自己的 VPS 上,然后不小心踢掉网线,此时的网络变化并不会被 TCP 检测出,当我们重新插回网线,仍旧可以正常使用 ssh,同时此时并没有发生任何 TCP 的重连。实际的网络环境要非常复杂,办公的防火墙默认配置会释放设置的时间内没有数据的TCP连接。DHCP租约到期。如果使用移动设备,则网络更加复杂。移动设备为了节省电量,切到后台的应用会关闭网络连接。移动设备使用的运营商网络,由于无线网络的频带资源相比计算机网络的光纤传输带宽而言稀缺得多,无线信号所受到空中干扰大,信号随距离的衰减快,要达到同样的带宽及同样的覆盖范围,配置密集基站的成本远比建设光纤传输网要高得多,正是因为如此,移动通信网络中才需要频繁地通过释放和重新申请无线资源来对宝贵的无线资源进行复用。Websocket有ping、pong控制帧,我们只需要在约定的时间内发送ping帧,对端回复pong帧。
我们可以在ping和pong帧附加数据。在websocket层发送心跳,可以检测网络链路是否通畅,应用是否阻塞假死等。
一是固定时间发送心跳,比如客户端每隔30秒给服务端发送一个ping控制帧,或者服务端给客户端发送一个ping帧,如果未在规定的时间内收到pong帧,则认定对端已经关闭,将websocket重新连接或者关闭。这种固定时间发送心跳,对于服务端来说,容易实现,设置定时器,或者设置读超时,在规定的时间内,未收到ping控制帧,则判定客户端连接已关闭,则将服务器端对于客户端的websocket连接关闭。如果再优化一下,可以设置未收到ping心跳次数。例如当ping心跳3次还未收到时,则断开客户端的websocket连接。这样可以解决因为ping控制帧丢包的问题。还可以再优化一下,收到客户端的发送的数据帧时,也将服务端读超时重新重置。二是非固定时间发送心跳,比如初始设置默认的双方约定的心跳时间。当手机应用在前台时,延长心跳时间,手机应用切到后台时,缩短心跳时间。这里主要考虑到手机应用切到后台时,手机操作系统会关闭网络连接,服务器端能尽快的检查到客户端的状态,并释放服务器的websocket资源。而如果用户的手机应用在前台,手机操作系统不会对网络连接进行限制,从社会工程学来说,用户也会使用应用进行浏览消息,或发送消息。下次心跳的时间,可以在本次的ping控制帧获取。这样非固定时间发送心跳,服务端比较难实现。在Rust中,常用的Websocket库有tungstenite-rs,tokio-tungstenite,其中,tokio-tungstenite基于tugstenite-rs开发,是异步调用。我们这里以tokio-tugstenite为例,在服务端,设置固定时间的心跳。 let ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let mut interval = tokio::time::interval(Duration::from_millis(1000));
loop {
tokio::select! {
msg = ws_receiver.next() => {
match msg {
Some(msg) => {
let msg = msg?;
if msg.is_text() ||msg.is_binary() {
ws_sender.send(msg).await?;
} else if msg.is_close() {
break;
}
}
None => break,
}
}
_ = interval.tick() => {
let v:Vec<u8> = Vec::new();
ws_sender.send(Message::Ping(v)).await?;
}
}
}
其它库类似的思路。下节介绍websocket设置超时。