Compare commits

...
Sign in to create a new pull request.

57 commits
rework ... main

Author SHA1 Message Date
noe
3c0ab61695 fix docker image 2024-02-18 00:54:54 -05:00
noe
3a422b8f6f add prometheus to api and ws 2024-02-18 00:52:39 -05:00
noe
7ab5893f67 nix 2024-02-17 03:32:13 -05:00
96cb2c80d8 add auto-maint 2023-09-04 14:06:04 -04:00
ad8105ca94 add tasks print-env 2023-09-04 02:45:29 -04:00
e805a4ca5a update all cargo packages' 2023-09-04 01:30:53 -04:00
be38bb6b5f use nix 2023-09-04 01:30:40 -04:00
c58fdf06b7 refactor to use default localhost IPs instead of localhost 2023-09-04 01:30:23 -04:00
9ebeeb47f4 stacked exp bar chart correctly 2023-06-29 16:33:52 -04:00
f49a8a0fda add filter to exp by id 2023-06-29 15:46:45 -04:00
a1f0e32e30 make experience more specific 2023-06-29 15:30:29 -04:00
143ec0cd3b completely restructure database 2023-06-02 02:14:12 -04:00
2dd19249db remove Dockerfile.old 2023-05-31 16:40:32 -04:00
79d406cee6 update readme.md 2023-05-31 01:02:13 -04:00
e5c57bf505 move to 1 table instead of 3 (what was I thinking???) 2023-05-31 01:01:17 -04:00
11127f269d increase hypertable bucket size to 5 minutes 2023-05-30 20:50:30 -04:00
97ea7455a7 remove spot kill as it produces redundant data 2023-05-30 08:52:40 -04:00
bfdf7dcb52 adjust dockerfile 2023-05-28 18:14:23 -04:00
ffc2dfbbad remove cargo-chef from docker, it's slow now 2023-05-28 15:03:22 -04:00
4e7824cbad update packages 2023-05-28 14:51:03 -04:00
2a47beb69a ps4 needs to find team_id 2023-05-28 14:45:30 -04:00
f8add369a6 add pop tracking to exp events 2023-05-28 13:47:30 -04:00
118f1b6b99 add raw totals to ingest hovers 2023-05-28 13:39:08 -04:00
bf89c4aaf0 add a bunch more exp events 2023-05-28 13:20:54 -04:00
738d2975ec add GainExperience listeners 2023-05-28 13:07:05 -04:00
24437b5520 fix lightning, add corsair; update packages 2023-05-20 23:17:57 -04:00
bca70e2d5b fix faction IDs 2023-03-05 11:34:35 -05:00
679b49ff88 add general maintenance script to always run in prod 2023-01-23 08:53:06 -05:00
9940e9dd90 zerofill analytics 2022-12-14 08:40:59 -05:00
2d6da8343d remove double push: 2022-12-13 01:17:31 -05:00
1c3440d919 thonk 2022-12-13 01:11:55 -05:00
0f710f2712 tokio machine broke 2022-12-13 01:07:15 -05:00
9b6b261c16 debug why gha cache is failing 2022-12-13 00:57:58 -05:00
97474de07e reset sqlx features 2022-12-13 00:52:02 -05:00
a9d1daf397 remove cacher copy 2022-12-13 00:49:53 -05:00
505e71f65f fix build command 2022-12-13 00:45:34 -05:00
6bd048a9f4 gha cache docker build 2022-12-13 00:43:59 -05:00
b8fa5d9595 fix bin-selects 2022-12-13 00:40:39 -05:00
c483764b4b bin-select on cargo chef 2022-12-13 00:30:50 -05:00
9365abc7f8 remove extra clang 2022-12-13 00:23:31 -05:00
83f5f02a88 fix mold import 2022-12-13 00:23:02 -05:00
5ca02948df more build optimization 2022-12-13 00:20:24 -05:00
267a8c11c3 optimize build 2022-12-13 00:07:27 -05:00
9ccd47afa0 remove deploy step 2022-12-12 23:27:42 -05:00
d83ff16c1a add hi-precision bucketing for short term viz 2022-12-12 23:26:03 -05:00
9f1942344b update url 2022-12-12 23:09:23 -05:00
170fdf647d improvements to ingest.html 2022-12-12 23:07:47 -05:00
1e41262d70 add ingest analytics page 2022-12-12 19:40:05 -05:00
2665a6d25f initialize analytics 2022-12-11 11:33:06 -05:00
004def8fbb update health to be more helpful 2022-12-10 22:01:27 -05:00
89d115b61d update codegen to translate ant, valk, and gal variants 2022-12-09 17:47:56 -05:00
b91019e8b4 allow attacker_team_id to be empty, PS4 bug 2022-12-09 17:42:17 -05:00
26f0ce1a1a update htmls 2022-12-09 15:48:25 -05:00
3bfc0b4e28 update htmls 2022-12-09 15:23:09 -05:00
c89eb6ea74 update readme 2022-12-09 13:59:20 -05:00
05e30e4420 cleanup 2022-12-09 11:16:53 -05:00
93bccd3b19 fix graphiql url 2022-12-09 08:35:38 -05:00
38 changed files with 4341 additions and 1398 deletions

0
.env
View file

3
.envrc Normal file
View file

@ -0,0 +1,3 @@
use flake . --accept-flake-config;
# source .envrc-local

View file

@ -17,36 +17,12 @@ jobs:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- run: docker buildx create --use --driver=docker-container
- run: |
docker build . \
TAG_LATEST_IF_MASTER=$(if [ "$GITHUB_REF_NAME" = "main" ]; then echo "-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest"; else echo ""; fi)
docker buildx build . \
--build-arg SERVICE=${{ matrix.service }} \
-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }}
- run: |
docker tag ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} \
ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest
if: github.ref == 'refs/heads/main'
- run: |
docker push ghcr.io/${{ github.repository }}/${{ matrix.service }}
deploy:
runs-on: ubuntu-latest
needs: build
if: github.ref == 'refs/heads/main'
environment:
name: production
url: https://saerro.harasse.rs
permissions:
contents: "read"
id-token: "write"
steps:
- id: "auth"
uses: "google-github-actions/auth@v1"
with:
workload_identity_provider: ${{ secrets.WORKLOAD_IDENTITY_PROVIDER }}
service_account: ${{ secrets.SERVICE_ACCOUNT }}
- name: "Set up Cloud SDK"
uses: "google-github-actions/setup-gcloud@v1"
- name: "Deploy"
run: |
gcloud compute ssh ${{ secrets.VM_NAME }} --zone=us-central1-a --command "cd /opt && sudo docker compose pull && sudo docker compose up -d"
-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} $TAG_LATEST_IF_MASTER \
--push \
--cache-to type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }} \
--cache-from type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }}

6
.gitignore vendored
View file

@ -1 +1,7 @@
/target
.DS_Store
*/.DS_Store
.envrc-local
/.vscode
/.direnv
/result

2217
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,4 @@
[workspace]
members = [
"services/*",
"hack/*"
]
members = ["services/*"]
exclude = ["hack/codegen"]
resolver = "2"

View file

@ -1,18 +1,21 @@
FROM rust:1.65.0-bullseye AS builder
ARG SERVICE
FROM rust:1.76.0-bullseye as rust-base
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY services ./services
COPY hack ./hack
RUN apt-get update && apt-get install -y --no-install-recommends curl clang
ARG MOLD_VERSION=1.11.0
RUN curl -sSL https://github.com/rui314/mold/releases/download/v${MOLD_VERSION}/mold-${MOLD_VERSION}-x86_64-linux.tar.gz | tar xzv && \
mv mold-${MOLD_VERSION}-x86_64-linux/bin/mold /mold && \
rm -rf mold-${MOLD_VERSION}-x86_64-linux
ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
RUN cargo build --bin ${SERVICE} --release
FROM debian:bullseye-slim AS target
FROM rust-base as builder
COPY . .
ARG SERVICE
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/${SERVICE} /app
ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
RUN cargo build --release --bin ${SERVICE}
RUN chmod a+x /app
CMD /app
FROM debian:bullseye-slim as runtime
ARG SERVICE
COPY --from=builder /app/target/release/${SERVICE} /app
ENTRYPOINT ["/app"]

View file

