use crate::bussy::{channels, Command, Message, PubSub}; use crate::utils::error::Error; use nostr::Event; use std::sync::Arc; pub struct Pipeline { pubsub: Arc, } impl Pipeline { pub fn new(pubsub: Arc) -> Self { Self { pubsub } } pub async fn start(&mut self) -> Result<(), Error> { let mut subscriber = self.pubsub.subscribe(channels::MSG_PIPELINE).await; while let Ok(message) = subscriber.recv().await { log::debug!("[Pipeline] received message: {:?}", message); let command = match message.content { Command::PipelineReqEvent(client_id, event) => { match self.handle_event(client_id, event.clone()).await { Ok(_) => { let message = nostr::RelayMessage::new_ok(event.id, true, "".to_string()); Command::PipelineResRelayMessageOk(client_id, message) } Err(e) => Command::ServiceError(e), } } _ => Command::Noop, }; if command != Command::Noop { let channel = message.source; let message = Message { source: channels::MSG_PIPELINE, content: command, }; log::info!( "[Pipeline] channel: {} - publishing new message: {:?}", channel, message ); self.pubsub.publish(channel, message).await; } } Ok(()) } pub async fn handle_event( &self, client_id: uuid::Uuid, event: Box, ) -> Result<(), Error> { let store_event_task = self.store_event(event.clone()); let process_deletions_task = self.process_deletions(event.clone()); let track_hashtags_task = self.track_hashtags(event.clone()); let process_media_task = self.process_media(event.clone()); let stream_out_task = self.stream_out(event.clone()); let broadcast_task = self.broadcast(event.clone()); let ( store_event_result, process_deletions_result, track_hashtags_result, process_media_result, stream_out_result, broadcast_result, ) = tokio::join!( store_event_task, process_deletions_task, track_hashtags_task, process_media_task, stream_out_task, broadcast_task ); match ( store_event_result, process_deletions_result, track_hashtags_result, process_media_result, stream_out_result, broadcast_result, ) { (Ok(_), Ok(_), Ok(_), Ok(_), Ok(_), Ok(_)) => { log::info!("[Pipeline] Tasks finished successfully"); Ok(()) } _ => { log::error!("[Pipeline] One or more futures returned an error."); Err(Error::internal_with_message( "[Pipeline] One or more futures returned an error.", )) } } } async fn store_event(&self, event: Box) -> Result<(), Error> { if event.kind.is_ephemeral() { return Ok(()); } self.pubsub .publish( channels::MSG_NOOSE, Message { source: channels::MSG_PIPELINE, content: Command::DbReqWriteEvent(event), }, ) .await; Ok(()) } async fn process_deletions(&self, event: Box) -> Result<(), Error> { // if event.kind.as_u32() == 5 { // let events_for_deletion: Vec = event // .tags // .iter() // .filter_map(|tag| match tag { // nostr::Tag::Event(event_id, _, _) => Some(event_id.to_string()), // _ => None, // }) // .collect(); // self.pubsub // .publish( // channels::MSG_NOOSE, // Message { // source: channels::MSG_PIPELINE, // content: Command::DbReqDeleteEvents(events_for_deletion), // }, // ) // .await; // } Ok(()) } async fn track_hashtags(&self, event: Box) -> Result<(), Error> { Ok(()) } async fn process_media(&self, event: Box) -> Result<(), Error> { Ok(()) } async fn stream_out(&self, event: Box) -> Result<(), Error> { let message = Message { source: channels::MSG_PIPELINE, content: Command::PipelineResStreamOutEvent(event), }; self.pubsub.publish(channels::MSG_RELAY, message).await; Ok(()) } async fn broadcast(&self, event: Box) -> Result<(), Error> { Ok(()) } }