我们使用rust实现了一个聊天室的服务端,可以搜索使用在线的websocket客户端就测试,你也可以自行使用熟悉的语言开发客户端。
1,我们使用map存储websocket的地址和消息通道的发射端。为什么使用消息通道,而不直接使用websocket,很多原因,主要的原因为方便在多个线程之间不用锁进行通信,增加缓存。
2,我们将收到的消息转发给聊天室中的其它客户端,map存储了所有客户端的socket地址和消息通道的发射端。我们迭代map,将消息发送给其他客户端的消息通道。
type Tx = mpsc::UnboundedSender<Message>;
type Rx = mpsc::UnboundedReceiver<Message>;
struct Shared {
peers: HashMap<SocketAddr, Tx>,
}
struct Peer {
stream: WebSocketStream<TcpStream>,
rx: Rx,
}
impl Shared {
/// Create a new, empty, instance of `Shared`.
fn new() -> Self {
Shared {
peers: HashMap::new(),
}
}
// 广播
async fn broadcast(&mut self, sender: SocketAddr, msg: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
let _ = peer.1.send(Message::Text(msg.to_string()));
}
}
}
}
impl Peer {
/// Create a new instance of `Peer`.
async fn new(state: Arc<Mutex<Shared>>, stream: WebSocketStream<TcpStream>) -> Result<Peer> {
// Get the client socket address
let addr = stream.get_ref().peer_addr()?;
// Create a channel for this peer
let (tx, rx) = mpsc::unbounded_channel();
// Add an entry for this `Peer` in the shared state map.
state.lock().await.peers.insert(addr, tx);
Ok(Peer { stream, rx })
}
}
3,websocket使用rust库tokio_tungstenite,通过它可以建立websocket连接,在此连接上,服务器和客户端可以互相收发消息。
4,我们有一个逻辑循环处理消息通道,当消息通道中有消息时,直接将消息从消息通道取出,然后通过自身的webscoket发送给客户端。
5,我们添加了心跳,服务器定期向客户端进行发送心跳,客户端收到心跳后进行回应。我们设置超时定时器,超时之后没有收到客户端的响应,我们就就认为客户端已经不在线。然后删除socket地址,和消息通道。
async fn handle_connection(
state: Arc<Mutex<Shared>>,
raw_stream: TcpStream,
addr: SocketAddr,
) -> Result<()> {
println!("Incoming TCP connection from: {}", addr);
let mut ws_stream = accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {}", addr);
ws_stream
.send(Message::Text("请输入您的昵称:".to_string()))
.await?;
let mut username = String::new();
loop {
let line = ws_stream.next().await;
match line {
Some(Ok(line)) => {
if line.is_binary() || line.is_text() {
let msg = line.to_text().unwrap();
if msg.len() > 0 {
username.push_str(line.to_text().unwrap());
break;
}
}
ws_stream
.send(Message::Text("请输入您的昵称:".to_string()))
.await?;
}
// We didn't get a line so we return early here.
_ => {
println!("Failed to get username from {}. Client disconnected.", addr);
return Ok(());
}
}
}
// 获取到昵称,发送消息给ta
let hello_msg = format!("您设置的昵称是{},欢迎进入聊天室", username);
ws_stream.send(Message::Text(hello_msg)).await?;
let mut peer = Peer::new(state.clone(), ws_stream).await?;
{
let mut state = state.lock().await;
let n = state.peers.len();
let msg = format!("欢迎[{}]进入聊天室,当前人数:{}", username, n);
println!("{}", msg);
state.broadcast(addr, &msg).await;
}
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
tokio::select! {
// 通道中有值时
Some(msg) = peer.rx.recv() => {
peer.stream.send(msg).await?;
},
// websocket接收到消息时
msg = timeout(Duration::from_secs(33),peer.stream.next()) => {
match msg {
Ok(Some(Ok(msg)))=>{
if msg.is_text() || msg.is_binary(){
let content = msg.to_text().unwrap();
let content = content.trim();
if content.len() > 0 {
let mut state = state.lock().await;
let msg = format!("{}: {}", username, content);
state.broadcast(addr, &msg).await;
}
}else if msg.is_close() {
println!("客户端断开连接");
break;
}
},
Ok(Some(Err(e)))=>{
println!("客户端发生错误,{}",e);
},
Ok(None)=>{
println!("客户端已经掉线");
break;
},
Err(e)=>{
println!("获取消息超时,{}",e);
break;
}
}
}
// 时钟到达时
_ = interval.tick()=> {
let payload = Vec::new();
let ping = Message::Ping(payload);
peer.stream.send(ping).await?;
}
}
}
{
let mut state = state.lock().await;
state.peers.remove(&addr);
let n = state.peers.len();
let msg = format!("[{}]离开了聊天室,当前人数:{}", username, n);
println!("{}", msg);
state.broadcast(addr, &msg).await;
}
Ok(())
}
最后,我们将它添加main函数中。
use std::{
collections::HashMap, env, io::Error as IoError, net::SocketAddr, sync::Arc, time::Duration,
};
use tokio::sync::{mpsc, Mutex};
use futures_util::{SinkExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{
accept_async,
tungstenite::{Message, Result},
};
#[tokio::main]
async fn main() -> Result<(), IoError> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "0.0.0.0:9528".to_string());
let state = Arc::new(Mutex::new(Shared::new()));
// Create the event loop and TCP listener we'll accept connections on.
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
println!("Listening on: {}", addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
let state = Arc::clone(&state);
tokio::spawn(handle_connection(state, stream, addr));
}
Ok(())
}