Initial commit

This commit is contained in:
Tony Klink 2024-01-12 09:35:31 -06:00
commit 9fe412be11
Signed by: klink
GPG key ID: 85175567C4D19231
58 changed files with 6215 additions and 0 deletions

18
src/noose/db.rs Normal file
View file

@ -0,0 +1,18 @@
use crate::{
bussy::PubSub,
utils::{error::Error, structs::Subscription},
};
use async_trait::async_trait;
use nostr::Event;
use std::sync::Arc;
/// Storage-backend contract shared by the sled and sqlite implementations.
#[async_trait]
pub trait Noose: Send + Sync {
    /// Run the service loop, consuming commands from the bus until it closes.
    async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error>;
    /// Apply any pending schema migrations.
    async fn migration_up(&self);
    /// Persist an event; returns a serialized relay "OK" message on success.
    async fn write_event(&self, event: Box<Event>) -> Result<String, Error>;
    /// Fetch serialized relay messages matching the subscription's filters.
    async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error>;
}

View file

@ -0,0 +1,23 @@
-- Core event storage: one row per accepted nostr event.
CREATE TABLE events (
    id TEXT PRIMARY KEY,
    kind INTEGER NOT NULL,
    pubkey TEXT NOT NULL,
    content TEXT NOT NULL,
    created_at INTEGER NOT NULL,
    tags TEXT NOT NULL, -- JSON-encoded array of tag arrays
    sig TEXT NOT NULL
);
CREATE INDEX idx_events_kind ON events (kind);
CREATE INDEX idx_events_pubkey ON events (pubkey);
-- Queries order by created_at DESC and filter on since/until, so the column
-- needs its own index (it was missing).
CREATE INDEX idx_events_created_at ON events (created_at);

-- One row per single-letter tag (#e, #p, #t, ...) for filter lookups.
CREATE TABLE tags (
    tag TEXT NOT NULL,
    value TEXT NOT NULL,
    event_id TEXT REFERENCES events(id) ON DELETE CASCADE
);
CREATE INDEX idx_tags_tag ON tags (tag);
CREATE INDEX idx_tags_value ON tags (value);
CREATE INDEX idx_tags_event_id ON tags (event_id);

View file

@ -0,0 +1,5 @@
-- Known relays, keyed by URL.
CREATE TABLE relays (
    url TEXT PRIMARY KEY,
    domain TEXT NOT NULL,
    active BOOLEAN NOT NULL -- whether the relay is currently in use
);

View file

@ -0,0 +1 @@
-- Full-text search over event content; joined back to events by id.
CREATE VIRTUAL TABLE events_fts USING fts5(id, content);

View file

@ -0,0 +1,10 @@
-- NIP-05 user registry.
CREATE TABLE users (
    pubkey TEXT PRIMARY KEY,          -- PRIMARY KEY already creates a unique index
    username TEXT NOT NULL UNIQUE,    -- UNIQUE already creates an index
    inserted_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    admin BOOLEAN DEFAULT false
);
-- The explicit idx_users_pubkey / idx_users_username indexes were removed:
-- SQLite already indexes both columns via the PRIMARY KEY and UNIQUE
-- constraints above, so they only added write overhead.

View file

@ -0,0 +1,2 @@
-- NOTE(review): PRAGMA foreign_keys is per-connection in SQLite; running it
-- inside a migration only affects the migration's own connection. Application
-- connections must enable it themselves (e.g. via connect options) — confirm.
PRAGMA foreign_keys = ON;
-- NOTE(review): auto_vacuum only takes effect before tables exist (or after a
-- VACUUM); verify this migration runs before the table-creating ones.
PRAGMA auto_vacuum = FULL;

View file

@ -0,0 +1,11 @@
-- Media uploaded before being referenced by any event.
CREATE TABLE unattached_media (
    id TEXT PRIMARY KEY, -- already indexed via the PRIMARY KEY constraint
    pubkey TEXT NOT NULL,
    url TEXT NOT NULL,
    data TEXT NOT NULL,
    uploaded_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- The explicit unattached_media_id index was removed: the PRIMARY KEY above
-- already provides it.
CREATE INDEX unattached_media_pubkey ON unattached_media (pubkey);
CREATE INDEX unattached_media_url ON unattached_media (url);

33
src/noose/mod.rs Normal file
View file

@ -0,0 +1,33 @@
use crate::utils::structs::Context;
use tokio::runtime;
use db::Noose;
use pipeline::Pipeline;
pub mod db;
pub mod pipeline;
// mod sled;
mod sqlite;
pub mod user;
/// Boot the storage subsystem: spin up a dedicated Tokio runtime and run the
/// event pipeline and the sqlite writer tasks until both terminate.
pub fn start(context: Context) {
    let rt = runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let pubsub_for_pipeline = context.pubsub.clone();
        let pubsub_for_db = context.pubsub.clone();

        // The pipeline consumes client events and fans them out.
        let pipeline_handle = tokio::task::spawn(async move {
            Pipeline::new(pubsub_for_pipeline).start().await.unwrap();
        });

        // The sqlite writer services storage commands from the bus.
        let sqlite_writer_handle = tokio::task::spawn(async move {
            sqlite::SqliteDb::new()
                .await
                .start(pubsub_for_db)
                .await
                .unwrap();
        });

        sqlite_writer_handle.await.unwrap();
        pipeline_handle.await.unwrap();
    });
}

166
src/noose/pipeline.rs Normal file
View file

@ -0,0 +1,166 @@
use crate::bussy::{channels, Command, Message, PubSub};
use crate::utils::error::Error;
use nostr::Event;
use std::sync::Arc;
/// Event-processing pipeline: receives client events from the message bus and
/// fans them out to storage, indexing and broadcast steps.
pub struct Pipeline {
    // Shared message bus used both to receive requests and publish replies.
    pubsub: Arc<PubSub>,
}
impl Pipeline {
    /// Create a pipeline bound to the shared message bus.
    pub fn new(pubsub: Arc<PubSub>) -> Self {
        Self { pubsub }
    }

