Rework sqlite db to implement nostr_database trait
This commit is contained in:
parent
0bbce25d39
commit
5b7d0b7938
10 changed files with 1342 additions and 814 deletions
|
@ -21,3 +21,10 @@ CREATE TABLE tags (
|
|||
CREATE INDEX idx_tags_tag ON tags (tag);
|
||||
CREATE INDEX idx_tags_value ON tags (value);
|
||||
CREATE INDEX idx_tags_event_id ON tags (event_id);
|
||||
|
||||
CREATE TABLE deleted_coordinates (
|
||||
coordinate TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX idx_coordinates_coordinate ON coordinates (coordinate);
|
||||
|
|
|
@ -1,2 +1,6 @@
|
|||
PRAGMA encoding = "UTF-8";
|
||||
PRAGMA journal_mode=WAL;
|
||||
PRAGMA foreign_keys = ON;
|
||||
PRAGMA auto_vacuum = FULL;
|
||||
PRAGMA journal_size_limit=32768;
|
||||
PRAGMA mmap_size = 17179869184; -- cap mmap at 16GB
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
CREATE TABLE IF NOT EXISTS event_seen_by_relays (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
event_id TEXT NOT NULL,
|
||||
relay_url TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_seen_by_relays_index ON event_seen_by_relays(event_id,relay_url);
|
50
src/noose/migrations/mod.rs
Normal file
50
src/noose/migrations/mod.rs
Normal file
|
@ -0,0 +1,50 @@
|
|||
use rusqlite::Connection;
|
||||
use rusqlite_migration::{Migrations, M};
|
||||
|
||||
pub struct MigrationRunner {}
|
||||
|
||||
impl MigrationRunner {
|
||||
pub fn up(connection: &mut Connection) -> bool {
|
||||
let m_create_events = include_str!("./1697409647688_create_events.sql");
|
||||
let m_event_seen_by_relays = include_str!("1706115586021_event_seen_by_relays.sql");
|
||||
let m_add_realys = include_str!("./1697410161900_add_relays.sql");
|
||||
let m_events_fts = include_str!("./1697410223576_events_fts.sql");
|
||||
let m_users = include_str!("./1697410294265_users.sql");
|
||||
let m_unattached_media = include_str!("./1697410480767_unattached_media.sql");
|
||||
let m_pragma = include_str!("./1697410424624_pragma.sql");
|
||||
|
||||
let migrations = Migrations::new(vec![
|
||||
M::up(m_create_events),
|
||||
M::up(m_event_seen_by_relays),
|
||||
M::up(m_add_realys),
|
||||
M::up(m_events_fts),
|
||||
M::up(m_users),
|
||||
M::up(m_unattached_media),
|
||||
M::up(m_pragma),
|
||||
]);
|
||||
|
||||
match migrations.to_latest(connection) {
|
||||
Ok(()) => true,
|
||||
Err(err) => {
|
||||
log::error!("Migrations failed: {}", err.to_string());
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use rusqlite::Connection;
|
||||
|
||||
use super::MigrationRunner;
|
||||
|
||||
#[test]
|
||||
fn get_sql_path() {
|
||||
let mut connection = Connection::open_in_memory().unwrap();
|
||||
|
||||
let runner = MigrationRunner::up(&mut connection);
|
||||
|
||||
assert!(runner);
|
||||
}
|
||||
}
|
|
@ -3,11 +3,11 @@ use db::Noose;
|
|||
use pipeline::Pipeline;
|
||||
use tokio::runtime;
|
||||
pub mod db;
|
||||
mod nostr_db;
|
||||
pub mod pipeline;
|
||||
pub mod sled;
|
||||
// mod sqlite;
|
||||
mod sqlite;
|
||||
pub mod user;
|
||||
mod migrations;
|
||||
|
||||
pub fn start(context: Context) {
|
||||
let rt = runtime::Runtime::new().unwrap();
|
||||
|
@ -30,17 +30,12 @@ pub fn start(context: Context) {
|
|||
sled_writer.start(sled_pubsub).await.unwrap();
|
||||
});
|
||||
|
||||
let nostr_db_writer_handle = tokio::task::spawn(async move {
|
||||
let mut db_writer = nostr_db::NostrDb::new(db_config).await;
|
||||
let nostr_sqlite_writer_handle = tokio::task::spawn(async move {
|
||||
let mut db_writer = sqlite::NostrSqlite::new(db_config).await;
|
||||
db_writer.start(db_pubsub).await.unwrap();
|
||||
});
|
||||
|
||||
// let sqlite_writer_handle = tokio::task::spawn(async move {
|
||||
// let mut db_writer = sqlite::SqliteDb::new().await;
|
||||
// db_writer.start(db_pubsub).await.unwrap();
|
||||
// });
|
||||
|
||||
// sqlite_writer_handle.await.unwrap();
|
||||
nostr_sqlite_writer_handle.await.unwrap();
|
||||
pipeline_handle.await.unwrap();
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,111 +0,0 @@
|
|||
use crate::{
|
||||
bussy::{channels, Command, Message, PubSub},
|
||||
noose::Noose,
|
||||
utils::{config::Config, error::Error, structs::Subscription},
|
||||
};
|
||||
use nostr_sqlite::{database::NostrDatabase, SQLiteDatabase};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct NostrDb {
|
||||
db: SQLiteDatabase,
|
||||
}
|
||||
|
||||
impl NostrDb {
|
||||
pub async fn new(config: Arc<Config>) -> Self {
|
||||
let db_path = config.get_db_path();
|
||||
if let Ok(db) = SQLiteDatabase::open(db_path).await {
|
||||
return Self { db };
|
||||
}
|
||||
|
||||
panic!("[NostrDb] Failed to initialize database");
|
||||
}
|
||||
}
|
||||
|
||||
impl Noose for NostrDb {
|
||||
async fn start(&mut self, pubsub: Arc<PubSub>) -> Result<(), Error> {
|
||||
let mut subscriber = pubsub.subscribe(channels::MSG_NOOSE).await;
|
||||
|
||||
while let Ok(message) = subscriber.recv().await {
|
||||
log::info!("[Noose] received message: {:?}", message);
|
||||
let command = match message.content {
|
||||
Command::DbReqWriteEvent(client_id, event) => match self.write_event(event).await {
|
||||
Ok(status) => Command::DbResOkWithStatus(client_id, status),
|
||||
Err(e) => Command::ServiceError(e),
|
||||
},
|
||||
Command::DbReqFindEvent(client_id, subscriptioin) => {
|
||||
match self.find_event(subscriptioin).await {
|
||||
Ok(relay_messages) => {
|
||||
Command::DbResRelayMessages(client_id, relay_messages)
|
||||
}
|
||||
Err(e) => Command::ServiceError(e),
|
||||
}
|
||||
}
|
||||
Command::DbReqEventCounts(client_id, subscriptioin) => {
|
||||
match self.counts(subscriptioin).await {
|
||||
Ok(relay_message) => Command::DbResEventCounts(client_id, relay_message),
|
||||
Err(e) => Command::ServiceError(e),
|
||||
}
|
||||
}
|
||||
_ => Command::Noop,
|
||||
};
|
||||
if command != Command::Noop {
|
||||
let channel = message.source;
|
||||
let message = Message {
|
||||
source: channels::MSG_NOOSE,
|
||||
content: command,
|
||||
};
|
||||
|
||||
log::info!(
|
||||
"[Noose] publishing new message: {:?} to channel {}",
|
||||
message,
|
||||
channel
|
||||
);
|
||||
|
||||
pubsub.publish(channel, message).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_event(&self, event: Box<nostr::Event>) -> Result<nostr::RelayMessage, Error> {
|
||||
// TODO: Maybe do event validation and admin deletions here
|
||||
match self.db.save_event(&event).await {
|
||||
Ok(status) => {
|
||||
let relay_message = nostr::RelayMessage::ok(event.id, status, "");
|
||||
Ok(relay_message)
|
||||
}
|
||||
Err(err) => Err(Error::bad_request(err.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_event(
|
||||
&self,
|
||||
subscription: Subscription,
|
||||
) -> Result<Vec<nostr::RelayMessage>, Error> {
|
||||
match self
|
||||
.db
|
||||
.query(subscription.filters, nostr_sqlite::database::Order::Desc)
|
||||
.await
|
||||
{
|
||||
Ok(events) => {
|
||||
let relay_messages = events
|
||||
.into_iter()
|
||||
.map(|event| nostr::RelayMessage::event(subscription.id.clone(), event))
|
||||
.collect();
|
||||
Ok(relay_messages)
|
||||
}
|
||||
Err(err) => Err(Error::bad_request(err.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn counts(&self, subscription: Subscription) -> Result<nostr::RelayMessage, Error> {
|
||||
match self.db.count(subscription.filters).await {
|
||||
Ok(counts) => {
|
||||
let relay_message = nostr::RelayMessage::count(subscription.id, counts);
|
||||
Ok(relay_message)
|
||||
}
|
||||
Err(err) => Err(Error::internal_with_message(err.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
1765
src/noose/sqlite.rs
1765
src/noose/sqlite.rs
File diff suppressed because it is too large
Load diff
|
@ -4,6 +4,7 @@ use std::{
|
|||
convert::From,
|
||||
fmt::{self, Display},
|
||||
};
|
||||
use std::error::Error as StdError;
|
||||
use validator::ValidationErrors;
|
||||
use warp::{http::StatusCode, reject::Reject};
|
||||
|
||||
|
@ -16,6 +17,16 @@ pub struct Error {
|
|||
pub sneedstr_version: Option<u16>,
|
||||
}
|
||||
|
||||
impl StdError for Error {
|
||||
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
||||
None
|
||||
}
|
||||
|
||||
fn cause(&self) -> Option<&dyn StdError> {
|
||||
self.source()
|
||||
}
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn new(code: StatusCode, message: String) -> Self {
|
||||
Self {
|
||||
|
@ -74,12 +85,12 @@ impl Error {
|
|||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}: {}", self.status_code(), &self.message)?;
|
||||
if let Some(val) = &self.sneedstr_version {
|
||||
write!(f, "\ndiem ledger version: {}", val)?;
|
||||
}
|
||||
Ok(())
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Error {{ code: {}, message: '{}', sneedstr_version: {:?} }}",
|
||||
self.code, self.message, self.sneedstr_version
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue