Compare commits


No commits in common. "main" and "rework" have entirely different histories.
main ... rework

38 changed files with 1381 additions and 4324 deletions

0 .env (Normal file)

3 .envrc

@@ -1,3 +0,0 @@
use flake . --accept-flake-config;
# source .envrc-local


@@ -17,12 +17,36 @@ jobs:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- run: docker buildx create --use --driver=docker-container
- run: |
TAG_LATEST_IF_MASTER=$(if [ "$GITHUB_REF_NAME" = "main" ]; then echo "-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest"; else echo ""; fi)
docker buildx build . \
docker build . \
--build-arg SERVICE=${{ matrix.service }} \
-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} $TAG_LATEST_IF_MASTER \
--push \
--cache-to type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }} \
--cache-from type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }}
-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }}
- run: |
docker tag ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} \
ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest
if: github.ref == 'refs/heads/main'
- run: |
docker push ghcr.io/${{ github.repository }}/${{ matrix.service }}
deploy:
runs-on: ubuntu-latest
needs: build
if: github.ref == 'refs/heads/main'
environment:
name: production
url: https://saerro.harasse.rs
permissions:
contents: "read"
id-token: "write"
steps:
- id: "auth"
uses: "google-github-actions/auth@v1"
with:
workload_identity_provider: ${{ secrets.WORKLOAD_IDENTITY_PROVIDER }}
service_account: ${{ secrets.SERVICE_ACCOUNT }}
- name: "Set up Cloud SDK"
uses: "google-github-actions/setup-gcloud@v1"
- name: "Deploy"
run: |
gcloud compute ssh ${{ secrets.VM_NAME }} --zone=us-central1-a --command "cd /opt && sudo docker compose pull && sudo docker compose up -d"

6 .gitignore (vendored)

@@ -1,7 +1 @@
/target
.DS_Store
*/.DS_Store
.envrc-local
/.vscode
/.direnv
/result

2179 Cargo.lock (generated)
File diff suppressed because it is too large.


@@ -1,4 +1,5 @@
[workspace]
members = ["services/*"]
exclude = ["hack/codegen"]
resolver = "2"
members = [
"services/*",
"hack/*"
]


@@ -1,21 +1,18 @@
FROM rust:1.76.0-bullseye as rust-base
FROM rust:1.65.0-bullseye AS builder
ARG SERVICE
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends curl clang
ARG MOLD_VERSION=1.11.0
RUN curl -sSL https://github.com/rui314/mold/releases/download/v${MOLD_VERSION}/mold-${MOLD_VERSION}-x86_64-linux.tar.gz | tar xzv && \
mv mold-${MOLD_VERSION}-x86_64-linux/bin/mold /mold && \
rm -rf mold-${MOLD_VERSION}-x86_64-linux
ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
COPY Cargo.toml Cargo.lock ./
COPY services ./services
COPY hack ./hack
FROM rust-base as builder
COPY . .
ARG SERVICE
ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
RUN cargo build --release --bin ${SERVICE}
RUN cargo build --bin ${SERVICE} --release
FROM debian:bullseye-slim as runtime
FROM debian:bullseye-slim AS target
ARG SERVICE
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/${SERVICE} /app
ENTRYPOINT ["/app"]
RUN chmod a+x /app
CMD /app


@@ -2,35 +2,33 @@
PlanetSide 2 live population API. This API is free and open for anyone to use.
https://saerro.ps2.live
https://saerro.harasse.rs
tl;dr: Watch for specific events, transform and add them to a timeseries set, and query that set for the last 15 minutes.
We're built on 3 core types: `players`, `classes`, and `vehicles`. Each can be filtered by Continent/Zone, Faction, and World.
Our methodology is to add any player ID seen on the Census websockets to a time-sorted set, then return the number of player IDs seen within the last 15 minutes.
---
The one and only goal of this app is to provide a current "point-in-time" population status for PlanetSide 2, per world, per faction (and later, per continent). Historical info is _not_ a goal; you may implement that on your end.
Please open an issue here or get in touch with Pomf (okano#0001) on the PS2 Discord if you have complex use cases for this data; it may be trivial to implement APIs tailored to your needs.
The main use case is for the [Medkit](https://github.com/kayteh/medkit2) bot to have an in-house source of population data, without relying too heavily on any third-party stats service like Fisu, Honu, or Voidwell, which all have different population tracking needs and goals (and thus, different data).
For an example of how it can be used, see [pstop](https://pstop.harasse.rs) ([GitHub](https://github.com/genudine/pstop)).
## Architecture
- GraphQL API
- Serves https://saerro.ps2.live
- Built on a "stacking filter" graph model, where each dimension adds a filter to lower dimensions.
- Event Streaming Service (ESS) Ingest
- WebSocket listening to https://push.nanite-systems.net (which is a resilient mirror to https://push.planetside2.com)
- Listens for `Death`, `VehicleDestroy`, and a number of `GainExperience` events.
- Postgres with TimescaleDB
- Holds `players` and `analytics` tables as hypertables.
- Timescale makes this way too fast, mind-blowing :)
- Tasks
- Occasional jobs that prune the database past what we actually want to retain,
- Core data tables are kept to about 20 mins max of data, analytics to 1 week
- Can do database resets/migrations.
- Websocket processors
- A pair per PC, PS4US, PS4EU
- Connects to [wss://push.nanite-systems.net](https://nanite-systems.net) and Census Websocket
- Primary will connect to NS.
Backup will connect to Census. It will wait for 60 seconds before deciding the primary is dead, and then start processing events (sketched after the development commands below).
- API
- Serves https://saerro.harasse.rs
- Built on axum and async-graphql
- Redis
Population data is tracked with ZADD (timestamp as score), counted with ZCOUNT over 15-minute windows, and cleaned up with SCAN + ZREMRANGEBYSCORE (see the sketch after this list).
- There is deliberately no persistence.
- Redis "Tender"
- Cleans up Redis every 5 mins.
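
For illustration, the sorted-set bookkeeping above boils down to three commands per sighting. A minimal sketch using the `redis` crate (the key name and character ID are made up; the real services wire this through their own connection handling):

```rust
use redis::Commands;

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_connection()?;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs_f64();

    // Record a sighting: member = character ID, score = timestamp.
    let _: () = con.zadd("players:pc", "5428010618020694593", now)?;

    // Population = members scored within the last 15 minutes (900 s).
    let population: u64 = con.zcount("players:pc", now - 900.0, "+inf")?;

    // What the "tender" does every 5 minutes: drop members older than the window.
    let _removed: u64 = con.zrembyscore("players:pc", "-inf", now - 900.0)?;

    println!("pc population: {population}");
    Ok(())
}
```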
# Developing
@@ -39,27 +37,48 @@ This app is built with Rust. You can set up a build environment via https://rust
To run,
```sh
# Start backing services
# Start Redis/backing services
docker compose up -d
# Run database migrations (required first step on a freshly up'd database)
cargo run --bin tasks migrate
# Start NSS ingest. Use push.planetside2.com if NSS isn't quite working...
# Start Websocket for PC
env \
WS_ADDR="wss://push.nanite-systems.net/streaming?environment=all&service-id=s:$SERVICE_ID" \
WORLDS=all \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
PAIR=pc \
ROLE=primary \
WORLDS=1,10,13,17,19,40 \
cargo run --bin websocket
# (Optional:) Start redundant websocket for PC
env \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
PAIR=pc \
ROLE=backup \
WORLDS=1,10,13,17,19,40 \
cargo run --bin websocket
# (Optional:) Start PS4US websocket
env \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4us&service-id=s:$SERVICE_ID" \
PAIR=ps4us \
WORLDS=1000 \
cargo run --bin websocket
# (Optional:) Start PS4EU websocket
env \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4eu&service-id=s:$SERVICE_ID" \
PAIR=ps4eu \
WORLDS=2000 \
cargo run --bin websocket
# Start API
cargo run --bin api
# Run prune tool
cargo run --bin tasks prune
cargo run --bin tools prune
# Build containers
docker build . --build-arg SERVICE=api -t saerro:api
docker build . --build-arg SERVICE=tasks -t saerro:tasks
docker build . --build-arg SERVICE=tools -t saerro:tools
docker build . --build-arg SERVICE=websocket -t saerro:websocket
```
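
The primary/backup pair shown above follows one rule: the backup starts processing once the primary has been silent for 60 seconds. A hypothetical sketch of that rule (not the websocket service's actual code; only the 60-second threshold comes from the architecture notes):

```rust
use std::time::{Duration, Instant};

/// Liveness tracking for a backup websocket processor.
struct Watchdog {
    last_primary_event: Instant,
    threshold: Duration,
}

impl Watchdog {
    fn new() -> Self {
        Self {
            last_primary_event: Instant::now(),
            // Architecture notes: wait 60 seconds before deciding
            // the primary is dead.
            threshold: Duration::from_secs(60),
        }
    }

    /// Call whenever the primary is observed to be alive.
    fn primary_seen(&mut self) {
        self.last_primary_event = Instant::now();
    }

    /// True once the primary has been quiet past the threshold.
    fn should_take_over(&self) -> bool {
        self.last_primary_event.elapsed() >= self.threshold
    }
}
```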
@@ -75,4 +94,4 @@ Currently, the entire stack runs on Docker. You may deploy it to any server via:
docker compose up -d -f docker-compose.live.yaml
```
It listens on port 80; it's up to you from here. Make sure to change the passwords present in the file. It's not _that secret_ of data, but why risk it?
It listens on port 80; it's up to you from here.


@@ -16,7 +16,7 @@ services:
image: ghcr.io/genudine/saerro/api:latest
pull_policy: always
ports:
- 80:8000
- 8000:80
links:
- tsdb
restart: always


@@ -2,7 +2,7 @@ version: "3"
services:
tsdb:
image: docker.io/timescale/timescaledb:latest-pg14
image: timescale/timescaledb-ha:pg14-latest
environment:
POSTGRES_PASSWORD: saerro321
POSTGRES_USER: saerrouser

103 flake.lock (generated)

@@ -1,103 +0,0 @@
{
"nodes": {
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1708150887,
"narHash": "sha256-lyEaeShLZqQtFO+ULLfxF9fYaYpTal0Ck1B+iKYBOMs=",
"owner": "nix-community",
"repo": "fenix",
"rev": "761431323e30846bae160e15682cfa687c200606",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"flake-parts": {
"inputs": {
"nixpkgs-lib": "nixpkgs-lib"
},
"locked": {
"lastModified": 1706830856,
"narHash": "sha256-a0NYyp+h9hlb7ddVz4LUn1vT/PLwqfrWYcHMvFB1xYg=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "b253292d9c0a5ead9bc98c4e9a26c6312e27d69f",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1707956935,
"narHash": "sha256-ZL2TrjVsiFNKOYwYQozpbvQSwvtV/3Me7Zwhmdsfyu4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a4d4fe8c5002202493e87ec8dbc91335ff55552c",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-lib": {
"locked": {
"dir": "lib",
"lastModified": 1706550542,
"narHash": "sha256-UcsnCG6wx++23yeER4Hg18CXWbgNpqNXcHIo5/1Y+hc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "97b17f32362e475016f942bbdfda4a4a72a8a652",
"type": "github"
},
"original": {
"dir": "lib",
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"fenix": "fenix",
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs"
}
},
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1708018577,
"narHash": "sha256-B75VUqKvQeIqAUnYw4bGjY3xxrCqzRBJHLbmD0MAWEw=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "b9b0d29b8e69b02457cfabe20c4c69cdb45f3cc5",
"type": "github"
},
"original": {
"owner": "rust-lang",
"ref": "nightly",
"repo": "rust-analyzer",
"type": "github"
}
}
},
"root": "root",
"version": 7
}


@@ -1,56 +0,0 @@
{
description = "PlanetSide 2 Metagame Harvesting Service";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-parts.url = "github:hercules-ci/flake-parts";
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
nixConfig = {
extra-substituters = [
"https://nix-community.cachix.org"
];
extra-trusted-public-keys = [
"nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs="
];
};
outputs = inputs: inputs.flake-parts.lib.mkFlake { inherit inputs; } {
systems = [ "x86_64-linux" "aarch64-linux" ];
perSystem = { config, self', pkgs, lib, system, ... }: let
fenix = inputs.fenix.packages.${system}.minimal;
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
buildDeps = [
pkgs.openssl
];
devDeps = [
fenix.toolchain
pkgs.docker-compose
pkgs.cargo-watch
];
PKG_CONFIG_PATH = "${pkgs.openssl.dev}/lib/pkgconfig";
in {
packages.default = (pkgs.makeRustPlatform {
cargo = fenix.toolchain;
rustc = fenix.toolchain;
}).buildRustPackage {
inherit (cargoToml.package) name version;
cargoLock.lockFile = ./Cargo.lock;
src = ./.;
nativeBuildInputs = [ pkgs.pkg-config ];
buildInputs = buildDeps ++ devDeps;
};
devShells.default = pkgs.mkShell {
nativeBuildInputs = buildDeps ++ devDeps;
};
};
};
}


@@ -1 +0,0 @@
target

1411 hack/codegen/Cargo.lock (generated)
File diff suppressed because it is too large.


@@ -8,6 +8,7 @@ reqwest = { version="0.11.13", features = ["json"] }
tera = { version = "1.17.1", default-features = false }
lazy_static = "1.4.0"
regex = "1.7.0"
futures = "0.3.25"
tokio = { version = "1.22.0", features = ["full"] }
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.89"


@@ -1,4 +1,5 @@
use std::process;
use lazy_static::lazy_static;
use regex::{Regex, RegexBuilder};
use serde::{Deserialize, Serialize};
@@ -45,9 +46,6 @@ async fn translators_rs() {
"mosquito",
"galaxy",
"valkyrie",
"wasp",
"deliverer",
"lodestar",
"liberator",
"ant",
"harasser",
@@ -109,16 +107,11 @@ async fn translators_rs() {
.find(&item.name.as_ref().unwrap().en.as_ref().unwrap())
.unwrap();
let name = matched
.as_str()
.to_lowercase()
.replace("wasp", "valkyrie")
.replace("deliverer", "ant")
.replace("lodestar", "galaxy");
Vehicle {
vehicle_id: item.vehicle_id,
name: Some(LangEn { en: Some(name) }),
name: Some(LangEn {
en: Some(matched.as_str().to_string().to_lowercase()),
}),
propulsion_type: item.propulsion_type,
}
})


@@ -6,25 +6,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde_json = "1.0.105"
serde = "1.0.188"
async-graphql = { version = "6.0.5", features = ["chrono"] }
axum = "0.6.20"
sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
"chrono",
] }
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
tower-http = { version = "0.4.4", features = ["cors"] }
serde_json = "1.0.89"
serde = "1.0.149"
async-graphql = { version = "5.0.3" }
async-graphql-axum = "5.0.3"
axum = "0.6.1"
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls", "postgres" ] }
tokio = { version = "1.23.0", features = [ "full" ] }
tower-http = { version = "0.3.5", features = ["cors"] }
lazy_static = "1.4.0"
reqwest = { version = "0.11.20", features = [
"rustls-tls-webpki-roots",
"rustls",
] }
chrono = "0.4.28"
prometheus = "0.13.3"
[dependencies.openssl]
version = "0.10.57"
features = ["vendored"]
reqwest = "0.11.13"


@@ -1,86 +0,0 @@
use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject};
use chrono::{DateTime, Utc};
use sqlx::{query, Pool, Postgres, Row};
use crate::telemetry;
pub struct Analytics {}
#[derive(SimpleObject, Debug, Clone)]
pub struct Event {
pub time: DateTime<Utc>,
pub event_name: String,
pub world_id: i32,
pub count: i64,
}
#[Object]
impl Analytics {
/// Get all events in analytics, bucket_size is in seconds
async fn events<'ctx>(
&self,
ctx: &Context<'ctx>,
#[graphql(default = 60)] bucket_size: u64,
world_id: Option<i32>,
#[graphql(default = false)] hi_precision: bool,
) -> Vec<Event> {
telemetry::graphql_query("Analytics", "events");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "events");
let sql = format!("
SELECT
time_bucket_gapfill('{} seconds', time, start => now() - '{}'::interval, finish => now()) AS bucket,
CASE WHEN count(*) IS NULL THEN 0 ELSE count(*) END AS count,
event_name,
world_id
FROM analytics
WHERE time > now() - '{}'::interval {}
GROUP BY bucket, world_id, event_name
ORDER BY bucket ASC",
if hi_precision {
5
} else {
bucket_size
},
if hi_precision {
"1 hour"
} else {
"1 day"
},
if hi_precision {
"1 hour"
} else {
"1 day"
},
if let Some(world_id) = world_id {
format!("AND world_id = {}", world_id)
} else {
"".to_string()
}
);
let mut result = query(sql.as_str()).fetch(pool);
let mut events = Vec::new();
while let Some(row) = result.try_next().await.unwrap() {
events.push(Event {
time: row.get("bucket"),
event_name: row.get("event_name"),
world_id: row.get("world_id"),
count: row.get("count"),
});
}
events
}
}
#[derive(Default)]
pub struct AnalyticsQuery;
#[Object]
impl AnalyticsQuery {
async fn analytics(&self) -> Analytics {
Analytics {}
}
}
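
Where this module is present, the bucketed counts can be fetched with a single POST; the /ingest page does exactly this. A hedged sketch, assuming a locally running API with the `/graphql` route from `main.rs`, a tokio runtime, and `reqwest` with its `json` feature (host and port are illustrative):

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Query shape matches the Analytics resolver above; bucketSize is in seconds.
    let body = json!({
        "query": "{ analytics { events(bucketSize: 300) { eventName count time worldId } } }"
    });
    let resp: serde_json::Value = reqwest::Client::new()
        .post("http://127.0.0.1:8000/graphql") // illustrative host/port
        .json(&body)
        .send()
        .await?
        .json()
        .await?;
    println!("{:#}", resp["data"]["analytics"]["events"]);
    Ok(())
}
```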


@@ -1,8 +1,4 @@
use crate::{
factions::{NC, TR, VS},
utils::{Filters, IdOrNameBy},
telemetry
};
use crate::utils::{Filters, IdOrNameBy};
use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row};
@@ -14,11 +10,10 @@ pub struct Class {
impl Class {
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
telemetry::db_read("players", "fetch");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
let sql = format!(
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND class_name = $1 {};",
"SELECT count(distinct character_id) FROM classes WHERE time > now() - interval '15 minutes' AND class_id = $1 {};",
filters.sql(),
);
@@ -38,38 +33,33 @@ impl Class {
#[Object]
impl Class {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "total");
self.fetch(ctx, self.filters.clone()).await
}
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "nc");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(NC)),
faction: Some(IdOrNameBy::Id(1)),
..self.filters.clone()
},
)
.await
}
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "tr");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(TR)),
faction: Some(IdOrNameBy::Id(2)),
..self.filters.clone()
},
)
.await
}
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "vs");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(VS)),
faction: Some(IdOrNameBy::Id(3)),
..self.filters.clone()
},
)
@@ -93,42 +83,36 @@ impl Classes {
#[Object]
impl Classes {
async fn infiltrator(&self) -> Class {
telemetry::graphql_query("Classes", "infiltrator");
Class {
filters: self.filters.clone(),
class_name: "infiltrator".to_string(),
}
}
async fn light_assault(&self) -> Class {
telemetry::graphql_query("Classes", "light_assault");
Class {
filters: self.filters.clone(),
class_name: "light_assault".to_string(),
}
}
async fn combat_medic(&self) -> Class {
telemetry::graphql_query("Classes", "combat_medic");
Class {
filters: self.filters.clone(),
class_name: "combat_medic".to_string(),
}
}
async fn engineer(&self) -> Class {
telemetry::graphql_query("Classes", "engineer");
Class {
filters: self.filters.clone(),
class_name: "engineer".to_string(),
}
}
async fn heavy_assault(&self) -> Class {
telemetry::graphql_query("Classes", "heavy_assault");
Class {
filters: self.filters.clone(),
class_name: "heavy_assault".to_string(),
}
}
async fn max(&self) -> Class {
telemetry::graphql_query("Classes", "max");
Class {
filters: self.filters.clone(),
class_name: "max".to_string(),
@@ -148,7 +132,6 @@ impl ClassesQuery {
/// Get a specific class
pub async fn class(&self, filter: Option<Filters>, class_name: String) -> Class {
telemetry::graphql_query("Classes", "");
Class {
filters: filter.unwrap_or_default(),
class_name,


@@ -1,4 +0,0 @@
pub const VS: i32 = 1;
pub const NC: i32 = 2;
pub const TR: i32 = 3;
pub const NSO: i32 = 4;


@@ -1,13 +1,8 @@
use crate::{telemetry, utils::ID_TO_WORLD};
use async_graphql::{Context, Enum, Object, SimpleObject};
use async_graphql::{Context, Enum, Object};
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use chrono::{DateTime, Utc};
use sqlx::{query, Pool, Postgres, Row};
pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse {
telemetry::http_request("/health", "GET");
telemetry::db_read("analytics", "get_health");
let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(&pool)
@@ -58,48 +53,13 @@ enum UpDown {
pub struct Health {}
impl Health {
async fn most_recent_event_time<'ctx>(
&self,
ctx: &Context<'ctx>,
world_id: i32,
) -> (UpDown, Option<DateTime<Utc>>) {
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "most_recent_event_time");
let events_resp =
query("SELECT time FROM analytics WHERE world_id = $1 ORDER BY time DESC LIMIT 1")
.bind(world_id)
.fetch_one(pool)
.await;
match events_resp {
Ok(row) => {
let last_event: DateTime<Utc> = row.get(0);
if last_event < Utc::now() - chrono::Duration::minutes(5) {
return (UpDown::Down, Some(last_event));
} else {
return (UpDown::Up, Some(last_event));
}
}
Err(_) => {
return (UpDown::Down, None);
}
}
}
}
/// Reports on the health of Saerro Listening Post
#[Object]
impl Health {
/// Did a ping to Postgres (our main datastore) succeed?
async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
telemetry::graphql_query("Health", "database");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "database_health");
let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(pool)
@@ -113,11 +73,8 @@ impl Health {
/// Is the websocket processing jobs?
async fn ingest<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
telemetry::graphql_query("Health", "ingest");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "ingest_health");
let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(pool)
@@ -139,46 +96,14 @@ impl Health {
/// Is the websocket actually turned on?
async fn ingest_reachable(&self) -> UpDown {
telemetry::graphql_query("Health", "ingest_reachable");
reqwest::get(
std::env::var("WEBSOCKET_HEALTHCHECK")
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()),
.unwrap_or("http://localhost:8999/health".to_string()),
)
.await
.map(|_| UpDown::Up)
.unwrap_or(UpDown::Down)
}
/// Shows a disclaimer for the worlds check
async fn worlds_disclaimer(&self) -> String {
"This is a best-effort check. A world reports `DOWN` when it doesn't have new events for 5 minutes. It could be broken, it could be the reality of the game state.".to_string()
}
/// Checks if a world has had any events for the last 5 minutes
async fn worlds<'ctx>(&self, ctx: &Context<'ctx>) -> Vec<WorldUpDown> {
telemetry::graphql_query("Health", "worlds");
let mut worlds = Vec::new();
for (id, name) in ID_TO_WORLD.iter() {
let (status, last_event) = self.most_recent_event_time(ctx, *id).await;
worlds.push(WorldUpDown {
id: *id,
name: name.to_string(),
status,
last_event,
});
}
worlds
}
}
#[derive(SimpleObject)]
struct WorldUpDown {
id: i32,
name: String,
status: UpDown,
last_event: Option<DateTime<Utc>>,
}
#[derive(Default)]


@@ -17,4 +17,6 @@
</style>
<h1>404 Not Found</h1>
<p>[<a href="/">home</a>]</p>
<p>
[<a href="/">home</a>] [<a href="/graphql/playground">graphql playground</a>]
</p>


@@ -14,256 +14,47 @@
color: #cead42;
text-decoration: none;
}
.hidden {
display: none;
}
.query {
list-style-type: none;
padding-left: 0;
background-color: #131313;
width: fit-content;
padding: 2rem;
margin: 2rem;
border-radius: 10px;
border-left: #918b79 3px solid;
font-size: 1rem;
}
.query pre {
margin: 0;
}
</style>
<h1>Saerro Listening Post</h1>
<h2>Live Population Stats API for PlanetSide 2</h2>
<p>
This is a GraphQL API, which means you can query for exactly the data you
need. You can also use the GraphiQL interface to explore the data and build
your queries.
</p>
<ul>
<li><a href="/graphiql">Check out GraphiQL</a></li>
<li><a href="/graphql/playground">Check out the GraphQL Playground</a></li>
<li>
<a
id="status_query_link"
href="/graphql?query={ health { database ingest ingestReachable worldsDisclaimer worlds { name status lastEvent } } }"
href="/graphql?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
>Current system status</a
>
(<a
href="javascript:document.querySelector('#status_query').classList.toggle('hidden')"
>show GraphQL</a
href="/graphql/playground?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
>or in playground</a
>)
<ul id="status_query" class="hidden query">
<li>
<pre><code>{
health {
database
ingest
ingestReachable
worldsDisclaimer
worlds {
name
status
lastEvent
}
}
}</code></pre>
<a
href="javascript:runQuery('status_query_link', 'status_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="status_query_result"></li>
</ul>
</li>
<li>
<a
id="current_pop_query_link"
href="/graphql?query={ allWorlds { name population { total nc tr vs } } }"
href="/graphql?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
>
Current population of all worlds
</a>
(<a
href="javascript:document.querySelector('#current_pop_query').classList.toggle('hidden')"
>show GraphQL</a
href="/graphql/playground?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
>or in playground</a
>)
<ul id="current_pop_query" class="hidden query">
<li>
<pre><code>{
allWorlds {
name
population {
total
nc
tr
vs
}
}
}</code></pre>
<a
href="javascript:runQuery('current_pop_query_link', 'current_pop_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="current_pop_query_result"></li>
</ul>
</li>
<li>
<a
id="complex_query_link"
href="/graphql?query={ allWorlds { name classes { combatMedic { total nc tr vs } } vehicles { total sunderer { total nc tr vs } } } }"
>
Show every Sunderer and Combat Medic for every server by faction
</a>
(<a
href="javascript:document.querySelector('#complex_query').classList.toggle('hidden')"
>show GraphQL</a
>)
<ul id="complex_query" class="hidden query">
<li>
<pre><code>{
allWorlds {
name
classes {
combatMedic {
total
nc
tr
vs
}
}
vehicles {
total
sunderer {
total
nc
tr
vs
}
}
}
}</code></pre>
<a
href="javascript:runQuery('complex_query_link', 'complex_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="complex_query_result"></li>
</ul>
</li>
<li>
<a
id="very_complex_query_link"
href="/graphql?query={ zones { all { name classes { heavyAssault { nc tr vs } lightAssault { nc tr vs } } vehicles { vanguard { total } prowler { total } magrider { total } lightning { nc vs tr } chimera { nc vs tr } } } } }"
>
Show the current counts of heavy assaults, light assaults, and tanks per
continent globally
</a>
(<a
href="javascript:document.querySelector('#very_complex_query').classList.toggle('hidden')"
>show GraphQL</a
>)
<ul id="very_complex_query" class="hidden query">
<li>
<pre><code>{
zones {
all {
name
classes {
heavyAssault {
nc
tr
vs
}
lightAssault {
nc
tr
vs
}
}
vehicles {
vanguard {
total
}
prowler {
total
}
magrider {
total
}
lightning {
nc
vs
tr
}
chimera {
nc
vs
tr
}
}
}
}
}</code></pre>
<a
href="javascript:runQuery('very_complex_query_link', 'very_complex_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="very_complex_query_result"></li>
</ul>
</li>
</ul>
<p>
This API supports two query methods,
<a href="https://graphql.org/learn/serving-over-http/#get-request">GET</a>
and
<a href="https://graphql.org/learn/serving-over-http/#post-request">POST</a>.
To view the JSON outputs without fancy UIs, you can use a browser plugin like
<a href="https://addons.mozilla.org/en-US/firefox/addon/jsonview/"
>JSONView for Firefox</a
>
or
<a
href="https://chrome.google.com/webstore/detail/jsonvue/chklaanhfefbnpoihckbnefhakgolnmc"
>JSONVue for Chrome</a
>.
</p>
<p>
All data is an aggregate of the last 15 minutes of Death and VehicleDestroy
events, including both attacker and victim.
</p>
<hr />
<p>
This API is provided by Genudine Dynamics.<br />As always, we take no
responsibility for your use of this data... or our weapons. :)
</p>
<p>For help, please contact us in #api-dev on the PlanetSide 2 Discord.</p>
<p>
[<a href="/ingest">ingest stats</a>] [<a
href="https://github.com/genudine/saerro"
>github</a
>] [<a href="https://pstop.harasse.rs">pstop</a>]
[<a href="https://github.com/genudine/saerro">github</a>] [<a
href="https://pstop.harasse.rs"
>pstop</a
>]
</p>
<script>
const runQuery = async (linkId, resultId) => {
const link = document.getElementById(linkId);
const result = document.getElementById(resultId);
result.innerHTML = "Loading...";
result.classList.remove("hidden");
fetch(link.href)
.then((response) => response.json())
.then((data) => {
result.innerHTML = `<pre><code>${JSON.stringify(
data.data,
null,
2
)}</pre></code>`;
})
.catch((error) => {
result.innerHTML = "Failed...";
});
};
</script>


@@ -1,418 +0,0 @@
<!DOCTYPE html>
<title>Ingest Stats - Saerro Listening Post</title>
<meta charset="utf-8" />
<style>
body {
font-family: monospace;
background-color: #010101;
color: #e0e0e0;
font-size: 1.25rem;
line-height: 1.6;
}
a {
color: #cead42;
text-decoration: none;
}
h3 {
margin: 0;
}
.chart-container {
position: relative;
}
.main {
display: grid;
grid-template-columns: repeat(2, minmax(300px, 1fr));
gap: 1rem;
padding: 1rem;
}
.wide {
grid-column: span 2;
}
.graph-head {
display: flex;
justify-content: space-between;
align-items: center;
}
.sums-15m {
color: rgba(165, 165, 165, 1);
}
.sums-1h {
color: rgba(165, 165, 165, 0.8);
}
.sums-6h {
color: rgba(165, 165, 165, 0.6);
}
.sums-1d {
color: rgba(165, 165, 165, 0.4);
}
</style>
<h1>Ingest Stats <span id="loading">[LOADING...]</span></h1>
<div class="main">
<div class="wide">
<div class="graph-head">
<h3>All Events by Type</h3>
<p id="all-events-by-type-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container">
<canvas id="all-events-by-type" />
</div>
</div>
<div class="wide">
<div class="graph-head">
<h3>Events by World</h3>
</div>
<div class="chart-container">
<canvas id="events-by-world" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Connery [US West]</h3>
<p id="connery-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="connery" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Miller [EU]</h3>
<p id="miller-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="miller" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Cobalt [EU]</h3>
<p id="cobalt-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="cobalt" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Emerald [US East]</h3>
<p id="emerald-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="emerald" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Jaeger [US East]</h3>
<p id="jaeger-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="jaeger" />
</div>
</div>
<div>
<div class="graph-head">
<h3>SolTech [Tokyo]</h3>
<p id="soltech-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="soltech" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Genudine [US East] [PS4]</h3>
<p id="genudine-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="genudine" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Ceres [EU] [PS4]</h3>
<p id="ceres-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="ceres" />
</div>
</div>
<div class="wide">
<div class="graph-head">
<h3>Experience Events By ID</h3>
<p id="exp-by-id-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container">
<canvas id="exp-by-id" />
</div>
<div class="filter">
Filter to World:
<select id="exp-filter">
<option selected value="all">All</option>
<option value="1">Connery</option>
<option value="10">Miller</option>
<option value="13">Cobalt</option>
<option value="17">Emerald</option>
<option value="19">Jaeger</option>
<option value="40">SolTech</option>
<option value="1000">Genudine</option>
<option value="2000">Ceres</option>
</select>
</div>
</div>
</div>
<p>
[<a href="/">home</a>] [<a href="/ingest">1 day w/ 5m buckets</a>] [<a
href="/ingest?hi=1"
>1 hour w/ 5s buckets</a
>]
</p>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns/dist/chartjs-adapter-date-fns.bundle.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/humanize-plus@1.8.2/dist/humanize.min.js"></script>
<script>
const sumsStr = (_15m, _1h, _6h, _24h) => `
<span class="sums-15m" title="sum of last 15 minutes (${_15m})">${Humanize.intComma(
_15m
)},</span>
<span class="sums-1h" title="sum of last 1 hour (${_1h})">${Humanize.compactInteger(
_1h
)},</span>
<span class="sums-6h" title="sum of last 6 hours (${_6h})">${Humanize.compactInteger(
_6h
)},</span>
<span class="sums-1d" title="sum of last 1 day (${_24h})">${Humanize.compactInteger(
_24h
)}</span>`;
const doSums = async (id, events) => {
// Calculate sums for the last 15 minutes, 1 hour, 6 hours, and 1 day based on event timestamps
let sums = [0, 0, 0, 0];
let now = Date.now();
let fifteenMinutes = 15 * 60 * 1000;
let oneHour = 60 * 60 * 1000;
let sixHours = 6 * 60 * 60 * 1000;
let oneDay = 24 * 60 * 60 * 1000;
for (let ev of events) {
let diff = now - new Date(ev.time);
if (diff < fifteenMinutes) {
sums[0] += ev.count;
}
if (diff < oneHour) {
sums[1] += ev.count;
}
if (diff < sixHours) {
sums[2] += ev.count;
}
if (diff < oneDay) {
sums[3] += ev.count;
}
}
document.getElementById(`${id}-sums`).innerHTML = sumsStr(...sums);
};
const allEventsByType = (id, events) => {
doSums(id, events);
let allEvents = events.reduce(
(acc, ev) => {
const eventName = ev.eventName.replace(/_[0-9]+/g, "");
acc[eventName][ev.time] = acc[eventName][ev.time] ?? 0;
acc[eventName][ev.time] += ev.count;
return acc;
},
{ Death: {}, VehicleDestroy: {}, GainExperience: {} }
);
new Chart(document.getElementById(id), {
type: "line",
options: {
scales: {
y: { beginAtZero: true, suggestedMin: 0 },
x: { stacked: false, type: "timeseries" },
},
},
data: {
datasets: [
{
label: "Deaths",
data: allEvents.Death,
},
{
label: "Vehicle Destroys",
data: allEvents.VehicleDestroy,
},
{
label: "Experience Events",
data: allEvents.GainExperience,
},
],
},
});
};
const experienceEventsByID = (eventsUnfiltered) => {
const events = eventsUnfiltered.filter((ev) =>
ev.eventName.startsWith("GainExperience_")
);
doSums("exp-by-id", events);
let allEvents = events.reduce((acc, ev) => {
const eventID = ev.eventName.replace(/GainExperience_([0-9]+)/g, "$1");
acc[eventID] = acc[eventID] ?? {};
acc[eventID][ev.time] = acc[eventID][ev.time] ?? 0;
acc[eventID][ev.time] += ev.count;
return acc;
}, {});
new Chart(document.getElementById("exp-by-id"), {
type: "bar",
options: {
scales: {
y: { stacked: true, beginAtZero: true, suggestedMin: 0 },
x: { stacked: true, type: "timeseries" },
},
},
data: {
datasets: Object.keys(allEvents).map((id) => ({
label: id,
data: allEvents[id],
})),
},
});
};
const eventsByWorld = (events) => {
let allEvents = events.reduce((acc, ev) => {
acc[ev.worldId] = acc[ev.worldId] || {};
acc[ev.worldId][ev.time] = (acc[ev.time] || 0) + ev.count;
return acc;
}, {});
new Chart(document.getElementById("events-by-world"), {
type: "line",
options: {
scales: {
y: { beginAtZero: true },
x: {
type: "timeseries",
},
},
},
data: {
datasets: [
{
label: "Connery",
data: allEvents["1"],
},
{
label: "Miller",
data: allEvents["10"],
},
{
label: "Cobalt",
data: allEvents["13"],
},
{
label: "Emerald",
data: allEvents["17"],
},
{
label: "Jaeger",
data: allEvents["19"],
},
{
label: "SolTech",
data: allEvents["40"],
},
{
label: "Genudine",
data: allEvents["1000"],
},
{
label: "Ceres",
data: allEvents["2000"],
},
],
},
});
};
(async () => {
let resp = await fetch("/graphql", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
query: `
{
analytics {
events(${
location.search.includes("hi=1")
? "hiPrecision: true"
: "bucketSize: 300"
}) {
eventName
count
time
worldId
}
}
}
`,
}),
});
let body = await resp.json();
let events = body.data.analytics.events;
window.events = events;
document.getElementById("loading").style.display = "none";
allEventsByType("all-events-by-type", events);
eventsByWorld(events);
[
["connery", 1],
["miller", 10],
["cobalt", 13],
["emerald", 17],
["jaeger", 19],
["soltech", 40],
["genudine", 1000],
["ceres", 2000],
].forEach(([world, id]) => {
let worldEvents = events.filter((ev) => ev.worldId === id);
allEventsByType(world, worldEvents);
});
const expFilter = document.getElementById("exp-filter");
experienceEventsByID(
expFilter.value === "all"
? events
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
);
expFilter.addEventListener("change", () => {
document.getElementById("exp-by-id").outerHTML =
"<canvas id='exp-by-id' />";
experienceEventsByID(
expFilter.value === "all"
? events
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
);
});
})();
</script>


@@ -1,10 +1,7 @@
mod analytics;
mod classes;
mod factions;
mod health;
mod population;
mod query;
mod telemetry;
mod utils;
mod vehicles;
mod world;
@@ -27,15 +24,9 @@ use tower_http::cors::{Any, CorsLayer};
extern crate serde_json;
async fn index() -> Html<&'static str> {
telemetry::http_request("/", "GET");
Html(include_str!("html/index.html"))
}
async fn ingest() -> Html<&'static str> {
telemetry::http_request("/ingest", "GET");
Html(include_str!("html/ingest.html"))
}
async fn handle_404() -> Html<&'static str> {
Html(include_str!("html/404.html"))
}
@@ -44,7 +35,6 @@ async fn graphql_handler_post(
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
Json(query): Json<Request>,
) -> Json<Response> {
telemetry::http_request("/graphql", "POST");
Json(schema.execute(query).await)
}
@@ -52,8 +42,6 @@ async fn graphql_handler_get(
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
query: Query<Request>,
) -> axum::response::Response {
telemetry::http_request("/graphql", "GET");
if query.query == "" {
return Redirect::to("/graphiql").into_response();
}
@@ -62,8 +50,6 @@ async fn graphql_handler_get(
}
async fn graphiql() -> impl IntoResponse {
telemetry::http_request("/graphiql", "GET");
Html(
GraphiQLSource::build()
.endpoint("/graphql")
@@ -75,7 +61,7 @@ async fn graphiql() -> impl IntoResponse {
#[tokio::main]
async fn main() {
let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
let db = sqlx::PgPool::connect(&db_url).await.unwrap();
let schema = Schema::build(query::Query::default(), EmptyMutation, EmptySubscription)
@@ -84,15 +70,12 @@ async fn main() {
let app = Router::new()
.route("/", get(index))
.route("/ingest", get(ingest))
.route("/health", get(health::get_health))
.route(
"/graphql",
post(graphql_handler_post).get(graphql_handler_get),
)
.route("/graphiql", get(graphiql))
.route("/metrics", get(telemetry::handler))
.route("/metrics/combined", get(telemetry::handler_combined))
.route("/graphql/playground", get(graphiql))
.fallback(handle_404)
.layer(Extension(db))
.layer(Extension(schema))


@@ -1,8 +1,4 @@
use crate::{
factions::{NC, NSO, TR, VS},
utils::Filters,
telemetry,
};
use crate::utils::Filters;
use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row};
@@ -23,9 +19,8 @@ impl Population {
async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: i32) -> i64 {
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "population_by_faction");
let sql = format!(
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND faction_id = $1 {};",
"SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' AND faction_id = $1 {};",
self.filters.sql(),
);
@@ -45,13 +40,10 @@ impl Population {
#[Object]
impl Population {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "total");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "population_total");
let sql = format!(
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' {};",
"SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' {};",
self.filters.sql(),
);
@@ -62,24 +54,20 @@ impl Population {
.await
.unwrap()
.get(0);
query
}
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "nc");
self.by_faction(ctx, NC).await
self.by_faction(ctx, 1).await
}
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "vs");
self.by_faction(ctx, VS).await
self.by_faction(ctx, 2).await
}
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "tr");
self.by_faction(ctx, TR).await
self.by_faction(ctx, 3).await
}
async fn ns<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "ns");
self.by_faction(ctx, NSO).await
self.by_faction(ctx, 4).await
}
}


@@ -1,6 +1,6 @@
use crate::{
analytics::AnalyticsQuery, classes::ClassesQuery, health::HealthQuery,
population::PopulationQuery, vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
classes::ClassesQuery, health::HealthQuery, population::PopulationQuery,
vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
};
use async_graphql::MergedObject;
@@ -12,5 +12,4 @@ pub struct Query(
WorldQuery,
ZoneQuery,
HealthQuery,
AnalyticsQuery,
);


@@ -1,138 +0,0 @@
use lazy_static::lazy_static;
use prometheus::{
IntGauge,
IntGaugeVec,
register_int_gauge_vec,
register_int_gauge,
TextEncoder,
gather
};
use sqlx::{Pool, Postgres, Row};
use axum::Extension;
use chrono::{DateTime, Utc};
lazy_static! {
// http
pub static ref HTTP_REQUEST: IntGaugeVec = register_int_gauge_vec!("saerro_api_http_requests", "HTTP requests", &[
"route", "method"
]).unwrap();
pub static ref GRAPHQL_QUERY: IntGaugeVec = register_int_gauge_vec!("saerro_api_graphql_query", "GraphQL queries", &[
"major", "minor"
]).unwrap();
// counters
pub static ref PLAYERS_TRACKED: IntGauge = register_int_gauge!("saerro_players_tracked", "All players tracked by Saerro right now").unwrap();
pub static ref VEHICLES_TRACKED: IntGauge = register_int_gauge!("saerro_vehicles_tracked", "All vehicles tracked by Saerro right now").unwrap();
pub static ref OLDEST_PLAYER: IntGauge = register_int_gauge!("saerro_oldest_player", "Oldest player tracked").unwrap();
pub static ref NEWEST_PLAYER: IntGauge = register_int_gauge!("saerro_newest_player", "Newest player tracked").unwrap();
pub static ref OLDEST_VEHICLE: IntGauge = register_int_gauge!("saerro_oldest_vehicle", "Oldest vehicle tracked").unwrap();
pub static ref NEWEST_VEHICLE: IntGauge = register_int_gauge!("saerro_newest_vehicle", "Newest vehicle tracked").unwrap();
// database stuff
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_writes", "Writes to Postgres", &[
"table", "op"
]).unwrap();
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_reads", "Reads from Postgres", &[
"table", "op"
]).unwrap();
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
// "table", "op"
// ]).unwrap();
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
// "table", "op"
// ]).unwrap();
}
pub async fn handler(Extension(pool): Extension<Pool<Postgres>>) -> String {
update_data_gauges(pool).await;
// Final output
let encoder = TextEncoder::new();
let mut buffer = String::new();
let metrics = gather();
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
buffer
}
pub async fn handler_combined(Extension(pool): Extension<Pool<Postgres>>) -> String {
let url = std::env::var("WEBSOCKET_HEALTHCHECK")
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()).replace("/healthz", "/metrics");
let local = handler(Extension(pool)).await;
let remote = match reqwest::get(url).await {
Ok(r) => r.text().await.expect("failed to text lol"),
Err(_) => String::from("")
};
format!("{}{}", local, remote)
}
// pub fn db_write(table: &str, op: &str) {
// DB_WRITES.with_label_values(&[table, op]).inc();
// }
pub fn db_read(table: &str, op: &str) {
DB_READS.with_label_values(&[table, op]).inc();
}
pub fn http_request(route: &str, method: &str) {
HTTP_REQUEST.with_label_values(&[route, method]).inc();
}
pub fn graphql_query(major: &str, minor: &str) {
GRAPHQL_QUERY.with_label_values(&[major, minor]).inc();
}
async fn update_data_gauges(pool: Pool<Postgres>) {
// Do some easy queries to fill our non-cumulative gauges
db_read("players", "count_all");
let player_count: i64 = sqlx::query("SELECT count(*) FROM players")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
PLAYERS_TRACKED.set(player_count);
db_read("players", "get_newest");
let player_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_PLAYER.set(player_newest.timestamp());
db_read("players", "get_oldest");
let player_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_PLAYER.set(player_oldest.timestamp());
db_read("vehicles", "count_all");
let vehicle_count: i64 = sqlx::query("SELECT count(*) FROM vehicles")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
VEHICLES_TRACKED.set(vehicle_count);
db_read("vehicles", "get_newest");
let vehicle_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_VEHICLE.set(vehicle_newest.timestamp());
db_read("vehicles", "get_oldest");
let vehicle_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_VEHICLE.set(vehicle_oldest.timestamp());
}


@@ -1,8 +1,4 @@
use crate::{
factions::{NC, TR, VS},
utils::{Filters, IdOrNameBy},
telemetry,
};
use crate::utils::{Filters, IdOrNameBy};
use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row};
@@ -16,9 +12,8 @@ impl Vehicle {
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("vehicles", "fetch");
let sql = format!(
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' AND vehicle_name = $1 {};",
"SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' AND vehicle_id = $1 {};",
filters.sql(),
);
@@ -38,41 +33,33 @@ impl Vehicle {
#[Object]
impl Vehicle {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "total");
self.fetch(ctx, self.filters.clone()).await
}
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "nc");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(NC)),
faction: Some(IdOrNameBy::Id(1)),
..self.filters.clone()
},
)
.await
}
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "tr");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(TR)),
faction: Some(IdOrNameBy::Id(2)),
..self.filters.clone()
},
)
.await
}
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "vs");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(VS)),
faction: Some(IdOrNameBy::Id(3)),
..self.filters.clone()
},
)
@@ -96,13 +83,10 @@ impl Vehicles {
#[Object]
impl Vehicles {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicles", "total");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "vehicles_total");
let sql = format!(
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' {};",
"SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' {};",
self.filters.sql(),
);
@@ -119,90 +103,62 @@ impl Vehicles {
// Transport
async fn flash(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "flash");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "flash".to_string(),
}
}
async fn sunderer(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "sunderer");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "sunderer".to_string(),
}
}
async fn ant(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "ant");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "ant".to_string(),
}
}
async fn harasser(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "harasser");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "harasser".to_string(),
}
}
async fn javelin(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "javelin");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "javelin".to_string(),
}
}
async fn corsair(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "corsair");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "corsair".to_string(),
}
}
// Tanks
async fn lightning(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "lightning");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "lightning".to_string(),
vehicle_name: "javelin".to_string(),
}
}
async fn prowler(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "prowler");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "prowler".to_string(),
}
}
async fn vanguard(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "vanguard");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "vanguard".to_string(),
}
}
async fn magrider(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "magrider");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "magrider".to_string(),
}
}
async fn chimera(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "chimera");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "chimera".to_string(),
@@ -211,56 +167,42 @@ impl Vehicles {
// Air
async fn mosquito(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "mosquito");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "mosquito".to_string(),
}
}
async fn liberator(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "liberator");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "liberator".to_string(),
}
}
async fn galaxy(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "galaxy");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "galaxy".to_string(),
}
}
async fn valkyrie(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "valkyrie");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "valkyrie".to_string(),
}
}
async fn reaver(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "reaver");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "reaver".to_string(),
}
}
async fn scythe(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "scythe");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "scythe".to_string(),
}
}
async fn dervish(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "dervish");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "dervish".to_string(),


@@ -4,7 +4,6 @@ use crate::{
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_WORLD, WORLD_IDS},
vehicles::Vehicles,
zone::Zones,
telemetry,
};
use async_graphql::Object;
@@ -34,15 +33,11 @@ impl World {
impl World {
/// The ID of the world.
async fn id(&self) -> i32 {
telemetry::graphql_query("World", "id");
id_or_name_to_id(&WORLD_IDS, self.filter.world.as_ref().unwrap()).unwrap()
}
/// The name of the world, in official game capitalization.
async fn name(&self) -> String {
telemetry::graphql_query("World", "name");
let name = id_or_name_to_name(&ID_TO_WORLD, self.filter.world.as_ref().unwrap()).unwrap();
// Special case for SolTech, lol.
@@ -56,8 +51,6 @@ impl World {
/// Population filtered to this world.
async fn population(&self) -> Population {
telemetry::graphql_query("World", "population");
Population::new(Some(Filters {
world: self.filter.world.clone(),
faction: None,
@@ -67,8 +60,6 @@ impl World {
/// Vehicles filtered to this world.
async fn vehicles(&self) -> Vehicles {
telemetry::graphql_query("World", "vehicles");
Vehicles::new(Some(Filters {
world: self.filter.world.clone(),
faction: None,
@@ -78,8 +69,6 @@ impl World {
/// Classes filtered to this world.
async fn classes(&self) -> Classes {
telemetry::graphql_query("World", "classes");
Classes::new(Some(Filters {
world: self.filter.world.clone(),
faction: None,
@@ -89,8 +78,6 @@ impl World {
/// Get a specific zone/continent on this world.
async fn zones(&self) -> Zones {
telemetry::graphql_query("World", "zones");
Zones::new(Some(self.filter.clone()))
}
}


@@ -3,7 +3,6 @@ use crate::{
population::Population,
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_ZONE, ZONE_IDS},
vehicles::Vehicles,
telemetry,
};
use async_graphql::Object;
@@ -24,15 +23,11 @@ impl Zone {
impl Zone {
/// The ID of the zone/continent.
async fn id(&self) -> i32 {
telemetry::graphql_query("Zone", "id");
id_or_name_to_id(&ZONE_IDS, self.filters.zone.as_ref().unwrap()).unwrap()
}
/// The name of the continent, in official game capitalization.
async fn name(&self) -> String {
telemetry::graphql_query("Zone", "name");
let name = id_or_name_to_name(&ID_TO_ZONE, self.filters.zone.as_ref().unwrap()).unwrap();
// Capitalize the first letter
@@ -40,20 +35,14 @@ impl Zone {
}
async fn population(&self) -> Population {
telemetry::graphql_query("Zone", "population");
Population::new(Some(self.filters.clone()))
}
async fn vehicles(&self) -> Vehicles {
telemetry::graphql_query("Zone", "vehicles");
Vehicles::new(Some(self.filters.clone()))
}
async fn classes(&self) -> Classes {
telemetry::graphql_query("Zone", "classes");
Classes::new(Some(self.filters.clone()))
}
}


@@ -6,10 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
] }
redis = { version = "0.22.1", features = ["aio", "r2d2", "tokio-comp"] }
once_cell = "1.16.0"
tokio = { version = "1.23.0", features = ["full"] }
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
lazy_static = "1.4.0"
async_once = "0.2.6"
dotenvy = "0.15.6"


@@ -1,4 +1,5 @@
use async_once::AsyncOnce;
use dotenvy::dotenv;
use lazy_static::lazy_static;
use migrations::cmd_migrate;
use sqlx::query;
@@ -9,7 +10,7 @@ mod migrations;
lazy_static! {
pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
sqlx::PgPool::connect(&db_url).await.unwrap()
});
}
@@ -18,14 +19,21 @@ async fn cmd_prune() {
println!("Pruning old data...");
let pool = PG.get().await;
let rows = query("DELETE FROM players WHERE last_updated < NOW() - INTERVAL '15 minutes';")
let rows = query("DELETE FROM players WHERE time < NOW() - INTERVAL '15 minutes';")
.execute(pool)
.await
.unwrap()
.rows_affected();
println!("Deleted {} rows of old player data", rows);
let rows = query("DELETE FROM vehicles WHERE last_updated < NOW() - INTERVAL '15 minutes';")
let rows = query("DELETE FROM classes WHERE time < NOW() - INTERVAL '15 minutes';")
.execute(pool)
.await
.unwrap()
.rows_affected();
println!("Deleted {} rows of old class data", rows);
let rows = query("DELETE FROM vehicles WHERE time < NOW() - INTERVAL '15 minutes';")
.execute(pool)
.await
.unwrap()
@@ -50,41 +58,14 @@ fn cmd_help() {
#[tokio::main]
async fn main() {
dotenv().ok();
let command = args().nth(1).unwrap_or("help".to_string());
match command.as_str() {
"help" => cmd_help(),
"prune" => cmd_prune().await,
"auto-prune" => loop {
cmd_prune().await;
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
},
"maintenance" => {
println!("Running maintenance tasks...");
println!("Checking if DB is migrated...");
if !migrations::is_migrated().await {
println!("DB is not migrated, running migrations...");
cmd_migrate().await;
}
println!("Running prune...");
cmd_prune().await;
println!("Done!");
}
"auto-maintenance" => loop {
println!("Running maintenance tasks...");
if !migrations::is_migrated().await {
println!("DB is not migrated, running migrations...");
cmd_migrate().await;
}
cmd_prune().await;
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
},
"migrate" => cmd_migrate().await,
"print-env" => {
std::env::vars().for_each(|(key, value)| println!("{}={}", key, value));
}
_ => {
println!("Unknown command: {}", command);
cmd_help();


@@ -1,10 +1,15 @@
use crate::PG;
use sqlx::{query, Row};
use sqlx::query;
pub async fn cmd_migrate() {
println!("Migrating database...");
tokio::join!(migrate_players(), migrate_vehicles(), migrate_analytics());
tokio::join!(
migrate_players(),
migrate_classes(),
migrate_vehicles(),
migrate_analytics()
);
}
async fn migrate_players() {
@@ -21,21 +26,77 @@ async fn migrate_players() {
println!("PLAYERS => CREATE TABLE players");
query(
"CREATE TABLE players (
character_id TEXT NOT NULL PRIMARY KEY,
last_updated TIMESTAMPTZ NOT NULL,
character_id TEXT NOT NULL,
time TIMESTAMPTZ NOT NULL,
world_id INT NOT NULL,
faction_id INT NOT NULL,
zone_id INT NOT NULL,
class_name TEXT NOT NULL
);",
zone_id INT NOT NULL);",
)
.execute(pool)
.await
.unwrap();
println!("PLAYERS => create_hypertable");
query(
"SELECT create_hypertable('players', 'time',
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
)
.execute(pool)
.await
.unwrap();
println!("PLAYERS => add_retention_policy");
query("SELECT add_retention_policy('players', INTERVAL '15 minutes');")
.execute(pool)
.await
.unwrap();
println!("PLAYERS => done!");
}
async fn migrate_classes() {
let pool = PG.get().await;
println!("-> Migrating classes");
println!("CLASSES => DROP TABLE IF EXISTS classes");
query("DROP TABLE IF EXISTS classes")
.execute(pool)
.await
.unwrap();
println!("CLASSES => CREATE TABLE classes");
query(
"CREATE TABLE classes (
character_id TEXT NOT NULL,
time TIMESTAMPTZ NOT NULL,
world_id INT NOT NULL,
faction_id INT NOT NULL,
zone_id INT NOT NULL,
class_id TEXT NOT NULL);",
)
.execute(pool)
.await
.unwrap();
println!("CLASSES => create_hypertable");
query(
"SELECT create_hypertable('classes', 'time',
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
)
.execute(pool)
.await
.unwrap();
println!("CLASSES => add_retention_policy");
query("SELECT add_retention_policy('classes', INTERVAL '15 minutes');")
.execute(pool)
.await
.unwrap();
println!("CLASSES => done!");
}
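Each migration now repeats the same two TimescaleDB calls: `create_hypertable` partitions the table into one-minute chunks keyed on `time`, and `add_retention_policy` drops chunks older than 15 minutes in the background, which overlaps with the DELETE-based prune task above. A sketch of the pair factored into a helper (hypothetical, assuming the same `PG` pool):

```rust
use sqlx::query;

// Hypothetical helper mirroring the per-table setup in this migration.
async fn make_hypertable(pool: &sqlx::PgPool, table: &str) {
    query(&format!(
        "SELECT create_hypertable('{t}', 'time',
            chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
        t = table
    ))
    .execute(pool)
    .await
    .unwrap();

    query(&format!(
        "SELECT add_retention_policy('{t}', INTERVAL '15 minutes');",
        t = table
    ))
    .execute(pool)
    .await
    .unwrap();
}
```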
async fn migrate_vehicles() {
let pool = PG.get().await;
@ -50,18 +111,33 @@ async fn migrate_vehicles() {
println!("VEHICLES => CREATE TABLE vehicles");
query(
"CREATE TABLE vehicles (
character_id TEXT NOT NULL PRIMARY KEY,
last_updated TIMESTAMPTZ NOT NULL,
character_id TEXT NOT NULL,
time TIMESTAMPTZ NOT NULL,
world_id INT NOT NULL,
faction_id INT NOT NULL,
zone_id INT NOT NULL,
vehicle_name TEXT NOT NULL
);",
zone_id INT NOT NULL,
vehicle_id TEXT NOT NULL);",
)
.execute(pool)
.await
.unwrap();
println!("VEHICLES => create_hypertable");
query(
"SELECT create_hypertable('vehicles', 'time',
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
)
.execute(pool)
.await
.unwrap();
println!("VEHICLES => add_retention_policy");
query("SELECT add_retention_policy('vehicles', INTERVAL '15 minutes');")
.execute(pool)
.await
.unwrap();
println!("VEHICLES => done!");
}
@ -97,15 +173,3 @@ async fn migrate_analytics() {
println!("ANALYTICS => done!");
}
pub async fn is_migrated() -> bool {
let pool = PG.get().await;
let tables: i64 = query("SELECT count(1) FROM pg_tables WHERE schemaname = 'public' AND tablename IN ('players', 'vehicles', 'analytics');")
.fetch_one(pool)
.await
.unwrap()
.get(0);
tables == 3
}


@ -6,22 +6,16 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
redis = { version = "0.22.1", default_features = false, features = ["r2d2"] }
lazy_static = "1.4.0"
tokio-tungstenite = { version = "0.20.0", features = [
"rustls-tls-webpki-roots",
] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105"
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
] }
url = "2.4.1"
futures-util = "0.3.28"
futures = "0.3.28"
tokio-tungstenite = { version = "0.18.0", features=["native-tls"] }
serde = { version = "1.0.149", features = ["derive"] }
serde_json = "1.0.89"
tokio = { version = "1.23.0", features = ["full"] }
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
url = "2.3.1"
futures-util = "0.3.25"
futures = "0.3.25"
async_once = "0.2.6"
serde-aux = "4.2.0"
axum = "0.6.20"
prometheus = "0.13.3"
prometheus-static-metric = "0.5.1"
serde-aux = "4.1.2"
axum = "0.6.1"


@ -1,50 +1,42 @@
use async_once::AsyncOnce;
use axum::{routing::get, Json, Router};
use axum::{routing::get, Router};
use futures::{pin_mut, FutureExt};
use futures_util::StreamExt;
use lazy_static::lazy_static;
use serde::Deserialize;
use serde_aux::prelude::*;
use serde_json::json;
use sqlx::{postgres::PgPoolOptions, query, Row};
use sqlx::{postgres::PgPoolOptions, query};
use std::{env, net::SocketAddr};
use tokio::task::JoinSet;
use tokio_tungstenite::{connect_async, tungstenite::Message};
mod translators;
mod telemetry;
lazy_static! {
// static ref PAIR: String = env::var("PAIR").unwrap_or_default();
// static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string());
static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
PgPoolOptions::new().connect(&db_url).await.unwrap()
});
}
async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
let worlds_raw = env::var("WORLDS").unwrap_or("all".to_string());
let worlds_raw = env::var("WORLDS").unwrap_or_default();
if worlds_raw == "" {
println!("WORLDS not set");
return;
}
let worlds: Vec<&str> = worlds_raw.split(',').collect();
let experience_ids = vec![
2, 3, 4, 5, 6, 7, 34, 51, 53, 55, 57, 86, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99,
100, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 201, 233, 293,
294, 302, 303, 353, 354, 355, 438, 439, 503, 505, 579, 581, 584, 653, 656, 674, 675,
];
let mut events = experience_ids
.iter()
.map(|id| format!("GainExperience_experience_id_{}", id))
.collect::<Vec<String>>();
events.push("Death".to_string());
events.push("VehicleDestroy".to_string());
// Send setup message
let setup_msg = json!({
"action": "subscribe",
"worlds": worlds,
"eventNames": events,
"eventNames": ["Death", "VehicleDestroy"],
"characters": ["all"],
"logicalAndCharactersWithWorlds": true,
"service": "event",
@ -54,7 +46,6 @@ async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
.unwrap();
println!("[ws] Sent setup message");
println!("[ws/setup] {}", setup_msg.to_string())
}
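The subscription shrinks here: main generated one event name per experience ID (the removed `experience_ids` list) plus `Death` and `VehicleDestroy`, while the rework subscribes to the two death events only. For illustration, the naming scheme main's removed code used, with a shortened ID list:

```rust
// Illustrative only: how main derived per-experience event names.
let experience_ids = [201, 233, 674];
let mut events: Vec<String> = experience_ids
    .iter()
    .map(|id| format!("GainExperience_experience_id_{}", id))
    .collect();
events.extend(["Death".to_string(), "VehicleDestroy".to_string()]);
assert_eq!(events[0], "GainExperience_experience_id_201");
```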
#[derive(Clone)]
@ -63,32 +54,33 @@ struct PopEvent {
team_id: i32,
character_id: String,
zone_id: i32,
loadout_id: String,
vehicle_id: String,
}
#[derive(Debug)]
struct VehicleEvent {
world_id: i32,
vehicle_id: String,
character_id: String,
zone_id: i32,
team_id: i32,
}
struct ClassEvent {
world_id: i32,
character_id: String,
loadout_id: String,
zone_id: i32,
team_id: i32,
}
struct AnalyticsEvent {
world_id: i32,
event_name: String,
}
async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> {
let pool = PG.get().await;
telemetry::db_read("players", "get_team_id");
let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;")
.bind(character_id.clone())
.fetch_one(pool)
.await?
.get(0);
if team_id == 0 {
return Err(sqlx::Error::RowNotFound);
}
Ok(team_id)
}
// async fn track_pop(pop_event: PopEvent) {
// track_pop_db(pop_event.clone()).await;
// track_pop_redis(pop_event).await;
// }
async fn track_pop(pop_event: PopEvent) {
// println!("[ws/track_pop]");
@ -99,31 +91,76 @@ async fn track_pop(pop_event: PopEvent) {
team_id,
character_id,
zone_id,
loadout_id,
vehicle_id,
} = pop_event;
let class_name = translators::loadout_to_class(loadout_id.as_str());
let vehicle_name = if vehicle_id == "" {
"unknown".to_string()
} else {
translators::vehicle_to_name(vehicle_id.as_str())
};
query("INSERT INTO players (time, character_id, world_id, faction_id, zone_id) VALUES (now(), $1, $2, $3, $4);")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.execute(pool)
.await
.unwrap();
}
async fn track_vehicle(vehicle_event: VehicleEvent) {
// println!("[ws/track_vehicle]");
let pool = PG.get().await;
let VehicleEvent {
world_id,
vehicle_id,
zone_id,
character_id,
team_id,
} = vehicle_event;
let vehicle_name = translators::vehicle_to_name(vehicle_id.as_str());
if vehicle_name == "unknown" {
return;
}
query("INSERT INTO vehicles (time, character_id, world_id, faction_id, zone_id, vehicle_id) VALUES (now(), $1, $2, $3, $4, $5);")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.bind(vehicle_name)
.execute(pool)
.await
.unwrap();
}
async fn track_class(class_event: ClassEvent) {
// println!("[ws/track_class]");
let pool = PG.get().await;
let ClassEvent {
world_id,
character_id,
loadout_id,
zone_id,
team_id,
} = class_event;
let class_name = translators::loadout_to_class(loadout_id.as_str());
if class_name == "unknown" {
return;
}
telemetry::db_write("players", "track_pop");
query(
"
INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name)
VALUES (now(), $1, $2, $3, $4, $5)
ON CONFLICT (character_id) DO UPDATE SET
last_updated = EXCLUDED.last_updated,
world_id = EXCLUDED.world_id,
faction_id = EXCLUDED.faction_id,
zone_id = EXCLUDED.zone_id,
class_name = EXCLUDED.class_name
;",
"INSERT INTO classes (
time,
character_id,
world_id,
faction_id,
zone_id,
class_id
) VALUES (now(), $1, $2, $3, $4, $5);",
)
.bind(character_id.clone())
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
@ -131,31 +168,10 @@ async fn track_pop(pop_event: PopEvent) {
.execute(pool)
.await
.unwrap();
if vehicle_name != "unknown" {
telemetry::db_write("vehicles", "track_pop");
query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name)
VALUES (now(), $1, $2, $3, $4, $5)
ON CONFLICT (character_id) DO UPDATE SET
last_updated = EXCLUDED.last_updated,
world_id = EXCLUDED.world_id,
faction_id = EXCLUDED.faction_id,
zone_id = EXCLUDED.zone_id,
vehicle_name = EXCLUDED.vehicle_name
;")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.bind(vehicle_name)
.execute(pool)
.await
.unwrap();
}
}
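A consequence of this change: main kept one upserted row per character (`character_id` was the primary key), so population was a plain row count, while the rework appends one row per event. Reading current population from append-only tables means deduplicating within the retention window; a sketch of what such a query could look like (an assumption — the API service's actual queries are not part of this diff):

```rust
use sqlx::query_scalar;

// Hypothetical read-side query for the append-only `players` table.
async fn world_population(pool: &sqlx::PgPool, world_id: i32) -> i64 {
    query_scalar(
        "SELECT COUNT(DISTINCT character_id) FROM players
         WHERE world_id = $1 AND time > NOW() - INTERVAL '15 minutes';",
    )
    .bind(world_id)
    .fetch_one(pool)
    .await
    .unwrap()
}
```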
async fn track_analytics(analytics_event: AnalyticsEvent) {
// println!("[ws/track_analytics] {:?}", analytics_event);
// println!("[ws/track_analytics]");
let pool = PG.get().await;
let AnalyticsEvent {
@ -163,21 +179,15 @@ async fn track_analytics(analytics_event: AnalyticsEvent) {
event_name,
} = analytics_event;
telemetry::db_write("analytics", "track_analytics");
match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
.bind(world_id)
.bind(event_name)
.execute(pool)
.await
{
Ok(_) => {}
Err(e) => {
println!("[ws/track_analytics] ERR => {:?}", e);
}
}
.unwrap();
}
async fn process_death_event(event: &Event) {
async fn process_event(event: &Event) {
let mut set = JoinSet::new();
// println!("[ws/process_event] EVENT: {:?}", event);
@ -186,14 +196,33 @@ async fn process_death_event(event: &Event) {
event_name: event.event_name.clone(),
}));
if event.character_id != "" && event.character_id != "0" {
if event.character_id != "0" {
// General population tracking
set.spawn(track_pop(PopEvent {
world_id: event.world_id.clone(),
team_id: event.team_id.clone(),
character_id: event.character_id.clone(),
zone_id: event.zone_id.clone(),
loadout_id: event.loadout_id.clone(),
}));
}
if event.event_name == "VehicleDestroy" {
set.spawn(track_vehicle(VehicleEvent {
world_id: event.world_id.clone(),
vehicle_id: event.vehicle_id.clone(),
character_id: event.character_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.team_id.clone(),
}));
}
if event.event_name == "Death" {
set.spawn(track_class(ClassEvent {
world_id: event.world_id.clone(),
character_id: event.character_id.clone(),
loadout_id: event.loadout_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.team_id.clone(),
}));
}
@ -206,58 +235,42 @@ async fn process_death_event(event: &Event) {
team_id: event.attacker_team_id.clone(),
character_id: event.attacker_character_id.clone(),
zone_id: event.zone_id.clone(),
loadout_id: event.attacker_loadout_id.clone(),
vehicle_id: event.attacker_vehicle_id.clone(),
}));
if event.event_name == "VehicleDestroy" {
set.spawn(track_vehicle(VehicleEvent {
world_id: event.world_id.clone(),
vehicle_id: event.attacker_vehicle_id.clone(),
character_id: event.attacker_character_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.attacker_team_id.clone(),
}));
}
if event.event_name == "Death" {
set.spawn(track_class(ClassEvent {
world_id: event.world_id.clone(),
character_id: event.attacker_character_id.clone(),
loadout_id: event.attacker_loadout_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.attacker_team_id.clone(),
}));
}
}
while let Some(_) = set.join_next().await {}
}
async fn process_exp_event(event: &Event) {
telemetry::experience_event(&event.world_id, &event.experience_id);
let mut set = JoinSet::new();
// println!("[ws/process_event] EVENT: {:?}", event);
set.spawn(track_analytics(AnalyticsEvent {
world_id: event.world_id.clone(),
event_name: format!(
"{}_{}",
event.event_name.clone(),
event.experience_id.clone()
),
}));
// Vehicle EXP events
let vehicle_id = match event.experience_id {
201 => "11".to_string(), // Galaxy Spawn Bonus
233 => "2".to_string(), // Sunderer Spawn Bonus
674 | 675 => "160".to_string(), // ANT stuff
_ => "".to_string(),
};
set.spawn(track_pop(PopEvent {
world_id: event.world_id.clone(),
team_id: event.team_id.clone(),
character_id: event.character_id.clone(),
zone_id: event.zone_id.clone(),
loadout_id: event.loadout_id.clone(),
vehicle_id: vehicle_id.clone(),
}));
while let Some(_) = set.join_next().await {}
}
#[derive(Deserialize, Debug, Clone, Default)]
struct Event {
event_name: String,
#[serde(deserialize_with = "deserialize_number_from_string")]
world_id: i32,
character_id: String,
#[serde(default)]
attacker_character_id: String,
#[serde(default, deserialize_with = "deserialize_number_from_string")]
#[serde(deserialize_with = "deserialize_number_from_string")]
attacker_team_id: i32,
#[serde(default, deserialize_with = "deserialize_number_from_string")]
#[serde(deserialize_with = "deserialize_number_from_string")]
team_id: i32,
#[serde(deserialize_with = "deserialize_number_from_string")]
zone_id: i32,
@ -273,11 +286,6 @@ struct Event {
vehicle_id: String,
#[serde(default)]
attacker_vehicle_id: String,
#[serde(default, deserialize_with = "deserialize_number_from_string")]
experience_id: i32,
// #[serde(default)]
// other_id: String,
}
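The `deserialize_number_from_string` attributes exist because the Census stream encodes numeric fields as JSON strings; `serde-aux` parses them into integers during deserialization, while `#[serde(default)]` covers events that omit a field entirely. A self-contained sketch of the pattern:

```rust
use serde::Deserialize;
use serde_aux::field_attributes::deserialize_number_from_string;

#[derive(Deserialize, Debug)]
struct Sketch {
    // Census sends e.g. {"world_id": "17"}; this parses the string.
    #[serde(deserialize_with = "deserialize_number_from_string")]
    world_id: i32,
    // Missing fields fall back to "" instead of failing the event.
    #[serde(default)]
    attacker_character_id: String,
}

fn main() {
    let e: Sketch = serde_json::from_str(r#"{"world_id": "17"}"#).unwrap();
    assert_eq!(e.world_id, 17);
    assert_eq!(e.attacker_character_id, "");
}
```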
#[derive(Deserialize, Debug, Clone)]
@ -286,17 +294,7 @@ struct Payload {
}
async fn healthz() {
let app = Router::new().route(
"/healthz",
get(|| async {
Json(json!({
"status": "ok",
}))
}),
).route(
"/metrics",
get(telemetry::handler)
);
let app = Router::new().route("/healthz", get(|| async { "ok" }));
let port: u16 = std::env::var("PORT")
.unwrap_or("8999".to_string())
@ -321,54 +319,23 @@ async fn main() {
}
let url = url::Url::parse(&addr).unwrap();
println!("[ws] Connecting to {}", url);
let (tx, rx) = futures::channel::mpsc::unbounded();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
let (write, read) = ws_stream.split();
let fused_writer = rx.map(Ok).forward(write).fuse();
let fused_reader = read
.for_each(|msg| async {
.for_each(|msg| async move {
let body = &msg.unwrap().to_string();
let mut data: Payload = match serde_json::from_str(body) {
Ok(data) => data,
Err(_e) => {
// println!("Error: {}; body: {}", e, body.clone());
telemetry::event_dropped(&0, &"".to_string(), "decoding failure");
return;
}
};
let data: Payload = serde_json::from_str(body).unwrap_or(Payload {
payload: Event::default(),
});
if data.payload.event_name == "" {
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "not event");
return;
}
telemetry::event(&data.payload.world_id, &data.payload.event_name);
if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" {
process_death_event(&data.payload).await;
return;
}
if data.payload.event_name == "GainExperience" {
if data.payload.team_id == 0 {
match get_team_id(data.payload.character_id.clone()).await {
Ok(team_id) => {
data.payload.team_id = team_id;
}
Err(_) => {
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "team_id missing");
}
}
}
process_exp_event(&data.payload).await;
return;
}
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "unprocessable");
process_event(&data.payload).await;
})
.fuse();
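The `.fuse()` calls and the `pin_mut`/`FutureExt` imports point at the standard `futures::select!` driving pattern: both halves are polled until either the writer or the reader finishes. The select itself falls outside this hunk, so the following is an assumption about the tail of `main`, sketched from those imports:

```rust
// Assumed continuation (not shown in the diff): drive both fused
// futures; whichever completes first ends the connection loop.
futures::pin_mut!(fused_writer, fused_reader);
futures::select! {
    _ = fused_writer => println!("[ws] writer stream ended"),
    _ = fused_reader => println!("[ws] reader stream ended"),
}
```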


@ -1,75 +0,0 @@
use lazy_static::lazy_static;
use prometheus::{
IntGaugeVec,
register_int_gauge_vec,
TextEncoder,
gather
};
lazy_static! {
// incoming events
pub static ref EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_count", "Events processed", &[
"world_id", "event_name"
]).unwrap();
pub static ref EVENTS_DROPPED: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_dropped_count", "Events dropped", &[
"world_id", "event_name", "reason"
]).unwrap();
pub static ref EXPERIENCE_EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_experience_events_count", "Experience Events processed by Exp ID", &[
"world_id", "experience_id"
]).unwrap();
// database stuff
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_writes", "Writes to Postgres", &[
"table", "op"
]).unwrap();
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_reads", "Reads from Postgres", &[
"table", "op"
]).unwrap();
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
// "table", "op"
// ]).unwrap();
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
// "table", "op"
// ]).unwrap();
}
pub async fn handler() -> String {
let encoder = TextEncoder::new();
let mut buffer = String::new();
let metrics = gather();
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
buffer
}
pub fn event(world_id: &i32, event_name: &String) {
EVENTS.with_label_values(&[
&world_id.to_string(),
&event_name,
]).inc();
}
pub fn event_dropped(world_id: &i32, event_name: &String, reason: &str) {
EVENTS_DROPPED.with_label_values(&[
&world_id.to_string(),
&event_name,
reason,
]).inc();
}
pub fn experience_event(world_id: &i32, experience_id: &i32) {
EXPERIENCE_EVENTS.with_label_values(&[
&world_id.to_string(),
&experience_id.to_string(),
]).inc();
}
pub fn db_write(table: &str, op: &str) {
DB_WRITES.with_label_values(&[table, op]).inc();
}
pub fn db_read(table: &str, op: &str) {
DB_READS.with_label_values(&[table, op]).inc();
}
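One design note on this deleted module: every metric is declared as an `IntGaugeVec` yet only ever incremented, which is counter semantics; Prometheus counters additionally let scrapers apply `rate()` safely. A sketch of the same declaration with `IntCounterVec` (an alternative shape, not what the repo used):

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    // Same metric name and labels as the module above, but with
    // counter semantics: values only go up between restarts.
    pub static ref EVENTS: IntCounterVec = register_int_counter_vec!(
        "saerro_ws_events_count",
        "Events processed",
        &["world_id", "event_name"]
    )
    .unwrap();
}

pub fn event(world_id: i32, event_name: &str) {
    EVENTS
        .with_label_values(&[&world_id.to_string(), event_name])
        .inc();
}
```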


@ -34,8 +34,6 @@ lazy_static! {
("1105", "vanguard"),
("2010", "flash"),
("2033", "javelin"),
("2039", "ant"),
("2040", "valkyrie"),
("2122", "mosquito"),
("2123", "reaver"),
("2124", "scythe"),
@ -49,9 +47,6 @@ lazy_static! {
("2135", "prowler"),
("2136", "dervish"),
("2137", "chimera"),
("2139", "ant"),
("2140", "galaxy"),
("2141", "valkyrie"),
("2142", "corsair"),
]);
static ref LOADOUT_TO_CLASS: HashMap<&'static str, &'static str> = HashMap::from([