    /// Service loop: consume `PipelineReqEvent` commands from the pipeline
    /// channel, process each event, and reply to the requesting channel with
    /// either a relay "OK" message or a service error.
    pub async fn start(&mut self) -> Result<(), Error> {
        let mut subscriber = self.pubsub.subscribe(channels::MSG_PIPELINE).await;
        while let Ok(message) = subscriber.recv().await {
            log::debug!("[Pipeline] received message: {:?}", message);
            let command = match message.content {
                Command::PipelineReqEvent(client_id, event) => {
                    match self.handle_event(client_id, event.clone()).await {
                        Ok(_) => {
                            // Acknowledge acceptance back to the client.
                            let message =
                                nostr::RelayMessage::new_ok(event.id, true, "".to_string());
                            Command::PipelineResRelayMessageOk(client_id, message)
                        }
                        Err(e) => Command::ServiceError(e),
                    }
                }
                // Any other command on this channel is ignored.
                _ => Command::Noop,
            };
            if command != Command::Noop {
                // Reply on the channel the request originated from.
                let channel = message.source;
                let message = Message {
                    source: channels::MSG_PIPELINE,
                    content: command,
                };
                log::info!(
                    "[Pipeline] channel: {} - publishing new message: {:?}",
                    channel,
                    message
                );
                self.pubsub.publish(channel, message).await;
            }
        }
        Ok(())
    }

    /// Fan an incoming client event out to every processing step concurrently
    /// and collapse the results: success only when all steps succeed.
    pub async fn handle_event(
        &self,
        client_id: uuid::Uuid,
        event: Box<Event>,
    ) -> Result<(), Error> {
        // Each step receives its own copy of the event; all futures are
        // awaited together below via join!.
        let store_event_task = self.store_event(event.clone());
        let process_deletions_task = self.process_deletions(event.clone());
        let track_hashtags_task = self.track_hashtags(event.clone());
        let process_media_task = self.process_media(event.clone());
        let stream_out_task = self.stream_out(event.clone());
        let broadcast_task = self.broadcast(event.clone());
        let (
            store_event_result,
            process_deletions_result,
            track_hashtags_result,
            process_media_result,
            stream_out_result,
            broadcast_result,
        ) = tokio::join!(
            store_event_task,
            process_deletions_task,
            track_hashtags_task,
            process_media_task,
            stream_out_task,
            broadcast_task
        );
        match (
            store_event_result,
            process_deletions_result,
            track_hashtags_result,
            process_media_result,
            stream_out_result,
            broadcast_result,
        ) {
            (Ok(_), Ok(_), Ok(_), Ok(_), Ok(_), Ok(_)) => {
                log::info!("[Pipeline] Tasks finished successfully");
                Ok(())
            }
            _ => {
                log::error!("[Pipeline] One or more futures returned an error.");
                Err(Error::internal_with_message(
                    "[Pipeline] One or more futures returned an error.",
                ))
            }
        }
    }

    /// Forward the event to the storage service; ephemeral events are not stored.
    async fn store_event(&self, event: Box<Event>) -> Result<(), Error> {
        if event.kind.is_ephemeral() {
            return Ok(());
        }
        self.pubsub
            .publish(
                channels::MSG_NOOSE,
                Message {
                    source: channels::MSG_PIPELINE,
                    content: Command::DbReqWriteEvent(event),
                },
            )
            .await;
        Ok(())
    }

    /// Deletion handling (kind 5) — currently disabled; the commented code
    /// below sketches the intended behavior.
    async fn process_deletions(&self, event: Box<Event>) -> Result<(), Error> {
        // if event.kind.as_u32() == 5 {
        //     let events_for_deletion: Vec<String> = event
        //         .tags
        //         .iter()
        //         .filter_map(|tag| match tag {
        //             nostr::Tag::Event(event_id, _, _) => Some(event_id.to_string()),
        //             _ => None,
        //         })
        //         .collect();
        //     self.pubsub
        //         .publish(
        //             channels::MSG_NOOSE,
        //             Message {
        //                 source: channels::MSG_PIPELINE,
        //                 content: Command::DbReqDeleteEvents(events_for_deletion),
        //             },
        //         )
        //         .await;
        // }
        Ok(())
    }

    /// Hashtag tracking — not implemented yet; always succeeds.
    async fn track_hashtags(&self, event: Box<Event>) -> Result<(), Error> {
        Ok(())
    }

    /// Media processing — not implemented yet; always succeeds.
    async fn process_media(&self, event: Box<Event>) -> Result<(), Error> {
        Ok(())
    }

    /// Push the event to the relay channel for delivery to live subscribers.
    async fn stream_out(&self, event: Box<Event>) -> Result<(), Error> {
        let message = Message {
            source: channels::MSG_PIPELINE,
            content: Command::PipelineResStreamOutEvent(event),
        };
        self.pubsub.publish(channels::MSG_RELAY, message).await;
        Ok(())
    }

    /// Broadcast to peers — not implemented yet; always succeeds.
    async fn broadcast(&self, event: Box<Event>) -> Result<(), Error> {
        Ok(())
    }
}

234
src/noose/sled.rs Normal file
View file

@ -0,0 +1,234 @@
use super::db::Noose;
use crate::bussy::{channels, Command, Message, PubSub};
use crate::utils::error::Error;
use crate::utils::structs::Subscription;
use async_trait::async_trait;
use nostr::Event;
use serde::Serialize;
use std::sync::Arc;
use super::user::{User, UserRow};
// Db Interface
/// Sled-backed storage: trees for events, NIP-05 identifiers and accounts in
/// one database, plus a separate database holding prefix-scan index keys.
pub struct SledDb {
    db: sled::Db,
    events: sled::Tree,
    nip05s: sled::Tree,
    pub users: sled::Tree,
    // Secondary index: composite string keys mapping to event ids.
    index: sled::Db,
}
impl SledDb {
pub fn new() -> Self {
let db = sled::open("/tmp/sled_db").unwrap();
let events = db.open_tree("events").unwrap();
let nip05s = db.open_tree("identifiers").unwrap();
let accounts = db.open_tree("accounts").unwrap();
let index = sled::open("/tmp/sled_index").unwrap();
Self {
db,
events,
nip05s,
users: accounts,
index,
}
}
fn clear_db(&self) -> Result<(), sled::Error> {
self.db.clear()
}
fn clear_index(&self) -> Result<(), sled::Error> {
self.index.clear()
}
async fn insert_user(&self, user: UserRow) -> Result<(), Error> {
let pubkey = user.pubkey.clone();
let username = user.username.clone();
if let Ok(Some(_)) = self.nip05s.get(&username) {
return Err(Error::internal_with_message("User already exists"));
}
let mut user_buff = flexbuffers::FlexbufferSerializer::new();
user.serialize(&mut user_buff).unwrap();
self.nip05s.insert(&username, user_buff.view()).unwrap();
let prefix = "nip05:";
let key = format!("{}{}", prefix, pubkey);
self.index.insert(key, username.as_bytes()).unwrap();
Ok(())
}
async fn get_user(&self, user: User) -> Result<UserRow, Error> {
let mut user_row = None;
if let Some(username) = user.name {
if let Ok(Some(buff)) = self.nip05s.get(username) {
let b = flexbuffers::from_slice::<UserRow>(&buff).unwrap();
user_row = Some(b);
}
} else if let Some(pubkey) = user.pubkey {
let prefix = "nip05:";
let reference = format!("{}{}", prefix, pubkey);
if let Ok(Some(row)) = self.index.get(reference) {
let key = String::from_utf8(row.to_vec()).unwrap();
if let Ok(Some(buff)) = self.nip05s.get(key) {
let b = flexbuffers::from_slice::<UserRow>(&buff).unwrap();
user_row = Some(b);
}
}
}
match user_row {
Some(user) => Ok(user),
None => Err(Error::internal_with_message("User not found")),
}
}
}
#[async_trait]
impl Noose for SledDb {
    /// Service loop: consume storage commands from the NOOSE channel and reply
    /// on the channel each request originated from.
    async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
        let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
        while let Ok(message) = subscriber.recv().await {
            log::info!("noose subscriber received: {:?}", message);
            let command = match message.content {
                Command::DbReqInsertUser(user) => match self.insert_user(user).await {
                    Ok(_) => Command::DbResOk,
                    Err(e) => Command::ServiceError(e),
                },
                Command::DbReqGetUser(user) => match self.get_user(user).await {
                    Ok(user) => Command::DbResUser(user),
                    Err(e) => Command::ServiceError(e),
                },
                Command::DbReqWriteEvent(event) => match self.write_event(event).await {
                    Ok(_) => Command::DbResOk,
                    Err(e) => Command::ServiceError(e),
                },
                _ => Command::Noop,
            };
            if command != Command::Noop {
                log::info!("Publishing new message");
                let channel = message.source;
                pubsub
                    .publish(
                        channel,
                        Message {
                            source: channels::MSG_NOOSE,
                            content: command,
                        },
                    )
                    .await;
            }
        }
        Ok(())
    }

    /// Sled has no schema: nothing to migrate.
    async fn migration_up(&self) {}

    /// Store `event` and maintain the composite prefix-scan index keys.
    /// Returns the serialized relay "OK" message.
    async fn write_event(&self, event: Box<Event>) -> Result<String, Error> {
        let mut event_buff = flexbuffers::FlexbufferSerializer::new();
        event.serialize(&mut event_buff).unwrap();
        self.events.insert(event.id, event_buff.view()).unwrap();
        {
            // Timestamp index.
            let key = format!("created_at:{}|#e:{}", event.created_at, event.id);
            self.index.insert(key, event.id.as_bytes()).unwrap();
        }
        {
            // Author index.
            let key = format!("#author:{}|#e:{}", event.pubkey, event.id);
            self.index.insert(key, event.id.as_bytes()).unwrap();
        }
        {
            // Kind index.
            let key = format!("#k:{}|#e:{}", event.kind, event.id);
            self.index.insert(key, event.id.as_bytes()).unwrap();
        }
        {
            // Tag indexes: one key per recognized tag.
            event.tags.iter().for_each(|tag| {
                if let Some(key) = match tag {
                    // #e tag. BUGFIX: include this event's own id in the key —
                    // previously the key was just "#e:<target>", so two events
                    // referencing the same target overwrote each other's entry.
                    nostr::Tag::Event(event_id, _, _) => {
                        Some(format!("#e:{}|#e:{}", event_id, event.id))
                    }
                    // #p tag.
                    nostr::Tag::PubKey(pubkey, _) => {
                        Some(format!("#p:{}|#e:{}", pubkey, event.id))
                    }
                    // #t tag.
                    nostr::Tag::Hashtag(hashtag) => {
                        Some(format!("#t:{}|#e:{}", hashtag, event.id))
                    }
                    // #a tag. BUGFIX: added the missing "|" separator between
                    // the pubkey and identifier components.
                    nostr::Tag::A {
                        kind,
                        public_key,
                        identifier,
                        relay_url: _,
                    } => Some(format!(
                        "#a:kind:{}|#a:pubkey:{}|#a:identifier:{}|#e:{}",
                        kind, public_key, identifier, event.id
                    )),
                    _ => None,
                } {
                    self.index.insert(key, event.id.as_bytes()).unwrap();
                }
            });
        }
        let message = format!("[\"OK\", \"{}\", true, \"\"]", event.id);
        Ok(message)
    }

    /// Subscription queries against sled are not implemented yet.
    async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error> {
        todo!()
    }
}
#[cfg(test)]
mod tests {
    use super::SledDb;
    use crate::{
        bussy::PubSub,
        noose::user::{User, UserRow},
    };
    use std::sync::Arc;

    /// Smoke test: insert a user, look it up by pubkey, then wipe both
    /// databases so runs don't interfere.
    /// NOTE(review): this test asserts nothing — `result`, `user` and `pubsub`
    /// are never checked/used; consider asserting on insert and lookup results.
    #[tokio::test]
    async fn get_db_names() {
        let pubsub = Arc::new(PubSub::new());
        let db = SledDb::new();
        let pk = "npub1p3ya99jfdafnqlk87p6wfd36d2nme5mkld769rhd9pkht6hmqlaq6mzxdu".to_string();
        let username = "klink".to_string();
        let user = UserRow::new(pk, username, false);
        let result = db.insert_user(user).await;
        let pubkey = "npub1p3ya99jfdafnqlk87p6wfd36d2nme5mkld769rhd9pkht6hmqlaq6mzxdu".to_string();
        let username = "klink".to_string();
        let user = User {
            name: None,
            pubkey: Some(pubkey),
        };
        let user = db.get_user(user).await;
        db.clear_db().unwrap();
        db.clear_index().unwrap();
    }
}

