From 5b7d0b79380b25daed084fec0cb456832d49ab94 Mon Sep 17 00:00:00 2001 From: Tony Klink Date: Thu, 25 Jan 2024 20:43:46 -0600 Subject: [PATCH] Rework sqlite db to implement nostr_database trait --- Cargo.lock | 161 +- Cargo.toml | 13 +- .../1697409647688_create_events.sql | 7 + src/noose/migrations/1697410424624_pragma.sql | 4 + .../1706115586021_event_seen_by_relays.sql | 7 + src/noose/migrations/mod.rs | 50 + src/noose/mod.rs | 15 +- src/noose/nostr_db.rs | 111 -- src/noose/sqlite.rs | 1765 +++++++++++------ src/utils/error.rs | 23 +- 10 files changed, 1342 insertions(+), 814 deletions(-) create mode 100644 src/noose/migrations/1706115586021_event_seen_by_relays.sql create mode 100644 src/noose/migrations/mod.rs delete mode 100644 src/noose/nostr_db.rs diff --git a/Cargo.lock b/Cargo.lock index 632b141..56ae522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -384,14 +384,13 @@ dependencies = [ [[package]] name = "deadpool" -version = "0.9.5" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" dependencies = [ "async-trait", "deadpool-runtime", "num_cpus", - "retain_mut", "tokio", ] @@ -406,9 +405,9 @@ dependencies = [ [[package]] name = "deadpool-sqlite" -version = "0.5.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e026821eaacbce25ff0d54405e4421d71656fcae3e4a9323461280fcda6dbc7d" +checksum = "b8010e36e12f3be22543a5e478b4af20aeead9a700dd69581a5e050a070fc22c" dependencies = [ "deadpool", "deadpool-sync", @@ -424,6 +423,15 @@ dependencies = [ "deadpool-runtime", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -479,9 +487,9 @@ dependencies = [ [[package]] name = "fallible-iterator" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" [[package]] name = "fallible-streaming-iterator" @@ -501,16 +509,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "flatbuffers" -version = "23.5.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" -dependencies = [ - "bitflags 1.3.2", - "rustc_version", -] - [[package]] name = "flate2" version = "1.0.28" @@ -988,9 +986,9 @@ checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "libsqlite3-sys" -version = "0.25.2" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa" +checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" dependencies = [ "cc", "pkg-config", @@ -1128,29 +1126,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa0550256c8d4f0aaf74891ac986bd5ba46b2957c2c7e20f51838fa5819285f8" dependencies = [ "async-trait", - "flatbuffers", "nostr", "thiserror", "tokio", "tracing", ] -[[package]] -name = "nostr-sqlite" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f0d3672bf705c0b1f1b1dc6682e397eafd8df4b379170136bbb52280ab76fd7" -dependencies = [ - "async-trait", - "deadpool-sqlite", - "nostr", - "nostr-database", - "rusqlite", - "thiserror", - "tokio", - "tracing", -] - [[package]] name = "nu-ansi-term" version = "0.49.0" @@ -1318,6 +1299,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1493,12 +1480,6 @@ dependencies = [ "winreg", ] -[[package]] -name = "retain_mut" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" - [[package]] name = "ring" version = "0.16.20" @@ -1516,16 +1497,30 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.28.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a" +checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.3.3", + "chrono", "fallible-iterator", "fallible-streaming-iterator", "hashlink", "libsqlite3-sys", + "serde_json", "smallvec", + "time", + "uuid", +] + +[[package]] +name = "rusqlite_migration" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4116d1697de2912db0b72069473dfb025f6c332b4a085ed041d121e8d745aea" +dependencies = [ + "log", + "rusqlite", ] [[package]] @@ -1534,15 +1529,6 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "rustc_version" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" -dependencies = [ - "semver", -] - [[package]] name = "rustix" version = "0.38.6" @@ -1667,12 +1653,16 @@ dependencies = [ [[package]] name = "sea-query" -version = "0.30.4" +version = "0.30.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41558fa9bb5f4d73952dac0b9d9c2ce23966493fc9ee0008037b01d709838a68" +checksum = "4166a1e072292d46dc91f31617c2a1cdaf55a8be4b5c9f4bf2ba248e3ac4999b" dependencies = [ + "chrono", "inherent", "sea-query-derive", + "serde_json", + "time", + "uuid", ] [[package]] @@ -1688,6 +1678,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "sea-query-rusqlite" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e74efa64c7ba4d62ed221bfd6bf8a685b1bbc37e29c51b1039405fc6c33a2dd" +dependencies = [ + "rusqlite", + "sea-query", +] + [[package]] name = "secp256k1" version = "0.27.0" @@ -1709,26 +1709,20 @@ dependencies = [ "cc", ] -[[package]] -name = "semver" -version = "1.0.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" - [[package]] name = "serde" -version = "1.0.181" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d3e73c93c3240c0bda063c239298e633114c69a888c3e37ca8bb33f343e9890" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.181" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be02f6cb0cd3a5ec20bbcfbcbd749f57daddb1a0882dc2e46a6c236c90b977ed" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", @@ -1827,17 +1821,21 @@ dependencies = [ "argon2", "async-trait", "chrono", + "deadpool-sqlite", "flexbuffers", "flexi_logger", "futures-util", "lazy_static", "log", "nostr", - "nostr-sqlite", + "nostr-database", "regex", + "rusqlite", + "rusqlite_migration", "rustls 0.21.6", "sailfish", "sea-query", + "sea-query-rusqlite", "serde", "serde_json", "sled", @@ -1919,6 +1917,35 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "time" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +dependencies = [ + "deranged", + "itoa", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +dependencies = [ + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 0ab54d2..c4f5d07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,16 +17,23 @@ futures-util = "0.3.28" rustls = "0.21" anyhow = "1.0" sled = "0.34.7" -# sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-rustls", "sqlite", "migrate", "macros"] } flexi_logger = { version = "0.27.3", features = [ "async", "compress" ] } lazy_static = "1.4.0" log = "0.4" +deadpool-sqlite = "0.7.0" +rusqlite = { version = "0.30.0", features = [ "bundled", "vtab" ] } +rusqlite_migration = "1.0.2" nostr = "0.27.0" -nostr-sqlite = "0.27.0" +nostr-database = "0.27.0" regex = "1.9.5" sailfish = "0.7.0" sea-query = { version = "0.30.4", features = ["backend-sqlite", "thread-safe"] } -# sea-query-binder = { version = "0.5.0", features = ["sqlx-sqlite"] } +sea-query-rusqlite = { version="0", features = [ + "with-chrono", + "with-json", + "with-uuid", + "with-time", +] } serde = "1.0" serde_json = "1.0" thiserror = "1.0.48" diff --git a/src/noose/migrations/1697409647688_create_events.sql b/src/noose/migrations/1697409647688_create_events.sql index ea9ca33..9c663b0 100644 --- a/src/noose/migrations/1697409647688_create_events.sql +++ b/src/noose/migrations/1697409647688_create_events.sql @@ -21,3 +21,10 @@ CREATE TABLE tags ( CREATE INDEX idx_tags_tag ON tags (tag); CREATE INDEX idx_tags_value ON tags (value); CREATE INDEX idx_tags_event_id ON tags (event_id); + +CREATE TABLE deleted_coordinates ( + coordinate TEXT NOT NULL, + created_at INTEGER NOT NULL +); + +CREATE INDEX idx_coordinates_coordinate ON coordinates (coordinate); diff --git a/src/noose/migrations/1697410424624_pragma.sql b/src/noose/migrations/1697410424624_pragma.sql index 610bd0b..c0d1526 100644 --- a/src/noose/migrations/1697410424624_pragma.sql +++ b/src/noose/migrations/1697410424624_pragma.sql @@ -1,2 +1,6 @@ +PRAGMA encoding = "UTF-8"; +PRAGMA journal_mode=WAL; PRAGMA foreign_keys = ON; PRAGMA auto_vacuum = FULL; +PRAGMA journal_size_limit=32768; +PRAGMA mmap_size = 17179869184; -- cap mmap at 16GB diff --git a/src/noose/migrations/1706115586021_event_seen_by_relays.sql b/src/noose/migrations/1706115586021_event_seen_by_relays.sql new file mode 100644 index 0000000..d3ffdfd --- /dev/null +++ b/src/noose/migrations/1706115586021_event_seen_by_relays.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS event_seen_by_relays ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_id TEXT NOT NULL, + relay_url TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS event_seen_by_relays_index ON event_seen_by_relays(event_id,relay_url); diff --git a/src/noose/migrations/mod.rs b/src/noose/migrations/mod.rs new file mode 100644 index 0000000..b9c7fdd --- /dev/null +++ b/src/noose/migrations/mod.rs @@ -0,0 +1,50 @@ +use rusqlite::Connection; +use rusqlite_migration::{Migrations, M}; + +pub struct MigrationRunner {} + +impl MigrationRunner { + pub fn up(connection: &mut Connection) -> bool { + let m_create_events = include_str!("./1697409647688_create_events.sql"); + let m_event_seen_by_relays = include_str!("1706115586021_event_seen_by_relays.sql"); + let m_add_realys = include_str!("./1697410161900_add_relays.sql"); + let m_events_fts = include_str!("./1697410223576_events_fts.sql"); + let m_users = include_str!("./1697410294265_users.sql"); + let m_unattached_media = include_str!("./1697410480767_unattached_media.sql"); + let m_pragma = include_str!("./1697410424624_pragma.sql"); + + let migrations = Migrations::new(vec![ + M::up(m_create_events), + M::up(m_event_seen_by_relays), + M::up(m_add_realys), + M::up(m_events_fts), + M::up(m_users), + M::up(m_unattached_media), + M::up(m_pragma), + ]); + + match migrations.to_latest(connection) { + Ok(()) => true, + Err(err) => { + log::error!("Migrations failed: {}", err.to_string()); + false + } + } + } +} + +#[cfg(test)] +mod tests { + use rusqlite::Connection; + + use super::MigrationRunner; + + #[test] + fn get_sql_path() { + let mut connection = Connection::open_in_memory().unwrap(); + + let runner = MigrationRunner::up(&mut connection); + + assert!(runner); + } +} diff --git a/src/noose/mod.rs b/src/noose/mod.rs index d76f6f9..c97c993 100644 --- a/src/noose/mod.rs +++ b/src/noose/mod.rs @@ -3,11 +3,11 @@ use db::Noose; use pipeline::Pipeline; use tokio::runtime; pub mod db; -mod nostr_db; pub mod pipeline; pub mod sled; -// mod sqlite; +mod sqlite; pub mod user; +mod migrations; pub fn start(context: Context) { let rt = runtime::Runtime::new().unwrap(); @@ -30,17 +30,12 @@ pub fn start(context: Context) { sled_writer.start(sled_pubsub).await.unwrap(); }); - let nostr_db_writer_handle = tokio::task::spawn(async move { - let mut db_writer = nostr_db::NostrDb::new(db_config).await; + let nostr_sqlite_writer_handle = tokio::task::spawn(async move { + let mut db_writer = sqlite::NostrSqlite::new(db_config).await; db_writer.start(db_pubsub).await.unwrap(); }); - // let sqlite_writer_handle = tokio::task::spawn(async move { - // let mut db_writer = sqlite::SqliteDb::new().await; - // db_writer.start(db_pubsub).await.unwrap(); - // }); - - // sqlite_writer_handle.await.unwrap(); + nostr_sqlite_writer_handle.await.unwrap(); pipeline_handle.await.unwrap(); }); } diff --git a/src/noose/nostr_db.rs b/src/noose/nostr_db.rs deleted file mode 100644 index 8bf22be..0000000 --- a/src/noose/nostr_db.rs +++ /dev/null @@ -1,111 +0,0 @@ -use crate::{ - bussy::{channels, Command, Message, PubSub}, - noose::Noose, - utils::{config::Config, error::Error, structs::Subscription}, -}; -use nostr_sqlite::{database::NostrDatabase, SQLiteDatabase}; -use std::sync::Arc; - -pub struct NostrDb { - db: SQLiteDatabase, -} - -impl NostrDb { - pub async fn new(config: Arc) -> Self { - let db_path = config.get_db_path(); - if let Ok(db) = SQLiteDatabase::open(db_path).await { - return Self { db }; - } - - panic!("[NostrDb] Failed to initialize database"); - } -} - -impl Noose for NostrDb { - async fn start(&mut self, pubsub: Arc) -> Result<(), Error> { - let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await; - - while let Ok(message) = subscriber.recv().await { - log::info!("[Noose] received message: {:?}", message); - let command = match message.content { - Command::DbReqWriteEvent(client_id, event) => match self.write_event(event).await { - Ok(status) => Command::DbResOkWithStatus(client_id, status), - Err(e) => Command::ServiceError(e), - }, - Command::DbReqFindEvent(client_id, subscriptioin) => { - match self.find_event(subscriptioin).await { - Ok(relay_messages) => { - Command::DbResRelayMessages(client_id, relay_messages) - } - Err(e) => Command::ServiceError(e), - } - } - Command::DbReqEventCounts(client_id, subscriptioin) => { - match self.counts(subscriptioin).await { - Ok(relay_message) => Command::DbResEventCounts(client_id, relay_message), - Err(e) => Command::ServiceError(e), - } - } - _ => Command::Noop, - }; - if command != Command::Noop { - let channel = message.source; - let message = Message { - source: channels::MSG_NOOSE, - content: command, - }; - - log::info!( - "[Noose] publishing new message: {:?} to channel {}", - message, - channel - ); - - pubsub.publish(channel, message).await; - } - } - - Ok(()) - } - - async fn write_event(&self, event: Box) -> Result { - // TODO: Maybe do event validation and admin deletions here - match self.db.save_event(&event).await { - Ok(status) => { - let relay_message = nostr::RelayMessage::ok(event.id, status, ""); - Ok(relay_message) - } - Err(err) => Err(Error::bad_request(err.to_string())), - } - } - - async fn find_event( - &self, - subscription: Subscription, - ) -> Result, Error> { - match self - .db - .query(subscription.filters, nostr_sqlite::database::Order::Desc) - .await - { - Ok(events) => { - let relay_messages = events - .into_iter() - .map(|event| nostr::RelayMessage::event(subscription.id.clone(), event)) - .collect(); - Ok(relay_messages) - } - Err(err) => Err(Error::bad_request(err.to_string())), - } - } - - async fn counts(&self, subscription: Subscription) -> Result { - match self.db.count(subscription.filters).await { - Ok(counts) => { - let relay_message = nostr::RelayMessage::count(subscription.id, counts); - Ok(relay_message) - } - Err(err) => Err(Error::internal_with_message(err.to_string())), - } - } -} diff --git a/src/noose/sqlite.rs b/src/noose/sqlite.rs index 9540bfb..43f2b77 100644 --- a/src/noose/sqlite.rs +++ b/src/noose/sqlite.rs @@ -1,14 +1,76 @@ -use nostr::{Event, JsonUtil, RelayMessage}; -use sea_query::{extension::sqlite::SqliteExpr, Query}; -use sea_query_binder::SqlxBinder; -use sqlx::sqlite::{Sqlite, SqlitePoolOptions}; -use sqlx::FromRow; -use sqlx::{migrate::MigrateDatabase, Pool}; +use super::{db::Noose, migrations::MigrationRunner}; +use crate::{ + bussy::{channels, Command, Message, PubSub}, + utils::{config::Config as ServiceConfig, error::Error, structs::Subscription}, +}; +use async_trait::async_trait; +use deadpool_sqlite::{Config, Object, Pool, Runtime}; +use nostr::{nips::nip01::Coordinate, Event, EventId, Filter, RelayMessage, Timestamp, Url}; +use nostr_database::{Backend, DatabaseOptions, NostrDatabase, Order}; +use rusqlite::Row; +use sea_query::{extension::sqlite::SqliteExpr, Order as SqOrder, Query, SqliteQueryBuilder}; +use sea_query_rusqlite::RusqliteBinder; use std::sync::Arc; +use std::{collections::HashSet, str::FromStr}; -use super::db::Noose; -use crate::bussy::{channels, Command, Message, PubSub}; -use crate::utils::{error::Error, structs::Subscription}; +#[derive(Debug, Clone)] +struct EventRow { + id: String, + pubkey: String, + created_at: i64, + kind: i64, + tags: String, + sig: String, + content: String, +} + +impl From<&Row<'_>> for EventRow { + fn from(row: &Row) -> Self { + let id: String = row.get("id").unwrap(); + let pubkey: String = row.get("pubkey").unwrap(); + let created_at: i64 = row.get("created_at").unwrap(); + let kind: i64 = row.get("kind").unwrap(); + let tags: String = row.get("tags").unwrap(); + let sig: String = row.get("sig").unwrap(); + let content: String = row.get("content").unwrap(); + + Self { + id, + pubkey, + created_at, + kind, + tags, + sig, + content, + } + } +} + +impl From<&EventRow> for Event { + fn from(row: &EventRow) -> Self { + row.to_event() + } +} + +impl EventRow { + pub fn to_event(&self) -> Event { + let tags: Vec> = serde_json::from_str(&self.tags).unwrap(); + + let event_json = serde_json::json!( + { + "id": self.id, + "content": self.content, + "created_at": self.created_at, + "kind": self.kind, + "pubkey": self.pubkey, + "sig": self.sig, + "tags": tags + } + ); + + Event::from_value(event_json).unwrap() + } +} enum EventsTable { Table, @@ -85,455 +147,680 @@ impl sea_query::Iden for TagsTable { } } -#[derive(FromRow, Debug)] -struct EventsCountRow(i32); - -#[derive(FromRow, Debug)] -struct EventRow { - id: String, - pubkey: String, - created_at: i64, - kind: i64, - tags: String, - sig: String, - content: String, +enum DeletedCoordinatesTable { + Table, + Coordinate, + CreatedAt, } -impl EventRow { - pub fn to_string(&self, subscription_id: nostr::SubscriptionId) -> String { - let tags: Vec> = serde_json::from_str(&self.tags).unwrap(); - - let message = serde_json::json!([ - "EVENT", - subscription_id, - { - "id": self.id, - "content": self.content, - "created_at": self.created_at, - "kind": self.kind, - "pubkey": self.pubkey, - "sig": self.sig, - "tags": tags +impl sea_query::Iden for DeletedCoordinatesTable { + fn unquoted(&self, s: &mut dyn std::fmt::Write) { + write!( + s, + "{}", + match self { + Self::Table => "deleted_coordinates", + Self::Coordinate => "coordinate", + Self::CreatedAt => "created_at", } - ]); - - message.to_string() + ) + .unwrap() } } -pub struct SqliteDb { - pool: Pool, +enum EventSeenByRelaysTable { + Table, + Id, + EventId, + RelayURL, } -impl SqliteDb { - pub async fn new() -> Self { - let pool = SqliteDb::build_pool("noose_pool", 42).await; - - Self { pool } +impl sea_query::Iden for EventSeenByRelaysTable { + fn unquoted(&self, s: &mut dyn std::fmt::Write) { + write!( + s, + "{}", + match self { + Self::Table => "event_seen_by_relays", + Self::Id => "id", + Self::EventId => "event_id", + Self::RelayURL => "relay_url", + } + ) + .unwrap() } +} - pub fn info(&self) { - dbg!(self.pool.options()); - } - - async fn migrate(pool: &Pool) { - sqlx::migrate!("src/noose/migrations") - .run(pool) - .await - .unwrap() - } - - async fn build_pool(name: &str, max_size: u32) -> Pool { - let pool_options = SqlitePoolOptions::new() - .test_before_acquire(true) - .idle_timeout(Some(std::time::Duration::from_secs(10))) - .max_lifetime(Some(std::time::Duration::from_secs(30))) - .max_lifetime(None) - .idle_timeout(None) - .max_connections(max_size); +#[derive(Debug)] +pub struct NostrSqlite { + pool: Pool, +} +impl NostrSqlite { + pub async fn new(config: Arc) -> Self { let env_db_path = std::env::var("DATABASE_URL").unwrap_or("/tmp/sqlite.db".to_string()); - if !Sqlite::database_exists(&env_db_path).await.unwrap_or(false) { - log::info!("Creating database {}", &env_db_path); - match Sqlite::create_database(&env_db_path).await { - Ok(_) => log::info!("Db {} created", &env_db_path), - Err(_) => panic!("Failed to create database {}", &env_db_path), - } - } - if let Ok(pool) = pool_options.connect(&env_db_path).await { - log::info!("Connected to sqlite pool {}", name); - SqliteDb::migrate(&pool).await; + let cfg = Config::new(env_db_path); + let pool = cfg.create_pool(Runtime::Tokio1).unwrap(); - pool - } else { - panic!("Connection to sqlite pool {} failed", name); + if NostrSqlite::run_migrations(&pool).await { + return Self { pool }; + } + + panic!("Pool's closed"); + } + + async fn run_migrations(pool: &Pool) -> bool { + let connection = pool.get().await.unwrap(); + connection.interact(MigrationRunner::up).await.unwrap() + } + + async fn get_connection(&self) -> Result { + match self.pool.get().await { + Ok(connection) => Ok(connection), + Err(err) => Err(Error::internal(err.into())), } } - async fn add_event(&self, event: Box) -> Result { + async fn save_event(&self, event: &Event) -> Result { + let event = event.clone(); + let id = event.id.to_string(); - let kind = event.kind.to_string(); + let kind = event.kind.as_u64(); let pubkey = event.pubkey.to_string(); let content = event.content.to_string(); let created_at = event.created_at.as_i64(); let tags = serde_json::to_string(&event.tags).unwrap(); let sig = event.sig.to_string(); - let message = nostr::RelayMessage::Ok { - event_id: event.id, - status: true, - message: "".to_string(), - }; + let ids: Vec = event.event_ids().map(|eid| eid.to_string()).collect(); - // Skip events that are older than 10 minutes - if chrono::Utc::now().timestamp() - 600 > created_at { - let message = nostr::RelayMessage::Ok { - event_id: event.id, - status: false, - message: "invalid: event creation date is too far off from the current time" - .to_string(), - }; - return Ok(message); + if self.event_is_too_old(&event) { + return Ok(false); } if event.is_ephemeral() { - return Ok(message); + return Ok(false); } - { - let (sql, value) = Query::select() - .from(EventsTable::Table) - .column(EventsTable::EventId) - .and_where(sea_query::Expr::col(EventsTable::EventId).eq(event.id.to_string())) - .limit(1) - .build_sqlx(sea_query::SqliteQueryBuilder); + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; - let events = sqlx::query_with(&sql, value).fetch_one(&self.pool).await; - if events.ok().is_some() { - let message = nostr::RelayMessage::Ok { - event_id: event.id, - status: false, - message: "invalid: event with this id already exists".to_string(), - }; - return Ok(message); - } - } + let Ok(event_saved) = connection + .interact(move |conn| -> Result { + let tx = conn.transaction().unwrap(); - let tx = self.pool.begin().await.unwrap(); - { - if event.is_replaceable() { - dbg!("new event is replaceable - searching for previously stored event"); - let (sql, values) = Query::select() - .from(EventsTable::Table) - .columns([EventsTable::EventId]) - .and_where( - sea_query::Expr::col(EventsTable::Pubkey).eq(event.pubkey.to_string()), - ) - .and_where(sea_query::Expr::col(EventsTable::Kind).eq(event.kind.as_u32())) - .and_where( - sea_query::Expr::col(EventsTable::CreatedAt).gte(event.created_at.as_i64()), - ) - .limit(1) - .build_sqlx(sea_query::SqliteQueryBuilder); + if event.is_replaceable() { + dbg!("new event is replaceable - searching for previously stored event"); + let (sql, values) = Query::select() + .from(EventsTable::Table) + .columns([EventsTable::EventId]) + .and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey)) + .and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind)) + .and_where(sea_query::Expr::col(EventsTable::CreatedAt).gte(created_at)) + .limit(1) + .build_rusqlite(SqliteQueryBuilder); - let repl_count = sqlx::query_with(&sql, values).fetch_one(&self.pool).await; - - if repl_count.ok().is_some() { - return Ok(message); + if let Ok(res) = tx.execute(sql.as_str(), &*values.as_params()) { + if res > 0 { + return Ok(true); + } + }; + } - } - } - { - if event.is_parameterized_replaceable() { - dbg!( + if event.is_parameterized_replaceable() { + dbg!( "new event is parametrized replaceable - searching for previously stored event" ); - let d_tags: Vec = event - .tags - .iter() - .filter(|tag| tag.kind() == nostr::TagKind::D) - .map(|tag| tag.clone().to_vec()[1].clone()) - .collect(); - let (sql, values) = Query::select() - .from(EventsTable::Table) - .column((EventsTable::Table, EventsTable::EventId)) - .left_join( - TagsTable::Table, - sea_query::Expr::col((TagsTable::Table, TagsTable::EventId)) - .equals((EventsTable::Table, EventsTable::EventId)), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) - .eq(event.pubkey.to_string()), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)) - .eq(event.kind.as_u32()), - ) - .and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq("d")) - .and_where( - sea_query::Expr::col((TagsTable::Table, TagsTable::Value)) - .eq(d_tags[0].to_string()), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt)) - .gte(event.created_at.as_i64()), - ) - .limit(1) - .build_sqlx(sea_query::SqliteQueryBuilder); + let d_tags: Vec = event + .tags + .iter() + .filter(|tag| tag.kind() == nostr::TagKind::D) + .map(|tag| tag.clone().to_vec()[1].clone()) + .collect(); + let (sql, values) = Query::select() + .from(EventsTable::Table) + .column((EventsTable::Table, EventsTable::EventId)) + .left_join( + TagsTable::Table, + sea_query::Expr::col((TagsTable::Table, TagsTable::EventId)) + .equals((EventsTable::Table, EventsTable::EventId)), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) + .eq(&pubkey), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind), + ) + .and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq("d")) + .and_where( + sea_query::Expr::col((TagsTable::Table, TagsTable::Value)) + .eq(d_tags[0].to_string()), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt)) + .gte(created_at), + ) + .limit(1) + .build_rusqlite(SqliteQueryBuilder); - let repl_count = sqlx::query_with(&sql, values).fetch_one(&self.pool).await; - - if repl_count.ok().is_some() { - return Ok(message); - } - } - } - - // Insert replaceble event - { - if event.is_replaceable() { - dbg!("deleting older replaceable event from events table"); - let (sql, values) = Query::delete() - .from_table(EventsTable::Table) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)) - .eq(event.kind.as_u32()), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) - .eq(event.pubkey.to_string()), - ) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::EventId)) - .not_in_subquery( - Query::select() - .from(EventsTable::Table) - .column(EventsTable::EventId) - .and_where( - sea_query::Expr::col(EventsTable::Kind) - .eq(event.kind.as_u32()), - ) - .and_where( - sea_query::Expr::col(EventsTable::Pubkey) - .eq(event.pubkey.to_string()), - ) - .order_by(EventsTable::CreatedAt, sea_query::Order::Desc) - .limit(1) - .to_owned(), - ), - ) - .build_sqlx(sea_query::SqliteQueryBuilder); - - let results = sqlx::query_with(&sql, values) - .execute(&self.pool) - .await - .unwrap(); - - if results.rows_affected() > 0 { - log::info!( - "removed {} older replaceable kind {} events for author: {:?}", - results.rows_affected(), - event.kind.as_u32(), - event.pubkey.to_string() - ); - } - } - } - - // Insert parametrized replaceble event - { - if event.is_parameterized_replaceable() { - dbg!("deleting older parametrized replaceable event from events table"); - log::info!("deleting older parametrized replaceable event from events table"); - let d_tag = event.identifier(); - let (sql, values) = Query::delete() - .from_table(EventsTable::Table) - .and_where( - sea_query::Expr::col((EventsTable::Table, EventsTable::EventId)) - .in_subquery( - Query::select() - .from(EventsTable::Table) - .column((EventsTable::Table, EventsTable::EventId)) - .left_join( - TagsTable::Table, - sea_query::Expr::col(( - TagsTable::Table, - TagsTable::EventId, - )) - .equals((EventsTable::Table, EventsTable::EventId)), - ) - .and_where( - sea_query::Expr::col(( - EventsTable::Table, - EventsTable::Kind, - )) - .eq(event.kind.as_u32()), - ) - .and_where( - sea_query::Expr::col(( - EventsTable::Table, - EventsTable::Pubkey, - )) - .eq(event.pubkey.to_string()), - ) - .and_where( - sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)) - .eq("d"), - ) - .and_where( - sea_query::Expr::col((TagsTable::Table, TagsTable::Value)) - .eq(d_tag), - ) - .to_owned(), - ), - ) - .build_sqlx(sea_query::SqliteQueryBuilder); - - let results = sqlx::query_with(&sql, values) - .execute(&self.pool) - .await - .unwrap(); - - if results.rows_affected() > 0 { - log::info!("removed {} older parameterized replaceable kind {} events for author: {:?}", results.rows_affected(), event.kind, event.pubkey); - } - } - } - - // Process deletion events - dbg!(event.as_json()); - if event.kind.as_u32() == 5 { - dbg!("deleting event"); - log::info!("deleting event"); - let ids: Vec = event.event_ids().map(|eid| eid.to_string()).collect(); - let (sql, values) = Query::delete() - .from_table(EventsTable::Table) - .and_where(sea_query::Expr::col(EventsTable::Kind).ne(5)) - .and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(event.pubkey.to_string())) - .and_where(sea_query::Expr::col(EventsTable::EventId).is_in(&ids)) - .build_sqlx(sea_query::SqliteQueryBuilder); - - let results = sqlx::query_with(&sql, values) - .execute(&self.pool) - .await - .unwrap(); - - if results.rows_affected() > 0 { - log::info!( - "removed {} events for author {:?}", - results.rows_affected(), - event.pubkey - ); - } - - // Delete from EventsFTS - let (sql, values) = Query::delete() - .from_table(EventsFTSTable::Table) - .and_where(sea_query::Expr::col(EventsFTSTable::EventId).is_in(&ids)) - .build_sqlx(sea_query::SqliteQueryBuilder); - let _ = sqlx::query_with(&sql, values) - .execute(&self.pool) - .await - .unwrap(); - } else { - dbg!("inserting new event in events"); - log::info!("inserting new event in events"); - // Insert into Events table - let (sql, values) = Query::insert() - .into_table(EventsTable::Table) - .columns([ - EventsTable::EventId, - EventsTable::Content, - EventsTable::Kind, - EventsTable::Pubkey, - EventsTable::CreatedAt, - EventsTable::Tags, - EventsTable::Sig, - ]) - .values_panic([ - id.clone().into(), - content.clone().into(), - kind.into(), - pubkey.into(), - created_at.into(), - tags.into(), - sig.into(), - ]) - .build_sqlx(sea_query::SqliteQueryBuilder); - - if let Err(e) = sqlx::query_with(&sql, values).execute(&self.pool).await { - log::error!("Error inserting event into 'events' table: {}", e); - } - - // Insert into EventsFTS table - dbg!("inserting new event into eventsFTS"); - let (sql, values) = Query::insert() - .into_table(EventsFTSTable::Table) - .columns([EventsFTSTable::EventId, EventsFTSTable::Content]) - .values_panic([id.clone().into(), content.into()]) - .build_sqlx(sea_query::SqliteQueryBuilder); - - if let Err(e) = sqlx::query_with(&sql, values).execute(&self.pool).await { - log::error!("Error inserting event into 'eventsFTS' table: {}", e); - } - - // Insert into Tags table - dbg!("inserting new event into tags"); - for tag in event.tags.clone() { - let tag = tag.to_vec(); - if tag.len() >= 2 { - let tag_name = &tag[0]; - let tag_value = &tag[1]; - if tag_name.len() == 1 { - let (sql, values) = Query::insert() - .into_table(TagsTable::Table) - .columns([TagsTable::Tag, TagsTable::Value, TagsTable::EventId]) - .values_panic([tag_name.into(), tag_value.into(), id.clone().into()]) - .build_sqlx(sea_query::SqliteQueryBuilder); - - if let Err(e) = sqlx::query_with(&sql, values).execute(&self.pool).await { - log::error!("Error inserting event into 'tags' table: {}", e); + if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { + if results > 0 { + return Ok(true); } } } + + // Insert replaceble event + if event.is_replaceable() { + dbg!("deleting older replaceable event from events table"); + let (sql, values) = Query::delete() + .from_table(EventsTable::Table) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).eq(kind), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)) + .eq(&pubkey), + ) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::EventId)) + .not_in_subquery( + Query::select() + .from(EventsTable::Table) + .column(EventsTable::EventId) + .and_where(sea_query::Expr::col(EventsTable::Kind).eq(kind)) + .and_where( + sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey), + ) + .order_by(EventsTable::CreatedAt, sea_query::Order::Desc) + .limit(1) + .to_owned(), + ), + ) + .build_rusqlite(SqliteQueryBuilder); + + if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { + if results > 0 { + log::info!( + "removed {} older replaceable kind {} events for author: {:?}", + results, + event.kind.as_u32(), + event.pubkey.to_string() + ); + } + } + } + + // Insert parametrized replaceble event + if event.is_parameterized_replaceable() { + dbg!("deleting older parametrized replaceable event from events table"); + log::info!("deleting older parametrized replaceable event from events table"); + let d_tag = event.identifier(); + let (sql, values) = Query::delete() + .from_table(EventsTable::Table) + .and_where( + sea_query::Expr::col((EventsTable::Table, EventsTable::EventId)) + .in_subquery( + Query::select() + .from(EventsTable::Table) + .column((EventsTable::Table, EventsTable::EventId)) + .left_join( + TagsTable::Table, + sea_query::Expr::col(( + TagsTable::Table, + TagsTable::EventId, + )) + .equals((EventsTable::Table, EventsTable::EventId)), + ) + .and_where( + sea_query::Expr::col(( + EventsTable::Table, + EventsTable::Kind, + )) + .eq(kind), + ) + .and_where( + sea_query::Expr::col(( + EventsTable::Table, + EventsTable::Pubkey, + )) + .eq(&pubkey), + ) + .and_where( + sea_query::Expr::col(( + TagsTable::Table, + TagsTable::Tag, + )) + .eq("d"), + ) + .and_where( + sea_query::Expr::col(( + TagsTable::Table, + TagsTable::Value, + )) + .eq(d_tag), + ) + .to_owned(), + ), + ) + .build_rusqlite(SqliteQueryBuilder); + + if let Ok(results) = tx.execute(sql.as_str(), &*values.as_params()) { + if results > 0 { + log::info!("removed {} older parameterized replaceable kind {} events for author: {:?}", results, event.kind, event.pubkey); + } + } + } + + if event.kind == nostr::Kind::EventDeletion { + // Delete from Events + let (sql, values) = Query::delete() + .from_table(EventsTable::Table) + .and_where(sea_query::Expr::col(EventsTable::Kind).ne(5)) + .and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(&pubkey)) + .and_where(sea_query::Expr::col(EventsTable::EventId).is_in(&ids)) + .build_rusqlite(SqliteQueryBuilder); + + if let Err(err) = tx.execute(sql.as_str(), &*values.as_params()) { + tx.rollback().unwrap(); + + return Ok(false); + } + + // Delete from EventsFTS + let (sql, values) = Query::delete() + .from_table(EventsFTSTable::Table) + .and_where(sea_query::Expr::col(EventsFTSTable::EventId).is_in(ids)) + .build_rusqlite(SqliteQueryBuilder); + + if let Err(err) = tx.execute(sql.as_str(), &*values.as_params()) { + tx.rollback().unwrap(); + + return Ok(false); + } + } else { + log::debug!("inserting new event in events"); + // Insert into Events table + let (sql, values) = Query::insert() + .into_table(EventsTable::Table) + .columns([ + EventsTable::EventId, + EventsTable::Content, + EventsTable::Kind, + EventsTable::Pubkey, + EventsTable::CreatedAt, + EventsTable::Tags, + EventsTable::Sig, + ]) + .values_panic([ + id.clone().into(), + content.clone().into(), + kind.into(), + pubkey.into(), + created_at.into(), + tags.into(), + sig.into(), + ]) + .build_rusqlite(SqliteQueryBuilder); + + if let Err(err) = tx.execute(sql.as_str(), &*values.as_params()) { + log::error!("Error inserting event into 'events' table: {}", err); + tx.rollback().unwrap(); + + return Ok(false); + }; + + // Insert into EventsFTS table + log::debug!("inserting new event into eventsFTS"); + let (sql, values) = Query::insert() + .into_table(EventsFTSTable::Table) + .columns([EventsFTSTable::EventId, EventsFTSTable::Content]) + .values_panic([id.clone().into(), content.into()]) + .build_rusqlite(SqliteQueryBuilder); + + if let Err(err) = tx.execute(sql.as_str(), &*values.as_params()) { + log::error!("Error inserting event into 'eventsFTS' table: {}", err); + tx.rollback().unwrap(); + + return Ok(false); + } + + // Insert into Tags table + log::debug!("inserting new event into tags"); + for tag in event.tags.clone() { + let tag = tag.to_vec(); + if tag.len() >= 2 { + let tag_name = &tag[0]; + let tag_value = &tag[1]; + if tag_name.len() == 1 { + let (sql, values) = Query::insert() + .into_table(TagsTable::Table) + .columns([TagsTable::Tag, TagsTable::Value, TagsTable::EventId]) + .values_panic([ + tag_name.into(), + tag_value.into(), + id.clone().into(), + ]) + .build_rusqlite(SqliteQueryBuilder); + + if let Err(err) = tx.execute(sql.as_str(), &*values.as_params()) { + log::error!("Error inserting event into 'tags' table: {}", err); + tx.rollback().unwrap(); + + return Ok(false); + } + } + } + } + } + + match tx.commit() { + Ok(_) => Ok(true), + Err(err) => { + log::error!("Error during transaction commit: {}", err); + Ok(false) + } + } + }) + .await else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + + event_saved + } + + async fn has_event_already_been_saved(&self, event_id: &EventId) -> Result { + match self.get_event_by_id(*event_id).await { + Ok(_) => Ok(true), + Err(_) => Ok(false), + } + } + + async fn has_event_id_been_deleted(&self, event_id: &EventId) -> Result { + match self.get_event_by_id(*event_id).await { + Ok(_) => Ok(false), + Err(_) => Ok(true), + } + } + + /// Skip events that are older than 10 minutes + fn event_is_too_old(&self, event: &Event) -> bool { + if chrono::Utc::now().timestamp() - 600 > event.created_at.as_i64() { + return true; + } + false + } + + async fn set_event_id_seen(&self, event_id: EventId, relay_url: Url) -> Result<(), Error> { + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + let event_id = event_id.to_string(); + let relay_url = relay_url.to_string(); + + let Ok(query_result) = connection + .interact(|conn| -> Result { + let (sql, values) = Query::insert() + .into_table(EventSeenByRelaysTable::Table) + .columns([ + EventSeenByRelaysTable::EventId, + EventSeenByRelaysTable::RelayURL, + ]) + .values_panic([event_id.into(), relay_url.into()]) + .build_rusqlite(SqliteQueryBuilder); + + match conn.execute(sql.as_str(), &*values.as_params()) { + Ok(updated_rows) => Ok(updated_rows), + Err(err) => Err(Error::internal(err.into())), + } + }) + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + + if let Ok(event_id_inserted) = query_result { + if event_id_inserted > 0 { + return Ok(()); } - } - tx.commit().await.unwrap(); - - log::info!("[SQLite] add_event completed"); - Ok(message) - } - - async fn index_search(&self, event: Box) -> Result<(), Error> { - let id = event.id.to_string(); - let content = event.content.to_string(); - let (sql, values) = Query::insert() - .into_table(EventsFTSTable::Table) - .columns([EventsFTSTable::EventId, EventsFTSTable::Content]) - .values_panic([id.into(), content.into()]) - .build_sqlx(sea_query::SqliteQueryBuilder); - - let results = sqlx::query_with(&sql, values).execute(&self.pool).await; - if results.is_ok() { - Ok(()) + Err(Error::internal_with_message("Failed to insert new record")) } else { - Err(Error::internal_with_message( - "Unable to write event to events_fts index", - )) + Err(Error::internal_with_message("Failed to insert new record")) } } - async fn index_tags(&self, event: Box) -> Result<(), Error> { - // let t: Vec = Vec::new(); - // for tag in event.tags { - // tag.kind() - // } + async fn get_event_seen_on_relays( + &self, + event_id: EventId, + ) -> Result>, Error> { + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + let event_id = event_id.to_string(); - Ok(()) + let Ok(query_result) = connection + .interact(|conn| -> Result>, Error> { + let (sql, values) = Query::select() + .from(EventSeenByRelaysTable::Table) + .column(EventSeenByRelaysTable::RelayURL) + .and_where(sea_query::Expr::col(EventSeenByRelaysTable::EventId).eq(event_id)) + .build_rusqlite(SqliteQueryBuilder); + + let mut stmt = conn.prepare(sql.as_str()).unwrap(); + let mut rows = stmt.query(&*values.as_params()).unwrap(); + + let mut urls: HashSet = HashSet::new(); + while let Ok(Some(record)) = rows.next() { + if let Ok(url_string) = record.get::<_, String>("relay_url") { + if let Ok(relay_url) = Url::from_str(&url_string) { + urls.insert(relay_url); + } + } + } + + if !urls.is_empty() { + return Ok(Some(urls)); + } + + Ok(None) + }) + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + + let Ok(found_relay_urls) = query_result else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + + Ok(found_relay_urls) + } + + async fn has_event_already_been_seen(&self, event_id: &EventId) -> Result { + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + let event_id = event_id.to_string(); + + let Ok(query_result) = connection + .interact(|conn| -> Result { + let (sql, values) = Query::select() + .expr_as( + sea_query::Expr::exists( + Query::select() + .column(EventSeenByRelaysTable::EventId) + .from(EventSeenByRelaysTable::Table) + .and_where( + sea_query::Expr::col(EventSeenByRelaysTable::EventId) + .eq(event_id), + ) + .limit(1) + .take(), + ), + sea_query::Alias::new("event_exists"), + ) + .build_rusqlite(SqliteQueryBuilder); + + let mut stmt = conn.prepare(sql.as_str()).unwrap(); + let mut rows = stmt.query(&*values.as_params()).unwrap(); + + let exists = match rows.next() { + Ok(_) => 1, + Err(_) => 0, + }; + + Ok(exists == 1) + }) + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + + if let Ok(result) = query_result { + return Ok(result); + } + + Ok(false) + } + + async fn admin_delete_events(&self, event: &Event) -> Result { + let event_ids: Vec = event.event_ids().map(|e| e.to_string()).collect(); + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + let Ok(query_result) = connection + .interact( + |conn: &mut rusqlite::Connection| -> Result { + let (sql, values) = Query::delete() + .from_table(EventsTable::Table) + .and_where(sea_query::Expr::col(EventsTable::EventId).is_in(event_ids)) + .build_rusqlite(SqliteQueryBuilder); + + match conn.execute(sql.as_str(), &*values.as_params()) { + Ok(affected_rows) => { + let message = format!("{} events deleted", affected_rows); + Ok(RelayMessage::notice(message)) + } + Err(err) => { + log::error!("[admin_delete_events] Failed to execute query: {}", err); + + Ok(RelayMessage::notice("unable to delete events")) + } + } + }, + ) + .await + else { + log::error!("[admin_delete_events] Failed to execute query"); + return Err(Error::internal_with_message( + "Failed to execute query 'admin_delete_events'", + )); + }; + + let Ok(relay_message) = query_result else { + let message = RelayMessage::Notice { + message: "unable to acquire pool connection".to_string(), + }; + return Ok(message); + }; + + Ok(relay_message) + } + + async fn get_event_by_id(&self, event_id: EventId) -> Result { + let eid = event_id.to_string(); + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + let Ok(query_result) = connection + .interact(|conn: &mut rusqlite::Connection| -> Result { + let (sql, value) = Query::select() + .from(EventsTable::Table) + .columns([ + EventsTable::EventId, + EventsTable::Content, + EventsTable::Kind, + EventsTable::Pubkey, + EventsTable::CreatedAt, + EventsTable::Tags, + EventsTable::Sig, + ]) + .and_where(sea_query::Expr::col(EventsTable::EventId).eq(eid)) + .limit(1) + .build_rusqlite(SqliteQueryBuilder); + + let mut stmt = conn.prepare(sql.as_str()).unwrap(); + let row = stmt + .query_row::(&*value.as_params(), |row| Ok(EventRow::from(row))) + .unwrap(); + + let event = row.clone().to_event(); + + Ok(event) + }) + .await + else { + return Err(Error::internal_with_message( + "Failed to execute query 'get_event_by_id'", + )); + }; + + let Ok(event) = query_result else { + return Err(Error::internal_with_message( + "Failed to get event from row in 'get_event_by_id'", + )); + }; + + Ok(event) + } + + async fn wipe(&self) -> Result<(), Error> { + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + match connection + .interact(|conn| -> Result<(), Error> { + let Ok(result) = conn.set_db_config( + rusqlite::config::DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, + true, + ) else { + return Err(Error::internal_with_message( + "Failed to set SQLITE_DBCONFIG_RESET_DATABASE to true", + )); + }; + let Ok(result) = conn.execute("VACUUM;", []) else { + return Err(Error::internal_with_message("Failed to run VACUUM command")); + }; + let Ok(result) = conn.set_db_config( + rusqlite::config::DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, + false, + ) else { + return Err(Error::internal_with_message( + "Failed to set SQLITE_DBCONFIG_RESET_DATABASE to false", + )); + }; + + Ok(()) + }) + .await + { + Ok(_) => { + if NostrSqlite::run_migrations(&self.pool).await { + return Ok(()); + } + + Err(Error::internal_with_message("Failed to run migrations")) + } + Err(err) => Err(Error::internal_with_message(err.to_string())), + } } fn get_filter_query(&self, filter: &nostr::Filter) -> sea_query::SelectStatement { @@ -632,9 +919,12 @@ impl SqliteDb { query } - fn get_filters_query(&self, subscription: Subscription) -> Option { - subscription - .filters + fn get_filters_query( + &self, + filters: Vec, + order: SqOrder, + ) -> Option { + filters .iter() .map(|filter| { Query::select() @@ -651,67 +941,261 @@ impl SqliteDb { self.get_filter_query(filter), sea_query::Alias::new("events"), ) + .order_by(EventsTable::CreatedAt, order.to_owned()) .to_owned() }) .reduce(|mut result, query| result.union(sea_query::UnionType::All, query).to_owned()) } - async fn admin_delete_events(&self, event: Box) -> Result { - let event_ids: Vec = event.event_ids().map(|e| e.to_string()).collect(); - let (sql, values) = Query::delete() - .from_table(EventsTable::Table) - .and_where(sea_query::Expr::col(EventsTable::EventId).is_in(event_ids)) - .build_sqlx(sea_query::SqliteQueryBuilder); + async fn query(&self, filters: Vec, order: Order) -> Result, Error> { + log::debug!("making query from filters..."); - match sqlx::query_with(&sql, values).execute(&self.pool).await { - Ok(affected_rows) => { - let message = RelayMessage::Notice { - message: format!("{} events deleted", affected_rows.rows_affected()), - }; - Ok(message) - } - Err(e) => { - log::error!("[admin_delete_events] Failed to execute query: {}", e); - let message = RelayMessage::Notice { - message: "unable to delete events".to_string(), - }; - Ok(message) - } - } + // let event_ids: Vec = event.event_ids().map(|e| e.to_string()).collect(); + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + let sq_order = match order { + Order::Asc => SqOrder::Asc, + Order::Desc => SqOrder::Desc, + }; + let Some(sql_statement) = self.get_filters_query(filters, sq_order) else { + return Err(Error::internal_with_message("Failed to build SQL Query")); + }; + + let Ok(query_result) = connection + .interact(move |conn| -> Result, Error> { + let (sql, values) = sql_statement.build_rusqlite(SqliteQueryBuilder); + + let mut stmt = conn.prepare(sql.as_str()).unwrap(); + let mut rows = stmt.query(&*values.as_params()).unwrap(); + + let mut event_vec: Vec = vec![]; + while let Ok(Some(row)) = rows.next() { + let event = EventRow::from(row).to_event(); + event_vec.push(event); + } + + Ok(event_vec) + }) + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + + query_result } - async fn count_events_by_filters(&self, subscription: Subscription) -> i32 { - if subscription.filters.is_empty() { - return 0; - } + async fn count(&self, filters: Vec) -> Result { + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; - let (sql, values) = self - .get_filters_query(subscription) - .unwrap() - .clear_selects() - .expr_as( - sea_query::Func::count(sea_query::Expr::col(( - EventsTable::Table, - EventsTable::EventId, - ))), - sea_query::Alias::new("count"), - ) - .build_sqlx(sea_query::SqliteQueryBuilder); + let Some(mut sql_statement) = self.get_filters_query(filters, SqOrder::Desc) else { + return Err(Error::internal_with_message("Failed to build SQL Query")); + }; - println!("count_filters SEA_QUERY built SQL: {}", sql.clone()); + let Ok(query_result) = connection + .interact(move |conn| { + let (sql, values) = sql_statement + .clear_selects() + .expr_as( + sea_query::Func::count(sea_query::Expr::col(( + EventsTable::Table, + EventsTable::EventId, + ))), + sea_query::Alias::new("count"), + ) + .build_rusqlite(SqliteQueryBuilder); - let counts = sqlx::query_as_with::<_, EventsCountRow, _>(&sql, values) - .fetch_one(&self.pool) + if let Ok(result) = conn.query_row(sql.as_str(), &*values.as_params(), |row| { + let count: usize = row.get(0).unwrap(); + Ok(count) + }) { + return Ok(result); + } + + Err(Error::internal_with_message( + "Failed to get event counts by filters", + )) + }) .await - .unwrap(); + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; - dbg!(counts); + query_result + } - 1 + async fn event_ids_by_filters( + &self, + filters: Vec, + order: Order, + ) -> Result, Error> { + let Ok(connection) = self.get_connection().await else { + return Err(Error::internal_with_message("Unable to get DB connection")); + }; + + let sq_order = match order { + Order::Asc => SqOrder::Asc, + Order::Desc => SqOrder::Desc, + }; + + let Some(mut sql_statement) = self.get_filters_query(filters, sq_order) else { + return Err(Error::internal_with_message("Failed to build SQL Query")); + }; + + let Ok(query_result) = connection + .interact(move |conn| { + let (sql, values) = sql_statement + .clear_selects() + .column(EventsTable::EventId) + .build_rusqlite(SqliteQueryBuilder); + + let mut stmt = conn.prepare(sql.as_str()).unwrap(); + let mut rows = stmt.query(&*values.as_params()).unwrap(); + + let mut event_vec: Vec = vec![]; + while let Ok(Some(row)) = rows.next() { + let event_id_string: String = row.get(0).unwrap(); + let event_id = EventId::from_str(&event_id_string).unwrap(); + event_vec.push(event_id); + } + + Ok(event_vec) + }) + .await + else { + return Err(Error::internal_with_message("Failed to execute query")); + }; + + query_result + } + + async fn has_coordinate_been_deleted( + &self, + coordinate: &Coordinate, + timestamp: Timestamp, + ) -> Result { + Ok(true) } } -impl Noose for SqliteDb { +impl From for Error { + fn from(value: nostr_database::DatabaseError) -> Self { + Error::internal_with_message(value.to_string()) + } +} + +impl From for nostr_database::DatabaseError { + fn from(val: Error) -> Self { + nostr_database::DatabaseError::backend(val) + } +} + +#[async_trait] +impl NostrDatabase for NostrSqlite { + type Err = Error; + + /// Name of the backend database used (ex. lmdb, sqlite, indexeddb, ...) + fn backend(&self) -> Backend { + Backend::SQLite + } + + /// Database options + fn opts(&self) -> DatabaseOptions { + DatabaseOptions { events: true } + } + + /// Save [`Event`] into store + /// + /// Return `true` if event was successfully saved into database. + /// + /// **This method assume that [`Event`] was already verified** + async fn save_event(&self, event: &Event) -> Result { + self.save_event(event).await + } + + /// Check if [`Event`] has already been saved + async fn has_event_already_been_saved(&self, event_id: &EventId) -> Result { + self.has_event_already_been_saved(event_id).await + } + + /// Check if [`EventId`] has already been seen + async fn has_event_already_been_seen(&self, event_id: &EventId) -> Result { + self.has_event_already_been_seen(event_id).await + } + + /// Check if [`EventId`] has been deleted + async fn has_event_id_been_deleted(&self, event_id: &EventId) -> Result { + self.has_event_id_been_deleted(event_id).await + } + + /// Check if event with [`Coordinate`] has been deleted before [`Timestamp`] + async fn has_coordinate_been_deleted( + &self, + coordinate: &Coordinate, + timestamp: Timestamp, + ) -> Result { + todo!() + } + + /// Set [`EventId`] as seen by relay + /// + /// Useful for NIP65 (aka gossip) + async fn event_id_seen(&self, event_id: EventId, relay_url: Url) -> Result<(), Self::Err> { + self.set_event_id_seen(event_id, relay_url).await + } + + /// Get list of relays that have seen the [`EventId`] + async fn event_seen_on_relays( + &self, + event_id: EventId, + ) -> Result>, Self::Err> { + self.get_event_seen_on_relays(event_id).await + } + + /// Get [`Event`] by [`EventId`] + async fn event_by_id(&self, event_id: EventId) -> Result { + self.get_event_by_id(event_id).await + } + + /// Count number of [`Event`] found by filters + /// + /// Use `Filter::new()` or `Filter::default()` to count all events. + async fn count(&self, filters: Vec) -> Result { + self.count(filters).await + } + + /// Query store with filters + async fn query(&self, filters: Vec, order: Order) -> Result, Self::Err> { + self.query(filters, order).await + } + + /// Get event IDs by filters + async fn event_ids_by_filters( + &self, + filters: Vec, + order: Order, + ) -> Result, Self::Err> { + self.event_ids_by_filters(filters, order).await + } + + /// Get `negentropy` items + async fn negentropy_items( + &self, + filter: Filter, + ) -> Result, Self::Err> { + todo!() + } + + /// Wipe all data + async fn wipe(&self) -> Result<(), Self::Err> { + self.wipe().await + } +} + +impl Noose for NostrSqlite { async fn start(&mut self, pubsub: Arc) -> Result<(), Error> { let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await; @@ -724,13 +1208,15 @@ impl Noose for SqliteDb { }, Command::DbReqFindEvent(client_id, subscriptioin) => { match self.find_event(subscriptioin).await { - Ok(events) => Command::DbResRelayMessage(client_id, events), + Ok(relay_messages) => { + Command::DbResRelayMessages(client_id, relay_messages) + } Err(e) => Command::ServiceError(e), } } - Command::DbReqDeleteEvents(client_id, event_ids) => { - match self.delete_events(event_ids).await { - Ok(status) => Command::DbResOkWithStatus(client_id, status), + Command::DbReqEventCounts(client_id, subscriptioin) => { + match self.counts(subscriptioin).await { + Ok(relay_message) => Command::DbResEventCounts(client_id, relay_message), Err(e) => Command::ServiceError(e), } } @@ -756,190 +1242,235 @@ impl Noose for SqliteDb { Ok(()) } - async fn migration_up(&self) { - SqliteDb::migrate(&self.pool).await; - } - - async fn write_event(&self, event: Box) -> Result { - log::info!("[Noose] write_event triggered"); - - let status = self.add_event(event).await.unwrap(); - - log::info!("[Noose] write event completed: {}", status.as_json()); - Ok(status) - } - - async fn delete_events(&self, event: Box) -> Result { - log::debug!("[Noose] delete_filters triggered"); - - let status = self.admin_delete_events(event).await.unwrap(); - - Ok(status) - } - - async fn find_event(&self, subscription: Subscription) -> Result, Error> { - log::debug!("making query from filters..."); - - let eose_message = - vec![nostr::RelayMessage::EndOfStoredEvents(subscription.id.clone()).as_json()]; - - if let Some(sql_statement) = self.get_filters_query(subscription.clone()) { - let (sql, values) = sql_statement.build_sqlx(sea_query::SqliteQueryBuilder); - log::info!("SEA_QUERY built SQL: {}", sql.clone()); - - match sqlx::query_as_with::<_, EventRow, _>(&sql, values) - .fetch_all(&self.pool) - .await - { - Ok(rows) => { - if rows.is_empty() { - return Ok(eose_message); - } else { - let relay_messages: Vec = rows - .iter() - .map(|row| row.to_string(subscription.id.clone())) - .collect(); - return Ok(relay_messages); - } - } - Err(e) => { - log::error!("{}", e); - return Err(Error::internal(e.into())); - } + async fn write_event(&self, event: Box) -> Result { + // TODO: Maybe do event validation and admin deletions here + match self.save_event(&event).await { + Ok(status) => { + let relay_message = nostr::RelayMessage::ok(event.id, status, ""); + Ok(relay_message) } + Err(err) => Err(Error::bad_request(err.to_string())), } + } - Ok(eose_message) + async fn find_event( + &self, + subscription: Subscription, + ) -> Result, Error> { + match self.query(subscription.filters, Order::Desc).await { + Ok(events) => { + let relay_messages = events + .into_iter() + .map(|event| nostr::RelayMessage::event(subscription.id.clone(), event)) + .collect(); + Ok(relay_messages) + } + Err(err) => Err(Error::bad_request(err.to_string())), + } + } + + async fn counts(&self, subscription: Subscription) -> Result { + match self.count(subscription.filters).await { + Ok(counts) => { + let relay_message = nostr::RelayMessage::count(subscription.id, counts); + Ok(relay_message) + } + Err(err) => Err(Error::internal_with_message(err.to_string())), + } } } #[cfg(test)] mod tests { - use super::Noose; - use super::SqliteDb; - use crate::utils::structs::Subscription; - - use nostr::key::FromSkStr; - use nostr::util::JsonUtil; + use crate::noose::sqlite::*; + use nostr::EventBuilder; #[tokio::test] - async fn find_event() { - let db = SqliteDb::new().await; + async fn create_db() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; - let t = std::time::Instant::now(); - let client_id = "test_id".to_string(); - - let cm = nostr::ClientMessage::from_json( - r#"["REQ","7b9bc4b6-701c-40b6-898f-4e7c6b5b1510",{"authors":["04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],"kinds":[0]}]"#, - ).unwrap(); - let (sub_id, filters) = match cm { - nostr::ClientMessage::Req { - subscription_id, - filters, - } => (subscription_id, filters), - _ => panic!("sneed :("), - }; - let sub = Subscription::new(sub_id, filters); - db.find_event(sub).await.unwrap(); - println!( - "Time passed: {}", - (std::time::Instant::now() - t).as_millis() - ); + assert_eq!(db.pool.status().max_size, 64); } #[tokio::test] - async fn admin_delete_events() { - let db = SqliteDb::new().await; + async fn event_row_to_event() { + let event_id = + "076b27df50401f2af598cb727281e1e36401592570052c0fb37543f50aba5b69".to_string(); + let event_row = EventRow { + id: event_id.clone(), + pubkey: "da9cfc5b7644aae5b3e7df1d224ef4d4a206d66d6e3b5c99a8aeda3b642c4cef".to_string(), + created_at: 1706020262, + kind: 1, + tags: "[]".to_string(), + sig: "7f92a6ebdeae4a9bd7a6e19f57b49e2c499fece644c63c6ff5776bdbafec5c524cfad1eecedef11a8701df2a826453914d39a897e0f8276a6ffc40e266cae059".to_string(), + content: "hoh".to_string(), + }; - let admin_pubkey = "npub14d2a54za7dnfzktle40vw7kdx48vk3ljy3t7w7sdpk3segea65mq2t6kc4"; - let admin_secret = "nsec1rayezcsw7txmtu3smpsgs7m5fa3dazhx6lhdm44dxclveplhajpsalyx2l"; + let event = event_row.to_event(); - let admin_keys = nostr::Keys::from_sk_str(admin_secret).unwrap(); - - let event_1 = nostr::EventBuilder::new(nostr::Kind::TextNote, "this is event 1", vec![]) - .to_event(&nostr::Keys::generate()) - .unwrap(); - let event_2 = nostr::EventBuilder::new(nostr::Kind::TextNote, "this is event 2", vec![]) - .to_event(&nostr::Keys::generate()) - .unwrap(); - - let res = db.add_event(Box::new(event_1.clone())).await.unwrap(); - let res = db.add_event(Box::new(event_2.clone())).await.unwrap(); - - let e_ids = vec![event_1.id, event_2.id]; - let event = nostr::EventBuilder::delete(e_ids) - .to_event(&admin_keys) - .unwrap(); - - let message = db.admin_delete_events(Box::new(event)).await.unwrap(); - assert_eq!(message.as_json(), "[\"NOTICE\",\"2 events deleted\"]"); + assert_eq!(event.id.to_string(), event_id); } #[tokio::test] - async fn delete_events() { - let db = SqliteDb::new().await; + async fn get_event_by_id() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; - let t = std::time::Instant::now(); - let client_id = "test_id".to_string(); - - let my_keys = nostr::Keys::generate(); - - let eid = nostr::EventId::all_zeros(); - let tag_event = nostr::Tag::Event { - event_id: eid, - relay_url: None, - marker: None, - }; - let tag_url = nostr::Tag::AbsoluteURL(nostr::types::UncheckedUrl::new( - "http://foo.net".to_string(), - )); - let tag_hashtag = nostr::Tag::Hashtag("farm".to_string()); - - let event = nostr::EventBuilder::new_text_note( - "sneed feed and seed", - vec![tag_event, tag_url, tag_hashtag], - ) - .to_event(&my_keys) - .unwrap(); - - dbg!(&event.as_json()); - let resp = db.add_event(Box::new(event.clone())).await.unwrap(); - dbg!(resp); - - let delete_event = nostr::EventBuilder::delete(vec![event.id]) - .to_event(&my_keys) + let keys = nostr::Keys::generate(); + let event = EventBuilder::new(nostr::Kind::TextNote, "Hello", vec![]) + .to_event(&keys) .unwrap(); - dbg!(&delete_event); - let resp = db.add_event(Box::new(delete_event.clone())).await.unwrap(); - dbg!(resp); + // Insert event + let result = db.save_event(&event).await.unwrap(); + assert!(result); + + // Get event by id + let eid = event.id; + let result = db.get_event_by_id(eid).await.unwrap(); + + assert_eq!(result.id, eid); } #[tokio::test] - async fn count_events() { - let db = SqliteDb::new().await; + async fn event_seen_by_other_relays() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; - let t = std::time::Instant::now(); - let client_id = "test_id".to_string(); + let keys = nostr::Keys::generate(); + let event = EventBuilder::new(nostr::Kind::TextNote, "Hello to Other relay", vec![]) + .to_event(&keys) + .unwrap(); - let cm = nostr::ClientMessage::from_json( - r#"["COUNT","7b9bc4b6-701c-40b6-898f-4e7c6b5b1510",{"authors":["6be3c1446231fe6d117d72e29b60094bbb3eec029100c34f627dc4ebe8369a64"],"kinds":[1]}]"#, - ).unwrap(); - let (sub_id, filters) = match cm { - nostr::ClientMessage::Count { - subscription_id, - filters, - } => (subscription_id, filters), - _ => panic!("sneed :("), - }; + // Insert event + let result = db.save_event(&event).await.unwrap(); + assert!(result); - let sub = Subscription::new(sub_id, filters); - let num = db.count_events_by_filters(sub).await; - println!( - "Time passed: {}", - (std::time::Instant::now() - t).as_millis() - ); + // Set seen on other relays + let url_0 = Url::from_str("wss://relay.zhitno.st").unwrap(); + db.set_event_id_seen(event.id, url_0).await.unwrap(); - assert_eq!(num, 1); + let url_1 = Url::from_str("wss://relay.damus.io").unwrap(); + db.set_event_id_seen(event.id, url_1).await.unwrap(); + + // Get status of seen event + let result = db.get_event_seen_on_relays(event.id).await.unwrap(); + + dbg!(result); + + // Get event by id + let eid = event.id; + let result = db.get_event_by_id(eid).await.unwrap(); + + assert_eq!(result.id, eid); + } + + #[tokio::test] + async fn has_event_already_been_seen() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; + + let keys = nostr::Keys::generate(); + let event = EventBuilder::new(nostr::Kind::TextNote, "Hello to Other relay", vec![]) + .to_event(&keys) + .unwrap(); + + // Set seen on other relays + let url_0 = Url::from_str("wss://relay.zhitno.st").unwrap(); + db.set_event_id_seen(event.id, url_0).await.unwrap(); + + let url_1 = Url::from_str("wss://relay.damus.io").unwrap(); + db.set_event_id_seen(event.id, url_1).await.unwrap(); + + // Get status of seen event + let result = db.has_event_already_been_seen(&event.id).await.unwrap(); + + assert!(result); + } + + #[tokio::test] + async fn wipe_db() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; + + db.wipe().await.unwrap(); + } + + #[tokio::test] + async fn query() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; + + let keys = nostr::Keys::generate(); + let event = EventBuilder::new(nostr::Kind::TextNote, "Hello Filters", vec![]) + .to_event(&keys) + .unwrap(); + + // Insert event + let result = db.save_event(&event).await.unwrap(); + assert!(result); + + // Get event vec from filters + let filter = Filter::new().author(event.pubkey); + let filters = vec![filter]; + let result = db.query(filters, Order::Desc).await.unwrap(); + + assert!(!result.is_empty()); + assert_eq!(result[0].pubkey, event.pubkey); + } + + #[tokio::test] + async fn count() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; + + let keys = nostr::Keys::generate(); + let event_0 = EventBuilder::new(nostr::Kind::TextNote, "Hello Count", vec![]) + .to_event(&keys) + .unwrap(); + let event_1 = EventBuilder::new(nostr::Kind::TextNote, "Goodbye Count", vec![]) + .to_event(&keys) + .unwrap(); + + // Insert events + let result = db.save_event(&event_0).await.unwrap(); + assert!(result); + let result = db.save_event(&event_1).await.unwrap(); + assert!(result); + + // Get event vec from filters + let filter = Filter::new().author(event_0.pubkey); + let filters = vec![filter]; + let result = db.count(filters).await.unwrap(); + + assert_eq!(result, 2); + } + + #[tokio::test] + async fn event_ids_by_filter() { + let config = Arc::new(ServiceConfig::new()); + let db = NostrSqlite::new(config).await; + + let keys = nostr::Keys::generate(); + let event_0 = EventBuilder::new(nostr::Kind::TextNote, "Hello Count", vec![]) + .to_event(&keys) + .unwrap(); + let event_1 = EventBuilder::new(nostr::Kind::TextNote, "Goodbye Count", vec![]) + .to_event(&keys) + .unwrap(); + + // Insert events + let result = db.save_event(&event_0).await.unwrap(); + assert!(result); + let result = db.save_event(&event_1).await.unwrap(); + assert!(result); + + // Get event vec from filters + let filter = Filter::new().author(event_0.pubkey); + let filters = vec![filter]; + let result = db.event_ids_by_filters(filters, Order::Desc).await.unwrap(); + + assert_eq!(result.len(), 2); } } diff --git a/src/utils/error.rs b/src/utils/error.rs index 4f34cde..e50fbc5 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -4,6 +4,7 @@ use std::{ convert::From, fmt::{self, Display}, }; +use std::error::Error as StdError; use validator::ValidationErrors; use warp::{http::StatusCode, reject::Reject}; @@ -16,6 +17,16 @@ pub struct Error { pub sneedstr_version: Option, } +impl StdError for Error { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + None + } + + fn cause(&self) -> Option<&dyn StdError> { + self.source() + } +} + impl Error { pub fn new(code: StatusCode, message: String) -> Self { Self { @@ -74,12 +85,12 @@ impl Error { } impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}: {}", self.status_code(), &self.message)?; - if let Some(val) = &self.sneedstr_version { - write!(f, "\ndiem ledger version: {}", val)?; - } - Ok(()) + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Error {{ code: {}, message: '{}', sneedstr_version: {:?} }}", + self.code, self.message, self.sneedstr_version + ) } }