From ec0b0683884830d11c5d2cddc324da7c502a25c9 Mon Sep 17 00:00:00 2001 From: Tony Klink Date: Mon, 15 Jan 2024 09:33:11 -0600 Subject: [PATCH] Return ["CLOSED"] on failing subscriptions --- src/bussy/mod.rs | 2 ++ src/relay/ws.rs | 31 ++++++++++++++++++++++++++++++- src/utils/structs.rs | 8 ++++++-- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/bussy/mod.rs b/src/bussy/mod.rs index d0c9c55..2470ce9 100644 --- a/src/bussy/mod.rs +++ b/src/bussy/mod.rs @@ -41,6 +41,8 @@ pub enum Command { PipelineResRelayMessageOk(/* client_id */ uuid::Uuid, nostr::RelayMessage), PipelineResStreamOutEvent(Box), PipelineResOk, + // Subscription Errors + ClientSubscriptionError(/* error message */ String), // Other Str(String), ServiceError(Error), diff --git a/src/relay/ws.rs b/src/relay/ws.rs index c642847..bf0d1dc 100644 --- a/src/relay/ws.rs +++ b/src/relay/ws.rs @@ -31,6 +31,12 @@ pub async fn client_connection(ws: WebSocket, context: Context, client_addr: Opt tokio::select! { Ok(message) = subscriber.recv() => { match message.content { + crate::bussy::Command::ClientSubscriptionError(error_message) => { + if let Some(sender) = &client.client_connection { + log::info!("[Relay] sending [\"CLOSED\"] event to client: {}", client.client_id); + if !sender.is_closed() {sender.send(Ok(Message::text(error_message))).unwrap()}; + } + }, crate::bussy::Command::PipelineResRelayMessageOk(client_id, relay_message) => { if client.client_id == client_id { if let Some(sender) = &client.client_connection { @@ -226,7 +232,30 @@ async fn handle_req( let subscription = Subscription::new(subscription_id.clone(), filters); let needs_historical_events = subscription.needs_historical_events(); - client.subscribe(subscription.clone()).unwrap(); + if let Err(subscription_error) = client.subscribe(subscription.clone()) { + log::error!( + "Error on handle_req. client IP: {:?}, message: {}", + client.ip(), + &subscription_error.message + ); + let message = format!( + "[\"CLOSED\", \"{}\", \"{}\"]", + subscription_id, subscription_error.message + ); + + context + .pubsub + .publish( + channels::MSG_RELAY, + crate::bussy::Message { + source: channels::MSG_RELAY, + content: crate::bussy::Command::ClientSubscriptionError(message), + }, + ) + .await; + + return; + }; if needs_historical_events { context diff --git a/src/utils/structs.rs b/src/utils/structs.rs index 10b711e..d30dade 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -75,7 +75,9 @@ impl Client { sub_id_len ); - return Err(Error::bad_request("sub request is too long")); + return Err(Error::bad_request( + "error: subscription request is too long", + )); } if self.subscriptions.contains_key(&k) { @@ -92,7 +94,9 @@ impl Client { } if self.subscriptions.len() >= self.max_subs { - return Err(Error::bad_request("max subs exceeded")); + return Err(Error::bad_request( + "error: max subscriptions limit is exceeded", + )); } // Insert subscription