diff --git a/README.md b/README.md index 1fe1ef6..62c1dd5 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ cargo run --bin tasks migrate # Start NSS ingest. Use push.planetside2.com if NSS isn't quite working... env \ WS_ADDR="wss://push.nanite-systems.net/streaming?environment=all&service-id=s:$SERVICE_ID" \ - WORLDS=all + WORLDS=all \ cargo run --bin websocket # Start API diff --git a/docker-compose.yaml b/docker-compose.yaml index 6d53195..a75f49f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,7 +2,7 @@ version: "3" services: tsdb: - image: timescale/timescaledb-ha:pg14-latest + image: timescale/timescaledb:latest-pg14 environment: POSTGRES_PASSWORD: saerro321 POSTGRES_USER: saerrouser diff --git a/services/api/src/classes.rs b/services/api/src/classes.rs index 95817cd..40b59dd 100644 --- a/services/api/src/classes.rs +++ b/services/api/src/classes.rs @@ -16,7 +16,7 @@ impl Class { let pool = ctx.data::>().unwrap(); let sql = format!( - "SELECT count(distinct character_id) FROM classes WHERE time > now() - interval '15 minutes' AND class_id = $1 {};", + "SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' AND class_id = $1 {};", filters.sql(), ); diff --git a/services/api/src/vehicles.rs b/services/api/src/vehicles.rs index 9642067..bcb3442 100644 --- a/services/api/src/vehicles.rs +++ b/services/api/src/vehicles.rs @@ -16,7 +16,7 @@ impl Vehicle { let pool = ctx.data::>().unwrap(); let sql = format!( - "SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' AND vehicle_id = $1 {};", + "SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' AND vehicle_id = $1 {};", filters.sql(), ); @@ -89,7 +89,7 @@ impl Vehicles { let pool = ctx.data::>().unwrap(); let sql = format!( - "SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' {};", + "SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' AND vehicle_id != 'unknown' {};", self.filters.sql(), ); diff --git a/services/tasks/src/main.rs b/services/tasks/src/main.rs index 2387e59..0abf2de 100644 --- a/services/tasks/src/main.rs +++ b/services/tasks/src/main.rs @@ -25,20 +25,6 @@ async fn cmd_prune() { .rows_affected(); println!("Deleted {} rows of old player data", rows); - let rows = query("DELETE FROM classes WHERE time < NOW() - INTERVAL '15 minutes';") - .execute(pool) - .await - .unwrap() - .rows_affected(); - println!("Deleted {} rows of old class data", rows); - - let rows = query("DELETE FROM vehicles WHERE time < NOW() - INTERVAL '15 minutes';") - .execute(pool) - .await - .unwrap() - .rows_affected(); - println!("Deleted {} rows of old vehicle data", rows); - let rows = query("DELETE FROM analytics WHERE time < NOW() - INTERVAL '1 day';") .execute(pool) .await diff --git a/services/tasks/src/migrations.rs b/services/tasks/src/migrations.rs index ffa08e1..da53f7c 100644 --- a/services/tasks/src/migrations.rs +++ b/services/tasks/src/migrations.rs @@ -4,12 +4,7 @@ use sqlx::{query, Row}; pub async fn cmd_migrate() { println!("Migrating database..."); - tokio::join!( - migrate_players(), - migrate_classes(), - migrate_vehicles(), - migrate_analytics() - ); + tokio::join!(migrate_players(), migrate_analytics()); } async fn migrate_players() { @@ -30,7 +25,10 @@ async fn migrate_players() { time TIMESTAMPTZ NOT NULL, world_id INT NOT NULL, faction_id INT NOT NULL, - zone_id INT NOT NULL);", + zone_id INT NOT NULL, + class_id TEXT NOT NULL, + vehicle_id TEXT NOT NULL + );", ) .execute(pool) .await @@ -54,93 +52,6 @@ async fn migrate_players() { println!("PLAYERS => done!"); } -async fn migrate_classes() { - let pool = PG.get().await; - - println!("-> Migrating classes"); - - println!("CLASSES => DROP TABLE IF EXISTS classes"); - query("DROP TABLE IF EXISTS classes") - .execute(pool) - .await - .unwrap(); - - println!("CLASSES => CREATE TABLE classes"); - query( - "CREATE TABLE classes ( - character_id TEXT NOT NULL, - time TIMESTAMPTZ NOT NULL, - world_id INT NOT NULL, - faction_id INT NOT NULL, - zone_id INT NOT NULL, - class_id TEXT NOT NULL);", - ) - .execute(pool) - .await - .unwrap(); - - println!("CLASSES => create_hypertable"); - query( - "SELECT create_hypertable('classes', 'time', - chunk_time_interval => INTERVAL '5 minutes', if_not_exists => TRUE);", - ) - .execute(pool) - .await - .unwrap(); - - println!("CLASSES => add_retention_policy"); - query("SELECT add_retention_policy('classes', INTERVAL '15 minutes');") - .execute(pool) - .await - .unwrap(); - - println!("CLASSES => done!"); -} - -async fn migrate_vehicles() { - let pool = PG.get().await; - - println!("-> Migrating vehicles"); - - println!("VEHICLES => DROP TABLE IF EXISTS vehicles"); - query("DROP TABLE IF EXISTS vehicles") - .execute(pool) - .await - .unwrap(); - - println!("VEHICLES => CREATE TABLE vehicles"); - query( - "CREATE TABLE vehicles ( - character_id TEXT NOT NULL, - time TIMESTAMPTZ NOT NULL, - world_id INT NOT NULL, - faction_id INT NOT NULL, - zone_id INT NOT NULL, - vehicle_id TEXT NOT NULL);", - ) - .execute(pool) - .await - .unwrap(); - - println!("VEHICLES => create_hypertable"); - query( - "SELECT create_hypertable('vehicles', 'time', - chunk_time_interval => INTERVAL '5 minutes', if_not_exists => TRUE);", - ) - .execute(pool) - .await - .unwrap(); - - println!("VEHICLES => add_retention_policy"); - - query("SELECT add_retention_policy('vehicles', INTERVAL '15 minutes');") - .execute(pool) - .await - .unwrap(); - - println!("VEHICLES => done!"); -} - async fn migrate_analytics() { let pool = PG.get().await; @@ -177,11 +88,11 @@ async fn migrate_analytics() { pub async fn is_migrated() -> bool { let pool = PG.get().await; - let tables: i64 = query("SELECT count(1) FROM pg_tables WHERE schemaname = 'public' AND tablename IN ('players', 'vehicles', 'classes', 'analytics');") + let tables: i64 = query("SELECT count(1) FROM pg_tables WHERE schemaname = 'public' AND tablename IN ('players', 'analytics');") .fetch_one(pool) .await .unwrap() .get(0); - tables == 4 + tables == 2 } diff --git a/services/websocket/src/main.rs b/services/websocket/src/main.rs index 0db3098..541fad6 100644 --- a/services/websocket/src/main.rs +++ b/services/websocket/src/main.rs @@ -27,10 +27,9 @@ async fn send_init(tx: futures::channel::mpsc::UnboundedSender) { let worlds: Vec<&str> = worlds_raw.split(',').collect(); let experience_ids = vec![ - 2, 3, 4, 5, 6, 7, 34, 51, 53, 55, 57, 86, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, - 98, 99, 100, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 201, - 233, 293, 294, 302, 303, 353, 354, 355, 438, 439, 503, 505, 579, 581, 584, 653, 656, 674, - 675, + 2, 3, 4, 5, 6, 7, 34, 51, 53, 55, 57, 86, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, + 100, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 201, 233, 293, + 294, 302, 303, 353, 354, 355, 438, 439, 503, 505, 579, 581, 584, 653, 656, 674, 675, ]; let mut events = experience_ids .iter() @@ -63,22 +62,8 @@ struct PopEvent { team_id: i32, character_id: String, zone_id: i32, -} - -struct VehicleEvent { - world_id: i32, - vehicle_id: String, - character_id: String, - zone_id: i32, - team_id: i32, -} - -struct ClassEvent { - world_id: i32, - character_id: String, loadout_id: String, - zone_id: i32, - team_id: i32, + vehicle_id: String, } struct AnalyticsEvent { @@ -111,85 +96,29 @@ async fn track_pop(pop_event: PopEvent) { team_id, character_id, zone_id, + loadout_id, + vehicle_id, } = pop_event; - query("INSERT INTO players (time, character_id, world_id, faction_id, zone_id) VALUES (now(), $1, $2, $3, $4);") + let class_name = translators::loadout_to_class(loadout_id.as_str()); + let vehicle_name = if vehicle_id == "" { + "unknown".to_string() + } else { + translators::vehicle_to_name(vehicle_id.as_str()) + }; + + query("INSERT INTO players (time, character_id, world_id, faction_id, zone_id, class_id, vehicle_id) VALUES (now(), $1, $2, $3, $4, $5, $6);") .bind(character_id) .bind(world_id) .bind(team_id) .bind(zone_id) + .bind(class_name) + .bind(vehicle_name) .execute(pool) .await .unwrap(); } -async fn track_vehicle(vehicle_event: VehicleEvent) { - // println!("[ws/track_vehicle]"); - let pool = PG.get().await; - - let VehicleEvent { - world_id, - vehicle_id, - zone_id, - character_id, - team_id, - } = vehicle_event; - - let vehicle_name = translators::vehicle_to_name(vehicle_id.as_str()); - - if vehicle_name == "unknown" { - return; - } - - query("INSERT INTO vehicles (time, character_id, world_id, faction_id, zone_id, vehicle_id) VALUES (now(), $1, $2, $3, $4, $5);") - .bind(character_id) - .bind(world_id) - .bind(team_id) - .bind(zone_id) - .bind(vehicle_name) - .execute(pool) - .await - .unwrap(); -} - -async fn track_class(class_event: ClassEvent) { - // println!("[ws/track_class]"); - let pool = PG.get().await; - - let ClassEvent { - world_id, - character_id, - loadout_id, - zone_id, - team_id, - } = class_event; - - let class_name = translators::loadout_to_class(loadout_id.as_str()); - - if class_name == "unknown" { - return; - } - - query( - "INSERT INTO classes ( - time, - character_id, - world_id, - faction_id, - zone_id, - class_id - ) VALUES (now(), $1, $2, $3, $4, $5);", - ) - .bind(character_id) - .bind(world_id) - .bind(team_id) - .bind(zone_id) - .bind(class_name) - .execute(pool) - .await - .unwrap(); -} - async fn track_analytics(analytics_event: AnalyticsEvent) { // println!("[ws/track_analytics]"); let pool = PG.get().await; @@ -216,33 +145,14 @@ async fn process_death_event(event: &Event) { event_name: event.event_name.clone(), })); - if event.character_id != "0" { - // General population tracking + if event.character_id != "" && event.character_id != "0" { set.spawn(track_pop(PopEvent { world_id: event.world_id.clone(), team_id: event.team_id.clone(), character_id: event.character_id.clone(), zone_id: event.zone_id.clone(), - })); - } - - if event.event_name == "VehicleDestroy" { - set.spawn(track_vehicle(VehicleEvent { - world_id: event.world_id.clone(), - vehicle_id: event.vehicle_id.clone(), - character_id: event.character_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.team_id.clone(), - })); - } - - if event.event_name == "Death" { - set.spawn(track_class(ClassEvent { - world_id: event.world_id.clone(), - character_id: event.character_id.clone(), loadout_id: event.loadout_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.team_id.clone(), + vehicle_id: event.vehicle_id.clone(), })); } @@ -255,27 +165,9 @@ async fn process_death_event(event: &Event) { team_id: event.attacker_team_id.clone(), character_id: event.attacker_character_id.clone(), zone_id: event.zone_id.clone(), + loadout_id: event.attacker_loadout_id.clone(), + vehicle_id: event.attacker_vehicle_id.clone(), })); - - if event.event_name == "VehicleDestroy" { - set.spawn(track_vehicle(VehicleEvent { - world_id: event.world_id.clone(), - vehicle_id: event.attacker_vehicle_id.clone(), - character_id: event.attacker_character_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.attacker_team_id.clone(), - })); - } - - if event.event_name == "Death" { - set.spawn(track_class(ClassEvent { - world_id: event.world_id.clone(), - character_id: event.attacker_character_id.clone(), - loadout_id: event.attacker_loadout_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.attacker_team_id.clone(), - })); - } } while let Some(_) = set.join_next().await {} @@ -290,56 +182,23 @@ async fn process_exp_event(event: &Event) { event_name: event.event_name.clone(), })); + // Vehicle EXP events + let vehicle_id = match event.experience_id { + 201 => "11".to_string(), // Galaxy Spawn Bonus + 233 => "2".to_string(), // Sunderer Spawn Bonus + 674 | 675 => "160".to_string(), // ANT stuff + _ => "".to_string(), + }; + set.spawn(track_pop(PopEvent { world_id: event.world_id.clone(), team_id: event.team_id.clone(), character_id: event.character_id.clone(), zone_id: event.zone_id.clone(), - })); - - set.spawn(track_class(ClassEvent { - world_id: event.world_id.clone(), - character_id: event.character_id.clone(), loadout_id: event.loadout_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.team_id.clone(), + vehicle_id: vehicle_id.clone(), })); - // Vehicle EXP events - match event.experience_id { - 201 => { - // Galaxy Spawn Bonus - set.spawn(track_vehicle(VehicleEvent { - world_id: event.world_id.clone(), - vehicle_id: "11".to_string(), - character_id: event.character_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.team_id.clone(), - })); - } - 233 => { - // Sunderer Spawn Bonus - set.spawn(track_vehicle(VehicleEvent { - world_id: event.world_id.clone(), - vehicle_id: "2".to_string(), - character_id: event.character_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.team_id.clone(), - })); - } - 674 | 675 => { - // ANT stuff - set.spawn(track_vehicle(VehicleEvent { - world_id: event.world_id.clone(), - vehicle_id: "160".to_string(), - character_id: event.character_id.clone(), - zone_id: event.zone_id.clone(), - team_id: event.team_id.clone(), - })); - } - _ => {} - } - while let Some(_) = set.join_next().await {} } #[derive(Deserialize, Debug, Clone, Default)]