use crate::{ bussy::channels, utils::structs::{Client, Context, Subscription}, }; use futures_util::StreamExt; use nostr::{ClientMessage, Event, Filter, JsonUtil, SubscriptionId}; use serde_json::from_str; use std::net::SocketAddr; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use warp::ws::{Message, WebSocket}; use futures_util::SinkExt; pub async fn client_connection( ws: WebSocket, context: Context, real_client_ip: Option, ) { let (mut ws_sender, mut ws_receiver) = ws.split(); let (client_sender, client_receiver) = mpsc::unbounded_channel(); let mut client_receiver = UnboundedReceiverStream::new(client_receiver); // Create and Add to the Context new Client and set its sender let ip = if let Some(ip) = real_client_ip { ip.to_string() } else { "".to_string() }; let mut client = Client::new(ip); client.client_connection = Some(client_sender); let mut subscriber = context.pubsub.subscribe(channels::MSG_RELAY).await; loop { tokio::select! { Ok(message) = subscriber.recv() => { match message.content { crate::bussy::Command::ClientSubscriptionError(error_message) => { if let Some(sender) = &client.client_connection { log::info!("[Relay] sending [\"CLOSED\"] event to client: {}", client.client_id); if !sender.is_closed() {sender.send(Ok(Message::text(error_message))).unwrap()}; } }, crate::bussy::Command::PipelineResRelayMessageOk(client_id, relay_message) => { if client.client_id == client_id { if let Some(sender) = &client.client_connection { if !sender.is_closed() { log::info!("[Relay] sending back the status of the processed event: {}", relay_message.as_json()); sender.send(Ok(Message::text(relay_message.as_json()))).unwrap(); } } } }, crate::bussy::Command::PipelineResStreamOutEvent(event) => { // if client.client_id == client_id { if let Some(sender) = &client.client_connection { if let Some(relay_message) = get_relay_message(&client, event) { log::info!("[Relay] sending processed event to subscribed client: {}", relay_message.as_json()); if !sender.is_closed() {sender.send(Ok(Message::text(relay_message.as_json()))).unwrap()}; } } // } } crate::bussy::Command::DbResRelayMessages(client_id, relay_messages) => { if client.client_id == client_id { if let Some(sender) = &client.client_connection { if !sender.is_closed() { for message in relay_messages { sender.send(Ok(Message::text(message.as_json()))).unwrap(); } } } } } crate::bussy::Command::DbResEventCounts(client_id, relay_message) => { if client.client_id == client_id { if let Some(sender) = &client.client_connection { if !sender.is_closed() { sender.send(Ok(Message::text(relay_message.as_json()))).unwrap(); } } } } crate::bussy::Command::DbResOkWithStatus(client_id, status) => { if client.client_id == client_id { if let Some(sender) = &client.client_connection { sender.send(Ok(Message::text(status.as_json()))).unwrap(); } } }, _ => () } }, Some(message) = client_receiver.next() => { match message { Ok(message) => { // ws_sender // .send(message) // .unwrap_or_else(|e| { // log::error!("websocket send error: {}", e); // }) // .await; match ws_sender.send(message).await { Ok(_) => (), Err(e) => { log::error!("websocket send error: {}", e); break; } } } Err(e) => { log::error!("websocket send error: {}", e); break; } } }, Some(result) = ws_receiver.next() => { let msg = match result { Ok(msg) => msg, Err(e) => { log::error!( "error receiving ws message for id: {}: {}", client.client_id.clone(), e ); break; } }; socket_on_message(&context, &mut client, msg).await; } } } // Handle proper disconnects socket_on_close(&client).await; } /// Checking if client with id needs the event fn get_relay_message(client: &Client, event: Box) -> Option { let mut id = &"".to_string(); log::info!( "Checking if client with id {} needs the event", client.client_id ); if client.subscriptions.iter().any(|(sub_id, sub)| { if sub.interested_in_event(&event) { id = sub_id; return true; } false }) { return Some(nostr::RelayMessage::Event { subscription_id: nostr::SubscriptionId::new(id), event, }); } None } async fn socket_on_close(client: &Client) { // clients.write().await.remove(id); log::info!("{} disconnected", client.client_id); } async fn socket_on_message(context: &Context, client: &mut Client, msg: Message) { let message = match msg.to_str() { Ok(raw_message) => raw_message, Err(_) => return, }; if message == "ping" || message == "ping\n" { return; } let client_message: ClientMessage = match from_str(message) { Ok(parsed_message) => parsed_message, Err(e) => { log::error!("error while parsing client message request: {}", e); let response = nostr::RelayMessage::notice("Invalid message"); let message = Message::text(response.as_json()); send(client, message); return; } }; log::info!( "[client {} - {}] message: {}", client.ip(), client.client_id, client_message.as_json() ); handle_msg(context, client, client_message).await; } fn send(client: &Client, message: Message) { if let Some(sender) = &client.client_connection { if !sender.is_closed() { sender.send(Ok(message)).unwrap(); } } } async fn handle_msg(context: &Context, client: &mut Client, client_message: ClientMessage) { match client_message { ClientMessage::Event(event) => handle_event(context, client, event).await, ClientMessage::Req { subscription_id, filters, } => handle_req(context, client, subscription_id, filters).await, ClientMessage::Count { subscription_id, filters, } => handle_count(context, client, subscription_id, filters).await, ClientMessage::Close(subscription_id) => handle_close(client, subscription_id).await, ClientMessage::Auth(event) => handle_auth(client, event).await, _ => (), } } async fn handle_event(context: &Context, client: &Client, event: Box) { log::debug!("handle_event is processing new event"); if let Err(err) = event.verify() { let relay_message = nostr::RelayMessage::ok(event.id, false, "Failed to verify event signature"); let message = crate::bussy::Message { source: channels::MSG_RELAY, content: crate::bussy::Command::PipelineResRelayMessageOk( client.client_id, relay_message, ), }; context.pubsub.publish(channels::MSG_RELAY, message).await; return; } context .pubsub .publish( channels::MSG_PIPELINE, crate::bussy::Message { source: channels::MSG_RELAY, content: crate::bussy::Command::PipelineReqEvent(client.client_id, event), }, ) .await; } async fn handle_req( context: &Context, client: &mut Client, subscription_id: SubscriptionId, filters: Vec, ) { let subscription = Subscription::new(subscription_id.clone(), filters); let needs_historical_events = subscription.needs_historical_events(); if let Err(subscription_error) = client.subscribe(subscription.clone()) { log::error!( "Error on handle_req. client IP: {:?}, message: {}", client.ip(), &subscription_error.message ); let message = format!( "[\"CLOSED\", \"{}\", \"{}\"]", subscription_id, subscription_error.message ); context .pubsub .publish( channels::MSG_RELAY, crate::bussy::Message { source: channels::MSG_RELAY, content: crate::bussy::Command::ClientSubscriptionError(message), }, ) .await; return; }; log::info!("[SUBSCRIPTION] needs historical events"); if needs_historical_events { context .pubsub .publish( channels::MSG_NOOSE, crate::bussy::Message { source: channels::MSG_RELAY, content: crate::bussy::Command::DbReqFindEvent(client.client_id, subscription), }, ) .await } } async fn handle_count( context: &Context, client: &Client, subscription_id: SubscriptionId, filters: Vec, ) { let subscription = Subscription::new(subscription_id, filters); context .pubsub .publish( channels::MSG_NOOSE, crate::bussy::Message { source: channels::MSG_RELAY, content: crate::bussy::Command::DbReqEventCounts(client.client_id, subscription), }, ) .await } async fn handle_close(client: &mut Client, subscription_id: SubscriptionId) { // context.pubsub.send(new nostr event) then handle possible errors client.unsubscribe(subscription_id); // let message = Message::text("CLOSE not implemented"); // send(client, message); } async fn handle_auth(client: &Client, event: Box) { let message = Message::text("AUTH not implemented"); send(client, message); }