Admin pubkey can now delete any event

Tony Klink 2024-01-18 19:29:59 -06:00
parent 5b2726e66d
commit 025ba52a81
Signed by: klink
GPG key ID: 85175567C4D19231
12 changed files with 274 additions and 125 deletions

View file

@ -5,11 +5,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"lastModified": 1705309234,
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
"type": "github"
},
"original": {
@ -38,10 +38,10 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1703438236,
"narHash": "sha256-aqVBq1u09yFhL7bj1/xyUeJjzr92fXVvQSSEx6AdB1M=",
"path": "/nix/store/5acdh8xyry0kdvp6xla2hw7wf3zkphkl-source",
"rev": "5f64a12a728902226210bf01d25ec6cbb9d9265b",
"lastModified": 1705496572,
"narHash": "sha256-rPIe9G5EBLXdBdn9ilGc0nq082lzQd0xGGe092R/5QE=",
"path": "/nix/store/wcidiyklj0nrljlz5m3qlkvhv8f2ddv8-source",
"rev": "842d9d80cfd4560648c785f8a4e6f3b096790e19",
"type": "path"
},
"original": {
@ -51,11 +51,11 @@
},
"nixpkgs_2": {
"locked": {
"lastModified": 1703499205,
"narHash": "sha256-lF9rK5mSUfIZJgZxC3ge40tp1gmyyOXZ+lRY3P8bfbg=",
"lastModified": 1705566941,
"narHash": "sha256-CLNtVRDA8eUPk+bxsCCZtRO0Cp+SpHdn1nNOLoFypLs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e1fa12d4f6c6fe19ccb59cac54b5b3f25e160870",
"rev": "b06ff4bf8f4ad900fe0c2a61fc2946edc3a84be7",
"type": "github"
},
"original": {

View file

@ -51,6 +51,7 @@
];
env = {
DATABASE_URL = "/tmp/sqlite.db";
ADMIN_PUBKEY = "npub14d2a54za7dnfzktle40vw7kdx48vk3ljy3t7w7sdpk3segea65mq2t6kc4";
RUST_BACKTRACE = 1;
};
};

View file

@ -15,6 +15,12 @@ in {
domain at which the sneedstr relay will be accessible.
'';
};
adminPubkey = mkOption {
type = types.str;
description = ''
'npub' of the administrator account. Must be defined!
'';
};
sslEnable = mkEnableOption "ACME SSL for the nginx proxy";
hostAddress = mkOption {
type = types.nullOr types.str;
@ -55,7 +61,10 @@ in {
systemd.services.sneedstr = {
enable = true;
description = "Sneedstr Nostr relay";
environment = { DATABASE_URL = "${DB_PATH}/sneedstr.db"; };
environment = {
DATABASE_URL = "${DB_PATH}/sneedstr.db";
ADMIN_PUBKEY = cfg.adminPubkey;
};
startLimitBurst = 1;
startLimitIntervalSec = 10;
unitConfig = {

View file

@ -16,9 +16,9 @@ pub mod channels {
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
// DbRequest
DbReqWriteEvent(Box<nostr::Event>),
DbReqWriteEvent(/* client_id */ uuid::Uuid, Box<nostr::Event>),
DbReqFindEvent(/* client_id */ uuid::Uuid, Subscription),
DbReqDeleteEvents(/* event ids */ Vec<String>),
DbReqDeleteEvents(/* client_id */ uuid::Uuid, Box<nostr::Event>),
// Old messages
DbReqInsertUser(UserRow),
@ -33,7 +33,7 @@ pub enum Command {
),
DbResInfo,
DbResOk,
DbResOkWithStatus(String),
DbResOkWithStatus(/* client_id */ uuid::Uuid, nostr::RelayMessage),
DbResAccount, // TODO: Add Account DTO as a param
DbResUser(UserRow),
// Event Pipeline
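
Every DbReq/DbRes variant now carries the originating client's uuid, so a database reply can be matched to exactly one websocket session instead of being broadcast. A minimal sketch of that dispatch, using a hypothetical route_reply helper that is not part of this commit:

// Hypothetical helper: extract the addressee from a DB reply so the
// relay can deliver it to a single client's sender.
fn route_reply(command: Command) -> Option<(uuid::Uuid, nostr::RelayMessage)> {
    match command {
        Command::DbResOkWithStatus(client_id, message) => Some((client_id, message)),
        _ => None,
    }
}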

View file

@ -2,17 +2,18 @@ use crate::{
bussy::PubSub,
utils::{error::Error, structs::Subscription},
};
use async_trait::async_trait;
use nostr::Event;
use nostr::{Event, RelayMessage};
use std::sync::Arc;
#[async_trait]
pub trait Noose: Send + Sync {
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error>;
async fn migration_up(&self);
async fn write_event(&self, event: Box<Event>) -> Result<String, Error>;
async fn write_event(&self, event: Box<Event>) -> Result<RelayMessage, Error>;
async fn delete_events(&self, event_ids: Box<Event>) -> Result<RelayMessage, Error>;
async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error>;
}
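
write_event now returns a typed nostr::RelayMessage instead of a preformatted String, deferring serialization to the websocket edge. A sketch of the equivalence, assuming an event_id is at hand:

// Typed replacement for the old hand-rolled format!("[\"OK\", \"{id}\", true, \"\"]"):
// JsonUtil::as_json() produces the same ["OK", ...] array at send time.
fn ok_reply(event_id: nostr::EventId) -> nostr::RelayMessage {
    nostr::RelayMessage::new_ok(event_id, true, "")
}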

View file

@ -15,10 +15,11 @@ pub fn start(context: Context) {
rt.block_on(async move {
let pipeline_pubsub = context.pubsub.clone();
let pipeline_config = context.config.clone();
let db_pubsub = context.pubsub.clone();
let pipeline_handle = tokio::task::spawn(async move {
let mut pipeline = Pipeline::new(pipeline_pubsub);
let mut pipeline = Pipeline::new(pipeline_pubsub, pipeline_config);
pipeline.start().await.unwrap();
});

View file

@ -1,37 +1,49 @@
use crate::bussy::{channels, Command, Message, PubSub};
use crate::utils::config::Config;
use crate::utils::error::Error;
use nostr::Event;
use std::sync::Arc;
pub struct Pipeline {
pubsub: Arc<PubSub>,
config: Arc<Config>,
}
impl Pipeline {
pub fn new(pubsub: Arc<PubSub>) -> Self {
Self { pubsub }
pub fn new(pubsub: Arc<PubSub>, config: Arc<Config>) -> Self {
Self { pubsub, config }
}
pub async fn start(&mut self) -> Result<(), Error> {
let mut subscriber = self.pubsub.subscribe(channels::MSG_PIPELINE).await;
while let Ok(message) = subscriber.recv().await {
log::debug!("[Pipeline] received message: {:?}", message);
log::info!("[Pipeline] received message: {:?}", message);
let channel;
let command = match message.content {
Command::PipelineReqEvent(client_id, event) => {
match self.handle_event(client_id, event.clone()).await {
Ok(_) => {
let message =
nostr::RelayMessage::new_ok(event.id, true, "".to_string());
Command::PipelineResRelayMessageOk(client_id, message)
log::info!("[Pipeline] handle_event completed");
channel = message.source;
Command::Noop
}
Err(e) => {
channel = channels::MSG_RELAY;
Command::ServiceError(e)
}
Err(e) => Command::ServiceError(e),
}
}
_ => Command::Noop,
Command::DbResOkWithStatus(client_id, message) => {
channel = channels::MSG_RELAY;
Command::PipelineResRelayMessageOk(client_id, message)
}
_ => {
channel = channels::MSG_RELAY;
Command::Noop
}
};
if command != Command::Noop {
let channel = message.source;
let message = Message {
source: channels::MSG_PIPELINE,
content: command,
@ -55,8 +67,8 @@ impl Pipeline {
client_id: uuid::Uuid,
event: Box<Event>,
) -> Result<(), Error> {
let store_event_task = self.store_event(event.clone());
let process_deletions_task = self.process_deletions(event.clone());
let store_event_task = self.store_event(client_id, event.clone());
let process_deletions_task = self.process_deletions(client_id, event.clone());
let track_hashtags_task = self.track_hashtags(event.clone());
let process_media_task = self.process_media(event.clone());
let stream_out_task = self.stream_out(event.clone());
@ -99,68 +111,81 @@ impl Pipeline {
}
}
async fn store_event(&self, event: Box<Event>) -> Result<(), Error> {
async fn store_event(&self, client_id: uuid::Uuid, event: Box<Event>) -> Result<(), Error> {
log::info!("[Pipeline] store_event: processing...");
if event.kind.as_u32() == 5 && event.pubkey == *self.config.get_admin_pubkey() {
log::info!("[Pipeline] store_event: skipping admin event");
return Ok(());
}
if event.kind.is_ephemeral() {
log::info!("[Pipeline] store_event: skipping ephemeral event");
return Ok(());
}
log::info!("[Pipeline] store_event: writing event to DB");
self.pubsub
.publish(
channels::MSG_NOOSE,
Message {
source: channels::MSG_PIPELINE,
content: Command::DbReqWriteEvent(event),
content: Command::DbReqWriteEvent(client_id, event),
},
)
.await;
log::info!("[Pipeline] store_event: finished");
Ok(())
}
async fn process_deletions(&self, event: Box<Event>) -> Result<(), Error> {
// if event.kind.as_u32() == 5 {
// let events_for_deletion: Vec<String> = event
// .tags
// .iter()
// .filter_map(|tag| match tag {
// nostr::Tag::Event(event_id, _, _) => Some(event_id.to_string()),
// _ => None,
// })
// .collect();
// self.pubsub
// .publish(
// channels::MSG_NOOSE,
// Message {
// source: channels::MSG_PIPELINE,
// content: Command::DbReqDeleteEvents(events_for_deletion),
// },
// )
// .await;
// }
async fn process_deletions(
&self,
client_id: uuid::Uuid,
event: Box<Event>,
) -> Result<(), Error> {
log::info!("[Pipeline] process_deletions: processing...");
if event.kind.as_u32() == 5 && event.pubkey == *self.config.get_admin_pubkey() {
self.pubsub
.publish(
channels::MSG_NOOSE,
Message {
source: channels::MSG_PIPELINE,
content: Command::DbReqDeleteEvents(client_id, event),
},
)
.await;
}
log::info!("[Pipeline] process_deletions: finished");
Ok(())
}
async fn track_hashtags(&self, event: Box<Event>) -> Result<(), Error> {
log::info!("[Pipeline] track_hashtags: processing...");
log::info!("[Pipeline] track_hashtags: finished");
Ok(())
}
async fn process_media(&self, event: Box<Event>) -> Result<(), Error> {
log::info!("[Pipeline] process_media: processing...");
log::info!("[Pipeline] process_media: finished");
Ok(())
}
async fn stream_out(&self, event: Box<Event>) -> Result<(), Error> {
log::info!("[Pipeline] stream_out: processing...");
let message = Message {
source: channels::MSG_PIPELINE,
content: Command::PipelineResStreamOutEvent(event),
};
self.pubsub.publish(channels::MSG_RELAY, message).await;
log::info!("[Pipeline] stream_out: finished");
Ok(())
}
async fn broadcast(&self, event: Box<Event>) -> Result<(), Error> {
log::info!("[Pipeline] broadcast: processing...");
log::info!("[Pipeline] broadcast: finished");
Ok(())
}
}
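
The pipeline's branching rests on one predicate: a kind-5 (NIP-09) deletion request signed by the configured admin pubkey skips normal storage in store_event and is forwarded as DbReqDeleteEvents by process_deletions. The same check, factored into a standalone sketch (hypothetical helper, not in this commit):

// Kind 5 is a NIP-09 deletion request; only events signed by the admin
// key from Config take the hard-delete path in process_deletions above.
fn is_admin_delete(event: &nostr::Event, admin: &nostr::secp256k1::XOnlyPublicKey) -> bool {
    event.kind.as_u32() == 5 && event.pubkey == *admin
}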

View file

@ -1,5 +1,4 @@
use async_trait::async_trait;
use nostr::{Event, JsonUtil};
use nostr::{Event, JsonUtil, RelayMessage};
use sea_query::{extension::sqlite::SqliteExpr, Query};
use sea_query_binder::SqlxBinder;
use sqlx::sqlite::{Sqlite, SqlitePoolOptions};
@ -147,8 +146,8 @@ impl SqliteDb {
async fn build_pool(name: &str, max_size: u32) -> Pool<Sqlite> {
let pool_options = SqlitePoolOptions::new()
.test_before_acquire(true)
// .idle_timeout(Some(Duration::from_secs(10)))
// .max_lifetime(Some(Duration::from_secs(30)))
.idle_timeout(Some(std::time::Duration::from_secs(10)))
.max_lifetime(Some(std::time::Duration::from_secs(30)))
.max_lifetime(None)
.idle_timeout(None)
.max_connections(max_size);
@ -172,7 +171,7 @@ impl SqliteDb {
}
}
async fn add_event(&self, event: Box<Event>) -> Result<String, Error> {
async fn add_event(&self, event: Box<Event>) -> Result<RelayMessage, Error> {
let id = event.id.to_string();
let kind = event.kind.to_string();
let pubkey = event.pubkey.to_string();
@ -181,11 +180,20 @@ impl SqliteDb {
let tags = serde_json::to_string(&event.tags).unwrap();
let sig = event.sig.to_string();
let message = format!("[\"OK\", \"{}\", true, \"\"]", id.clone());
let message = nostr::RelayMessage::Ok {
event_id: event.id,
status: true,
message: "".to_string(),
};
// Reject events whose creation date is more than 10 minutes in the past
if chrono::Utc::now().timestamp() - 600 > created_at {
let message = format!("[\"OK\", \"{}\", false, \"invalid: event creation date is too far off from the current time\"]", id.clone());
let message = nostr::RelayMessage::Ok {
event_id: event.id,
status: false,
message: "invalid: event creation date is too far off from the current time"
.to_string(),
};
return Ok(message);
}
@ -193,6 +201,25 @@ impl SqliteDb {
return Ok(message);
}
{
let (sql, value) = Query::select()
.from(EventsTable::Table)
.column(EventsTable::EventId)
.and_where(sea_query::Expr::col(EventsTable::EventId).eq(event.id.to_string()))
.limit(1)
.build_sqlx(sea_query::SqliteQueryBuilder);
let events = sqlx::query_with(&sql, value).fetch_one(&self.pool).await;
if events.ok().is_some() {
let message = nostr::RelayMessage::Ok {
event_id: event.id,
status: false,
message: "invalid: event with this id already exists".to_string(),
};
return Ok(message);
}
}
let tx = self.pool.begin().await.unwrap();
{
if event.is_replaceable() {
@ -320,6 +347,7 @@ impl SqliteDb {
{
if event.is_parameterized_replaceable() {
dbg!("deleting older parametrized replaceable event from events table");
log::info!("deleting older parametrized replaceable event from events table");
let d_tag = event.identifier();
let (sql, values) = Query::delete()
.from_table(EventsTable::Table)
@ -379,6 +407,7 @@ impl SqliteDb {
dbg!(event.as_json());
if event.kind.as_u32() == 5 {
dbg!("deleting event");
log::info!("deleting event");
let ids: Vec<String> = event.event_ids().map(|eid| eid.to_string()).collect();
let (sql, values) = Query::delete()
.from_table(EventsTable::Table)
@ -411,6 +440,7 @@ impl SqliteDb {
.unwrap();
} else {
dbg!("inserting new event in events");
log::info!("inserting new event in events");
// Insert into Events table
let (sql, values) = Query::insert()
.into_table(EventsTable::Table)
@ -474,6 +504,7 @@ impl SqliteDb {
tx.commit().await.unwrap();
log::info!("[SQLite] add_event completed");
Ok(message)
}
@ -625,8 +656,28 @@ impl SqliteDb {
.reduce(|mut result, query| result.union(sea_query::UnionType::All, query).to_owned())
}
async fn delete_filters(&self, subscription: Subscription) -> Vec<EventRow> {
todo!()
async fn admin_delete_events(&self, event: Box<Event>) -> Result<RelayMessage, Error> {
let event_ids: Vec<String> = event.event_ids().map(|e| e.to_string()).collect();
let (sql, values) = Query::delete()
.from_table(EventsTable::Table)
.and_where(sea_query::Expr::col(EventsTable::EventId).is_in(event_ids))
.build_sqlx(sea_query::SqliteQueryBuilder);
match sqlx::query_with(&sql, values).execute(&self.pool).await {
Ok(affected_rows) => {
let message = RelayMessage::Notice {
message: format!("{} events deleted", affected_rows.rows_affected()),
};
Ok(message)
}
Err(e) => {
log::error!("[admin_delete_events] Failed to execute query: {}", e);
let message = RelayMessage::Notice {
message: "unable to delete events".to_string(),
};
Ok(message)
}
}
}
async fn count_events_by_filters(&self, subscription: Subscription) -> i32 {
@ -660,7 +711,6 @@ impl SqliteDb {
}
}
#[async_trait]
impl Noose for SqliteDb {
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
@ -668,8 +718,8 @@ impl Noose for SqliteDb {
while let Ok(message) = subscriber.recv().await {
log::info!("[Noose] received message: {:?}", message);
let command = match message.content {
Command::DbReqWriteEvent(event) => match self.write_event(event).await {
Ok(status) => Command::DbResOkWithStatus(status),
Command::DbReqWriteEvent(client_id, event) => match self.write_event(event).await {
Ok(status) => Command::DbResOkWithStatus(client_id, status),
Err(e) => Command::ServiceError(e),
},
Command::DbReqFindEvent(client_id, subscriptioin) => {
@ -678,6 +728,12 @@ impl Noose for SqliteDb {
Err(e) => Command::ServiceError(e),
}
}
Command::DbReqDeleteEvents(client_id, event_ids) => {
match self.delete_events(event_ids).await {
Ok(status) => Command::DbResOkWithStatus(client_id, status),
Err(e) => Command::ServiceError(e),
}
}
_ => Command::Noop,
};
if command != Command::Noop {
@ -687,7 +743,11 @@ impl Noose for SqliteDb {
content: command,
};
log::debug!("[Noose] publishing new message: {:?}", message);
log::info!(
"[Noose] publishing new message: {:?} to channel {}",
message,
channel
);
pubsub.publish(channel, message).await;
}
@ -700,12 +760,21 @@ impl Noose for SqliteDb {
SqliteDb::migrate(&self.pool).await;
}
async fn write_event(&self, event: Box<Event>) -> Result<String, Error> {
log::debug!("[Noose] write_event triggered");
async fn write_event(&self, event: Box<Event>) -> Result<RelayMessage, Error> {
log::info!("[Noose] write_event triggered");
let status = self.add_event(event).await.unwrap();
return Ok(status);
log::info!("[Noose] write event completed: {}", status.as_json());
Ok(status)
}
async fn delete_events(&self, event: Box<Event>) -> Result<RelayMessage, Error> {
log::debug!("[Noose] delete_filters triggered");
let status = self.admin_delete_events(event).await.unwrap();
Ok(status)
}
async fn find_event(&self, subscription: Subscription) -> Result<Vec<String>, Error> {
@ -740,7 +809,7 @@ impl Noose for SqliteDb {
}
}
return Ok(eose_message);
Ok(eose_message)
}
}
@ -750,6 +819,7 @@ mod tests {
use super::SqliteDb;
use crate::utils::structs::Subscription;
use nostr::key::FromSkStr;
use nostr::util::JsonUtil;
#[tokio::test]
@ -777,6 +847,30 @@ mod tests {
);
}
#[tokio::test]
async fn admin_delete_events() {
let db = SqliteDb::new().await;
let admin_pubkey = "npub14d2a54za7dnfzktle40vw7kdx48vk3ljy3t7w7sdpk3segea65mq2t6kc4";
let admin_secret = "nsec1rayezcsw7txmtu3smpsgs7m5fa3dazhx6lhdm44dxclveplhajpsalyx2l";
let admin_keys = nostr::Keys::from_sk_str(admin_secret).unwrap();
let event = nostr::EventBuilder::new(nostr::Kind::TextNote, "this is a test", vec![])
.to_event(&nostr::Keys::generate())
.unwrap();
let res = db.add_event(Box::new(event.clone())).await.unwrap();
let e_ids = vec![event.id];
let event = nostr::EventBuilder::delete(e_ids)
.to_event(&admin_keys)
.unwrap();
let message = db.admin_delete_events(Box::new(event)).await.unwrap();
assert_eq!(message.as_json(), "[\"NOTICE\",\"1 events deleted\"]");
}
#[tokio::test]
async fn delete_events() {
let db = SqliteDb::new().await;
@ -815,53 +909,6 @@ mod tests {
dbg!(&delete_event);
let resp = db.add_event(Box::new(delete_event.clone())).await.unwrap();
dbg!(resp);
// let sub_id = nostr::SubscriptionId::new("test".to_string());
// let mut subscription = Subscription::new(sub_id, vec![]);
// if delete_event.kind == nostr::Kind::EventDeletion {
// delete_event
// .tags
// .iter()
// .filter(|tag| {
// matches!(
// tag,
// nostr::Tag::Event {
// event_id,
// relay_url,
// marker,
// }
// )
// })
// .for_each(|tag| {
// if let nostr::Tag::Event {
// event_id,
// relay_url,
// marker,
// } = tag
// {
// let filter = nostr::Filter::new();
// let filter = &filter.event(*event_id);
// subscription.filters.push(filter.clone());
// }
// });
// dbg!(&subscription);
// }
// let res = db.delete_filters(subscription).await;
// dbg!(res);
// let sub = Subscription::new(sub_id, filters);
// let num = db.delete_filters(sub).await.len();
// println!(
// "Time passed: {}",
// (std::time::Instant::now() - t).as_millis()
// );
// assert_eq!(num, 1);
}
#[tokio::test]
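
For reference, the client-side counterpart that the admin_delete_events test exercises: the admin signs an ordinary NIP-09 deletion event listing the target ids and publishes it like any other event. A sketch using the same nostr-crate calls as the test above (the nsec is the test key; ids is assumed to be a Vec<nostr::EventId> of events to remove):

use nostr::key::FromSkStr;

// Sign a kind-5 deletion with the admin key; the relay hard-deletes
// every referenced event id instead of applying normal NIP-09 rules.
let admin_keys = nostr::Keys::from_sk_str("nsec1rayezcsw7txmtu3smpsgs7m5fa3dazhx6lhdm44dxclveplhajpsalyx2l").unwrap();
let deletion = nostr::EventBuilder::delete(ids).to_event(&admin_keys).unwrap();
// send as ["EVENT", <deletion>] over the websocket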

View file

@ -24,8 +24,8 @@ pub async fn client_connection(
let mut client_receiver = UnboundedReceiverStream::new(client_receiver);
// Create and Add to the Context new Client and set its sender
let ip = if real_client_ip.is_some() {
real_client_ip.unwrap().to_string()
let ip = if let Some(ip) = real_client_ip {
ip.to_string()
} else {
"".to_string()
};
@ -78,9 +78,11 @@ pub async fn client_connection(
}
}
}
crate::bussy::Command::DbResOkWithStatus(status) => {
if let Some(sender) = &client.client_connection {
sender.send(Ok(Message::text(status))).unwrap();
crate::bussy::Command::DbResOkWithStatus(client_id, status) => {
if client.client_id == client_id {
if let Some(sender) = &client.client_connection {
sender.send(Ok(Message::text(status.as_json()))).unwrap();
}
}
},
_ => ()
@ -220,6 +222,20 @@ async fn handle_msg(context: &Context, client: &mut Client, client_message: Clie
async fn handle_event(context: &Context, client: &Client, event: Box<Event>) {
log::debug!("handle_event is processing new event");
if let Err(err) = event.verify() {
let relay_message =
nostr::RelayMessage::new_ok(event.id, false, "Failed to verify event signature");
let message = crate::bussy::Message {
source: channels::MSG_RELAY,
content: crate::bussy::Command::PipelineResRelayMessageOk(
client.client_id,
relay_message,
),
};
context.pubsub.publish(channels::MSG_RELAY, message).await;
return;
}
context
.pubsub
.publish(

src/utils/config.rs (new file, 46 lines)
View file

@ -0,0 +1,46 @@
use nostr::{key::FromPkStr, secp256k1::XOnlyPublicKey};
#[derive(Clone, Debug)]
pub struct Config {
admin_pubkey: XOnlyPublicKey,
}
impl Default for Config {
fn default() -> Self {
panic!("Use Config::new(env_admin_pk)")
}
}
impl Config {
pub fn new() -> Self {
if let Ok(env_admin_pk) = std::env::var("ADMIN_PUBKEY") {
match nostr::Keys::from_pk_str(&env_admin_pk) {
Ok(admin_keys) => {
return Self {
admin_pubkey: admin_keys.public_key(),
};
}
Err(e) => {
panic!("Unable to parse ADMIN_PUBKEY: {}", e);
}
}
}
panic!("Environment variable ADMIN_PUBKEY not defined");
}
pub fn get_admin_pubkey(&self) -> &XOnlyPublicKey {
&self.admin_pubkey
}
pub fn get_relay_config_json(&self) -> serde_json::Value {
serde_json::json!({
"contact": "klink@zhitno.st",
"name": "zhitno.st",
"description": "Very *special* nostr relay",
"supported_nips": [ 1, 9, 11, 12, 15, 16, 20, 22, 28, 33 ],
"software": "git+https://git.zhitno.st/Klink/sneedstr.git",
"version": "0.1.0"
})
}
}
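
Config::new is deliberately fail-fast: the relay refuses to start without a parseable admin npub rather than running with admin deletion silently disabled. A usage sketch (npub taken from the dev shell above):

// Reads ADMIN_PUBKEY once at startup; panics on a missing or
// malformed value instead of continuing without an admin.
std::env::set_var("ADMIN_PUBKEY", "npub14d2a54za7dnfzktle40vw7kdx48vk3ljy3t7w7sdpk3segea65mq2t6kc4");
let config = Config::new();
let admin: &nostr::secp256k1::XOnlyPublicKey = config.get_admin_pubkey();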

View file

@ -2,6 +2,7 @@ pub mod crypto;
pub mod error;
pub mod filter;
// mod nostr_filter_helpers;
pub mod config;
pub mod rejection_handler;
pub mod response;
pub mod structs;

View file

@ -1,6 +1,7 @@
use super::error::Error;
use super::{config::Config, error::Error};
// use super::nostr_filter_helpers;
use crate::PubSub;
use nostr::{Event, Filter, SubscriptionId};
use std::collections::HashMap;
use std::sync::Arc;
@ -122,6 +123,7 @@ impl Client {
#[derive(Debug, Clone)]
pub struct Context {
pub pubsub: Arc<PubSub>,
pub config: Arc<Config>,
}
impl Default for Context {
@ -133,7 +135,7 @@ impl Default for Context {
impl Context {
pub fn new() -> Self {
let pubsub = Arc::new(PubSub::new());
Self { pubsub }
let config = Arc::new(Config::new());
Self { pubsub, config }
}
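
Because Context::new now builds a Config, anything that constructs a Context (the relay binary, integration tests) must have ADMIN_PUBKEY set before the call, or Config::new will panic. A minimal sketch:

// ADMIN_PUBKEY must be in the environment before Context::new().
std::env::set_var("ADMIN_PUBKEY", "npub14d2a54za7dnfzktle40vw7kdx48vk3ljy3t7w7sdpk3segea65mq2t6kc4");
let context = Context::new();
let _admin = context.config.get_admin_pubkey();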
}