From 50c4ac387a64d319d4656cfa33e9a478f6eee6ef Mon Sep 17 00:00:00 2001 From: Katalina Okano Date: Wed, 7 Dec 2022 23:42:19 -0500 Subject: [PATCH] websocket done, api needs rebuild --- Cargo.lock | 1 - docker-compose.live.yaml | 26 +++--- docker-compose.yaml | 9 +- services/api/Cargo.toml | 3 +- services/api/src/classes.rs | 5 +- services/api/src/health.rs | 153 +++++++++++++------------------ services/api/src/main.rs | 19 +--- services/api/src/util.rs | 17 ---- services/api/src/vehicles.rs | 5 +- services/api/src/world.rs | 9 +- services/tasks/src/main.rs | 60 ++++++------ services/tasks/src/migrations.rs | 46 +++++++++- services/websocket/src/main.rs | 34 ++++++- 13 files changed, 192 insertions(+), 195 deletions(-) delete mode 100644 services/api/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index c3c0177..80874be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,7 +49,6 @@ dependencies = [ "async-graphql-axum", "axum", "lazy_static", - "redis", "serde", "serde_json", "sqlx", diff --git a/docker-compose.live.yaml b/docker-compose.live.yaml index 8826cf6..5cacd57 100644 --- a/docker-compose.live.yaml +++ b/docker-compose.live.yaml @@ -3,14 +3,6 @@ version: '3.7' services: - redis: - image: redis:alpine - command: redis-server --save "" --appendonly no - container_name: redis - restart: always - ports: - - 6379:6379 - tsdb: image: timescale/timescaledb:latest-pg14 environment: @@ -29,15 +21,27 @@ services: - tsdb restart: always environment: - DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:$TSDB_PORT/data + DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:5432/data ws: image: ghcr.io/genudine/saerro/websocket:latest pull_policy: always restart: always + ports: + - 8999:8999 environment: - DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:$TSDB_PORT/data - WS_ADDR: wss://push.nanite-systems.net/streaming?environment=ps2&service-id=s:saegd + DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:5432/data + WS_ADDR: wss://push.nanite-systems.net/streaming?environment=all&service-id=s:saegd WORLDS: all links: - tsdb + + prune: + image: ghcr.io/genudine/saerro/tasks:latest + command: prune + pull_policy: always + restart: "no" + environment: + DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:5432/data + links: + - tsdb diff --git a/docker-compose.yaml b/docker-compose.yaml index 61cd819..6d53195 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,15 +1,8 @@ version: "3" services: - redis: - image: redis - command: redis-server --save "" --appendonly no - container_name: redis - ports: - - "6379:6379" - tsdb: - image: timescale/timescaledb:latest-pg14 + image: timescale/timescaledb-ha:pg14-latest environment: POSTGRES_PASSWORD: saerro321 POSTGRES_USER: saerrouser diff --git a/services/api/Cargo.toml b/services/api/Cargo.toml index adf67cc..10ce14e 100644 --- a/services/api/Cargo.toml +++ b/services/api/Cargo.toml @@ -6,13 +6,12 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -redis = { version = "0.22.1", features = ["aio", "r2d2", "tokio-comp"] } serde_json = "1.0.89" serde = "1.0.149" async-graphql = { version = "5.0.2" } async-graphql-axum = "5.0.2" axum = "0.6.1" -sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] } +sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls", "postgres" ] } tokio = { version = "1.23.0", features = [ "full" ] } tower-http = { version = "0.3.5", features = ["cors"] } lazy_static = "1.4.0" \ No newline at end of file diff --git a/services/api/src/classes.rs b/services/api/src/classes.rs index eded390..d63a4e5 100644 --- a/services/api/src/classes.rs +++ b/services/api/src/classes.rs @@ -1,6 +1,4 @@ -use crate::util::zcount; use async_graphql::{Context, Object}; -use redis::aio::MultiplexedConnection; pub struct Classes { world_id: String, @@ -11,8 +9,7 @@ impl Classes { Self { world_id } } async fn by_class<'ctx>(&self, ctx: &Context<'ctx>, class_name: &str) -> u32 { - let con = ctx.data::().unwrap().to_owned(); - zcount(con, format!("c:{}/{}", self.world_id, class_name)).await + 0 } } diff --git a/services/api/src/health.rs b/services/api/src/health.rs index f3ea75e..b598063 100644 --- a/services/api/src/health.rs +++ b/services/api/src/health.rs @@ -1,39 +1,45 @@ use async_graphql::{Context, Enum, Object}; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; -use redis::{aio::MultiplexedConnection, pipe}; +use sqlx::{query, Pool, Postgres, Row}; -pub async fn get_health( - Extension(mut redis): Extension, -) -> impl IntoResponse { - let (ping, pc, ps4us, ps4eu): (String, bool, bool, bool) = pipe() - .cmd("PING") - .get("heartbeat:pc") - .get("heartbeat:ps4us") - .get("heartbeat:ps4eu") - .query_async(&mut redis) - .await - .unwrap_or_default(); +pub async fn get_health(Extension(pool): Extension>) -> impl IntoResponse { + let events_resp = + query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'") + .fetch_one(&pool) + .await; - if ping != "PONG" { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(json!({ - "status": "error", - "message": "Redis is not responding", - })), - ); + match events_resp { + Ok(row) => { + let events_row: i64 = row.get(0); + + if events_row == 0 { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ + "websocket": "down", + "database": "up" + })), + ); + } else { + return ( + StatusCode::OK, + Json(json!({ + "websocket": "up", + "database": "up" + })), + ); + } + } + Err(_) => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(json!({ + "websocket": "down", + "database": "down" + })), + ); + } } - - ( - StatusCode::OK, - Json(json!({ - "status": if ping == "PONG" && pc && ps4us && ps4eu { "ok" } else { "degraded" }, - "redis": ping == "PONG", - "pc": if pc { "primary" } else { "backup/down" }, - "ps4us": if ps4us { "primary" } else { "backup/down" }, - "ps4eu": if ps4eu { "primary" } else { "backup/down" }, - })), - ) } #[derive(Enum, Copy, Clone, Eq, PartialEq)] @@ -45,70 +51,43 @@ enum UpDown { Down, } -#[derive(Enum, Copy, Clone, Eq, PartialEq)] -enum WebsocketState { - /// The Nanite Systems manifold is sending events, and the primary listener is processing data. - Primary, - - /// The Daybreak Games manifold is sending events, and the backup listener is processing data; the primary listener is down. - Backup, - - /// The entire event streaming system is down. - Down, -} - pub struct Health {} -impl Health { - async fn get_health<'ctx>(&self, ctx: &Context<'ctx>, pair: &str) -> WebsocketState { - let mut con = ctx.data::().unwrap().to_owned(); - let (primary, backup): (Option, Option) = pipe() - .get(format!("heartbeat:{}:primary", pair)) - .get(format!("heartbeat:{}:backup", pair)) - .query_async(&mut con) - .await - .unwrap(); - - match (primary, backup) { - (Some(_), _) => WebsocketState::Primary, - (None, Some(_)) => WebsocketState::Backup, - _ => WebsocketState::Down, - } - } -} - /// Reports on the health of Saerro Listening Post #[Object] impl Health { - /// Did a ping to Redis (our main datastore) succeed? - async fn redis<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown { - let mut con = ctx.data::().unwrap().to_owned(); - let ping: String = redis::cmd("PING") - .query_async(&mut con) - .await - .unwrap_or_default(); - if ping == "PONG" { - UpDown::Up - } else { - UpDown::Down + /// Did a ping to Postgres (our main datastore) succeed? + async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown { + let pool = ctx.data::>().unwrap(); + + let events_resp = + query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'") + .fetch_one(pool) + .await; + + match events_resp { + Ok(_) => UpDown::Up, + Err(_) => UpDown::Down, } } - /// What is the state of the websocket listener cluster for PC? - #[graphql(name = "pc")] - async fn pc<'ctx>(&self, ctx: &Context<'ctx>) -> WebsocketState { - self.get_health(ctx, "pc").await - } + /// Is the websocket connection to the Nanite Systems manifold up? + async fn websocket<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown { + let pool = ctx.data::>().unwrap(); - /// What is the state of the websocket listener cluster for PS4 US? - #[graphql(name = "ps4us")] - async fn ps4us<'ctx>(&self, ctx: &Context<'ctx>) -> WebsocketState { - self.get_health(ctx, "ps4us").await - } - - /// What is the state of the websocket listener cluster for PS4 EU? - #[graphql(name = "ps4eu")] - async fn ps4eu<'ctx>(&self, ctx: &Context<'ctx>) -> WebsocketState { - self.get_health(ctx, "ps4eu").await + match query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'") + .fetch_one(pool) + .await + { + Ok(i) => { + let num: i64 = i.get(0); + if num == 0 { + UpDown::Down + } else { + UpDown::Up + } + } + Err(_) => UpDown::Down, + } } } diff --git a/services/api/src/main.rs b/services/api/src/main.rs index 7662a81..89cf067 100644 --- a/services/api/src/main.rs +++ b/services/api/src/main.rs @@ -1,7 +1,6 @@ mod classes; mod health; mod query; -mod util; mod vehicles; mod world; @@ -58,20 +57,12 @@ async fn graphiql() -> impl IntoResponse { #[tokio::main] async fn main() { - let redis_url = format!( - "redis://{}:{}", - std::env::var("REDIS_HOST").unwrap_or("localhost".to_string()), - std::env::var("REDIS_PORT").unwrap_or("6379".to_string()), - ); - - let redis = redis::Client::open(redis_url) - .unwrap() - .get_multiplexed_tokio_connection() - .await - .unwrap(); + let db_url = std::env::var("DATABASE_URL") + .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string()); + let db = sqlx::PgPool::connect(&db_url).await.unwrap(); let schema = Schema::build(query::Query, EmptyMutation, EmptySubscription) - .data(redis.clone()) + .data(db.clone()) .finish(); let app = Router::new() @@ -83,7 +74,7 @@ async fn main() { ) .route("/graphql/playground", get(graphiql)) .fallback(handle_404) - .layer(Extension(redis)) + .layer(Extension(db)) .layer(Extension(schema)) .layer( CorsLayer::new() diff --git a/services/api/src/util.rs b/services/api/src/util.rs deleted file mode 100644 index 6de5114..0000000 --- a/services/api/src/util.rs +++ /dev/null @@ -1,17 +0,0 @@ -use redis::{aio::MultiplexedConnection, AsyncCommands, FromRedisValue}; -use std::{ - ops::Sub, - time::{Duration, SystemTime}, -}; - -pub async fn zcount(mut con: MultiplexedConnection, key: String) -> RV { - let filter_timestamp = SystemTime::now() - .sub(Duration::from_secs(60 * 15)) - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - - con.zcount::(key, filter_timestamp, "+inf") - .await - .unwrap() -} diff --git a/services/api/src/vehicles.rs b/services/api/src/vehicles.rs index c2097f7..03294b1 100644 --- a/services/api/src/vehicles.rs +++ b/services/api/src/vehicles.rs @@ -1,6 +1,4 @@ -use crate::util::zcount; use async_graphql::{Context, Object}; -use redis::aio::MultiplexedConnection; pub struct Vehicles { world_id: String, @@ -11,8 +9,7 @@ impl Vehicles { Self { world_id } } async fn by_vehicle<'ctx>(&self, ctx: &Context<'ctx>, vehicle_name: &str) -> u32 { - let con = ctx.data::().unwrap().to_owned(); - zcount(con, format!("v:{}/{}", self.world_id, vehicle_name)).await + 0 } } diff --git a/services/api/src/world.rs b/services/api/src/world.rs index 76d749b..b1012bb 100644 --- a/services/api/src/world.rs +++ b/services/api/src/world.rs @@ -1,7 +1,6 @@ -use crate::{classes::Classes, util::zcount, vehicles::Vehicles}; +use crate::{classes::Classes, vehicles::Vehicles}; use async_graphql::{Context, Object}; use lazy_static::lazy_static; -use redis::aio::MultiplexedConnection; use std::collections::HashMap; lazy_static! { @@ -67,8 +66,7 @@ impl World { } async fn population<'ctx>(&self, ctx: &Context<'ctx>) -> u32 { - let con = ctx.data::().unwrap().to_owned(); - zcount(con, format!("wp:{}", self.id)).await + 0 } async fn faction_population(&self) -> FactionPopulation { @@ -92,8 +90,7 @@ struct FactionPopulation { impl FactionPopulation { async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: u8) -> u32 { - let con = ctx.data::().unwrap().to_owned(); - zcount(con, format!("wp:{}/{}", self.world_id, faction)).await + 0 } } diff --git a/services/tasks/src/main.rs b/services/tasks/src/main.rs index 42dd8ad..ba61378 100644 --- a/services/tasks/src/main.rs +++ b/services/tasks/src/main.rs @@ -2,20 +2,11 @@ use async_once::AsyncOnce; use dotenvy::dotenv; use lazy_static::lazy_static; use migrations::cmd_migrate; -use once_cell::sync::Lazy; -use redis::Commands; use sqlx::query; use std::env::args; -use std::ops::Sub; -use std::time::{Duration, SystemTime}; mod migrations; -pub static REDIS_CLIENT: Lazy = Lazy::new(|| { - redis::Client::open(std::env::var("REDIS_ADDR").unwrap_or("redis://localhost:6379".to_string())) - .unwrap() -}); - lazy_static! { pub static ref PG: AsyncOnce = AsyncOnce::new(async { let db_url = std::env::var("DATABASE_URL") @@ -24,36 +15,37 @@ lazy_static! { }); } -fn cmd_prune() { +async fn cmd_prune() { println!("Pruning old data..."); - let mut con = REDIS_CLIENT.get_connection().unwrap(); + let pool = PG.get().await; - let prune_after = SystemTime::now() - .sub(Duration::from_secs(60 * 15)) - .duration_since(SystemTime::UNIX_EPOCH) + let rows = query("DELETE FROM players WHERE time < NOW() - INTERVAL '15 minutes';") + .execute(pool) + .await .unwrap() - .as_secs(); + .rows_affected(); + println!("Deleted {} rows of old player data", rows); - let keys: Vec = con.keys("wp:*").unwrap(); - for key in keys { - println!("-> Pruning world pop {}", key); - let removed_items: u64 = con.zrembyscore(key, 0, prune_after).unwrap(); - println!("==> Removed {} items", removed_items); - } + let rows = query("DELETE FROM classes WHERE time < NOW() - INTERVAL '15 minutes';") + .execute(pool) + .await + .unwrap() + .rows_affected(); + println!("Deleted {} rows of old class data", rows); - let keys: Vec = con.keys("v:*").unwrap(); - for key in keys { - println!("-> Pruning vehicle {}", key); - let removed_items: u64 = con.zrembyscore(key, 0, prune_after).unwrap(); - println!("==> Removed {} items", removed_items); - } + let rows = query("DELETE FROM vehicles WHERE time < NOW() - INTERVAL '15 minutes';") + .execute(pool) + .await + .unwrap() + .rows_affected(); + println!("Deleted {} rows of old vehicle data", rows); - let keys: Vec = con.keys("c:*").unwrap(); - for key in keys { - println!("-> Pruning class {}", key); - let removed_items: u64 = con.zrembyscore(key, 0, prune_after).unwrap(); - println!("==> Removed {} items", removed_items); - } + let rows = query("DELETE FROM analytics WHERE time < NOW() - INTERVAL '1 day';") + .execute(pool) + .await + .unwrap() + .rows_affected(); + println!("Deleted {} rows of old analytics data", rows); } fn cmd_help() { @@ -72,7 +64,7 @@ async fn main() { match command.as_str() { "help" => cmd_help(), - "prune" => cmd_prune(), + "prune" => cmd_prune().await, "migrate" => cmd_migrate().await, _ => { println!("Unknown command: {}", command); diff --git a/services/tasks/src/migrations.rs b/services/tasks/src/migrations.rs index de6a9a2..658a9aa 100644 --- a/services/tasks/src/migrations.rs +++ b/services/tasks/src/migrations.rs @@ -4,7 +4,12 @@ use sqlx::query; pub async fn cmd_migrate() { println!("Migrating database..."); - tokio::join!(migrate_players(), migrate_classes(), migrate_vehicles(),); + tokio::join!( + migrate_players(), + migrate_classes(), + migrate_vehicles(), + migrate_analytics() + ); } async fn migrate_players() { @@ -21,8 +26,8 @@ async fn migrate_players() { println!("PLAYERS => CREATE TABLE players"); query( "CREATE TABLE players ( - time TIMESTAMPTZ NOT NULL, character_id TEXT NOT NULL, + time TIMESTAMPTZ NOT NULL, world_id INT NOT NULL, faction_id INT NOT NULL, zone_id INT NOT NULL);", @@ -63,8 +68,8 @@ async fn migrate_classes() { println!("CLASSES => CREATE TABLE classes"); query( "CREATE TABLE classes ( - time TIMESTAMPTZ NOT NULL, character_id TEXT NOT NULL, + time TIMESTAMPTZ NOT NULL, world_id INT NOT NULL, faction_id INT NOT NULL, zone_id INT NOT NULL, @@ -106,8 +111,8 @@ async fn migrate_vehicles() { println!("VEHICLES => CREATE TABLE vehicles"); query( "CREATE TABLE vehicles ( - time TIMESTAMPTZ NOT NULL, character_id TEXT NOT NULL, + time TIMESTAMPTZ NOT NULL, world_id INT NOT NULL, faction_id INT NOT NULL, zone_id INT NOT NULL, @@ -135,3 +140,36 @@ async fn migrate_vehicles() { println!("VEHICLES => done!"); } + +async fn migrate_analytics() { + let pool = PG.get().await; + + println!("-> Migrating analytics"); + println!("ANALYTICS => CREATE TABLE IF NOT EXISTS analytics"); + query( + "CREATE TABLE IF NOT EXISTS analytics ( + time TIMESTAMPTZ NOT NULL, + event_name TEXT NOT NULL, + world_id INT NOT NULL);", + ) + .execute(pool) + .await + .unwrap(); + + println!("ANALYTICS => create_hypertable"); + query( + "SELECT create_hypertable('analytics', 'time', + chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE);", + ) + .execute(pool) + .await + .unwrap(); + + println!("ANALYTICS => add_retention_policy"); + query("SELECT add_retention_policy('analytics', INTERVAL '1 day', if_not_exists => TRUE);") + .execute(pool) + .await + .unwrap(); + + println!("ANALYTICS => done!"); +} diff --git a/services/websocket/src/main.rs b/services/websocket/src/main.rs index 3da9d18..c292c13 100644 --- a/services/websocket/src/main.rs +++ b/services/websocket/src/main.rs @@ -7,15 +7,15 @@ use serde::Deserialize; use serde_aux::prelude::*; use serde_json::json; use sqlx::{postgres::PgPoolOptions, query}; -use std::{env, net::SocketAddr, time::Duration}; +use std::{env, net::SocketAddr}; use tokio::task::JoinSet; use tokio_tungstenite::{connect_async, tungstenite::Message}; mod translators; lazy_static! { - static ref PAIR: String = env::var("PAIR").unwrap_or_default(); - static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string()); + // static ref PAIR: String = env::var("PAIR").unwrap_or_default(); + // static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string()); static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default(); static ref PG: AsyncOnce = AsyncOnce::new(async { let db_url = std::env::var("DATABASE_URL") @@ -72,6 +72,11 @@ struct ClassEvent { team_id: i32, } +struct AnalyticsEvent { + world_id: i32, + event_name: String, +} + // async fn track_pop(pop_event: PopEvent) { // track_pop_db(pop_event.clone()).await; // track_pop_redis(pop_event).await; @@ -165,9 +170,32 @@ async fn track_class(class_event: ClassEvent) { .unwrap(); } +async fn track_analytics(analytics_event: AnalyticsEvent) { + // println!("[ws/track_analytics]"); + let pool = PG.get().await; + + let AnalyticsEvent { + world_id, + event_name, + } = analytics_event; + + query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);") + .bind(world_id) + .bind(event_name) + .execute(pool) + .await + .unwrap(); +} + async fn process_event(event: &Event) { let mut set = JoinSet::new(); // println!("[ws/process_event] EVENT: {:?}", event); + + set.spawn(track_analytics(AnalyticsEvent { + world_id: event.world_id.clone(), + event_name: event.event_name.clone(), + })); + if event.character_id != "0" { // General population tracking set.spawn(track_pop(PopEvent {