889
src/noose/sqlite.rs Normal file
View file

@ -0,0 +1,889 @@
use async_trait::async_trait;
use nostr::{Event, JsonUtil};
use sea_query::{extension::sqlite::SqliteExpr, Query};
use sea_query_binder::SqlxBinder;
use sqlx::sqlite::{Sqlite, SqlitePoolOptions};
use sqlx::FromRow;
use sqlx::{migrate::MigrateDatabase, Pool};
use std::sync::Arc;
use super::db::Noose;
use crate::bussy::{channels, Command, Message, PubSub};
use crate::utils::{error::Error, structs::Subscription};
/// sea-query identifiers for the `events` table (see the migrations).
enum EventsTable {
    Table,
    EventId,
    Kind,
    Pubkey,
    Content,
    CreatedAt,
    Tags,
    Sig,
}
impl sea_query::Iden for EventsTable {
    /// Emit the unquoted SQL name for each table/column variant.
    fn unquoted(&self, s: &mut dyn std::fmt::Write) {
        let ident = match self {
            Self::Table => "events",
            Self::EventId => "id",
            Self::Kind => "kind",
            Self::Pubkey => "pubkey",
            Self::Content => "content",
            Self::CreatedAt => "created_at",
            Self::Tags => "tags",
            Self::Sig => "sig",
        };
        write!(s, "{}", ident).unwrap()
    }
}
/// sea-query identifiers for the `events_fts` full-text-search virtual table.
enum EventsFTSTable {
    Table,
    EventId,
    Content,
}
impl sea_query::Iden for EventsFTSTable {
    /// Emit the unquoted SQL name for each table/column variant.
    fn unquoted(&self, s: &mut dyn std::fmt::Write) {
        let ident = match self {
            Self::Table => "events_fts",
            Self::EventId => "id",
            Self::Content => "content",
        };
        write!(s, "{}", ident).unwrap()
    }
}
/// sea-query identifiers for the `tags` table.
enum TagsTable {
    Table,
    Tag,
    Value,
    EventId,
}
impl sea_query::Iden for TagsTable {
    /// Emit the unquoted SQL name for each table/column variant.
    fn unquoted(&self, s: &mut dyn std::fmt::Write) {
        let ident = match self {
            Self::Table => "tags",
            Self::Tag => "tag",
            Self::Value => "value",
            Self::EventId => "event_id",
        };
        write!(s, "{}", ident).unwrap()
    }
}
/// Single-column row produced by COUNT(...) queries.
#[derive(FromRow, Debug)]
struct EventsCountRow(i32);
/// One `events` row as read back from sqlite.
#[derive(FromRow, Debug)]
struct EventRow {
    id: String,
    pubkey: String,
    created_at: i64,
    kind: i64,
    // JSON-encoded array of tag arrays, exactly as stored by `add_event`.
    tags: String,
    sig: String,
    content: String,
}
impl EventRow {
    /// Serialize the row as a relay `["EVENT", <sub_id>, {...}]` JSON message.
    pub fn to_string(&self, subscription_id: nostr::SubscriptionId) -> String {
        // The tags column holds a JSON-encoded array of string arrays.
        let tags: Vec<Vec<String>> = serde_json::from_str(&self.tags).unwrap();
        let event_json = serde_json::json!({
            "id": self.id,
            "content": self.content,
            "created_at": self.created_at,
            "kind": self.kind,
            "pubkey": self.pubkey,
            "sig": self.sig,
            "tags": tags
        });
        serde_json::json!(["EVENT", subscription_id, event_json]).to_string()
    }
}
/// Sqlite-backed implementation of the `Noose` storage trait.
pub struct SqliteDb {
    pool: Pool<Sqlite>,
}
impl SqliteDb {
/// Open the default database (`sqlite://sqlite.db`) with a 42-connection pool.
pub async fn new() -> Self {
    let pool = SqliteDb::build_pool("noose_pool", 42).await;
    Self { pool }
}
pub fn info(&self) {
dbg!(self.pool.options());
}
/// Apply the embedded migrations from `src/noose/migrations`.
/// Panics if any migration fails: the schema must exist before serving.
async fn migrate(pool: &Pool<Sqlite>) {
    sqlx::migrate!("src/noose/migrations")
        .run(pool)
        .await
        .unwrap()
}
/// Create the on-disk database if missing, then open a connection pool.
/// Panics when creation or connection fails — the service cannot run
/// without storage.
async fn build_pool(name: &str, max_size: u32) -> Pool<Sqlite> {
    let db_url = "sqlite://sqlite.db";
    let pool_options = SqlitePoolOptions::new()
        .test_before_acquire(true)
        // .idle_timeout(Some(Duration::from_secs(10)))
        // .max_lifetime(Some(Duration::from_secs(30)))
        .max_lifetime(None)
        .idle_timeout(None)
        .max_connections(max_size);
    let exists = Sqlite::database_exists(db_url).await.unwrap_or(false);
    if !exists {
        log::info!("Creating database {}", db_url);
        match Sqlite::create_database(db_url).await {
            Ok(_) => log::info!("Db {} created", db_url),
            Err(_) => panic!("Failed to create database {}", db_url),
        }
    }
    match pool_options.connect(db_url).await {
        Ok(pool) => {
            log::info!("Connected to sqlite pool {}", name);
            pool
        }
        Err(_) => panic!("Connection to sqlite pool {} failed", name),
    }
}
/// Persist `event` and its derived rows (FTS + tags) atomically, enforcing
/// replaceable/parameterized-replaceable semantics and kind-5 deletions.
/// Ephemeral events are ACKed but never stored. Returns the serialized relay
/// "OK" message for the event.
async fn add_event(&self, event: Box<Event>) -> Result<String, Error> {
    let id = event.id.to_string();
    let kind = event.kind.to_string();
    let pubkey = event.pubkey.to_string();
    let content = event.content.to_string();
    let created_at = event.created_at.as_i64();
    let tags = serde_json::to_string(&event.tags).unwrap();
    let sig = event.sig.to_string();
    // Relay acknowledgement returned on every accepted path.
    let message = format!("[\"OK\", \"{}\", true, \"\"]", id.clone());

    if event.is_ephemeral() {
        return Ok(message);
    }

    // BUGFIX: the transaction used to be begun but never used — every
    // statement executed directly on the pool, so `tx.commit()` committed
    // nothing and a crash mid-way could leave partial rows (event without
    // its FTS/tag rows). All statements now run on `tx`; the early returns
    // below drop the transaction, rolling back the (empty) work.
    let mut tx = self.pool.begin().await.unwrap();

    if event.is_replaceable() {
        log::debug!("new event is replaceable - searching for previously stored event");
        // If a same-or-newer event of this kind/author exists, ACK without storing.
        let (sql, values) = Query::select()
            .from(EventsTable::Table)
            .columns([EventsTable::EventId])
            .and_where(
                sea_query::Expr::col(EventsTable::Pubkey).eq(event.pubkey.to_string()),
            )
            .and_where(sea_query::Expr::col(EventsTable::Kind).eq(event.kind.as_u32()))
            .and_where(
                sea_query::Expr::col(EventsTable::CreatedAt).gte(event.created_at.as_i64()),
            )
            .limit(1)
            .build_sqlx(sea_query::SqliteQueryBuilder);
        let newer = sqlx::query_with(&sql, values).fetch_one(&mut *tx).await;
        if newer.ok().is_some() {
            return Ok(message);
        }
    }

    if event.is_parameterized_replaceable() {
        log::debug!(
            "new event is parametrized replaceable - searching for previously stored event"
        );
        // The "d" tag value identifies the replaceable series.
        let d_tags: Vec<String> = event
            .tags
            .iter()
            .filter(|tag| tag.kind() == nostr::TagKind::D)
            .filter_map(|tag| tag.clone().to_vec().get(1).cloned())
            .collect();
        // BUGFIX: guard against a missing/valueless "d" tag — the old code
        // indexed `d_tags[0]` unconditionally and could panic.
        if let Some(d_value) = d_tags.first() {
            let (sql, values) = Query::select()
                .from(EventsTable::Table)
                .column((EventsTable::Table, EventsTable::EventId))
                .left_join(
                    TagsTable::Table,
                    sea_query::Expr::col((TagsTable::Table, TagsTable::EventId))
                        .equals((EventsTable::Table, EventsTable::EventId)),
                )
                .and_where(
                    sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey))
                        .eq(event.pubkey.to_string()),
                )
                .and_where(
                    sea_query::Expr::col((EventsTable::Table, EventsTable::Kind))
                        .eq(event.kind.as_u32()),
                )
                .and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq("d"))
                .and_where(
                    sea_query::Expr::col((TagsTable::Table, TagsTable::Value))
                        .eq(d_value.to_string()),
                )
                .and_where(
                    sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt))
                        .gte(event.created_at.as_i64()),
                )
                .limit(1)
                .build_sqlx(sea_query::SqliteQueryBuilder);
            let newer = sqlx::query_with(&sql, values).fetch_one(&mut *tx).await;
            if newer.ok().is_some() {
                return Ok(message);
            }
        }
    }

    // Replaceable: delete all but the newest stored event of this kind/author
    // before inserting the incoming one.
    if event.is_replaceable() {
        log::debug!("deleting older replaceable event from events table");
        let (sql, values) = Query::delete()
            .from_table(EventsTable::Table)
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::Kind))
                    .eq(event.kind.as_u32()),
            )
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey))
                    .eq(event.pubkey.to_string()),
            )
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::EventId))
                    .not_in_subquery(
                        Query::select()
                            .from(EventsTable::Table)
                            .column(EventsTable::EventId)
                            .and_where(
                                sea_query::Expr::col(EventsTable::Kind)
                                    .eq(event.kind.as_u32()),
                            )
                            .and_where(
                                sea_query::Expr::col(EventsTable::Pubkey)
                                    .eq(event.pubkey.to_string()),
                            )
                            .order_by(EventsTable::CreatedAt, sea_query::Order::Desc)
                            .limit(1)
                            .to_owned(),
                    ),
            )
            .build_sqlx(sea_query::SqliteQueryBuilder);
        let results = sqlx::query_with(&sql, values)
            .execute(&mut *tx)
            .await
            .unwrap();
        if results.rows_affected() > 0 {
            log::info!(
                "removed {} older replaceable kind {} events for author: {:?}",
                results.rows_affected(),
                event.kind.as_u32(),
                event.pubkey.to_string()
            );
        }
    }

    // Parameterized replaceable: same, scoped to the matching "d" tag value.
    if event.is_parameterized_replaceable() {
        log::debug!("deleting older parametrized replaceable event from events table");
        let d_tag = event.identifier();
        let (sql, values) = Query::delete()
            .from_table(EventsTable::Table)
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::EventId))
                    .in_subquery(
                        Query::select()
                            .from(EventsTable::Table)
                            .column((EventsTable::Table, EventsTable::EventId))
                            .left_join(
                                TagsTable::Table,
                                sea_query::Expr::col((
                                    TagsTable::Table,
                                    TagsTable::EventId,
                                ))
                                .equals((EventsTable::Table, EventsTable::EventId)),
                            )
                            .and_where(
                                sea_query::Expr::col((
                                    EventsTable::Table,
                                    EventsTable::Kind,
                                ))
                                .eq(event.kind.as_u32()),
                            )
                            .and_where(
                                sea_query::Expr::col((
                                    EventsTable::Table,
                                    EventsTable::Pubkey,
                                ))
                                .eq(event.pubkey.to_string()),
                            )
                            .and_where(
                                sea_query::Expr::col((TagsTable::Table, TagsTable::Tag))
                                    .eq("d"),
                            )
                            .and_where(
                                sea_query::Expr::col((TagsTable::Table, TagsTable::Value))
                                    .eq(d_tag),
                            )
                            .to_owned(),
                    ),
            )
            .build_sqlx(sea_query::SqliteQueryBuilder);
        let results = sqlx::query_with(&sql, values)
            .execute(&mut *tx)
            .await
            .unwrap();
        if results.rows_affected() > 0 {
            log::info!("removed {} older parameterized replaceable kind {} events for author: {:?}", results.rows_affected(), event.kind, event.pubkey);
        }
    }

    if event.kind.as_u32() == 5 {
        // Deletion request: remove the referenced events owned by this author
        // (the kind-5 event itself is not stored, matching prior behavior).
        log::debug!("processing kind-5 deletion request");
        let ids: Vec<String> = event.event_ids().map(|eid| eid.to_string()).collect();
        let (sql, values) = Query::delete()
            .from_table(EventsTable::Table)
            .and_where(sea_query::Expr::col(EventsTable::Kind).ne(5))
            .and_where(sea_query::Expr::col(EventsTable::Pubkey).eq(event.pubkey.to_string()))
            .and_where(sea_query::Expr::col(EventsTable::EventId).is_in(&ids))
            .build_sqlx(sea_query::SqliteQueryBuilder);
        let results = sqlx::query_with(&sql, values)
            .execute(&mut *tx)
            .await
            .unwrap();
        if results.rows_affected() > 0 {
            log::info!(
                "removed {} events for author {:?}",
                results.rows_affected(),
                event.pubkey
            );
        }
        // Keep the FTS index in sync with the deleted rows.
        let (sql, values) = Query::delete()
            .from_table(EventsFTSTable::Table)
            .and_where(sea_query::Expr::col(EventsFTSTable::EventId).is_in(&ids))
            .build_sqlx(sea_query::SqliteQueryBuilder);
        let _ = sqlx::query_with(&sql, values)
            .execute(&mut *tx)
            .await
            .unwrap();
    } else {
        log::debug!("inserting new event in events");
        // Insert into the events table.
        let (sql, values) = Query::insert()
            .into_table(EventsTable::Table)
            .columns([
                EventsTable::EventId,
                EventsTable::Content,
                EventsTable::Kind,
                EventsTable::Pubkey,
                EventsTable::CreatedAt,
                EventsTable::Tags,
                EventsTable::Sig,
            ])
            .values_panic([
                id.clone().into(),
                content.clone().into(),
                kind.into(),
                pubkey.into(),
                created_at.into(),
                tags.into(),
                sig.into(),
            ])
            .build_sqlx(sea_query::SqliteQueryBuilder);
        sqlx::query_with(&sql, values)
            .execute(&mut *tx)
            .await
            .unwrap();
        // Mirror the content into the FTS table for search filters.
        let (sql, values) = Query::insert()
            .into_table(EventsFTSTable::Table)
            .columns([EventsFTSTable::EventId, EventsFTSTable::Content])
            .values_panic([id.clone().into(), content.into()])
            .build_sqlx(sea_query::SqliteQueryBuilder);
        sqlx::query_with(&sql, values)
            .execute(&mut *tx)
            .await
            .unwrap();
        // One row per single-letter tag (#e, #p, #t, ...) for filter lookups.
        for tag in event.tags.clone() {
            let tag = tag.to_vec();
            if tag.len() >= 2 {
                let tag_name = &tag[0];
                let tag_value = &tag[1];
                if tag_name.len() == 1 {
                    let (sql, values) = Query::insert()
                        .into_table(TagsTable::Table)
                        .columns([TagsTable::Tag, TagsTable::Value, TagsTable::EventId])
                        .values_panic([tag_name.into(), tag_value.into(), id.clone().into()])
                        .build_sqlx(sea_query::SqliteQueryBuilder);
                    sqlx::query_with(&sql, values)
                        .execute(&mut *tx)
                        .await
                        .unwrap();
                }
            }
        }
    }
    tx.commit().await.unwrap();
    Ok(message)
}
async fn index_search(&self, event: Box<Event>) -> Result<(), Error> {
let id = event.id.to_string();
let content = event.content.to_string();
let (sql, values) = Query::insert()
.into_table(EventsFTSTable::Table)
.columns([EventsFTSTable::EventId, EventsFTSTable::Content])
.values_panic([id.into(), content.into()])
.build_sqlx(sea_query::SqliteQueryBuilder);
let results = sqlx::query_with(&sql, values).execute(&self.pool).await;
if results.is_ok() {
Ok(())
} else {
Err(Error::internal_with_message(
"Unable to write event to events_fts index",
))
}
}
/// Tag-indexing hook — currently a no-op stub (tags are written inline by
/// `add_event`); kept for future use.
async fn index_tags(&self, event: Box<Event>) -> Result<(), Error> {
    // let t: Vec<String> = Vec::new();
    // for tag in event.tags {
    // tag.kind()
    // }
    Ok(())
}
/// Translate one nostr filter into a SELECT over `events`, newest first.
/// Every non-empty filter field contributes an AND-ed predicate.
fn get_filter_query(&self, filter: &nostr::Filter) -> sea_query::SelectStatement {
    let mut query = Query::select()
        .column((EventsTable::Table, EventsTable::EventId))
        .column((EventsTable::Table, EventsTable::Content))
        .columns([
            EventsTable::Kind,
            EventsTable::Pubkey,
            EventsTable::CreatedAt,
            EventsTable::Tags,
            EventsTable::Sig,
        ])
        .from(EventsTable::Table)
        .order_by(EventsTable::CreatedAt, sea_query::Order::Desc)
        .to_owned();
    // Exact event ids.
    if !filter.ids.is_empty() {
        let ids = filter.ids.iter().map(|id| id.to_string());
        query = query
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::EventId)).is_in(ids),
            )
            .to_owned();
    }
    // Event kinds.
    if !filter.kinds.is_empty() {
        let kinds: Vec<u32> = filter.kinds.iter().map(|kind| kind.as_u32()).collect();
        query = query
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::Kind)).is_in(kinds),
            )
            .to_owned();
    }
    // Author pubkeys.
    if !filter.authors.is_empty() {
        let authors = filter.authors.iter().map(|author| author.to_string());
        query = query
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::Pubkey)).is_in(authors),
            )
            .to_owned();
    }
    // Time window: since (inclusive lower bound) / until (inclusive upper bound).
    if let Some(since) = filter.since {
        query = query
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt))
                    .gte(since.as_u64()),
            )
            .to_owned();
    }
    if let Some(until) = filter.until {
        query = query
            .and_where(
                sea_query::Expr::col((EventsTable::Table, EventsTable::CreatedAt))
                    .lte(until.as_u64()),
            )
            .to_owned();
    }
    if let Some(limit) = filter.limit {
        query = query.limit(limit as u64).to_owned();
    }
    // Generic tag filters (#e, #p, ...): match via the tags table.
    // NOTE(review): this adds a LEFT JOIN on `tags` once per tag key, so a
    // filter with several tag keys joins the same table repeatedly — confirm
    // sea-query/SQLite accept the duplicate join and produce the intended AND.
    filter.generic_tags.iter().for_each(|(tag, values)| {
        let values = values.iter().map(|val| val.to_string());
        query = query
            .left_join(
                TagsTable::Table,
                sea_query::Expr::col((TagsTable::Table, TagsTable::EventId))
                    .equals((EventsTable::Table, EventsTable::EventId)),
            )
            .and_where(
                sea_query::Expr::col((TagsTable::Table, TagsTable::Tag)).eq(tag.to_string()),
            )
            .and_where(sea_query::Expr::col((TagsTable::Table, TagsTable::Value)).is_in(values))
            .to_owned();
    });
    // NIP-50 search: full-text match against the FTS mirror table.
    if let Some(search) = &filter.search {
        query = query
            .inner_join(
                EventsFTSTable::Table,
                sea_query::Expr::col((EventsTable::Table, EventsTable::EventId))
                    .equals((EventsFTSTable::Table, EventsFTSTable::EventId)),
            )
            .and_where(
                sea_query::Expr::col((EventsFTSTable::Table, EventsFTSTable::Content))
                    .matches(search),
            )
            .to_owned();
    }
    query
}
/// Combine all of a subscription's filters into one statement by UNION ALL-ing
/// the per-filter queries. Returns `None` when the subscription has no filters.
fn get_filters_query(&self, subscription: Subscription) -> Option<sea_query::SelectStatement> {
    subscription
        .filters
        .iter()
        .map(|filter| {
            // Wrap each filter query as a subquery aliased "events" so the
            // outer select re-projects the standard column set.
            Query::select()
                .column((EventsTable::Table, EventsTable::EventId))
                .column((EventsTable::Table, EventsTable::Content))
                .columns([
                    EventsTable::Kind,
                    EventsTable::Pubkey,
                    EventsTable::CreatedAt,
                    EventsTable::Tags,
                    EventsTable::Sig,
                ])
                .from_subquery(
                    self.get_filter_query(filter),
                    sea_query::Alias::new("events"),
                )
                .to_owned()
        })
        .reduce(|mut result, query| result.union(sea_query::UnionType::All, query).to_owned())
}
/// Delete events matching the subscription's filters — not implemented yet.
async fn delete_filters(&self, subscription: Subscription) -> Vec<EventRow> {
    todo!()
}
async fn count_events_by_filters(&self, subscription: Subscription) -> i32 {
if subscription.filters.is_empty() {
return 0;
}
let (sql, values) = self
.get_filters_query(subscription)
.unwrap()
.clear_selects()
.expr_as(
sea_query::Func::count(sea_query::Expr::col((
EventsTable::Table,
EventsTable::EventId,
))),
sea_query::Alias::new("count"),
)
.build_sqlx(sea_query::SqliteQueryBuilder);
println!("count_filters SEA_QUERY built SQL: {}", sql.clone());
let counts = sqlx::query_as_with::<_, EventsCountRow, _>(&sql, values)
.fetch_one(&self.pool)
.await
.unwrap();
dbg!(counts);
1
}
}
#[async_trait]
impl Noose for SqliteDb {
/// Service loop: consume storage commands from the NOOSE channel and reply
/// on the channel each request originated from.
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
    let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
    while let Ok(message) = subscriber.recv().await {
        log::info!("[Noose] received message: {:?}", message);
        let reply = match message.content {
            Command::DbReqWriteEvent(event) => match self.write_event(event).await {
                Ok(status) => Command::DbResOkWithStatus(status),
                Err(e) => Command::ServiceError(e),
            },
            Command::DbReqFindEvent(client_id, subscription) => {
                match self.find_event(subscription).await {
                    Ok(events) => Command::DbResRelayMessage(client_id, events),
                    Err(e) => Command::ServiceError(e),
                }
            }
            _ => Command::Noop,
        };
        if reply == Command::Noop {
            continue;
        }
        let response = Message {
            source: channels::MSG_NOOSE,
            content: reply,
        };
        log::debug!("[Noose] publishing new message: {:?}", response);
        pubsub.publish(message.source, response).await;
    }
    Ok(())
}
/// Run the embedded sqlx migrations against this instance's pool.
async fn migration_up(&self) {
    SqliteDb::migrate(&self.pool).await;
}
/// Persist `event`, returning the serialized relay "OK" message.
///
/// Errors from the underlying insert are propagated to the caller; the
/// previous implementation `unwrap()`ed the result and re-wrapped it in
/// `Ok`, turning any database failure into a panic.
async fn write_event(&self, event: Box<Event>) -> Result<String, Error> {
    log::debug!("[Noose] write_event triggered");
    self.add_event(event).await
}
async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error> {
log::debug!("making query from filters...");
let eose_message =
vec![nostr::RelayMessage::EndOfStoredEvents(subscription.id.clone()).as_json()];
if let Some(sql_statement) = self.get_filters_query(subscription.clone()) {
let (sql, values) = sql_statement.build_sqlx(sea_query::SqliteQueryBuilder);
log::info!("SEA_QUERY built SQL: {}", sql.clone());
match sqlx::query_as_with::<_, EventRow, _>(&sql, values)
.fetch_all(&self.pool)
.await
{
Ok(rows) => {
if rows.is_empty() {
return Ok(eose_message);
} else {
let relay_messages: Vec<String> = rows
.iter()
.map(|row| row.to_string(subscription.id.clone()))
.collect();
return Ok(relay_messages);
}
}
Err(e) => {
log::error!("{}", e);
return Err(Error::internal(e.into()));
}
}
}
return Ok(eose_message);
}
}
#[cfg(test)]
mod tests {
    use super::Noose;
    use super::SqliteDb;
    use crate::utils::structs::Subscription;
    use nostr::util::JsonUtil;

