Compare commits

57 commits
rework...main

Author SHA1 Message Date
noe 3c0ab61695 fix docker image 2024-02-18 00:54:54 -05:00
noe 3a422b8f6f add prometheus to api and ws 2024-02-18 00:52:39 -05:00
noe 7ab5893f67 nix 2024-02-17 03:32:13 -05:00
96cb2c80d8 add auto-maint 2023-09-04 14:06:04 -04:00
ad8105ca94 add tasks print-env 2023-09-04 02:45:29 -04:00
e805a4ca5a update all cargo packages' 2023-09-04 01:30:53 -04:00
be38bb6b5f use nix 2023-09-04 01:30:40 -04:00
c58fdf06b7 refactor to use default localhost IPs instead of localhost 2023-09-04 01:30:23 -04:00
9ebeeb47f4 stacked exp bar chart correctly 2023-06-29 16:33:52 -04:00
f49a8a0fda add filter to exp by id 2023-06-29 15:46:45 -04:00
a1f0e32e30 make experience more specific 2023-06-29 15:30:29 -04:00
143ec0cd3b completely restructure database 2023-06-02 02:14:12 -04:00
2dd19249db remove Dockerfile.old 2023-05-31 16:40:32 -04:00
79d406cee6 update readme.md 2023-05-31 01:02:13 -04:00
e5c57bf505 move to 1 table instead of 3 (what was I thinking???) 2023-05-31 01:01:17 -04:00
11127f269d increase hypertable bucket size to 5 minutes 2023-05-30 20:50:30 -04:00
97ea7455a7 remove spot kill as it produces redundant data 2023-05-30 08:52:40 -04:00
bfdf7dcb52 adjust dockerfile 2023-05-28 18:14:23 -04:00
ffc2dfbbad remove cargo-chef from docker, it's slow now 2023-05-28 15:03:22 -04:00
4e7824cbad update packages 2023-05-28 14:51:03 -04:00
2a47beb69a ps4 needs to find team_id 2023-05-28 14:45:30 -04:00
f8add369a6 add pop tracking to exp events 2023-05-28 13:47:30 -04:00
118f1b6b99 add raw totals to ingest hovers 2023-05-28 13:39:08 -04:00
bf89c4aaf0 add a bunch more exp events 2023-05-28 13:20:54 -04:00
738d2975ec add GainExperience listeners 2023-05-28 13:07:05 -04:00
24437b5520 fix lightning, add corsair; update packages 2023-05-20 23:17:57 -04:00
bca70e2d5b fix faction IDs 2023-03-05 11:34:35 -05:00
679b49ff88 add general maintenance script to always run in prod 2023-01-23 08:53:06 -05:00
9940e9dd90 zerofill analytics 2022-12-14 08:40:59 -05:00
2d6da8343d remove double push: 2022-12-13 01:17:31 -05:00
1c3440d919 thonk 2022-12-13 01:11:55 -05:00
0f710f2712 tokio machine broke 2022-12-13 01:07:15 -05:00
9b6b261c16 debug why gha cache is failing 2022-12-13 00:57:58 -05:00
97474de07e reset sqlx features 2022-12-13 00:52:02 -05:00
a9d1daf397 remove cacher copy 2022-12-13 00:49:53 -05:00
505e71f65f fix build command 2022-12-13 00:45:34 -05:00
6bd048a9f4 gha cache docker build 2022-12-13 00:43:59 -05:00
b8fa5d9595 fix bin-selects 2022-12-13 00:40:39 -05:00
c483764b4b bin-select on cargo chef 2022-12-13 00:30:50 -05:00
9365abc7f8 remove extra clang 2022-12-13 00:23:31 -05:00
83f5f02a88 fix mold import 2022-12-13 00:23:02 -05:00
5ca02948df more build optimization 2022-12-13 00:20:24 -05:00
267a8c11c3 optimize build 2022-12-13 00:07:27 -05:00
9ccd47afa0 remove deploy step 2022-12-12 23:27:42 -05:00
d83ff16c1a add hi-precision bucketing for short term viz 2022-12-12 23:26:03 -05:00
9f1942344b update url 2022-12-12 23:09:23 -05:00
170fdf647d improvements to ingest.html 2022-12-12 23:07:47 -05:00
1e41262d70 add ingest analytics page 2022-12-12 19:40:05 -05:00
2665a6d25f initialize analytics 2022-12-11 11:33:06 -05:00
004def8fbb update health to be more helpful 2022-12-10 22:01:27 -05:00
89d115b61d update codegen to translate ant, valk, and gal variants 2022-12-09 17:47:56 -05:00
b91019e8b4 allow attacker_team_id to be empty, PS4 bug 2022-12-09 17:42:17 -05:00
26f0ce1a1a update htmls 2022-12-09 15:48:25 -05:00
3bfc0b4e28 update htmls 2022-12-09 15:23:09 -05:00
c89eb6ea74 update readme 2022-12-09 13:59:20 -05:00
05e30e4420 cleanup 2022-12-09 11:16:53 -05:00
93bccd3b19 fix graphiql url 2022-12-09 08:35:38 -05:00
38 changed files with 4341 additions and 1398 deletions

.env (0 lines changed)

.envrc (new file, 3 lines)

@@ -0,0 +1,3 @@
use flake . --accept-flake-config;
# source .envrc-local

GitHub Actions CI workflow

@@ -17,36 +17,12 @@ jobs:
           registry: ghcr.io
           username: ${{ github.actor }}
           password: ${{ secrets.GITHUB_TOKEN }}
+      - run: docker buildx create --use --driver=docker-container
       - run: |
-          docker build . \
+          TAG_LATEST_IF_MASTER=$(if [ "$GITHUB_REF_NAME" = "main" ]; then echo "-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest"; else echo ""; fi)
+          docker buildx build . \
           --build-arg SERVICE=${{ matrix.service }} \
-          -t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }}
+          -t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} $TAG_LATEST_IF_MASTER \
+          --push \
+          --cache-to type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }} \
+          --cache-from type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }}
-      - run: |
-          docker tag ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} \
-          ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest
-        if: github.ref == 'refs/heads/main'
-      - run: |
-          docker push ghcr.io/${{ github.repository }}/${{ matrix.service }}
-  deploy:
-    runs-on: ubuntu-latest
-    needs: build
-    if: github.ref == 'refs/heads/main'
-    environment:
-      name: production
-      url: https://saerro.harasse.rs
-    permissions:
-      contents: "read"
-      id-token: "write"
-    steps:
-      - id: "auth"
-        uses: "google-github-actions/auth@v1"
-        with:
-          workload_identity_provider: ${{ secrets.WORKLOAD_IDENTITY_PROVIDER }}
-          service_account: ${{ secrets.SERVICE_ACCOUNT }}
-      - name: "Set up Cloud SDK"
-        uses: "google-github-actions/setup-gcloud@v1"
-      - name: "Deploy"
-        run: |
-          gcloud compute ssh ${{ secrets.VM_NAME }} --zone=us-central1-a --command "cd /opt && sudo docker compose pull && sudo docker compose up -d"

.gitignore (6 lines changed)

@@ -1 +1,7 @@
 /target
+.DS_Store
+*/.DS_Store
+.envrc-local
+/.vscode
+/.direnv
+/result

Cargo.lock (generated, 2217 lines changed)
File diff suppressed because it is too large.

Cargo.toml

@@ -1,5 +1,4 @@
 [workspace]
-members = [
-    "services/*",
-    "hack/*"
-]
+members = ["services/*"]
+exclude = ["hack/codegen"]
+resolver = "2"

Dockerfile

@@ -1,18 +1,21 @@
-FROM rust:1.65.0-bullseye AS builder
-ARG SERVICE
+FROM rust:1.76.0-bullseye as rust-base
 WORKDIR /app
-COPY Cargo.toml Cargo.lock ./
-COPY services ./services
-COPY hack ./hack
-RUN cargo build --bin ${SERVICE} --release
-FROM debian:bullseye-slim AS target
+RUN apt-get update && apt-get install -y --no-install-recommends curl clang
+ARG MOLD_VERSION=1.11.0
+RUN curl -sSL https://github.com/rui314/mold/releases/download/v${MOLD_VERSION}/mold-${MOLD_VERSION}-x86_64-linux.tar.gz | tar xzv && \
+    mv mold-${MOLD_VERSION}-x86_64-linux/bin/mold /mold && \
+    rm -rf mold-${MOLD_VERSION}-x86_64-linux
+ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
+ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
+FROM rust-base as builder
+COPY . .
 ARG SERVICE
-RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
-COPY --from=builder /app/target/release/${SERVICE} /app
-RUN chmod a+x /app
-CMD /app
+ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
+ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
+RUN cargo build --release --bin ${SERVICE}
+FROM debian:bullseye-slim as runtime
+ARG SERVICE
+COPY --from=builder /app/target/release/${SERVICE} /app
+ENTRYPOINT ["/app"]

README.md

@@ -2,33 +2,35 @@
 PlanetSide 2 live population API. This API is free and open for anyone to use.
-https://saerro.harasse.rs
+https://saerro.ps2.live
-Our methodology is to add any player ID seen on the Census websockets to a time-sorted set, and returning the number of player IDs seen within 15 minutes.
+tl;dr: Watch for specific events, transform and add them to a timeseries set, and query that set for the last 15 minutes.
+We're built on 3 core types, `players`, `classes`, and `vehicles`. Each can be filtered by Continent/Zone, Faction, and World.
 ---
-The one and only goal of this app is to provide a current "point-in-time" population status for PlanetSide 2, per world, per faction, (and later, per continent.) Historical info is _not_ a goal; you may implement this on your end.
 Please open an issue here or get in touch with Pomf (okano#0001) on the PS2 Discord if you have complex use cases for this data; it may be trivial/easy to implement APIs tailored to your needs.
 The main use case is for [Medkit](https://github.com/kayteh/medkit2) bot to have an in-house source of population data, without relying too heavily on any third-party stats service, like Fisu, Honu, or Voidwell; which all have different population tracking needs and goals (and thus, different data.)
+An example of how it can be used on [pstop](https://pstop.harasse.rs) ([GitHub](https://github.com/genudine/pstop)).
 ## Architecture
-- Websocket processors
-  - A pair per PC, PS4US, PS4EU
-  - Connects to [wss://push.nanite-systems.net](https://nanite-systems.net) and Census Websocket
-  - Primary will connect to NS.
-  - Backup will connect to Census. It will wait for 60 seconds before deciding the primary is dead, and then start processing events.
-- API
-  - Serves https://saerro.harasse.rs
-  - Built on axum and async-graphql
-- Redis
-  - Using ZADD with score as timestamp, ZCOUNTBYSCORE by timestamp in 15 minute windows, and cleaned up with SCAN+ZREMBYSCORE, population data is tracked.
-  - There is deliberately no persistence.
-- Redis "Tender"
-  - Cleans up Redis every 5 mins.
+- GraphQL API
+  - Serves https://saerro.ps2.live
+  - Built on a "stacking filter" graph model, where each dimension adds a filter to lower dimensions.
+- Event Streaming Service (ESS) Ingest
+  - WebSocket listening to https://push.nanite-systems.net (which is a resilient mirror to https://push.planetside2.com)
+  - Listens for `Death`, `VehicleDestroy`, and a number of `GainExperience` events.
+- Postgres with TimescaleDB
+  - Holds `players` and `analytics` tables as hypertables.
+  - Timescale makes this way too fast, mind-blowing :)
+- Tasks
+  - Occasional jobs that prune the database past what we actually want to retain,
+    - Core data tables are kept to about 20 mins max of data, analytics to 1 week
+  - Can do database resets/migrations.
 # Developing
@@ -37,48 +39,27 @@ This app is built with Rust. You can set up a build environment via https://rust
 To run,

 ```sh
-# Start Redis/backing services
+# Start backing services
 docker compose up -d

-# Start Websocket for PC
-env \
-  WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
-  PAIR=pc \
-  ROLE=primary \
-  WORLDS=1,10,13,17,19,40 \
-  cargo run --bin websocket
+# Run database migrations (required first step on a freshly up'd database)
+cargo run --bin tasks migrate

-# (Optional:) Start redundant websocket for PC
+# Start NSS ingest. Use push.planetside2.com if NSS isn't quite working...
 env \
-  WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
-  PAIR=pc \
-  ROLE=backup \
-  WORLDS=1,10,13,17,19,40 \
-  cargo run --bin websocket
-
-# (Optional:) Start PS4US websocket
-env \
-  WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4us&service-id=s:$SERVICE_ID" \
-  PAIR=ps4us \
-  WORLDS=1000 \
-  cargo run --bin websocket
-
-# (Optional:) Start PS4EU websocket
-env \
-  WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4eu&service-id=s:$SERVICE_ID" \
-  PAIR=ps4eu \
-  WORLDS=2000 \
+  WS_ADDR="wss://push.nanite-systems.net/streaming?environment=all&service-id=s:$SERVICE_ID" \
+  WORLDS=all \
   cargo run --bin websocket

 # Start API
 cargo run --bin api

 # Run prune tool
-cargo run --bin tools prune
+cargo run --bin tasks prune

 # Build containers
 docker build . --build-arg SERVICE=api -t saerro:api
-docker build . --build-arg SERVICE=tools -t saerro:tools
+docker build . --build-arg SERVICE=tasks -t saerro:tasks
 docker build . --build-arg SERVICE=websocket -t saerro:websocket
 ```
@@ -94,4 +75,4 @@ Currently, the entire stack runs on Docker. You may deploy it to any server via:
 docker compose up -d -f docker-compose.live.yaml
 ```
-It listens on port 80, it's up to you from here.
+It listens on port 80, it's up to you from here. Make sure to change passwords present in the file. It's not _that secret_ of data, but why risk it?
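The 15-minute window the README describes is just a predicate on the `players` hypertable. As a rough sketch (not part of this diff), here is that core count from Rust with sqlx, using the dev connection string that main.rs defaults to:

```rust
// Sketch only: mirrors the count the API's population resolvers run.
// The connection string matches the dev default in services/api/src/main.rs.
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool =
        sqlx::PgPool::connect("postgres://saerrouser:saerro321@127.0.0.1:5432/data").await?;
    // Players seen by ingest within the last 15 minutes count as "live".
    let live: i64 = sqlx::query_scalar(
        "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes'",
    )
    .fetch_one(&pool)
    .await?;
    println!("players seen in the last 15 minutes: {live}");
    Ok(())
}
```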

docker-compose.live.yaml

@@ -16,7 +16,7 @@ services:
     image: ghcr.io/genudine/saerro/api:latest
     pull_policy: always
     ports:
-      - 8000:80
+      - 80:8000
     links:
       - tsdb
     restart: always

docker-compose.yaml

@@ -2,7 +2,7 @@ version: "3"
 services:
   tsdb:
-    image: timescale/timescaledb-ha:pg14-latest
+    image: docker.io/timescale/timescaledb:latest-pg14
     environment:
       POSTGRES_PASSWORD: saerro321
       POSTGRES_USER: saerrouser

flake.lock (generated, new file, 103 lines)

@@ -0,0 +1,103 @@
{
"nodes": {
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1708150887,
"narHash": "sha256-lyEaeShLZqQtFO+ULLfxF9fYaYpTal0Ck1B+iKYBOMs=",
"owner": "nix-community",
"repo": "fenix",
"rev": "761431323e30846bae160e15682cfa687c200606",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"flake-parts": {
"inputs": {
"nixpkgs-lib": "nixpkgs-lib"
},
"locked": {
"lastModified": 1706830856,
"narHash": "sha256-a0NYyp+h9hlb7ddVz4LUn1vT/PLwqfrWYcHMvFB1xYg=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "b253292d9c0a5ead9bc98c4e9a26c6312e27d69f",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1707956935,
"narHash": "sha256-ZL2TrjVsiFNKOYwYQozpbvQSwvtV/3Me7Zwhmdsfyu4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a4d4fe8c5002202493e87ec8dbc91335ff55552c",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-lib": {
"locked": {
"dir": "lib",
"lastModified": 1706550542,
"narHash": "sha256-UcsnCG6wx++23yeER4Hg18CXWbgNpqNXcHIo5/1Y+hc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "97b17f32362e475016f942bbdfda4a4a72a8a652",
"type": "github"
},
"original": {
"dir": "lib",
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"fenix": "fenix",
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs"
}
},
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1708018577,
"narHash": "sha256-B75VUqKvQeIqAUnYw4bGjY3xxrCqzRBJHLbmD0MAWEw=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "b9b0d29b8e69b02457cfabe20c4c69cdb45f3cc5",
"type": "github"
},
"original": {
"owner": "rust-lang",
"ref": "nightly",
"repo": "rust-analyzer",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

flake.nix (new file, 56 lines)

@@ -0,0 +1,56 @@
{
description = "PlanetSide 2 Metagame Harvesting Service";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-parts.url = "github:hercules-ci/flake-parts";
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
nixConfig = {
extra-substituters = [
"https://nix-community.cachix.org"
];
extra-trusted-public-keys = [
"nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs="
];
};
outputs = inputs: inputs.flake-parts.lib.mkFlake { inherit inputs; } {
systems = [ "x86_64-linux" "aarch64-linux" ];
perSystem = { config, self', pkgs, lib, system, ... }: let
fenix = inputs.fenix.packages.${system}.minimal;
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
buildDeps = [
pkgs.openssl
];
devDeps = [
fenix.toolchain
pkgs.docker-compose
pkgs.cargo-watch
];
PKG_CONFIG_PATH = "${pkgs.openssl.dev}/lib/pkgconfig";
in {
packages.default = (pkgs.makeRustPlatform {
cargo = fenix.toolchain;
rustc = fenix.toolchain;
}).buildRustPackage {
inherit (cargoToml.package) name version;
cargoLock.lockFile = ./Cargo.lock;
src = ./.;
nativeBuildInputs = [ pkgs.pkg-config ];
buildInputs = buildDeps ++ devDeps;
};
devShells.default = pkgs.mkShell {
nativeBuildInputs = buildDeps ++ devDeps;
};
};
};
}

hack/codegen/.gitignore (new file, 1 line)

@@ -0,0 +1 @@
target

hack/codegen/Cargo.lock (generated, new file, 1411 lines)
File diff suppressed because it is too large.

hack/codegen/Cargo.toml

@@ -8,7 +8,6 @@ reqwest = { version="0.11.13", features = ["json"] }
 tera = { version = "1.17.1", default-features = false }
 lazy_static = "1.4.0"
 regex = "1.7.0"
-futures = "0.3.25"
 tokio = { version = "1.22.0", features = ["full"] }
 serde = { version = "1.0.147", features = ["derive"] }
 serde_json = "1.0.89"

hack/codegen/src/main.rs

@@ -1,5 +1,4 @@
 use std::process;
 use lazy_static::lazy_static;
 use regex::{Regex, RegexBuilder};
 use serde::{Deserialize, Serialize};

@@ -46,6 +45,9 @@ async fn translators_rs() {
     "mosquito",
     "galaxy",
     "valkyrie",
+    "wasp",
+    "deliverer",
+    "lodestar",
     "liberator",
     "ant",
     "harasser",

@@ -107,11 +109,16 @@
         .find(&item.name.as_ref().unwrap().en.as_ref().unwrap())
         .unwrap();
+    let name = matched
+        .as_str()
+        .to_lowercase()
+        .replace("wasp", "valkyrie")
+        .replace("deliverer", "ant")
+        .replace("lodestar", "galaxy");
     Vehicle {
         vehicle_id: item.vehicle_id,
-        name: Some(LangEn {
-            en: Some(matched.as_str().to_string().to_lowercase()),
-        }),
+        name: Some(LangEn { en: Some(name) }),
         propulsion_type: item.propulsion_type,
     }
 })
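For clarity, the variant merge added above folds the NS vehicle names into their faction equivalents after lowercasing. A tiny self-contained example of the same replace chain:

```rust
// The replace chain codegen now applies to matched vehicle names.
fn main() {
    let name = "Lodestar"
        .to_lowercase()
        .replace("wasp", "valkyrie") // wasp -> valkyrie
        .replace("deliverer", "ant") // deliverer -> ant
        .replace("lodestar", "galaxy"); // lodestar -> galaxy
    assert_eq!(name, "galaxy");
}
```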

services/api/Cargo.toml

@@ -6,13 +6,25 @@
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-serde_json = "1.0.89"
-serde = "1.0.149"
-async-graphql = { version = "5.0.3" }
-async-graphql-axum = "5.0.3"
-axum = "0.6.1"
-sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls", "postgres" ] }
-tokio = { version = "1.23.0", features = [ "full" ] }
-tower-http = { version = "0.3.5", features = ["cors"] }
+serde_json = "1.0.105"
+serde = "1.0.188"
+async-graphql = { version = "6.0.5", features = ["chrono"] }
+axum = "0.6.20"
+sqlx = { version = "0.7.1", default_features = false, features = [
+    "runtime-tokio-rustls",
+    "postgres",
+    "chrono",
+] }
+tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
+tower-http = { version = "0.4.4", features = ["cors"] }
 lazy_static = "1.4.0"
-reqwest = "0.11.13"
+reqwest = { version = "0.11.20", features = [
+    "rustls-tls-webpki-roots",
+    "rustls",
+] }
+chrono = "0.4.28"
+prometheus = "0.13.3"
+
+[dependencies.openssl]
+version = "0.10.57"
+features = ["vendored"]

services/api/src/analytics.rs (new file)

@@ -0,0 +1,86 @@
use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject};
use chrono::{DateTime, Utc};
use sqlx::{query, Pool, Postgres, Row};
use crate::telemetry;
pub struct Analytics {}
#[derive(SimpleObject, Debug, Clone)]
pub struct Event {
pub time: DateTime<Utc>,
pub event_name: String,
pub world_id: i32,
pub count: i64,
}
#[Object]
impl Analytics {
/// Get all events in analytics, bucket_size is in seconds
async fn events<'ctx>(
&self,
ctx: &Context<'ctx>,
#[graphql(default = 60)] bucket_size: u64,
world_id: Option<i32>,
#[graphql(default = false)] hi_precision: bool,
) -> Vec<Event> {
telemetry::graphql_query("Analytics", "events");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "events");
let sql = format!("
SELECT
time_bucket_gapfill('{} seconds', time, start => now() - '{}'::interval, finish => now()) AS bucket,
CASE WHEN count(*) IS NULL THEN 0 ELSE count(*) END AS count,
event_name,
world_id
FROM analytics
WHERE time > now() - '{}'::interval {}
GROUP BY bucket, world_id, event_name
ORDER BY bucket ASC",
if hi_precision {
5
} else {
bucket_size
},
if hi_precision {
"1 hour"
} else {
"1 day"
},
if hi_precision {
"1 hour"
} else {
"1 day"
},
if let Some(world_id) = world_id {
format!("AND world_id = {}", world_id)
} else {
"".to_string()
}
);
let mut result = query(sql.as_str()).fetch(pool);
let mut events = Vec::new();
while let Some(row) = result.try_next().await.unwrap() {
events.push(Event {
time: row.get("bucket"),
event_name: row.get("event_name"),
world_id: row.get("world_id"),
count: row.get("count"),
});
}
events
}
}
#[derive(Default)]
pub struct AnalyticsQuery;
#[Object]
impl AnalyticsQuery {
async fn analytics(&self) -> Analytics {
Analytics {}
}
}
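Since the resolver builds its SQL by string substitution, it may help to see what it actually produces. A sketch (not in the diff) of the statement generated for the defaults, bucket_size = 60, hi_precision = false, and no world filter:

```rust
// What the format! above expands to for the default arguments. Gapfilled
// buckets come back with count = 0 instead of missing rows entirely.
fn main() {
    let sql = "
SELECT
    time_bucket_gapfill('60 seconds', time, start => now() - '1 day'::interval, finish => now()) AS bucket,
    CASE WHEN count(*) IS NULL THEN 0 ELSE count(*) END AS count,
    event_name,
    world_id
FROM analytics
WHERE time > now() - '1 day'::interval
GROUP BY bucket, world_id, event_name
ORDER BY bucket ASC";
    println!("{sql}");
}
```

With hi_precision = true, the bucket shrinks to 5 seconds and both intervals shrink to 1 hour, which is what the ingest page's "1 hour w/ 5s buckets" link requests.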

services/api/src/classes.rs

@@ -1,4 +1,8 @@
-use crate::utils::{Filters, IdOrNameBy};
+use crate::{
+    factions::{NC, TR, VS},
+    utils::{Filters, IdOrNameBy},
+    telemetry,
+};
 use async_graphql::{Context, Object};
 use sqlx::{Pool, Postgres, Row};

@@ -10,10 +14,11 @@ pub struct Class {
 impl Class {
     async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
+        telemetry::db_read("players", "fetch");
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
         let sql = format!(
-            "SELECT count(distinct character_id) FROM classes WHERE time > now() - interval '15 minutes' AND class_id = $1 {};",
+            "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND class_name = $1 {};",
             filters.sql(),
         );

@@ -33,33 +38,38 @@
 #[Object]
 impl Class {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Class", "total");
         self.fetch(ctx, self.filters.clone()).await
     }
     async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Class", "nc");
         self.fetch(
             ctx,
             Filters {
-                faction: Some(IdOrNameBy::Id(1)),
+                faction: Some(IdOrNameBy::Id(NC)),
                 ..self.filters.clone()
             },
         )
         .await
     }
     async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Class", "tr");
         self.fetch(
             ctx,
             Filters {
-                faction: Some(IdOrNameBy::Id(2)),
+                faction: Some(IdOrNameBy::Id(TR)),
                 ..self.filters.clone()
             },
         )
         .await
     }
     async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Class", "vs");
         self.fetch(
             ctx,
             Filters {
-                faction: Some(IdOrNameBy::Id(3)),
+                faction: Some(IdOrNameBy::Id(VS)),
                 ..self.filters.clone()
             },
         )

@@ -83,36 +93,42 @@ impl Classes {
 #[Object]
 impl Classes {
     async fn infiltrator(&self) -> Class {
+        telemetry::graphql_query("Classes", "infiltrator");
         Class {
             filters: self.filters.clone(),
             class_name: "infiltrator".to_string(),
         }
     }
     async fn light_assault(&self) -> Class {
+        telemetry::graphql_query("Classes", "light_assault");
         Class {
             filters: self.filters.clone(),
             class_name: "light_assault".to_string(),
         }
     }
     async fn combat_medic(&self) -> Class {
+        telemetry::graphql_query("Classes", "combat_medic");
         Class {
             filters: self.filters.clone(),
             class_name: "combat_medic".to_string(),
         }
     }
     async fn engineer(&self) -> Class {
+        telemetry::graphql_query("Classes", "engineer");
         Class {
             filters: self.filters.clone(),
             class_name: "engineer".to_string(),
         }
     }
     async fn heavy_assault(&self) -> Class {
+        telemetry::graphql_query("Classes", "heavy_assault");
         Class {
             filters: self.filters.clone(),
             class_name: "heavy_assault".to_string(),
         }
     }
     async fn max(&self) -> Class {
+        telemetry::graphql_query("Classes", "max");
         Class {
             filters: self.filters.clone(),
             class_name: "max".to_string(),

@@ -132,6 +148,7 @@ impl ClassesQuery {
     /// Get a specific class
     pub async fn class(&self, filter: Option<Filters>, class_name: String) -> Class {
+        telemetry::graphql_query("Classes", "");
         Class {
             filters: filter.unwrap_or_default(),
             class_name,

services/api/src/factions.rs (new file)

@@ -0,0 +1,4 @@
pub const VS: i32 = 1;
pub const NC: i32 = 2;
pub const TR: i32 = 3;
pub const NSO: i32 = 4;
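These constants replace the bare 1/2/3/4 faction IDs used across this diff (classes.rs, population.rs, vehicles.rs). A standalone sketch of how a call site reads afterwards, with stand-in Filters/IdOrNameBy types since utils.rs isn't shown in this compare:

```rust
// Stand-in types for illustration only; the real ones live in utils.rs.
const NC: i32 = 2; // matches factions.rs above

#[derive(Clone, Debug, PartialEq)]
enum IdOrNameBy {
    Id(i32),
}

#[derive(Clone, Debug, Default)]
struct Filters {
    faction: Option<IdOrNameBy>,
}

fn main() {
    // Previously spelled Some(IdOrNameBy::Id(1)), with easy-to-mix-up numbers.
    let nc_filter = Filters {
        faction: Some(IdOrNameBy::Id(NC)),
        ..Filters::default()
    };
    assert_eq!(nc_filter.faction, Some(IdOrNameBy::Id(NC)));
}
```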

services/api/src/health.rs

@@ -1,8 +1,13 @@
-use async_graphql::{Context, Enum, Object};
+use crate::{telemetry, utils::ID_TO_WORLD};
+use async_graphql::{Context, Enum, Object, SimpleObject};
 use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
+use chrono::{DateTime, Utc};
 use sqlx::{query, Pool, Postgres, Row};

 pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse {
+    telemetry::http_request("/health", "GET");
+    telemetry::db_read("analytics", "get_health");
     let events_resp =
         query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
             .fetch_one(&pool)

@@ -53,13 +58,48 @@ enum UpDown {
 pub struct Health {}

+impl Health {
+    async fn most_recent_event_time<'ctx>(
+        &self,
+        ctx: &Context<'ctx>,
+        world_id: i32,
+    ) -> (UpDown, Option<DateTime<Utc>>) {
+        let pool = ctx.data::<Pool<Postgres>>().unwrap();
+        telemetry::db_read("analytics", "most_recent_event_time");
+        let events_resp =
+            query("SELECT time FROM analytics WHERE world_id = $1 ORDER BY time DESC LIMIT 1")
+                .bind(world_id)
+                .fetch_one(pool)
+                .await;
+        match events_resp {
+            Ok(row) => {
+                let last_event: DateTime<Utc> = row.get(0);
+                if last_event < Utc::now() - chrono::Duration::minutes(5) {
+                    return (UpDown::Down, Some(last_event));
+                } else {
+                    return (UpDown::Up, Some(last_event));
+                }
+            }
+            Err(_) => {
+                return (UpDown::Down, None);
+            }
+        }
+    }
+}

 /// Reports on the health of Saerro Listening Post
 #[Object]
 impl Health {
     /// Did a ping to Postgres (our main datastore) succeed?
     async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
+        telemetry::graphql_query("Health", "database");
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
+        telemetry::db_read("analytics", "database_health");
         let events_resp =
             query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
                 .fetch_one(pool)

@@ -73,8 +113,11 @@ impl Health {
     /// Is the websocket processing jobs?
     async fn ingest<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
+        telemetry::graphql_query("Health", "ingest");
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
+        telemetry::db_read("analytics", "ingest_health");
         let events_resp =
             query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
                 .fetch_one(pool)

@@ -96,14 +139,46 @@ impl Health {
     /// Is the websocket actually turned on?
     async fn ingest_reachable(&self) -> UpDown {
+        telemetry::graphql_query("Health", "ingest_reachable");
         reqwest::get(
             std::env::var("WEBSOCKET_HEALTHCHECK")
-                .unwrap_or("http://localhost:8999/health".to_string()),
+                .unwrap_or("http://127.0.0.1:8999/healthz".to_string()),
         )
         .await
         .map(|_| UpDown::Up)
         .unwrap_or(UpDown::Down)
     }
+
+    /// Shows a disclaimer for the worlds check
+    async fn worlds_disclaimer(&self) -> String {
+        "This is a best-effort check. A world reports `DOWN` when it doesn't have new events for 5 minutes. It could be broken, it could be the reality of the game state.".to_string()
+    }
+
+    /// Checks if a world has had any events for the last 5 minutes
+    async fn worlds<'ctx>(&self, ctx: &Context<'ctx>) -> Vec<WorldUpDown> {
+        telemetry::graphql_query("Health", "worlds");
+        let mut worlds = Vec::new();
+        for (id, name) in ID_TO_WORLD.iter() {
+            let (status, last_event) = self.most_recent_event_time(ctx, *id).await;
+            worlds.push(WorldUpDown {
+                id: *id,
+                name: name.to_string(),
+                status,
+                last_event,
+            });
+        }
+        worlds
+    }
 }
+
+#[derive(SimpleObject)]
+struct WorldUpDown {
+    id: i32,
+    name: String,
+    status: UpDown,
+    last_event: Option<DateTime<Utc>>,
+}

 #[derive(Default)]

services/api/src/html/404.html

@@ -17,6 +17,4 @@
 </style>
 <h1>404 Not Found</h1>
-<p>
-  [<a href="/">home</a>] [<a href="/graphql/playground">graphql playground</a>]
-</p>
+<p>[<a href="/">home</a>]</p>

services/api/src/html/index.html

@@ -14,47 +14,256 @@
     color: #cead42;
     text-decoration: none;
   }
+  .hidden {
+    display: none;
+  }
+  .query {
+    list-style-type: none;
+    padding-left: 0;
+    background-color: #131313;
+    width: fit-content;
+    padding: 2rem;
+    margin: 2rem;
+    border-radius: 10px;
+    border-left: #918b79 3px solid;
+    font-size: 1rem;
+  }
+  .query pre {
+    margin: 0;
+  }
 </style>
 <h1>Saerro Listening Post</h1>
 <h2>Live Population Stats API for PlanetSide 2</h2>
+<p>
+  This is a GraphQL API, which means you can query for exactly the data you
+  need. You can also use the GraphiQL interface to explore the data and build
+  your queries.
+</p>
 <ul>
-  <li><a href="/graphql/playground">Check out the GraphQL Playground</a></li>
+  <li><a href="/graphiql">Check out GraphiQL</a></li>
   <li>
     <a
-      href="/graphql?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
+      id="status_query_link"
+      href="/graphql?query={ health { database ingest ingestReachable worldsDisclaimer worlds { name status lastEvent } } }"
       >Current system status</a
     >
     (<a
-      href="/graphql/playground?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
-      >or in playground</a
+      href="javascript:document.querySelector('#status_query').classList.toggle('hidden')"
+      >show GraphQL</a
     >)
+    <ul id="status_query" class="hidden query">
+      <li>
+        <pre><code>{
+  health {
+    database
+    ingest
+    ingestReachable
+    worldsDisclaimer
+    worlds {
+      name
+      status
+      lastEvent
+    }
+  }
+}</code></pre>
+        <a
+          href="javascript:runQuery('status_query_link', 'status_query_result')"
+          >Run ⫸</a
+        ><br />
+      </li>
+      <li class="hidden" id="status_query_result"></li>
+    </ul>
   </li>
   <li>
     <a
-      href="/graphql?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
+      id="current_pop_query_link"
+      href="/graphql?query={ allWorlds { name population { total nc tr vs } } }"
     >
       Current population of all worlds
     </a>
     (<a
-      href="/graphql/playground?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
-      >or in playground</a
+      href="javascript:document.querySelector('#current_pop_query').classList.toggle('hidden')"
+      >show GraphQL</a
     >)
+    <ul id="current_pop_query" class="hidden query">
+      <li>
+        <pre><code>{
+  allWorlds {
+    name
+    population {
+      total
+      nc
+      tr
+      vs
+    }
+  }
+}</code></pre>
+        <a
+          href="javascript:runQuery('current_pop_query_link', 'current_pop_query_result')"
+          >Run ⫸</a
+        ><br />
+      </li>
+      <li class="hidden" id="current_pop_query_result"></li>
+    </ul>
+  </li>
+  <li>
+    <a
+      id="complex_query_link"
+      href="/graphql?query={ allWorlds { name classes { combatMedic { total nc tr vs } } vehicles { total sunderer { total nc tr vs } } } }"
+    >
+      Show every Sunderer and Combat Medic for every server by faction
+    </a>
+    (<a
+      href="javascript:document.querySelector('#complex_query').classList.toggle('hidden')"
+      >show GraphQL</a
+    >)
+    <ul id="complex_query" class="hidden query">
+      <li>
+        <pre><code>{
+  allWorlds {
+    name
+    classes {
+      combatMedic {
+        total
+        nc
+        tr
+        vs
+      }
+    }
+    vehicles {
+      total
+      sunderer {
+        total
+        nc
+        tr
+        vs
+      }
+    }
+  }
+}</code></pre>
+        <a
+          href="javascript:runQuery('complex_query_link', 'complex_query_result')"
+          >Run ⫸</a
+        ><br />
+      </li>
+      <li class="hidden" id="complex_query_result"></li>
+    </ul>
+  </li>
+  <li>
+    <a
+      id="very_complex_query_link"
+      href="/graphql?query={ zones { all { name classes { heavyAssault { nc tr vs } lightAssault { nc tr vs } } vehicles { vanguard { total } prowler { total } magrider { total } lightning { nc vs tr } chimera { nc vs tr } } } } }"
+    >
+      Show the current counts of heavy assaults, light assaults, and tanks per
+      continent globally
+    </a>
+    (<a
+      href="javascript:document.querySelector('#very_complex_query').classList.toggle('hidden')"
+      >show GraphQL</a
+    >)
+    <ul id="very_complex_query" class="hidden query">
+      <li>
+        <pre><code>{
+  zones {
+    all {
+      name
+      classes {
+        heavyAssault {
+          nc
+          tr
+          vs
+        }
+        lightAssault {
+          nc
+          tr
+          vs
+        }
+      }
+      vehicles {
+        vanguard {
+          total
+        }
+        prowler {
+          total
+        }
+        magrider {
+          total
+        }
+        lightning {
+          nc
+          vs
+          tr
+        }
+        chimera {
+          nc
+          vs
+          tr
+        }
+      }
+    }
+  }
+}</code></pre>
+        <a
+          href="javascript:runQuery('very_complex_query_link', 'very_complex_query_result')"
+          >Run ⫸</a
+        ><br />
+      </li>
+      <li class="hidden" id="very_complex_query_result"></li>
+    </ul>
   </li>
 </ul>
+<p>
+  This API supports two query methods,
+  <a href="https://graphql.org/learn/serving-over-http/#get-request">GET</a>
+  and
+  <a href="https://graphql.org/learn/serving-over-http/#post-request">POST</a>.
+  To view the JSON outputs without fancy UIs, you can use a browser plugin like
+  <a href="https://addons.mozilla.org/en-US/firefox/addon/jsonview/"
+    >JSONView for Firefox</a
+  >
+  or
+  <a
+    href="https://chrome.google.com/webstore/detail/jsonvue/chklaanhfefbnpoihckbnefhakgolnmc"
+    >JSONVue for Chrome</a
+  >.
+</p>
 <p>
   All data is an aggregate of the last 15 minutes of Death and VehicleDestroy
   events, including both attacker and victim.
 </p>
+<hr />
 <p>
   This API is provided by Genudine Dynamics.<br />As always, we take no
   responsibility for your use of this data... or our weapons. :)
 </p>
 <p>For help, please contact us in #api-dev on the PlanetSide 2 Discord.</p>
 <p>
-  [<a href="https://github.com/genudine/saerro">github</a>] [<a
-    href="https://pstop.harasse.rs"
-    >pstop</a
-  >]
+  [<a href="/ingest">ingest stats</a>] [<a
+    href="https://github.com/genudine/saerro"
+    >github</a
+  >] [<a href="https://pstop.harasse.rs">pstop</a>]
 </p>
+<script>
+  const runQuery = async (linkId, resultId) => {
+    const link = document.getElementById(linkId);
+    const result = document.getElementById(resultId);
+    result.innerHTML = "Loading...";
+    result.classList.remove("hidden");
+    fetch(link.href)
+      .then((response) => response.json())
+      .then((data) => {
+        result.innerHTML = `<pre><code>${JSON.stringify(
+          data.data,
+          null,
+          2
+        )}</pre></code>`;
+      })
+      .catch((error) => {
+        result.innerHTML = "Failed...";
+      });
+  };
+</script>

services/api/src/html/ingest.html (new file)

@@ -0,0 +1,418 @@
<!DOCTYPE html>
<title>Ingest Stats - Saerro Listening Post</title>
<meta charset="utf-8" />
<style>
body {
font-family: monospace;
background-color: #010101;
color: #e0e0e0;
font-size: 1.25rem;
line-height: 1.6;
}
a {
color: #cead42;
text-decoration: none;
}
h3 {
margin: 0;
}
.chart-container {
position: relative;
}
.main {
display: grid;
grid-template-columns: repeat(2, minmax(300px, 1fr));
gap: 1rem;
padding: 1rem;
}
.wide {
grid-column: span 2;
}
.graph-head {
display: flex;
justify-content: space-between;
align-items: center;
}
.sums-15m {
color: rgba(165, 165, 165, 1);
}
.sums-1h {
color: rgba(165, 165, 165, 0.8);
}
.sums-6h {
color: rgba(165, 165, 165, 0.6);
}
.sums-1d {
color: rgba(165, 165, 165, 0.4);
}
</style>
<h1>Ingest Stats <span id="loading">[LOADING...]</span></h1>
<div class="main">
<div class="wide">
<div class="graph-head">
<h3>All Events by Type</h3>
<p id="all-events-by-type-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container">
<canvas id="all-events-by-type" />
</div>
</div>
<div class="wide">
<div class="graph-head">
<h3>Events by World</h3>
</div>
<div class="chart-container">
<canvas id="events-by-world" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Connery [US West]</h3>
<p id="connery-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="connery" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Miller [EU]</h3>
<p id="miller-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="miller" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Cobalt [EU]</h3>
<p id="cobalt-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="cobalt" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Emerald [US East]</h3>
<p id="emerald-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="emerald" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Jaeger [US East]</h3>
<p id="jaeger-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="jaeger" />
</div>
</div>
<div>
<div class="graph-head">
<h3>SolTech [Tokyo]</h3>
<p id="soltech-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="soltech" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Genudine [US East] [PS4]</h3>
<p id="genudine-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="genudine" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Ceres [EU] [PS4]</h3>
<p id="ceres-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="ceres" />
</div>
</div>
<div class="wide">
<div class="graph-head">
<h3>Experience Events By ID</h3>
<p id="exp-by-id-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container">
<canvas id="exp-by-id" />
</div>
<div class="filter">
Filter to World:
<select id="exp-filter">
<option selected value="all">All</option>
<option value="1">Connery</option>
<option value="10">Miller</option>
<option value="13">Cobalt</option>
<option value="17">Emerald</option>
<option value="19">Jaeger</option>
<option value="40">SolTech</option>
<option value="1000">Genudine</option>
<option value="2000">Ceres</option>
</select>
</div>
</div>
</div>
<p>
[<a href="/">home</a>] [<a href="/ingest">1 day w/ 5m buckets</a>] [<a
href="/ingest?hi=1"
>1 hour w/ 5s buckets</a
>]
</p>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns/dist/chartjs-adapter-date-fns.bundle.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/humanize-plus@1.8.2/dist/humanize.min.js"></script>
<script>
const sumsStr = (_15m, _1h, _6h, _24h) => `
<span class="sums-15m" title="sum of last 15 minutes (${_15m})">${Humanize.intComma(
_15m
)},</span>
<span class="sums-1h" title="sum of last 1 hour (${_1h})">${Humanize.compactInteger(
_1h
)},</span>
<span class="sums-6h" title="sum of last 6 hours (${_6h})">${Humanize.compactInteger(
_6h
)},</span>
<span class="sums-1d" title="sum of last 1 day (${_24h})">${Humanize.compactInteger(
_24h
)}</span>`;
const doSums = async (id, events) => {
// Calculate sums for the last 15 minutes, 1 hour, 6 hours, and 1 day based on event timestamps
let sums = [0, 0, 0, 0];
let now = Date.now();
let fifteenMinutes = 15 * 60 * 1000;
let oneHour = 60 * 60 * 1000;
let sixHours = 6 * 60 * 60 * 1000;
let oneDay = 24 * 60 * 60 * 1000;
for (let ev of events) {
let diff = now - new Date(ev.time);
if (diff < fifteenMinutes) {
sums[0] += ev.count;
}
if (diff < oneHour) {
sums[1] += ev.count;
}
if (diff < sixHours) {
sums[2] += ev.count;
}
if (diff < oneDay) {
sums[3] += ev.count;
}
}
document.getElementById(`${id}-sums`).innerHTML = sumsStr(...sums);
};
const allEventsByType = (id, events) => {
doSums(id, events);
let allEvents = events.reduce(
(acc, ev) => {
const eventName = ev.eventName.replace(/_[0-9]+/g, "");
acc[eventName][ev.time] = acc[eventName][ev.time] ?? 0;
acc[eventName][ev.time] += ev.count;
return acc;
},
{ Death: {}, VehicleDestroy: {}, GainExperience: {} }
);
new Chart(document.getElementById(id), {
type: "line",
options: {
scales: {
y: { beginAtZero: true, suggestedMin: 0 },
x: { stacked: false, type: "timeseries" },
},
},
data: {
datasets: [
{
label: "Deaths",
data: allEvents.Death,
},
{
label: "Vehicle Destroys",
data: allEvents.VehicleDestroy,
},
{
label: "Experience Events",
data: allEvents.GainExperience,
},
],
},
});
};
const experienceEventsByID = (eventsUnfiltered) => {
const events = eventsUnfiltered.filter((ev) =>
ev.eventName.startsWith("GainExperience_")
);
doSums("exp-by-id", events);
let allEvents = events.reduce((acc, ev) => {
const eventID = ev.eventName.replace(/GainExperience_([0-9]+)/g, "$1");
acc[eventID] = acc[eventID] ?? {};
acc[eventID][ev.time] = acc[eventID][ev.time] ?? 0;
acc[eventID][ev.time] += ev.count;
return acc;
}, {});
new Chart(document.getElementById("exp-by-id"), {
type: "bar",
options: {
scales: {
y: { stacked: true, beginAtZero: true, suggestedMin: 0 },
x: { stacked: true, type: "timeseries" },
},
},
data: {
datasets: Object.keys(allEvents).map((id) => ({
label: id,
data: allEvents[id],
})),
},
});
};
const eventsByWorld = (events) => {
let allEvents = events.reduce((acc, ev) => {
acc[ev.worldId] = acc[ev.worldId] || {};
acc[ev.worldId][ev.time] = (acc[ev.worldId][ev.time] || 0) + ev.count;
return acc;
}, {});
new Chart(document.getElementById("events-by-world"), {
type: "line",
options: {
scales: {
y: { beginAtZero: true },
x: {
type: "timeseries",
},
},
},
data: {
datasets: [
{
label: "Connery",
data: allEvents["1"],
},
{
label: "Miller",
data: allEvents["10"],
},
{
label: "Cobalt",
data: allEvents["13"],
},
{
label: "Emerald",
data: allEvents["17"],
},
{
label: "Jaeger",
data: allEvents["19"],
},
{
label: "SolTech",
data: allEvents["40"],
},
{
label: "Genudine",
data: allEvents["1000"],
},
{
label: "Ceres",
data: allEvents["2000"],
},
],
},
});
};
(async () => {
let resp = await fetch("/graphql", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
query: `
{
analytics {
events(${
location.search.includes("hi=1")
? "hiPrecision: true"
: "bucketSize: 300"
}) {
eventName
count
time
worldId
}
}
}
`,
}),
});
let body = await resp.json();
let events = body.data.analytics.events;
window.events = events;
document.getElementById("loading").style.display = "none";
allEventsByType("all-events-by-type", events);
eventsByWorld(events);
[
["connery", 1],
["miller", 10],
["cobalt", 13],
["emerald", 17],
["jaeger", 19],
["soltech", 40],
["genudine", 1000],
["ceres", 2000],
].forEach(([world, id]) => {
let worldEvents = events.filter((ev) => ev.worldId === id);
allEventsByType(world, worldEvents);
});
const expFilter = document.getElementById("exp-filter");
experienceEventsByID(
expFilter.value === "all"
? events
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
);
expFilter.addEventListener("change", () => {
document.getElementById("exp-by-id").outerHTML =
"<canvas id='exp-by-id' />";
experienceEventsByID(
expFilter.value === "all"
? events
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
);
});
})();
</script>
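The page above fetches its data with a plain GraphQL POST. The same request from Rust, as a hedged sketch (the endpoint host and port are assumptions for a local instance, not something this diff pins down):

```rust
// Posts the ingest page's analytics query to a local API and prints the raw
// JSON. Requires reqwest with its "json" feature plus serde_json and tokio.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = serde_json::json!({
        "query": "{ analytics { events(bucketSize: 300) { eventName count time worldId } } }"
    });
    let resp: serde_json::Value = reqwest::Client::new()
        .post("http://127.0.0.1:8000/graphql") // assumed local address
        .json(&body)
        .send()
        .await?
        .json()
        .await?;
    println!("{}", resp["data"]["analytics"]["events"]);
    Ok(())
}
```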

services/api/src/main.rs

@@ -1,7 +1,10 @@
+mod analytics;
 mod classes;
+mod factions;
 mod health;
 mod population;
 mod query;
+mod telemetry;
 mod utils;
 mod vehicles;
 mod world;

@@ -24,9 +27,15 @@ use tower_http::cors::{Any, CorsLayer};
 extern crate serde_json;

 async fn index() -> Html<&'static str> {
+    telemetry::http_request("/", "GET");
     Html(include_str!("html/index.html"))
 }

+async fn ingest() -> Html<&'static str> {
+    telemetry::http_request("/ingest", "GET");
+    Html(include_str!("html/ingest.html"))
+}

 async fn handle_404() -> Html<&'static str> {
     Html(include_str!("html/404.html"))
 }

@@ -35,6 +44,7 @@ async fn graphql_handler_post(
     Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
     Json(query): Json<Request>,
 ) -> Json<Response> {
+    telemetry::http_request("/graphql", "POST");
     Json(schema.execute(query).await)
 }

@@ -42,6 +52,8 @@ async fn graphql_handler_get(
     Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
     query: Query<Request>,
 ) -> axum::response::Response {
+    telemetry::http_request("/graphql", "GET");
+
     if query.query == "" {
         return Redirect::to("/graphiql").into_response();
     }

@@ -50,6 +62,8 @@
 }

 async fn graphiql() -> impl IntoResponse {
+    telemetry::http_request("/graphiql", "GET");
+
     Html(
         GraphiQLSource::build()
             .endpoint("/graphql")

@@ -61,7 +75,7 @@
 #[tokio::main]
 async fn main() {
     let db_url = std::env::var("DATABASE_URL")
-        .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
+        .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
     let db = sqlx::PgPool::connect(&db_url).await.unwrap();

     let schema = Schema::build(query::Query::default(), EmptyMutation, EmptySubscription)

@@ -70,12 +84,15 @@
     let app = Router::new()
         .route("/", get(index))
+        .route("/ingest", get(ingest))
         .route("/health", get(health::get_health))
         .route(
             "/graphql",
             post(graphql_handler_post).get(graphql_handler_get),
         )
-        .route("/graphql/playground", get(graphiql))
+        .route("/graphiql", get(graphiql))
+        .route("/metrics", get(telemetry::handler))
+        .route("/metrics/combined", get(telemetry::handler_combined))
         .fallback(handle_404)
         .layer(Extension(db))
         .layer(Extension(schema))

services/api/src/population.rs

@@ -1,4 +1,8 @@
-use crate::utils::Filters;
+use crate::{
+    factions::{NC, NSO, TR, VS},
+    utils::Filters,
+    telemetry,
+};
 use async_graphql::{Context, Object};
 use sqlx::{Pool, Postgres, Row};

@@ -19,8 +23,9 @@ impl Population {
     async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: i32) -> i64 {
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
+        telemetry::db_read("players", "population_by_faction");
         let sql = format!(
-            "SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' AND faction_id = $1 {};",
+            "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND faction_id = $1 {};",
             self.filters.sql(),
         );

@@ -40,10 +45,13 @@
 #[Object]
 impl Population {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Population", "total");
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
+        telemetry::db_read("players", "population_total");
         let sql = format!(
-            "SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' {};",
+            "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' {};",
             self.filters.sql(),
         );

@@ -58,16 +66,20 @@ impl Population {
         query
     }
     async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        self.by_faction(ctx, 1).await
+        telemetry::graphql_query("Population", "nc");
+        self.by_faction(ctx, NC).await
     }
     async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        self.by_faction(ctx, 2).await
+        telemetry::graphql_query("Population", "vs");
+        self.by_faction(ctx, VS).await
     }
     async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        self.by_faction(ctx, 3).await
+        telemetry::graphql_query("Population", "tr");
+        self.by_faction(ctx, TR).await
     }
     async fn ns<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        self.by_faction(ctx, 4).await
+        telemetry::graphql_query("Population", "ns");
+        self.by_faction(ctx, NSO).await
     }
 }

services/api/src/query.rs

@@ -1,6 +1,6 @@
 use crate::{
-    classes::ClassesQuery, health::HealthQuery, population::PopulationQuery,
-    vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
+    analytics::AnalyticsQuery, classes::ClassesQuery, health::HealthQuery,
+    population::PopulationQuery, vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
 };
 use async_graphql::MergedObject;

@@ -12,4 +12,5 @@ pub struct Query(
     WorldQuery,
     ZoneQuery,
     HealthQuery,
+    AnalyticsQuery,
 );

services/api/src/telemetry.rs (new file)

@@ -0,0 +1,138 @@
use lazy_static::lazy_static;
use prometheus::{
IntGauge,
IntGaugeVec,
register_int_gauge_vec,
register_int_gauge,
TextEncoder,
gather
};
use sqlx::{Pool, Postgres, Row};
use axum::Extension;
use chrono::{DateTime, Utc};
lazy_static! {
// http
pub static ref HTTP_REQUEST: IntGaugeVec = register_int_gauge_vec!("saerro_api_http_requests", "HTTP requests", &[
"route", "method"
]).unwrap();
pub static ref GRAPHQL_QUERY: IntGaugeVec = register_int_gauge_vec!("saerro_api_graphql_query", "GraphQL queries", &[
"major", "minor"
]).unwrap();
// counters
pub static ref PLAYERS_TRACKED: IntGauge = register_int_gauge!("saerro_players_tracked", "All players tracked by Saerro right now").unwrap();
pub static ref VEHICLES_TRACKED: IntGauge = register_int_gauge!("saerro_vehicles_tracked", "All vehicles tracked by Saerro right now").unwrap();
pub static ref OLDEST_PLAYER: IntGauge = register_int_gauge!("saerro_oldest_player", "Oldest player tracked").unwrap();
pub static ref NEWEST_PLAYER: IntGauge = register_int_gauge!("saerro_newest_player", "Newest player tracked").unwrap();
pub static ref OLDEST_VEHICLE: IntGauge = register_int_gauge!("saerro_oldest_vehicle", "Oldest vehicle tracked").unwrap();
pub static ref NEWEST_VEHICLE: IntGauge = register_int_gauge!("saerro_newest_vehicle", "Newest vehicle tracked").unwrap();
// database stuff
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_writes", "Writes to Postgres", &[
"table", "op"
]).unwrap();
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_reads", "Reads from Postgres", &[
"table", "op"
]).unwrap();
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
// "table", "op"
// ]).unwrap();
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
// "table", "op"
// ]).unwrap();
}
pub async fn handler(Extension(pool): Extension<Pool<Postgres>>) -> String {
update_data_gauges(pool).await;
// Final output
let encoder = TextEncoder::new();
let mut buffer = String::new();
let metrics = gather();
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
buffer
}
pub async fn handler_combined(Extension(pool): Extension<Pool<Postgres>>) -> String {
let url = std::env::var("WEBSOCKET_HEALTHCHECK")
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()).replace("/healthz", "/metrics");
let local = handler(Extension(pool)).await;
let remote = match reqwest::get(url).await {
Ok(r) => r.text().await.expect("failed to text lol"),
Err(_) => String::from("")
};
format!("{}{}", local, remote)
}
// pub fn db_write(table: &str, op: &str) {
// DB_WRITES.with_label_values(&[table, op]).inc();
// }
pub fn db_read(table: &str, op: &str) {
DB_READS.with_label_values(&[table, op]).inc();
}
pub fn http_request(route: &str, method: &str) {
HTTP_REQUEST.with_label_values(&[route, method]).inc();
}
pub fn graphql_query(major: &str, minor: &str) {
GRAPHQL_QUERY.with_label_values(&[major, minor]).inc();
}
async fn update_data_gauges(pool: Pool<Postgres>) {
// Do some easy queries to fill our non-cumulative gauges
db_read("players", "count_all");
let player_count: i64 = sqlx::query("SELECT count(*) FROM players")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
PLAYERS_TRACKED.set(player_count);
db_read("players", "get_newest");
let player_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_PLAYER.set(player_newest.timestamp());
db_read("players", "get_oldest");
let player_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_PLAYER.set(player_oldest.timestamp());
db_read("vehicles", "count_all");
let vehicle_count: i64 = sqlx::query("SELECT count(*) FROM vehicles")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
VEHICLES_TRACKED.set(vehicle_count);
db_read("vehicles", "get_newest");
let vehicle_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_VEHICLE.set(vehicle_newest.timestamp());
db_read("vehicles", "get_oldest");
let vehicle_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_VEHICLE.set(vehicle_oldest.timestamp());
}
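The pattern telemetry.rs uses is small enough to demo standalone: register a labelled gauge once, bump it from handler code, then gather and render everything in the Prometheus text format for /metrics. A minimal sketch against the same prometheus 0.13 API:

```rust
use lazy_static::lazy_static;
use prometheus::{gather, register_int_gauge_vec, IntGaugeVec, TextEncoder};

lazy_static! {
    // Same shape as HTTP_REQUEST in telemetry.rs above.
    static ref HTTP_REQUEST: IntGaugeVec = register_int_gauge_vec!(
        "saerro_api_http_requests",
        "HTTP requests",
        &["route", "method"]
    )
    .unwrap();
}

fn main() {
    // What telemetry::http_request("/graphql", "POST") does internally.
    HTTP_REQUEST.with_label_values(&["/graphql", "POST"]).inc();

    // What the /metrics handler does: gather all registered metrics and
    // render them in the Prometheus text exposition format.
    let mut buffer = String::new();
    TextEncoder::new()
        .encode_utf8(&gather(), &mut buffer)
        .expect("prometheus metrics failed to render");
    print!("{buffer}");
}
```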

services/api/src/vehicles.rs

@ -1,4 +1,8 @@
use crate::utils::{Filters, IdOrNameBy}; use crate::{
factions::{NC, TR, VS},
utils::{Filters, IdOrNameBy},
telemetry,
};
use async_graphql::{Context, Object}; use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row}; use sqlx::{Pool, Postgres, Row};
@ -12,8 +16,9 @@ impl Vehicle {
     async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
+        telemetry::db_read("vehicles", "fetch");
         let sql = format!(
-            "SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' AND vehicle_id = $1 {};",
+            "SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' AND vehicle_name = $1 {};",
             filters.sql(),
         );
@ -33,33 +38,41 @@ impl Vehicle {
 #[Object]
 impl Vehicle {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Vehicle", "total");
+
         self.fetch(ctx, self.filters.clone()).await
     }

     async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Vehicle", "nc");
+
         self.fetch(
             ctx,
             Filters {
-                faction: Some(IdOrNameBy::Id(1)),
+                faction: Some(IdOrNameBy::Id(NC)),
                 ..self.filters.clone()
             },
         )
         .await
     }

     async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Vehicle", "tr");
+
         self.fetch(
             ctx,
             Filters {
-                faction: Some(IdOrNameBy::Id(2)),
+                faction: Some(IdOrNameBy::Id(TR)),
                 ..self.filters.clone()
             },
         )
         .await
     }

     async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Vehicle", "vs");
+
         self.fetch(
             ctx,
             Filters {
-                faction: Some(IdOrNameBy::Id(3)),
+                faction: Some(IdOrNameBy::Id(VS)),
                 ..self.filters.clone()
             },
         )
@ -83,10 +96,13 @@ impl Vehicles {
 #[Object]
 impl Vehicles {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
+        telemetry::graphql_query("Vehicles", "total");
+
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
+        telemetry::db_read("players", "vehicles_total");
         let sql = format!(
-            "SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' {};",
+            "SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' {};",
             self.filters.sql(),
         );
@ -103,62 +119,90 @@ impl Vehicles {
     // Transport
     async fn flash(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "flash");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "flash".to_string(),
         }
     }

     async fn sunderer(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "sunderer");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "sunderer".to_string(),
         }
     }

     async fn ant(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "ant");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "ant".to_string(),
         }
     }

     async fn harasser(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "harasser");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "harasser".to_string(),
         }
     }

     async fn javelin(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "javelin");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "javelin".to_string(),
         }
     }

+    async fn corsair(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "corsair");
+
+        Vehicle {
+            filters: self.filters.clone(),
+            vehicle_name: "corsair".to_string(),
+        }
+    }
+
     // Tanks
     async fn lightning(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "lightning");
+
         Vehicle {
             filters: self.filters.clone(),
-            vehicle_name: "javelin".to_string(),
+            vehicle_name: "lightning".to_string(),
         }
     }

     async fn prowler(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "prowler");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "prowler".to_string(),
         }
     }

     async fn vanguard(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "vanguard");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "vanguard".to_string(),
         }
     }

     async fn magrider(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "magrider");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "magrider".to_string(),
         }
     }

     async fn chimera(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "chimera");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "chimera".to_string(),
@ -167,42 +211,56 @@ impl Vehicles {
     // Air
     async fn mosquito(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "mosquito");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "mosquito".to_string(),
         }
     }

     async fn liberator(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "liberator");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "liberator".to_string(),
         }
     }

     async fn galaxy(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "galaxy");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "galaxy".to_string(),
         }
     }

     async fn valkyrie(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "valkyrie");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "valkyrie".to_string(),
         }
     }

     async fn reaver(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "reaver");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "reaver".to_string(),
         }
     }

     async fn scythe(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "scythe");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "scythe".to_string(),
         }
     }

     async fn dervish(&self) -> Vehicle {
+        telemetry::graphql_query("Vehicle", "dervish");
+
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "dervish".to_string(),


@ -4,6 +4,7 @@ use crate::{
     utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_WORLD, WORLD_IDS},
     vehicles::Vehicles,
     zone::Zones,
+    telemetry,
 };
 use async_graphql::Object;
@ -33,11 +34,15 @@ impl World {
 impl World {
     /// The ID of the world.
     async fn id(&self) -> i32 {
+        telemetry::graphql_query("World", "id");
+
         id_or_name_to_id(&WORLD_IDS, self.filter.world.as_ref().unwrap()).unwrap()
     }

     /// The name of the world, in official game capitalization.
     async fn name(&self) -> String {
+        telemetry::graphql_query("World", "name");
+
         let name = id_or_name_to_name(&ID_TO_WORLD, self.filter.world.as_ref().unwrap()).unwrap();

         // Special case for SolTech, lol.
@ -51,6 +56,8 @@ impl World {
     /// Population filtered to this world.
     async fn population(&self) -> Population {
+        telemetry::graphql_query("World", "population");
+
         Population::new(Some(Filters {
             world: self.filter.world.clone(),
             faction: None,
@ -60,6 +67,8 @@ impl World {
     /// Vehicles filtered to this world.
     async fn vehicles(&self) -> Vehicles {
+        telemetry::graphql_query("World", "vehicles");
+
         Vehicles::new(Some(Filters {
             world: self.filter.world.clone(),
             faction: None,
@ -69,6 +78,8 @@ impl World {
     /// Classes filtered to this world.
     async fn classes(&self) -> Classes {
+        telemetry::graphql_query("World", "classes");
+
         Classes::new(Some(Filters {
             world: self.filter.world.clone(),
             faction: None,
@ -78,6 +89,8 @@ impl World {
     /// Get a specific zone/continent on this world.
     async fn zones(&self) -> Zones {
+        telemetry::graphql_query("World", "zones");
+
         Zones::new(Some(self.filter.clone()))
     }
 }


@ -3,6 +3,7 @@ use crate::{
     population::Population,
     utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_ZONE, ZONE_IDS},
     vehicles::Vehicles,
+    telemetry,
 };
 use async_graphql::Object;
@ -23,11 +24,15 @@ impl Zone {
 impl Zone {
     /// The ID of the zone/continent.
     async fn id(&self) -> i32 {
+        telemetry::graphql_query("Zone", "id");
+
         id_or_name_to_id(&ZONE_IDS, self.filters.zone.as_ref().unwrap()).unwrap()
     }

     /// The name of the continent, in official game capitalization.
     async fn name(&self) -> String {
+        telemetry::graphql_query("Zone", "name");
+
         let name = id_or_name_to_name(&ID_TO_ZONE, self.filters.zone.as_ref().unwrap()).unwrap();

         // Capitalize the first letter
@ -35,14 +40,20 @@ impl Zone {
     }

     async fn population(&self) -> Population {
+        telemetry::graphql_query("Zone", "population");
+
         Population::new(Some(self.filters.clone()))
     }

     async fn vehicles(&self) -> Vehicles {
+        telemetry::graphql_query("Zone", "vehicles");
+
         Vehicles::new(Some(self.filters.clone()))
     }

     async fn classes(&self) -> Classes {
+        telemetry::graphql_query("Zone", "classes");
+
         Classes::new(Some(self.filters.clone()))
     }
 }


@ -6,10 +6,10 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-redis = { version = "0.22.1", features = ["aio", "r2d2", "tokio-comp"] }
-once_cell = "1.16.0"
-tokio = { version = "1.23.0", features = ["full"] }
-sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
+tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
+sqlx = { version = "0.7.1", default_features = false, features = [
+    "runtime-tokio-rustls",
+    "postgres",
+] }
 lazy_static = "1.4.0"
 async_once = "0.2.6"
-dotenvy = "0.15.6"


@ -1,5 +1,4 @@
 use async_once::AsyncOnce;
-use dotenvy::dotenv;
 use lazy_static::lazy_static;
 use migrations::cmd_migrate;
 use sqlx::query;
@ -10,7 +9,7 @@ mod migrations;
 lazy_static! {
     pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
         let db_url = std::env::var("DATABASE_URL")
-            .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
+            .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
         sqlx::PgPool::connect(&db_url).await.unwrap()
     });
 }
@ -19,21 +18,14 @@ async fn cmd_prune() {
     println!("Pruning old data...");
     let pool = PG.get().await;

-    let rows = query("DELETE FROM players WHERE time < NOW() - INTERVAL '15 minutes';")
+    let rows = query("DELETE FROM players WHERE last_updated < NOW() - INTERVAL '15 minutes';")
         .execute(pool)
         .await
         .unwrap()
         .rows_affected();
     println!("Deleted {} rows of old player data", rows);

-    let rows = query("DELETE FROM classes WHERE time < NOW() - INTERVAL '15 minutes';")
-        .execute(pool)
-        .await
-        .unwrap()
-        .rows_affected();
-    println!("Deleted {} rows of old class data", rows);
-
-    let rows = query("DELETE FROM vehicles WHERE time < NOW() - INTERVAL '15 minutes';")
+    let rows = query("DELETE FROM vehicles WHERE last_updated < NOW() - INTERVAL '15 minutes';")
         .execute(pool)
         .await
         .unwrap()
@ -58,14 +50,41 @@ fn cmd_help() {
 #[tokio::main]
 async fn main() {
-    dotenv().ok();
-
     let command = args().nth(1).unwrap_or("help".to_string());

     match command.as_str() {
         "help" => cmd_help(),
         "prune" => cmd_prune().await,
+        "auto-prune" => loop {
+            cmd_prune().await;
+            tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
+        },
+        "maintenance" => {
+            println!("Running maintenance tasks...");
+            println!("Checking if DB is migrated...");
+            if !migrations::is_migrated().await {
+                println!("DB is not migrated, running migrations...");
+                cmd_migrate().await;
+            }
+            println!("Running prune...");
+            cmd_prune().await;
+            println!("Done!");
+        }
+        "auto-maintenance" => loop {
+            println!("Running maintenance tasks...");
+            if !migrations::is_migrated().await {
+                println!("DB is not migrated, running migrations...");
+                cmd_migrate().await;
+            }
+            cmd_prune().await;
+            tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
+        },
         "migrate" => cmd_migrate().await,
+        "print-env" => {
+            std::env::vars().for_each(|(key, value)| println!("{}={}", key, value));
+        }
         _ => {
             println!("Unknown command: {}", command);
             cmd_help();
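With these additions the tasks binary (invocation name assumed from the crate) can be run as, for example:

    tasks migrate           # run migrations once
    tasks maintenance       # migrate if needed, then prune once
    tasks auto-maintenance  # the same, looping every 5 minutes
    tasks print-env         # dump the environment for debugging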


@ -1,15 +1,10 @@
 use crate::PG;
-use sqlx::query;
+use sqlx::{query, Row};

 pub async fn cmd_migrate() {
     println!("Migrating database...");
-    tokio::join!(
-        migrate_players(),
-        migrate_classes(),
-        migrate_vehicles(),
-        migrate_analytics()
-    );
+    tokio::join!(migrate_players(), migrate_vehicles(), migrate_analytics());
 }

 async fn migrate_players() {
@ -26,77 +21,21 @@ async fn migrate_players() {
     println!("PLAYERS => CREATE TABLE players");
     query(
         "CREATE TABLE players (
-            character_id TEXT NOT NULL,
-            time TIMESTAMPTZ NOT NULL,
+            character_id TEXT NOT NULL PRIMARY KEY,
+            last_updated TIMESTAMPTZ NOT NULL,
             world_id INT NOT NULL,
             faction_id INT NOT NULL,
-            zone_id INT NOT NULL);",
+            zone_id INT NOT NULL,
+            class_name TEXT NOT NULL
+        );",
     )
     .execute(pool)
     .await
     .unwrap();

-    println!("PLAYERS => create_hypertable");
-    query(
-        "SELECT create_hypertable('players', 'time',
-        chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
-    )
-    .execute(pool)
-    .await
-    .unwrap();
-
-    println!("PLAYERS => add_retention_policy");
-    query("SELECT add_retention_policy('players', INTERVAL '15 minutes');")
-        .execute(pool)
-        .await
-        .unwrap();
-
     println!("PLAYERS => done!");
 }

-async fn migrate_classes() {
-    let pool = PG.get().await;
-    println!("-> Migrating classes");
-
-    println!("CLASSES => DROP TABLE IF EXISTS classes");
-    query("DROP TABLE IF EXISTS classes")
-        .execute(pool)
-        .await
-        .unwrap();
-
-    println!("CLASSES => CREATE TABLE classes");
-    query(
-        "CREATE TABLE classes (
-            character_id TEXT NOT NULL,
-            time TIMESTAMPTZ NOT NULL,
-            world_id INT NOT NULL,
-            faction_id INT NOT NULL,
-            zone_id INT NOT NULL,
-            class_id TEXT NOT NULL);",
-    )
-    .execute(pool)
-    .await
-    .unwrap();
-
-    println!("CLASSES => create_hypertable");
-    query(
-        "SELECT create_hypertable('classes', 'time',
-        chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
-    )
-    .execute(pool)
-    .await
-    .unwrap();
-
-    println!("CLASSES => add_retention_policy");
-    query("SELECT add_retention_policy('classes', INTERVAL '15 minutes');")
-        .execute(pool)
-        .await
-        .unwrap();
-
-    println!("CLASSES => done!");
-}
-
 async fn migrate_vehicles() {
     let pool = PG.get().await;
@ -111,33 +50,18 @@ async fn migrate_vehicles() {
     println!("VEHICLES => CREATE TABLE vehicles");
     query(
         "CREATE TABLE vehicles (
-            character_id TEXT NOT NULL,
-            time TIMESTAMPTZ NOT NULL,
+            character_id TEXT NOT NULL PRIMARY KEY,
+            last_updated TIMESTAMPTZ NOT NULL,
             world_id INT NOT NULL,
             faction_id INT NOT NULL,
             zone_id INT NOT NULL,
-            vehicle_id TEXT NOT NULL);",
+            vehicle_name TEXT NOT NULL
+        );",
     )
     .execute(pool)
     .await
     .unwrap();

-    println!("VEHICLES => create_hypertable");
-    query(
-        "SELECT create_hypertable('vehicles', 'time',
-        chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
-    )
-    .execute(pool)
-    .await
-    .unwrap();
-
-    println!("VEHICLES => add_retention_policy");
-    query("SELECT add_retention_policy('vehicles', INTERVAL '15 minutes');")
-        .execute(pool)
-        .await
-        .unwrap();
-
     println!("VEHICLES => done!");
 }
@ -173,3 +97,15 @@ async fn migrate_analytics() {
     println!("ANALYTICS => done!");
 }
+
+pub async fn is_migrated() -> bool {
+    let pool = PG.get().await;
+
+    let tables: i64 = query("SELECT count(1) FROM pg_tables WHERE schemaname = 'public' AND tablename IN ('players', 'vehicles', 'analytics');")
+        .fetch_one(pool)
+        .await
+        .unwrap()
+        .get(0);
+
+    tables == 3
+}


@ -6,16 +6,22 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-redis = { version = "0.22.1", default_features = false, features = ["r2d2"] }
 lazy_static = "1.4.0"
-tokio-tungstenite = { version = "0.18.0", features=["native-tls"] }
-serde = { version = "1.0.149", features = ["derive"] }
-serde_json = "1.0.89"
-tokio = { version = "1.23.0", features = ["full"] }
-sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
-url = "2.3.1"
-futures-util = "0.3.25"
-futures = "0.3.25"
+tokio-tungstenite = { version = "0.20.0", features = [
+    "rustls-tls-webpki-roots",
+] }
+serde = { version = "1.0.188", features = ["derive"] }
+serde_json = "1.0.105"
+tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
+sqlx = { version = "0.7.1", default_features = false, features = [
+    "runtime-tokio-rustls",
+    "postgres",
+] }
+url = "2.4.1"
+futures-util = "0.3.28"
+futures = "0.3.28"
 async_once = "0.2.6"
-serde-aux = "4.1.2"
-axum = "0.6.1"
+serde-aux = "4.2.0"
+axum = "0.6.20"
+prometheus = "0.13.3"
+prometheus-static-metric = "0.5.1"


@ -1,42 +1,50 @@
 use async_once::AsyncOnce;
-use axum::{routing::get, Router};
+use axum::{routing::get, Json, Router};
 use futures::{pin_mut, FutureExt};
 use futures_util::StreamExt;
 use lazy_static::lazy_static;
 use serde::Deserialize;
 use serde_aux::prelude::*;
 use serde_json::json;
-use sqlx::{postgres::PgPoolOptions, query};
+use sqlx::{postgres::PgPoolOptions, query, Row};
 use std::{env, net::SocketAddr};
 use tokio::task::JoinSet;
 use tokio_tungstenite::{connect_async, tungstenite::Message};

 mod translators;
+mod telemetry;

 lazy_static! {
-    // static ref PAIR: String = env::var("PAIR").unwrap_or_default();
-    // static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string());
     static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
     static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
         let db_url = std::env::var("DATABASE_URL")
-            .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
+            .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
         PgPoolOptions::new().connect(&db_url).await.unwrap()
     });
 }

 async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
-    let worlds_raw = env::var("WORLDS").unwrap_or_default();
-    if worlds_raw == "" {
-        println!("WORLDS not set");
-        return;
-    }
+    let worlds_raw = env::var("WORLDS").unwrap_or("all".to_string());
     let worlds: Vec<&str> = worlds_raw.split(',').collect();

+    let experience_ids = vec![
+        2, 3, 4, 5, 6, 7, 34, 51, 53, 55, 57, 86, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99,
+        100, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 201, 233, 293,
+        294, 302, 303, 353, 354, 355, 438, 439, 503, 505, 579, 581, 584, 653, 656, 674, 675,
+    ];
+
+    let mut events = experience_ids
+        .iter()
+        .map(|id| format!("GainExperience_experience_id_{}", id))
+        .collect::<Vec<String>>();
+
+    events.push("Death".to_string());
+    events.push("VehicleDestroy".to_string());
+
     // Send setup message
     let setup_msg = json!({
         "action": "subscribe",
         "worlds": worlds,
-        "eventNames": ["Death", "VehicleDestroy"],
+        "eventNames": events,
         "characters": ["all"],
         "logicalAndCharactersWithWorlds": true,
         "service": "event",
@ -46,6 +54,7 @@ async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
         .unwrap();

     println!("[ws] Sent setup message");
+    println!("[ws/setup] {}", setup_msg.to_string())
 }

 #[derive(Clone)]
@ -54,33 +63,32 @@ struct PopEvent {
     team_id: i32,
     character_id: String,
     zone_id: i32,
-}
-
-struct VehicleEvent {
-    world_id: i32,
-    vehicle_id: String,
-    character_id: String,
-    zone_id: i32,
-    team_id: i32,
-}
-
-struct ClassEvent {
-    world_id: i32,
-    character_id: String,
     loadout_id: String,
-    zone_id: i32,
-    team_id: i32,
+    vehicle_id: String,
 }

+#[derive(Debug)]
 struct AnalyticsEvent {
     world_id: i32,
     event_name: String,
 }

-// async fn track_pop(pop_event: PopEvent) {
-//     track_pop_db(pop_event.clone()).await;
-//     track_pop_redis(pop_event).await;
-// }
+async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> {
+    let pool = PG.get().await;
+
+    telemetry::db_read("players", "get_team_id");
+    let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;")
+        .bind(character_id.clone())
+        .fetch_one(pool)
+        .await?
+        .get(0);
+
+    if team_id == 0 {
+        return Err(sqlx::Error::RowNotFound);
+    }
+
+    Ok(team_id)
+}

 async fn track_pop(pop_event: PopEvent) {
     // println!("[ws/track_pop]");
@ -91,37 +99,50 @@ async fn track_pop(pop_event: PopEvent) {
         team_id,
         character_id,
         zone_id,
+        loadout_id,
+        vehicle_id,
     } = pop_event;

-    query("INSERT INTO players (time, character_id, world_id, faction_id, zone_id) VALUES (now(), $1, $2, $3, $4);")
-        .bind(character_id)
-        .bind(world_id)
-        .bind(team_id)
-        .bind(zone_id)
-        .execute(pool)
-        .await
-        .unwrap();
-}
+    let class_name = translators::loadout_to_class(loadout_id.as_str());
+    let vehicle_name = if vehicle_id == "" {
+        "unknown".to_string()
+    } else {
+        translators::vehicle_to_name(vehicle_id.as_str())
+    };

-async fn track_vehicle(vehicle_event: VehicleEvent) {
-    // println!("[ws/track_vehicle]");
-    let pool = PG.get().await;
+    telemetry::db_write("players", "track_pop");
+    query(
+        "
+        INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name)
+        VALUES (now(), $1, $2, $3, $4, $5)
+        ON CONFLICT (character_id) DO UPDATE SET
+            last_updated = EXCLUDED.last_updated,
+            world_id = EXCLUDED.world_id,
+            faction_id = EXCLUDED.faction_id,
+            zone_id = EXCLUDED.zone_id,
+            class_name = EXCLUDED.class_name
+        ;",
+    )
+    .bind(character_id.clone())
+    .bind(world_id)
+    .bind(team_id)
+    .bind(zone_id)
+    .bind(class_name)
+    .execute(pool)
+    .await
+    .unwrap();

-    let VehicleEvent {
-        world_id,
-        vehicle_id,
-        zone_id,
-        character_id,
-        team_id,
-    } = vehicle_event;
-
-    let vehicle_name = translators::vehicle_to_name(vehicle_id.as_str());
-
-    if vehicle_name == "unknown" {
-        return;
-    }
-
-    query("INSERT INTO vehicles (time, character_id, world_id, faction_id, zone_id, vehicle_id) VALUES (now(), $1, $2, $3, $4, $5);")
+    if vehicle_name != "unknown" {
+        telemetry::db_write("vehicles", "track_pop");
+        query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name)
+        VALUES (now(), $1, $2, $3, $4, $5)
+        ON CONFLICT (character_id) DO UPDATE SET
+            last_updated = EXCLUDED.last_updated,
+            world_id = EXCLUDED.world_id,
+            faction_id = EXCLUDED.faction_id,
+            zone_id = EXCLUDED.zone_id,
+            vehicle_name = EXCLUDED.vehicle_name
+        ;")
         .bind(character_id)
         .bind(world_id)
         .bind(team_id)
@ -131,47 +152,10 @@ async fn track_vehicle(vehicle_event: VehicleEvent) {
         .await
         .unwrap();
+    }
 }

-async fn track_class(class_event: ClassEvent) {
-    // println!("[ws/track_class]");
-    let pool = PG.get().await;
-
-    let ClassEvent {
-        world_id,
-        character_id,
-        loadout_id,
-        zone_id,
-        team_id,
-    } = class_event;
-
-    let class_name = translators::loadout_to_class(loadout_id.as_str());
-
-    if class_name == "unknown" {
-        return;
-    }
-
-    query(
-        "INSERT INTO classes (
-        time,
-        character_id,
-        world_id,
-        faction_id,
-        zone_id,
-        class_id
-    ) VALUES (now(), $1, $2, $3, $4, $5);",
-    )
-    .bind(character_id)
-    .bind(world_id)
-    .bind(team_id)
-    .bind(zone_id)
-    .bind(class_name)
-    .execute(pool)
-    .await
-    .unwrap();
-}
-
 async fn track_analytics(analytics_event: AnalyticsEvent) {
-    // println!("[ws/track_analytics]");
+    // println!("[ws/track_analytics] {:?}", analytics_event);
     let pool = PG.get().await;

     let AnalyticsEvent {
@ -179,15 +163,21 @@ async fn track_analytics(analytics_event: AnalyticsEvent) {
         event_name,
     } = analytics_event;

-    query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
+    telemetry::db_write("analytics", "track_analytics");
+    match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
         .bind(world_id)
         .bind(event_name)
         .execute(pool)
         .await
-        .unwrap();
+    {
+        Ok(_) => {}
+        Err(e) => {
+            println!("[ws/track_analytics] ERR => {:?}", e);
+        }
+    }
 }

-async fn process_event(event: &Event) {
+async fn process_death_event(event: &Event) {
     let mut set = JoinSet::new();
     // println!("[ws/process_event] EVENT: {:?}", event);
@ -196,33 +186,14 @@ async fn process_death_event(event: &Event) {
         event_name: event.event_name.clone(),
     }));

-    if event.character_id != "0" {
+    if event.character_id != "" && event.character_id != "0" {
+        // General population tracking
         set.spawn(track_pop(PopEvent {
             world_id: event.world_id.clone(),
             team_id: event.team_id.clone(),
             character_id: event.character_id.clone(),
             zone_id: event.zone_id.clone(),
-        }));
-    }
-
-    if event.event_name == "VehicleDestroy" {
-        set.spawn(track_vehicle(VehicleEvent {
-            world_id: event.world_id.clone(),
-            vehicle_id: event.vehicle_id.clone(),
-            character_id: event.character_id.clone(),
-            zone_id: event.zone_id.clone(),
-            team_id: event.team_id.clone(),
-        }));
-    }
-
-    if event.event_name == "Death" {
-        set.spawn(track_class(ClassEvent {
-            world_id: event.world_id.clone(),
-            character_id: event.character_id.clone(),
             loadout_id: event.loadout_id.clone(),
-            zone_id: event.zone_id.clone(),
-            team_id: event.team_id.clone(),
+            vehicle_id: event.vehicle_id.clone(),
         }));
     }
@ -235,42 +206,58 @@ async fn process_death_event(event: &Event) {
             team_id: event.attacker_team_id.clone(),
             character_id: event.attacker_character_id.clone(),
             zone_id: event.zone_id.clone(),
-        }));
-
-        if event.event_name == "VehicleDestroy" {
-            set.spawn(track_vehicle(VehicleEvent {
-                world_id: event.world_id.clone(),
-                vehicle_id: event.attacker_vehicle_id.clone(),
-                character_id: event.attacker_character_id.clone(),
-                zone_id: event.zone_id.clone(),
-                team_id: event.attacker_team_id.clone(),
-            }));
-        }
-
-        if event.event_name == "Death" {
-            set.spawn(track_class(ClassEvent {
-                world_id: event.world_id.clone(),
-                character_id: event.attacker_character_id.clone(),
-                loadout_id: event.attacker_loadout_id.clone(),
-                zone_id: event.zone_id.clone(),
-                team_id: event.attacker_team_id.clone(),
-            }));
-        }
+            loadout_id: event.attacker_loadout_id.clone(),
+            vehicle_id: event.attacker_vehicle_id.clone(),
+        }));
     }

     while let Some(_) = set.join_next().await {}
 }

+async fn process_exp_event(event: &Event) {
+    telemetry::experience_event(&event.world_id, &event.experience_id);
+
+    let mut set = JoinSet::new();
+    // println!("[ws/process_event] EVENT: {:?}", event);
+
+    set.spawn(track_analytics(AnalyticsEvent {
+        world_id: event.world_id.clone(),
+        event_name: format!(
+            "{}_{}",
+            event.event_name.clone(),
+            event.experience_id.clone()
+        ),
+    }));
+
+    // Vehicle EXP events
+    let vehicle_id = match event.experience_id {
+        201 => "11".to_string(),        // Galaxy Spawn Bonus
+        233 => "2".to_string(),         // Sunderer Spawn Bonus
+        674 | 675 => "160".to_string(), // ANT stuff
+        _ => "".to_string(),
+    };
+
+    set.spawn(track_pop(PopEvent {
+        world_id: event.world_id.clone(),
+        team_id: event.team_id.clone(),
+        character_id: event.character_id.clone(),
+        zone_id: event.zone_id.clone(),
+        loadout_id: event.loadout_id.clone(),
+        vehicle_id: vehicle_id.clone(),
+    }));
+
+    while let Some(_) = set.join_next().await {}
+}
+
 #[derive(Deserialize, Debug, Clone, Default)]
 struct Event {
     event_name: String,
     #[serde(deserialize_with = "deserialize_number_from_string")]
     world_id: i32,
     character_id: String,
+    #[serde(default)]
     attacker_character_id: String,
-    #[serde(deserialize_with = "deserialize_number_from_string")]
+    #[serde(default, deserialize_with = "deserialize_number_from_string")]
     attacker_team_id: i32,
-    #[serde(deserialize_with = "deserialize_number_from_string")]
+    #[serde(default, deserialize_with = "deserialize_number_from_string")]
     team_id: i32,
     #[serde(deserialize_with = "deserialize_number_from_string")]
     zone_id: i32,
@ -286,6 +273,11 @@ struct Event {
     vehicle_id: String,
     #[serde(default)]
     attacker_vehicle_id: String,
+    #[serde(default, deserialize_with = "deserialize_number_from_string")]
+    experience_id: i32,
+
+    // #[serde(default)]
+    // other_id: String,
 }

 #[derive(Deserialize, Debug, Clone)]
@ -294,7 +286,17 @@ struct Payload {
 }

 async fn healthz() {
-    let app = Router::new().route("/healthz", get(|| async { "ok" }));
+    let app = Router::new().route(
+        "/healthz",
+        get(|| async {
+            Json(json!({
+                "status": "ok",
+            }))
+        }),
+    ).route(
+        "/metrics",
+        get(telemetry::handler)
+    );

     let port: u16 = std::env::var("PORT")
         .unwrap_or("8999".to_string())
@ -319,23 +321,54 @@ async fn main() {
     }

     let url = url::Url::parse(&addr).unwrap();
+    println!("[ws] Connecting to {}", url);

     let (tx, rx) = futures::channel::mpsc::unbounded();
     let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
     let (write, read) = ws_stream.split();

     let fused_writer = rx.map(Ok).forward(write).fuse();
     let fused_reader = read
-        .for_each(|msg| async move {
+        .for_each(|msg| async {
             let body = &msg.unwrap().to_string();
-            let data: Payload = serde_json::from_str(body).unwrap_or(Payload {
-                payload: Event::default(),
-            });
+
+            let mut data: Payload = match serde_json::from_str(body) {
+                Ok(data) => data,
+                Err(_e) => {
+                    // println!("Error: {}; body: {}", e, body.clone());
+                    telemetry::event_dropped(&0, &"".to_string(), "decoding failure");
+                    return;
+                }
+            };

             if data.payload.event_name == "" {
+                telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "not event");
                 return;
             }

-            process_event(&data.payload).await;
+            telemetry::event(&data.payload.world_id, &data.payload.event_name);
+
+            if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" {
+                process_death_event(&data.payload).await;
+                return;
+            }
+
+            if data.payload.event_name == "GainExperience" {
+                if data.payload.team_id == 0 {
+                    match get_team_id(data.payload.character_id.clone()).await {
+                        Ok(team_id) => {
+                            data.payload.team_id = team_id;
+                        }
+                        Err(_) => {
+                            telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "team_id missing");
+                        }
+                    }
+                }
+
+                process_exp_event(&data.payload).await;
+                return;
+            }
+
+            telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "unprocessable");
         })
         .fuse();


@ -0,0 +1,75 @@
use lazy_static::lazy_static;
use prometheus::{
IntGaugeVec,
register_int_gauge_vec,
TextEncoder,
gather
};
lazy_static! {
// incoming events
pub static ref EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_count", "Events processed", &[
"world_id", "event_name"
]).unwrap();
pub static ref EVENTS_DROPPED: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_dropped_count", "Events dropped", &[
"world_id", "event_name", "reason"
]).unwrap();
pub static ref EXPERIENCE_EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_experience_events_count", "Experience Events processed by Exp ID", &[
"world_id", "experience_id"
]).unwrap();
// database stuff
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_writes", "Writes to Postgres", &[
"table", "op"
]).unwrap();
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_reads", "Reads from Postgres", &[
"table", "op"
]).unwrap();
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
// "table", "op"
// ]).unwrap();
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
// "table", "op"
// ]).unwrap();
}
pub async fn handler() -> String {
let encoder = TextEncoder::new();
let mut buffer = String::new();
let metrics = gather();
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
buffer
}
pub fn event(world_id: &i32, event_name: &String) {
EVENTS.with_label_values(&[
&world_id.to_string(),
&event_name,
]).inc();
}
pub fn event_dropped(world_id: &i32, event_name: &String, reason: &str) {
EVENTS_DROPPED.with_label_values(&[
&world_id.to_string(),
&event_name,
reason,
]).inc();
}
pub fn experience_event(world_id: &i32, experience_id: &i32) {
EXPERIENCE_EVENTS.with_label_values(&[
&world_id.to_string(),
&experience_id.to_string(),
]).inc();
}
pub fn db_write(table: &str, op: &str) {
DB_WRITES.with_label_values(&[table, op]).inc();
}
pub fn db_read(table: &str, op: &str) {
DB_READS.with_label_values(&[table, op]).inc();
}
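Usage sketch for this new module, mirroring the call sites added in main.rs above (the world ID is illustrative):

    telemetry::event(&17, &"Death".to_string());
    telemetry::event_dropped(&17, &"GainExperience".to_string(), "team_id missing");
    telemetry::db_write("players", "track_pop");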


@ -34,6 +34,8 @@ lazy_static! {
         ("1105", "vanguard"),
         ("2010", "flash"),
         ("2033", "javelin"),
+        ("2039", "ant"),
+        ("2040", "valkyrie"),
         ("2122", "mosquito"),
         ("2123", "reaver"),
         ("2124", "scythe"),
@ -47,6 +49,9 @@ lazy_static! {
         ("2135", "prowler"),
         ("2136", "dervish"),
         ("2137", "chimera"),
+        ("2139", "ant"),
+        ("2140", "galaxy"),
+        ("2141", "valkyrie"),
         ("2142", "corsair"),
     ]);

     static ref LOADOUT_TO_CLASS: HashMap<&'static str, &'static str> = HashMap::from([