Compare commits
57 commits
Author | SHA1 | Date | |
---|---|---|---|
3c0ab61695 | |||
3a422b8f6f | |||
7ab5893f67 | |||
96cb2c80d8 | |||
ad8105ca94 | |||
e805a4ca5a | |||
be38bb6b5f | |||
c58fdf06b7 | |||
9ebeeb47f4 | |||
f49a8a0fda | |||
a1f0e32e30 | |||
143ec0cd3b | |||
2dd19249db | |||
79d406cee6 | |||
e5c57bf505 | |||
11127f269d | |||
97ea7455a7 | |||
bfdf7dcb52 | |||
ffc2dfbbad | |||
4e7824cbad | |||
2a47beb69a | |||
f8add369a6 | |||
118f1b6b99 | |||
bf89c4aaf0 | |||
738d2975ec | |||
24437b5520 | |||
bca70e2d5b | |||
679b49ff88 | |||
9940e9dd90 | |||
2d6da8343d | |||
1c3440d919 | |||
0f710f2712 | |||
9b6b261c16 | |||
97474de07e | |||
a9d1daf397 | |||
505e71f65f | |||
6bd048a9f4 | |||
b8fa5d9595 | |||
c483764b4b | |||
9365abc7f8 | |||
83f5f02a88 | |||
5ca02948df | |||
267a8c11c3 | |||
9ccd47afa0 | |||
d83ff16c1a | |||
9f1942344b | |||
170fdf647d | |||
1e41262d70 | |||
2665a6d25f | |||
004def8fbb | |||
89d115b61d | |||
b91019e8b4 | |||
26f0ce1a1a | |||
3bfc0b4e28 | |||
c89eb6ea74 | |||
05e30e4420 | |||
93bccd3b19 |
38 changed files with 4341 additions and 1398 deletions
0
.env
0
.env
3
.envrc
Normal file
3
.envrc
Normal file
|
@ -0,0 +1,3 @@
|
|||
use flake . --accept-flake-config;
|
||||
|
||||
# source .envrc-local
|
38
.github/workflows/ci.yaml
vendored
38
.github/workflows/ci.yaml
vendored
|
@ -17,36 +17,12 @@ jobs:
|
|||
registry: ghcr.io
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
- run: docker buildx create --use --driver=docker-container
|
||||
- run: |
|
||||
docker build . \
|
||||
TAG_LATEST_IF_MASTER=$(if [ "$GITHUB_REF_NAME" = "main" ]; then echo "-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest"; else echo ""; fi)
|
||||
docker buildx build . \
|
||||
--build-arg SERVICE=${{ matrix.service }} \
|
||||
-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }}
|
||||
- run: |
|
||||
docker tag ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} \
|
||||
ghcr.io/${{ github.repository }}/${{ matrix.service }}:latest
|
||||
if: github.ref == 'refs/heads/main'
|
||||
- run: |
|
||||
docker push ghcr.io/${{ github.repository }}/${{ matrix.service }}
|
||||
deploy:
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
if: github.ref == 'refs/heads/main'
|
||||
environment:
|
||||
name: production
|
||||
url: https://saerro.harasse.rs
|
||||
permissions:
|
||||
contents: "read"
|
||||
id-token: "write"
|
||||
steps:
|
||||
- id: "auth"
|
||||
uses: "google-github-actions/auth@v1"
|
||||
with:
|
||||
workload_identity_provider: ${{ secrets.WORKLOAD_IDENTITY_PROVIDER }}
|
||||
service_account: ${{ secrets.SERVICE_ACCOUNT }}
|
||||
|
||||
- name: "Set up Cloud SDK"
|
||||
uses: "google-github-actions/setup-gcloud@v1"
|
||||
|
||||
- name: "Deploy"
|
||||
run: |
|
||||
gcloud compute ssh ${{ secrets.VM_NAME }} --zone=us-central1-a --command "cd /opt && sudo docker compose pull && sudo docker compose up -d"
|
||||
-t ghcr.io/${{ github.repository }}/${{ matrix.service }}:${{ github.sha }} $TAG_LATEST_IF_MASTER \
|
||||
--push \
|
||||
--cache-to type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }} \
|
||||
--cache-from type=gha,scope=$GITHUB_REF_NAME-${{ matrix.service }}
|
||||
|
|
6
.gitignore
vendored
6
.gitignore
vendored
|
@ -1 +1,7 @@
|
|||
/target
|
||||
.DS_Store
|
||||
*/.DS_Store
|
||||
.envrc-local
|
||||
/.vscode
|
||||
/.direnv
|
||||
/result
|
2217
Cargo.lock
generated
2217
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -1,5 +1,4 @@
|
|||
[workspace]
|
||||
members = [
|
||||
"services/*",
|
||||
"hack/*"
|
||||
]
|
||||
members = ["services/*"]
|
||||
exclude = ["hack/codegen"]
|
||||
resolver = "2"
|
||||
|
|
31
Dockerfile
31
Dockerfile
|
@ -1,18 +1,21 @@
|
|||
FROM rust:1.65.0-bullseye AS builder
|
||||
ARG SERVICE
|
||||
|
||||
FROM rust:1.76.0-bullseye as rust-base
|
||||
WORKDIR /app
|
||||
COPY Cargo.toml Cargo.lock ./
|
||||
COPY services ./services
|
||||
COPY hack ./hack
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends curl clang
|
||||
ARG MOLD_VERSION=1.11.0
|
||||
RUN curl -sSL https://github.com/rui314/mold/releases/download/v${MOLD_VERSION}/mold-${MOLD_VERSION}-x86_64-linux.tar.gz | tar xzv && \
|
||||
mv mold-${MOLD_VERSION}-x86_64-linux/bin/mold /mold && \
|
||||
rm -rf mold-${MOLD_VERSION}-x86_64-linux
|
||||
ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
|
||||
ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
|
||||
|
||||
RUN cargo build --bin ${SERVICE} --release
|
||||
|
||||
|
||||
FROM debian:bullseye-slim AS target
|
||||
FROM rust-base as builder
|
||||
COPY . .
|
||||
ARG SERVICE
|
||||
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
|
||||
COPY --from=builder /app/target/release/${SERVICE} /app
|
||||
ENV CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=clang
|
||||
ENV RUSTFLAGS="-C link-arg=-fuse-ld=/mold"
|
||||
RUN cargo build --release --bin ${SERVICE}
|
||||
|
||||
RUN chmod a+x /app
|
||||
CMD /app
|
||||
FROM debian:bullseye-slim as runtime
|
||||
ARG SERVICE
|
||||
COPY --from=builder /app/target/release/${SERVICE} /app
|
||||
ENTRYPOINT ["/app"]
|
75
README.md
75
README.md
|
@ -2,33 +2,35 @@
|
|||
|
||||
PlanetSide 2 live population API. This API is free and open for anyone to use.
|
||||
|
||||
https://saerro.harasse.rs
|
||||
https://saerro.ps2.live
|
||||
|
||||
Our methodology is to add any player ID seen on the Census websockets to a time-sorted set, and returning the number of player IDs seen within 15 minutes.
|
||||
tl;dr: Watch for specific events, transform and add them to a timeseries set, and query that set for the last 15 minutes.
|
||||
|
||||
We're built on 3 core types, `players`, `classes`, and `vehicles`. Each can be filtered by Continent/Zone, Faction, and World.
|
||||
|
||||
---
|
||||
|
||||
The one and only goal of this app is to provide a current "point-in-time" population status for PlanetSide 2, per world, per faction, (and later, per continent.) Historical info is _not_ a goal; you may implement this on your end.
|
||||
|
||||
Please open an issue here or get in touch with Pomf (okano#0001) on the PS2 Discord if you have complex use cases for this data; it may be trivial/easy to implement APIs tailored to your needs.
|
||||
|
||||
The main use case is for [Medkit](https://github.com/kayteh/medkit2) bot to have an in-house source of population data, without relying too heavily on any third-party stats service, like Fisu, Honu, or Voidwell; which all have different population tracking needs and goals (and thus, different data.)
|
||||
|
||||
An example of how it can be used on [pstop](https://pstop.harasse.rs) ([GitHub](https://github.com/genudine/pstop)).
|
||||
|
||||
## Architecture
|
||||
|
||||
- Websocket processors
|
||||
- A pair per PC, PS4US, PS4EU
|
||||
- Connects to [wss://push.nanite-systems.net](https://nanite-systems.net) and Census Websocket
|
||||
- Primary will connect to NS.
|
||||
- Backup will connect to Census. It will wait for 60 seconds before deciding the primary is dead, and then start processing events.
|
||||
- API
|
||||
- Serves https://saerro.harasse.rs
|
||||
- Built on axum and async-graphql
|
||||
- Redis
|
||||
- Using ZADD with score as timestamp, ZCOUNTBYSCORE by timestamp in 15 minute windows, and cleaned up with SCAN+ZREMBYSCORE, population data is tracked.
|
||||
- There is deliberately no persistence.
|
||||
- Redis "Tender"
|
||||
- Cleans up Redis every 5 mins.
|
||||
- GraphQL API
|
||||
- Serves https://saerro.ps2.live
|
||||
- Built on a "stacking filter" graph model, where each dimension adds a filter to lower dimensions.
|
||||
- Event Streaming Service (ESS) Ingest
|
||||
- WebSocket listening to https://push.nanite-systems.net (which is a resilient mirror to https://push.planetside2.com)
|
||||
- Listens for `Death`, `VehicleDestroy`, and a number of `GainExperience` events.
|
||||
- Postgres with TimescaleDB
|
||||
- Holds `players` and `analytics` tables as hypertables.
|
||||
- Timescale makes this way too fast, mind-blowing :)
|
||||
- Tasks
|
||||
- Occasional jobs that prune the database past what we actually want to retain,
|
||||
- Core data tables are kept to about 20 mins max of data, analytics to 1 week
|
||||
- Can do database resets/migrations.
|
||||
|
||||
# Developing
|
||||
|
||||
|
@ -37,48 +39,27 @@ This app is built with Rust. You can set up a build environment via https://rust
|
|||
To run,
|
||||
|
||||
```sh
|
||||
# Start Redis/backing services
|
||||
# Start backing services
|
||||
docker compose up -d
|
||||
|
||||
# Start Websocket for PC
|
||||
env \
|
||||
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
|
||||
PAIR=pc \
|
||||
ROLE=primary \
|
||||
WORLDS=1,10,13,17,19,40 \
|
||||
cargo run --bin websocket
|
||||
# Run database migrations (required first step on a freshly up'd database)
|
||||
cargo run --bin tasks migrate
|
||||
|
||||
# (Optional:) Start redundant websocket for PC
|
||||
# Start NSS ingest. Use push.planetside2.com if NSS isn't quite working...
|
||||
env \
|
||||
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2&service-id=s:$SERVICE_ID" \
|
||||
PAIR=pc \
|
||||
ROLE=backup \
|
||||
WORLDS=1,10,13,17,19,40 \
|
||||
cargo run --bin websocket
|
||||
|
||||
# (Optional:) Start PS4US websocket
|
||||
env \
|
||||
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4us&service-id=s:$SERVICE_ID" \
|
||||
PAIR=ps4us \
|
||||
WORLDS=1000 \
|
||||
cargo run --bin websocket
|
||||
|
||||
# (Optional:) Start PS4EU websocket
|
||||
env \
|
||||
WS_ADDR="wss://push.planetside2.com/streaming?environment=ps2ps4eu&service-id=s:$SERVICE_ID" \
|
||||
PAIR=ps4eu \
|
||||
WORLDS=2000 \
|
||||
WS_ADDR="wss://push.nanite-systems.net/streaming?environment=all&service-id=s:$SERVICE_ID" \
|
||||
WORLDS=all \
|
||||
cargo run --bin websocket
|
||||
|
||||
# Start API
|
||||
cargo run --bin api
|
||||
|
||||
# Run prune tool
|
||||
cargo run --bin tools prune
|
||||
cargo run --bin tasks prune
|
||||
|
||||
# Build containers
|
||||
docker build . --build-arg SERVICE=api -t saerro:api
|
||||
docker build . --build-arg SERVICE=tools -t saerro:tools
|
||||
docker build . --build-arg SERVICE=tasks -t saerro:tasks
|
||||
docker build . --build-arg SERVICE=websocket -t saerro:websocket
|
||||
```
|
||||
|
||||
|
@ -94,4 +75,4 @@ Currently, the entire stack runs on Docker. You may deploy it to any server via:
|
|||
docker compose up -d -f docker-compose.live.yaml
|
||||
```
|
||||
|
||||
It listens on port 80, it's up to you from here.
|
||||
It listens on port 80, it's up to you from here. Make sure to change passwords present in the file. It's not _that secret_ of data, but why risk it?
|
||||
|
|
|
@ -16,7 +16,7 @@ services:
|
|||
image: ghcr.io/genudine/saerro/api:latest
|
||||
pull_policy: always
|
||||
ports:
|
||||
- 8000:80
|
||||
- 80:8000
|
||||
links:
|
||||
- tsdb
|
||||
restart: always
|
||||
|
|
|
@ -2,7 +2,7 @@ version: "3"
|
|||
|
||||
services:
|
||||
tsdb:
|
||||
image: timescale/timescaledb-ha:pg14-latest
|
||||
image: docker.io/timescale/timescaledb:latest-pg14
|
||||
environment:
|
||||
POSTGRES_PASSWORD: saerro321
|
||||
POSTGRES_USER: saerrouser
|
||||
|
|
103
flake.lock
generated
Normal file
103
flake.lock
generated
Normal file
|
@ -0,0 +1,103 @@
|
|||
{
|
||||
"nodes": {
|
||||
"fenix": {
|
||||
"inputs": {
|
||||
"nixpkgs": [
|
||||
"nixpkgs"
|
||||
],
|
||||
"rust-analyzer-src": "rust-analyzer-src"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1708150887,
|
||||
"narHash": "sha256-lyEaeShLZqQtFO+ULLfxF9fYaYpTal0Ck1B+iKYBOMs=",
|
||||
"owner": "nix-community",
|
||||
"repo": "fenix",
|
||||
"rev": "761431323e30846bae160e15682cfa687c200606",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-community",
|
||||
"repo": "fenix",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"flake-parts": {
|
||||
"inputs": {
|
||||
"nixpkgs-lib": "nixpkgs-lib"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1706830856,
|
||||
"narHash": "sha256-a0NYyp+h9hlb7ddVz4LUn1vT/PLwqfrWYcHMvFB1xYg=",
|
||||
"owner": "hercules-ci",
|
||||
"repo": "flake-parts",
|
||||
"rev": "b253292d9c0a5ead9bc98c4e9a26c6312e27d69f",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "hercules-ci",
|
||||
"repo": "flake-parts",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1707956935,
|
||||
"narHash": "sha256-ZL2TrjVsiFNKOYwYQozpbvQSwvtV/3Me7Zwhmdsfyu4=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "a4d4fe8c5002202493e87ec8dbc91335ff55552c",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs-lib": {
|
||||
"locked": {
|
||||
"dir": "lib",
|
||||
"lastModified": 1706550542,
|
||||
"narHash": "sha256-UcsnCG6wx++23yeER4Hg18CXWbgNpqNXcHIo5/1Y+hc=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "97b17f32362e475016f942bbdfda4a4a72a8a652",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"dir": "lib",
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"fenix": "fenix",
|
||||
"flake-parts": "flake-parts",
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
},
|
||||
"rust-analyzer-src": {
|
||||
"flake": false,
|
||||
"locked": {
|
||||
"lastModified": 1708018577,
|
||||
"narHash": "sha256-B75VUqKvQeIqAUnYw4bGjY3xxrCqzRBJHLbmD0MAWEw=",
|
||||
"owner": "rust-lang",
|
||||
"repo": "rust-analyzer",
|
||||
"rev": "b9b0d29b8e69b02457cfabe20c4c69cdb45f3cc5",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "rust-lang",
|
||||
"ref": "nightly",
|
||||
"repo": "rust-analyzer",
|
||||
"type": "github"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
"version": 7
|
||||
}
|
56
flake.nix
Normal file
56
flake.nix
Normal file
|
@ -0,0 +1,56 @@
|
|||
{
|
||||
description = "PlanetSide 2 Metagame Harvesting Service";
|
||||
|
||||
inputs = {
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
|
||||
flake-parts.url = "github:hercules-ci/flake-parts";
|
||||
fenix = {
|
||||
url = "github:nix-community/fenix";
|
||||
inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
};
|
||||
|
||||
nixConfig = {
|
||||
extra-substituters = [
|
||||
"https://nix-community.cachix.org"
|
||||
];
|
||||
|
||||
extra-trusted-public-keys = [
|
||||
"nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs="
|
||||
];
|
||||
};
|
||||
|
||||
outputs = inputs: inputs.flake-parts.lib.mkFlake { inherit inputs; } {
|
||||
systems = [ "x86_64-linux" "aarch64-linux" ];
|
||||
perSystem = { config, self', pkgs, lib, system, ... }: let
|
||||
fenix = inputs.fenix.packages.${system}.minimal;
|
||||
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
|
||||
|
||||
buildDeps = [
|
||||
pkgs.openssl
|
||||
];
|
||||
|
||||
devDeps = [
|
||||
fenix.toolchain
|
||||
pkgs.docker-compose
|
||||
pkgs.cargo-watch
|
||||
];
|
||||
PKG_CONFIG_PATH = "${pkgs.openssl.dev}/lib/pkgconfig";
|
||||
in {
|
||||
packages.default = (pkgs.makeRustPlatform {
|
||||
cargo = fenix.toolchain;
|
||||
rustc = fenix.toolchain;
|
||||
}).buildRustPackage {
|
||||
inherit (cargoToml.package) name version;
|
||||
cargoLock.lockFile = ./Cargo.lock;
|
||||
src = ./.;
|
||||
nativeBuildInputs = [ pkgs.pkg-config ];
|
||||
buildInputs = buildDeps ++ devDeps;
|
||||
};
|
||||
|
||||
devShells.default = pkgs.mkShell {
|
||||
nativeBuildInputs = buildDeps ++ devDeps;
|
||||
};
|
||||
};
|
||||
};
|
||||
}
|
1
hack/codegen/.gitignore
vendored
Normal file
1
hack/codegen/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
target
|
1411
hack/codegen/Cargo.lock
generated
Normal file
1411
hack/codegen/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
|
@ -8,7 +8,6 @@ reqwest = { version="0.11.13", features = ["json"] }
|
|||
tera = { version = "1.17.1", default-features = false }
|
||||
lazy_static = "1.4.0"
|
||||
regex = "1.7.0"
|
||||
futures = "0.3.25"
|
||||
tokio = { version = "1.22.0", features = ["full"] }
|
||||
serde = { version = "1.0.147", features = ["derive"] }
|
||||
serde_json = "1.0.89"
|
|
@ -1,5 +1,4 @@
|
|||
use std::process;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use regex::{Regex, RegexBuilder};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -46,6 +45,9 @@ async fn translators_rs() {
|
|||
"mosquito",
|
||||
"galaxy",
|
||||
"valkyrie",
|
||||
"wasp",
|
||||
"deliverer",
|
||||
"lodestar",
|
||||
"liberator",
|
||||
"ant",
|
||||
"harasser",
|
||||
|
@ -107,11 +109,16 @@ async fn translators_rs() {
|
|||
.find(&item.name.as_ref().unwrap().en.as_ref().unwrap())
|
||||
.unwrap();
|
||||
|
||||
let name = matched
|
||||
.as_str()
|
||||
.to_lowercase()
|
||||
.replace("wasp", "valkyrie")
|
||||
.replace("deliverer", "ant")
|
||||
.replace("lodestar", "galaxy");
|
||||
|
||||
Vehicle {
|
||||
vehicle_id: item.vehicle_id,
|
||||
name: Some(LangEn {
|
||||
en: Some(matched.as_str().to_string().to_lowercase()),
|
||||
}),
|
||||
name: Some(LangEn { en: Some(name) }),
|
||||
propulsion_type: item.propulsion_type,
|
||||
}
|
||||
})
|
||||
|
|
|
@ -6,13 +6,25 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
serde_json = "1.0.89"
|
||||
serde = "1.0.149"
|
||||
async-graphql = { version = "5.0.3" }
|
||||
async-graphql-axum = "5.0.3"
|
||||
axum = "0.6.1"
|
||||
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls", "postgres" ] }
|
||||
tokio = { version = "1.23.0", features = [ "full" ] }
|
||||
tower-http = { version = "0.3.5", features = ["cors"] }
|
||||
serde_json = "1.0.105"
|
||||
serde = "1.0.188"
|
||||
async-graphql = { version = "6.0.5", features = ["chrono"] }
|
||||
axum = "0.6.20"
|
||||
sqlx = { version = "0.7.1", default_features = false, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"postgres",
|
||||
"chrono",
|
||||
] }
|
||||
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
|
||||
tower-http = { version = "0.4.4", features = ["cors"] }
|
||||
lazy_static = "1.4.0"
|
||||
reqwest = "0.11.13"
|
||||
reqwest = { version = "0.11.20", features = [
|
||||
"rustls-tls-webpki-roots",
|
||||
"rustls",
|
||||
] }
|
||||
chrono = "0.4.28"
|
||||
prometheus = "0.13.3"
|
||||
|
||||
[dependencies.openssl]
|
||||
version = "0.10.57"
|
||||
features = ["vendored"]
|
||||
|
|
86
services/api/src/analytics.rs
Normal file
86
services/api/src/analytics.rs
Normal file
|
@ -0,0 +1,86 @@
|
|||
use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject};
|
||||
use chrono::{DateTime, Utc};
|
||||
use sqlx::{query, Pool, Postgres, Row};
|
||||
use crate::telemetry;
|
||||
|
||||
pub struct Analytics {}
|
||||
|
||||
#[derive(SimpleObject, Debug, Clone)]
|
||||
pub struct Event {
|
||||
pub time: DateTime<Utc>,
|
||||
pub event_name: String,
|
||||
pub world_id: i32,
|
||||
pub count: i64,
|
||||
}
|
||||
|
||||
#[Object]
|
||||
impl Analytics {
|
||||
/// Get all events in analytics, bucket_size is in seconds
|
||||
async fn events<'ctx>(
|
||||
&self,
|
||||
ctx: &Context<'ctx>,
|
||||
#[graphql(default = 60)] bucket_size: u64,
|
||||
world_id: Option<i32>,
|
||||
#[graphql(default = false)] hi_precision: bool,
|
||||
) -> Vec<Event> {
|
||||
telemetry::graphql_query("Analytics", "events");
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("analytics", "events");
|
||||
let sql = format!("
|
||||
SELECT
|
||||
time_bucket_gapfill('{} seconds', time, start => now() - '{}'::interval, finish => now()) AS bucket,
|
||||
CASE WHEN count(*) IS NULL THEN 0 ELSE count(*) END AS count,
|
||||
event_name,
|
||||
world_id
|
||||
FROM analytics
|
||||
WHERE time > now() - '{}'::interval {}
|
||||
GROUP BY bucket, world_id, event_name
|
||||
ORDER BY bucket ASC",
|
||||
if hi_precision {
|
||||
5
|
||||
} else {
|
||||
bucket_size
|
||||
},
|
||||
if hi_precision {
|
||||
"1 hour"
|
||||
} else {
|
||||
"1 day"
|
||||
},
|
||||
if hi_precision {
|
||||
"1 hour"
|
||||
} else {
|
||||
"1 day"
|
||||
},
|
||||
if let Some(world_id) = world_id {
|
||||
format!("AND world_id = {}", world_id)
|
||||
} else {
|
||||
"".to_string()
|
||||
}
|
||||
);
|
||||
|
||||
let mut result = query(sql.as_str()).fetch(pool);
|
||||
|
||||
let mut events = Vec::new();
|
||||
while let Some(row) = result.try_next().await.unwrap() {
|
||||
events.push(Event {
|
||||
time: row.get("bucket"),
|
||||
event_name: row.get("event_name"),
|
||||
world_id: row.get("world_id"),
|
||||
count: row.get("count"),
|
||||
});
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct AnalyticsQuery;
|
||||
|
||||
#[Object]
|
||||
impl AnalyticsQuery {
|
||||
async fn analytics(&self) -> Analytics {
|
||||
Analytics {}
|
||||
}
|
||||
}
|
|
@ -1,4 +1,8 @@
|
|||
use crate::utils::{Filters, IdOrNameBy};
|
||||
use crate::{
|
||||
factions::{NC, TR, VS},
|
||||
utils::{Filters, IdOrNameBy},
|
||||
telemetry
|
||||
};
|
||||
use async_graphql::{Context, Object};
|
||||
use sqlx::{Pool, Postgres, Row};
|
||||
|
||||
|
@ -10,10 +14,11 @@ pub struct Class {
|
|||
|
||||
impl Class {
|
||||
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
|
||||
telemetry::db_read("players", "fetch");
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
let sql = format!(
|
||||
"SELECT count(distinct character_id) FROM classes WHERE time > now() - interval '15 minutes' AND class_id = $1 {};",
|
||||
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND class_name = $1 {};",
|
||||
filters.sql(),
|
||||
);
|
||||
|
||||
|
@ -33,33 +38,38 @@ impl Class {
|
|||
#[Object]
|
||||
impl Class {
|
||||
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Class", "total");
|
||||
|
||||
self.fetch(ctx, self.filters.clone()).await
|
||||
}
|
||||
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Class", "nc");
|
||||
self.fetch(
|
||||
ctx,
|
||||
Filters {
|
||||
faction: Some(IdOrNameBy::Id(1)),
|
||||
faction: Some(IdOrNameBy::Id(NC)),
|
||||
..self.filters.clone()
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Class", "tr");
|
||||
self.fetch(
|
||||
ctx,
|
||||
Filters {
|
||||
faction: Some(IdOrNameBy::Id(2)),
|
||||
faction: Some(IdOrNameBy::Id(TR)),
|
||||
..self.filters.clone()
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Class", "vs");
|
||||
self.fetch(
|
||||
ctx,
|
||||
Filters {
|
||||
faction: Some(IdOrNameBy::Id(3)),
|
||||
faction: Some(IdOrNameBy::Id(VS)),
|
||||
..self.filters.clone()
|
||||
},
|
||||
)
|
||||
|
@ -83,36 +93,42 @@ impl Classes {
|
|||
#[Object]
|
||||
impl Classes {
|
||||
async fn infiltrator(&self) -> Class {
|
||||
telemetry::graphql_query("Classes", "infiltrator");
|
||||
Class {
|
||||
filters: self.filters.clone(),
|
||||
class_name: "infiltrator".to_string(),
|
||||
}
|
||||
}
|
||||
async fn light_assault(&self) -> Class {
|
||||
telemetry::graphql_query("Classes", "light_assault");
|
||||
Class {
|
||||
filters: self.filters.clone(),
|
||||
class_name: "light_assault".to_string(),
|
||||
}
|
||||
}
|
||||
async fn combat_medic(&self) -> Class {
|
||||
telemetry::graphql_query("Classes", "combat_medic");
|
||||
Class {
|
||||
filters: self.filters.clone(),
|
||||
class_name: "combat_medic".to_string(),
|
||||
}
|
||||
}
|
||||
async fn engineer(&self) -> Class {
|
||||
telemetry::graphql_query("Classes", "engineer");
|
||||
Class {
|
||||
filters: self.filters.clone(),
|
||||
class_name: "engineer".to_string(),
|
||||
}
|
||||
}
|
||||
async fn heavy_assault(&self) -> Class {
|
||||
telemetry::graphql_query("Classes", "heavy_assault");
|
||||
Class {
|
||||
filters: self.filters.clone(),
|
||||
class_name: "heavy_assault".to_string(),
|
||||
}
|
||||
}
|
||||
async fn max(&self) -> Class {
|
||||
telemetry::graphql_query("Classes", "max");
|
||||
Class {
|
||||
filters: self.filters.clone(),
|
||||
class_name: "max".to_string(),
|
||||
|
@ -132,6 +148,7 @@ impl ClassesQuery {
|
|||
|
||||
/// Get a specific class
|
||||
pub async fn class(&self, filter: Option<Filters>, class_name: String) -> Class {
|
||||
telemetry::graphql_query("Classes", "");
|
||||
Class {
|
||||
filters: filter.unwrap_or_default(),
|
||||
class_name,
|
||||
|
|
4
services/api/src/factions.rs
Normal file
4
services/api/src/factions.rs
Normal file
|
@ -0,0 +1,4 @@
|
|||
pub const VS: i32 = 1;
|
||||
pub const NC: i32 = 2;
|
||||
pub const TR: i32 = 3;
|
||||
pub const NSO: i32 = 4;
|
|
@ -1,8 +1,13 @@
|
|||
use async_graphql::{Context, Enum, Object};
|
||||
use crate::{telemetry, utils::ID_TO_WORLD};
|
||||
use async_graphql::{Context, Enum, Object, SimpleObject};
|
||||
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
|
||||
use chrono::{DateTime, Utc};
|
||||
use sqlx::{query, Pool, Postgres, Row};
|
||||
|
||||
pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse {
|
||||
telemetry::http_request("/health", "GET");
|
||||
|
||||
telemetry::db_read("analytics", "get_health");
|
||||
let events_resp =
|
||||
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
|
||||
.fetch_one(&pool)
|
||||
|
@ -53,13 +58,48 @@ enum UpDown {
|
|||
|
||||
pub struct Health {}
|
||||
|
||||
impl Health {
|
||||
async fn most_recent_event_time<'ctx>(
|
||||
&self,
|
||||
ctx: &Context<'ctx>,
|
||||
world_id: i32,
|
||||
) -> (UpDown, Option<DateTime<Utc>>) {
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("analytics", "most_recent_event_time");
|
||||
let events_resp =
|
||||
query("SELECT time FROM analytics WHERE world_id = $1 ORDER BY time DESC LIMIT 1")
|
||||
.bind(world_id)
|
||||
.fetch_one(pool)
|
||||
.await;
|
||||
|
||||
match events_resp {
|
||||
Ok(row) => {
|
||||
let last_event: DateTime<Utc> = row.get(0);
|
||||
|
||||
if last_event < Utc::now() - chrono::Duration::minutes(5) {
|
||||
return (UpDown::Down, Some(last_event));
|
||||
} else {
|
||||
return (UpDown::Up, Some(last_event));
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
return (UpDown::Down, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reports on the health of Saerro Listening Post
|
||||
#[Object]
|
||||
impl Health {
|
||||
/// Did a ping to Postgres (our main datastore) succeed?
|
||||
async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
|
||||
telemetry::graphql_query("Health", "database");
|
||||
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("analytics", "database_health");
|
||||
let events_resp =
|
||||
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
|
||||
.fetch_one(pool)
|
||||
|
@ -73,8 +113,11 @@ impl Health {
|
|||
|
||||
/// Is the websocket processing jobs?
|
||||
async fn ingest<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
|
||||
telemetry::graphql_query("Health", "ingest");
|
||||
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("analytics", "ingest_health");
|
||||
let events_resp =
|
||||
query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
|
||||
.fetch_one(pool)
|
||||
|
@ -96,14 +139,46 @@ impl Health {
|
|||
|
||||
/// Is the websocket actually turned on?
|
||||
async fn ingest_reachable(&self) -> UpDown {
|
||||
telemetry::graphql_query("Health", "ingest_reachable");
|
||||
|
||||
reqwest::get(
|
||||
std::env::var("WEBSOCKET_HEALTHCHECK")
|
||||
.unwrap_or("http://localhost:8999/health".to_string()),
|
||||
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()),
|
||||
)
|
||||
.await
|
||||
.map(|_| UpDown::Up)
|
||||
.unwrap_or(UpDown::Down)
|
||||
}
|
||||
|
||||
/// Shows a disclaimer for the worlds check
|
||||
async fn worlds_disclaimer(&self) -> String {
|
||||
"This is a best-effort check. A world reports `DOWN` when it doesn't have new events for 5 minutes. It could be broken, it could be the reality of the game state.".to_string()
|
||||
}
|
||||
|
||||
/// Checks if a world has had any events for the last 5 minutes
|
||||
async fn worlds<'ctx>(&self, ctx: &Context<'ctx>) -> Vec<WorldUpDown> {
|
||||
telemetry::graphql_query("Health", "worlds");
|
||||
|
||||
let mut worlds = Vec::new();
|
||||
for (id, name) in ID_TO_WORLD.iter() {
|
||||
let (status, last_event) = self.most_recent_event_time(ctx, *id).await;
|
||||
worlds.push(WorldUpDown {
|
||||
id: *id,
|
||||
name: name.to_string(),
|
||||
status,
|
||||
last_event,
|
||||
});
|
||||
}
|
||||
worlds
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(SimpleObject)]
|
||||
struct WorldUpDown {
|
||||
id: i32,
|
||||
name: String,
|
||||
status: UpDown,
|
||||
last_event: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
|
|
@ -17,6 +17,4 @@
|
|||
</style>
|
||||
|
||||
<h1>404 Not Found</h1>
|
||||
<p>
|
||||
[<a href="/">home</a>] [<a href="/graphql/playground">graphql playground</a>]
|
||||
</p>
|
||||
<p>[<a href="/">home</a>]</p>
|
||||
|
|
|
@ -14,47 +14,256 @@
|
|||
color: #cead42;
|
||||
text-decoration: none;
|
||||
}
|
||||
|
||||
.hidden {
|
||||
display: none;
|
||||
}
|
||||
|
||||
.query {
|
||||
list-style-type: none;
|
||||
padding-left: 0;
|
||||
background-color: #131313;
|
||||
width: fit-content;
|
||||
padding: 2rem;
|
||||
margin: 2rem;
|
||||
border-radius: 10px;
|
||||
border-left: #918b79 3px solid;
|
||||
font-size: 1rem;
|
||||
}
|
||||
|
||||
.query pre {
|
||||
margin: 0;
|
||||
}
|
||||
</style>
|
||||
|
||||
<h1>Saerro Listening Post</h1>
|
||||
<h2>Live Population Stats API for PlanetSide 2</h2>
|
||||
<p>
|
||||
This is a GraphQL API, which means you can query for exactly the data you
|
||||
need. You can also use the GraphiQL interface to explore the data and build
|
||||
your queries.
|
||||
</p>
|
||||
<ul>
|
||||
<li><a href="/graphql/playground">Check out the GraphQL Playground</a></li>
|
||||
<li><a href="/graphiql">Check out GraphiQL</a></li>
|
||||
<li>
|
||||
<a
|
||||
href="/graphql?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
|
||||
id="status_query_link"
|
||||
href="/graphql?query={ health { database ingest ingestReachable worldsDisclaimer worlds { name status lastEvent } } }"
|
||||
>Current system status</a
|
||||
>
|
||||
(<a
|
||||
href="/graphql/playground?query=%7B%20health%20%7B%20pc%20redis%20ps4us%20ps4eu%20%7D%7D"
|
||||
>or in playground</a
|
||||
href="javascript:document.querySelector('#status_query').classList.toggle('hidden')"
|
||||
>show GraphQL</a
|
||||
>)
|
||||
<ul id="status_query" class="hidden query">
|
||||
<li>
|
||||
<pre><code>{
|
||||
health {
|
||||
database
|
||||
ingest
|
||||
ingestReachable
|
||||
worldsDisclaimer
|
||||
worlds {
|
||||
name
|
||||
status
|
||||
lastEvent
|
||||
}
|
||||
}
|
||||
}</code></pre>
|
||||
<a
|
||||
href="javascript:runQuery('status_query_link', 'status_query_result')"
|
||||
>Run ⫸</a
|
||||
><br />
|
||||
</li>
|
||||
<li class="hidden" id="status_query_result"></li>
|
||||
</ul>
|
||||
</li>
|
||||
<li>
|
||||
<a
|
||||
href="/graphql?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
|
||||
id="current_pop_query_link"
|
||||
href="/graphql?query={ allWorlds { name population { total nc tr vs } } }"
|
||||
>
|
||||
Current population of all worlds
|
||||
</a>
|
||||
(<a
|
||||
href="/graphql/playground?query=%7B%0A%20%20allWorlds%20%7B%0A%20%20%20%20name%0A%20%20%20%20population%0A%20%20%7D%0A%7D%0A"
|
||||
>or in playground</a
|
||||
href="javascript:document.querySelector('#current_pop_query').classList.toggle('hidden')"
|
||||
>show GraphQL</a
|
||||
>)
|
||||
<ul id="current_pop_query" class="hidden query">
|
||||
<li>
|
||||
<pre><code>{
|
||||
allWorlds {
|
||||
name
|
||||
population {
|
||||
total
|
||||
nc
|
||||
tr
|
||||
vs
|
||||
}
|
||||
}
|
||||
}</code></pre>
|
||||
<a
|
||||
href="javascript:runQuery('current_pop_query_link', 'current_pop_query_result')"
|
||||
>Run ⫸</a
|
||||
><br />
|
||||
</li>
|
||||
<li class="hidden" id="current_pop_query_result"></li>
|
||||
</ul>
|
||||
</li>
|
||||
<li>
|
||||
<a
|
||||
id="complex_query_link"
|
||||
href="/graphql?query={ allWorlds { name classes { combatMedic { total nc tr vs } } vehicles { total sunderer { total nc tr vs } } } }"
|
||||
>
|
||||
Show every Sunderer and Combat Medic for every server by faction
|
||||
</a>
|
||||
(<a
|
||||
href="javascript:document.querySelector('#complex_query').classList.toggle('hidden')"
|
||||
>show GraphQL</a
|
||||
>)
|
||||
<ul id="complex_query" class="hidden query">
|
||||
<li>
|
||||
<pre><code>{
|
||||
allWorlds {
|
||||
name
|
||||
classes {
|
||||
combatMedic {
|
||||
total
|
||||
nc
|
||||
tr
|
||||
vs
|
||||
}
|
||||
}
|
||||
vehicles {
|
||||
total
|
||||
sunderer {
|
||||
total
|
||||
nc
|
||||
tr
|
||||
vs
|
||||
}
|
||||
}
|
||||
}
|
||||
}</code></pre>
|
||||
<a
|
||||
href="javascript:runQuery('complex_query_link', 'complex_query_result')"
|
||||
>Run ⫸</a
|
||||
><br />
|
||||
</li>
|
||||
<li class="hidden" id="complex_query_result"></li>
|
||||
</ul>
|
||||
</li>
|
||||
<li>
|
||||
<a
|
||||
id="very_complex_query_link"
|
||||
href="/graphql?query={ zones { all { name classes { heavyAssault { nc tr vs } lightAssault { nc tr vs } } vehicles { vanguard { total } prowler { total } magrider { total } lightning { nc vs tr } chimera { nc vs tr } } } } }"
|
||||
>
|
||||
Show the current counts of heavy assaults, light assaults, and tanks per
|
||||
continent globally
|
||||
</a>
|
||||
(<a
|
||||
href="javascript:document.querySelector('#very_complex_query').classList.toggle('hidden')"
|
||||
>show GraphQL</a
|
||||
>)
|
||||
<ul id="very_complex_query" class="hidden query">
|
||||
<li>
|
||||
<pre><code>{
|
||||
zones {
|
||||
all {
|
||||
name
|
||||
classes {
|
||||
heavyAssault {
|
||||
nc
|
||||
tr
|
||||
vs
|
||||
}
|
||||
lightAssault {
|
||||
nc
|
||||
tr
|
||||
vs
|
||||
}
|
||||
}
|
||||
vehicles {
|
||||
vanguard {
|
||||
total
|
||||
}
|
||||
prowler {
|
||||
total
|
||||
}
|
||||
magrider {
|
||||
total
|
||||
}
|
||||
lightning {
|
||||
nc
|
||||
vs
|
||||
tr
|
||||
}
|
||||
chimera {
|
||||
nc
|
||||
vs
|
||||
tr
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}</code></pre>
|
||||
<a
|
||||
href="javascript:runQuery('very_complex_query_link', 'very_complex_query_result')"
|
||||
>Run ⫸</a
|
||||
><br />
|
||||
</li>
|
||||
<li class="hidden" id="very_complex_query_result"></li>
|
||||
</ul>
|
||||
</li>
|
||||
</ul>
|
||||
<p>
|
||||
This API supports two query methods,
|
||||
<a href="https://graphql.org/learn/serving-over-http/#get-request">GET</a>
|
||||
and
|
||||
<a href="https://graphql.org/learn/serving-over-http/#post-request">POST</a>.
|
||||
To view the JSON outputs without fancy UIs, you can use a browser plugin like
|
||||
<a href="https://addons.mozilla.org/en-US/firefox/addon/jsonview/"
|
||||
>JSONView for Firefox</a
|
||||
>
|
||||
or
|
||||
<a
|
||||
href="https://chrome.google.com/webstore/detail/jsonvue/chklaanhfefbnpoihckbnefhakgolnmc"
|
||||
>JSONVue for Chrome</a
|
||||
>.
|
||||
</p>
|
||||
<p>
|
||||
All data is an aggregate of the last 15 minutes of Death and VehicleDestroy
|
||||
events, including both attacker and victim.
|
||||
</p>
|
||||
|
||||
<hr />
|
||||
<p>
|
||||
This API is provided by Genudine Dynamics.<br />As always, we take no
|
||||
responsibility for your use of this data... or our weapons. :)
|
||||
</p>
|
||||
<p>For help, please contact us in #api-dev on the PlanetSide 2 Discord.</p>
|
||||
<p>
|
||||
[<a href="https://github.com/genudine/saerro">github</a>] [<a
|
||||
href="https://pstop.harasse.rs"
|
||||
>pstop</a
|
||||
>]
|
||||
[<a href="/ingest">ingest stats</a>] [<a
|
||||
href="https://github.com/genudine/saerro"
|
||||
>github</a
|
||||
>] [<a href="https://pstop.harasse.rs">pstop</a>]
|
||||
</p>
|
||||
<script>
|
||||
const runQuery = async (linkId, resultId) => {
|
||||
const link = document.getElementById(linkId);
|
||||
const result = document.getElementById(resultId);
|
||||
result.innerHTML = "Loading...";
|
||||
result.classList.remove("hidden");
|
||||
fetch(link.href)
|
||||
.then((response) => response.json())
|
||||
.then((data) => {
|
||||
result.innerHTML = `<pre><code>${JSON.stringify(
|
||||
data.data,
|
||||
null,
|
||||
2
|
||||
)}</pre></code>`;
|
||||
})
|
||||
.catch((error) => {
|
||||
result.innerHTML = "Failed...";
|
||||
});
|
||||
};
|
||||
</script>
|
||||
|
|
418
services/api/src/html/ingest.html
Normal file
418
services/api/src/html/ingest.html
Normal file
|
@ -0,0 +1,418 @@
|
|||
<!DOCTYPE html>
|
||||
<title>Ingest Stats - Saerro Listening Post</title>
|
||||
<meta charset="utf-8" />
|
||||
<style>
|
||||
body {
|
||||
font-family: monospace;
|
||||
background-color: #010101;
|
||||
color: #e0e0e0;
|
||||
font-size: 1.25rem;
|
||||
line-height: 1.6;
|
||||
}
|
||||
|
||||
a {
|
||||
color: #cead42;
|
||||
text-decoration: none;
|
||||
}
|
||||
|
||||
h3 {
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
.chart-container {
|
||||
position: relative;
|
||||
}
|
||||
|
||||
.main {
|
||||
display: grid;
|
||||
grid-template-columns: repeat(2, minmax(300px, 1fr));
|
||||
gap: 1rem;
|
||||
padding: 1rem;
|
||||
}
|
||||
|
||||
.wide {
|
||||
grid-column: span 2;
|
||||
}
|
||||
|
||||
.graph-head {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.sums-15m {
|
||||
color: rgba(165, 165, 165, 1);
|
||||
}
|
||||
|
||||
.sums-1h {
|
||||
color: rgba(165, 165, 165, 0.8);
|
||||
}
|
||||
|
||||
.sums-6h {
|
||||
color: rgba(165, 165, 165, 0.6);
|
||||
}
|
||||
|
||||
.sums-1d {
|
||||
color: rgba(165, 165, 165, 0.4);
|
||||
}
|
||||
</style>
|
||||
<h1>Ingest Stats <span id="loading">[LOADING...]</span></h1>
|
||||
<div class="main">
|
||||
<div class="wide">
|
||||
<div class="graph-head">
|
||||
<h3>All Events by Type</h3>
|
||||
<p id="all-events-by-type-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container">
|
||||
<canvas id="all-events-by-type" />
|
||||
</div>
|
||||
</div>
|
||||
<div class="wide">
|
||||
<div class="graph-head">
|
||||
<h3>Events by World</h3>
|
||||
</div>
|
||||
<div class="chart-container">
|
||||
<canvas id="events-by-world" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>Connery [US West]</h3>
|
||||
<p id="connery-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="connery" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>Miller [EU]</h3>
|
||||
<p id="miller-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="miller" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>Cobalt [EU]</h3>
|
||||
<p id="cobalt-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="cobalt" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>Emerald [US East]</h3>
|
||||
<p id="emerald-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="emerald" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>Jaeger [US East]</h3>
|
||||
<p id="jaeger-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="jaeger" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>SolTech [Tokyo]</h3>
|
||||
<p id="soltech-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="soltech" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>Genudine [US East] [PS4]</h3>
|
||||
<p id="genudine-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="genudine" />
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div class="graph-head">
|
||||
<h3>Ceres [EU] [PS4]</h3>
|
||||
<p id="ceres-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container smaller">
|
||||
<canvas id="ceres" />
|
||||
</div>
|
||||
</div>
|
||||
<div class="wide">
|
||||
<div class="graph-head">
|
||||
<h3>Experience Events By ID</h3>
|
||||
<p id="exp-by-id-sums">(0, 0, 0, 0)</p>
|
||||
</div>
|
||||
<div class="chart-container">
|
||||
<canvas id="exp-by-id" />
|
||||
</div>
|
||||
<div class="filter">
|
||||
Filter to World:
|
||||
<select id="exp-filter">
|
||||
<option selected value="all">All</option>
|
||||
<option value="1">Connery</option>
|
||||
<option value="10">Miller</option>
|
||||
<option value="13">Cobalt</option>
|
||||
<option value="17">Emerald</option>
|
||||
<option value="19">Jaeger</option>
|
||||
<option value="40">SolTech</option>
|
||||
<option value="1000">Genudine</option>
|
||||
<option value="2000">Ceres</option>
|
||||
</select>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<p>
|
||||
[<a href="/">home</a>] [<a href="/ingest">1 day w/ 5m buckets</a>] [<a
|
||||
href="/ingest?hi=1"
|
||||
>1 hour w/ 5s buckets</a
|
||||
>]
|
||||
</p>
|
||||
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
|
||||
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns/dist/chartjs-adapter-date-fns.bundle.min.js"></script>
|
||||
<script src="https://cdn.jsdelivr.net/npm/humanize-plus@1.8.2/dist/humanize.min.js"></script>
|
||||
<script>
|
||||
const sumsStr = (_15m, _1h, _6h, _24h) => `
|
||||
<span class="sums-15m" title="sum of last 15 minutes (${_15m})">${Humanize.intComma(
|
||||
_15m
|
||||
)},</span>
|
||||
<span class="sums-1h" title="sum of last 1 hour (${_1h})">${Humanize.compactInteger(
|
||||
_1h
|
||||
)},</span>
|
||||
<span class="sums-6h" title="sum of last 6 hours (${_6h})">${Humanize.compactInteger(
|
||||
_6h
|
||||
)},</span>
|
||||
<span class="sums-1d" title="sum of last 1 day (${_24h})">${Humanize.compactInteger(
|
||||
_24h
|
||||
)}</span>`;
|
||||
|
||||
const doSums = async (id, events) => {
|
||||
// Calculate sums for the last 15 minutes, 1 hour, 6 hours, and 1 day based on event timestamps
|
||||
let sums = [0, 0, 0, 0];
|
||||
let now = Date.now();
|
||||
let fifteenMinutes = 15 * 60 * 1000;
|
||||
let oneHour = 60 * 60 * 1000;
|
||||
let sixHours = 6 * 60 * 60 * 1000;
|
||||
let oneDay = 24 * 60 * 60 * 1000;
|
||||
|
||||
for (let ev of events) {
|
||||
let diff = now - new Date(ev.time);
|
||||
if (diff < fifteenMinutes) {
|
||||
sums[0] += ev.count;
|
||||
}
|
||||
if (diff < oneHour) {
|
||||
sums[1] += ev.count;
|
||||
}
|
||||
if (diff < sixHours) {
|
||||
sums[2] += ev.count;
|
||||
}
|
||||
if (diff < oneDay) {
|
||||
sums[3] += ev.count;
|
||||
}
|
||||
}
|
||||
|
||||
document.getElementById(`${id}-sums`).innerHTML = sumsStr(...sums);
|
||||
};
|
||||
|
||||
const allEventsByType = (id, events) => {
|
||||
doSums(id, events);
|
||||
let allEvents = events.reduce(
|
||||
(acc, ev) => {
|
||||
const eventName = ev.eventName.replace(/_[0-9]+/g, "");
|
||||
acc[eventName][ev.time] = acc[eventName][ev.time] ?? 0;
|
||||
acc[eventName][ev.time] += ev.count;
|
||||
return acc;
|
||||
},
|
||||
{ Death: {}, VehicleDestroy: {}, GainExperience: {} }
|
||||
);
|
||||
|
||||
new Chart(document.getElementById(id), {
|
||||
type: "line",
|
||||
options: {
|
||||
scales: {
|
||||
y: { beginAtZero: true, suggestedMin: 0 },
|
||||
x: { stacked: false, type: "timeseries" },
|
||||
},
|
||||
},
|
||||
data: {
|
||||
datasets: [
|
||||
{
|
||||
label: "Deaths",
|
||||
data: allEvents.Death,
|
||||
},
|
||||
{
|
||||
label: "Vehicle Destroys",
|
||||
data: allEvents.VehicleDestroy,
|
||||
},
|
||||
{
|
||||
label: "Experience Events",
|
||||
data: allEvents.GainExperience,
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const experienceEventsByID = (eventsUnfiltered) => {
|
||||
const events = eventsUnfiltered.filter((ev) =>
|
||||
ev.eventName.startsWith("GainExperience_")
|
||||
);
|
||||
|
||||
doSums("exp-by-id", events);
|
||||
let allEvents = events.reduce((acc, ev) => {
|
||||
const eventID = ev.eventName.replace(/GainExperience_([0-9]+)/g, "$1");
|
||||
acc[eventID] = acc[eventID] ?? {};
|
||||
acc[eventID][ev.time] = acc[eventID][ev.time] ?? 0;
|
||||
acc[eventID][ev.time] += ev.count;
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
new Chart(document.getElementById("exp-by-id"), {
|
||||
type: "bar",
|
||||
options: {
|
||||
scales: {
|
||||
y: { stacked: true, beginAtZero: true, suggestedMin: 0 },
|
||||
x: { stacked: true, type: "timeseries" },
|
||||
},
|
||||
},
|
||||
data: {
|
||||
datasets: Object.keys(allEvents).map((id) => ({
|
||||
label: id,
|
||||
data: allEvents[id],
|
||||
})),
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const eventsByWorld = (events) => {
|
||||
let allEvents = events.reduce((acc, ev) => {
|
||||
acc[ev.worldId] = acc[ev.worldId] || {};
|
||||
acc[ev.worldId][ev.time] = (acc[ev.time] || 0) + ev.count;
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
new Chart(document.getElementById("events-by-world"), {
|
||||
type: "line",
|
||||
options: {
|
||||
scales: {
|
||||
y: { beginAtZero: true },
|
||||
x: {
|
||||
type: "timeseries",
|
||||
},
|
||||
},
|
||||
},
|
||||
data: {
|
||||
datasets: [
|
||||
{
|
||||
label: "Connery",
|
||||
data: allEvents["1"],
|
||||
},
|
||||
{
|
||||
label: "Miller",
|
||||
data: allEvents["10"],
|
||||
},
|
||||
{
|
||||
label: "Cobalt",
|
||||
data: allEvents["13"],
|
||||
},
|
||||
{
|
||||
label: "Emerald",
|
||||
data: allEvents["17"],
|
||||
},
|
||||
{
|
||||
label: "Jaeger",
|
||||
data: allEvents["19"],
|
||||
},
|
||||
{
|
||||
label: "SolTech",
|
||||
data: allEvents["40"],
|
||||
},
|
||||
{
|
||||
label: "Genudine",
|
||||
data: allEvents["1000"],
|
||||
},
|
||||
{
|
||||
label: "Ceres",
|
||||
data: allEvents["2000"],
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
(async () => {
|
||||
let resp = await fetch("/graphql", {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
query: `
|
||||
{
|
||||
analytics {
|
||||
events(${
|
||||
location.search.includes("hi=1")
|
||||
? "hiPrecision: true"
|
||||
: "bucketSize: 300"
|
||||
}) {
|
||||
eventName
|
||||
count
|
||||
time
|
||||
worldId
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
}),
|
||||
});
|
||||
|
||||
let body = await resp.json();
|
||||
|
||||
let events = body.data.analytics.events;
|
||||
window.events = events;
|
||||
|
||||
document.getElementById("loading").style.display = "none";
|
||||
|
||||
allEventsByType("all-events-by-type", events);
|
||||
eventsByWorld(events);
|
||||
[
|
||||
["connery", 1],
|
||||
["miller", 10],
|
||||
["cobalt", 13],
|
||||
["emerald", 17],
|
||||
["jaeger", 19],
|
||||
["soltech", 40],
|
||||
["genudine", 1000],
|
||||
["ceres", 2000],
|
||||
].forEach(([world, id]) => {
|
||||
let worldEvents = events.filter((ev) => ev.worldId === id);
|
||||
allEventsByType(world, worldEvents);
|
||||
});
|
||||
|
||||
const expFilter = document.getElementById("exp-filter");
|
||||
experienceEventsByID(
|
||||
expFilter.value === "all"
|
||||
? events
|
||||
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
|
||||
);
|
||||
|
||||
expFilter.addEventListener("change", () => {
|
||||
document.getElementById("exp-by-id").outerHTML =
|
||||
"<canvas id='exp-by-id' />";
|
||||
experienceEventsByID(
|
||||
expFilter.value === "all"
|
||||
? events
|
||||
: events.filter((ev) => ev.worldId === parseInt(expFilter.value))
|
||||
);
|
||||
});
|
||||
})();
|
||||
</script>
|
|
@ -1,7 +1,10 @@
|
|||
mod analytics;
|
||||
mod classes;
|
||||
mod factions;
|
||||
mod health;
|
||||
mod population;
|
||||
mod query;
|
||||
mod telemetry;
|
||||
mod utils;
|
||||
mod vehicles;
|
||||
mod world;
|
||||
|
@ -24,9 +27,15 @@ use tower_http::cors::{Any, CorsLayer};
|
|||
extern crate serde_json;
|
||||
|
||||
async fn index() -> Html<&'static str> {
|
||||
telemetry::http_request("/", "GET");
|
||||
Html(include_str!("html/index.html"))
|
||||
}
|
||||
|
||||
async fn ingest() -> Html<&'static str> {
|
||||
telemetry::http_request("/ingest", "GET");
|
||||
Html(include_str!("html/ingest.html"))
|
||||
}
|
||||
|
||||
async fn handle_404() -> Html<&'static str> {
|
||||
Html(include_str!("html/404.html"))
|
||||
}
|
||||
|
@ -35,6 +44,7 @@ async fn graphql_handler_post(
|
|||
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
|
||||
Json(query): Json<Request>,
|
||||
) -> Json<Response> {
|
||||
telemetry::http_request("/graphql", "POST");
|
||||
Json(schema.execute(query).await)
|
||||
}
|
||||
|
||||
|
@ -42,6 +52,8 @@ async fn graphql_handler_get(
|
|||
Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
|
||||
query: Query<Request>,
|
||||
) -> axum::response::Response {
|
||||
telemetry::http_request("/graphql", "GET");
|
||||
|
||||
if query.query == "" {
|
||||
return Redirect::to("/graphiql").into_response();
|
||||
}
|
||||
|
@ -50,6 +62,8 @@ async fn graphql_handler_get(
|
|||
}
|
||||
|
||||
async fn graphiql() -> impl IntoResponse {
|
||||
telemetry::http_request("/graphiql", "GET");
|
||||
|
||||
Html(
|
||||
GraphiQLSource::build()
|
||||
.endpoint("/graphql")
|
||||
|
@ -61,7 +75,7 @@ async fn graphiql() -> impl IntoResponse {
|
|||
#[tokio::main]
|
||||
async fn main() {
|
||||
let db_url = std::env::var("DATABASE_URL")
|
||||
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
|
||||
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
|
||||
let db = sqlx::PgPool::connect(&db_url).await.unwrap();
|
||||
|
||||
let schema = Schema::build(query::Query::default(), EmptyMutation, EmptySubscription)
|
||||
|
@ -70,12 +84,15 @@ async fn main() {
|
|||
|
||||
let app = Router::new()
|
||||
.route("/", get(index))
|
||||
.route("/ingest", get(ingest))
|
||||
.route("/health", get(health::get_health))
|
||||
.route(
|
||||
"/graphql",
|
||||
post(graphql_handler_post).get(graphql_handler_get),
|
||||
)
|
||||
.route("/graphql/playground", get(graphiql))
|
||||
.route("/graphiql", get(graphiql))
|
||||
.route("/metrics", get(telemetry::handler))
|
||||
.route("/metrics/combined", get(telemetry::handler_combined))
|
||||
.fallback(handle_404)
|
||||
.layer(Extension(db))
|
||||
.layer(Extension(schema))
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
use crate::utils::Filters;
|
||||
use crate::{
|
||||
factions::{NC, NSO, TR, VS},
|
||||
utils::Filters,
|
||||
telemetry,
|
||||
};
|
||||
use async_graphql::{Context, Object};
|
||||
use sqlx::{Pool, Postgres, Row};
|
||||
|
||||
|
@ -19,8 +23,9 @@ impl Population {
|
|||
async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: i32) -> i64 {
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("players", "population_by_faction");
|
||||
let sql = format!(
|
||||
"SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' AND faction_id = $1 {};",
|
||||
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND faction_id = $1 {};",
|
||||
self.filters.sql(),
|
||||
);
|
||||
|
||||
|
@ -40,10 +45,13 @@ impl Population {
|
|||
#[Object]
|
||||
impl Population {
|
||||
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Population", "total");
|
||||
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("players", "population_total");
|
||||
let sql = format!(
|
||||
"SELECT count(distinct character_id) FROM players WHERE time > now() - interval '15 minutes' {};",
|
||||
"SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' {};",
|
||||
self.filters.sql(),
|
||||
);
|
||||
|
||||
|
@ -54,20 +62,24 @@ impl Population {
|
|||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
|
||||
|
||||
query
|
||||
}
|
||||
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
self.by_faction(ctx, 1).await
|
||||
telemetry::graphql_query("Population", "nc");
|
||||
self.by_faction(ctx, NC).await
|
||||
}
|
||||
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
self.by_faction(ctx, 2).await
|
||||
telemetry::graphql_query("Population", "vs");
|
||||
self.by_faction(ctx, VS).await
|
||||
}
|
||||
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
self.by_faction(ctx, 3).await
|
||||
telemetry::graphql_query("Population", "tr");
|
||||
self.by_faction(ctx, TR).await
|
||||
}
|
||||
async fn ns<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
self.by_faction(ctx, 4).await
|
||||
telemetry::graphql_query("Population", "ns");
|
||||
self.by_faction(ctx, NSO).await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
classes::ClassesQuery, health::HealthQuery, population::PopulationQuery,
|
||||
vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
|
||||
analytics::AnalyticsQuery, classes::ClassesQuery, health::HealthQuery,
|
||||
population::PopulationQuery, vehicles::VehicleQuery, world::WorldQuery, zone::ZoneQuery,
|
||||
};
|
||||
use async_graphql::MergedObject;
|
||||
|
||||
|
@ -12,4 +12,5 @@ pub struct Query(
|
|||
WorldQuery,
|
||||
ZoneQuery,
|
||||
HealthQuery,
|
||||
AnalyticsQuery,
|
||||
);
|
||||
|
|
138
services/api/src/telemetry.rs
Normal file
138
services/api/src/telemetry.rs
Normal file
|
@ -0,0 +1,138 @@
|
|||
use lazy_static::lazy_static;
|
||||
use prometheus::{
|
||||
IntGauge,
|
||||
IntGaugeVec,
|
||||
register_int_gauge_vec,
|
||||
register_int_gauge,
|
||||
TextEncoder,
|
||||
gather
|
||||
};
|
||||
use sqlx::{Pool, Postgres, Row};
|
||||
use axum::Extension;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
lazy_static! {
|
||||
// http
|
||||
pub static ref HTTP_REQUEST: IntGaugeVec = register_int_gauge_vec!("saerro_api_http_requests", "HTTP requests", &[
|
||||
"route", "method"
|
||||
]).unwrap();
|
||||
pub static ref GRAPHQL_QUERY: IntGaugeVec = register_int_gauge_vec!("saerro_api_graphql_query", "GraphQL queries", &[
|
||||
"major", "minor"
|
||||
]).unwrap();
|
||||
|
||||
// counters
|
||||
pub static ref PLAYERS_TRACKED: IntGauge = register_int_gauge!("saerro_players_tracked", "All players tracked by Saerro right now").unwrap();
|
||||
pub static ref VEHICLES_TRACKED: IntGauge = register_int_gauge!("saerro_vehicles_tracked", "All vehicles tracked by Saerro right now").unwrap();
|
||||
pub static ref OLDEST_PLAYER: IntGauge = register_int_gauge!("saerro_oldest_player", "Oldest player tracked").unwrap();
|
||||
pub static ref NEWEST_PLAYER: IntGauge = register_int_gauge!("saerro_newest_player", "Newest player tracked").unwrap();
|
||||
pub static ref OLDEST_VEHICLE: IntGauge = register_int_gauge!("saerro_oldest_vehicle", "Oldest vehicle tracked").unwrap();
|
||||
pub static ref NEWEST_VEHICLE: IntGauge = register_int_gauge!("saerro_newest_vehicle", "Newest vehicle tracked").unwrap();
|
||||
|
||||
// database stuff
|
||||
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_writes", "Writes to Postgres", &[
|
||||
"table", "op"
|
||||
]).unwrap();
|
||||
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_reads", "Reads from Postgres", &[
|
||||
"table", "op"
|
||||
]).unwrap();
|
||||
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
|
||||
// "table", "op"
|
||||
// ]).unwrap();
|
||||
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
|
||||
// "table", "op"
|
||||
// ]).unwrap();
|
||||
}
|
||||
|
||||
pub async fn handler(Extension(pool): Extension<Pool<Postgres>>) -> String {
|
||||
update_data_gauges(pool).await;
|
||||
|
||||
// Final output
|
||||
let encoder = TextEncoder::new();
|
||||
let mut buffer = String::new();
|
||||
|
||||
let metrics = gather();
|
||||
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
|
||||
|
||||
buffer
|
||||
}
|
||||
|
||||
pub async fn handler_combined(Extension(pool): Extension<Pool<Postgres>>) -> String {
|
||||
let url = std::env::var("WEBSOCKET_HEALTHCHECK")
|
||||
.unwrap_or("http://127.0.0.1:8999/healthz".to_string()).replace("/healthz", "/metrics");
|
||||
|
||||
let local = handler(Extension(pool)).await;
|
||||
let remote = match reqwest::get(url).await {
|
||||
Ok(r) => r.text().await.expect("failed to text lol"),
|
||||
Err(_) => String::from("")
|
||||
};
|
||||
|
||||
|
||||
format!("{}{}", local, remote)
|
||||
}
|
||||
|
||||
// pub fn db_write(table: &str, op: &str) {
|
||||
// DB_WRITES.with_label_values(&[table, op]).inc();
|
||||
// }
|
||||
|
||||
pub fn db_read(table: &str, op: &str) {
|
||||
DB_READS.with_label_values(&[table, op]).inc();
|
||||
}
|
||||
|
||||
pub fn http_request(route: &str, method: &str) {
|
||||
HTTP_REQUEST.with_label_values(&[route, method]).inc();
|
||||
}
|
||||
|
||||
pub fn graphql_query(major: &str, minor: &str) {
|
||||
GRAPHQL_QUERY.with_label_values(&[major, minor]).inc();
|
||||
}
|
||||
|
||||
async fn update_data_gauges(pool: Pool<Postgres>) {
|
||||
// Do some easy queries to fill our non-cumulative gauges
|
||||
db_read("players", "count_all");
|
||||
let player_count: i64 = sqlx::query("SELECT count(*) FROM players")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
PLAYERS_TRACKED.set(player_count);
|
||||
|
||||
db_read("players", "get_newest");
|
||||
let player_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated DESC LIMIT 1")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
NEWEST_PLAYER.set(player_newest.timestamp());
|
||||
|
||||
db_read("players", "get_oldest");
|
||||
let player_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated ASC LIMIT 1")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
OLDEST_PLAYER.set(player_oldest.timestamp());
|
||||
|
||||
db_read("vehicles", "count_all");
|
||||
let vehicle_count: i64 = sqlx::query("SELECT count(*) FROM vehicles")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
VEHICLES_TRACKED.set(vehicle_count);
|
||||
|
||||
db_read("vehicles", "get_newest");
|
||||
let vehicle_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated DESC LIMIT 1")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
NEWEST_VEHICLE.set(vehicle_newest.timestamp());
|
||||
|
||||
db_read("vehicles", "get_oldest");
|
||||
let vehicle_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated ASC LIMIT 1")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
OLDEST_VEHICLE.set(vehicle_oldest.timestamp());
|
||||
}
|
|
@ -1,4 +1,8 @@
|
|||
use crate::utils::{Filters, IdOrNameBy};
|
||||
use crate::{
|
||||
factions::{NC, TR, VS},
|
||||
utils::{Filters, IdOrNameBy},
|
||||
telemetry,
|
||||
};
|
||||
use async_graphql::{Context, Object};
|
||||
use sqlx::{Pool, Postgres, Row};
|
||||
|
||||
|
@ -12,8 +16,9 @@ impl Vehicle {
|
|||
async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("vehicles", "fetch");
|
||||
let sql = format!(
|
||||
"SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' AND vehicle_id = $1 {};",
|
||||
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' AND vehicle_name = $1 {};",
|
||||
filters.sql(),
|
||||
);
|
||||
|
||||
|
@ -33,33 +38,41 @@ impl Vehicle {
|
|||
#[Object]
|
||||
impl Vehicle {
|
||||
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Vehicle", "total");
|
||||
|
||||
self.fetch(ctx, self.filters.clone()).await
|
||||
}
|
||||
async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Vehicle", "nc");
|
||||
|
||||
self.fetch(
|
||||
ctx,
|
||||
Filters {
|
||||
faction: Some(IdOrNameBy::Id(1)),
|
||||
faction: Some(IdOrNameBy::Id(NC)),
|
||||
..self.filters.clone()
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Vehicle", "tr");
|
||||
|
||||
self.fetch(
|
||||
ctx,
|
||||
Filters {
|
||||
faction: Some(IdOrNameBy::Id(2)),
|
||||
faction: Some(IdOrNameBy::Id(TR)),
|
||||
..self.filters.clone()
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Vehicle", "vs");
|
||||
|
||||
self.fetch(
|
||||
ctx,
|
||||
Filters {
|
||||
faction: Some(IdOrNameBy::Id(3)),
|
||||
faction: Some(IdOrNameBy::Id(VS)),
|
||||
..self.filters.clone()
|
||||
},
|
||||
)
|
||||
|
@ -83,10 +96,13 @@ impl Vehicles {
|
|||
#[Object]
|
||||
impl Vehicles {
|
||||
async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
|
||||
telemetry::graphql_query("Vehicles", "total");
|
||||
|
||||
let pool = ctx.data::<Pool<Postgres>>().unwrap();
|
||||
|
||||
telemetry::db_read("players", "vehicles_total");
|
||||
let sql = format!(
|
||||
"SELECT count(distinct character_id) FROM vehicles WHERE time > now() - interval '15 minutes' {};",
|
||||
"SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' {};",
|
||||
self.filters.sql(),
|
||||
);
|
||||
|
||||
|
@ -103,62 +119,90 @@ impl Vehicles {
|
|||
|
||||
// Transport
|
||||
async fn flash(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "flash");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "flash".to_string(),
|
||||
}
|
||||
}
|
||||
async fn sunderer(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "sunderer");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "sunderer".to_string(),
|
||||
}
|
||||
}
|
||||
async fn ant(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "ant");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "ant".to_string(),
|
||||
}
|
||||
}
|
||||
async fn harasser(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "harasser");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "harasser".to_string(),
|
||||
}
|
||||
}
|
||||
async fn javelin(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "javelin");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "javelin".to_string(),
|
||||
}
|
||||
}
|
||||
async fn corsair(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "corsair");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "corsair".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
// Tanks
|
||||
async fn lightning(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "lightning");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "javelin".to_string(),
|
||||
vehicle_name: "lightning".to_string(),
|
||||
}
|
||||
}
|
||||
async fn prowler(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "prowler");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "prowler".to_string(),
|
||||
}
|
||||
}
|
||||
async fn vanguard(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "vanguard");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "vanguard".to_string(),
|
||||
}
|
||||
}
|
||||
async fn magrider(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "magrider");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "magrider".to_string(),
|
||||
}
|
||||
}
|
||||
async fn chimera(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "chimera");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "chimera".to_string(),
|
||||
|
@ -167,42 +211,56 @@ impl Vehicles {
|
|||
|
||||
// Air
|
||||
async fn mosquito(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "mosquito");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "mosquito".to_string(),
|
||||
}
|
||||
}
|
||||
async fn liberator(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "liberator");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "liberator".to_string(),
|
||||
}
|
||||
}
|
||||
async fn galaxy(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "galaxy");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "galaxy".to_string(),
|
||||
}
|
||||
}
|
||||
async fn valkyrie(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "valkyrie");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "valkyrie".to_string(),
|
||||
}
|
||||
}
|
||||
async fn reaver(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "reaver");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "reaver".to_string(),
|
||||
}
|
||||
}
|
||||
async fn scythe(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "scythe");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "scythe".to_string(),
|
||||
}
|
||||
}
|
||||
async fn dervish(&self) -> Vehicle {
|
||||
telemetry::graphql_query("Vehicle", "dervish");
|
||||
|
||||
Vehicle {
|
||||
filters: self.filters.clone(),
|
||||
vehicle_name: "dervish".to_string(),
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::{
|
|||
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_WORLD, WORLD_IDS},
|
||||
vehicles::Vehicles,
|
||||
zone::Zones,
|
||||
telemetry,
|
||||
};
|
||||
use async_graphql::Object;
|
||||
|
||||
|
@ -33,11 +34,15 @@ impl World {
|
|||
impl World {
|
||||
/// The ID of the world.
|
||||
async fn id(&self) -> i32 {
|
||||
telemetry::graphql_query("World", "id");
|
||||
|
||||
id_or_name_to_id(&WORLD_IDS, self.filter.world.as_ref().unwrap()).unwrap()
|
||||
}
|
||||
|
||||
/// The name of the world, in official game capitalization.
|
||||
async fn name(&self) -> String {
|
||||
telemetry::graphql_query("World", "name");
|
||||
|
||||
let name = id_or_name_to_name(&ID_TO_WORLD, self.filter.world.as_ref().unwrap()).unwrap();
|
||||
|
||||
// Special case for SolTech, lol.
|
||||
|
@ -51,6 +56,8 @@ impl World {
|
|||
|
||||
/// Population filtered to this world.
|
||||
async fn population(&self) -> Population {
|
||||
telemetry::graphql_query("World", "population");
|
||||
|
||||
Population::new(Some(Filters {
|
||||
world: self.filter.world.clone(),
|
||||
faction: None,
|
||||
|
@ -60,6 +67,8 @@ impl World {
|
|||
|
||||
/// Vehicles filtered to this world.
|
||||
async fn vehicles(&self) -> Vehicles {
|
||||
telemetry::graphql_query("World", "vehicles");
|
||||
|
||||
Vehicles::new(Some(Filters {
|
||||
world: self.filter.world.clone(),
|
||||
faction: None,
|
||||
|
@ -69,6 +78,8 @@ impl World {
|
|||
|
||||
/// Classes filtered to this world.
|
||||
async fn classes(&self) -> Classes {
|
||||
telemetry::graphql_query("World", "classes");
|
||||
|
||||
Classes::new(Some(Filters {
|
||||
world: self.filter.world.clone(),
|
||||
faction: None,
|
||||
|
@ -78,6 +89,8 @@ impl World {
|
|||
|
||||
/// Get a specific zone/continent on this world.
|
||||
async fn zones(&self) -> Zones {
|
||||
telemetry::graphql_query("World", "zones");
|
||||
|
||||
Zones::new(Some(self.filter.clone()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ use crate::{
|
|||
population::Population,
|
||||
utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_ZONE, ZONE_IDS},
|
||||
vehicles::Vehicles,
|
||||
telemetry,
|
||||
};
|
||||
use async_graphql::Object;
|
||||
|
||||
|
@ -23,11 +24,15 @@ impl Zone {
|
|||
impl Zone {
|
||||
/// The ID of the zone/continent.
|
||||
async fn id(&self) -> i32 {
|
||||
telemetry::graphql_query("Zone", "id");
|
||||
|
||||
id_or_name_to_id(&ZONE_IDS, self.filters.zone.as_ref().unwrap()).unwrap()
|
||||
}
|
||||
|
||||
/// The name of the continent, in official game capitalization.
|
||||
async fn name(&self) -> String {
|
||||
telemetry::graphql_query("Zone", "name");
|
||||
|
||||
let name = id_or_name_to_name(&ID_TO_ZONE, self.filters.zone.as_ref().unwrap()).unwrap();
|
||||
|
||||
// Capitalize the first letter
|
||||
|
@ -35,14 +40,20 @@ impl Zone {
|
|||
}
|
||||
|
||||
async fn population(&self) -> Population {
|
||||
telemetry::graphql_query("Zone", "population");
|
||||
|
||||
Population::new(Some(self.filters.clone()))
|
||||
}
|
||||
|
||||
async fn vehicles(&self) -> Vehicles {
|
||||
telemetry::graphql_query("Zone", "vehicles");
|
||||
|
||||
Vehicles::new(Some(self.filters.clone()))
|
||||
}
|
||||
|
||||
async fn classes(&self) -> Classes {
|
||||
telemetry::graphql_query("Zone", "classes");
|
||||
|
||||
Classes::new(Some(self.filters.clone()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,10 +6,10 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
redis = { version = "0.22.1", features = ["aio", "r2d2", "tokio-comp"] }
|
||||
once_cell = "1.16.0"
|
||||
tokio = { version = "1.23.0", features = ["full"] }
|
||||
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
|
||||
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
|
||||
sqlx = { version = "0.7.1", default_features = false, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"postgres",
|
||||
] }
|
||||
lazy_static = "1.4.0"
|
||||
async_once = "0.2.6"
|
||||
dotenvy = "0.15.6"
|
|
@ -1,5 +1,4 @@
|
|||
use async_once::AsyncOnce;
|
||||
use dotenvy::dotenv;
|
||||
use lazy_static::lazy_static;
|
||||
use migrations::cmd_migrate;
|
||||
use sqlx::query;
|
||||
|
@ -10,7 +9,7 @@ mod migrations;
|
|||
lazy_static! {
|
||||
pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
|
||||
let db_url = std::env::var("DATABASE_URL")
|
||||
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
|
||||
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
|
||||
sqlx::PgPool::connect(&db_url).await.unwrap()
|
||||
});
|
||||
}
|
||||
|
@ -19,21 +18,14 @@ async fn cmd_prune() {
|
|||
println!("Pruning old data...");
|
||||
let pool = PG.get().await;
|
||||
|
||||
let rows = query("DELETE FROM players WHERE time < NOW() - INTERVAL '15 minutes';")
|
||||
let rows = query("DELETE FROM players WHERE last_updated < NOW() - INTERVAL '15 minutes';")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.rows_affected();
|
||||
println!("Deleted {} rows of old player data", rows);
|
||||
|
||||
let rows = query("DELETE FROM classes WHERE time < NOW() - INTERVAL '15 minutes';")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.rows_affected();
|
||||
println!("Deleted {} rows of old class data", rows);
|
||||
|
||||
let rows = query("DELETE FROM vehicles WHERE time < NOW() - INTERVAL '15 minutes';")
|
||||
let rows = query("DELETE FROM vehicles WHERE last_updated < NOW() - INTERVAL '15 minutes';")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -58,14 +50,41 @@ fn cmd_help() {
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
dotenv().ok();
|
||||
|
||||
let command = args().nth(1).unwrap_or("help".to_string());
|
||||
|
||||
match command.as_str() {
|
||||
"help" => cmd_help(),
|
||||
"prune" => cmd_prune().await,
|
||||
"auto-prune" => loop {
|
||||
cmd_prune().await;
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
|
||||
},
|
||||
"maintenance" => {
|
||||
println!("Running maintenance tasks...");
|
||||
println!("Checking if DB is migrated...");
|
||||
if !migrations::is_migrated().await {
|
||||
println!("DB is not migrated, running migrations...");
|
||||
cmd_migrate().await;
|
||||
}
|
||||
|
||||
println!("Running prune...");
|
||||
cmd_prune().await;
|
||||
println!("Done!");
|
||||
}
|
||||
"auto-maintenance" => loop {
|
||||
println!("Running maintenance tasks...");
|
||||
if !migrations::is_migrated().await {
|
||||
println!("DB is not migrated, running migrations...");
|
||||
cmd_migrate().await;
|
||||
}
|
||||
|
||||
cmd_prune().await;
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
|
||||
},
|
||||
"migrate" => cmd_migrate().await,
|
||||
"print-env" => {
|
||||
std::env::vars().for_each(|(key, value)| println!("{}={}", key, value));
|
||||
}
|
||||
_ => {
|
||||
println!("Unknown command: {}", command);
|
||||
cmd_help();
|
||||
|
|
|
@ -1,15 +1,10 @@
|
|||
use crate::PG;
|
||||
use sqlx::query;
|
||||
use sqlx::{query, Row};
|
||||
|
||||
pub async fn cmd_migrate() {
|
||||
println!("Migrating database...");
|
||||
|
||||
tokio::join!(
|
||||
migrate_players(),
|
||||
migrate_classes(),
|
||||
migrate_vehicles(),
|
||||
migrate_analytics()
|
||||
);
|
||||
tokio::join!(migrate_players(), migrate_vehicles(), migrate_analytics());
|
||||
}
|
||||
|
||||
async fn migrate_players() {
|
||||
|
@ -26,77 +21,21 @@ async fn migrate_players() {
|
|||
println!("PLAYERS => CREATE TABLE players");
|
||||
query(
|
||||
"CREATE TABLE players (
|
||||
character_id TEXT NOT NULL,
|
||||
time TIMESTAMPTZ NOT NULL,
|
||||
character_id TEXT NOT NULL PRIMARY KEY,
|
||||
last_updated TIMESTAMPTZ NOT NULL,
|
||||
world_id INT NOT NULL,
|
||||
faction_id INT NOT NULL,
|
||||
zone_id INT NOT NULL);",
|
||||
zone_id INT NOT NULL,
|
||||
class_name TEXT NOT NULL
|
||||
);",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("PLAYERS => create_hypertable");
|
||||
query(
|
||||
"SELECT create_hypertable('players', 'time',
|
||||
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("PLAYERS => add_retention_policy");
|
||||
query("SELECT add_retention_policy('players', INTERVAL '15 minutes');")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("PLAYERS => done!");
|
||||
}
|
||||
|
||||
async fn migrate_classes() {
|
||||
let pool = PG.get().await;
|
||||
|
||||
println!("-> Migrating classes");
|
||||
|
||||
println!("CLASSES => DROP TABLE IF EXISTS classes");
|
||||
query("DROP TABLE IF EXISTS classes")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("CLASSES => CREATE TABLE classes");
|
||||
query(
|
||||
"CREATE TABLE classes (
|
||||
character_id TEXT NOT NULL,
|
||||
time TIMESTAMPTZ NOT NULL,
|
||||
world_id INT NOT NULL,
|
||||
faction_id INT NOT NULL,
|
||||
zone_id INT NOT NULL,
|
||||
class_id TEXT NOT NULL);",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("CLASSES => create_hypertable");
|
||||
query(
|
||||
"SELECT create_hypertable('classes', 'time',
|
||||
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("CLASSES => add_retention_policy");
|
||||
query("SELECT add_retention_policy('classes', INTERVAL '15 minutes');")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("CLASSES => done!");
|
||||
}
|
||||
|
||||
async fn migrate_vehicles() {
|
||||
let pool = PG.get().await;
|
||||
|
||||
|
@ -111,33 +50,18 @@ async fn migrate_vehicles() {
|
|||
println!("VEHICLES => CREATE TABLE vehicles");
|
||||
query(
|
||||
"CREATE TABLE vehicles (
|
||||
character_id TEXT NOT NULL,
|
||||
time TIMESTAMPTZ NOT NULL,
|
||||
character_id TEXT NOT NULL PRIMARY KEY,
|
||||
last_updated TIMESTAMPTZ NOT NULL,
|
||||
world_id INT NOT NULL,
|
||||
faction_id INT NOT NULL,
|
||||
zone_id INT NOT NULL,
|
||||
vehicle_id TEXT NOT NULL);",
|
||||
zone_id INT NOT NULL,
|
||||
vehicle_name TEXT NOT NULL
|
||||
);",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("VEHICLES => create_hypertable");
|
||||
query(
|
||||
"SELECT create_hypertable('vehicles', 'time',
|
||||
chunk_time_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("VEHICLES => add_retention_policy");
|
||||
|
||||
query("SELECT add_retention_policy('vehicles', INTERVAL '15 minutes');")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("VEHICLES => done!");
|
||||
}
|
||||
|
||||
|
@ -173,3 +97,15 @@ async fn migrate_analytics() {
|
|||
|
||||
println!("ANALYTICS => done!");
|
||||
}
|
||||
|
||||
pub async fn is_migrated() -> bool {
|
||||
let pool = PG.get().await;
|
||||
|
||||
let tables: i64 = query("SELECT count(1) FROM pg_tables WHERE schemaname = 'public' AND tablename IN ('players', 'vehicles', 'analytics');")
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
.get(0);
|
||||
|
||||
tables == 3
|
||||
}
|
||||
|
|
|
@ -6,16 +6,22 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
redis = { version = "0.22.1", default_features = false, features = ["r2d2"] }
|
||||
lazy_static = "1.4.0"
|
||||
tokio-tungstenite = { version = "0.18.0", features=["native-tls"] }
|
||||
serde = { version = "1.0.149", features = ["derive"] }
|
||||
serde_json = "1.0.89"
|
||||
tokio = { version = "1.23.0", features = ["full"] }
|
||||
sqlx = { version = "0.6.2", features = [ "runtime-tokio-native-tls" , "postgres" ] }
|
||||
url = "2.3.1"
|
||||
futures-util = "0.3.25"
|
||||
futures = "0.3.25"
|
||||
tokio-tungstenite = { version = "0.20.0", features = [
|
||||
"rustls-tls-webpki-roots",
|
||||
] }
|
||||
serde = { version = "1.0.188", features = ["derive"] }
|
||||
serde_json = "1.0.105"
|
||||
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
|
||||
sqlx = { version = "0.7.1", default_features = false, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"postgres",
|
||||
] }
|
||||
url = "2.4.1"
|
||||
futures-util = "0.3.28"
|
||||
futures = "0.3.28"
|
||||
async_once = "0.2.6"
|
||||
serde-aux = "4.1.2"
|
||||
axum = "0.6.1"
|
||||
serde-aux = "4.2.0"
|
||||
axum = "0.6.20"
|
||||
prometheus = "0.13.3"
|
||||
prometheus-static-metric = "0.5.1"
|
||||
|
|
|
@ -1,42 +1,50 @@
|
|||
use async_once::AsyncOnce;
|
||||
use axum::{routing::get, Router};
|
||||
use axum::{routing::get, Json, Router};
|
||||
use futures::{pin_mut, FutureExt};
|
||||
use futures_util::StreamExt;
|
||||
use lazy_static::lazy_static;
|
||||
use serde::Deserialize;
|
||||
use serde_aux::prelude::*;
|
||||
use serde_json::json;
|
||||
use sqlx::{postgres::PgPoolOptions, query};
|
||||
use sqlx::{postgres::PgPoolOptions, query, Row};
|
||||
use std::{env, net::SocketAddr};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
||||
|
||||
mod translators;
|
||||
mod telemetry;
|
||||
|
||||
lazy_static! {
|
||||
// static ref PAIR: String = env::var("PAIR").unwrap_or_default();
|
||||
// static ref ROLE: String = env::var("ROLE").unwrap_or("primary".to_string());
|
||||
static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
|
||||
static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
|
||||
let db_url = std::env::var("DATABASE_URL")
|
||||
.unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
|
||||
.unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
|
||||
PgPoolOptions::new().connect(&db_url).await.unwrap()
|
||||
});
|
||||
}
|
||||
|
||||
async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
|
||||
let worlds_raw = env::var("WORLDS").unwrap_or_default();
|
||||
if worlds_raw == "" {
|
||||
println!("WORLDS not set");
|
||||
return;
|
||||
}
|
||||
let worlds_raw = env::var("WORLDS").unwrap_or("all".to_string());
|
||||
let worlds: Vec<&str> = worlds_raw.split(',').collect();
|
||||
|
||||
let experience_ids = vec![
|
||||
2, 3, 4, 5, 6, 7, 34, 51, 53, 55, 57, 86, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99,
|
||||
100, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 201, 233, 293,
|
||||
294, 302, 303, 353, 354, 355, 438, 439, 503, 505, 579, 581, 584, 653, 656, 674, 675,
|
||||
];
|
||||
let mut events = experience_ids
|
||||
.iter()
|
||||
.map(|id| format!("GainExperience_experience_id_{}", id))
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
events.push("Death".to_string());
|
||||
events.push("VehicleDestroy".to_string());
|
||||
|
||||
// Send setup message
|
||||
let setup_msg = json!({
|
||||
"action": "subscribe",
|
||||
"worlds": worlds,
|
||||
"eventNames": ["Death", "VehicleDestroy"],
|
||||
"eventNames": events,
|
||||
"characters": ["all"],
|
||||
"logicalAndCharactersWithWorlds": true,
|
||||
"service": "event",
|
||||
|
@ -46,6 +54,7 @@ async fn send_init(tx: futures::channel::mpsc::UnboundedSender<Message>) {
|
|||
.unwrap();
|
||||
|
||||
println!("[ws] Sent setup message");
|
||||
println!("[ws/setup] {}", setup_msg.to_string())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -54,33 +63,32 @@ struct PopEvent {
|
|||
team_id: i32,
|
||||
character_id: String,
|
||||
zone_id: i32,
|
||||
}
|
||||
|
||||
struct VehicleEvent {
|
||||
world_id: i32,
|
||||
vehicle_id: String,
|
||||
character_id: String,
|
||||
zone_id: i32,
|
||||
team_id: i32,
|
||||
}
|
||||
|
||||
struct ClassEvent {
|
||||
world_id: i32,
|
||||
character_id: String,
|
||||
loadout_id: String,
|
||||
zone_id: i32,
|
||||
team_id: i32,
|
||||
vehicle_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AnalyticsEvent {
|
||||
world_id: i32,
|
||||
event_name: String,
|
||||
}
|
||||
|
||||
// async fn track_pop(pop_event: PopEvent) {
|
||||
// track_pop_db(pop_event.clone()).await;
|
||||
// track_pop_redis(pop_event).await;
|
||||
// }
|
||||
async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> {
|
||||
let pool = PG.get().await;
|
||||
|
||||
telemetry::db_read("players", "get_team_id");
|
||||
let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;")
|
||||
.bind(character_id.clone())
|
||||
.fetch_one(pool)
|
||||
.await?
|
||||
.get(0);
|
||||
|
||||
if team_id == 0 {
|
||||
return Err(sqlx::Error::RowNotFound);
|
||||
}
|
||||
|
||||
Ok(team_id)
|
||||
}
|
||||
|
||||
async fn track_pop(pop_event: PopEvent) {
|
||||
// println!("[ws/track_pop]");
|
||||
|
@ -91,76 +99,31 @@ async fn track_pop(pop_event: PopEvent) {
|
|||
team_id,
|
||||
character_id,
|
||||
zone_id,
|
||||
loadout_id,
|
||||
vehicle_id,
|
||||
} = pop_event;
|
||||
|
||||
query("INSERT INTO players (time, character_id, world_id, faction_id, zone_id) VALUES (now(), $1, $2, $3, $4);")
|
||||
.bind(character_id)
|
||||
.bind(world_id)
|
||||
.bind(team_id)
|
||||
.bind(zone_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn track_vehicle(vehicle_event: VehicleEvent) {
|
||||
// println!("[ws/track_vehicle]");
|
||||
let pool = PG.get().await;
|
||||
|
||||
let VehicleEvent {
|
||||
world_id,
|
||||
vehicle_id,
|
||||
zone_id,
|
||||
character_id,
|
||||
team_id,
|
||||
} = vehicle_event;
|
||||
|
||||
let vehicle_name = translators::vehicle_to_name(vehicle_id.as_str());
|
||||
|
||||
if vehicle_name == "unknown" {
|
||||
return;
|
||||
}
|
||||
|
||||
query("INSERT INTO vehicles (time, character_id, world_id, faction_id, zone_id, vehicle_id) VALUES (now(), $1, $2, $3, $4, $5);")
|
||||
.bind(character_id)
|
||||
.bind(world_id)
|
||||
.bind(team_id)
|
||||
.bind(zone_id)
|
||||
.bind(vehicle_name)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn track_class(class_event: ClassEvent) {
|
||||
// println!("[ws/track_class]");
|
||||
let pool = PG.get().await;
|
||||
|
||||
let ClassEvent {
|
||||
world_id,
|
||||
character_id,
|
||||
loadout_id,
|
||||
zone_id,
|
||||
team_id,
|
||||
} = class_event;
|
||||
|
||||
let class_name = translators::loadout_to_class(loadout_id.as_str());
|
||||
let vehicle_name = if vehicle_id == "" {
|
||||
"unknown".to_string()
|
||||
} else {
|
||||
translators::vehicle_to_name(vehicle_id.as_str())
|
||||
};
|
||||
|
||||
if class_name == "unknown" {
|
||||
return;
|
||||
}
|
||||
|
||||
telemetry::db_write("players", "track_pop");
|
||||
query(
|
||||
"INSERT INTO classes (
|
||||
time,
|
||||
character_id,
|
||||
world_id,
|
||||
faction_id,
|
||||
zone_id,
|
||||
class_id
|
||||
) VALUES (now(), $1, $2, $3, $4, $5);",
|
||||
"
|
||||
INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name)
|
||||
VALUES (now(), $1, $2, $3, $4, $5)
|
||||
ON CONFLICT (character_id) DO UPDATE SET
|
||||
last_updated = EXCLUDED.last_updated,
|
||||
world_id = EXCLUDED.world_id,
|
||||
faction_id = EXCLUDED.faction_id,
|
||||
zone_id = EXCLUDED.zone_id,
|
||||
class_name = EXCLUDED.class_name
|
||||
;",
|
||||
)
|
||||
.bind(character_id)
|
||||
.bind(character_id.clone())
|
||||
.bind(world_id)
|
||||
.bind(team_id)
|
||||
.bind(zone_id)
|
||||
|
@ -168,10 +131,31 @@ async fn track_class(class_event: ClassEvent) {
|
|||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if vehicle_name != "unknown" {
|
||||
telemetry::db_write("vehicles", "track_pop");
|
||||
query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name)
|
||||
VALUES (now(), $1, $2, $3, $4, $5)
|
||||
ON CONFLICT (character_id) DO UPDATE SET
|
||||
last_updated = EXCLUDED.last_updated,
|
||||
world_id = EXCLUDED.world_id,
|
||||
faction_id = EXCLUDED.faction_id,
|
||||
zone_id = EXCLUDED.zone_id,
|
||||
vehicle_name = EXCLUDED.vehicle_name
|
||||
;")
|
||||
.bind(character_id)
|
||||
.bind(world_id)
|
||||
.bind(team_id)
|
||||
.bind(zone_id)
|
||||
.bind(vehicle_name)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
async fn track_analytics(analytics_event: AnalyticsEvent) {
|
||||
// println!("[ws/track_analytics]");
|
||||
// println!("[ws/track_analytics] {:?}", analytics_event);
|
||||
let pool = PG.get().await;
|
||||
|
||||
let AnalyticsEvent {
|
||||
|
@ -179,15 +163,21 @@ async fn track_analytics(analytics_event: AnalyticsEvent) {
|
|||
event_name,
|
||||
} = analytics_event;
|
||||
|
||||
query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
|
||||
telemetry::db_write("analytics", "track_analytics");
|
||||
match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
|
||||
.bind(world_id)
|
||||
.bind(event_name)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("[ws/track_analytics] ERR => {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_event(event: &Event) {
|
||||
async fn process_death_event(event: &Event) {
|
||||
let mut set = JoinSet::new();
|
||||
// println!("[ws/process_event] EVENT: {:?}", event);
|
||||
|
||||
|
@ -196,33 +186,14 @@ async fn process_event(event: &Event) {
|
|||
event_name: event.event_name.clone(),
|
||||
}));
|
||||
|
||||
if event.character_id != "0" {
|
||||
// General population tracking
|
||||
if event.character_id != "" && event.character_id != "0" {
|
||||
set.spawn(track_pop(PopEvent {
|
||||
world_id: event.world_id.clone(),
|
||||
team_id: event.team_id.clone(),
|
||||
character_id: event.character_id.clone(),
|
||||
zone_id: event.zone_id.clone(),
|
||||
}));
|
||||
}
|
||||
|
||||
if event.event_name == "VehicleDestroy" {
|
||||
set.spawn(track_vehicle(VehicleEvent {
|
||||
world_id: event.world_id.clone(),
|
||||
vehicle_id: event.vehicle_id.clone(),
|
||||
character_id: event.character_id.clone(),
|
||||
zone_id: event.zone_id.clone(),
|
||||
team_id: event.team_id.clone(),
|
||||
}));
|
||||
}
|
||||
|
||||
if event.event_name == "Death" {
|
||||
set.spawn(track_class(ClassEvent {
|
||||
world_id: event.world_id.clone(),
|
||||
character_id: event.character_id.clone(),
|
||||
loadout_id: event.loadout_id.clone(),
|
||||
zone_id: event.zone_id.clone(),
|
||||
team_id: event.team_id.clone(),
|
||||
vehicle_id: event.vehicle_id.clone(),
|
||||
}));
|
||||
}
|
||||
|
||||
|
@ -235,42 +206,58 @@ async fn process_event(event: &Event) {
|
|||
team_id: event.attacker_team_id.clone(),
|
||||
character_id: event.attacker_character_id.clone(),
|
||||
zone_id: event.zone_id.clone(),
|
||||
loadout_id: event.attacker_loadout_id.clone(),
|
||||
vehicle_id: event.attacker_vehicle_id.clone(),
|
||||
}));
|
||||
|
||||
if event.event_name == "VehicleDestroy" {
|
||||
set.spawn(track_vehicle(VehicleEvent {
|
||||
world_id: event.world_id.clone(),
|
||||
vehicle_id: event.attacker_vehicle_id.clone(),
|
||||
character_id: event.attacker_character_id.clone(),
|
||||
zone_id: event.zone_id.clone(),
|
||||
team_id: event.attacker_team_id.clone(),
|
||||
}));
|
||||
}
|
||||
|
||||
if event.event_name == "Death" {
|
||||
set.spawn(track_class(ClassEvent {
|
||||
world_id: event.world_id.clone(),
|
||||
character_id: event.attacker_character_id.clone(),
|
||||
loadout_id: event.attacker_loadout_id.clone(),
|
||||
zone_id: event.zone_id.clone(),
|
||||
team_id: event.attacker_team_id.clone(),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(_) = set.join_next().await {}
|
||||
}
|
||||
|
||||
async fn process_exp_event(event: &Event) {
|
||||
telemetry::experience_event(&event.world_id, &event.experience_id);
|
||||
let mut set = JoinSet::new();
|
||||
// println!("[ws/process_event] EVENT: {:?}", event);
|
||||
|
||||
set.spawn(track_analytics(AnalyticsEvent {
|
||||
world_id: event.world_id.clone(),
|
||||
event_name: format!(
|
||||
"{}_{}",
|
||||
event.event_name.clone(),
|
||||
event.experience_id.clone()
|
||||
),
|
||||
}));
|
||||
|
||||
// Vehicle EXP events
|
||||
let vehicle_id = match event.experience_id {
|
||||
201 => "11".to_string(), // Galaxy Spawn Bonus
|
||||
233 => "2".to_string(), // Sunderer Spawn Bonus
|
||||
674 | 675 => "160".to_string(), // ANT stuff
|
||||
_ => "".to_string(),
|
||||
};
|
||||
|
||||
set.spawn(track_pop(PopEvent {
|
||||
world_id: event.world_id.clone(),
|
||||
team_id: event.team_id.clone(),
|
||||
character_id: event.character_id.clone(),
|
||||
zone_id: event.zone_id.clone(),
|
||||
loadout_id: event.loadout_id.clone(),
|
||||
vehicle_id: vehicle_id.clone(),
|
||||
}));
|
||||
|
||||
while let Some(_) = set.join_next().await {}
|
||||
}
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
struct Event {
|
||||
event_name: String,
|
||||
#[serde(deserialize_with = "deserialize_number_from_string")]
|
||||
world_id: i32,
|
||||
character_id: String,
|
||||
#[serde(default)]
|
||||
attacker_character_id: String,
|
||||
#[serde(deserialize_with = "deserialize_number_from_string")]
|
||||
#[serde(default, deserialize_with = "deserialize_number_from_string")]
|
||||
attacker_team_id: i32,
|
||||
#[serde(deserialize_with = "deserialize_number_from_string")]
|
||||
#[serde(default, deserialize_with = "deserialize_number_from_string")]
|
||||
team_id: i32,
|
||||
#[serde(deserialize_with = "deserialize_number_from_string")]
|
||||
zone_id: i32,
|
||||
|
@ -286,6 +273,11 @@ struct Event {
|
|||
vehicle_id: String,
|
||||
#[serde(default)]
|
||||
attacker_vehicle_id: String,
|
||||
|
||||
#[serde(default, deserialize_with = "deserialize_number_from_string")]
|
||||
experience_id: i32,
|
||||
// #[serde(default)]
|
||||
// other_id: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
|
@ -294,7 +286,17 @@ struct Payload {
|
|||
}
|
||||
|
||||
async fn healthz() {
|
||||
let app = Router::new().route("/healthz", get(|| async { "ok" }));
|
||||
let app = Router::new().route(
|
||||
"/healthz",
|
||||
get(|| async {
|
||||
Json(json!({
|
||||
"status": "ok",
|
||||
}))
|
||||
}),
|
||||
).route(
|
||||
"/metrics",
|
||||
get(telemetry::handler)
|
||||
);
|
||||
|
||||
let port: u16 = std::env::var("PORT")
|
||||
.unwrap_or("8999".to_string())
|
||||
|
@ -319,23 +321,54 @@ async fn main() {
|
|||
}
|
||||
let url = url::Url::parse(&addr).unwrap();
|
||||
|
||||
println!("[ws] Connecting to {}", url);
|
||||
|
||||
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
|
||||
let (write, read) = ws_stream.split();
|
||||
|
||||
let fused_writer = rx.map(Ok).forward(write).fuse();
|
||||
let fused_reader = read
|
||||
.for_each(|msg| async move {
|
||||
.for_each(|msg| async {
|
||||
let body = &msg.unwrap().to_string();
|
||||
let data: Payload = serde_json::from_str(body).unwrap_or(Payload {
|
||||
payload: Event::default(),
|
||||
});
|
||||
|
||||
let mut data: Payload = match serde_json::from_str(body) {
|
||||
Ok(data) => data,
|
||||
Err(_e) => {
|
||||
// println!("Error: {}; body: {}", e, body.clone());
|
||||
telemetry::event_dropped(&0, &"".to_string(), "decoding failure");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if data.payload.event_name == "" {
|
||||
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "not event");
|
||||
return;
|
||||
}
|
||||
|
||||
process_event(&data.payload).await;
|
||||
telemetry::event(&data.payload.world_id, &data.payload.event_name);
|
||||
|
||||
if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" {
|
||||
process_death_event(&data.payload).await;
|
||||
return;
|
||||
}
|
||||
|
||||
if data.payload.event_name == "GainExperience" {
|
||||
if data.payload.team_id == 0 {
|
||||
match get_team_id(data.payload.character_id.clone()).await {
|
||||
Ok(team_id) => {
|
||||
data.payload.team_id = team_id;
|
||||
}
|
||||
Err(_) => {
|
||||
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "team_id missing");
|
||||
}
|
||||
}
|
||||
}
|
||||
process_exp_event(&data.payload).await;
|
||||
return;
|
||||
}
|
||||
|
||||
telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "unprocessable");
|
||||
})
|
||||
.fuse();
|
||||
|
||||
|
|
75
services/websocket/src/telemetry.rs
Normal file
75
services/websocket/src/telemetry.rs
Normal file
|
@ -0,0 +1,75 @@
|
|||
use lazy_static::lazy_static;
|
||||
use prometheus::{
|
||||
IntGaugeVec,
|
||||
register_int_gauge_vec,
|
||||
TextEncoder,
|
||||
gather
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
// incoming events
|
||||
pub static ref EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_count", "Events processed", &[
|
||||
"world_id", "event_name"
|
||||
]).unwrap();
|
||||
pub static ref EVENTS_DROPPED: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_dropped_count", "Events dropped", &[
|
||||
"world_id", "event_name", "reason"
|
||||
]).unwrap();
|
||||
|
||||
pub static ref EXPERIENCE_EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_experience_events_count", "Experience Events processed by Exp ID", &[
|
||||
"world_id", "experience_id"
|
||||
]).unwrap();
|
||||
|
||||
// database stuff
|
||||
pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_writes", "Writes to Postgres", &[
|
||||
"table", "op"
|
||||
]).unwrap();
|
||||
pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_reads", "Reads from Postgres", &[
|
||||
"table", "op"
|
||||
]).unwrap();
|
||||
// static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
|
||||
// "table", "op"
|
||||
// ]).unwrap();
|
||||
// static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
|
||||
// "table", "op"
|
||||
// ]).unwrap();
|
||||
}
|
||||
|
||||
pub async fn handler() -> String {
|
||||
let encoder = TextEncoder::new();
|
||||
let mut buffer = String::new();
|
||||
|
||||
let metrics = gather();
|
||||
encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
|
||||
|
||||
buffer
|
||||
}
|
||||
|
||||
pub fn event(world_id: &i32, event_name: &String) {
|
||||
EVENTS.with_label_values(&[
|
||||
&world_id.to_string(),
|
||||
&event_name,
|
||||
]).inc();
|
||||
}
|
||||
|
||||
pub fn event_dropped(world_id: &i32, event_name: &String, reason: &str) {
|
||||
EVENTS_DROPPED.with_label_values(&[
|
||||
&world_id.to_string(),
|
||||
&event_name,
|
||||
reason,
|
||||
]).inc();
|
||||
}
|
||||
|
||||
pub fn experience_event(world_id: &i32, experience_id: &i32) {
|
||||
EXPERIENCE_EVENTS.with_label_values(&[
|
||||
&world_id.to_string(),
|
||||
&experience_id.to_string(),
|
||||
]).inc();
|
||||
}
|
||||
|
||||
pub fn db_write(table: &str, op: &str) {
|
||||
DB_WRITES.with_label_values(&[table, op]).inc();
|
||||
}
|
||||
|
||||
pub fn db_read(table: &str, op: &str) {
|
||||
DB_READS.with_label_values(&[table, op]).inc();
|
||||
}
|
|
@ -34,6 +34,8 @@ lazy_static! {
|
|||
("1105", "vanguard"),
|
||||
("2010", "flash"),
|
||||
("2033", "javelin"),
|
||||
("2039", "ant"),
|
||||
("2040", "valkyrie"),
|
||||
("2122", "mosquito"),
|
||||
("2123", "reaver"),
|
||||
("2124", "scythe"),
|
||||
|
@ -47,6 +49,9 @@ lazy_static! {
|
|||
("2135", "prowler"),
|
||||
("2136", "dervish"),
|
||||
("2137", "chimera"),
|
||||
("2139", "ant"),
|
||||
("2140", "galaxy"),
|
||||
("2141", "valkyrie"),
|
||||
("2142", "corsair"),
|
||||
]);
|
||||
static ref LOADOUT_TO_CLASS: HashMap<&'static str, &'static str> = HashMap::from([
|
||||
|
|
Loading…
Add table
Reference in a new issue