    /// Exercise `find_event` with a REQ parsed from raw JSON and print timing.
    /// NOTE(review): asserts nothing beyond "does not error"; `client_id` is unused.
    #[tokio::test]
    async fn find_event() {
        let db = SqliteDb::new().await;
        let t = std::time::Instant::now();
        let client_id = "test_id".to_string();
        let cm = nostr::ClientMessage::from_json(
            r#"["REQ","7b9bc4b6-701c-40b6-898f-4e7c6b5b1510",{"authors":["04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],"kinds":[0]}]"#,
        ).unwrap();
        let (sub_id, filters) = match cm {
            nostr::ClientMessage::Req {
                subscription_id,
                filters,
            } => (subscription_id, filters),
            _ => panic!("sneed :("),
        };
        let sub = Subscription::new(sub_id, filters);
        db.find_event(sub).await.unwrap();
        println!(
            "Time passed: {}",
            (std::time::Instant::now() - t).as_millis()
        );
    }

    /// Insert a note carrying event/url/hashtag tags, then submit a kind-5
    /// deletion for it via `add_event`.
    /// NOTE(review): asserts nothing — the deletion's effect is never verified
    /// (the intended checks remain commented out below).
    #[tokio::test]
    async fn delete_events() {
        let db = SqliteDb::new().await;
        let t = std::time::Instant::now();
        let client_id = "test_id".to_string();
        let my_keys = nostr::Keys::generate();
        let eid = nostr::EventId::all_zeros();
        let tag_event = nostr::Tag::Event {
            event_id: eid,
            relay_url: None,
            marker: None,
        };
        let tag_url = nostr::Tag::AbsoluteURL(nostr::types::UncheckedUrl::new(
            "http://foo.net".to_string(),
        ));
        let tag_hashtag = nostr::Tag::Hashtag("farm".to_string());
        let event = nostr::EventBuilder::new_text_note(
            "sneed feed and seed",
            vec![tag_event, tag_url, tag_hashtag],
        )
        .to_event(&my_keys)
        .unwrap();
        dbg!(&event.as_json());
        let resp = db.add_event(Box::new(event.clone())).await.unwrap();
        dbg!(resp);
        let delete_event = nostr::EventBuilder::delete(vec![event.id])
            .to_event(&my_keys)
            .unwrap();
        dbg!(&delete_event);
        let resp = db.add_event(Box::new(delete_event.clone())).await.unwrap();
        dbg!(resp);
        // let sub_id = nostr::SubscriptionId::new("test".to_string());
        // let mut subscription = Subscription::new(sub_id, vec![]);
        // if delete_event.kind == nostr::Kind::EventDeletion {
        //     delete_event
        //         .tags
        //         .iter()
        //         .filter(|tag| {
        //             matches!(
        //                 tag,
        //                 nostr::Tag::Event {
        //                     event_id,
        //                     relay_url,
        //                     marker,
        //                 }
        //             )
        //         })
        //         .for_each(|tag| {
        //             if let nostr::Tag::Event {
        //                 event_id,
        //                 relay_url,
        //                 marker,
        //             } = tag
        //             {
        //                 let filter = nostr::Filter::new();
        //                 let filter = &filter.event(*event_id);
        //                 subscription.filters.push(filter.clone());
        //             }
        //         });
        //     dbg!(&subscription);
        // }
        // let res = db.delete_filters(subscription).await;
        // dbg!(res);
        // let sub = Subscription::new(sub_id, filters);
        // let num = db.delete_filters(sub).await.len();
        // println!(
        //     "Time passed: {}",
        //     (std::time::Instant::now() - t).as_millis()
        // );
        // assert_eq!(num, 1);
    }

