sneedstr/src/relay/ws.rs

294 lines
10 KiB
Rust

use crate::{
bussy::channels,
utils::structs::{Client, Context, Subscription},
};
use futures_util::StreamExt;
use nostr::{ClientMessage, Event, Filter, JsonUtil, SubscriptionId};
use serde_json::from_str;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::ws::{Message, WebSocket};
use futures_util::SinkExt;
pub async fn client_connection(ws: WebSocket, context: Context, client_addr: Option<SocketAddr>) {
let (mut ws_sender, mut ws_receiver) = ws.split();
let (client_sender, client_receiver) = mpsc::unbounded_channel();
let mut client_receiver = UnboundedReceiverStream::new(client_receiver);
// Create and Add to the Context new Client and set its sender
let ip = client_addr.unwrap().ip().to_string();
let mut client = Client::new(ip);
client.client_connection = Some(client_sender);
let mut subscriber = context.pubsub.subscribe(channels::MSG_RELAY).await;
loop {
tokio::select! {
Ok(message) = subscriber.recv() => {
match message.content {
crate::bussy::Command::ClientSubscriptionError(error_message) => {
if let Some(sender) = &client.client_connection {
log::info!("[Relay] sending [\"CLOSED\"] event to client: {}", client.client_id);
if !sender.is_closed() {sender.send(Ok(Message::text(error_message))).unwrap()};
}
},
crate::bussy::Command::PipelineResRelayMessageOk(client_id, relay_message) => {
if client.client_id == client_id {
if let Some(sender) = &client.client_connection {
if !sender.is_closed() {
log::info!("[Relay] sending back the status of the processed event: {}", relay_message.as_json());
sender.send(Ok(Message::text(relay_message.as_json()))).unwrap();
}
}
}
},
crate::bussy::Command::PipelineResStreamOutEvent(event) => {
// if client.client_id == client_id {
if let Some(sender) = &client.client_connection {
if let Some(relay_message) = get_relay_message(&client, event) {
log::info!("[Relay] sending processed event to subscribed client: {}", relay_message.as_json());
if !sender.is_closed() {sender.send(Ok(Message::text(relay_message.as_json()))).unwrap()};
}
}
// }
}
crate::bussy::Command::DbResRelayMessage(client_id, events) => {
if client.client_id == client_id {
if let Some(sender) = &client.client_connection {
if !sender.is_closed() {
for event in events {
sender.send(Ok(Message::text(event))).unwrap();
}
}
}
}
}
crate::bussy::Command::DbResOkWithStatus(status) => {
if let Some(sender) = &client.client_connection {
sender.send(Ok(Message::text(status))).unwrap();
}
},
_ => ()
}
},
Some(message) = client_receiver.next() => {
match message {
Ok(message) => {
// ws_sender
// .send(message)
// .unwrap_or_else(|e| {
// log::error!("websocket send error: {}", e);
// })
// .await;
match ws_sender.send(message).await {
Ok(_) => (),
Err(e) => {
log::error!("websocket send error: {}", e);
break;
}
}
}
Err(e) => {
log::error!("websocket send error: {}", e);
break;
}
}
},
Some(result) = ws_receiver.next() => {
let msg = match result {
Ok(msg) => msg,
Err(e) => {
log::error!(
"error receiving ws message for id: {}: {}",
client.client_id.clone(),
e
);
break;
}
};
socket_on_message(&context, &mut client, msg).await;
}
}
}
// Handle proper disconnects
socket_on_close(&client).await;
}
/// Checking if client with id needs the event
fn get_relay_message(client: &Client, event: Box<Event>) -> Option<nostr::RelayMessage> {
let mut id = &"".to_string();
log::info!(
"Checking if client with id {} needs the event",
client.client_id
);
if client.subscriptions.iter().any(|(sub_id, sub)| {
if sub.interested_in_event(&event) {
id = sub_id;
return true;
}
false
}) {
return Some(nostr::RelayMessage::Event {
subscription_id: nostr::SubscriptionId::new(id),
event,
});
}
None
}
async fn socket_on_close(client: &Client) {
// clients.write().await.remove(id);
log::info!("{} disconnected", client.client_id);
}
async fn socket_on_message(context: &Context, client: &mut Client, msg: Message) {
let message = match msg.to_str() {
Ok(raw_message) => raw_message,
Err(_) => return,
};
if message == "ping" || message == "ping\n" {
return;
}
let client_message: ClientMessage = match from_str(message) {
Ok(parsed_message) => parsed_message,
Err(e) => {
log::error!("error while parsing client message request: {}", e);
let response = nostr::RelayMessage::new_notice("Invalid message");
let message = Message::text(response.as_json());
send(client, message);
return;
}
};
log::info!(
"[client {} - {}] message: {}",
client.ip(),
client.client_id,
client_message.as_json()
);
handle_msg(context, client, client_message).await;
}
fn send(client: &Client, message: Message) {
if let Some(sender) = &client.client_connection {
if !sender.is_closed() {
sender.send(Ok(message)).unwrap();
}
}
}
async fn handle_msg(context: &Context, client: &mut Client, client_message: ClientMessage) {
match client_message {
ClientMessage::Event(event) => handle_event(context, client, event).await,
ClientMessage::Req {
subscription_id,
filters,
} => handle_req(context, client, subscription_id, filters).await,
ClientMessage::Count {
subscription_id,
filters,
} => handle_count(client, subscription_id, filters).await,
ClientMessage::Close(subscription_id) => handle_close(client, subscription_id).await,
ClientMessage::Auth(event) => handle_auth(client, event).await,
_ => (),
}
}
async fn handle_event(context: &Context, client: &Client, event: Box<Event>) {
log::debug!("handle_event is processing new event");
context
.pubsub
.publish(
channels::MSG_PIPELINE,
crate::bussy::Message {
source: channels::MSG_RELAY,
content: crate::bussy::Command::PipelineReqEvent(client.client_id, event),
},
)
.await;
}
async fn handle_req(
context: &Context,
client: &mut Client,
subscription_id: SubscriptionId,
filters: Vec<Filter>,
) {
let subscription = Subscription::new(subscription_id.clone(), filters);
let needs_historical_events = subscription.needs_historical_events();
if let Err(subscription_error) = client.subscribe(subscription.clone()) {
log::error!(
"Error on handle_req. client IP: {:?}, message: {}",
client.ip(),
&subscription_error.message
);
let message = format!(
"[\"CLOSED\", \"{}\", \"{}\"]",
subscription_id, subscription_error.message
);
context
.pubsub
.publish(
channels::MSG_RELAY,
crate::bussy::Message {
source: channels::MSG_RELAY,
content: crate::bussy::Command::ClientSubscriptionError(message),
},
)
.await;
return;
};
if needs_historical_events {
context
.pubsub
.publish(
channels::MSG_NOOSE,
crate::bussy::Message {
source: channels::MSG_RELAY,
content: crate::bussy::Command::DbReqFindEvent(client.client_id, subscription),
},
)
.await
}
}
async fn handle_count(client: &Client, subscription_id: SubscriptionId, filters: Vec<Filter>) {
// context.pubsub.send(new nostr event) then handle possible errors
let subscription = Subscription::new(subscription_id, filters);
let message = Message::text("COUNT not implemented");
send(client, message);
}
async fn handle_close(client: &mut Client, subscription_id: SubscriptionId) {
// context.pubsub.send(new nostr event) then handle possible errors
client.unsubscribe(subscription_id);
// let message = Message::text("CLOSE not implemented");
// send(client, message);
}
async fn handle_auth(client: &Client, event: Box<Event>) {
let message = Message::text("AUTH not implemented");
send(client, message);
}