Compare commits

...
Sign in to create a new pull request.

8 commits

Author SHA1 Message Date
noe
3c0ab61695 fix docker image 2024-02-18 00:54:54 -05:00
noe
3a422b8f6f add prometheus to api and ws 2024-02-18 00:52:39 -05:00
noe
7ab5893f67 nix 2024-02-17 03:32:13 -05:00
96cb2c80d8 add auto-maint 2023-09-04 14:06:04 -04:00
ad8105ca94 add tasks print-env 2023-09-04 02:45:29 -04:00
e805a4ca5a update all cargo packages' 2023-09-04 01:30:53 -04:00
be38bb6b5f use nix 2023-09-04 01:30:40 -04:00
c58fdf06b7 refactor to use default localhost IPs instead of localhost 2023-09-04 01:30:23 -04:00
23 changed files with 1699 additions and 818 deletions

3
.envrc Normal file
View file

@ -0,0 +1,3 @@
use flake . --accept-flake-config;
# source .envrc-local

4
.gitignore vendored
View file

@ -1,3 +1,7 @@
/target /target
.DS_Store .DS_Store
*/.DS_Store */.DS_Store
.envrc-local
/.vscode
/.direnv
/result

1894
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,4 @@
[workspace] [workspace]
members = [ members = ["services/*"]
"services/*", exclude = ["hack/codegen"]
] resolver = "2"
exclude = [
"hack/codegen",
]

View file

@ -1,4 +1,4 @@
FROM rust:1.69.0-bullseye as rust-base FROM rust:1.76.0-bullseye as rust-base
WORKDIR /app WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends curl clang RUN apt-get update && apt-get install -y --no-install-recommends curl clang
ARG MOLD_VERSION=1.11.0 ARG MOLD_VERSION=1.11.0

View file

@ -2,7 +2,7 @@ version: "3"
services: services:
tsdb: tsdb:
image: timescale/timescaledb:latest-pg14 image: docker.io/timescale/timescaledb:latest-pg14
environment: environment:
POSTGRES_PASSWORD: saerro321 POSTGRES_PASSWORD: saerro321
POSTGRES_USER: saerrouser POSTGRES_USER: saerrouser

103
flake.lock generated Normal file
View file

