add prometheus to api and ws
This commit is contained in:
parent
7ab5893f67
commit
3a422b8f6f
19 changed files with 1026 additions and 497 deletions
|
@ -23,3 +23,5 @@ futures = "0.3.28"
|
|||
async_once = "0.2.6"
|
||||
serde-aux = "4.2.0"
|
||||
axum = "0.6.20"
|
||||
prometheus = "0.13.3"
|
||||
prometheus-static-metric = "0.5.1"
|
||||
|
|
|
@ -12,6 +12,7 @@ use tokio::task::JoinSet;
|
|||
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
||||
|
||||
mod translators;
|
||||
mod telemetry;
|
||||
|
||||
lazy_static! {
|
||||
static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
|
||||
|
@ -75,6 +76,7 @@ struct AnalyticsEvent {
|
|||
async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> {
|
||||
let pool = PG.get().await;
|
||||
|
||||
telemetry::db_read("players", "get_team_id");
|
||||
let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;")
|
||||
.bind(character_id.clone())
|
||||
.fetch_one(pool)
|
||||
|
@ -108,6 +110,7 @@ async fn track_pop(pop_event: PopEvent) {
|
|||
translators::vehicle_to_name(vehicle_id.as_str())
|
||||
};
|
||||
|
||||
telemetry::db_write("players", "track_pop");
|
||||
query(
|
||||
"
|
||||
INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name)
|
||||
|
@ -130,6 +133,7 @@ async fn track_pop(pop_event: PopEvent) {
|
|||
.unwrap();
|
||||
|
||||
if vehicle_name != "unknown" {
|
||||
telemetry::db_write("vehicles", "track_pop");
|
||||
query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name)
|
||||
VALUES (now(), $1, $2, $3, $4, $5)
|
||||
ON CONFLICT (character_id) DO UPDATE SET
|
||||
|
@ -159,6 +163,7 @@ async fn track_analytics(analytics_event: AnalyticsEvent) {
|
|||
event_name,
|
||||
} = analytics_event;
|
||||
|
||||
telemetry::db_write("analytics", "track_analytics");
|
||||
match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
|
||||
.bind(world_id)
|
||||
.bind(event_name)
|
||||
|
@ -210,6 +215,7 @@ async fn process_death_event(event: &Event) {
|
|||
}
|
||||
|
||||
async fn process_exp_event(event: &Event) {
|
||||
telemetry::experience_event(&event.world_id, &event.experience_id);
|
||||
let mut set = JoinSet::new();
|
||||
// println!("[ws/process_event] EVENT: {:?}", event);
|
||||
|
||||
|
@ -287,6 +293,9 @@ async fn healthz() {
|
|||
"status": "ok",
|
||||
}))
|
||||
}),
|
||||
).route(
|
||||
"/metrics",
|
||||
get(telemetry::handler)
|
||||
);
|
||||
|
||||
let port: u16 = std::env::var("PORT")
|
||||
|
@ -325,16 +334,20 @@ async fn main() {
|
|||
|
||||
let mut data: Payload = match serde_json::from_str(body) {
|
||||
Ok(data) => data,
|
||||
Err(_) => {
|
||||
Err(_e) => {
|
||||
// println!("Error: {}; body: {}", e, body.clone());
|
||||
telemetry::event_dropped(&0, &"".to_string(), "decoding failure");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if data.payload.event_name == "" {
|
||||
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "not event");
|
||||
return;
|
||||
}
|
||||
|
||||
telemetry::event(&data.payload.world_id, &data.payload.event_name);
|
||||
|
||||
if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" {
|
||||
process_death_event(&data.payload).await;
|
||||
return;
|
||||
|
@ -346,12 +359,16 @@ async fn main() {
|
|||
Ok(team_id) => {
|
||||
data.payload.team_id = team_id;
|
||||
}
|
||||
Err(_) => {}
|
||||
Err(_) => {
|
||||
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "team_id missing");
|
||||
}
|
||||
}
|
||||
}
|
||||
process_exp_event(&data.payload).await;
|
||||
return;
|
||||
}
|
||||
|
||||
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "unprocessable");
|
||||
})
|
||||
.fuse();
|
||||
|
||||
|
|
75
services/websocket/src/telemetry.rs
Normal file
75
services/websocket/src/telemetry.rs
Normal file
|
@ -0,0 +1,75 @@
|
|||
use lazy_static::lazy_static;
|
||||
use prometheus::{
|
||||
IntGaugeVec,
|
||||
register_int_gauge_vec,
|
||||
TextEncoder,
|
||||
gather
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
// incoming events
|
||||
pub static ref EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_count", "Events processed", &[
|
||||
"world_id", "event_name"
|
||||
]).unwrap();
|
||||
pub static ref EVENTS_DROPPED: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_dropped_count", "Events dropped", &[
|
||||
"world_id", "event_name", "reason"
|
||||
]).unwrap();
|
||||
|
||||
pub static ref EXPERIENCE_EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_experience_events_count", "Experience Events processed by Exp ID", &[
|
||||
"world_id", "experience_id"
|
||||
]).unwrap();
|
||||
|
||||
// database stuff
|
||||
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_writes", "Writes to Postgres", &[
|
||||
"table", "op"
|
||||
]).unwrap();
|
||||
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_reads", "Reads from Postgres", &[
|
||||
"table", "op"
|
||||
]).unwrap();
|
||||
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
|
||||
// "table", "op"
|
||||
// ]).unwrap();
|
||||
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
|
||||
// "table", "op"
|
||||
// ]).unwrap();
|
||||
}
|
||||
|
||||
pub async fn handler() -> String {
|
||||
let encoder = TextEncoder::new();
|
||||
let mut buffer = String::new();
|
||||
|
||||
let metrics = gather();
|
||||
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
|
||||
|
||||
buffer
|
||||
}
|
||||
|
||||
pub fn event(world_id: &i32, event_name: &String) {
|
||||
EVENTS.with_label_values(&[
|
||||
&world_id.to_string(),
|
||||
&event_name,
|
||||
]).inc();
|
||||
}
|
||||
|
||||
pub fn event_dropped(world_id: &i32, event_name: &String, reason: &str) {
|
||||
EVENTS_DROPPED.with_label_values(&[
|
||||
&world_id.to_string(),
|
||||
&event_name,
|
||||
reason,
|
||||
]).inc();
|
||||
}
|
||||
|
||||
pub fn experience_event(world_id: &i32, experience_id: &i32) {
|
||||
EXPERIENCE_EVENTS.with_label_values(&[
|
||||
&world_id.to_string(),
|
||||
&experience_id.to_string(),
|
||||
]).inc();
|
||||
}
|
||||
|
||||
pub fn db_write(table: &str, op: &str) {
|
||||
DB_WRITES.with_label_values(&[table, op]).inc();
|
||||
}
|
||||
|
||||
pub fn db_read(table: &str, op: &str) {
|
||||
DB_READS.with_label_values(&[table, op]).inc();
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue