From d15bf891dbbb6911959256c823ede4d9ef0c81c8 Mon Sep 17 00:00:00 2001 From: Tony Klink Date: Fri, 26 Jan 2024 14:17:14 -0600 Subject: [PATCH] Fix Order and Filters --- src/noose/sqlite.rs | 444 ++++++++++++++++++++++++++------------------ 1 file changed, 265 insertions(+), 179 deletions(-) diff --git a/src/noose/sqlite.rs b/src/noose/sqlite.rs index 9e49340..8539275 100644 --- a/src/noose/sqlite.rs +++ b/src/noose/sqlite.rs @@ -220,6 +220,217 @@ impl NostrSqlite { } } + async fn check_event_replaceble(&self, event: &Event) -> Result { + if event.is_replaceable() { + let pubkey = event.pubkey.to_string(); + let kind = event.kind.as_u64(); + let created_at = event.created_at.as_i64(); + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + let Ok(event_saved) = connection + .interact(move |conn| -> Result { + let tx = conn.transaction().unwrap(); + log::debug!("new event is replaceable - searching for previously stored event"); + let (sql, values) = Query::select() + .from(EventsTable::Table) + .columns([EventsTable::EventId]) + .and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey)) + .and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind)) + .and_where(sea_query::Expr::col(EventsTable::CreatedAt).gte(created_at)) + .limit(1) + .build_rusqlite(SqliteQueryBuilder); + + if let Ok(res) = tx.execute(sql.as_str(), &*values.as_params()) { + if res > 0 { + log::debug!("deleting older replaceable event from events table"); + let (sql, values) = Query::delete() + .from_table(EventsTable::Table) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)) + .eq(kind), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) + .eq(&pubkey), + ) + .and_where( + sea_query::Expr::col(( + EventsTable::Table, + EventsTable::EventId, + )) + .not_in_subquery( + Query::select() + .from(EventsTable::Table) + .column(EventsTable::EventId) + .and_where( + sea_query::Expr::col(EventsTable::Kind).eq(kind), + ) + .and_where( + sea_query::Expr::col(EventsTable::Pubkey) + .eq(&pubkey), + ) + .order_by( + EventsTable::CreatedAt, + sea_query::Order::Desc, + ) + .limit(1) + .to_owned(), + ), + ) + .build_rusqlite(SqliteQueryBuilder); + + if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { + if results > 0 { + log::info!( + "removed {} older replaceable kind {} events for author: {:?}", + results, + kind, + pubkey + ); + } + } + } + }; + + tx.commit().unwrap(); + + Ok(true) + }) + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + } + Ok(true) + } + + async fn check_event_parametrized_replaceble(&self, event: &Event) -> Result { + if event.is_parameterized_replaceable() { + let identifier = match event.identifier() { + Some(id) => id.to_string(), + None => return Ok(true), + }; + let pubkey = event.pubkey.to_string(); + let kind = event.kind.as_u64(); + let created_at = event.created_at.as_i64(); + + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + let Ok(event_saved) = connection + .interact(move |conn| -> Result { + let tx = conn.transaction().unwrap(); + log::debug!( + "new event is parametrized replaceable - searching for previously stored event" + ); + + let (sql, values) = Query::select() + .from(EventsTable::Table) + .column((EventsTable::Table, EventsTable::EventId)) + .left_join( + TagsTable::Table, + sea_query::Expr::col((TagsTable::Table, TagsTable::EventId)) + .equals((EventsTable::Table, EventsTable::EventId)), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) + .eq(&pubkey), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind), + ) + .and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq("d")) + .and_where( + sea_query::Expr::col((TagsTable::Table, TagsTable::Value)) + .eq(&identifier), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt)) + .gte(created_at), + ) + .limit(1) + .build_rusqlite(SqliteQueryBuilder); + + if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { + if results > 0 { + log::info!( + "deleting older parametrized replaceable event from events table" + ); + let (sql, values) = Query::delete() + .from_table(EventsTable::Table) + .and_where( + sea_query::Expr::col(( + EventsTable::Table, + EventsTable::EventId, + )) + .in_subquery( + Query::select() + .from(EventsTable::Table) + .column((EventsTable::Table, EventsTable::EventId)) + .left_join( + TagsTable::Table, + sea_query::Expr::col(( + TagsTable::Table, + TagsTable::EventId, + )) + .equals((EventsTable::Table, EventsTable::EventId)), + ) + .and_where( + sea_query::Expr::col(( + EventsTable::Table, + EventsTable::Kind, + )) + .eq(kind), + ) + .and_where( + sea_query::Expr::col(( + EventsTable::Table, + EventsTable::Pubkey, + )) + .eq(&pubkey), + ) + .and_where( + sea_query::Expr::col(( + TagsTable::Table, + TagsTable::Tag, + )) + .eq("d"), + ) + .and_where( + sea_query::Expr::col(( + TagsTable::Table, + TagsTable::Value, + )) + .eq(&identifier), + ) + .to_owned(), + ), + ) + .build_rusqlite(SqliteQueryBuilder); + + if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { + if results > 0 { + log::info!("removed {} older parameterized replaceable kind {} events for author: {:?}", results, kind, &pubkey); + } + } + } + }; + + tx.commit().unwrap(); + Ok(false) + }) + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + }; + + Ok(true) + } + async fn save_event(&self, event: &Event) -> Result { let event = event.clone(); @@ -245,175 +456,18 @@ impl NostrSqlite { return Err(Error::internal_with_message("Unable to get DB connection")); }; + let Ok(res) = self.check_event_replaceble(&event).await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + let Ok(res) = self.check_event_parametrized_replaceble(&event).await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + let Ok(event_saved) = connection .interact(move |conn| -> Result { let tx = conn.transaction().unwrap(); - if event.is_replaceable() { - dbg!("new event is replaceable - searching for previously stored event"); - let (sql, values) = Query::select() - .from(EventsTable::Table) - .columns([EventsTable::EventId]) - .and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey)) - .and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind)) - .and_where(sea_query::Expr::col(EventsTable::CreatedAt).gte(created_at)) - .limit(1) - .build_rusqlite(SqliteQueryBuilder); - - if let Ok(res) = tx.execute(sql.as_str(), &*values.as_params()) { - if res > 0 { - return Ok(true); - } - }; - - } - - if event.is_parameterized_replaceable() { - dbg!( - "new event is parametrized replaceable - searching for previously stored event" - ); - let d_tags: Vec = event - .tags - .iter() - .filter(|tag| tag.kind() == nostr::TagKind::D) - .map(|tag| tag.clone().to_vec()[1].clone()) - .collect(); - let (sql, values) = Query::select() - .from(EventsTable::Table) - .column((EventsTable::Table, EventsTable::EventId)) - .left_join( - TagsTable::Table, - sea_query::Expr::col((TagsTable::Table, TagsTable::EventId)) - .equals((EventsTable::Table, EventsTable::EventId)), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) - .eq(&pubkey), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind), - ) - .and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq("d")) - .and_where( - sea_query::Expr::col((TagsTable::Table, TagsTable::Value)) - .eq(d_tags[0].to_string()), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt)) - .gte(created_at), - ) - .limit(1) - .build_rusqlite(SqliteQueryBuilder); - - if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { - if results > 0 { - return Ok(true); - } - } - } - - // Insert replaceble event - if event.is_replaceable() { - dbg!("deleting older replaceable event from events table"); - let (sql, values) = Query::delete() - .from_table(EventsTable::Table) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) - .eq(&pubkey), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::EventId)) - .not_in_subquery( - Query::select() - .from(EventsTable::Table) - .column(EventsTable::EventId) - .and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind)) - .and_where( - sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey), - ) - .order_by(EventsTable::CreatedAt, sea_query::Order::Desc) - .limit(1) - .to_owned(), - ), - ) - .build_rusqlite(SqliteQueryBuilder); - - if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { - if results > 0 { - log::info!( - "removed {} older replaceable kind {} events for author: {:?}", - results, - event.kind.as_u32(), - event.pubkey.to_string() - ); - } - } - } - - // Insert parametrized replaceble event - if event.is_parameterized_replaceable() { - dbg!("deleting older parametrized replaceable event from events table"); - log::info!("deleting older parametrized replaceable event from events table"); - let d_tag = event.identifier(); - let (sql, values) = Query::delete() - .from_table(EventsTable::Table) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::EventId)) - .in_subquery( - Query::select() - .from(EventsTable::Table) - .column((EventsTable::Table, EventsTable::EventId)) - .left_join( - TagsTable::Table, - sea_query::Expr::col(( - TagsTable::Table, - TagsTable::EventId, - )) - .equals((EventsTable::Table, EventsTable::EventId)), - ) - .and_where( - sea_query::Expr::col(( - EventsTable::Table, - EventsTable::Kind, - )) - .eq(kind), - ) - .and_where( - sea_query::Expr::col(( - EventsTable::Table, - EventsTable::Pubkey, - )) - .eq(&pubkey), - ) - .and_where( - sea_query::Expr::col(( - TagsTable::Table, - TagsTable::Tag, - )) - .eq("d"), - ) - .and_where( - sea_query::Expr::col(( - TagsTable::Table, - TagsTable::Value, - )) - .eq(d_tag), - ) - .to_owned(), - ), - ) - .build_rusqlite(SqliteQueryBuilder); - - if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { - if results > 0 { - log::info!("removed {} older parameterized replaceable kind {} events for author: {:?}", results, event.kind, event.pubkey); - } - } - } - if event.kind == nostr::Kind::EventDeletion { // Delete from Events let (sql, values) = Query::delete() @@ -524,8 +578,9 @@ impl NostrSqlite { } } }) - .await else { - return Err(Error::internal_with_message("Failed to execute query")); + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); }; event_saved @@ -917,11 +972,7 @@ impl NostrSqlite { query } - fn get_filters_query( - &self, - filters: Vec, - order: SqOrder, - ) -> Option { + fn get_filters_query(&self, filters: Vec) -> Option { filters .iter() .map(|filter| { @@ -957,13 +1008,15 @@ impl NostrSqlite { Order::Asc => SqOrder::Asc, Order::Desc => SqOrder::Desc, }; - let Some(sql_statement) = self.get_filters_query(filters, sq_order) else { + let Some(mut sql_statement) = self.get_filters_query(filters) else { return Err(Error::internal_with_message("Failed to build SQL Query")); }; let Ok(query_result) = connection .interact(move |conn| -> Result, Error> { - let (sql, values) = sql_statement.build_rusqlite(SqliteQueryBuilder); + let (sql, values) = sql_statement + .order_by(EventsTable::CreatedAt, sq_order.to_owned()) + .build_rusqlite(SqliteQueryBuilder); let mut stmt = conn.prepare(sql.as_str()).unwrap(); let mut rows = stmt.query(&*values.as_params()).unwrap(); @@ -989,7 +1042,7 @@ impl NostrSqlite { return Err(Error::internal_with_message("Unable to get DB connection")); }; - let Some(mut sql_statement) = self.get_filters_query(filters, SqOrder::Desc) else { + let Some(mut sql_statement) = self.get_filters_query(filters) else { return Err(Error::internal_with_message("Failed to build SQL Query")); }; @@ -1039,7 +1092,7 @@ impl NostrSqlite { Order::Desc => SqOrder::Desc, }; - let Some(mut sql_statement) = self.get_filters_query(filters, sq_order) else { + let Some(mut sql_statement) = self.get_filters_query(filters) else { return Err(Error::internal_with_message("Failed to build SQL Query")); }; @@ -1048,6 +1101,7 @@ impl NostrSqlite { let (sql, values) = sql_statement .clear_selects() .column(EventsTable::EventId) + .order_by(EventsTable::CreatedAt, sq_order.to_owned()) .build_rusqlite(SqliteQueryBuilder); let mut stmt = conn.prepare(sql.as_str()).unwrap(); @@ -1257,7 +1311,6 @@ impl Noose for NostrSqlite { ) -> Result, Error> { match self.query(subscription.filters, Order::Desc).await { Ok(events) => { - let relay_messages: Vec = events .into_iter() .map(|event| nostr::RelayMessage::event(subscription.id.clone(), event)) @@ -1362,8 +1415,6 @@ mod tests { // Get status of seen event let result = db.get_event_seen_on_relays(event.id).await.unwrap(); - dbg!(result); - // Get event by id let eid = event.id; let result = db.get_event_by_id(eid).await.unwrap(); @@ -1478,4 +1529,39 @@ mod tests { assert_eq!(result.len(), 2); } + + #[tokio::test] + async fn save_parametrized_replaceable_event() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; + + let keys = nostr::Keys::generate(); + let d_tag = nostr::Tag::Identifier("test".to_string()); + let event = nostr::EventBuilder::new(nostr::Kind::CategorizedPeopleList, "", vec![d_tag]) + .to_event(&keys) + .unwrap(); + + let result = db.save_event(&event).await.unwrap(); + + let filter_0 = nostr::Filter::new() + .kinds(vec![nostr::Kind::ContactList]) + .authors(vec![event.pubkey]); + let filter_1 = nostr::Filter::new() + .kinds(vec![nostr::Kind::MuteList]) + .authors(vec![event.pubkey]); + let filter_2 = nostr::Filter::new() + .kinds(vec![nostr::Kind::CategorizedPeopleList]) + .authors(vec![event.pubkey]) + .identifiers(vec!["test".to_string()]); + let filter_3 = nostr::Filter::new() + .kinds(vec![nostr::Kind::ApplicationSpecificData]) + .authors(vec![event.pubkey]) + .identifiers(vec!["app-config".to_string()]); + let result = db + .query(vec![filter_0, filter_1, filter_2, filter_3], Order::Desc) + .await + .unwrap(); + + assert_eq!(result.len(), 1) + } }