@ -0,0 +1,103 @@
{
"nodes": {
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1708150887,
"narHash": "sha256-lyEaeShLZqQtFO+ULLfxF9fYaYpTal0Ck1B+iKYBOMs=",
"owner": "nix-community",
"repo": "fenix",
"rev": "761431323e30846bae160e15682cfa687c200606",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"flake-parts": {
"inputs": {
"nixpkgs-lib": "nixpkgs-lib"
},
"locked": {
"lastModified": 1706830856,
"narHash": "sha256-a0NYyp+h9hlb7ddVz4LUn1vT/PLwqfrWYcHMvFB1xYg=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "b253292d9c0a5ead9bc98c4e9a26c6312e27d69f",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1707956935,
"narHash": "sha256-ZL2TrjVsiFNKOYwYQozpbvQSwvtV/3Me7Zwhmdsfyu4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a4d4fe8c5002202493e87ec8dbc91335ff55552c",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-lib": {
"locked": {
"dir": "lib",
"lastModified": 1706550542,
"narHash": "sha256-UcsnCG6wx++23yeER4Hg18CXWbgNpqNXcHIo5/1Y+hc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "97b17f32362e475016f942bbdfda4a4a72a8a652",
"type": "github"
},
"original": {
"dir": "lib",
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"fenix": "fenix",
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs"
}
},
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1708018577,
"narHash": "sha256-B75VUqKvQeIqAUnYw4bGjY3xxrCqzRBJHLbmD0MAWEw=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "b9b0d29b8e69b02457cfabe20c4c69cdb45f3cc5",
"type": "github"
},
"original": {
"owner": "rust-lang",
"ref": "nightly",
"repo": "rust-analyzer",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

56
flake.nix Normal file
View file

@ -0,0 +1,56 @@
{
description = "PlanetSide 2 Metagame Harvesting Service";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-parts.url = "github:hercules-ci/flake-parts";
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
nixConfig = {
extra-substituters = [
"https://nix-community.cachix.org"
];
extra-trusted-public-keys = [
"nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs="
];
};
outputs = inputs: inputs.flake-parts.lib.mkFlake { inherit inputs; } {
systems = [ "x86_64-linux" "aarch64-linux" ];
perSystem = { config, self', pkgs, lib, system, ... }: let
fenix = inputs.fenix.packages.${system}.minimal;
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
buildDeps = [
pkgs.openssl
];
devDeps = [
fenix.toolchain
pkgs.docker-compose
pkgs.cargo-watch
];
PKG_CONFIG_PATH = "${pkgs.openssl.dev}/lib/pkgconfig";
in {
packages.default = (pkgs.makeRustPlatform {
cargo = fenix.toolchain;
rustc = fenix.toolchain;
}).buildRustPackage {
inherit (cargoToml.package) name version;
cargoLock.lockFile = ./Cargo.lock;
src = ./.;
nativeBuildInputs = [ pkgs.pkg-config ];
buildInputs = buildDeps ++ devDeps;
};
devShells.default = pkgs.mkShell {
nativeBuildInputs = buildDeps ++ devDeps;
};
};
};
}

View file

@ -6,19 +6,25 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
serde_json = "1.0.96" serde_json = "1.0.105"
serde = "1.0.163" serde = "1.0.188"
async-graphql = { version = "5.0.8", features = ["chrono"] } async-graphql = { version = "6.0.5", features = ["chrono"] }
axum = "0.6.18" axum = "0.6.20"
sqlx = { version = "0.6.3", default_features = false, features = [ "runtime-tokio-rustls", "postgres", "chrono" ] } sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
"chrono",
] }
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
tower-http = { version = "0.4.0", features = ["cors"] } tower-http = { version = "0.4.4", features = ["cors"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
reqwest = { version = "0.11.18", features = ["rustls-tls-webpki-roots", "rustls"] } reqwest = { version = "0.11.20", features = [
chrono = "0.4.24" "rustls-tls-webpki-roots",
"rustls",
] }
chrono = "0.4.28"
prometheus = "0.13.3"
[dependencies.openssl] [dependencies.openssl]
version = "0.10.52" version = "0.10.57"
features = [ features = ["vendored"]
"vendored"
]

View file

@ -1,6 +1,7 @@
use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject}; use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::{query, Pool, Postgres, Row}; use sqlx::{query, Pool, Postgres, Row};
use crate::telemetry;
pub struct Analytics {} pub struct Analytics {}
@ -22,8 +23,10 @@ impl Analytics {
world_id: Option<i32>, world_id: Option<i32>,
#[graphql(default = false)] hi_precision: bool, #[graphql(default = false)] hi_precision: bool,
) -> Vec<Event> { ) -> Vec<Event> {
telemetry::graphql_query("Analytics", "events");
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "events");
let sql = format!(" let sql = format!("
SELECT SELECT
time_bucket_gapfill('{} seconds', time, start => now() - '{}'::interval, finish => now()) AS bucket, time_bucket_gapfill('{} seconds', time, start => now() - '{}'::interval, finish => now()) AS bucket,

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
factions::{NC, TR, VS}, factions::{NC, TR, VS},
utils::{Filters, IdOrNameBy}, utils::{Filters, IdOrNameBy},
telemetry
}; };
use async_graphql::{Context, Object}; use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row}; use sqlx::{Pool, Postgres, Row};
@ -13,6 +14,7 @@ pub struct Class {
impl Class { impl Class {
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 { async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
telemetry::db_read("players", "fetch");
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
let sql = format!( let sql = format!(
@ -36,9 +38,12 @@ impl Class {
#[Object] #[Object]
impl Class { impl Class {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "total");
self.fetch(ctx, self.filters.clone()).await self.fetch(ctx, self.filters.clone()).await
} }
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "nc");
self.fetch( self.fetch(
ctx, ctx,
Filters { Filters {
@ -49,6 +54,7 @@ impl Class {
.await .await
} }
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "tr");
self.fetch( self.fetch(
ctx, ctx,
Filters { Filters {
@ -59,6 +65,7 @@ impl Class {
.await .await
} }
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "vs");
self.fetch( self.fetch(
ctx, ctx,
Filters { Filters {
@ -86,36 +93,42 @@ impl Classes {
#[Object] #[Object]
impl Classes { impl Classes {
async fn infiltrator(&self) -> Class { async fn infiltrator(&self) -> Class {
telemetry::graphql_query("Classes", "infiltrator");
Class { Class {
filters: self.filters.clone(), filters: self.filters.clone(),
class_name: "infiltrator".to_string(), class_name: "infiltrator".to_string(),
} }
} }
async fn light_assault(&self) -> Class { async fn light_assault(&self) -> Class {
telemetry::graphql_query("Classes", "light_assault");
Class { Class {
filters: self.filters.clone(), filters: self.filters.clone(),
class_name: "light_assault".to_string(), class_name: "light_assault".to_string(),
} }
} }
async fn combat_medic(&self) -> Class { async fn combat_medic(&self) -> Class {
telemetry::graphql_query("Classes", "combat_medic");
Class { Class {
filters: self.filters.clone(), filters: self.filters.clone(),
class_name: "combat_medic".to_string(), class_name: "combat_medic".to_string(),
} }
} }
async fn engineer(&self) -> Class { async fn engineer(&self) -> Class {
telemetry::graphql_query("Classes", "engineer");
Class { Class {
filters: self.filters.clone(), filters: self.filters.clone(),
class_name: "engineer".to_string(), class_name: "engineer".to_string(),
} }
} }
async fn heavy_assault(&self) -> Class { async fn heavy_assault(&self) -> Class {
telemetry::graphql_query("Classes", "heavy_assault");
Class { Class {
filters: self.filters.clone(), filters: self.filters.clone(),
class_name: "heavy_assault".to_string(), class_name: "heavy_assault".to_string(),
} }
} }
async fn max(&self) -> Class { async fn max(&self) -> Class {
telemetry::graphql_query("Classes", "max");
Class { Class {
filters: self.filters.clone(), filters: self.filters.clone(),
class_name: "max".to_string(), class_name: "max".to_string(),
@ -135,6 +148,7 @@ impl ClassesQuery {
/// Get a specific class /// Get a specific class
pub async fn class(&self, filter: Option<Filters>, class_name: String) -> Class { pub async fn class(&self, filter: Option<Filters>, class_name: String) -> Class {
telemetry::graphql_query("Classes", "");
Class { Class {
filters: filter.unwrap_or_default(), filters: filter.unwrap_or_default(),
class_name, class_name,

View file

@ -1,10 +1,13 @@
use crate::utils::ID_TO_WORLD; use crate::{telemetry, utils::ID_TO_WORLD};
use async_graphql::{Context, Enum, Object, SimpleObject}; use async_graphql::{Context, Enum, Object, SimpleObject};
use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::{query, Pool, Postgres, Row}; use sqlx::{query, Pool, Postgres, Row};
pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse { pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse {
telemetry::http_request("/health", "GET");
telemetry::db_read("analytics", "get_health");
let events_resp = let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'") query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(&pool) .fetch_one(&pool)
@ -63,6 +66,7 @@ impl Health {
) -> (UpDown, Option<DateTime<Utc>>) { ) -> (UpDown, Option<DateTime<Utc>>) {
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "most_recent_event_time");
let events_resp = let events_resp =
query("SELECT time FROM analytics WHERE world_id = $1 ORDER BY time DESC LIMIT 1") query("SELECT time FROM analytics WHERE world_id = $1 ORDER BY time DESC LIMIT 1")
.bind(world_id) .bind(world_id)
@ -91,8 +95,11 @@ impl Health {
impl Health { impl Health {
/// Did a ping to Postgres (our main datastore) succeed? /// Did a ping to Postgres (our main datastore) succeed?
async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown { async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
telemetry::graphql_query("Health", "database");
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "database_health");
let events_resp = let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'") query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(pool) .fetch_one(pool)
@ -106,8 +113,11 @@ impl Health {
/// Is the websocket processing jobs? /// Is the websocket processing jobs?
async fn ingest<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown { async fn ingest<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
telemetry::graphql_query("Health", "ingest");
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "ingest_health");
let events_resp = let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'") query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(pool) .fetch_one(pool)
@ -129,9 +139,11 @@ impl Health {
/// Is the websocket actually turned on? /// Is the websocket actually turned on?
async fn ingest_reachable(&self) -> UpDown { async fn ingest_reachable(&self) -> UpDown {
telemetry::graphql_query("Health", "ingest_reachable");
reqwest::get( reqwest::get(
std::env::var("WEBSOCKET_HEALTHCHECK") std::env::var("WEBSOCKET_HEALTHCHECK")
.unwrap_or("http://localhost:8999/health".to_string()), .unwrap_or("http://127.0.0.1:8999/healthz".to_string()),
) )
.await .await
.map(|_| UpDown::Up) .map(|_| UpDown::Up)
@ -145,6 +157,8 @@ impl Health {
/// Checks if a world has had any events for the last 5 minutes /// Checks if a world has had any events for the last 5 minutes
async fn worlds<'ctx>(&self, ctx: &Context<'ctx>) -> Vec<WorldUpDown> { async fn worlds<'ctx>(&self, ctx: &Context<'ctx>) -> Vec<WorldUpDown> {
telemetry::graphql_query("Health", "worlds");
let mut worlds = Vec::new(); let mut worlds = Vec::new();
for (id, name) in ID_TO_WORLD.iter() { for (id, name) in ID_TO_WORLD.iter() {
let (status, last_event) = self.most_recent_event_time(ctx, *id).await; let (status, last_event) = self.most_recent_event_time(ctx, *id).await;

View file

@ -4,6 +4,7 @@ mod factions;
mod health; mod health;
mod population; mod population;
mod query; mod query;
mod telemetry;
mod utils; mod utils;
mod vehicles; mod vehicles;
mod world; mod world;
@ -26,10 +27,12 @@ use tower_http::cors::{Any, CorsLayer};
extern crate serde_json; extern crate serde_json;
async fn index() -> Html<&'static str> { async fn index() -> Html<&'static str> {
telemetry::http_request("/", "GET");
Html(include_str!("html/index.html")) Html(include_str!("html/index.html"))
} }
async fn ingest() -> Html<&'static str> { async fn ingest() -> Html<&'static str> {
telemetry::http_request("/ingest", "GET");
Html(include_str!("html/ingest.html")) Html(include_str!("html/ingest.html"))
} }
@ -41,6 +44,7 @@ async fn graphql_handler_post(
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>, Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
Json(query): Json<Request>, Json(query): Json<Request>,
) -> Json<Response> { ) -> Json<Response> {
telemetry::http_request("/graphql", "POST");
Json(schema.execute(query).await) Json(schema.execute(query).await)
} }
@ -48,6 +52,8 @@ async fn graphql_handler_get(
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>, Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
query: Query<Request>, query: Query<Request>,
) -> axum::response::Response { ) -> axum::response::Response {
telemetry::http_request("/graphql", "GET");
if query.query == "" { if query.query == "" {
return Redirect::to("/graphiql").into_response(); return Redirect::to("/graphiql").into_response();
} }
@ -56,6 +62,8 @@ async fn graphql_handler_get(
} }
async fn graphiql() -> impl IntoResponse { async fn graphiql() -> impl IntoResponse {
telemetry::http_request("/graphiql", "GET");
Html( Html(
GraphiQLSource::build() GraphiQLSource::build()
.endpoint("/graphql") .endpoint("/graphql")
@ -67,7 +75,7 @@ async fn graphiql() -> impl IntoResponse {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let db_url = std::env::var("DATABASE_URL") let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string()); .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
let db = sqlx::PgPool::connect(&db_url).await.unwrap(); let db = sqlx::PgPool::connect(&db_url).await.unwrap();
let schema = Schema::build(query::Query::default(), EmptyMutation, EmptySubscription) let schema = Schema::build(query::Query::default(), EmptyMutation, EmptySubscription)
@ -83,6 +91,8 @@ async fn main() {
post(graphql_handler_post).get(graphql_handler_get), post(graphql_handler_post).get(graphql_handler_get),
) )
.route("/graphiql", get(graphiql)) .route("/graphiql", get(graphiql))
.route("/metrics", get(telemetry::handler))
.route("/metrics/combined", get(telemetry::handler_combined))
.fallback(handle_404) .fallback(handle_404)
.layer(Extension(db)) .layer(Extension(db))
.layer(Extension(schema)) .layer(Extension(schema))

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
factions::{NC, NSO, TR, VS}, factions::{NC, NSO, TR, VS},
utils::Filters, utils::Filters,
telemetry,
}; };
use async_graphql::{Context, Object}; use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row}; use sqlx::{Pool, Postgres, Row};
@ -22,6 +23,7 @@ impl Population {
async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: i32) -> i64 { async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: i32) -> i64 {
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "population_by_faction");
let sql = format!( let sql = format!(
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND faction_id = $1 {};", "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND faction_id = $1 {};",
self.filters.sql(), self.filters.sql(),
@ -43,8 +45,11 @@ impl Population {
#[Object] #[Object]
impl Population { impl Population {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "total");
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "population_total");
let sql = format!( let sql = format!(
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' {};", "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' {};",
self.filters.sql(), self.filters.sql(),
@ -61,15 +66,19 @@ impl Population {
query query
} }
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "nc");
self.by_faction(ctx, NC).await self.by_faction(ctx, NC).await
} }
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "vs");
self.by_faction(ctx, VS).await self.by_faction(ctx, VS).await
} }
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "tr");
self.by_faction(ctx, TR).await self.by_faction(ctx, TR).await
} }
async fn ns<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn ns<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "ns");
self.by_faction(ctx, NSO).await self.by_faction(ctx, NSO).await
} }
} }

View file

@ -0,0 +1,138 @@
use lazy_static::lazy_static;
use prometheus::{
IntGauge,
IntGaugeVec,
register_int_gauge_vec,
register_int_gauge,
TextEncoder,
gather
};
use sqlx::{Pool, Postgres, Row};
use axum::Extension;
use chrono::{DateTime, Utc};
lazy_static! {
// http
pub static ref HTTP_REQUEST: IntGaugeVec = register_int_gauge_vec!("saerro_api_http_requests", "HTTP requests", &[
"route", "method"
]).unwrap();
pub static ref GRAPHQL_QUERY: IntGaugeVec = register_int_gauge_vec!("saerro_api_graphql_query", "GraphQL queries", &[
"major", "minor"
]).unwrap();
// counters
pub static ref PLAYERS_TRACKED: IntGauge = register_int_gauge!("saerro_players_tracked", "All players tracked by Saerro right now").unwrap();
pub static ref VEHICLES_TRACKED: IntGauge = register_int_gauge!("saerro_vehicles_tracked", "All vehicles tracked by Saerro right now").unwrap();
pub static ref OLDEST_PLAYER: IntGauge = register_int_gauge!("saerro_oldest_player", "Oldest player tracked").unwrap();
pub static ref NEWEST_PLAYER: IntGauge = register_int_gauge!("saerro_newest_player", "Newest player tracked").unwrap();
pub static ref OLDEST_VEHICLE: IntGauge = register_int_gauge!("saerro_oldest_vehicle", "Oldest vehicle tracked").unwrap();
pub static ref NEWEST_VEHICLE: IntGauge = register_int_gauge!("saerro_newest_vehicle", "Newest vehicle tracked").unwrap();
// database stuff
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_writes", "Writes to Postgres", &[
"table", "op"
]).unwrap();
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_reads", "Reads from Postgres", &[
"table", "op"
]).unwrap();
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
// "table", "op"
// ]).unwrap();
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
// "table", "op"
// ]).unwrap();
}
pub async fn handler(Extension(pool): Extension<Pool<Postgres>>) -> String {
update_data_gauges(pool).await;
// Final output
let encoder = TextEncoder::new();
let mut buffer = String::new();
let metrics = gather();
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
buffer
}
pub async fn handler_combined(Extension(pool): Extension<Pool<Postgres>>) -> String {
let url = std::env::var("WEBSOCKET_HEALTHCHECK")
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()).replace("/healthz", "/metrics");
let local = handler(Extension(pool)).await;
let remote = match reqwest::get(url).await {
Ok(r) => r.text().await.expect("failed to text lol"),
Err(_) => String::from("")
};
format!("{}{}", local, remote)
}
// pub fn db_write(table: &str, op: &str) {
// DB_WRITES.with_label_values(&[table, op]).inc();
// }
pub fn db_read(table: &str, op: &str) {
DB_READS.with_label_values(&[table, op]).inc();
}
pub fn http_request(route: &str, method: &str) {
HTTP_REQUEST.with_label_values(&[route, method]).inc();
}
pub fn graphql_query(major: &str, minor: &str) {
GRAPHQL_QUERY.with_label_values(&[major, minor]).inc();
}
async fn update_data_gauges(pool: Pool<Postgres>) {
// Do some easy queries to fill our non-cumulative gauges
db_read("players", "count_all");
let player_count: i64 = sqlx::query("SELECT count(*) FROM players")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
PLAYERS_TRACKED.set(player_count);
db_read("players", "get_newest");
let player_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_PLAYER.set(player_newest.timestamp());
db_read("players", "get_oldest");
let player_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_PLAYER.set(player_oldest.timestamp());
db_read("vehicles", "count_all");
let vehicle_count: i64 = sqlx::query("SELECT count(*) FROM vehicles")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
VEHICLES_TRACKED.set(vehicle_count);
db_read("vehicles", "get_newest");
let vehicle_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_VEHICLE.set(vehicle_newest.timestamp());
db_read("vehicles", "get_oldest");
let vehicle_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_VEHICLE.set(vehicle_oldest.timestamp());
}

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
factions::{NC, TR, VS}, factions::{NC, TR, VS},
utils::{Filters, IdOrNameBy}, utils::{Filters, IdOrNameBy},
telemetry,
}; };
use async_graphql::{Context, Object}; use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row}; use sqlx::{Pool, Postgres, Row};
@ -15,6 +16,7 @@ impl Vehicle {
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 { async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("vehicles", "fetch");
let sql = format!( let sql = format!(
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' AND vehicle_name = $1 {};", "SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' AND vehicle_name = $1 {};",
filters.sql(), filters.sql(),
@ -36,9 +38,13 @@ impl Vehicle {
#[Object] #[Object]
impl Vehicle { impl Vehicle {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "total");
self.fetch(ctx, self.filters.clone()).await self.fetch(ctx, self.filters.clone()).await
} }
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "nc");
self.fetch( self.fetch(
ctx, ctx,
Filters { Filters {
@ -49,6 +55,8 @@ impl Vehicle {
.await .await
} }
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "tr");
self.fetch( self.fetch(
ctx, ctx,
Filters { Filters {
@ -59,6 +67,8 @@ impl Vehicle {
.await .await
} }
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "vs");
self.fetch( self.fetch(
ctx, ctx,
Filters { Filters {
@ -86,8 +96,11 @@ impl Vehicles {
#[Object] #[Object]
impl Vehicles { impl Vehicles {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 { async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicles", "total");
let pool = ctx.data::<Pool<Postgres>>().unwrap(); let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "vehicles_total");
let sql = format!( let sql = format!(
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' {};", "SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' {};",
self.filters.sql(), self.filters.sql(),
@ -106,36 +119,48 @@ impl Vehicles {
// Transport // Transport
async fn flash(&self) -> Vehicle { async fn flash(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "flash");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "flash".to_string(), vehicle_name: "flash".to_string(),
} }
} }
async fn sunderer(&self) -> Vehicle { async fn sunderer(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "sunderer");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "sunderer".to_string(), vehicle_name: "sunderer".to_string(),
} }
} }
async fn ant(&self) -> Vehicle { async fn ant(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "ant");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "ant".to_string(), vehicle_name: "ant".to_string(),
} }
} }
async fn harasser(&self) -> Vehicle { async fn harasser(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "harasser");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "harasser".to_string(), vehicle_name: "harasser".to_string(),
} }
} }
async fn javelin(&self) -> Vehicle { async fn javelin(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "javelin");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "javelin".to_string(), vehicle_name: "javelin".to_string(),
} }
} }
async fn corsair(&self) -> Vehicle { async fn corsair(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "corsair");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "corsair".to_string(), vehicle_name: "corsair".to_string(),
@ -144,30 +169,40 @@ impl Vehicles {
// Tanks // Tanks
async fn lightning(&self) -> Vehicle { async fn lightning(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "lightning");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "lightning".to_string(), vehicle_name: "lightning".to_string(),
} }
} }
async fn prowler(&self) -> Vehicle { async fn prowler(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "prowler");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "prowler".to_string(), vehicle_name: "prowler".to_string(),
} }
} }
async fn vanguard(&self) -> Vehicle { async fn vanguard(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "vanguard");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "vanguard".to_string(), vehicle_name: "vanguard".to_string(),
} }
} }
async fn magrider(&self) -> Vehicle { async fn magrider(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "magrider");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "magrider".to_string(), vehicle_name: "magrider".to_string(),
} }
} }
async fn chimera(&self) -> Vehicle { async fn chimera(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "chimera");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "chimera".to_string(), vehicle_name: "chimera".to_string(),
@ -176,42 +211,56 @@ impl Vehicles {
// Air // Air
async fn mosquito(&self) -> Vehicle { async fn mosquito(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "mosquito");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "mosquito".to_string(), vehicle_name: "mosquito".to_string(),
} }
} }
async fn liberator(&self) -> Vehicle { async fn liberator(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "liberator");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "liberator".to_string(), vehicle_name: "liberator".to_string(),
} }
} }
async fn galaxy(&self) -> Vehicle { async fn galaxy(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "galaxy");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "galaxy".to_string(), vehicle_name: "galaxy".to_string(),
} }
} }
async fn valkyrie(&self) -> Vehicle { async fn valkyrie(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "valkyrie");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "valkyrie".to_string(), vehicle_name: "valkyrie".to_string(),
} }
} }
async fn reaver(&self) -> Vehicle { async fn reaver(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "reaver");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "reaver".to_string(), vehicle_name: "reaver".to_string(),
} }
} }
async fn scythe(&self) -> Vehicle { async fn scythe(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "scythe");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "scythe".to_string(), vehicle_name: "scythe".to_string(),
} }
} }
async fn dervish(&self) -> Vehicle { async fn dervish(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "dervish");
Vehicle { Vehicle {
filters: self.filters.clone(), filters: self.filters.clone(),
vehicle_name: "dervish".to_string(), vehicle_name: "dervish".to_string(),

View file

@ -4,6 +4,7 @@ use crate::{
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_WORLD, WORLD_IDS}, utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_WORLD, WORLD_IDS},
vehicles::Vehicles, vehicles::Vehicles,
zone::Zones, zone::Zones,
telemetry,
}; };
use async_graphql::Object; use async_graphql::Object;
@ -33,11 +34,15 @@ impl World {
impl World { impl World {
/// The ID of the world. /// The ID of the world.
async fn id(&self) -> i32 { async fn id(&self) -> i32 {
telemetry::graphql_query("World", "id");
id_or_name_to_id(&WORLD_IDS, self.filter.world.as_ref().unwrap()).unwrap() id_or_name_to_id(&WORLD_IDS, self.filter.world.as_ref().unwrap()).unwrap()
} }
/// The name of the world, in official game capitalization. /// The name of the world, in official game capitalization.
async fn name(&self) -> String { async fn name(&self) -> String {
telemetry::graphql_query("World", "name");
let name = id_or_name_to_name(&ID_TO_WORLD, self.filter.world.as_ref().unwrap()).unwrap(); let name = id_or_name_to_name(&ID_TO_WORLD, self.filter.world.as_ref().unwrap()).unwrap();
// Special case for SolTech, lol. // Special case for SolTech, lol.
@ -51,6 +56,8 @@ impl World {
/// Population filtered to this world. /// Population filtered to this world.
async fn population(&self) -> Population { async fn population(&self) -> Population {
telemetry::graphql_query("World", "population");
Population::new(Some(Filters { Population::new(Some(Filters {
world: self.filter.world.clone(), world: self.filter.world.clone(),
faction: None, faction: None,
@ -60,6 +67,8 @@ impl World {
/// Vehicles filtered to this world. /// Vehicles filtered to this world.
async fn vehicles(&self) -> Vehicles { async fn vehicles(&self) -> Vehicles {
telemetry::graphql_query("World", "vehicles");
Vehicles::new(Some(Filters { Vehicles::new(Some(Filters {
world: self.filter.world.clone(), world: self.filter.world.clone(),
faction: None, faction: None,
@ -69,6 +78,8 @@ impl World {
/// Classes filtered to this world. /// Classes filtered to this world.
async fn classes(&self) -> Classes { async fn classes(&self) -> Classes {
telemetry::graphql_query("World", "classes");
Classes::new(Some(Filters { Classes::new(Some(Filters {
world: self.filter.world.clone(), world: self.filter.world.clone(),
faction: None, faction: None,
@ -78,6 +89,8 @@ impl World {
/// Get a specific zone/continent on this world. /// Get a specific zone/continent on this world.
async fn zones(&self) -> Zones { async fn zones(&self) -> Zones {
telemetry::graphql_query("World", "zones");
Zones::new(Some(self.filter.clone())) Zones::new(Some(self.filter.clone()))
} }
} }

View file

@ -3,6 +3,7 @@ use crate::{
population::Population, population::Population,
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_ZONE, ZONE_IDS}, utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_ZONE, ZONE_IDS},
vehicles::Vehicles, vehicles::Vehicles,
telemetry,
}; };
use async_graphql::Object; use async_graphql::Object;
@ -23,11 +24,15 @@ impl Zone {
impl Zone { impl Zone {
/// The ID of the zone/continent. /// The ID of the zone/continent.
async fn id(&self) -> i32 { async fn id(&self) -> i32 {
telemetry::graphql_query("Zone", "id");
id_or_name_to_id(&ZONE_IDS, self.filters.zone.as_ref().unwrap()).unwrap() id_or_name_to_id(&ZONE_IDS, self.filters.zone.as_ref().unwrap()).unwrap()
} }
/// The name of the continent, in official game capitalization. /// The name of the continent, in official game capitalization.
async fn name(&self) -> String { async fn name(&self) -> String {
telemetry::graphql_query("Zone", "name");
let name = id_or_name_to_name(&ID_TO_ZONE, self.filters.zone.as_ref().unwrap()).unwrap(); let name = id_or_name_to_name(&ID_TO_ZONE, self.filters.zone.as_ref().unwrap()).unwrap();
// Capitalize the first letter // Capitalize the first letter
@ -35,14 +40,20 @@ impl Zone {
} }
async fn population(&self) -> Population { async fn population(&self) -> Population {
telemetry::graphql_query("Zone", "population");
Population::new(Some(self.filters.clone())) Population::new(Some(self.filters.clone()))
} }
async fn vehicles(&self) -> Vehicles { async fn vehicles(&self) -> Vehicles {
telemetry::graphql_query("Zone", "vehicles");
Vehicles::new(Some(self.filters.clone())) Vehicles::new(Some(self.filters.clone()))
} }
async fn classes(&self) -> Classes { async fn classes(&self) -> Classes {
telemetry::graphql_query("Zone", "classes");
Classes::new(Some(self.filters.clone())) Classes::new(Some(self.filters.clone()))
} }
} }

View file

@ -6,7 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
sqlx = { version = "0.6.3", default_features = false, features = [ "runtime-tokio-rustls" , "postgres" ] } sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
async_once = "0.2.6" async_once = "0.2.6"

View file

@ -9,7 +9,7 @@ mod migrations;
lazy_static! { lazy_static! {
pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async { pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
let db_url = std::env::var("DATABASE_URL") let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string()); .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
sqlx::PgPool::connect(&db_url).await.unwrap() sqlx::PgPool::connect(&db_url).await.unwrap()
}); });
} }
@ -71,7 +71,20 @@ async fn main() {
cmd_prune().await; cmd_prune().await;
println!("Done!"); println!("Done!");
} }
"auto-maintenance" => loop {
println!("Running maintenance tasks...");
if !migrations::is_migrated().await {
println!("DB is not migrated, running migrations...");
cmd_migrate().await;
}
cmd_prune().await;
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
},
"migrate" => cmd_migrate().await, "migrate" => cmd_migrate().await,
"print-env" => {
std::env::vars().for_each(|(key, value)| println!("{}={}", key, value));
}
_ => { _ => {
println!("Unknown command: {}", command); println!("Unknown command: {}", command);
cmd_help(); cmd_help();

View file

@ -7,14 +7,21 @@ edition = "2021"
[dependencies] [dependencies]
lazy_static = "1.4.0" lazy_static = "1.4.0"
tokio-tungstenite = { version = "0.19.0", features=["rustls-tls-webpki-roots"] } tokio-tungstenite = { version = "0.20.0", features = [
serde = { version = "1.0.163", features = ["derive"] } "rustls-tls-webpki-roots",
serde_json = "1.0.96" ] }
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] } serde = { version = "1.0.188", features = ["derive"] }
sqlx = { version = "0.6.3", default_features = false, features = [ "runtime-tokio-rustls" , "postgres" ] } serde_json = "1.0.105"
url = "2.3.1" tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
] }
url = "2.4.1"
futures-util = "0.3.28" futures-util = "0.3.28"
futures = "0.3.28" futures = "0.3.28"
async_once = "0.2.6" async_once = "0.2.6"
serde-aux = "4.2.0" serde-aux = "4.2.0"
axum = "0.6.18" axum = "0.6.20"
prometheus = "0.13.3"
prometheus-static-metric = "0.5.1"

View file

@ -12,12 +12,13 @@ use tokio::task::JoinSet;
use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_tungstenite::{connect_async, tungstenite::Message};
mod translators; mod translators;
mod telemetry;
lazy_static! { lazy_static! {
static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default(); static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async { static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
let db_url = std::env::var("DATABASE_URL") let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string()); .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
PgPoolOptions::new().connect(&db_url).await.unwrap() PgPoolOptions::new().connect(&db_url).await.unwrap()
}); });
} }
@ -75,6 +76,7 @@ struct AnalyticsEvent {
async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> { async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> {
let pool = PG.get().await; let pool = PG.get().await;
telemetry::db_read("players", "get_team_id");
let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;") let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;")
.bind(character_id.clone()) .bind(character_id.clone())
.fetch_one(pool) .fetch_one(pool)
@ -108,6 +110,7 @@ async fn track_pop(pop_event: PopEvent) {
translators::vehicle_to_name(vehicle_id.as_str()) translators::vehicle_to_name(vehicle_id.as_str())
}; };
telemetry::db_write("players", "track_pop");
query( query(
" "
INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name) INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name)
@ -130,6 +133,7 @@ async fn track_pop(pop_event: PopEvent) {
.unwrap(); .unwrap();
if vehicle_name != "unknown" { if vehicle_name != "unknown" {
telemetry::db_write("vehicles", "track_pop");
query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name) query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name)
VALUES (now(), $1, $2, $3, $4, $5) VALUES (now(), $1, $2, $3, $4, $5)
ON CONFLICT (character_id) DO UPDATE SET ON CONFLICT (character_id) DO UPDATE SET
@ -159,6 +163,7 @@ async fn track_analytics(analytics_event: AnalyticsEvent) {
event_name, event_name,
} = analytics_event; } = analytics_event;
telemetry::db_write("analytics", "track_analytics");
match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);") match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
.bind(world_id) .bind(world_id)
.bind(event_name) .bind(event_name)
@ -210,6 +215,7 @@ async fn process_death_event(event: &Event) {
} }
async fn process_exp_event(event: &Event) { async fn process_exp_event(event: &Event) {
telemetry::experience_event(&event.world_id, &event.experience_id);
let mut set = JoinSet::new(); let mut set = JoinSet::new();
// println!("[ws/process_event] EVENT: {:?}", event); // println!("[ws/process_event] EVENT: {:?}", event);
@ -287,6 +293,9 @@ async fn healthz() {
"status": "ok", "status": "ok",
})) }))
}), }),
).route(
"/metrics",
get(telemetry::handler)
); );
let port: u16 = std::env::var("PORT") let port: u16 = std::env::var("PORT")
@ -325,16 +334,20 @@ async fn main() {
let mut data: Payload = match serde_json::from_str(body) { let mut data: Payload = match serde_json::from_str(body) {
Ok(data) => data, Ok(data) => data,
Err(_) => { Err(_e) => {
// println!("Error: {}; body: {}", e, body.clone()); // println!("Error: {}; body: {}", e, body.clone());
telemetry::event_dropped(&0, &"".to_string(), "decoding failure");
return; return;
} }
}; };
if data.payload.event_name == "" { if data.payload.event_name == "" {
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "not event");
return; return;
} }
telemetry::event(&data.payload.world_id, &data.payload.event_name);
if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" { if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" {
process_death_event(&data.payload).await; process_death_event(&data.payload).await;
return; return;
@ -346,12 +359,16 @@ async fn main() {
Ok(team_id) => { Ok(team_id) => {
data.payload.team_id = team_id; data.payload.team_id = team_id;
} }
Err(_) => {} Err(_) => {
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "team_id missing");
}
} }
} }
process_exp_event(&data.payload).await; process_exp_event(&data.payload).await;
return; return;
} }
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "unprocessable");
}) })
.fuse(); .fuse();