    /// Run a COUNT query parsed from raw JSON and check the result.
    /// NOTE(review): depends on pre-existing rows in the on-disk database —
    /// the expected value of 1 is not established by the test itself.
    #[tokio::test]
    async fn count_events() {
        let db = SqliteDb::new().await;
        let t = std::time::Instant::now();
        let client_id = "test_id".to_string();
        let cm = nostr::ClientMessage::from_json(
            r#"["COUNT","7b9bc4b6-701c-40b6-898f-4e7c6b5b1510",{"authors":["6be3c1446231fe6d117d72e29b60094bbb3eec029100c34f627dc4ebe8369a64"],"kinds":[1]}]"#,
        ).unwrap();
        let (sub_id, filters) = match cm {
            nostr::ClientMessage::Count {
                subscription_id,
                filters,
            } => (subscription_id, filters),
            _ => panic!("sneed :("),
        };
        let sub = Subscription::new(sub_id, filters);
        let num = db.count_events_by_filters(sub).await;
        println!(
            "Time passed: {}",
            (std::time::Instant::now() - t).as_millis()
        );
        assert_eq!(num, 1);
    }
}

43
src/noose/user.rs Normal file
View file

@ -0,0 +1,43 @@
use chrono::Utc;
use regex::Regex;
use serde::{Deserialize, Serialize};
use validator::{Validate, ValidationError};
// Usernames may contain only ASCII alphanumerics and underscores.
lazy_static! {
    static ref VALID_CHARACTERS: Regex = Regex::new(r"^[a-zA-Z0-9\_]+$").unwrap();
}
/// Stored NIP-05 user record.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Validate)]
pub struct UserRow {
    pub pubkey: String,
    pub username: String,
    // Unix timestamp (seconds, UTC) of registration.
    inserted_at: i64,
    admin: bool,
}
impl UserRow {
    /// Build a row stamped with the current UTC unix timestamp.
    pub fn new(pubkey: String, username: String, admin: bool) -> Self {
        let inserted_at = Utc::now().timestamp();
        Self {
            pubkey,
            username,
            inserted_at,
            admin,
        }
    }
}
/// Lookup request: either a pubkey or a username may be supplied.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Validate)]
pub struct User {
    #[validate(custom = "validate_pubkey")]
    pub pubkey: Option<String>,
    #[validate(length(min = 1), regex = "VALID_CHARACTERS")]
    pub name: Option<String>,
}
/// Validator hook: accept any string the nostr crate can parse as a public key.
pub fn validate_pubkey(value: &str) -> Result<(), ValidationError> {
    use nostr::prelude::FromPkStr;
    nostr::Keys::from_pk_str(value)
        .map(|_| ())
        .map_err(|_| ValidationError::new("Unable to parse pubkey"))
}