This commit is contained in:
41666 2022-12-07 00:01:38 -05:00
parent 1f2e3e6eab
commit 5c3a9a1888
11 changed files with 950 additions and 253 deletions

View file

@ -8,10 +8,14 @@ edition = "2021"
[dependencies]
redis = { version = "0.22.1", default_features = false, features = ["r2d2"] }
lazy_static = "1.4.0"
tokio-tungstenite = { version = "0.17.2", features=["native-tls"] }
tokio-tungstenite = { version = "0.18.0", features=["native-tls"] }
serde = { version = "1.0.149", features = ["derive"] }
serde_json = "1.0.89"
serde = { version = "1.0.148", features = ["derive"] }
tokio = { version = "1.22.0" }
tokio = { version = "1.23.0", features = ["full"] }
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
url = "2.3.1"
futures-util = "0.3.25"
futures = "0.3.25"
async_once = "0.2.6"
serde-aux = "4.1.2"
axum = "0.6.1"

View file

@ -1,24 +1,27 @@
use async_once::AsyncOnce;
use axum::{routing::get, Router};
use futures::{pin_mut, FutureExt};
use futures_util::StreamExt;
use lazy_static::lazy_static;
use redis::Commands;
use serde::Deserialize;
use serde_aux::prelude::*;
use serde_json::json;
use std::{env, time::SystemTime};
use sqlx::{postgres::PgPoolOptions, query};
use std::{env, net::SocketAddr, time::Duration};
use tokio::task::JoinSet;
use tokio_tungstenite::{connect_async, tungstenite::Message};
mod translators;
lazy_static! {
static ref REDIS_CLIENT: redis::Client = redis::Client::open(format!(
"redis://{}:{}",
std::env::var("REDIS_HOST").unwrap_or("localhost".to_string()),
std::env::var("REDIS_PORT").unwrap_or("6379".to_string()),
))
.unwrap();
static ref PAIR: String = env::var("PAIR").unwrap_or_default();
static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string());
static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
PgPoolOptions::new().connect(&db_url).await.unwrap()
});
}
async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
@ -42,54 +45,69 @@ async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
tx.unbounded_send(Message::text(setup_msg.to_string()))
.unwrap();
println!("Sent setup message");
println!("[ws] Sent setup message");
}
#[derive(Clone)]
struct PopEvent {
world_id: String,
team_id: String,
world_id: i32,
team_id: i32,
character_id: String,
timestamp: u64,
zone_id: i32,
}
struct VehicleEvent {
world_id: String,
world_id: i32,
vehicle_id: String,
character_id: String,
timestamp: u64,
zone_id: i32,
team_id: i32,
}
struct ClassEvent {
world_id: String,
world_id: i32,
character_id: String,
loadout_id: String,
timestamp: u64,
zone_id: i32,
team_id: i32,
}
// async fn track_pop(pop_event: PopEvent) {
// track_pop_db(pop_event.clone()).await;
// track_pop_redis(pop_event).await;
// }
async fn track_pop(pop_event: PopEvent) {
let mut con = REDIS_CLIENT.get_connection().unwrap();
// println!("[ws/track_pop]");
let pool = PG.get().await;
let PopEvent {
world_id,
team_id,
character_id,
timestamp,
zone_id,
} = pop_event;
let key = format!("wp:{}/{}", world_id, team_id);
let _: () = con.zadd(key, character_id.clone(), timestamp).unwrap();
let key = format!("wp:{}", world_id);
let _: () = con.zadd(key, character_id, timestamp).unwrap();
query("INSERT INTO players (time, character_id, world_id, faction_id, zone_id) VALUES (now(), $1, $2, $3, $4);")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.execute(pool)
.await
.unwrap();
}
async fn track_vehicle(vehicle_event: VehicleEvent) {
let mut con = REDIS_CLIENT.get_connection().unwrap();
// println!("[ws/track_vehicle]");
let pool = PG.get().await;
let VehicleEvent {
world_id,
vehicle_id,
timestamp,
zone_id,
character_id,
team_id,
} = vehicle_event;
let vehicle_name = translators::vehicle_to_name(vehicle_id.as_str());
@ -98,18 +116,27 @@ async fn track_vehicle(vehicle_event: VehicleEvent) {
return;
}
let key = format!("v:{}/{}", world_id, vehicle_name);
let _: () = con.zadd(key, character_id, timestamp).unwrap();
query("INSERT INTO vehicles (time, character_id, world_id, faction_id, zone_id, vehicle_id) VALUES (now(), $1, $2, $3, $4, $5);")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.bind(vehicle_name)
.execute(pool)
.await
.unwrap();
}
async fn track_class(class_event: ClassEvent) {
let mut con = REDIS_CLIENT.get_connection().unwrap();
// println!("[ws/track_class]");
let pool = PG.get().await;
let ClassEvent {
world_id,
character_id,
loadout_id,
timestamp,
zone_id,
team_id,
} = class_event;
let class_name = translators::loadout_to_class(loadout_id.as_str());
@ -118,108 +145,107 @@ async fn track_class(class_event: ClassEvent) {
return;
}
let key = format!("c:{}/{}", world_id, class_name);
let _: () = con.zadd(key, character_id, timestamp).unwrap();
query(
"INSERT INTO classes (
time,
character_id,
world_id,
faction_id,
zone_id,
class_id
) VALUES (now(), $1, $2, $3, $4, $5);",
)
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.bind(class_name)
.execute(pool)
.await
.unwrap();
}
fn should_process_event() -> bool {
let mut con = REDIS_CLIENT.get_connection().unwrap();
let role: String = ROLE.parse().unwrap();
let heartbeat_key = format!("heartbeat:{}:{}", PAIR.to_string(), role);
let _: () = con.set_ex(heartbeat_key, "1", 60).unwrap();
if role == "primary" {
return false;
async fn process_event(event: &Event) {
let mut set = JoinSet::new();
// println!("[ws/process_event] EVENT: {:?}", event);
if event.character_id != "0" {
// General population tracking
set.spawn(track_pop(PopEvent {
world_id: event.world_id.clone(),
team_id: event.team_id.clone(),
character_id: event.character_id.clone(),
zone_id: event.zone_id.clone(),
}));
}
let primary_heartbeat_key = format!("heartbeat:{}:primary", PAIR.to_string());
match con.get(primary_heartbeat_key) {
Ok(1) => true,
_ => false,
}
}
fn process_event(event: &Event) {
if should_process_event() {
return;
}
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
// General population tracking
track_pop(PopEvent {
world_id: event.world_id.clone(),
team_id: event.team_id.clone(),
character_id: event.character_id.clone(),
timestamp,
})
.now_or_never();
if event.event_name == "VehicleDestroy" {
track_vehicle(VehicleEvent {
set.spawn(track_vehicle(VehicleEvent {
world_id: event.world_id.clone(),
vehicle_id: event.vehicle_id.clone(),
character_id: event.character_id.clone(),
timestamp,
})
.now_or_never();
zone_id: event.zone_id.clone(),
team_id: event.team_id.clone(),
}));
}
if event.event_name == "Death" {
track_class(ClassEvent {
set.spawn(track_class(ClassEvent {
world_id: event.world_id.clone(),
character_id: event.character_id.clone(),
loadout_id: event.loadout_id.clone(),
timestamp,
})
.now_or_never();
zone_id: event.zone_id.clone(),
team_id: event.team_id.clone(),
}));
}
if event.attacker_character_id != ""
&& (event.attacker_team_id != "" || event.attacker_team_id != "0")
&& event.attacker_character_id != "0"
&& (event.attacker_team_id != 0 || event.attacker_team_id != 0)
{
track_pop(PopEvent {
set.spawn(track_pop(PopEvent {
world_id: event.world_id.clone(),
team_id: event.attacker_team_id.clone(),
character_id: event.attacker_character_id.clone(),
timestamp,
})
.now_or_never();
zone_id: event.zone_id.clone(),
}));
if event.event_name == "VehicleDestroy" {
track_vehicle(VehicleEvent {
set.spawn(track_vehicle(VehicleEvent {
world_id: event.world_id.clone(),
vehicle_id: event.attacker_vehicle_id.clone(),
character_id: event.attacker_character_id.clone(),
timestamp,
})
.now_or_never();
zone_id: event.zone_id.clone(),
team_id: event.attacker_team_id.clone(),
}));
}
if event.event_name == "Death" {
track_class(ClassEvent {
set.spawn(track_class(ClassEvent {
world_id: event.world_id.clone(),
character_id: event.attacker_character_id.clone(),
loadout_id: event.attacker_loadout_id.clone(),
timestamp,
})
.now_or_never();
zone_id: event.zone_id.clone(),
team_id: event.attacker_team_id.clone(),
}));
}
}
while let Some(_) = set.join_next().await {}
}
#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Default)]
struct Event {
event_name: String,
world_id: String,
#[serde(deserialize_with = "deserialize_number_from_string")]
world_id: i32,
character_id: String,
attacker_character_id: String,
attacker_team_id: String,
team_id: String,
#[serde(deserialize_with = "deserialize_number_from_string")]
attacker_team_id: i32,
#[serde(deserialize_with = "deserialize_number_from_string")]
team_id: i32,
#[serde(deserialize_with = "deserialize_number_from_string")]
zone_id: i32,
// Class Tracking
#[serde(default)]
@ -239,23 +265,22 @@ struct Payload {
payload: Event,
}
// /// Send a longer heartbeat in case this is PS4EU and gets like one event per hour
// async fn heartbeat() {
// let mut interval = tokio::time::interval(Duration::from_secs(150));
// loop {
// interval.tick().await;
// let mut con = REDIS_CLIENT.get_connection().unwrap();
// let role: String = ROLE.parse().unwrap();
// let heartbeat_key = format!("heartbeat:{}:{}", PAIR.to_string(), role);
// let response: Option<String> = con.get(heartbeat_key.clone()).unwrap();
// match response {
// None => {
// let _: () = con.set_ex(heartbeat_key, "1", 300).unwrap();
// }
// _ => (),
// }
// }
// }
async fn healthz() {
let app = Router::new().route("/healthz", get(|| async { "ok" }));
let port: u16 = std::env::var("PORT")
.unwrap_or("8999".to_string())
.parse()
.unwrap();
let addr = SocketAddr::from(([0, 0, 0, 0], port));
println!("[healthz] Listening on http://{}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
#[tokio::main]
async fn main() {
@ -273,42 +298,27 @@ async fn main() {
let fused_writer = rx.map(Ok).forward(write).fuse();
let fused_reader = read
.for_each(|msg| async move {
// println!("Processing event: {:?}", msg);
let body = &msg.unwrap().to_string();
let data: Payload = serde_json::from_str(body).unwrap_or(Payload {
payload: Event {
event_name: "".to_string(),
world_id: "".to_string(),
character_id: "".to_string(),
attacker_character_id: "".to_string(),
attacker_team_id: "".to_string(),
team_id: "".to_string(),
attacker_loadout_id: "".to_string(),
loadout_id: "".to_string(),
vehicle_id: "".to_string(),
attacker_vehicle_id: "".to_string(),
},
payload: Event::default(),
});
if data.payload.event_name == "" {
return;
}
process_event(&data.payload);
process_event(&data.payload).await;
})
.fuse();
pin_mut!(fused_writer, fused_reader);
let init = tokio::spawn(send_init(tx.clone()));
let mut healthz = tokio::spawn(healthz()).fuse();
futures::select! {
_ = fused_reader => {}
_ = fused_writer => {}
_ = healthz => {}
}
// tokio::spawn(heartbeat());
init.await.unwrap();
}