Merge pull request 'Rework database' (#1) from ban-list into master
Reviewed-on: Klink/sneedstr#1
This commit is contained in:
commit
df15dba3cc
17 changed files with 1721 additions and 1549 deletions
760
Cargo.lock
generated
760
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
14
Cargo.toml
14
Cargo.toml
|
@ -17,15 +17,23 @@ futures-util = "0.3.28"
|
|||
rustls = "0.21"
|
||||
anyhow = "1.0"
|
||||
sled = "0.34.7"
|
||||
sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-rustls", "sqlite", "migrate", "macros"] }
|
||||
flexi_logger = { version = "0.27.3", features = [ "async", "compress" ] }
|
||||
lazy_static = "1.4.0"
|
||||
log = "0.4"
|
||||
nostr = "0.26.0"
|
||||
deadpool-sqlite = "0.7.0"
|
||||
rusqlite = { version = "0.30.0", features = [ "bundled", "vtab" ] }
|
||||
rusqlite_migration = "1.0.2"
|
||||
nostr = "0.27.0"
|
||||
nostr-database = "0.27.0"
|
||||
regex = "1.9.5"
|
||||
sailfish = "0.7.0"
|
||||
sea-query = { version = "0.30.4", features = ["backend-sqlite", "thread-safe"] }
|
||||
sea-query-binder = { version = "0.5.0", features = ["sqlx-sqlite"] }
|
||||
sea-query-rusqlite = { version="0", features = [
|
||||
"with-chrono",
|
||||
"with-json",
|
||||
"with-uuid",
|
||||
"with-time",
|
||||
] }
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
thiserror = "1.0.48"
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use crate::{
|
||||
noose::user::{User, UserRow},
|
||||
noose::sled::BanInfo,
|
||||
utils::{error::Error, structs::Subscription},
|
||||
};
|
||||
use nostr::secp256k1::XOnlyPublicKey;
|
||||
|
@ -11,6 +12,7 @@ pub mod channels {
|
|||
pub static MSG_NIP05: &str = "MSG_NIP05";
|
||||
pub static MSG_RELAY: &str = "MSG_RELAY";
|
||||
pub static MSG_PIPELINE: &str = "MSG_PIPELINE";
|
||||
pub static MSG_SLED: &str = "MSG_SLED";
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
|
@ -19,6 +21,7 @@ pub enum Command {
|
|||
DbReqWriteEvent(/* client_id */ uuid::Uuid, Box<nostr::Event>),
|
||||
DbReqFindEvent(/* client_id*/ uuid::Uuid, Subscription),
|
||||
DbReqDeleteEvents(/* client_id*/ uuid::Uuid, Box<nostr::Event>),
|
||||
DbReqEventCounts(/* client_id*/ uuid::Uuid, Subscription),
|
||||
|
||||
// Old messages
|
||||
DbReqInsertUser(UserRow),
|
||||
|
@ -27,15 +30,16 @@ pub enum Command {
|
|||
DbReqGetAccount(String),
|
||||
DbReqClear,
|
||||
// DbResponse
|
||||
DbResRelayMessage(
|
||||
DbResRelayMessages(
|
||||
/* client_id*/ uuid::Uuid,
|
||||
/* Vec<RelayMessage::Event> */ Vec<String>,
|
||||
/* Vec<RelayMessage::Event> */ Vec<nostr::RelayMessage>,
|
||||
),
|
||||
DbResInfo,
|
||||
DbResOk,
|
||||
DbResOkWithStatus(/* client_id */ uuid::Uuid, nostr::RelayMessage),
|
||||
DbResAccount, // TODO: Add Account DTO as a param
|
||||
DbResUser(UserRow),
|
||||
DbResEventCounts(/* client_id */ uuid::Uuid, nostr::RelayMessage),
|
||||
// Event Pipeline
|
||||
PipelineReqEvent(/* client_id */ uuid::Uuid, Box<nostr::Event>),
|
||||
PipelineResRelayMessageOk(/* client_id */ uuid::Uuid, nostr::RelayMessage),
|
||||
|
@ -43,6 +47,14 @@ pub enum Command {
|
|||
PipelineResOk,
|
||||
// Subscription Errors
|
||||
ClientSubscriptionError(/* error message */ String),
|
||||
// Sled
|
||||
SledReqBanUser(Box<BanInfo>),
|
||||
SledReqBanInfo(/* pubkey */ String),
|
||||
SledReqUnbanUser(/* pubkey */ String),
|
||||
SledReqGetBans,
|
||||
SledResBan(Option<BanInfo>),
|
||||
SledResBans(Vec<BanInfo>),
|
||||
SledResSuccess(bool),
|
||||
// Other
|
||||
Str(String),
|
||||
ServiceError(Error),
|
||||
|
|
|
@ -9,11 +9,9 @@ use std::sync::Arc;
|
|||
pub trait Noose: Send + Sync {
|
||||
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error>;
|
||||
|
||||
async fn migration_up(&self);
|
||||
|
||||
async fn write_event(&self, event: Box<Event>) -> Result<RelayMessage, Error>;
|
||||
|
||||
async fn delete_events(&self, event_ids: Box<Event>) -> Result<RelayMessage, Error>;
|
||||
async fn find_event(&self, subscription: Subscription) -> Result<Vec<RelayMessage>, Error>;
|
||||
|
||||
async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error>;
|
||||
async fn counts(&self, subscription: Subscription) -> Result<RelayMessage, Error>;
|
||||
}
|
||||
|
|
|
@ -21,3 +21,10 @@ CREATE TABLE tags (
|
|||
CREATE INDEX idx_tags_tag ON tags (tag);
|
||||
CREATE INDEX idx_tags_value ON tags (value);
|
||||
CREATE INDEX idx_tags_event_id ON tags (event_id);
|
||||
|
||||
CREATE TABLE deleted_coordinates (
|
||||
coordinate TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX idx_coordinates_coordinate ON coordinates (coordinate);
|
||||
|
|
|
@ -1,2 +1,6 @@
|
|||
PRAGMA encoding = "UTF-8";
|
||||
PRAGMA journal_mode=WAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA auto_vacuum = FULL;
|
||||
PRAGMA journal_size_limit=32768;
|
||||
PRAGMA mmap_size = 17179869184; -- cap mmap at 16GB
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
CREATE TABLE IF NOT EXISTS event_seen_by_relays (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
event_id TEXT NOT NULL,
|
||||
relay_url TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_seen_by_relays_index ON event_seen_by_relays(event_id,relay_url);
|
50
src/noose/migrations/mod.rs
Normal file
50
src/noose/migrations/mod.rs
Normal file
|
@ -0,0 +1,50 @@
|
|||
use rusqlite::Connection;
|
||||
use rusqlite_migration::{Migrations, M};
|
||||
|
||||
pub struct MigrationRunner {}
|
||||
|
||||
impl MigrationRunner {
|
||||
pub fn up(connection: &mut Connection) -> bool {
|
||||
let m_create_events = include_str!("./1697409647688_create_events.sql");
|
||||
let m_event_seen_by_relays = include_str!("1706115586021_event_seen_by_relays.sql");
|
||||
let m_add_realys = include_str!("./1697410161900_add_relays.sql");
|
||||
let m_events_fts = include_str!("./1697410223576_events_fts.sql");
|
||||
let m_users = include_str!("./1697410294265_users.sql");
|
||||
let m_unattached_media = include_str!("./1697410480767_unattached_media.sql");
|
||||
let m_pragma = include_str!("./1697410424624_pragma.sql");
|
||||
|
||||
let migrations = Migrations::new(vec![
|
||||
M::up(m_create_events),
|
||||
M::up(m_event_seen_by_relays),
|
||||
M::up(m_add_realys),
|
||||
M::up(m_events_fts),
|
||||
M::up(m_users),
|
||||
M::up(m_unattached_media),
|
||||
M::up(m_pragma),
|
||||
]);
|
||||
|
||||
match migrations.to_latest(connection) {
|
||||
Ok(()) => true,
|
||||
Err(err) => {
|
||||
log::error!("Migrations failed: {}", err.to_string());
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use rusqlite::Connection;
|
||||
|
||||
use super::MigrationRunner;
|
||||
|
||||
#[test]
|
||||
fn get_sql_path() {
|
||||
let mut connection = Connection::open_in_memory().unwrap();
|
||||
|
||||
let runner = MigrationRunner::up(&mut connection);
|
||||
|
||||
assert!(runner);
|
||||
}
|
||||
}
|
|
@ -1,14 +1,13 @@
|
|||
use crate::utils::structs::Context;
|
||||
use tokio::runtime;
|
||||
|
||||
use db::Noose;
|
||||
use pipeline::Pipeline;
|
||||
|
||||
use tokio::runtime;
|
||||
pub mod db;
|
||||
pub mod pipeline;
|
||||
// mod sled;
|
||||
pub mod sled;
|
||||
mod sqlite;
|
||||
pub mod user;
|
||||
mod migrations;
|
||||
|
||||
pub fn start(context: Context) {
|
||||
let rt = runtime::Runtime::new().unwrap();
|
||||
|
@ -16,19 +15,27 @@ pub fn start(context: Context) {
|
|||
rt.block_on(async move {
|
||||
let pipeline_pubsub = context.pubsub.clone();
|
||||
let pipeline_config = context.config.clone();
|
||||
|
||||
let db_config = context.config.clone();
|
||||
let db_pubsub = context.pubsub.clone();
|
||||
let sled_pubsub = context.pubsub.clone();
|
||||
|
||||
let pipeline_handle = tokio::task::spawn(async move {
|
||||
let mut pipeline = Pipeline::new(pipeline_pubsub, pipeline_config);
|
||||
pipeline.start().await.unwrap();
|
||||
});
|
||||
|
||||
let sqlite_writer_handle = tokio::task::spawn(async move {
|
||||
let mut db_writer = sqlite::SqliteDb::new().await;
|
||||
let sled_handle = tokio::task::spawn(async move {
|
||||
let mut sled_writer = sled::SledDb::new();
|
||||
sled_writer.start(sled_pubsub).await.unwrap();
|
||||
});
|
||||
|
||||
let nostr_sqlite_writer_handle = tokio::task::spawn(async move {
|
||||
let mut db_writer = sqlite::NostrSqlite::new(db_config).await;
|
||||
db_writer.start(db_pubsub).await.unwrap();
|
||||
});
|
||||
|
||||
sqlite_writer_handle.await.unwrap();
|
||||
nostr_sqlite_writer_handle.await.unwrap();
|
||||
pipeline_handle.await.unwrap();
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,234 +1,138 @@
|
|||
use super::db::Noose;
|
||||
use std::sync::Arc;
|
||||
use crate::bussy::{channels, Command, Message, PubSub};
|
||||
use crate::utils::error::Error;
|
||||
use crate::utils::structs::Subscription;
|
||||
use async_trait::async_trait;
|
||||
use nostr::Event;
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::user::{User, UserRow};
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct BanInfo {
|
||||
pub pubkey: String,
|
||||
pub reason: String,
|
||||
}
|
||||
|
||||
// Db Interface
|
||||
pub struct SledDb {
|
||||
db: sled::Db,
|
||||
events: sled::Tree,
|
||||
nip05s: sled::Tree,
|
||||
pub users: sled::Tree,
|
||||
|
||||
index: sled::Db,
|
||||
banned_pubkeys: sled::Tree,
|
||||
}
|
||||
|
||||
impl SledDb {
|
||||
pub fn new() -> Self {
|
||||
let db = sled::open("/tmp/sled_db").unwrap();
|
||||
let events = db.open_tree("events").unwrap();
|
||||
let nip05s = db.open_tree("identifiers").unwrap();
|
||||
let accounts = db.open_tree("accounts").unwrap();
|
||||
|
||||
let index = sled::open("/tmp/sled_index").unwrap();
|
||||
|
||||
let banned_pubkeys = db.open_tree("banned_pubkeys").unwrap();
|
||||
Self {
|
||||
db,
|
||||
events,
|
||||
nip05s,
|
||||
users: accounts,
|
||||
index,
|
||||
banned_pubkeys
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
|
||||
let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
|
||||
|
||||
while let Ok(message) = subscriber.recv().await {
|
||||
log::info!("[Noose] received message: {:?}", message);
|
||||
let command = match message.content {
|
||||
Command::SledReqBanUser(ban_info) => match self.ban_user(ban_info).await {
|
||||
Ok(status) => Command::SledResSuccess(status),
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
Command::SledReqUnbanUser(pubkey) => match self.unban_user(&pubkey).await {
|
||||
Ok(status) => Command::SledResSuccess(status),
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
Command::SledReqGetBans => match self.get_bans().await {
|
||||
Ok(bans) => Command::SledResBans(bans),
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
Command::SledReqBanInfo(pubkey) => match self.get_ban_by_pubkey(&pubkey).await {
|
||||
Ok(ban_info) => Command::SledResBan(ban_info),
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
_ => Command::Noop,
|
||||
};
|
||||
if command != Command::Noop {
|
||||
let channel = message.source;
|
||||
let message = Message {
|
||||
source: channels::MSG_SLED,
|
||||
content: command,
|
||||
};
|
||||
|
||||
log::info!(
|
||||
"[Sled] publishing new message: {:?} to channel {}",
|
||||
message,
|
||||
channel
|
||||
);
|
||||
|
||||
pubsub.publish(channel, message).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn clear_db(&self) -> Result<(), sled::Error> {
|
||||
self.db.clear()
|
||||
}
|
||||
|
||||
fn clear_index(&self) -> Result<(), sled::Error> {
|
||||
self.index.clear()
|
||||
}
|
||||
|
||||
async fn insert_user(&self, user: UserRow) -> Result<(), Error> {
|
||||
let pubkey = user.pubkey.clone();
|
||||
let username = user.username.clone();
|
||||
|
||||
if let Ok(Some(_)) = self.nip05s.get(&username) {
|
||||
return Err(Error::internal_with_message("User already exists"));
|
||||
async fn ban_user(&self, ban_info: Box<BanInfo>) -> Result<bool, Error> {
|
||||
if let Ok(Some(_)) = self.banned_pubkeys.insert(ban_info.pubkey, ban_info.reason.as_bytes()) {
|
||||
return Ok(true)
|
||||
}
|
||||
|
||||
let mut user_buff = flexbuffers::FlexbufferSerializer::new();
|
||||
user.serialize(&mut user_buff).unwrap();
|
||||
|
||||
self.nip05s.insert(&username, user_buff.view()).unwrap();
|
||||
|
||||
let prefix = "nip05:";
|
||||
let key = format!("{}{}", prefix, pubkey);
|
||||
self.index.insert(key, username.as_bytes()).unwrap();
|
||||
|
||||
Ok(())
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn get_user(&self, user: User) -> Result<UserRow, Error> {
|
||||
let mut user_row = None;
|
||||
if let Some(username) = user.name {
|
||||
if let Ok(Some(buff)) = self.nip05s.get(username) {
|
||||
let b = flexbuffers::from_slice::<UserRow>(&buff).unwrap();
|
||||
user_row = Some(b);
|
||||
fn is_banned(&self, pubkey: &String) -> bool{
|
||||
if let Ok(Some(banned)) = self.banned_pubkeys.get(pubkey) {
|
||||
return true
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn unban_user(&self, pubkey: &String) -> Result<bool, Error> {
|
||||
if self.is_banned(pubkey) {
|
||||
self.banned_pubkeys.remove(pubkey).unwrap();
|
||||
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn get_bans(&self) -> Result<Vec<BanInfo>, Error> {
|
||||
let bans: Vec<BanInfo> = self.banned_pubkeys.iter().filter_map(|row| {
|
||||
if let Ok((k, v)) = row {
|
||||
let ban_info = BanInfo {
|
||||
pubkey: String::from_utf8(k.to_vec()).unwrap(),
|
||||
reason: String::from_utf8(v.to_vec()).unwrap(),
|
||||
};
|
||||
|
||||
Some(ban_info)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else if let Some(pubkey) = user.pubkey {
|
||||
let prefix = "nip05:";
|
||||
let reference = format!("{}{}", prefix, pubkey);
|
||||
if let Ok(Some(row)) = self.index.get(reference) {
|
||||
let key = String::from_utf8(row.to_vec()).unwrap();
|
||||
}).collect();
|
||||
|
||||
if let Ok(Some(buff)) = self.nip05s.get(key) {
|
||||
let b = flexbuffers::from_slice::<UserRow>(&buff).unwrap();
|
||||
Ok(bans)
|
||||
}
|
||||
|
||||
user_row = Some(b);
|
||||
}
|
||||
async fn get_ban_by_pubkey(&self, pubkey: &String) -> Result<Option<BanInfo>, Error> {
|
||||
if self.is_banned(pubkey) {
|
||||
if let Ok(Some(reason)) = self.banned_pubkeys.get(pubkey) {
|
||||
let ban_info = BanInfo {
|
||||
pubkey: pubkey.to_owned(),
|
||||
reason: String::from_utf8(reason.to_vec()).unwrap()
|
||||
};
|
||||
|
||||
return Ok(Some(ban_info));
|
||||
}
|
||||
}
|
||||
match user_row {
|
||||
Some(user) => Ok(user),
|
||||
None => Err(Error::internal_with_message("User not found")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Noose for SledDb {
|
||||
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
|
||||
let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
|
||||
|
||||
while let Ok(message) = subscriber.recv().await {
|
||||
log::info!("noose subscriber received: {:?}", message);
|
||||
let command = match message.content {
|
||||
Command::DbReqInsertUser(user) => match self.insert_user(user).await {
|
||||
Ok(_) => Command::DbResOk,
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
Command::DbReqGetUser(user) => match self.get_user(user).await {
|
||||
Ok(user) => Command::DbResUser(user),
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
Command::DbReqWriteEvent(event) => match self.write_event(event).await {
|
||||
Ok(_) => Command::DbResOk,
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
_ => Command::Noop,
|
||||
};
|
||||
if command != Command::Noop {
|
||||
log::info!("Publishing new message");
|
||||
let channel = message.source;
|
||||
|
||||
pubsub
|
||||
.publish(
|
||||
channel,
|
||||
Message {
|
||||
source: channels::MSG_NOOSE,
|
||||
content: command,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migration_up(&self) {}
|
||||
|
||||
async fn write_event(&self, event: Box<Event>) -> Result<String, Error> {
|
||||
let mut event_buff = flexbuffers::FlexbufferSerializer::new();
|
||||
event.serialize(&mut event_buff).unwrap();
|
||||
|
||||
self.events.insert(event.id, event_buff.view()).unwrap();
|
||||
{
|
||||
// Timestamp
|
||||
let key = format!("created_at:{}|#e:{}", event.created_at, event.id);
|
||||
self.index.insert(key, event.id.as_bytes()).unwrap();
|
||||
}
|
||||
|
||||
{
|
||||
// Author, pubkeys #p
|
||||
let key = format!("#author:{}|#e:{}", event.pubkey, event.id);
|
||||
self.index.insert(key, event.id.as_bytes()).unwrap();
|
||||
// self.index.scan_prefix(
|
||||
}
|
||||
|
||||
{
|
||||
// Kinds
|
||||
let key = format!("#k:{}|#e:{}", event.kind, event.id);
|
||||
self.index.insert(key, event.id.as_bytes()).unwrap();
|
||||
// self.index.scan_prefix(
|
||||
}
|
||||
|
||||
{
|
||||
// Tags
|
||||
event.tags.iter().for_each(|tag| {
|
||||
if let Some(key) = match tag {
|
||||
// #e tag
|
||||
nostr::Tag::Event(event_id, _, _) => Some(format!("#e:{}", event_id)),
|
||||
// #p tag
|
||||
nostr::Tag::PubKey(pubkey, _) => Some(format!("#p:{}|#e:{}", pubkey, event.id)),
|
||||
// #t tag
|
||||
nostr::Tag::Hashtag(hashtag) => Some(format!("#t:{}|#e:{}", hashtag, event.id)),
|
||||
// #a tag
|
||||
nostr::Tag::A {
|
||||
kind,
|
||||
public_key,
|
||||
identifier,
|
||||
relay_url,
|
||||
} => Some(format!(
|
||||
"#a:kind:{}|#a:pubkey:{}#a:identifier:{}|#e:{}",
|
||||
kind, public_key, identifier, event.id
|
||||
)),
|
||||
_ => None,
|
||||
} {
|
||||
self.index.insert(key, event.id.as_bytes()).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
// let key = format!("#t:{}|#e:{}", event.kind, event.id);
|
||||
// self.index.insert(key, event.id.as_bytes()).unwrap();
|
||||
// self.index.scan_prefix(
|
||||
}
|
||||
|
||||
let message = format!("[\"OK\", \"{}\", true, \"\"]", event.id.to_string());
|
||||
Ok(message)
|
||||
}
|
||||
|
||||
async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error> {
|
||||
todo!()
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::SledDb;
|
||||
use crate::{
|
||||
bussy::PubSub,
|
||||
noose::user::{User, UserRow},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_db_names() {
|
||||
let pubsub = Arc::new(PubSub::new());
|
||||
|
||||
let db = SledDb::new();
|
||||
|
||||
let pk = "npub1p3ya99jfdafnqlk87p6wfd36d2nme5mkld769rhd9pkht6hmqlaq6mzxdu".to_string();
|
||||
let username = "klink".to_string();
|
||||
let user = UserRow::new(pk, username, false);
|
||||
let result = db.insert_user(user).await;
|
||||
|
||||
let pubkey = "npub1p3ya99jfdafnqlk87p6wfd36d2nme5mkld769rhd9pkht6hmqlaq6mzxdu".to_string();
|
||||
let username = "klink".to_string();
|
||||
let user = User {
|
||||
name: None,
|
||||
pubkey: Some(pubkey),
|
||||
};
|
||||
let user = db.get_user(user).await;
|
||||
|
||||
db.clear_db().unwrap();
|
||||
db.clear_index().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
1765
src/noose/sqlite.rs
1765
src/noose/sqlite.rs
File diff suppressed because it is too large
Load diff
|
@ -67,17 +67,26 @@ pub async fn client_connection(
|
|||
}
|
||||
// }
|
||||
}
|
||||
crate::bussy::Command::DbResRelayMessage(client_id, events) => {
|
||||
crate::bussy::Command::DbResRelayMessages(client_id, relay_messages) => {
|
||||
if client.client_id == client_id {
|
||||
if let Some(sender) = &client.client_connection {
|
||||
if !sender.is_closed() {
|
||||
for event in events {
|
||||
sender.send(Ok(Message::text(event))).unwrap();
|
||||
for message in relay_messages {
|
||||
sender.send(Ok(Message::text(message.as_json()))).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
crate::bussy::Command::DbResEventCounts(client_id, relay_message) => {
|
||||
if client.client_id == client_id {
|
||||
if let Some(sender) = &client.client_connection {
|
||||
if !sender.is_closed() {
|
||||
sender.send(Ok(Message::text(relay_message.as_json()))).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
crate::bussy::Command::DbResOkWithStatus(client_id, status) => {
|
||||
if client.client_id == client_id {
|
||||
if let Some(sender) = &client.client_connection {
|
||||
|
@ -177,7 +186,7 @@ async fn socket_on_message(context: &Context, client: &mut Client, msg: Message)
|
|||
Err(e) => {
|
||||
log::error!("error while parsing client message request: {}", e);
|
||||
|
||||
let response = nostr::RelayMessage::new_notice("Invalid message");
|
||||
let response = nostr::RelayMessage::notice("Invalid message");
|
||||
let message = Message::text(response.as_json());
|
||||
send(client, message);
|
||||
|
||||
|
@ -212,7 +221,7 @@ async fn handle_msg(context: &Context, client: &mut Client, client_message: Clie
|
|||
ClientMessage::Count {
|
||||
subscription_id,
|
||||
filters,
|
||||
} => handle_count(client, subscription_id, filters).await,
|
||||
} => handle_count(context, client, subscription_id, filters).await,
|
||||
ClientMessage::Close(subscription_id) => handle_close(client, subscription_id).await,
|
||||
ClientMessage::Auth(event) => handle_auth(client, event).await,
|
||||
_ => (),
|
||||
|
@ -224,7 +233,7 @@ async fn handle_event(context: &Context, client: &Client, event: Box<Event>) {
|
|||
|
||||
if let Err(err) = event.verify() {
|
||||
let relay_message =
|
||||
nostr::RelayMessage::new_ok(event.id, false, "Failed to verify event signature");
|
||||
nostr::RelayMessage::ok(event.id, false, "Failed to verify event signature");
|
||||
let message = crate::bussy::Message {
|
||||
source: channels::MSG_RELAY,
|
||||
content: crate::bussy::Command::PipelineResRelayMessageOk(
|
||||
|
@ -253,7 +262,7 @@ async fn handle_req(
|
|||
client: &mut Client,
|
||||
subscription_id: SubscriptionId,
|
||||
filters: Vec<Filter>,
|
||||
) {
|
||||
) {
|
||||
let subscription = Subscription::new(subscription_id.clone(), filters);
|
||||
let needs_historical_events = subscription.needs_historical_events();
|
||||
|
||||
|
@ -263,6 +272,7 @@ async fn handle_req(
|
|||
client.ip(),
|
||||
&subscription_error.message
|
||||
);
|
||||
|
||||
let message = format!(
|
||||
"[\"CLOSED\", \"{}\", \"{}\"]",
|
||||
subscription_id, subscription_error.message
|
||||
|
@ -282,6 +292,7 @@ async fn handle_req(
|
|||
return;
|
||||
};
|
||||
|
||||
log::info!("[SUBSCRIPTION] needs historical events");
|
||||
if needs_historical_events {
|
||||
context
|
||||
.pubsub
|
||||
|
@ -296,12 +307,24 @@ async fn handle_req(
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_count(client: &Client, subscription_id: SubscriptionId, filters: Vec<Filter>) {
|
||||
// context.pubsub.send(new nostr event) then handle possible errors
|
||||
async fn handle_count(
|
||||
context: &Context,
|
||||
client: &Client,
|
||||
subscription_id: SubscriptionId,
|
||||
filters: Vec<Filter>,
|
||||
) {
|
||||
let subscription = Subscription::new(subscription_id, filters);
|
||||
|
||||
let message = Message::text("COUNT not implemented");
|
||||
send(client, message);
|
||||
context
|
||||
.pubsub
|
||||
.publish(
|
||||
channels::MSG_NOOSE,
|
||||
crate::bussy::Message {
|
||||
source: channels::MSG_RELAY,
|
||||
content: crate::bussy::Command::DbReqEventCounts(client.client_id, subscription),
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_close(client: &mut Client, subscription_id: SubscriptionId) {
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use nostr::{key::FromPkStr, secp256k1::XOnlyPublicKey};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
admin_pubkey: XOnlyPublicKey,
|
||||
db_path: PathBuf,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
|
@ -13,32 +16,36 @@ impl Default for Config {
|
|||
|
||||
impl Config {
|
||||
pub fn new() -> Self {
|
||||
if let Ok(env_admin_pk) = std::env::var("ADMIN_PUBKEY") {
|
||||
match nostr::Keys::from_pk_str(&env_admin_pk) {
|
||||
Ok(admin_keys) => {
|
||||
return Self {
|
||||
admin_pubkey: admin_keys.public_key(),
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Unable to parse ADMIN_PUBKEY: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
let admin_pubkey = std::env::var("ADMIN_PUBKEY")
|
||||
.map(|env_pk| nostr::Keys::from_pk_str(&env_pk))
|
||||
.and_then(|result| result.map_err(|err| panic!("{}", err)))
|
||||
.unwrap()
|
||||
.public_key();
|
||||
|
||||
panic!("Environment variable ADMIN_PUBKEY not defined");
|
||||
let db_path = std::env::var("DATABASE_URL")
|
||||
.map(PathBuf::from)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
admin_pubkey,
|
||||
db_path,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_admin_pubkey(&self) -> &XOnlyPublicKey {
|
||||
&self.admin_pubkey
|
||||
}
|
||||
|
||||
pub fn get_db_path(&self) -> PathBuf {
|
||||
self.db_path.clone()
|
||||
}
|
||||
|
||||
pub fn get_relay_config_json(&self) -> serde_json::Value {
|
||||
serde_json::json!({
|
||||
"contact": "klink@zhitno.st",
|
||||
"name": "zhitno.st",
|
||||
"description": "Very *special* nostr relay",
|
||||
"supported_nips": [ 1, 9, 11, 12, 15, 16, 20, 22, 28, 33 ],
|
||||
"supported_nips": [ 1, 9, 11, 12, 15, 16, 20, 22, 28, 33, 45 ],
|
||||
"software": "git+https://git.zhitno.st/Klink/sneedstr.git",
|
||||
"version": "0.1.0"
|
||||
})
|
||||
|
|
|
@ -4,6 +4,7 @@ use std::{
|
|||
convert::From,
|
||||
fmt::{self, Display},
|
||||
};
|
||||
use std::error::Error as StdError;
|
||||
use validator::ValidationErrors;
|
||||
use warp::{http::StatusCode, reject::Reject};
|
||||
|
||||
|
@ -16,6 +17,16 @@ pub struct Error {
|
|||
pub sneedstr_version: Option<u16>,
|
||||
}
|
||||
|
||||
impl StdError for Error {
|
||||
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
||||
None
|
||||
}
|
||||
|
||||
fn cause(&self) -> Option<&dyn StdError> {
|
||||
self.source()
|
||||
}
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn new(code: StatusCode, message: String) -> Self {
|
||||
Self {
|
||||
|
@ -74,12 +85,12 @@ impl Error {
|
|||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}: {}", self.status_code(), &self.message)?;
|
||||
if let Some(val) = &self.sneedstr_version {
|
||||
write!(f, "\ndiem ledger version: {}", val)?;
|
||||
}
|
||||
Ok(())
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Error {{ code: {}, message: '{}', sneedstr_version: {:?} }}",
|
||||
self.code, self.message, self.sneedstr_version
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
pub mod crypto;
|
||||
pub mod error;
|
||||
pub mod filter;
|
||||
// mod nostr_filter_helpers;
|
||||
mod nostr_filter_helpers;
|
||||
pub mod config;
|
||||
pub mod rejection_handler;
|
||||
pub mod response;
|
||||
|
|
|
@ -1,27 +1,27 @@
|
|||
use nostr::{Event, Filter, Kind, Tag};
|
||||
use nostr::{key::FromPkStr, Event, Filter, Kind};
|
||||
|
||||
fn ids_match(filter: &Filter, event: &Event) -> bool {
|
||||
if filter.ids.is_empty() {
|
||||
println!("[FILTER][IDS] skipped");
|
||||
log::info!("[FILTER][ids_match] skipped");
|
||||
return true;
|
||||
}
|
||||
|
||||
println!(
|
||||
"[FILTER][IDS] matched: {:?}",
|
||||
filter.ids.iter().any(|id| id == &event.id.to_string())
|
||||
log::info!(
|
||||
"[FILTER][ids_match] matched: {:?}",
|
||||
filter.ids.iter().any(|id| id == &event.id)
|
||||
);
|
||||
|
||||
filter.ids.iter().any(|id| id == &event.id.to_string())
|
||||
filter.ids.iter().any(|id| id == &event.id)
|
||||
}
|
||||
|
||||
fn kind_match(filter: &Filter, kind: Kind) -> bool {
|
||||
if filter.kinds.is_empty() {
|
||||
println!("[FILTER][KINDS] skipped");
|
||||
log::debug!("[FILTER][kind_match] skipped");
|
||||
return true;
|
||||
}
|
||||
|
||||
println!(
|
||||
"[FILTER][KIND] matched: {:?}",
|
||||
log::debug!(
|
||||
"[FILTER][kind_match] matched: {:?}",
|
||||
filter.kinds.iter().any(|k| k == &kind)
|
||||
);
|
||||
|
||||
|
@ -29,122 +29,142 @@ fn kind_match(filter: &Filter, kind: Kind) -> bool {
|
|||
}
|
||||
|
||||
fn pubkeys_match(filter: &Filter, event: &Event) -> bool {
|
||||
if filter.pubkeys.is_empty() {
|
||||
println!("[FILTER][PUBKEYS] skipped");
|
||||
return true;
|
||||
log::debug!(
|
||||
"[FILTER][pubkeys_match] matched: {:?}",
|
||||
if let Some((p_tag, p_set)) = filter.generic_tags.get_key_value(&nostr::Alphabet::P) {
|
||||
if p_set.is_empty() {
|
||||
log::debug!("[FILTER][PUBKEYS] skipped");
|
||||
return true;
|
||||
}
|
||||
|
||||
return p_set.iter().any(|pk| match pk {
|
||||
nostr::GenericTagValue::Pubkey(pk) => pk == &event.pubkey,
|
||||
_ => false,
|
||||
});
|
||||
}
|
||||
);
|
||||
if let Some((p_tag, p_set)) = filter.generic_tags.get_key_value(&nostr::Alphabet::P) {
|
||||
if p_set.is_empty() {
|
||||
log::debug!("[FILTER][PUBKEYS] skipped");
|
||||
return true;
|
||||
}
|
||||
|
||||
return p_set.iter().any(|pk| match pk {
|
||||
nostr::GenericTagValue::Pubkey(pk) => pk == &event.pubkey,
|
||||
_ => false,
|
||||
});
|
||||
}
|
||||
|
||||
println!(
|
||||
"[FILTER][PUBKEYS] matched: {:?}",
|
||||
filter.pubkeys.iter().any(|pk| pk == &event.pubkey)
|
||||
);
|
||||
filter.pubkeys.iter().any(|pk| pk == &event.pubkey)
|
||||
false
|
||||
}
|
||||
|
||||
fn authors_match(filter: &Filter, event: &Event) -> bool {
|
||||
dbg!(filter);
|
||||
if filter.authors.is_empty() {
|
||||
println!("[FILTER][AUTHORS] skipped");
|
||||
log::debug!("[FILTER][authors_match] skipped");
|
||||
return true;
|
||||
}
|
||||
|
||||
println!(
|
||||
"[FILTER][AUTHORS] matched: {:?}",
|
||||
filter
|
||||
.authors
|
||||
.iter()
|
||||
.any(|author| author == &event.pubkey.to_string())
|
||||
log::debug!(
|
||||
"[FILTER][authors_match] matched: {:?}",
|
||||
filter.authors.iter().any(|author| author == &event.pubkey)
|
||||
);
|
||||
filter
|
||||
.authors
|
||||
.iter()
|
||||
.any(|author| author == &event.pubkey.to_string())
|
||||
filter.authors.iter().any(|author| author == &event.pubkey)
|
||||
}
|
||||
|
||||
fn delegated_authors_match(filter: &Filter, event: &Event) -> bool {
|
||||
// Optional implementation
|
||||
|
||||
// let delegated_authors_match = filter.authors.iter().any(|author| {
|
||||
// event.tags.iter().any(|tag| match tag {
|
||||
// Tag::Delegation {
|
||||
// delegator_pk,
|
||||
// conditions,
|
||||
// sig,
|
||||
// } => filter
|
||||
// .authors
|
||||
// .iter()
|
||||
// .any(|author| author == &delegator_pk.to_string()),
|
||||
// _ => false,
|
||||
// })
|
||||
// });
|
||||
println!(
|
||||
"[FILTER][DELEGATED_AUTHORS] matched: {:?}",
|
||||
event.tags.iter().any(|tag| match tag {
|
||||
Tag::Delegation {
|
||||
delegator_pk,
|
||||
conditions,
|
||||
sig,
|
||||
} => filter
|
||||
.authors
|
||||
.iter()
|
||||
.any(|author| author == &delegator_pk.to_string()),
|
||||
_ => false,
|
||||
log::debug!(
|
||||
"[FILTER][delegated_authors_match] matched: {:?}",
|
||||
event.tags.iter().any(|tag| {
|
||||
if tag.kind() == nostr::TagKind::Delegation {
|
||||
let tag = tag.as_vec();
|
||||
if let Ok(event_pubkey) = nostr::Keys::from_pk_str(&tag[1]) {
|
||||
let pk = event_pubkey.public_key();
|
||||
return filter.authors.iter().any(|author| author == &pk);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
false
|
||||
})
|
||||
);
|
||||
|
||||
event.tags.iter().any(|tag| match tag {
|
||||
Tag::Delegation {
|
||||
delegator_pk,
|
||||
conditions,
|
||||
sig,
|
||||
} => filter
|
||||
.authors
|
||||
.iter()
|
||||
.any(|author| author == &delegator_pk.to_string()),
|
||||
_ => true,
|
||||
event.tags.iter().any(|tag| {
|
||||
if tag.kind() == nostr::TagKind::Delegation {
|
||||
let tag = tag.as_vec();
|
||||
if let Ok(event_pubkey) = nostr::Keys::from_pk_str(&tag[1]) {
|
||||
let pk = event_pubkey.public_key();
|
||||
return filter.authors.iter().any(|author| author == &pk);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
false
|
||||
})
|
||||
}
|
||||
|
||||
fn tag_match(filter: &Filter, event: &Event) -> bool {
|
||||
println!(
|
||||
"[FILTER][TAG] matched: {:?}",
|
||||
filter.generic_tags.iter().any(|(key, value)| {
|
||||
event.tags.iter().any(|tag| {
|
||||
let kv = tag.as_vec();
|
||||
key.to_string() == kv[0] && value.iter().any(|vv| vv == &kv[1])
|
||||
if filter.generic_tags.is_empty() && event.tags.is_empty() {
|
||||
return true;
|
||||
}
|
||||
log::debug!(
|
||||
"[FILTER][tag_match] matched: {:?}",
|
||||
filter
|
||||
.generic_tags
|
||||
.iter()
|
||||
.any(|(filter_tag_key, filter_tag_value)| {
|
||||
event.tags.iter().any(|event_tag| {
|
||||
let event_tag = event_tag.as_vec();
|
||||
let event_tag_key = event_tag[0].clone();
|
||||
let event_tag_value = event_tag[1].clone();
|
||||
|
||||
if filter_tag_key.to_string() == event_tag_key {
|
||||
return filter_tag_value
|
||||
.iter()
|
||||
.any(|f_tag_val| f_tag_val.to_string() == event_tag_value);
|
||||
};
|
||||
|
||||
false
|
||||
})
|
||||
})
|
||||
);
|
||||
|
||||
filter
|
||||
.generic_tags
|
||||
.iter()
|
||||
.any(|(filter_tag_key, filter_tag_value)| {
|
||||
event.tags.iter().any(|event_tag| {
|
||||
let event_tag = event_tag.as_vec();
|
||||
let event_tag_key = event_tag[0].clone();
|
||||
let event_tag_value = event_tag[1].clone();
|
||||
|
||||
if filter_tag_key.to_string() == event_tag_key {
|
||||
return filter_tag_value
|
||||
.iter()
|
||||
.any(|f_tag_val| f_tag_val.to_string() == event_tag_value);
|
||||
};
|
||||
|
||||
false
|
||||
})
|
||||
})
|
||||
);
|
||||
|
||||
filter.generic_tags.iter().any(|(key, value)| {
|
||||
event.tags.iter().any(|tag| {
|
||||
let kv = tag.as_vec();
|
||||
key.to_string() == kv[0] && value.iter().any(|vv| vv == &kv[1])
|
||||
})
|
||||
});
|
||||
|
||||
true // TODO: Fix delegated authors check
|
||||
}
|
||||
|
||||
pub fn interested_in_event(filter: &Filter, event: &Event) -> bool {
|
||||
ids_match(filter, event)
|
||||
&& filter.since.map_or(
|
||||
{
|
||||
println!("[FILTER][SINCE][default] matched: {:?}", true);
|
||||
log::info!("[FILTER][SINCE][default] matched: {:?}", true);
|
||||
true
|
||||
},
|
||||
|t| {
|
||||
println!("[FILTER][SINCE] matched: {:?}", event.created_at >= t);
|
||||
log::info!("[FILTER][SINCE] matched: {:?}", event.created_at >= t);
|
||||
event.created_at >= t
|
||||
},
|
||||
)
|
||||
&& filter.until.map_or(
|
||||
{
|
||||
println!("[FILTER][UNTIL][default] matched: {:?}", true);
|
||||
log::info!("[FILTER][UNTIL][default] matched: {:?}", true);
|
||||
true
|
||||
},
|
||||
|t| {
|
||||
println!("[FILTER][UNTIL] matched: {:?}", event.created_at <= t);
|
||||
log::info!("[FILTER][UNTIL] matched: {:?}", event.created_at <= t);
|
||||
event.created_at <= t
|
||||
},
|
||||
)
|
||||
|
@ -154,3 +174,27 @@ pub fn interested_in_event(filter: &Filter, event: &Event) -> bool {
|
|||
|| delegated_authors_match(filter, event))
|
||||
&& tag_match(filter, event)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::utils::nostr_filter_helpers::interested_in_event;
|
||||
|
||||
#[test]
|
||||
fn check_simple_match() {
|
||||
let my_keys = nostr::Keys::generate();
|
||||
let event = nostr::EventBuilder::text_note("hello", [])
|
||||
.to_event(&my_keys)
|
||||
.unwrap();
|
||||
|
||||
let k = nostr::Kind::TextNote;
|
||||
let filter = nostr::Filter::new()
|
||||
.kinds(vec![k])
|
||||
.authors(vec![event.pubkey]);
|
||||
|
||||
let res = interested_in_event(&filter, &event);
|
||||
|
||||
dbg!(&res);
|
||||
|
||||
assert!(res);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use super::{config::Config, error::Error};
|
||||
// use super::nostr_filter_helpers;
|
||||
use crate::utils::nostr_filter_helpers;
|
||||
use crate::PubSub;
|
||||
|
||||
use nostr::{Event, Filter, SubscriptionId};
|
||||
|
@ -34,11 +34,12 @@ impl Subscription {
|
|||
pub fn interested_in_event(&self, event: &Event) -> bool {
|
||||
log::info!("[Subscription] Checking if client is interested in the new event");
|
||||
for filter in &self.filters {
|
||||
if filter.match_event(event) {
|
||||
if nostr_filter_helpers::interested_in_event(filter, event) {
|
||||
log::info!("[Subscription] found filter that matches the event");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue