websocket done, api needs rebuild

41666 2022-12-07 23:42:19 -05:00
parent 5c3a9a1888
commit 50c4ac387a
13 changed files with 192 additions and 195 deletions

Cargo.lock (generated)
View file

@@ -49,7 +49,6 @@ dependencies = [
  "async-graphql-axum",
  "axum",
  "lazy_static",
- "redis",
  "serde",
  "serde_json",
  "sqlx",

View file

@@ -3,14 +3,6 @@
 version: '3.7'
 services:
-  redis:
-    image: redis:alpine
-    command: redis-server --save "" --appendonly no
-    container_name: redis
-    restart: always
-    ports:
-      - 6379:6379
   tsdb:
     image: timescale/timescaledb:latest-pg14
     environment:
@@ -29,15 +21,27 @@ services:
       - tsdb
     restart: always
     environment:
-      DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:$TSDB_PORT/data
+      DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:5432/data
   ws:
     image: ghcr.io/genudine/saerro/websocket:latest
     pull_policy: always
     restart: always
-    ports:
-      - 8999:8999
     environment:
-      DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:$TSDB_PORT/data
-      WS_ADDR: wss://push.nanite-systems.net/streaming?environment=ps2&service-id=s:saegd
+      DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:5432/data
+      WS_ADDR: wss://push.nanite-systems.net/streaming?environment=all&service-id=s:saegd
       WORLDS: all
     links:
       - tsdb
+  prune:
+    image: ghcr.io/genudine/saerro/tasks:latest
+    command: prune
+    pull_policy: always
+    restart: "no"
+    environment:
+      DATABASE_ADDR: postgres://saerrouser:saerro321@tsdb:5432/data
+    links:
+      - tsdb

View file

@@ -1,15 +1,8 @@
 version: "3"
 services:
-  redis:
-    image: redis
-    command: redis-server --save "" --appendonly no
-    container_name: redis
-    ports:
-      - "6379:6379"
   tsdb:
-    image: timescale/timescaledb:latest-pg14
+    image: timescale/timescaledb-ha:pg14-latest
     environment:
       POSTGRES_PASSWORD: saerro321
       POSTGRES_USER: saerrouser

View file

@@ -6,13 +6,12 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-redis = { version = "0.22.1", features = ["aio", "r2d2", "tokio-comp"] }
 serde_json = "1.0.89"
 serde = "1.0.149"
 async-graphql = { version = "5.0.2" }
 async-graphql-axum = "5.0.2"
 axum = "0.6.1"
-sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
+sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls", "postgres" ] }
 tokio = { version = "1.23.0", features = [ "full" ] }
 tower-http = { version = "0.3.5", features = ["cors"] }
 lazy_static = "1.4.0"

View file

@@ -1,6 +1,4 @@
-use crate::util::zcount;
 use async_graphql::{Context, Object};
-use redis::aio::MultiplexedConnection;
 
 pub struct Classes {
     world_id: String,
@@ -11,8 +9,7 @@ impl Classes {
         Self { world_id }
     }
 
     async fn by_class<'ctx>(&self, ctx: &Context<'ctx>, class_name: &str) -> u32 {
-        let con = ctx.data::<MultiplexedConnection>().unwrap().to_owned();
-        zcount(con, format!("c:{}/{}", self.world_id, class_name)).await
+        0
     }
 }

View file

@@ -1,39 +1,45 @@
 use async_graphql::{Context, Enum, Object};
 use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
-use redis::{aio::MultiplexedConnection, pipe};
+use sqlx::{query, Pool, Postgres, Row};
 
-pub async fn get_health(
-    Extension(mut redis): Extension<redis::aio::MultiplexedConnection>,
-) -> impl IntoResponse {
-    let (ping, pc, ps4us, ps4eu): (String, bool, bool, bool) = pipe()
-        .cmd("PING")
-        .get("heartbeat:pc")
-        .get("heartbeat:ps4us")
-        .get("heartbeat:ps4eu")
-        .query_async(&mut redis)
-        .await
-        .unwrap_or_default();
-
-    if ping != "PONG" {
-        return (
-            StatusCode::SERVICE_UNAVAILABLE,
-            Json(json!({
-                "status": "error",
-                "message": "Redis is not responding",
-            })),
-        );
-    }
-
-    (
-        StatusCode::OK,
-        Json(json!({
-            "status": if ping == "PONG" && pc && ps4us && ps4eu { "ok" } else { "degraded" },
-            "redis": ping == "PONG",
-            "pc": if pc { "primary" } else { "backup/down" },
-            "ps4us": if ps4us { "primary" } else { "backup/down" },
-            "ps4eu": if ps4eu { "primary" } else { "backup/down" },
-        })),
-    )
+pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse {
+    let events_resp =
+        query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
+            .fetch_one(&pool)
+            .await;
+
+    match events_resp {
+        Ok(row) => {
+            let events_row: i64 = row.get(0);
+
+            if events_row == 0 {
+                return (
+                    StatusCode::SERVICE_UNAVAILABLE,
+                    Json(json!({
+                        "websocket": "down",
+                        "database": "up"
+                    })),
+                );
+            } else {
+                return (
+                    StatusCode::OK,
+                    Json(json!({
+                        "websocket": "up",
+                        "database": "up"
+                    })),
+                );
+            }
+        }
+        Err(_) => {
+            return (
+                StatusCode::SERVICE_UNAVAILABLE,
+                Json(json!({
+                    "websocket": "down",
+                    "database": "down"
+                })),
+            );
+        }
+    }
 }
 
 #[derive(Enum, Copy, Clone, Eq, PartialEq)]
@@ -45,70 +51,43 @@ enum UpDown {
     Down,
 }
 
-#[derive(Enum, Copy, Clone, Eq, PartialEq)]
-enum WebsocketState {
-    /// The Nanite Systems manifold is sending events, and the primary listener is processing data.
-    Primary,
-    /// The Daybreak Games manifold is sending events, and the backup listener is processing data; the primary listener is down.
-    Backup,
-    /// The entire event streaming system is down.
-    Down,
-}
-
 pub struct Health {}
 
-impl Health {
-    async fn get_health<'ctx>(&self, ctx: &Context<'ctx>, pair: &str) -> WebsocketState {
-        let mut con = ctx.data::<MultiplexedConnection>().unwrap().to_owned();
-        let (primary, backup): (Option<String>, Option<String>) = pipe()
-            .get(format!("heartbeat:{}:primary", pair))
-            .get(format!("heartbeat:{}:backup", pair))
-            .query_async(&mut con)
-            .await
-            .unwrap();
-        match (primary, backup) {
-            (Some(_), _) => WebsocketState::Primary,
-            (None, Some(_)) => WebsocketState::Backup,
-            _ => WebsocketState::Down,
-        }
-    }
-}
-
 /// Reports on the health of Saerro Listening Post
 #[Object]
 impl Health {
-    /// Did a ping to Redis (our main datastore) succeed?
-    async fn redis<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
-        let mut con = ctx.data::<MultiplexedConnection>().unwrap().to_owned();
-        let ping: String = redis::cmd("PING")
-            .query_async(&mut con)
-            .await
-            .unwrap_or_default();
-        if ping == "PONG" {
-            UpDown::Up
-        } else {
-            UpDown::Down
-        }
+    /// Did a ping to Postgres (our main datastore) succeed?
+    async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
+        let pool = ctx.data::<Pool<Postgres>>().unwrap();
+
+        let events_resp =
+            query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
+                .fetch_one(pool)
+                .await;
+
+        match events_resp {
+            Ok(_) => UpDown::Up,
+            Err(_) => UpDown::Down,
+        }
     }
 
-    /// What is the state of the websocket listener cluster for PC?
-    #[graphql(name = "pc")]
-    async fn pc<'ctx>(&self, ctx: &Context<'ctx>) -> WebsocketState {
-        self.get_health(ctx, "pc").await
-    }
-
-    /// What is the state of the websocket listener cluster for PS4 US?
-    #[graphql(name = "ps4us")]
-    async fn ps4us<'ctx>(&self, ctx: &Context<'ctx>) -> WebsocketState {
-        self.get_health(ctx, "ps4us").await
-    }
-
-    /// What is the state of the websocket listener cluster for PS4 EU?
-    #[graphql(name = "ps4eu")]
-    async fn ps4eu<'ctx>(&self, ctx: &Context<'ctx>) -> WebsocketState {
-        self.get_health(ctx, "ps4eu").await
+    /// Is the websocket connection to the Nanite Systems manifold up?
+    async fn websocket<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
+        let pool = ctx.data::<Pool<Postgres>>().unwrap();
+
+        match query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
+            .fetch_one(pool)
+            .await
+        {
+            Ok(i) => {
+                let num: i64 = i.get(0);
+                if num == 0 {
+                    UpDown::Down
+                } else {
+                    UpDown::Up
+                }
+            }
+            Err(_) => UpDown::Down,
+        }
     }
 }
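
Note: the REST handler and both GraphQL fields above now derive health from the
same signal, a count of analytics rows from the last five minutes. A minimal
sketch of that shared check pulled into one helper; recent_event_count is a
hypothetical name, not part of this commit:

    use sqlx::{query, Pool, Postgres, Row};

    // Hypothetical helper: counts events ingested in the last five minutes.
    // Ok(0) => database up but websocket not writing; Err(_) => database down.
    async fn recent_event_count(pool: &Pool<Postgres>) -> Result<i64, sqlx::Error> {
        let row = query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
            .fetch_one(pool)
            .await?;
        Ok(row.get(0))
    }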

View file

@@ -1,7 +1,6 @@
 mod classes;
 mod health;
 mod query;
-mod util;
 mod vehicles;
 mod world;
 
@@ -58,20 +57,12 @@ async fn graphiql() -> impl IntoResponse {
 #[tokio::main]
 async fn main() {
-    let redis_url = format!(
-        "redis://{}:{}",
-        std::env::var("REDIS_HOST").unwrap_or("localhost".to_string()),
-        std::env::var("REDIS_PORT").unwrap_or("6379".to_string()),
-    );
-
-    let redis = redis::Client::open(redis_url)
-        .unwrap()
-        .get_multiplexed_tokio_connection()
-        .await
-        .unwrap();
+    let db_url = std::env::var("DATABASE_URL")
+        .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
+    let db = sqlx::PgPool::connect(&db_url).await.unwrap();
 
     let schema = Schema::build(query::Query, EmptyMutation, EmptySubscription)
-        .data(redis.clone())
+        .data(db.clone())
         .finish();
 
     let app = Router::new()
@@ -83,7 +74,7 @@ async fn main() {
         )
         .route("/graphql/playground", get(graphiql))
         .fallback(handle_404)
-        .layer(Extension(redis))
+        .layer(Extension(db))
         .layer(Extension(schema))
         .layer(
             CorsLayer::new()

View file

@@ -1,17 +0,0 @@
-use redis::{aio::MultiplexedConnection, AsyncCommands, FromRedisValue};
-use std::{
-    ops::Sub,
-    time::{Duration, SystemTime},
-};
-
-pub async fn zcount<RV: FromRedisValue>(mut con: MultiplexedConnection, key: String) -> RV {
-    let filter_timestamp = SystemTime::now()
-        .sub(Duration::from_secs(60 * 15))
-        .duration_since(SystemTime::UNIX_EPOCH)
-        .unwrap()
-        .as_secs();
-
-    con.zcount::<String, u64, &'static str, RV>(key, filter_timestamp, "+inf")
-        .await
-        .unwrap()
-}

View file

@@ -1,6 +1,4 @@
-use crate::util::zcount;
 use async_graphql::{Context, Object};
-use redis::aio::MultiplexedConnection;
 
 pub struct Vehicles {
     world_id: String,
@@ -11,8 +9,7 @@ impl Vehicles {
         Self { world_id }
     }
 
     async fn by_vehicle<'ctx>(&self, ctx: &Context<'ctx>, vehicle_name: &str) -> u32 {
-        let con = ctx.data::<MultiplexedConnection>().unwrap().to_owned();
-        zcount(con, format!("v:{}/{}", self.world_id, vehicle_name)).await
+        0
     }
 }

View file

@@ -1,7 +1,6 @@
-use crate::{classes::Classes, util::zcount, vehicles::Vehicles};
+use crate::{classes::Classes, vehicles::Vehicles};
 use async_graphql::{Context, Object};
 use lazy_static::lazy_static;
-use redis::aio::MultiplexedConnection;
 use std::collections::HashMap;
 
 lazy_static! {
@@ -67,8 +66,7 @@ impl World {
     }
 
     async fn population<'ctx>(&self, ctx: &Context<'ctx>) -> u32 {
-        let con = ctx.data::<MultiplexedConnection>().unwrap().to_owned();
-        zcount(con, format!("wp:{}", self.id)).await
+        0
     }
 
     async fn faction_population(&self) -> FactionPopulation {
@@ -92,8 +90,7 @@ struct FactionPopulation {
 impl FactionPopulation {
     async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: u8) -> u32 {
-        let con = ctx.data::<MultiplexedConnection>().unwrap().to_owned();
-        zcount(con, format!("wp:{}/{}", self.world_id, faction)).await
+        0
     }
 }
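
The population resolvers above are stubbed to 0 pending the API rebuild named
in the commit message. A hedged sketch of what a rebuilt resolver might run
against the players table created by the migrations below; population_from_pg
and the 15-minute window are illustrative assumptions, not this commit's code:

    use async_graphql::Context;
    use sqlx::{query, Pool, Postgres, Row};

    // Hypothetical: count distinct characters seen on one world recently.
    async fn population_from_pg(ctx: &Context<'_>, world_id: &str) -> u32 {
        let pool = ctx.data::<Pool<Postgres>>().unwrap();
        let row = query(
            "SELECT count(DISTINCT character_id) FROM players
             WHERE world_id = $1 AND time > now() - interval '15 minutes';",
        )
        .bind(world_id.parse::<i32>().unwrap_or(0))
        .fetch_one(pool)
        .await
        .unwrap();
        let count: i64 = row.get(0);
        count as u32
    }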

View file

@@ -2,20 +2,11 @@ use async_once::AsyncOnce;
 use dotenvy::dotenv;
 use lazy_static::lazy_static;
 use migrations::cmd_migrate;
-use once_cell::sync::Lazy;
-use redis::Commands;
 use sqlx::query;
 use std::env::args;
-use std::ops::Sub;
-use std::time::{Duration, SystemTime};
 
 mod migrations;
 
-pub static REDIS_CLIENT: Lazy<redis::Client> = Lazy::new(|| {
-    redis::Client::open(std::env::var("REDIS_ADDR").unwrap_or("redis://localhost:6379".to_string()))
-        .unwrap()
-});
-
 lazy_static! {
     pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
         let db_url = std::env::var("DATABASE_URL")
@@ -24,36 +15,37 @@ lazy_static! {
     });
 }
 
-fn cmd_prune() {
+async fn cmd_prune() {
     println!("Pruning old data...");
-    let mut con = REDIS_CLIENT.get_connection().unwrap();
+    let pool = PG.get().await;
 
-    let prune_after = SystemTime::now()
-        .sub(Duration::from_secs(60 * 15))
-        .duration_since(SystemTime::UNIX_EPOCH)
-        .unwrap()
-        .as_secs();
-
-    let keys: Vec<String> = con.keys("wp:*").unwrap();
-    for key in keys {
-        println!("-> Pruning world pop {}", key);
-        let removed_items: u64 = con.zrembyscore(key, 0, prune_after).unwrap();
-        println!("==> Removed {} items", removed_items);
-    }
-
-    let keys: Vec<String> = con.keys("v:*").unwrap();
-    for key in keys {
-        println!("-> Pruning vehicle {}", key);
-        let removed_items: u64 = con.zrembyscore(key, 0, prune_after).unwrap();
-        println!("==> Removed {} items", removed_items);
-    }
-
-    let keys: Vec<String> = con.keys("c:*").unwrap();
-    for key in keys {
-        println!("-> Pruning class {}", key);
-        let removed_items: u64 = con.zrembyscore(key, 0, prune_after).unwrap();
-        println!("==> Removed {} items", removed_items);
-    }
+    let rows = query("DELETE FROM players WHERE time < NOW() - INTERVAL '15 minutes';")
+        .execute(pool)
+        .await
+        .unwrap()
+        .rows_affected();
+    println!("Deleted {} rows of old player data", rows);
+
+    let rows = query("DELETE FROM classes WHERE time < NOW() - INTERVAL '15 minutes';")
+        .execute(pool)
+        .await
+        .unwrap()
+        .rows_affected();
+    println!("Deleted {} rows of old class data", rows);
+
+    let rows = query("DELETE FROM vehicles WHERE time < NOW() - INTERVAL '15 minutes';")
+        .execute(pool)
+        .await
+        .unwrap()
+        .rows_affected();
+    println!("Deleted {} rows of old vehicle data", rows);
+
+    let rows = query("DELETE FROM analytics WHERE time < NOW() - INTERVAL '1 day';")
+        .execute(pool)
+        .await
+        .unwrap()
+        .rows_affected();
+    println!("Deleted {} rows of old analytics data", rows);
 }
 
 fn cmd_help() {
@@ -72,7 +64,7 @@ async fn main() {
     match command.as_str() {
         "help" => cmd_help(),
-        "prune" => cmd_prune(),
+        "prune" => cmd_prune().await,
         "migrate" => cmd_migrate().await,
         _ => {
             println!("Unknown command: {}", command);
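
The four DELETE blocks in cmd_prune share one shape. A hypothetical
consolidation into a loop over (table, retention) pairs; prune_tables is not
part of this commit, and the table names come from a fixed list rather than
user input, so the format! does not open an injection hole:

    use sqlx::{query, Pool, Postgres};

    // Hypothetical refactor of cmd_prune's repeated DELETE/report pattern.
    async fn prune_tables(pool: &Pool<Postgres>) {
        let targets = [
            ("players", "15 minutes"),
            ("classes", "15 minutes"),
            ("vehicles", "15 minutes"),
            ("analytics", "1 day"),
        ];
        for (table, keep) in targets {
            let rows = query(&format!(
                "DELETE FROM {} WHERE time < NOW() - INTERVAL '{}';",
                table, keep
            ))
            .execute(pool)
            .await
            .unwrap()
            .rows_affected();
            println!("Deleted {} rows of old {} data", rows, table);
        }
    }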

View file

@@ -4,7 +4,12 @@ use sqlx::query;
 
 pub async fn cmd_migrate() {
     println!("Migrating database...");
 
-    tokio::join!(migrate_players(), migrate_classes(), migrate_vehicles(),);
+    tokio::join!(
+        migrate_players(),
+        migrate_classes(),
+        migrate_vehicles(),
+        migrate_analytics()
+    );
 }
 
 async fn migrate_players() {
@@ -21,8 +26,8 @@ async fn migrate_players() {
     println!("PLAYERS => CREATE TABLE players");
     query(
         "CREATE TABLE players (
-        time TIMESTAMPTZ NOT NULL,
         character_id TEXT NOT NULL,
+        time TIMESTAMPTZ NOT NULL,
         world_id INT NOT NULL,
         faction_id INT NOT NULL,
         zone_id INT NOT NULL);",
@@ -63,8 +68,8 @@ async fn migrate_classes() {
     println!("CLASSES => CREATE TABLE classes");
     query(
         "CREATE TABLE classes (
-        time TIMESTAMPTZ NOT NULL,
         character_id TEXT NOT NULL,
+        time TIMESTAMPTZ NOT NULL,
         world_id INT NOT NULL,
         faction_id INT NOT NULL,
         zone_id INT NOT NULL,
@@ -106,8 +111,8 @@ async fn migrate_vehicles() {
     println!("VEHICLES => CREATE TABLE vehicles");
     query(
         "CREATE TABLE vehicles (
-        time TIMESTAMPTZ NOT NULL,
         character_id TEXT NOT NULL,
+        time TIMESTAMPTZ NOT NULL,
         world_id INT NOT NULL,
         faction_id INT NOT NULL,
         zone_id INT NOT NULL,
@@ -135,3 +140,36 @@ async fn migrate_vehicles() {
     println!("VEHICLES => done!");
 }
+
+async fn migrate_analytics() {
+    let pool = PG.get().await;
+    println!("-> Migrating analytics");
+
+    println!("ANALYTICS => CREATE TABLE IF NOT EXISTS analytics");
+    query(
+        "CREATE TABLE IF NOT EXISTS analytics (
+        time TIMESTAMPTZ NOT NULL,
+        event_name TEXT NOT NULL,
+        world_id INT NOT NULL);",
+    )
+    .execute(pool)
+    .await
+    .unwrap();
+
+    println!("ANALYTICS => create_hypertable");
+    query(
+        "SELECT create_hypertable('analytics', 'time',
+        chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE);",
+    )
+    .execute(pool)
+    .await
+    .unwrap();
+
+    println!("ANALYTICS => add_retention_policy");
+    query("SELECT add_retention_policy('analytics', INTERVAL '1 day', if_not_exists => TRUE);")
+        .execute(pool)
+        .await
+        .unwrap();
+
+    println!("ANALYTICS => done!");
+}
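
With add_retention_policy in place, TimescaleDB also drops analytics chunks
older than one day in the background, alongside the prune task's manual
DELETE; the two are redundant but safe together. A small sketch to confirm the
policy registered, assuming the TimescaleDB 2.x timescaledb_information.jobs
view and the tasks crate's PG pool; check_retention_policy is hypothetical:

    use sqlx::{query, Row};

    // Hypothetical post-migration check: list hypertables with a retention job.
    async fn check_retention_policy() {
        let pool = PG.get().await;
        let rows = query(
            "SELECT hypertable_name FROM timescaledb_information.jobs
             WHERE proc_name = 'policy_retention';",
        )
        .fetch_all(pool)
        .await
        .unwrap();
        for row in rows {
            let table: String = row.get(0);
            println!("retention policy active on {}", table);
        }
    }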

View file

@@ -7,15 +7,15 @@ use serde::Deserialize;
 use serde_aux::prelude::*;
 use serde_json::json;
 use sqlx::{postgres::PgPoolOptions, query};
-use std::{env, net::SocketAddr, time::Duration};
+use std::{env, net::SocketAddr};
 use tokio::task::JoinSet;
 use tokio_tungstenite::{connect_async, tungstenite::Message};
 
 mod translators;
 
 lazy_static! {
-    static ref PAIR: String = env::var("PAIR").unwrap_or_default();
-    static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string());
+    // static ref PAIR: String = env::var("PAIR").unwrap_or_default();
+    // static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string());
     static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
     static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
         let db_url = std::env::var("DATABASE_URL")
@@ -72,6 +72,11 @@ struct ClassEvent {
     team_id: i32,
 }
 
+struct AnalyticsEvent {
+    world_id: i32,
+    event_name: String,
+}
+
 // async fn track_pop(pop_event: PopEvent) {
 //     track_pop_db(pop_event.clone()).await;
 //     track_pop_redis(pop_event).await;
@@ -165,9 +170,32 @@ async fn track_class(class_event: ClassEvent) {
         .unwrap();
 }
 
+async fn track_analytics(analytics_event: AnalyticsEvent) {
+    // println!("[ws/track_analytics]");
+    let pool = PG.get().await;
+
+    let AnalyticsEvent {
+        world_id,
+        event_name,
+    } = analytics_event;
+
+    query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
+        .bind(world_id)
+        .bind(event_name)
+        .execute(pool)
+        .await
+        .unwrap();
+}
+
 async fn process_event(event: &Event) {
     let mut set = JoinSet::new();
     // println!("[ws/process_event] EVENT: {:?}", event);
 
+    set.spawn(track_analytics(AnalyticsEvent {
+        world_id: event.world_id.clone(),
+        event_name: event.event_name.clone(),
+    }));
+
     if event.character_id != "0" {
         // General population tracking
         set.spawn(track_pop(PopEvent {