@ -2,33 +2,35 @@
PlanetSide 2 live population API. This API is free and open for anyone to use.
https://saerro.harasse.rs
https://saerro.ps2.live
Our methodology is to add any player ID seen on the Census websockets to a time-sorted set, and returning the number of player IDs seen within 15 minutes.
tl;dr: Watch for specific events, transform and add them to a timeseries set, and query that set for the last 15 minutes.
We're built on 3 core types, `players`, `classes`, and `vehicles`. Each can be filtered by Continent/Zone, Faction, and World.
---
The one and only goal of this app is to provide a current "point-in-time" population status for PlanetSide 2, per world, per faction, (and later, per continent.) Historical info is _not_ a goal; you may implement this on your end.
Please open an issue here or get in touch with Pomf (okano#0001) on the PS2 Discord if you have complex use cases for this data; it may be trivial/easy to implement APIs tailored to your needs.
The main use case is for [Medkit](https://github.com/kayteh/medkit2) bot to have an in-house source of population data, without relying too heavily on any third-party stats service, like Fisu, Honu, or Voidwell; which all have different population tracking needs and goals (and thus, different data.)
An example of how it can be used on [pstop](https://pstop.harasse.rs) ([GitHub](https://github.com/genudine/pstop)).
## Architecture
- Websocket processors
- A pair per PC, PS4US, PS4EU
- Connects to [wss://push.nanite-systems.net](https://nanite-systems.net) and Census Websocket
- Primary will connect to NS.
- Backup will connect to Census. It will wait for 60 seconds before deciding the primary is dead, and then start processing events.
- API
- Serves https://saerro.harasse.rs
- Built on axum and async-graphql
- Redis
- Using ZADD with score as timestamp, ZCOUNTBYSCORE by timestamp in 15 minute windows, and cleaned up with SCAN+ZREMBYSCORE, population data is tracked.
- There is deliberately no persistence.
- Redis "Tender"
- Cleans up Redis every 5 mins.
- GraphQL API
- Serves https://saerro.ps2.live
- Built on a "stacking filter" graph model, where each dimension adds a filter to lower dimensions.
- Event Streaming Service (ESS) Ingest
- WebSocket listening to https://push.nanite-systems.net (which is a resilient mirror to https://push.planetside2.com)
- Listens for `Death`, `VehicleDestroy`, and a number of `GainExperience` events.
- Postgres with TimescaleDB
- Holds `players` and `analytics` tables as hypertables.
- Timescale makes this way too fast, mind-blowing :)
- Tasks
- Occasional jobs that prune the database past what we actually want to retain,
- Core data tables are kept to about 20 mins max of data, analytics to 1 week
- Can do database resets/migrations.
# Developing
@ -37,48 +39,27 @@ This app is built with Rust. You can set up a build environment via https://rust
To run,
```sh
# Start Redis/backing services
# Start backing services
docker compose up -d
# Start Websocket for PC
env \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
PAIR=pc \
ROLE=primary \
WORLDS=1,10,13,17,19,40 \
cargo run --bin websocket
# Run database migrations (required first step on a freshly up'd database)
cargo run --bin tasks migrate
# (Optional:) Start redundant websocket for PC
# Start NSS ingest. Use push.planetside2.com if NSS isn't quite working...
env \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
PAIR=pc \
ROLE=backup \
WORLDS=1,10,13,17,19,40 \
cargo run --bin websocket
# (Optional:) Start PS4US websocket
env \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4us&service-id=s:$SERVICE_ID" \
PAIR=ps4us \
WORLDS=1000 \
cargo run --bin websocket
# (Optional:) Start PS4EU websocket
env \
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4eu&service-id=s:$SERVICE_ID" \
PAIR=ps4eu \
WORLDS=2000 \
WS_ADDR="wss://push.nanite-systems.net/streaming?environment=all&service-id=s:$SERVICE_ID" \
WORLDS=all \
cargo run --bin websocket
# Start API
cargo run --bin api
# Run prune tool
cargo run --bin tools prune
cargo run --bin tasks prune
# Build containers
docker build . --build-arg SERVICE=api -t saerro:api
docker build . --build-arg SERVICE=tools -t saerro:tools
docker build . --build-arg SERVICE=tasks -t saerro:tasks
docker build . --build-arg SERVICE=websocket -t saerro:websocket
```
@ -94,4 +75,4 @@ Currently, the entire stack runs on Docker. You may deploy it to any server via:
docker compose up -d -f docker-compose.live.yaml
```
It listens on port 80, it's up to you from here.
It listens on port 80, it's up to you from here. Make sure to change passwords present in the file. It's not _that secret_ of data, but why risk it?

View file

@ -16,7 +16,7 @@ services:
image: ghcr.io/genudine/saerro/api:latest
pull_policy: always
ports:
- 8000:80
- 80:8000
links:
- tsdb
restart: always

View file

@ -2,7 +2,7 @@ version: "3"
services:
tsdb:
image: timescale/timescaledb-ha:pg14-latest
image: docker.io/timescale/timescaledb:latest-pg14
environment:
POSTGRES_PASSWORD: saerro321
POSTGRES_USER: saerrouser

103
flake.lock generated Normal file
View file

@ -0,0 +1,103 @@
{
"nodes": {
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1708150887,
"narHash": "sha256-lyEaeShLZqQtFO+ULLfxF9fYaYpTal0Ck1B+iKYBOMs=",
"owner": "nix-community",
"repo": "fenix",
"rev": "761431323e30846bae160e15682cfa687c200606",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"flake-parts": {
"inputs": {
"nixpkgs-lib": "nixpkgs-lib"
},
"locked": {
"lastModified": 1706830856,
"narHash": "sha256-a0NYyp+h9hlb7ddVz4LUn1vT/PLwqfrWYcHMvFB1xYg=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "b253292d9c0a5ead9bc98c4e9a26c6312e27d69f",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1707956935,
"narHash": "sha256-ZL2TrjVsiFNKOYwYQozpbvQSwvtV/3Me7Zwhmdsfyu4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a4d4fe8c5002202493e87ec8dbc91335ff55552c",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-lib": {
"locked": {
"dir": "lib",
"lastModified": 1706550542,
"narHash": "sha256-UcsnCG6wx++23yeER4Hg18CXWbgNpqNXcHIo5/1Y+hc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "97b17f32362e475016f942bbdfda4a4a72a8a652",
"type": "github"
},
"original": {
"dir": "lib",
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"fenix": "fenix",
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs"
}
},
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1708018577,
"narHash": "sha256-B75VUqKvQeIqAUnYw4bGjY3xxrCqzRBJHLbmD0MAWEw=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "b9b0d29b8e69b02457cfabe20c4c69cdb45f3cc5",
"type": "github"
},
"original": {
"owner": "rust-lang",
"ref": "nightly",
"repo": "rust-analyzer",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

56
flake.nix Normal file
View file

@ -0,0 +1,56 @@
{
description = "PlanetSide 2 Metagame Harvesting Service";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-parts.url = "github:hercules-ci/flake-parts";
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
nixConfig = {
extra-substituters = [
"https://nix-community.cachix.org"
];
extra-trusted-public-keys = [
"nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs="
];
};
outputs = inputs: inputs.flake-parts.lib.mkFlake { inherit inputs; } {
systems = [ "x86_64-linux" "aarch64-linux" ];
perSystem = { config, self', pkgs, lib, system, ... }: let
fenix = inputs.fenix.packages.${system}.minimal;
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
buildDeps = [
pkgs.openssl
];
devDeps = [
fenix.toolchain
pkgs.docker-compose
pkgs.cargo-watch
];
PKG_CONFIG_PATH = "${pkgs.openssl.dev}/lib/pkgconfig";
in {
packages.default = (pkgs.makeRustPlatform {
cargo = fenix.toolchain;
rustc = fenix.toolchain;
}).buildRustPackage {
inherit (cargoToml.package) name version;
cargoLock.lockFile = ./Cargo.lock;
src = ./.;
nativeBuildInputs = [ pkgs.pkg-config ];
buildInputs = buildDeps ++ devDeps;
};
devShells.default = pkgs.mkShell {
nativeBuildInputs = buildDeps ++ devDeps;
};
};
};
}

1
hack/codegen/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
target

1411
hack/codegen/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,6 @@ reqwest = { version="0.11.13", features = ["json"] }
tera = { version = "1.17.1", default-features = false }
lazy_static = "1.4.0"
regex = "1.7.0"
futures = "0.3.25"
tokio = { version = "1.22.0", features = ["full"] }
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.89"

View file

@ -1,5 +1,4 @@
use std::process;
use lazy_static::lazy_static;
use regex::{Regex, RegexBuilder};
use serde::{Deserialize, Serialize};
@ -46,6 +45,9 @@ async fn translators_rs() {
"mosquito",
"galaxy",
"valkyrie",
"wasp",
"deliverer",
"lodestar",
"liberator",
"ant",
"harasser",
@ -107,11 +109,16 @@ async fn translators_rs() {
.find(&item.name.as_ref().unwrap().en.as_ref().unwrap())
.unwrap();
let name = matched
.as_str()
.to_lowercase()
.replace("wasp", "valkyrie")
.replace("deliverer", "ant")
.replace("lodestar", "galaxy");
Vehicle {
vehicle_id: item.vehicle_id,
name: Some(LangEn {
en: Some(matched.as_str().to_string().to_lowercase()),
}),
name: Some(LangEn { en: Some(name) }),
propulsion_type: item.propulsion_type,
}
})

View file

@ -6,13 +6,25 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde_json = "1.0.89"
serde = "1.0.149"
async-graphql = { version = "5.0.3" }
async-graphql-axum = "5.0.3"
axum = "0.6.1"
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls", "postgres" ] }
tokio = { version = "1.23.0", features = [ "full" ] }
tower-http = { version = "0.3.5", features = ["cors"] }
serde_json = "1.0.105"
serde = "1.0.188"
async-graphql = { version = "6.0.5", features = ["chrono"] }
axum = "0.6.20"
sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
"chrono",
] }
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
tower-http = { version = "0.4.4", features = ["cors"] }
lazy_static = "1.4.0"
reqwest = "0.11.13"
reqwest = { version = "0.11.20", features = [
"rustls-tls-webpki-roots",
"rustls",
] }
chrono = "0.4.28"
prometheus = "0.13.3"
[dependencies.openssl]
version = "0.10.57"
features = ["vendored"]

View file

@ -0,0 +1,86 @@
use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject};
use chrono::{DateTime, Utc};
use sqlx::{query, Pool, Postgres, Row};
use crate::telemetry;
pub struct Analytics {}
#[derive(SimpleObject, Debug, Clone)]
pub struct Event {
pub time: DateTime<Utc>,
pub event_name: String,
pub world_id: i32,
pub count: i64,
}
#[Object]
impl Analytics {
/// Get all events in analytics, bucket_size is in seconds
async fn events<'ctx>(
&self,
ctx: &Context<'ctx>,
#[graphql(default = 60)] bucket_size: u64,
world_id: Option<i32>,
#[graphql(default = false)] hi_precision: bool,
) -> Vec<Event> {
telemetry::graphql_query("Analytics", "events");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "events");
let sql = format!("
SELECT
time_bucket_gapfill('{} seconds', time, start => now() - '{}'::interval, finish => now()) AS bucket,
CASE WHEN count(*) IS NULL THEN 0 ELSE count(*) END AS count,
event_name,
world_id
FROM analytics
WHERE time > now() - '{}'::interval {}
GROUP BY bucket, world_id, event_name
ORDER BY bucket ASC",
if hi_precision {
5
} else {
bucket_size
},
if hi_precision {
"1 hour"
} else {
"1 day"
},
if hi_precision {
"1 hour"
} else {
"1 day"
},
if let Some(world_id) = world_id {
format!("AND world_id = {}", world_id)
} else {
"".to_string()
}
);
let mut result = query(sql.as_str()).fetch(pool);
let mut events = Vec::new();
while let Some(row) = result.try_next().await.unwrap() {
events.push(Event {
time: row.get("bucket"),
event_name: row.get("event_name"),
world_id: row.get("world_id"),
count: row.get("count"),
});
}
events
}
}
#[derive(Default)]
pub struct AnalyticsQuery;
#[Object]
impl AnalyticsQuery {
async fn analytics(&self) -> Analytics {
Analytics {}
}
}

View file

@ -1,4 +1,8 @@
use crate::utils::{Filters, IdOrNameBy};
use crate::{
factions::{NC, TR, VS},
utils::{Filters, IdOrNameBy},
telemetry
};
use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row};
@ -10,10 +14,11 @@ pub struct Class {
impl Class {
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
telemetry::db_read("players", "fetch");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
let sql = format!(
"SELECT count(distinct character_id) FROM classes WHERE time > now() - interval '15 minutes' AND class_id = $1 {};",
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND class_name = $1 {};",
filters.sql(),
);
@ -33,33 +38,38 @@ impl Class {
#[Object]
impl Class {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "total");
self.fetch(ctx, self.filters.clone()).await
}
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "nc");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(1)),
faction: Some(IdOrNameBy::Id(NC)),
..self.filters.clone()
},
)
.await
}
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "tr");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(2)),
faction: Some(IdOrNameBy::Id(TR)),
..self.filters.clone()
},
)
.await
}
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Class", "vs");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(3)),
faction: Some(IdOrNameBy::Id(VS)),
..self.filters.clone()
},
)
@ -83,36 +93,42 @@ impl Classes {
#[Object]
impl Classes {
async fn infiltrator(&self) -> Class {
telemetry::graphql_query("Classes", "infiltrator");
Class {
filters: self.filters.clone(),
class_name: "infiltrator".to_string(),
}
}
async fn light_assault(&self) -> Class {
telemetry::graphql_query("Classes", "light_assault");
Class {
filters: self.filters.clone(),
class_name: "light_assault".to_string(),
}
}
async fn combat_medic(&self) -> Class {
telemetry::graphql_query("Classes", "combat_medic");
Class {
filters: self.filters.clone(),
class_name: "combat_medic".to_string(),
}
}
async fn engineer(&self) -> Class {
telemetry::graphql_query("Classes", "engineer");
Class {
filters: self.filters.clone(),
class_name: "engineer".to_string(),
}
}
async fn heavy_assault(&self) -> Class {
telemetry::graphql_query("Classes", "heavy_assault");
Class {
filters: self.filters.clone(),
class_name: "heavy_assault".to_string(),
}
}
async fn max(&self) -> Class {
telemetry::graphql_query("Classes", "max");
Class {
filters: self.filters.clone(),
class_name: "max".to_string(),
@ -132,6 +148,7 @@ impl ClassesQuery {
/// Get a specific class
pub async fn class(&self, filter: Option<Filters>, class_name: String) -> Class {
telemetry::graphql_query("Classes", "");
Class {
filters: filter.unwrap_or_default(),
class_name,

View file

@ -0,0 +1,4 @@
pub const VS: i32 = 1;
pub const NC: i32 = 2;
pub const TR: i32 = 3;
pub const NSO: i32 = 4;

View file

@ -1,8 +1,13 @@
use async_graphql::{Context, Enum, Object};
use crate::{telemetry, utils::ID_TO_WORLD};
use async_graphql::{Context, Enum, Object, SimpleObject};
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use chrono::{DateTime, Utc};
use sqlx::{query, Pool, Postgres, Row};
pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse {
telemetry::http_request("/health", "GET");
telemetry::db_read("analytics", "get_health");
let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(&pool)
@ -53,13 +58,48 @@ enum UpDown {
pub struct Health {}
impl Health {
async fn most_recent_event_time<'ctx>(
&self,
ctx: &Context<'ctx>,
world_id: i32,
) -> (UpDown, Option<DateTime<Utc>>) {
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "most_recent_event_time");
let events_resp =
query("SELECT time FROM analytics WHERE world_id = $1 ORDER BY time DESC LIMIT 1")
.bind(world_id)
.fetch_one(pool)
.await;
match events_resp {
Ok(row) => {
let last_event: DateTime<Utc> = row.get(0);
if last_event < Utc::now() - chrono::Duration::minutes(5) {
return (UpDown::Down, Some(last_event));
} else {
return (UpDown::Up, Some(last_event));
}
}
Err(_) => {
return (UpDown::Down, None);
}
}
}
}
/// Reports on the health of Saerro Listening Post
#[Object]
impl Health {
/// Did a ping to Postgres (our main datastore) succeed?
async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
telemetry::graphql_query("Health", "database");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "database_health");
let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(pool)
@ -73,8 +113,11 @@ impl Health {
/// Is the websocket processing jobs?
async fn ingest<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
telemetry::graphql_query("Health", "ingest");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("analytics", "ingest_health");
let events_resp =
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
.fetch_one(pool)
@ -96,14 +139,46 @@ impl Health {
/// Is the websocket actually turned on?
async fn ingest_reachable(&self) -> UpDown {
telemetry::graphql_query("Health", "ingest_reachable");
reqwest::get(
std::env::var("WEBSOCKET_HEALTHCHECK")
.unwrap_or("http://localhost:8999/health".to_string()),
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()),
)
.await
.map(|_| UpDown::Up)
.unwrap_or(UpDown::Down)
}
/// Shows a disclaimer for the worlds check
async fn worlds_disclaimer(&self) -> String {
"This is a best-effort check. A world reports `DOWN` when it doesn't have new events for 5 minutes. It could be broken, it could be the reality of the game state.".to_string()
}
/// Checks if a world has had any events for the last 5 minutes
async fn worlds<'ctx>(&self, ctx: &Context<'ctx>) -> Vec<WorldUpDown> {
telemetry::graphql_query("Health", "worlds");
let mut worlds = Vec::new();
for (id, name) in ID_TO_WORLD.iter() {
let (status, last_event) = self.most_recent_event_time(ctx, *id).await;
worlds.push(WorldUpDown {
id: *id,
name: name.to_string(),
status,
last_event,
});
}
worlds
}
}
#[derive(SimpleObject)]
struct WorldUpDown {
id: i32,
name: String,
status: UpDown,
last_event: Option<DateTime<Utc>>,
}
#[derive(Default)]

View file

@ -17,6 +17,4 @@
</style>
<h1>404 Not Found</h1>
<p>
[<a href="/">home</a>] [<a href="/graphql/playground">graphql playground</a>]
</p>
<p>[<a href="/">home</a>]</p>

View file

@ -14,47 +14,256 @@
color: #cead42;
text-decoration: none;
}
.hidden {
display: none;
}
.query {
list-style-type: none;
padding-left: 0;
background-color: #131313;
width: fit-content;
padding: 2rem;
margin: 2rem;
border-radius: 10px;
border-left: #918b79 3px solid;
font-size: 1rem;
}
.query pre {
margin: 0;
}
</style>
<h1>Saerro Listening Post</h1>
<h2>Live Population Stats API for PlanetSide 2</h2>
<p>
This is a GraphQL API, which means you can query for exactly the data you
need. You can also use the GraphiQL interface to explore the data and build
your queries.
</p>
<ul>
<li><a href="/graphql/playground">Check out the GraphQL Playground</a></li>
<li><a href="/graphiql">Check out GraphiQL</a></li>
<li>
<a
href="/graphql?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
id="status_query_link"
href="/graphql?query={ health { database ingest ingestReachable worldsDisclaimer worlds { name status lastEvent } } }"
>Current system status</a
>
(<a
href="/graphql/playground?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
>or in playground</a
href="javascript:document.querySelector('#status_query').classList.toggle('hidden')"
>show GraphQL</a
>)
<ul id="status_query" class="hidden query">
<li>
<pre><code>{
health {
database
ingest
ingestReachable
worldsDisclaimer
worlds {
name
status
lastEvent
}
}
}</code></pre>
<a
href="javascript:runQuery('status_query_link', 'status_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="status_query_result"></li>
</ul>
</li>
<li>
<a
href="/graphql?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
id="current_pop_query_link"
href="/graphql?query={ allWorlds { name population { total nc tr vs } } }"
>
Current population of all worlds
</a>
(<a
href="/graphql/playground?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
>or in playground</a
href="javascript:document.querySelector('#current_pop_query').classList.toggle('hidden')"
>show GraphQL</a
>)
<ul id="current_pop_query" class="hidden query">
<li>
<pre><code>{
allWorlds {
name
population {
total
nc
tr
vs
}
}
}</code></pre>
<a
href="javascript:runQuery('current_pop_query_link', 'current_pop_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="current_pop_query_result"></li>
</ul>
</li>
<li>
<a
id="complex_query_link"
href="/graphql?query={ allWorlds { name classes { combatMedic { total nc tr vs } } vehicles { total sunderer { total nc tr vs } } } }"
>
Show every Sunderer and Combat Medic for every server by faction
</a>
(<a
href="javascript:document.querySelector('#complex_query').classList.toggle('hidden')"
>show GraphQL</a
>)
<ul id="complex_query" class="hidden query">
<li>
<pre><code>{
allWorlds {
name
classes {
combatMedic {
total
nc
tr
vs
}
}
vehicles {
total
sunderer {
total
nc
tr
vs
}
}
}
}</code></pre>
<a
href="javascript:runQuery('complex_query_link', 'complex_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="complex_query_result"></li>
</ul>
</li>
<li>
<a
id="very_complex_query_link"
href="/graphql?query={ zones { all { name classes { heavyAssault { nc tr vs } lightAssault { nc tr vs } } vehicles { vanguard { total } prowler { total } magrider { total } lightning { nc vs tr } chimera { nc vs tr } } } } }"
>
Show the current counts of heavy assaults, light assaults, and tanks per
continent globally
</a>
(<a
href="javascript:document.querySelector('#very_complex_query').classList.toggle('hidden')"
>show GraphQL</a
>)
<ul id="very_complex_query" class="hidden query">
<li>
<pre><code>{
zones {
all {
name
classes {
heavyAssault {
nc
tr
vs
}
lightAssault {
nc
tr
vs
}
}
vehicles {
vanguard {
total
}
prowler {
total
}
magrider {
total
}
lightning {
nc
vs
tr
}
chimera {
nc
vs
tr
}
}
}
}
}</code></pre>
<a
href="javascript:runQuery('very_complex_query_link', 'very_complex_query_result')"
>Run ⫸</a
><br />
</li>
<li class="hidden" id="very_complex_query_result"></li>
</ul>
</li>
</ul>
<p>
This API supports two query methods,
<a href="https://graphql.org/learn/serving-over-http/#get-request">GET</a>
and
<a href="https://graphql.org/learn/serving-over-http/#post-request">POST</a>.
To view the JSON outputs without fancy UIs, you can use a browser plugin like
<a href="https://addons.mozilla.org/en-US/firefox/addon/jsonview/"
>JSONView for Firefox</a
>
or
<a
href="https://chrome.google.com/webstore/detail/jsonvue/chklaanhfefbnpoihckbnefhakgolnmc"
>JSONVue for Chrome</a
>.
</p>
<p>
All data is an aggregate of the last 15 minutes of Death and VehicleDestroy
events, including both attacker and victim.
</p>
<hr />
<p>
This API is provided by Genudine Dynamics.<br />As always, we take no
responsibility for your use of this data... or our weapons. :)
</p>
<p>For help, please contact us in #api-dev on the PlanetSide 2 Discord.</p>
<p>
[<a href="https://github.com/genudine/saerro">github</a>] [<a
href="https://pstop.harasse.rs"
>pstop</a
>]
[<a href="/ingest">ingest stats</a>] [<a
href="https://github.com/genudine/saerro"
>github</a
>] [<a href="https://pstop.harasse.rs">pstop</a>]
</p>
<script>
const runQuery = async (linkId, resultId) => {
const link = document.getElementById(linkId);
const result = document.getElementById(resultId);
result.innerHTML = "Loading...";
result.classList.remove("hidden");
fetch(link.href)
.then((response) => response.json())
.then((data) => {
result.innerHTML = `<pre><code>${JSON.stringify(
data.data,
null,
2
)}</pre></code>`;
})
.catch((error) => {
result.innerHTML = "Failed...";
});
};
</script>

View file

@ -0,0 +1,418 @@
<!DOCTYPE html>
<title>Ingest Stats - Saerro Listening Post</title>
<meta charset="utf-8" />
<style>
body {
font-family: monospace;
background-color: #010101;
color: #e0e0e0;
font-size: 1.25rem;
line-height: 1.6;
}
a {
color: #cead42;
text-decoration: none;
}
h3 {
margin: 0;
}
.chart-container {
position: relative;
}
.main {
display: grid;
grid-template-columns: repeat(2, minmax(300px, 1fr));
gap: 1rem;
padding: 1rem;
}
.wide {
grid-column: span 2;
}
.graph-head {
display: flex;
justify-content: space-between;
align-items: center;
}
.sums-15m {
color: rgba(165, 165, 165, 1);
}
.sums-1h {
color: rgba(165, 165, 165, 0.8);
}
.sums-6h {
color: rgba(165, 165, 165, 0.6);
}
.sums-1d {
color: rgba(165, 165, 165, 0.4);
}
</style>
<h1>Ingest Stats <span id="loading">[LOADING...]</span></h1>
<div class="main">
<div class="wide">
<div class="graph-head">
<h3>All Events by Type</h3>
<p id="all-events-by-type-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container">
<canvas id="all-events-by-type" />
</div>
</div>
<div class="wide">
<div class="graph-head">
<h3>Events by World</h3>
</div>
<div class="chart-container">
<canvas id="events-by-world" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Connery [US West]</h3>
<p id="connery-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="connery" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Miller [EU]</h3>
<p id="miller-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="miller" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Cobalt [EU]</h3>
<p id="cobalt-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="cobalt" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Emerald [US East]</h3>
<p id="emerald-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="emerald" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Jaeger [US East]</h3>
<p id="jaeger-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="jaeger" />
</div>
</div>
<div>
<div class="graph-head">
<h3>SolTech [Tokyo]</h3>
<p id="soltech-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="soltech" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Genudine [US East] [PS4]</h3>
<p id="genudine-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="genudine" />
</div>
</div>
<div>
<div class="graph-head">
<h3>Ceres [EU] [PS4]</h3>
<p id="ceres-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container smaller">
<canvas id="ceres" />
</div>
</div>
<div class="wide">
<div class="graph-head">
<h3>Experience Events By ID</h3>
<p id="exp-by-id-sums">(0, 0, 0, 0)</p>
</div>
<div class="chart-container">
<canvas id="exp-by-id" />
</div>
<div class="filter">
Filter to World:
<select id="exp-filter">
<option selected value="all">All</option>
<option value="1">Connery</option>
<option value="10">Miller</option>
<option value="13">Cobalt</option>
<option value="17">Emerald</option>
<option value="19">Jaeger</option>
<option value="40">SolTech</option>
<option value="1000">Genudine</option>
<option value="2000">Ceres</option>
</select>
</div>
</div>
</div>
<p>
[<a href="/">home</a>] [<a href="/ingest">1 day w/ 5m buckets</a>] [<a
href="/ingest?hi=1"
>1 hour w/ 5s buckets</a
>]
</p>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns/dist/chartjs-adapter-date-fns.bundle.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/humanize-plus@1.8.2/dist/humanize.min.js"></script>
<script>
const sumsStr = (_15m, _1h, _6h, _24h) => `
<span class="sums-15m" title="sum of last 15 minutes (${_15m})">${Humanize.intComma(
_15m
)},</span>
<span class="sums-1h" title="sum of last 1 hour (${_1h})">${Humanize.compactInteger(
_1h
)},</span>
<span class="sums-6h" title="sum of last 6 hours (${_6h})">${Humanize.compactInteger(
_6h
)},</span>
<span class="sums-1d" title="sum of last 1 day (${_24h})">${Humanize.compactInteger(
_24h
)}</span>`;
const doSums = async (id, events) => {
// Calculate sums for the last 15 minutes, 1 hour, 6 hours, and 1 day based on event timestamps
let sums = [0, 0, 0, 0];
let now = Date.now();
let fifteenMinutes = 15 * 60 * 1000;
let oneHour = 60 * 60 * 1000;
let sixHours = 6 * 60 * 60 * 1000;
let oneDay = 24 * 60 * 60 * 1000;
for (let ev of events) {
let diff = now - new Date(ev.time);
if (diff < fifteenMinutes) {
sums[0] += ev.count;
}
if (diff < oneHour) {
sums[1] += ev.count;
}
if (diff < sixHours) {
sums[2] += ev.count;
}
if (diff < oneDay) {
sums[3] += ev.count;
}
}
document.getElementById(`${id}-sums`).innerHTML = sumsStr(...sums);
};
const allEventsByType = (id, events) => {
doSums(id, events);
let allEvents = events.reduce(
(acc, ev) => {
const eventName = ev.eventName.replace(/_[0-9]+/g, "");
acc[eventName][ev.time] = acc[eventName][ev.time] ?? 0;
acc[eventName][ev.time] += ev.count;
return acc;
},
{ Death: {}, VehicleDestroy: {}, GainExperience: {} }
);
new Chart(document.getElementById(id), {
type: "line",
options: {
scales: {
y: { beginAtZero: true, suggestedMin: 0 },
x: { stacked: false, type: "timeseries" },
},
},
data: {
datasets: [
{
label: "Deaths",
data: allEvents.Death,
},
{
label: "Vehicle Destroys",
data: allEvents.VehicleDestroy,
},
{
label: "Experience Events",
data: allEvents.GainExperience,
},
],
},
});
};
const experienceEventsByID = (eventsUnfiltered) => {
const events = eventsUnfiltered.filter((ev) =>
ev.eventName.startsWith("GainExperience_")
);
doSums("exp-by-id", events);
let allEvents = events.reduce((acc, ev) => {
const eventID = ev.eventName.replace(/GainExperience_([0-9]+)/g, "$1");
acc[eventID] = acc[eventID] ?? {};
acc[eventID][ev.time] = acc[eventID][ev.time] ?? 0;
acc[eventID][ev.time] += ev.count;
return acc;
}, {});
new Chart(document.getElementById("exp-by-id"), {
type: "bar",
options: {
scales: {
y: { stacked: true, beginAtZero: true, suggestedMin: 0 },
x: { stacked: true, type: "timeseries" },
},
},
data: {
datasets: Object.keys(allEvents).map((id) => ({
label: id,
data: allEvents[id],
})),
},
});
};
const eventsByWorld = (events) => {
let allEvents = events.reduce((acc, ev) => {
acc[ev.worldId] = acc[ev.worldId] || {};
acc[ev.worldId][ev.time] = (acc[ev.time] || 0) + ev.count;
return acc;
}, {});
new Chart(document.getElementById("events-by-world"), {
type: "line",
options: {
scales: {
y: { beginAtZero: true },
x: {
type: "timeseries",
},
},
},
data: {
datasets: [
{
label: "Connery",
data: allEvents["1"],
},
{
label: "Miller",
data: allEvents["10"],
},
{
label: "Cobalt",
data: allEvents["13"],
},
{
label: "Emerald",
data: allEvents["17"],
},
{
label: "Jaeger",
data: allEvents["19"],
},
{
label: "SolTech",
data: allEvents["40"],
},
{
label: "Genudine",
data: allEvents["1000"],
},
{
label: "Ceres",
data: allEvents["2000"],
},
],
},
});
};
(async () => {
let resp = await fetch("/graphql", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
query: `
{
analytics {
events(${
location.search.includes("hi=1")
? "hiPrecision: true"
: "bucketSize: 300"
}) {
eventName
count
time
worldId
}
}
}
`,
}),
});
let body = await resp.json();
let events = body.data.analytics.events;
window.events = events;
document.getElementById("loading").style.display = "none";
allEventsByType("all-events-by-type", events);
eventsByWorld(events);
[
["connery", 1],
["miller", 10],
["cobalt", 13],
["emerald", 17],
["jaeger", 19],
["soltech", 40],
["genudine", 1000],
["ceres", 2000],
].forEach(([world, id]) => {
let worldEvents = events.filter((ev) => ev.worldId === id);
allEventsByType(world, worldEvents);
});
const expFilter = document.getElementById("exp-filter");
experienceEventsByID(
expFilter.value === "all"
? events
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
);
expFilter.addEventListener("change", () => {
document.getElementById("exp-by-id").outerHTML =
"<canvas id='exp-by-id' />";
experienceEventsByID(
expFilter.value === "all"
? events
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
);
});
})();
</script>

View file

@ -1,7 +1,10 @@
mod analytics;
mod classes;
mod factions;
mod health;
mod population;
mod query;
mod telemetry;
mod utils;
mod vehicles;
mod world;
@ -24,9 +27,15 @@ use tower_http::cors::{Any, CorsLayer};
extern crate serde_json;
async fn index() -> Html<&'static str> {
telemetry::http_request("/", "GET");
Html(include_str!("html/index.html"))
}
async fn ingest() -> Html<&'static str> {
telemetry::http_request("/ingest", "GET");
Html(include_str!("html/ingest.html"))
}
async fn handle_404() -> Html<&'static str> {
Html(include_str!("html/404.html"))
}
@ -35,6 +44,7 @@ async fn graphql_handler_post(
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
Json(query): Json<Request>,
) -> Json<Response> {
telemetry::http_request("/graphql", "POST");
Json(schema.execute(query).await)
}
@ -42,6 +52,8 @@ async fn graphql_handler_get(
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
query: Query<Request>,
) -> axum::response::Response {
telemetry::http_request("/graphql", "GET");
if query.query == "" {
return Redirect::to("/graphiql").into_response();
}
@ -50,6 +62,8 @@ async fn graphql_handler_get(
}
async fn graphiql() -> impl IntoResponse {
telemetry::http_request("/graphiql", "GET");
Html(
GraphiQLSource::build()
.endpoint("/graphql")
@ -61,7 +75,7 @@ async fn graphiql() -> impl IntoResponse {
#[tokio::main]
async fn main() {
let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
let db = sqlx::PgPool::connect(&db_url).await.unwrap();
let schema = Schema::build(query::Query::default(), EmptyMutation, EmptySubscription)
@ -70,12 +84,15 @@ async fn main() {
let app = Router::new()
.route("/", get(index))
.route("/ingest", get(ingest))
.route("/health", get(health::get_health))
.route(
"/graphql",
post(graphql_handler_post).get(graphql_handler_get),
)
.route("/graphql/playground", get(graphiql))
.route("/graphiql", get(graphiql))
.route("/metrics", get(telemetry::handler))
.route("/metrics/combined", get(telemetry::handler_combined))
.fallback(handle_404)
.layer(Extension(db))
.layer(Extension(schema))

View file

@ -1,4 +1,8 @@
use crate::utils::Filters;
use crate::{
factions::{NC, NSO, TR, VS},
utils::Filters,
telemetry,
};
use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row};
@ -19,8 +23,9 @@ impl Population {
async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: i32) -> i64 {
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "population_by_faction");
let sql = format!(
"SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' AND faction_id = $1 {};",
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND faction_id = $1 {};",
self.filters.sql(),
);
@ -40,10 +45,13 @@ impl Population {
#[Object]
impl Population {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Population", "total");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "population_total");
let sql = format!(
"SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' {};",
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' {};",
self.filters.sql(),
);
@ -54,20 +62,24 @@ impl Population {
.await
.unwrap()
.get(0);
query
}
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
self.by_faction(ctx, 1).await
telemetry::graphql_query("Population", "nc");
self.by_faction(ctx, NC).await
}
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
self.by_faction(ctx, 2).await
telemetry::graphql_query("Population", "vs");
self.by_faction(ctx, VS).await
}
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
self.by_faction(ctx, 3).await
telemetry::graphql_query("Population", "tr");
self.by_faction(ctx, TR).await
}
async fn ns<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
self.by_faction(ctx, 4).await
telemetry::graphql_query("Population", "ns");
self.by_faction(ctx, NSO).await
}
}

View file

@ -1,6 +1,6 @@
use crate::{
classes::ClassesQuery, health::HealthQuery, population::PopulationQuery,
vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
analytics::AnalyticsQuery, classes::ClassesQuery, health::HealthQuery,
population::PopulationQuery, vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
};
use async_graphql::MergedObject;
@ -12,4 +12,5 @@ pub struct Query(
WorldQuery,
ZoneQuery,
HealthQuery,
AnalyticsQuery,
);

View file

@ -0,0 +1,138 @@
use lazy_static::lazy_static;
use prometheus::{
IntGauge,
IntGaugeVec,
register_int_gauge_vec,
register_int_gauge,
TextEncoder,
gather
};
use sqlx::{Pool, Postgres, Row};
use axum::Extension;
use chrono::{DateTime, Utc};
lazy_static! {
// http
pub static ref HTTP_REQUEST: IntGaugeVec = register_int_gauge_vec!("saerro_api_http_requests", "HTTP requests", &[
"route", "method"
]).unwrap();
pub static ref GRAPHQL_QUERY: IntGaugeVec = register_int_gauge_vec!("saerro_api_graphql_query", "GraphQL queries", &[
"major", "minor"
]).unwrap();
// counters
pub static ref PLAYERS_TRACKED: IntGauge = register_int_gauge!("saerro_players_tracked", "All players tracked by Saerro right now").unwrap();
pub static ref VEHICLES_TRACKED: IntGauge = register_int_gauge!("saerro_vehicles_tracked", "All vehicles tracked by Saerro right now").unwrap();
pub static ref OLDEST_PLAYER: IntGauge = register_int_gauge!("saerro_oldest_player", "Oldest player tracked").unwrap();
pub static ref NEWEST_PLAYER: IntGauge = register_int_gauge!("saerro_newest_player", "Newest player tracked").unwrap();
pub static ref OLDEST_VEHICLE: IntGauge = register_int_gauge!("saerro_oldest_vehicle", "Oldest vehicle tracked").unwrap();
pub static ref NEWEST_VEHICLE: IntGauge = register_int_gauge!("saerro_newest_vehicle", "Newest vehicle tracked").unwrap();
// database stuff
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_writes", "Writes to Postgres", &[
"table", "op"
]).unwrap();
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_reads", "Reads from Postgres", &[
"table", "op"
]).unwrap();
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
// "table", "op"
// ]).unwrap();
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
// "table", "op"
// ]).unwrap();
}
pub async fn handler(Extension(pool): Extension<Pool<Postgres>>) -> String {
update_data_gauges(pool).await;
// Final output
let encoder = TextEncoder::new();
let mut buffer = String::new();
let metrics = gather();
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
buffer
}
pub async fn handler_combined(Extension(pool): Extension<Pool<Postgres>>) -> String {
let url = std::env::var("WEBSOCKET_HEALTHCHECK")
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()).replace("/healthz", "/metrics");
let local = handler(Extension(pool)).await;
let remote = match reqwest::get(url).await {
Ok(r) => r.text().await.expect("failed to text lol"),
Err(_) => String::from("")
};
format!("{}{}", local, remote)
}
// pub fn db_write(table: &str, op: &str) {
// DB_WRITES.with_label_values(&[table, op]).inc();
// }
pub fn db_read(table: &str, op: &str) {
DB_READS.with_label_values(&[table, op]).inc();
}
pub fn http_request(route: &str, method: &str) {
HTTP_REQUEST.with_label_values(&[route, method]).inc();
}
pub fn graphql_query(major: &str, minor: &str) {
GRAPHQL_QUERY.with_label_values(&[major, minor]).inc();
}
async fn update_data_gauges(pool: Pool<Postgres>) {
// Do some easy queries to fill our non-cumulative gauges
db_read("players", "count_all");
let player_count: i64 = sqlx::query("SELECT count(*) FROM players")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
PLAYERS_TRACKED.set(player_count);
db_read("players", "get_newest");
let player_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_PLAYER.set(player_newest.timestamp());
db_read("players", "get_oldest");
let player_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_PLAYER.set(player_oldest.timestamp());
db_read("vehicles", "count_all");
let vehicle_count: i64 = sqlx::query("SELECT count(*) FROM vehicles")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
VEHICLES_TRACKED.set(vehicle_count);
db_read("vehicles", "get_newest");
let vehicle_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated DESC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
NEWEST_VEHICLE.set(vehicle_newest.timestamp());
db_read("vehicles", "get_oldest");
let vehicle_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated ASC LIMIT 1")
.fetch_one(&pool)
.await
.unwrap()
.get(0);
OLDEST_VEHICLE.set(vehicle_oldest.timestamp());
}

View file

@ -1,4 +1,8 @@
use crate::utils::{Filters, IdOrNameBy};
use crate::{
factions::{NC, TR, VS},
utils::{Filters, IdOrNameBy},
telemetry,
};
use async_graphql::{Context, Object};
use sqlx::{Pool, Postgres, Row};
@ -12,8 +16,9 @@ impl Vehicle {
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("vehicles", "fetch");
let sql = format!(
"SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' AND vehicle_id = $1 {};",
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' AND vehicle_name = $1 {};",
filters.sql(),
);
@ -33,33 +38,41 @@ impl Vehicle {
#[Object]
impl Vehicle {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "total");
self.fetch(ctx, self.filters.clone()).await
}
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "nc");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(1)),
faction: Some(IdOrNameBy::Id(NC)),
..self.filters.clone()
},
)
.await
}
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "tr");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(2)),
faction: Some(IdOrNameBy::Id(TR)),
..self.filters.clone()
},
)
.await
}
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicle", "vs");
self.fetch(
ctx,
Filters {
faction: Some(IdOrNameBy::Id(3)),
faction: Some(IdOrNameBy::Id(VS)),
..self.filters.clone()
},
)
@ -83,10 +96,13 @@ impl Vehicles {
#[Object]
impl Vehicles {
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
telemetry::graphql_query("Vehicles", "total");
let pool = ctx.data::<Pool<Postgres>>().unwrap();
telemetry::db_read("players", "vehicles_total");
let sql = format!(
"SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' {};",
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' {};",
self.filters.sql(),
);
@ -103,62 +119,90 @@ impl Vehicles {
// Transport
async fn flash(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "flash");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "flash".to_string(),
}
}
async fn sunderer(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "sunderer");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "sunderer".to_string(),
}
}
async fn ant(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "ant");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "ant".to_string(),
}
}
async fn harasser(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "harasser");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "harasser".to_string(),
}
}
async fn javelin(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "javelin");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "javelin".to_string(),
}
}
async fn corsair(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "corsair");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "corsair".to_string(),
}
}
// Tanks
async fn lightning(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "lightning");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "javelin".to_string(),
vehicle_name: "lightning".to_string(),
}
}
async fn prowler(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "prowler");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "prowler".to_string(),
}
}
async fn vanguard(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "vanguard");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "vanguard".to_string(),
}
}
async fn magrider(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "magrider");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "magrider".to_string(),
}
}
async fn chimera(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "chimera");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "chimera".to_string(),
@ -167,42 +211,56 @@ impl Vehicles {
// Air
async fn mosquito(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "mosquito");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "mosquito".to_string(),
}
}
async fn liberator(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "liberator");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "liberator".to_string(),
}
}
async fn galaxy(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "galaxy");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "galaxy".to_string(),
}
}
async fn valkyrie(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "valkyrie");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "valkyrie".to_string(),
}
}
async fn reaver(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "reaver");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "reaver".to_string(),
}
}
async fn scythe(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "scythe");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "scythe".to_string(),
}
}
async fn dervish(&self) -> Vehicle {
telemetry::graphql_query("Vehicle", "dervish");
Vehicle {
filters: self.filters.clone(),
vehicle_name: "dervish".to_string(),

View file

@ -4,6 +4,7 @@ use crate::{
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_WORLD, WORLD_IDS},
vehicles::Vehicles,
zone::Zones,
telemetry,
};
use async_graphql::Object;
@ -33,11 +34,15 @@ impl World {
impl World {
/// The ID of the world.
async fn id(&self) -> i32 {
telemetry::graphql_query("World", "id");
id_or_name_to_id(&WORLD_IDS, self.filter.world.as_ref().unwrap()).unwrap()
}
/// The name of the world, in official game capitalization.
async fn name(&self) -> String {
telemetry::graphql_query("World", "name");
let name = id_or_name_to_name(&ID_TO_WORLD, self.filter.world.as_ref().unwrap()).unwrap();
// Special case for SolTech, lol.
@ -51,6 +56,8 @@ impl World {
/// Population filtered to this world.
async fn population(&self) -> Population {
telemetry::graphql_query("World", "population");
Population::new(Some(Filters {
world: self.filter.world.clone(),
faction: None,
@ -60,6 +67,8 @@ impl World {
/// Vehicles filtered to this world.
async fn vehicles(&self) -> Vehicles {
telemetry::graphql_query("World", "vehicles");
Vehicles::new(Some(Filters {
world: self.filter.world.clone(),
faction: None,
@ -69,6 +78,8 @@ impl World {
/// Classes filtered to this world.
async fn classes(&self) -> Classes {
telemetry::graphql_query("World", "classes");
Classes::new(Some(Filters {
world: self.filter.world.clone(),
faction: None,
@ -78,6 +89,8 @@ impl World {
/// Get a specific zone/continent on this world.
async fn zones(&self) -> Zones {
telemetry::graphql_query("World", "zones");
Zones::new(Some(self.filter.clone()))
}
}

View file

@ -3,6 +3,7 @@ use crate::{
population::Population,
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_ZONE, ZONE_IDS},
vehicles::Vehicles,
telemetry,
};
use async_graphql::Object;
@ -23,11 +24,15 @@ impl Zone {
impl Zone {
/// The ID of the zone/continent.
async fn id(&self) -> i32 {
telemetry::graphql_query("Zone", "id");
id_or_name_to_id(&ZONE_IDS, self.filters.zone.as_ref().unwrap()).unwrap()
}
/// The name of the continent, in official game capitalization.
async fn name(&self) -> String {
telemetry::graphql_query("Zone", "name");
let name = id_or_name_to_name(&ID_TO_ZONE, self.filters.zone.as_ref().unwrap()).unwrap();
// Capitalize the first letter
@ -35,14 +40,20 @@ impl Zone {
}
async fn population(&self) -> Population {
telemetry::graphql_query("Zone", "population");
Population::new(Some(self.filters.clone()))
}
async fn vehicles(&self) -> Vehicles {
telemetry::graphql_query("Zone", "vehicles");
Vehicles::new(Some(self.filters.clone()))
}
async fn classes(&self) -> Classes {
telemetry::graphql_query("Zone", "classes");
Classes::new(Some(self.filters.clone()))
}
}

View file

@ -6,10 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
redis = { version = "0.22.1", features = ["aio", "r2d2", "tokio-comp"] }
once_cell = "1.16.0"
tokio = { version = "1.23.0", features = ["full"] }
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
] }
lazy_static = "1.4.0"
async_once = "0.2.6"
dotenvy = "0.15.6"

View file

@ -1,5 +1,4 @@
use async_once::AsyncOnce;
use dotenvy::dotenv;
use lazy_static::lazy_static;
use migrations::cmd_migrate;
use sqlx::query;
@ -10,7 +9,7 @@ mod migrations;
lazy_static! {
pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
sqlx::PgPool::connect(&db_url).await.unwrap()
});
}
@ -19,21 +18,14 @@ async fn cmd_prune() {
println!("Pruning old data...");
let pool = PG.get().await;
let rows = query("DELETE FROM players WHERE time < NOW() - INTERVAL '15 minutes';")
let rows = query("DELETE FROM players WHERE last_updated < NOW() - INTERVAL '15 minutes';")
.execute(pool)
.await
.unwrap()
.rows_affected();
println!("Deleted {} rows of old player data", rows);
let rows = query("DELETE FROM classes WHERE time < NOW() - INTERVAL '15 minutes';")
.execute(pool)
.await
.unwrap()
.rows_affected();
println!("Deleted {} rows of old class data", rows);
let rows = query("DELETE FROM vehicles WHERE time < NOW() - INTERVAL '15 minutes';")
let rows = query("DELETE FROM vehicles WHERE last_updated < NOW() - INTERVAL '15 minutes';")
.execute(pool)
.await
.unwrap()
@ -58,14 +50,41 @@ fn cmd_help() {
#[tokio::main]
async fn main() {
dotenv().ok();
let command = args().nth(1).unwrap_or("help".to_string());
match command.as_str() {
"help" => cmd_help(),
"prune" => cmd_prune().await,
"auto-prune" => loop {
cmd_prune().await;
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
},
"maintenance" => {
println!("Running maintenance tasks...");
println!("Checking if DB is migrated...");
if !migrations::is_migrated().await {
println!("DB is not migrated, running migrations...");
cmd_migrate().await;
}
println!("Running prune...");
cmd_prune().await;
println!("Done!");
}
"auto-maintenance" => loop {
println!("Running maintenance tasks...");
if !migrations::is_migrated().await {
println!("DB is not migrated, running migrations...");
cmd_migrate().await;
}
cmd_prune().await;
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
},
"migrate" => cmd_migrate().await,
"print-env" => {
std::env::vars().for_each(|(key, value)| println!("{}={}", key, value));
}
_ => {
println!("Unknown command: {}", command);
cmd_help();

View file

@ -1,15 +1,10 @@
use crate::PG;
use sqlx::query;
use sqlx::{query, Row};
pub async fn cmd_migrate() {
println!("Migrating database...");
tokio::join!(
migrate_players(),
migrate_classes(),
migrate_vehicles(),
migrate_analytics()
);
tokio::join!(migrate_players(), migrate_vehicles(), migrate_analytics());
}
async fn migrate_players() {
@ -26,77 +21,21 @@ async fn migrate_players() {
println!("PLAYERS => CREATE TABLE players");
query(
"CREATE TABLE players (
character_id TEXT NOT NULL,
time TIMESTAMPTZ NOT NULL,
character_id TEXT NOT NULL PRIMARY KEY,
last_updated TIMESTAMPTZ NOT NULL,
world_id INT NOT NULL,
faction_id INT NOT NULL,
zone_id INT NOT NULL);",
zone_id INT NOT NULL,
class_name TEXT NOT NULL
);",
)
.execute(pool)
.await
.unwrap();
println!("PLAYERS => create_hypertable");
query(
"SELECT create_hypertable('players', 'time',
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
)
.execute(pool)
.await
.unwrap();
println!("PLAYERS => add_retention_policy");
query("SELECT add_retention_policy('players', INTERVAL '15 minutes');")
.execute(pool)
.await
.unwrap();
println!("PLAYERS => done!");
}
async fn migrate_classes() {
let pool = PG.get().await;
println!("-> Migrating classes");
println!("CLASSES => DROP TABLE IF EXISTS classes");
query("DROP TABLE IF EXISTS classes")
.execute(pool)
.await
.unwrap();
println!("CLASSES => CREATE TABLE classes");
query(
"CREATE TABLE classes (
character_id TEXT NOT NULL,
time TIMESTAMPTZ NOT NULL,
world_id INT NOT NULL,
faction_id INT NOT NULL,
zone_id INT NOT NULL,
class_id TEXT NOT NULL);",
)
.execute(pool)
.await
.unwrap();
println!("CLASSES => create_hypertable");
query(
"SELECT create_hypertable('classes', 'time',
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
)
.execute(pool)
.await
.unwrap();
println!("CLASSES => add_retention_policy");
query("SELECT add_retention_policy('classes', INTERVAL '15 minutes');")
.execute(pool)
.await
.unwrap();
println!("CLASSES => done!");
}
async fn migrate_vehicles() {
let pool = PG.get().await;
@ -111,33 +50,18 @@ async fn migrate_vehicles() {
println!("VEHICLES => CREATE TABLE vehicles");
query(
"CREATE TABLE vehicles (
character_id TEXT NOT NULL,
time TIMESTAMPTZ NOT NULL,
character_id TEXT NOT NULL PRIMARY KEY,
last_updated TIMESTAMPTZ NOT NULL,
world_id INT NOT NULL,
faction_id INT NOT NULL,
zone_id INT NOT NULL,
vehicle_id TEXT NOT NULL);",
zone_id INT NOT NULL,
vehicle_name TEXT NOT NULL
);",
)
.execute(pool)
.await
.unwrap();
println!("VEHICLES => create_hypertable");
query(
"SELECT create_hypertable('vehicles', 'time',
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
)
.execute(pool)
.await
.unwrap();
println!("VEHICLES => add_retention_policy");
query("SELECT add_retention_policy('vehicles', INTERVAL '15 minutes');")
.execute(pool)
.await
.unwrap();
println!("VEHICLES => done!");
}
@ -173,3 +97,15 @@ async fn migrate_analytics() {
println!("ANALYTICS => done!");
}
pub async fn is_migrated() -> bool {
let pool = PG.get().await;
let tables: i64 = query("SELECT count(1) FROM pg_tables WHERE schemaname = 'public' AND tablename IN ('players', 'vehicles', 'analytics');")
.fetch_one(pool)
.await
.unwrap()
.get(0);
tables == 3
}

View file

@ -6,16 +6,22 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
redis = { version = "0.22.1", default_features = false, features = ["r2d2"] }
lazy_static = "1.4.0"
tokio-tungstenite = { version = "0.18.0", features=["native-tls"] }
serde = { version = "1.0.149", features = ["derive"] }
serde_json = "1.0.89"
tokio = { version = "1.23.0", features = ["full"] }
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
url = "2.3.1"
futures-util = "0.3.25"
futures = "0.3.25"
tokio-tungstenite = { version = "0.20.0", features = [
"rustls-tls-webpki-roots",
] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105"
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
sqlx = { version = "0.7.1", default_features = false, features = [
"runtime-tokio-rustls",
"postgres",
] }
url = "2.4.1"
futures-util = "0.3.28"
futures = "0.3.28"
async_once = "0.2.6"
serde-aux = "4.1.2"
axum = "0.6.1"
serde-aux = "4.2.0"
axum = "0.6.20"
prometheus = "0.13.3"
prometheus-static-metric = "0.5.1"

View file

@ -1,42 +1,50 @@
use async_once::AsyncOnce;
use axum::{routing::get, Router};
use axum::{routing::get, Json, Router};
use futures::{pin_mut, FutureExt};
use futures_util::StreamExt;
use lazy_static::lazy_static;
use serde::Deserialize;
use serde_aux::prelude::*;
use serde_json::json;
use sqlx::{postgres::PgPoolOptions, query};
use sqlx::{postgres::PgPoolOptions, query, Row};
use std::{env, net::SocketAddr};
use tokio::task::JoinSet;
use tokio_tungstenite::{connect_async, tungstenite::Message};
mod translators;
mod telemetry;
lazy_static! {
// static ref PAIR: String = env::var("PAIR").unwrap_or_default();
// static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string());
static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
let db_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
PgPoolOptions::new().connect(&db_url).await.unwrap()
});
}
async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
let worlds_raw = env::var("WORLDS").unwrap_or_default();
if worlds_raw == "" {
println!("WORLDS not set");
return;
}
let worlds_raw = env::var("WORLDS").unwrap_or("all".to_string());
let worlds: Vec<&str> = worlds_raw.split(',').collect();
let experience_ids = vec![
2, 3, 4, 5, 6, 7, 34, 51, 53, 55, 57, 86, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99,
100, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 201, 233, 293,
294, 302, 303, 353, 354, 355, 438, 439, 503, 505, 579, 581, 584, 653, 656, 674, 675,
];
let mut events = experience_ids
.iter()
.map(|id| format!("GainExperience_experience_id_{}", id))
.collect::<Vec<String>>();
events.push("Death".to_string());
events.push("VehicleDestroy".to_string());
// Send setup message
let setup_msg = json!({
"action": "subscribe",
"worlds": worlds,
"eventNames": ["Death", "VehicleDestroy"],
"eventNames": events,
"characters": ["all"],
"logicalAndCharactersWithWorlds": true,
"service": "event",
@ -46,6 +54,7 @@ async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
.unwrap();
println!("[ws] Sent setup message");
println!("[ws/setup] {}", setup_msg.to_string())
}
#[derive(Clone)]
@ -54,33 +63,32 @@ struct PopEvent {
team_id: i32,
character_id: String,
zone_id: i32,
}
struct VehicleEvent {
world_id: i32,
vehicle_id: String,
character_id: String,
zone_id: i32,
team_id: i32,
}
struct ClassEvent {
world_id: i32,
character_id: String,
loadout_id: String,
zone_id: i32,
team_id: i32,
vehicle_id: String,
}
#[derive(Debug)]
struct AnalyticsEvent {
world_id: i32,
event_name: String,
}
// async fn track_pop(pop_event: PopEvent) {
// track_pop_db(pop_event.clone()).await;
// track_pop_redis(pop_event).await;
// }
async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> {
let pool = PG.get().await;
telemetry::db_read("players", "get_team_id");
let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;")
.bind(character_id.clone())
.fetch_one(pool)
.await?
.get(0);
if team_id == 0 {
return Err(sqlx::Error::RowNotFound);
}
Ok(team_id)
}
async fn track_pop(pop_event: PopEvent) {
// println!("[ws/track_pop]");
@ -91,76 +99,31 @@ async fn track_pop(pop_event: PopEvent) {
team_id,
character_id,
zone_id,
loadout_id,
vehicle_id,
} = pop_event;
query("INSERT INTO players (time, character_id, world_id, faction_id, zone_id) VALUES (now(), $1, $2, $3, $4);")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.execute(pool)
.await
.unwrap();
}
async fn track_vehicle(vehicle_event: VehicleEvent) {
// println!("[ws/track_vehicle]");
let pool = PG.get().await;
let VehicleEvent {
world_id,
vehicle_id,
zone_id,
character_id,
team_id,
} = vehicle_event;
let vehicle_name = translators::vehicle_to_name(vehicle_id.as_str());
if vehicle_name == "unknown" {
return;
}
query("INSERT INTO vehicles (time, character_id, world_id, faction_id, zone_id, vehicle_id) VALUES (now(), $1, $2, $3, $4, $5);")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.bind(vehicle_name)
.execute(pool)
.await
.unwrap();
}
async fn track_class(class_event: ClassEvent) {
// println!("[ws/track_class]");
let pool = PG.get().await;
let ClassEvent {
world_id,
character_id,
loadout_id,
zone_id,
team_id,
} = class_event;
let class_name = translators::loadout_to_class(loadout_id.as_str());
let vehicle_name = if vehicle_id == "" {
"unknown".to_string()
} else {
translators::vehicle_to_name(vehicle_id.as_str())
};
if class_name == "unknown" {
return;
}
telemetry::db_write("players", "track_pop");
query(
"INSERT INTO classes (
time,
character_id,
world_id,
faction_id,
zone_id,
class_id
) VALUES (now(), $1, $2, $3, $4, $5);",
"
INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name)
VALUES (now(), $1, $2, $3, $4, $5)
ON CONFLICT (character_id) DO UPDATE SET
last_updated = EXCLUDED.last_updated,
world_id = EXCLUDED.world_id,
faction_id = EXCLUDED.faction_id,
zone_id = EXCLUDED.zone_id,
class_name = EXCLUDED.class_name
;",
)
.bind(character_id)
.bind(character_id.clone())
.bind(world_id)
.bind(team_id)
.bind(zone_id)
@ -168,10 +131,31 @@ async fn track_class(class_event: ClassEvent) {
.execute(pool)
.await
.unwrap();
if vehicle_name != "unknown" {
telemetry::db_write("vehicles", "track_pop");
query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name)
VALUES (now(), $1, $2, $3, $4, $5)
ON CONFLICT (character_id) DO UPDATE SET
last_updated = EXCLUDED.last_updated,
world_id = EXCLUDED.world_id,
faction_id = EXCLUDED.faction_id,
zone_id = EXCLUDED.zone_id,
vehicle_name = EXCLUDED.vehicle_name
;")
.bind(character_id)
.bind(world_id)
.bind(team_id)
.bind(zone_id)
.bind(vehicle_name)
.execute(pool)
.await
.unwrap();
}
}
async fn track_analytics(analytics_event: AnalyticsEvent) {
// println!("[ws/track_analytics]");
// println!("[ws/track_analytics] {:?}", analytics_event);
let pool = PG.get().await;
let AnalyticsEvent {
@ -179,15 +163,21 @@ async fn track_analytics(analytics_event: AnalyticsEvent) {
event_name,
} = analytics_event;
query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
telemetry::db_write("analytics", "track_analytics");
match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
.bind(world_id)
.bind(event_name)
.execute(pool)
.await
.unwrap();
{
Ok(_) => {}
Err(e) => {
println!("[ws/track_analytics] ERR => {:?}", e);
}
}
}
async fn process_event(event: &Event) {
async fn process_death_event(event: &Event) {
let mut set = JoinSet::new();
// println!("[ws/process_event] EVENT: {:?}", event);
@ -196,33 +186,14 @@ async fn process_event(event: &Event) {
event_name: event.event_name.clone(),
}));
if event.character_id != "0" {
// General population tracking
if event.character_id != "" && event.character_id != "0" {
set.spawn(track_pop(PopEvent {
world_id: event.world_id.clone(),
team_id: event.team_id.clone(),
character_id: event.character_id.clone(),
zone_id: event.zone_id.clone(),
}));
}
if event.event_name == "VehicleDestroy" {
set.spawn(track_vehicle(VehicleEvent {
world_id: event.world_id.clone(),
vehicle_id: event.vehicle_id.clone(),
character_id: event.character_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.team_id.clone(),
}));
}
if event.event_name == "Death" {
set.spawn(track_class(ClassEvent {
world_id: event.world_id.clone(),
character_id: event.character_id.clone(),
loadout_id: event.loadout_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.team_id.clone(),
vehicle_id: event.vehicle_id.clone(),
}));
}
@ -235,42 +206,58 @@ async fn process_event(event: &Event) {
team_id: event.attacker_team_id.clone(),
character_id: event.attacker_character_id.clone(),
zone_id: event.zone_id.clone(),
loadout_id: event.attacker_loadout_id.clone(),
vehicle_id: event.attacker_vehicle_id.clone(),
}));
if event.event_name == "VehicleDestroy" {
set.spawn(track_vehicle(VehicleEvent {
world_id: event.world_id.clone(),
vehicle_id: event.attacker_vehicle_id.clone(),
character_id: event.attacker_character_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.attacker_team_id.clone(),
}));
}
if event.event_name == "Death" {
set.spawn(track_class(ClassEvent {
world_id: event.world_id.clone(),
character_id: event.attacker_character_id.clone(),
loadout_id: event.attacker_loadout_id.clone(),
zone_id: event.zone_id.clone(),
team_id: event.attacker_team_id.clone(),
}));
}
}
while let Some(_) = set.join_next().await {}
}
async fn process_exp_event(event: &Event) {
telemetry::experience_event(&event.world_id, &event.experience_id);
let mut set = JoinSet::new();
// println!("[ws/process_event] EVENT: {:?}", event);
set.spawn(track_analytics(AnalyticsEvent {
world_id: event.world_id.clone(),
event_name: format!(
"{}_{}",
event.event_name.clone(),
event.experience_id.clone()
),
}));
// Vehicle EXP events
let vehicle_id = match event.experience_id {
201 => "11".to_string(), // Galaxy Spawn Bonus
233 => "2".to_string(), // Sunderer Spawn Bonus
674 | 675 => "160".to_string(), // ANT stuff
_ => "".to_string(),
};
set.spawn(track_pop(PopEvent {
world_id: event.world_id.clone(),
team_id: event.team_id.clone(),
character_id: event.character_id.clone(),
zone_id: event.zone_id.clone(),
loadout_id: event.loadout_id.clone(),
vehicle_id: vehicle_id.clone(),
}));
while let Some(_) = set.join_next().await {}
}
#[derive(Deserialize, Debug, Clone, Default)]
struct Event {
event_name: String,
#[serde(deserialize_with = "deserialize_number_from_string")]
world_id: i32,
character_id: String,
#[serde(default)]
attacker_character_id: String,
#[serde(deserialize_with = "deserialize_number_from_string")]
#[serde(default, deserialize_with = "deserialize_number_from_string")]
attacker_team_id: i32,
#[serde(deserialize_with = "deserialize_number_from_string")]
#[serde(default, deserialize_with = "deserialize_number_from_string")]
team_id: i32,
#[serde(deserialize_with = "deserialize_number_from_string")]
zone_id: i32,
@ -286,6 +273,11 @@ struct Event {
vehicle_id: String,
#[serde(default)]
attacker_vehicle_id: String,
#[serde(default, deserialize_with = "deserialize_number_from_string")]
experience_id: i32,
// #[serde(default)]
// other_id: String,
}
#[derive(Deserialize, Debug, Clone)]
@ -294,7 +286,17 @@ struct Payload {
}
async fn healthz() {
let app = Router::new().route("/healthz", get(|| async { "ok" }));
let app = Router::new().route(
"/healthz",
get(|| async {
Json(json!({
"status": "ok",
}))
}),
).route(
"/metrics",
get(telemetry::handler)
);
let port: u16 = std::env::var("PORT")
.unwrap_or("8999".to_string())
@ -319,23 +321,54 @@ async fn main() {
}
let url = url::Url::parse(&addr).unwrap();
println!("[ws] Connecting to {}", url);
let (tx, rx) = futures::channel::mpsc::unbounded();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
let (write, read) = ws_stream.split();
let fused_writer = rx.map(Ok).forward(write).fuse();
let fused_reader = read
.for_each(|msg| async move {
.for_each(|msg| async {
let body = &msg.unwrap().to_string();
let data: Payload = serde_json::from_str(body).unwrap_or(Payload {
payload: Event::default(),
});
let mut data: Payload = match serde_json::from_str(body) {
Ok(data) => data,
Err(_e) => {
// println!("Error: {}; body: {}", e, body.clone());
telemetry::event_dropped(&0, &"".to_string(), "decoding failure");
return;
}
};
if data.payload.event_name == "" {
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "not event");
return;
}
process_event(&data.payload).await;
telemetry::event(&data.payload.world_id, &data.payload.event_name);
if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" {
process_death_event(&data.payload).await;
return;
}
if data.payload.event_name == "GainExperience" {
if data.payload.team_id == 0 {
match get_team_id(data.payload.character_id.clone()).await {
Ok(team_id) => {
data.payload.team_id = team_id;
}
Err(_) => {
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "team_id missing");
}
}
}
process_exp_event(&data.payload).await;
return;
}
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "unprocessable");
})
.fuse();

View file

@ -0,0 +1,75 @@
use lazy_static::lazy_static;
use prometheus::{
IntGaugeVec,
register_int_gauge_vec,
TextEncoder,
gather
};
lazy_static! {
// incoming events
pub static ref EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_count", "Events processed", &[
"world_id", "event_name"
]).unwrap();
pub static ref EVENTS_DROPPED: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_dropped_count", "Events dropped", &[
"world_id", "event_name", "reason"
]).unwrap();
pub static ref EXPERIENCE_EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_experience_events_count", "Experience Events processed by Exp ID", &[
"world_id", "experience_id"
]).unwrap();
// database stuff
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_writes", "Writes to Postgres", &[
"table", "op"
]).unwrap();
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_reads", "Reads from Postgres", &[
"table", "op"
]).unwrap();
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
// "table", "op"
// ]).unwrap();
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
// "table", "op"
// ]).unwrap();
}
pub async fn handler() -> String {
let encoder = TextEncoder::new();
let mut buffer = String::new();
let metrics = gather();
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
buffer
}
pub fn event(world_id: &i32, event_name: &String) {
EVENTS.with_label_values(&[
&world_id.to_string(),
&event_name,
]).inc();
}
pub fn event_dropped(world_id: &i32, event_name: &String, reason: &str) {
EVENTS_DROPPED.with_label_values(&[
&world_id.to_string(),
&event_name,
reason,
]).inc();
}
pub fn experience_event(world_id: &i32, experience_id: &i32) {
EXPERIENCE_EVENTS.with_label_values(&[
&world_id.to_string(),
&experience_id.to_string(),
]).inc();
}
pub fn db_write(table: &str, op: &str) {
DB_WRITES.with_label_values(&[table, op]).inc();
}
pub fn db_read(table: &str, op: &str) {
DB_READS.with_label_values(&[table, op]).inc();
}

View file

@ -34,6 +34,8 @@ lazy_static! {
("1105", "vanguard"),
("2010", "flash"),
("2033", "javelin"),
("2039", "ant"),
("2040", "valkyrie"),
("2122", "mosquito"),
("2123", "reaver"),
("2124", "scythe"),
@ -47,6 +49,9 @@ lazy_static! {
("2135", "prowler"),
("2136", "dervish"),
("2137", "chimera"),
("2139", "ant"),
("2140", "galaxy"),
("2141", "valkyrie"),
("2142", "corsair"),
]);
static ref LOADOUT_TO_CLASS: HashMap<&'static str, &'static str> = HashMap::from([