View file

@ -0,0 +1,75 @@
use lazy_static::lazy_static;
use prometheus::{
IntGaugeVec,
register_int_gauge_vec,
TextEncoder,
gather
};
// Prometheus metric families for the websocket ingest service.
// Registered with the default global registry on first access (lazy_static),
// so `gather()` in `handler` picks them up automatically.
lazy_static! {
    // incoming events
    // NOTE(review): these values only ever move via `.inc()` in this file,
    // which is Counter semantics; `register_int_counter_vec!` would be the
    // conventional type. Confirm no other module calls `.set()`/`.dec()`
    // on these statics before changing the type.
    pub static ref EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_count", "Events processed", &[
        "world_id", "event_name"
    ]).unwrap();
    // events discarded before processing; `reason` explains why (see event_dropped)
    pub static ref EVENTS_DROPPED: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_dropped_count", "Events dropped", &[
        "world_id", "event_name", "reason"
    ]).unwrap();
    // GainExperience events broken down by experience ID
    pub static ref EXPERIENCE_EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_experience_events_count", "Experience Events processed by Exp ID", &[
        "world_id", "experience_id"
    ]).unwrap();

    // database stuff
    // `op` is the logical operation name supplied by the caller (e.g. "track_pop")
    pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_writes", "Writes to Postgres", &[
        "table", "op"
    ]).unwrap();
    pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_reads", "Reads from Postgres", &[
        "table", "op"
    ]).unwrap();

    // Timing histograms — left disabled for now.
    // static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
    //     "table", "op"
    // ]).unwrap();
    // static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
    //     "table", "op"
    // ]).unwrap();
}
/// Axum handler for `GET /metrics`: renders every metric registered with the
/// default Prometheus registry in the text exposition format.
pub async fn handler() -> String {
    let mut body = String::new();
    TextEncoder::new()
        .encode_utf8(&gather(), &mut body)
        .expect("prometheus metrics failed to render");
    body
}
/// Records one incoming event on `saerro_ws_events_count`, labeled by world ID
/// and event name.
///
/// Takes `&str` instead of `&String` (idiomatic; avoids the original's
/// `&&String` double borrow). Existing call sites passing `&String` still
/// compile via deref coercion.
pub fn event(world_id: &i32, event_name: &str) {
    EVENTS
        .with_label_values(&[&world_id.to_string(), event_name])
        .inc();
}
/// Records an event that was discarded before processing on
/// `saerro_ws_events_dropped_count`, labeled by world ID, event name, and a
/// short human-readable drop `reason` (e.g. "decoding failure").
///
/// Takes `&str` instead of `&String` (idiomatic; avoids the original's
/// `&&String` double borrow). Existing call sites passing `&String` still
/// compile via deref coercion.
pub fn event_dropped(world_id: &i32, event_name: &str, reason: &str) {
    EVENTS_DROPPED
        .with_label_values(&[&world_id.to_string(), event_name, reason])
        .inc();
}
/// Records one GainExperience event on `saerro_ws_experience_events_count`,
/// labeled by world ID and experience ID.
///
/// Parameters stay `&i32` because existing callers pass references.
pub fn experience_event(world_id: &i32, experience_id: &i32) {
    let world = world_id.to_string();
    let experience = experience_id.to_string();
    EXPERIENCE_EVENTS
        .with_label_values(&[&world, &experience])
        .inc();
}
/// Bumps the Postgres write metric for `table`, tagged with the logical
/// operation name `op` (typically the calling function's name).
pub fn db_write(table: &str, op: &str) {
    let labels = [table, op];
    DB_WRITES.with_label_values(&labels).inc();
}
/// Bumps the Postgres read metric for `table`, tagged with the logical
/// operation name `op` (typically the calling function's name).
pub fn db_read(table: &str, op: &str) {
    let labels = [table, op];
    DB_READS.with_label_values(&labels).inc();
}