Fix Order and Filters

This commit is contained in:
Tony Klink 2024-01-26 14:17:14 -06:00
parent ab9fe76494
commit d15bf891db
Signed by: klink
GPG key ID: 85175567C4D19231

View file

@ -220,6 +220,217 @@ impl NostrSqlite {
}
}
async fn check_event_replaceble(&self, event: &Event) -> Result<bool, Error> {
if event.is_replaceable() {
let pubkey = event.pubkey.to_string();
let kind = event.kind.as_u64();
let created_at = event.created_at.as_i64();
let Ok(connection) = self.get_connection().await else {
return Err(Error::internal_with_message("Unable to get DB connection"));
};
let Ok(event_saved) = connection
.interact(move |conn| -> Result<bool, Error> {
let tx = conn.transaction().unwrap();
log::debug!("new event is replaceable - searching for previously stored event");
let (sql, values) = Query::select()
.from(EventsTable::Table)
.columns([EventsTable::EventId])
.and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey))
.and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind))
.and_where(sea_query::Expr::col(EventsTable::CreatedAt).gte(created_at))
.limit(1)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(res) = tx.execute(sql.as_str(), &*values.as_params()) {
if res > 0 {
log::debug!("deleting older replaceable event from events table");
let (sql, values) = Query::delete()
.from_table(EventsTable::Table)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Kind))
.eq(kind),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey))
.eq(&pubkey),
)
.and_where(
sea_query::Expr::col((
EventsTable::Table,
EventsTable::EventId,
))
.not_in_subquery(
Query::select()
.from(EventsTable::Table)
.column(EventsTable::EventId)
.and_where(
sea_query::Expr::col(EventsTable::Kind).eq(kind),
)
.and_where(
sea_query::Expr::col(EventsTable::Pubkey)
.eq(&pubkey),
)
.order_by(
EventsTable::CreatedAt,
sea_query::Order::Desc,
)
.limit(1)
.to_owned(),
),
)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) {
if results > 0 {
log::info!(
"removed {} older replaceable kind {} events for author: {:?}",
results,
kind,
pubkey
);
}
}
}
};
tx.commit().unwrap();
Ok(true)
})
.await
else {
return Err(Error::internal_with_message("Failed to execute query"));
};
}
Ok(true)
}
async fn check_event_parametrized_replaceble(&self, event: &Event) -> Result<bool, Error> {
if event.is_parameterized_replaceable() {
let identifier = match event.identifier() {
Some(id) => id.to_string(),
None => return Ok(true),
};
let pubkey = event.pubkey.to_string();
let kind = event.kind.as_u64();
let created_at = event.created_at.as_i64();
let Ok(connection) = self.get_connection().await else {
return Err(Error::internal_with_message("Unable to get DB connection"));
};
let Ok(event_saved) = connection
.interact(move |conn| -> Result<bool, Error> {
let tx = conn.transaction().unwrap();
log::debug!(
"new event is parametrized replaceable - searching for previously stored event"
);
let (sql, values) = Query::select()
.from(EventsTable::Table)
.column((EventsTable::Table, EventsTable::EventId))
.left_join(
TagsTable::Table,
sea_query::Expr::col((TagsTable::Table, TagsTable::EventId))
.equals((EventsTable::Table, EventsTable::EventId)),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey))
.eq(&pubkey),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind),
)
.and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq("d"))
.and_where(
sea_query::Expr::col((TagsTable::Table, TagsTable::Value))
.eq(&identifier),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt))
.gte(created_at),
)
.limit(1)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) {
if results > 0 {
log::info!(
"deleting older parametrized replaceable event from events table"
);
let (sql, values) = Query::delete()
.from_table(EventsTable::Table)
.and_where(
sea_query::Expr::col((
EventsTable::Table,
EventsTable::EventId,
))
.in_subquery(
Query::select()
.from(EventsTable::Table)
.column((EventsTable::Table, EventsTable::EventId))
.left_join(
TagsTable::Table,
sea_query::Expr::col((
TagsTable::Table,
TagsTable::EventId,
))
.equals((EventsTable::Table, EventsTable::EventId)),
)
.and_where(
sea_query::Expr::col((
EventsTable::Table,
EventsTable::Kind,
))
.eq(kind),
)
.and_where(
sea_query::Expr::col((
EventsTable::Table,
EventsTable::Pubkey,
))
.eq(&pubkey),
)
.and_where(
sea_query::Expr::col((
TagsTable::Table,
TagsTable::Tag,
))
.eq("d"),
)
.and_where(
sea_query::Expr::col((
TagsTable::Table,
TagsTable::Value,
))
.eq(&identifier),
)
.to_owned(),
),
)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) {
if results > 0 {
log::info!("removed {} older parameterized replaceable kind {} events for author: {:?}", results, kind, &pubkey);
}
}
}
};
tx.commit().unwrap();
Ok(false)
})
.await
else {
return Err(Error::internal_with_message("Failed to execute query"));
};
};
Ok(true)
}
async fn save_event(&self, event: &Event) -> Result<bool, Error> {
let event = event.clone();
@ -245,175 +456,18 @@ impl NostrSqlite {
return Err(Error::internal_with_message("Unable to get DB connection"));
};
let Ok(res) = self.check_event_replaceble(&event).await else {
return Err(Error::internal_with_message("Unable to get DB connection"));
};
let Ok(res) = self.check_event_parametrized_replaceble(&event).await else {
return Err(Error::internal_with_message("Unable to get DB connection"));
};
let Ok(event_saved) = connection
.interact(move |conn| -> Result<bool, Error> {
let tx = conn.transaction().unwrap();
if event.is_replaceable() {
dbg!("new event is replaceable - searching for previously stored event");
let (sql, values) = Query::select()
.from(EventsTable::Table)
.columns([EventsTable::EventId])
.and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey))
.and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind))
.and_where(sea_query::Expr::col(EventsTable::CreatedAt).gte(created_at))
.limit(1)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(res) = tx.execute(sql.as_str(), &*values.as_params()) {
if res > 0 {
return Ok(true);
}
};
}
if event.is_parameterized_replaceable() {
dbg!(
"new event is parametrized replaceable - searching for previously stored event"
);
let d_tags: Vec<String> = event
.tags
.iter()
.filter(|tag| tag.kind() == nostr::TagKind::D)
.map(|tag| tag.clone().to_vec()[1].clone())
.collect();
let (sql, values) = Query::select()
.from(EventsTable::Table)
.column((EventsTable::Table, EventsTable::EventId))
.left_join(
TagsTable::Table,
sea_query::Expr::col((TagsTable::Table, TagsTable::EventId))
.equals((EventsTable::Table, EventsTable::EventId)),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey))
.eq(&pubkey),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind),
)
.and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq("d"))
.and_where(
sea_query::Expr::col((TagsTable::Table, TagsTable::Value))
.eq(d_tags[0].to_string()),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt))
.gte(created_at),
)
.limit(1)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) {
if results > 0 {
return Ok(true);
}
}
}
// Insert replaceble event
if event.is_replaceable() {
dbg!("deleting older replaceable event from events table");
let (sql, values) = Query::delete()
.from_table(EventsTable::Table)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey))
.eq(&pubkey),
)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::EventId))
.not_in_subquery(
Query::select()
.from(EventsTable::Table)
.column(EventsTable::EventId)
.and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind))
.and_where(
sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey),
)
.order_by(EventsTable::CreatedAt, sea_query::Order::Desc)
.limit(1)
.to_owned(),
),
)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) {
if results > 0 {
log::info!(
"removed {} older replaceable kind {} events for author: {:?}",
results,
event.kind.as_u32(),
event.pubkey.to_string()
);
}
}
}
// Insert parametrized replaceble event
if event.is_parameterized_replaceable() {
dbg!("deleting older parametrized replaceable event from events table");
log::info!("deleting older parametrized replaceable event from events table");
let d_tag = event.identifier();
let (sql, values) = Query::delete()
.from_table(EventsTable::Table)
.and_where(
sea_query::Expr::col((EventsTable::Table, EventsTable::EventId))
.in_subquery(
Query::select()
.from(EventsTable::Table)
.column((EventsTable::Table, EventsTable::EventId))
.left_join(
TagsTable::Table,
sea_query::Expr::col((
TagsTable::Table,
TagsTable::EventId,
))
.equals((EventsTable::Table, EventsTable::EventId)),
)
.and_where(
sea_query::Expr::col((
EventsTable::Table,
EventsTable::Kind,
))
.eq(kind),
)
.and_where(
sea_query::Expr::col((
EventsTable::Table,
EventsTable::Pubkey,
))
.eq(&pubkey),
)
.and_where(
sea_query::Expr::col((
TagsTable::Table,
TagsTable::Tag,
))
.eq("d"),
)
.and_where(
sea_query::Expr::col((
TagsTable::Table,
TagsTable::Value,
))
.eq(d_tag),
)
.to_owned(),
),
)
.build_rusqlite(SqliteQueryBuilder);
if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) {
if results > 0 {
log::info!("removed {} older parameterized replaceable kind {} events for author: {:?}", results, event.kind, event.pubkey);
}
}
}
if event.kind == nostr::Kind::EventDeletion {
// Delete from Events
let (sql, values) = Query::delete()
@ -524,8 +578,9 @@ impl NostrSqlite {
}
}
})
.await else {
return Err(Error::internal_with_message("Failed to execute query"));
.await
else {
return Err(Error::internal_with_message("Failed to execute query"));
};
event_saved
@ -917,11 +972,7 @@ impl NostrSqlite {
query
}
fn get_filters_query(
&self,
filters: Vec<Filter>,
order: SqOrder,
) -> Option<sea_query::SelectStatement> {
fn get_filters_query(&self, filters: Vec<Filter>) -> Option<sea_query::SelectStatement> {
filters
.iter()
.map(|filter| {
@ -957,13 +1008,15 @@ impl NostrSqlite {
Order::Asc => SqOrder::Asc,
Order::Desc => SqOrder::Desc,
};
let Some(sql_statement) = self.get_filters_query(filters, sq_order) else {
let Some(mut sql_statement) = self.get_filters_query(filters) else {
return Err(Error::internal_with_message("Failed to build SQL Query"));
};
let Ok(query_result) = connection
.interact(move |conn| -> Result<Vec<Event>, Error> {
let (sql, values) = sql_statement.build_rusqlite(SqliteQueryBuilder);
let (sql, values) = sql_statement
.order_by(EventsTable::CreatedAt, sq_order.to_owned())
.build_rusqlite(SqliteQueryBuilder);
let mut stmt = conn.prepare(sql.as_str()).unwrap();
let mut rows = stmt.query(&*values.as_params()).unwrap();
@ -989,7 +1042,7 @@ impl NostrSqlite {
return Err(Error::internal_with_message("Unable to get DB connection"));
};
let Some(mut sql_statement) = self.get_filters_query(filters, SqOrder::Desc) else {
let Some(mut sql_statement) = self.get_filters_query(filters) else {
return Err(Error::internal_with_message("Failed to build SQL Query"));
};
@ -1039,7 +1092,7 @@ impl NostrSqlite {
Order::Desc => SqOrder::Desc,
};
let Some(mut sql_statement) = self.get_filters_query(filters, sq_order) else {
let Some(mut sql_statement) = self.get_filters_query(filters) else {
return Err(Error::internal_with_message("Failed to build SQL Query"));
};
@ -1048,6 +1101,7 @@ impl NostrSqlite {
let (sql, values) = sql_statement
.clear_selects()
.column(EventsTable::EventId)
.order_by(EventsTable::CreatedAt, sq_order.to_owned())
.build_rusqlite(SqliteQueryBuilder);
let mut stmt = conn.prepare(sql.as_str()).unwrap();
@ -1257,7 +1311,6 @@ impl Noose for NostrSqlite {
) -> Result<Vec<nostr::RelayMessage>, Error> {
match self.query(subscription.filters, Order::Desc).await {
Ok(events) => {
let relay_messages: Vec<RelayMessage> = events
.into_iter()
.map(|event| nostr::RelayMessage::event(subscription.id.clone(), event))
@ -1362,8 +1415,6 @@ mod tests {
// Get status of seen event
let result = db.get_event_seen_on_relays(event.id).await.unwrap();
dbg!(result);
// Get event by id
let eid = event.id;
let result = db.get_event_by_id(eid).await.unwrap();
@ -1478,4 +1529,39 @@ mod tests {
assert_eq!(result.len(), 2);
}
#[tokio::test]
async fn save_parametrized_replaceable_event() {
let config = Arc::new(ServiceConfig::new());
let db = NostrSqlite::new(config).await;
let keys = nostr::Keys::generate();
let d_tag = nostr::Tag::Identifier("test".to_string());
let event = nostr::EventBuilder::new(nostr::Kind::CategorizedPeopleList, "", vec![d_tag])
.to_event(&keys)
.unwrap();
let result = db.save_event(&event).await.unwrap();
let filter_0 = nostr::Filter::new()
.kinds(vec![nostr::Kind::ContactList])
.authors(vec![event.pubkey]);
let filter_1 = nostr::Filter::new()
.kinds(vec![nostr::Kind::MuteList])
.authors(vec![event.pubkey]);
let filter_2 = nostr::Filter::new()
.kinds(vec![nostr::Kind::CategorizedPeopleList])
.authors(vec![event.pubkey])
.identifiers(vec!["test".to_string()]);
let filter_3 = nostr::Filter::new()
.kinds(vec![nostr::Kind::ApplicationSpecificData])
.authors(vec![event.pubkey])
.identifiers(vec!["app-config".to_string()]);
let result = db
.query(vec![filter_0, filter_1, filter_2, filter_3], Order::Desc)
.await
.unwrap();
assert_eq!(result.len(), 1)
}
}