diff --git a/Cargo.lock b/Cargo.lock index 7935f7d..06efec2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2530,6 +2530,7 @@ version = "0.1.0" dependencies = [ "async_once", "axum", + "chrono", "futures", "futures-util", "lazy_static", diff --git a/services/api/src/analytics.rs b/services/api/src/analytics.rs index 0f3bddf..2131e9f 100644 --- a/services/api/src/analytics.rs +++ b/services/api/src/analytics.rs @@ -1,11 +1,50 @@ -use async_graphql::Object; +use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject}; +use chrono::{DateTime, Utc}; +use sqlx::{query, Pool, Postgres, Row}; pub struct Analytics {} +#[derive(SimpleObject, Debug, Clone)] +pub struct Event { + pub time: DateTime, + pub event_name: String, + pub world_id: i32, + pub count: i64, +} + #[Object] impl Analytics { - async fn population(&self) -> i32 { - 0 + /// Get all events in analytics, bucket_size is in seconds + async fn events<'ctx>( + &self, + ctx: &Context<'ctx>, + #[graphql(default = 60)] bucket_size: u64, + world_id: Option, + ) -> Vec { + let pool = ctx.data::>().unwrap(); + + let sql = format!("SELECT time_bucket('{} seconds', time) AS bucket, count(*), event_name, world_id FROM analytics WHERE time > now() - interval '1 day' {} GROUP BY bucket, world_id, event_name ORDER BY bucket ASC", + bucket_size, + if let Some(world_id) = world_id { + format!("AND world_id = {}", world_id) + } else { + "".to_string() + } + ); + + let mut result = query(sql.as_str()).fetch(pool); + + let mut events = Vec::new(); + while let Some(row) = result.try_next().await.unwrap() { + events.push(Event { + time: row.get("bucket"), + event_name: row.get("event_name"), + world_id: row.get("world_id"), + count: row.get("count"), + }); + } + + events } } diff --git a/services/api/src/html/ingest.html b/services/api/src/html/ingest.html index 60f37d8..f0d6939 100644 --- a/services/api/src/html/ingest.html +++ b/services/api/src/html/ingest.html @@ -14,6 +14,213 @@ color: #cead42; text-decoration: none; } + + .chart-container { + height: 50vh; + position: relative; + } + + .smaller { + height: 33vh; + }

Ingest Stats

-

sorry wip

+
+

All Events by Type

+
+ +
+
+
+

Events by World

+
+ +
+
+
+

Connery

+
+ +
+
+
+

Miller

+
+ +
+
+
+

Cobalt

+
+ +
+
+
+

Emerald

+
+ +
+
+
+

Jaeger

+
+ +
+
+
+

SolTech

+
+ +
+
+
+

Genudine

+
+ +
+
+
+

Ceres

+
+ +
+
+ + + diff --git a/services/api/src/query.rs b/services/api/src/query.rs index d025142..a53f8fb 100644 --- a/services/api/src/query.rs +++ b/services/api/src/query.rs @@ -1,6 +1,6 @@ use crate::{ - classes::ClassesQuery, health::HealthQuery, population::PopulationQuery, - vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery, + analytics::AnalyticsQuery, classes::ClassesQuery, health::HealthQuery, + population::PopulationQuery, vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery, }; use async_graphql::MergedObject; @@ -12,4 +12,5 @@ pub struct Query( WorldQuery, ZoneQuery, HealthQuery, + AnalyticsQuery, ); diff --git a/services/tasks/src/main.rs b/services/tasks/src/main.rs index ba61378..579a91d 100644 --- a/services/tasks/src/main.rs +++ b/services/tasks/src/main.rs @@ -65,6 +65,10 @@ async fn main() { match command.as_str() { "help" => cmd_help(), "prune" => cmd_prune().await, + "auto-prune" => loop { + cmd_prune().await; + tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await; + }, "migrate" => cmd_migrate().await, _ => { println!("Unknown command: {}", command); diff --git a/services/websocket/Cargo.toml b/services/websocket/Cargo.toml index 091231f..b57fcf2 100644 --- a/services/websocket/Cargo.toml +++ b/services/websocket/Cargo.toml @@ -17,4 +17,5 @@ futures-util = "0.3.25" futures = "0.3.25" async_once = "0.2.6" serde-aux = "4.1.2" -axum = "0.6.1" \ No newline at end of file +axum = "0.6.1" +chrono = "0.4.23" \ No newline at end of file diff --git a/services/websocket/src/main.rs b/services/websocket/src/main.rs index d5f2954..465b83a 100644 --- a/services/websocket/src/main.rs +++ b/services/websocket/src/main.rs @@ -1,5 +1,5 @@ use async_once::AsyncOnce; -use axum::{routing::get, Router}; +use axum::{routing::get, Json, Router}; use futures::{pin_mut, FutureExt}; use futures_util::StreamExt; use lazy_static::lazy_static; @@ -283,7 +283,14 @@ struct Payload { } async fn healthz() { - let app = Router::new().route("/healthz", get(|| async { "ok" })); + let app = Router::new().route( + "/healthz", + get(|| async { + Json(json!({ + "status": "ok", + })) + }), + ); let port: u16 = std::env::var("PORT") .unwrap_or("8999".to_string()) @@ -314,7 +321,7 @@ async fn main() { let fused_writer = rx.map(Ok).forward(write).fuse(); let fused_reader = read - .for_each(|msg| async move { + .for_each(|msg| async { let body = &msg.unwrap().to_string(); let data: Payload = match serde_json::from_str(body) {