commit 0de5919a1639ea0e49e0545a33d906de504c1b52
parent 699777be9e7576c754ab9e5dddb6a1a89ebacf92
Author: Daniel GarcĂa <dani-garcia@users.noreply.github.com>
Date: Sat, 21 May 2022 19:12:38 +0200
Fix incorrect pings sent, and respond to pings from the client
Diffstat:
1 file changed, 21 insertions(+), 3 deletions(-)
diff --git a/src/api/notifications.rs b/src/api/notifications.rs
@@ -293,6 +293,7 @@ pub fn start_notification_server() -> WebSocketUsers {
let users2 = users.clone();
tokio::spawn(async move {
let addr = (CONFIG.websocket_address(), CONFIG.websocket_port());
+ info!("Starting WebSockets server on {}:{}", addr.0, addr.1);
let listener = TcpListener::bind(addr).await.expect("Can't listen on websocket port");
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
@@ -317,9 +318,11 @@ pub fn start_notification_server() -> WebSocketUsers {
users
}
-async fn handle_connection(stream: TcpStream, users: WebSocketUsers, _addr: SocketAddr) -> Result<(), Error> {
+async fn handle_connection(stream: TcpStream, users: WebSocketUsers, addr: SocketAddr) -> Result<(), Error> {
let mut user_uuid: Option<String> = None;
+ info!("Accepting WS connection from {addr}");
+
// Accept connection, do initial handshake, validate auth token and get the user ID
use handshake::server::{Request, Response};
let mut stream = accept_hdr_async(stream, |req: &Request, res: Response| {
@@ -346,9 +349,22 @@ async fn handle_connection(stream: TcpStream, users: WebSocketUsers, _addr: Sock
res = stream.next() => {
match res {
Some(Ok(message)) => {
+ //info!("RECEIVED {message:?}");
+
+ // Respond to any pings
+ if let Message::Ping(ping) = message {
+ if stream.send(Message::Pong(ping)).await.is_err() {
+ break;
+ }
+ continue;
+ } else if let Message::Pong(_) = message {
+ /* Ignored */
+ continue;
+ }
+
// We should receive an initial message with the protocol and version, and we will reply to it
if let Message::Text(ref message) = message {
- let msg = message.strip_suffix('\u{1e}').unwrap_or(message);
+ let msg = message.strip_suffix(RECORD_SEPARATOR as char).unwrap_or(message);
if serde_json::from_str(msg).ok() == Some(INITIAL_MESSAGE) {
stream.send(Message::binary(INITIAL_RESPONSE)).await?;
@@ -377,13 +393,15 @@ async fn handle_connection(stream: TcpStream, users: WebSocketUsers, _addr: Sock
}
_= interval.tick() => {
- if stream.send(Message::Binary(create_ping())).await.is_err() {
+ if stream.send(Message::Ping(create_ping())).await.is_err() {
break;
}
}
}
}
+ info!("Closing WS connection from {addr}");
+
// Delete from map
users.map.entry(user_uuid).or_default().retain(|(uuid, _)| uuid != &entry_uuid);
Ok(())