Add pubkey ban support

Temporary use nostr-db as a db backend due to breaking API change
    in the library
This commit is contained in:
Tony Klink 2024-01-23 10:30:47 -06:00
parent a178cdee05
commit 0bbce25d39
Signed by: klink
GPG key ID: 85175567C4D19231
12 changed files with 561 additions and 917 deletions

View file

@ -9,11 +9,9 @@ use std::sync::Arc;
pub trait Noose: Send + Sync {
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error>;
async fn migration_up(&self);
async fn write_event(&self, event: Box<Event>) -> Result<RelayMessage, Error>;
async fn delete_events(&self, event_ids: Box<Event>) -> Result<RelayMessage, Error>;
async fn find_event(&self, subscription: Subscription) -> Result<Vec<RelayMessage>, Error>;
async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error>;
async fn counts(&self, subscription: Subscription) -> Result<RelayMessage, Error>;
}

View file

@ -1,13 +1,12 @@
use crate::utils::structs::Context;
use tokio::runtime;
use db::Noose;
use pipeline::Pipeline;
use tokio::runtime;
pub mod db;
mod nostr_db;
pub mod pipeline;
// mod sled;
mod sqlite;
pub mod sled;
// mod sqlite;
pub mod user;
pub fn start(context: Context) {
@ -16,19 +15,32 @@ pub fn start(context: Context) {
rt.block_on(async move {
let pipeline_pubsub = context.pubsub.clone();
let pipeline_config = context.config.clone();
let db_config = context.config.clone();
let db_pubsub = context.pubsub.clone();
let sled_pubsub = context.pubsub.clone();
let pipeline_handle = tokio::task::spawn(async move {
let mut pipeline = Pipeline::new(pipeline_pubsub, pipeline_config);
pipeline.start().await.unwrap();
});
let sqlite_writer_handle = tokio::task::spawn(async move {
let mut db_writer = sqlite::SqliteDb::new().await;
let sled_handle = tokio::task::spawn(async move {
let mut sled_writer = sled::SledDb::new();
sled_writer.start(sled_pubsub).await.unwrap();
});
let nostr_db_writer_handle = tokio::task::spawn(async move {
let mut db_writer = nostr_db::NostrDb::new(db_config).await;
db_writer.start(db_pubsub).await.unwrap();
});
sqlite_writer_handle.await.unwrap();
// let sqlite_writer_handle = tokio::task::spawn(async move {
// let mut db_writer = sqlite::SqliteDb::new().await;
// db_writer.start(db_pubsub).await.unwrap();
// });
// sqlite_writer_handle.await.unwrap();
pipeline_handle.await.unwrap();
});
}

111
src/noose/nostr_db.rs Normal file
View file

@ -0,0 +1,111 @@
use crate::{
bussy::{channels, Command, Message, PubSub},
noose::Noose,
utils::{config::Config, error::Error, structs::Subscription},
};
use nostr_sqlite::{database::NostrDatabase, SQLiteDatabase};
use std::sync::Arc;
pub struct NostrDb {
db: SQLiteDatabase,
}
impl NostrDb {
pub async fn new(config: Arc<Config>) -> Self {
let db_path = config.get_db_path();
if let Ok(db) = SQLiteDatabase::open(db_path).await {
return Self { db };
}
panic!("[NostrDb] Failed to initialize database");
}
}
impl Noose for NostrDb {
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
while let Ok(message) = subscriber.recv().await {
log::info!("[Noose] received message: {:?}", message);
let command = match message.content {
Command::DbReqWriteEvent(client_id, event) => match self.write_event(event).await {
Ok(status) => Command::DbResOkWithStatus(client_id, status),
Err(e) => Command::ServiceError(e),
},
Command::DbReqFindEvent(client_id, subscriptioin) => {
match self.find_event(subscriptioin).await {
Ok(relay_messages) => {
Command::DbResRelayMessages(client_id, relay_messages)
}
Err(e) => Command::ServiceError(e),
}
}
Command::DbReqEventCounts(client_id, subscriptioin) => {
match self.counts(subscriptioin).await {
Ok(relay_message) => Command::DbResEventCounts(client_id, relay_message),
Err(e) => Command::ServiceError(e),
}
}
_ => Command::Noop,
};
if command != Command::Noop {
let channel = message.source;
let message = Message {
source: channels::MSG_NOOSE,
content: command,
};
log::info!(
"[Noose] publishing new message: {:?} to channel {}",
message,
channel
);
pubsub.publish(channel, message).await;
}
}
Ok(())
}
async fn write_event(&self, event: Box<nostr::Event>) -> Result<nostr::RelayMessage, Error> {
// TODO: Maybe do event validation and admin deletions here
match self.db.save_event(&event).await {
Ok(status) => {
let relay_message = nostr::RelayMessage::ok(event.id, status, "");
Ok(relay_message)
}
Err(err) => Err(Error::bad_request(err.to_string())),
}
}
async fn find_event(
&self,
subscription: Subscription,
) -> Result<Vec<nostr::RelayMessage>, Error> {
match self
.db
.query(subscription.filters, nostr_sqlite::database::Order::Desc)
.await
{
Ok(events) => {
let relay_messages = events
.into_iter()
.map(|event| nostr::RelayMessage::event(subscription.id.clone(), event))
.collect();
Ok(relay_messages)
}
Err(err) => Err(Error::bad_request(err.to_string())),
}
}
async fn counts(&self, subscription: Subscription) -> Result<nostr::RelayMessage, Error> {
match self.db.count(subscription.filters).await {
Ok(counts) => {
let relay_message = nostr::RelayMessage::count(subscription.id, counts);
Ok(relay_message)
}
Err(err) => Err(Error::internal_with_message(err.to_string())),
}
}
}

View file

@ -1,234 +1,138 @@
use super::db::Noose;
use std::sync::Arc;
use crate::bussy::{channels, Command, Message, PubSub};
use crate::utils::error::Error;
use crate::utils::structs::Subscription;
use async_trait::async_trait;
use nostr::Event;
use serde::Serialize;
use std::sync::Arc;
use super::user::{User, UserRow};
#[derive(Debug, Clone, PartialEq)]
pub struct BanInfo {
pub pubkey: String,
pub reason: String,
}
// Db Interface
pub struct SledDb {
db: sled::Db,
events: sled::Tree,
nip05s: sled::Tree,
pub users: sled::Tree,
index: sled::Db,
banned_pubkeys: sled::Tree,
}
impl SledDb {
pub fn new() -> Self {
let db = sled::open("/tmp/sled_db").unwrap();
let events = db.open_tree("events").unwrap();
let nip05s = db.open_tree("identifiers").unwrap();
let accounts = db.open_tree("accounts").unwrap();
let index = sled::open("/tmp/sled_index").unwrap();
let banned_pubkeys = db.open_tree("banned_pubkeys").unwrap();
Self {
db,
events,
nip05s,
users: accounts,
index,
banned_pubkeys
}
}
pub async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
while let Ok(message) = subscriber.recv().await {
log::info!("[Noose] received message: {:?}", message);
let command = match message.content {
Command::SledReqBanUser(ban_info) => match self.ban_user(ban_info).await {
Ok(status) => Command::SledResSuccess(status),
Err(e) => Command::ServiceError(e),
},
Command::SledReqUnbanUser(pubkey) => match self.unban_user(&pubkey).await {
Ok(status) => Command::SledResSuccess(status),
Err(e) => Command::ServiceError(e),
},
Command::SledReqGetBans => match self.get_bans().await {
Ok(bans) => Command::SledResBans(bans),
Err(e) => Command::ServiceError(e),
},
Command::SledReqBanInfo(pubkey) => match self.get_ban_by_pubkey(&pubkey).await {
Ok(ban_info) => Command::SledResBan(ban_info),
Err(e) => Command::ServiceError(e),
},
_ => Command::Noop,
};
if command != Command::Noop {
let channel = message.source;
let message = Message {
source: channels::MSG_SLED,
content: command,
};
log::info!(
"[Sled] publishing new message: {:?} to channel {}",
message,
channel
);
pubsub.publish(channel, message).await;
}
}
Ok(())
}
fn clear_db(&self) -> Result<(), sled::Error> {
self.db.clear()
}
fn clear_index(&self) -> Result<(), sled::Error> {
self.index.clear()
}
async fn insert_user(&self, user: UserRow) -> Result<(), Error> {
let pubkey = user.pubkey.clone();
let username = user.username.clone();
if let Ok(Some(_)) = self.nip05s.get(&username) {
return Err(Error::internal_with_message("User already exists"));
async fn ban_user(&self, ban_info: Box<BanInfo>) -> Result<bool, Error> {
if let Ok(Some(_)) = self.banned_pubkeys.insert(ban_info.pubkey, ban_info.reason.as_bytes()) {
return Ok(true)
}
let mut user_buff = flexbuffers::FlexbufferSerializer::new();
user.serialize(&mut user_buff).unwrap();
self.nip05s.insert(&username, user_buff.view()).unwrap();
let prefix = "nip05:";
let key = format!("{}{}", prefix, pubkey);
self.index.insert(key, username.as_bytes()).unwrap();
Ok(())
Ok(false)
}
async fn get_user(&self, user: User) -> Result<UserRow, Error> {
let mut user_row = None;
if let Some(username) = user.name {
if let Ok(Some(buff)) = self.nip05s.get(username) {
let b = flexbuffers::from_slice::<UserRow>(&buff).unwrap();
user_row = Some(b);
fn is_banned(&self, pubkey: &String) -> bool{
if let Ok(Some(banned)) = self.banned_pubkeys.get(pubkey) {
return true
}
false
}
async fn unban_user(&self, pubkey: &String) -> Result<bool, Error> {
if self.is_banned(pubkey) {
self.banned_pubkeys.remove(pubkey).unwrap();
return Ok(true);
}
Ok(false)
}
async fn get_bans(&self) -> Result<Vec<BanInfo>, Error> {
let bans: Vec<BanInfo> = self.banned_pubkeys.iter().filter_map(|row| {
if let Ok((k, v)) = row {
let ban_info = BanInfo {
pubkey: String::from_utf8(k.to_vec()).unwrap(),
reason: String::from_utf8(v.to_vec()).unwrap(),
};
Some(ban_info)
} else {
None
}
} else if let Some(pubkey) = user.pubkey {
let prefix = "nip05:";
let reference = format!("{}{}", prefix, pubkey);
if let Ok(Some(row)) = self.index.get(reference) {
let key = String::from_utf8(row.to_vec()).unwrap();
}).collect();
if let Ok(Some(buff)) = self.nip05s.get(key) {
let b = flexbuffers::from_slice::<UserRow>(&buff).unwrap();
Ok(bans)
}
user_row = Some(b);
}
async fn get_ban_by_pubkey(&self, pubkey: &String) -> Result<Option<BanInfo>, Error> {
if self.is_banned(pubkey) {
if let Ok(Some(reason)) = self.banned_pubkeys.get(pubkey) {
let ban_info = BanInfo {
pubkey: pubkey.to_owned(),
reason: String::from_utf8(reason.to_vec()).unwrap()
};
return Ok(Some(ban_info));
}
}
match user_row {
Some(user) => Ok(user),
None => Err(Error::internal_with_message("User not found")),
}
}
}
#[async_trait]
impl Noose for SledDb {
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
while let Ok(message) = subscriber.recv().await {
log::info!("noose subscriber received: {:?}", message);
let command = match message.content {
Command::DbReqInsertUser(user) => match self.insert_user(user).await {
Ok(_) => Command::DbResOk,
Err(e) => Command::ServiceError(e),
},
Command::DbReqGetUser(user) => match self.get_user(user).await {
Ok(user) => Command::DbResUser(user),
Err(e) => Command::ServiceError(e),
},
Command::DbReqWriteEvent(event) => match self.write_event(event).await {
Ok(_) => Command::DbResOk,
Err(e) => Command::ServiceError(e),
},
_ => Command::Noop,
};
if command != Command::Noop {
log::info!("Publishing new message");
let channel = message.source;
pubsub
.publish(
channel,
Message {
source: channels::MSG_NOOSE,
content: command,
},
)
.await;
}
return Ok(None);
}
Ok(())
}
async fn migration_up(&self) {}
async fn write_event(&self, event: Box<Event>) -> Result<String, Error> {
let mut event_buff = flexbuffers::FlexbufferSerializer::new();
event.serialize(&mut event_buff).unwrap();
self.events.insert(event.id, event_buff.view()).unwrap();
{
// Timestamp
let key = format!("created_at:{}|#e:{}", event.created_at, event.id);
self.index.insert(key, event.id.as_bytes()).unwrap();
}
{
// Author, pubkeys #p
let key = format!("#author:{}|#e:{}", event.pubkey, event.id);
self.index.insert(key, event.id.as_bytes()).unwrap();
// self.index.scan_prefix(
}
{
// Kinds
let key = format!("#k:{}|#e:{}", event.kind, event.id);
self.index.insert(key, event.id.as_bytes()).unwrap();
// self.index.scan_prefix(
}
{
// Tags
event.tags.iter().for_each(|tag| {
if let Some(key) = match tag {
// #e tag
nostr::Tag::Event(event_id, _, _) => Some(format!("#e:{}", event_id)),
// #p tag
nostr::Tag::PubKey(pubkey, _) => Some(format!("#p:{}|#e:{}", pubkey, event.id)),
// #t tag
nostr::Tag::Hashtag(hashtag) => Some(format!("#t:{}|#e:{}", hashtag, event.id)),
// #a tag
nostr::Tag::A {
kind,
public_key,
identifier,
relay_url,
} => Some(format!(
"#a:kind:{}|#a:pubkey:{}#a:identifier:{}|#e:{}",
kind, public_key, identifier, event.id
)),
_ => None,
} {
self.index.insert(key, event.id.as_bytes()).unwrap();
}
});
// let key = format!("#t:{}|#e:{}", event.kind, event.id);
// self.index.insert(key, event.id.as_bytes()).unwrap();
// self.index.scan_prefix(
}
let message = format!("[\"OK\", \"{}\", true, \"\"]", event.id.to_string());
Ok(message)
}
async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error> {
todo!()
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::SledDb;
use crate::{
bussy::PubSub,
noose::user::{User, UserRow},
};
use std::sync::Arc;
#[tokio::test]
async fn get_db_names() {
let pubsub = Arc::new(PubSub::new());
let db = SledDb::new();
let pk = "npub1p3ya99jfdafnqlk87p6wfd36d2nme5mkld769rhd9pkht6hmqlaq6mzxdu".to_string();
let username = "klink".to_string();
let user = UserRow::new(pk, username, false);
let result = db.insert_user(user).await;
let pubkey = "npub1p3ya99jfdafnqlk87p6wfd36d2nme5mkld769rhd9pkht6hmqlaq6mzxdu".to_string();
let username = "klink".to_string();
let user = User {
name: None,
pubkey: Some(pubkey),
};
let user = db.get_user(user).await;
db.clear_db().unwrap();
db.clear_index().unwrap();
}
}