Compare commits

1 commit: 5fa05ac73f

28 changed files with 1592 additions and 1571 deletions
.envrc (deleted, 3 lines)

@@ -1,3 +0,0 @@
-use flake . --accept-flake-config;
-
-# source .envrc-local
.gitignore (vendored, 6 changes)

@@ -1,7 +1,3 @@
 /target
 .DS_Store
 */.DS_Store
-.envrc-local
-/.vscode
-/.direnv
-/result
Cargo.lock (generated, 2261 changes)

File diff suppressed because it is too large.
Cargo.toml

@@ -1,4 +1,8 @@
 [workspace]
-members = ["services/*"]
-exclude = ["hack/codegen"]
 resolver = "2"
+members = [
+    "services/*",
+]
+exclude = [
+    "hack/codegen",
+]
Dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.76.0-bullseye as rust-base
+FROM rust:1.69.0-bullseye as rust-base
 WORKDIR /app
 RUN apt-get update && apt-get install -y --no-install-recommends curl clang
 ARG MOLD_VERSION=1.11.0
docker-compose.yaml

@@ -2,7 +2,7 @@ version: "3"
 
 services:
   tsdb:
-    image: docker.io/timescale/timescaledb:latest-pg14
+    image: timescale/timescaledb:latest-pg14
     environment:
       POSTGRES_PASSWORD: saerro321
       POSTGRES_USER: saerrouser
flake.lock (generated, deleted, 103 lines)

@@ -1,103 +0,0 @@
-{
-  "nodes": {
-    "fenix": {
-      "inputs": {
-        "nixpkgs": [
-          "nixpkgs"
-        ],
-        "rust-analyzer-src": "rust-analyzer-src"
-      },
-      "locked": {
-        "lastModified": 1708150887,
-        "narHash": "sha256-lyEaeShLZqQtFO+ULLfxF9fYaYpTal0Ck1B+iKYBOMs=",
-        "owner": "nix-community",
-        "repo": "fenix",
-        "rev": "761431323e30846bae160e15682cfa687c200606",
-        "type": "github"
-      },
-      "original": {
-        "owner": "nix-community",
-        "repo": "fenix",
-        "type": "github"
-      }
-    },
-    "flake-parts": {
-      "inputs": {
-        "nixpkgs-lib": "nixpkgs-lib"
-      },
-      "locked": {
-        "lastModified": 1706830856,
-        "narHash": "sha256-a0NYyp+h9hlb7ddVz4LUn1vT/PLwqfrWYcHMvFB1xYg=",
-        "owner": "hercules-ci",
-        "repo": "flake-parts",
-        "rev": "b253292d9c0a5ead9bc98c4e9a26c6312e27d69f",
-        "type": "github"
-      },
-      "original": {
-        "owner": "hercules-ci",
-        "repo": "flake-parts",
-        "type": "github"
-      }
-    },
-    "nixpkgs": {
-      "locked": {
-        "lastModified": 1707956935,
-        "narHash": "sha256-ZL2TrjVsiFNKOYwYQozpbvQSwvtV/3Me7Zwhmdsfyu4=",
-        "owner": "NixOS",
-        "repo": "nixpkgs",
-        "rev": "a4d4fe8c5002202493e87ec8dbc91335ff55552c",
-        "type": "github"
-      },
-      "original": {
-        "owner": "NixOS",
-        "ref": "nixos-unstable",
-        "repo": "nixpkgs",
-        "type": "github"
-      }
-    },
-    "nixpkgs-lib": {
-      "locked": {
-        "dir": "lib",
-        "lastModified": 1706550542,
-        "narHash": "sha256-UcsnCG6wx++23yeER4Hg18CXWbgNpqNXcHIo5/1Y+hc=",
-        "owner": "NixOS",
-        "repo": "nixpkgs",
-        "rev": "97b17f32362e475016f942bbdfda4a4a72a8a652",
-        "type": "github"
-      },
-      "original": {
-        "dir": "lib",
-        "owner": "NixOS",
-        "ref": "nixos-unstable",
-        "repo": "nixpkgs",
-        "type": "github"
-      }
-    },
-    "root": {
-      "inputs": {
-        "fenix": "fenix",
-        "flake-parts": "flake-parts",
-        "nixpkgs": "nixpkgs"
-      }
-    },
-    "rust-analyzer-src": {
-      "flake": false,
-      "locked": {
-        "lastModified": 1708018577,
-        "narHash": "sha256-B75VUqKvQeIqAUnYw4bGjY3xxrCqzRBJHLbmD0MAWEw=",
-        "owner": "rust-lang",
-        "repo": "rust-analyzer",
-        "rev": "b9b0d29b8e69b02457cfabe20c4c69cdb45f3cc5",
-        "type": "github"
-      },
-      "original": {
-        "owner": "rust-lang",
-        "ref": "nightly",
-        "repo": "rust-analyzer",
-        "type": "github"
-      }
-    }
-  },
-  "root": "root",
-  "version": 7
-}
flake.nix (deleted, 56 lines)

@@ -1,56 +0,0 @@
-{
-  description = "PlanetSide 2 Metagame Harvesting Service";
-
-  inputs = {
-    nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
-    flake-parts.url = "github:hercules-ci/flake-parts";
-    fenix = {
-      url = "github:nix-community/fenix";
-      inputs.nixpkgs.follows = "nixpkgs";
-    };
-  };
-
-  nixConfig = {
-    extra-substituters = [
-      "https://nix-community.cachix.org"
-    ];
-
-    extra-trusted-public-keys = [
-      "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs="
-    ];
-  };
-
-  outputs = inputs: inputs.flake-parts.lib.mkFlake { inherit inputs; } {
-    systems = [ "x86_64-linux" "aarch64-linux" ];
-    perSystem = { config, self', pkgs, lib, system, ... }: let
-      fenix = inputs.fenix.packages.${system}.minimal;
-      cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
-
-      buildDeps = [
-        pkgs.openssl
-      ];
-
-      devDeps = [
-        fenix.toolchain
-        pkgs.docker-compose
-        pkgs.cargo-watch
-      ];
-      PKG_CONFIG_PATH = "${pkgs.openssl.dev}/lib/pkgconfig";
-    in {
-      packages.default = (pkgs.makeRustPlatform {
-        cargo = fenix.toolchain;
-        rustc = fenix.toolchain;
-      }).buildRustPackage {
-        inherit (cargoToml.package) name version;
-        cargoLock.lockFile = ./Cargo.lock;
-        src = ./.;
-        nativeBuildInputs = [ pkgs.pkg-config ];
-        buildInputs = buildDeps ++ devDeps;
-      };
-
-      devShells.default = pkgs.mkShell {
-        nativeBuildInputs = buildDeps ++ devDeps;
-      };
-    };
-  };
-}
services/api/Cargo.toml

@@ -6,25 +6,19 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-serde_json = "1.0.105"
-serde = "1.0.188"
-async-graphql = { version = "6.0.5", features = ["chrono"] }
-axum = "0.6.20"
-sqlx = { version = "0.7.1", default_features = false, features = [
-    "runtime-tokio-rustls",
-    "postgres",
-    "chrono",
-] }
+serde_json = "1.0"
+serde = "1.0"
+async-graphql = { version = "5.0", features = ["chrono"] }
+axum = "0.6.18"
+sqlx = { version = "0.6.3", default_features = false, features = [ "runtime-tokio-rustls", "postgres", "chrono" ] }
 tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
-tower-http = { version = "0.4.4", features = ["cors"] }
+tower-http = { version = "0.4.0", features = ["cors"] }
 lazy_static = "1.4.0"
-reqwest = { version = "0.11.20", features = [
-    "rustls-tls-webpki-roots",
-    "rustls",
-] }
-chrono = "0.4.28"
-prometheus = "0.13.3"
+reqwest = { version = "0.11.18", features = ["rustls-tls-webpki-roots", "rustls"] }
+chrono = "0.4.24"
 
 [dependencies.openssl]
-version = "0.10.57"
-features = ["vendored"]
+version = "0.10.52"
+features = [
+    "vendored"
+]
services/api/src/analytics.rs

@@ -1,7 +1,6 @@
 use async_graphql::{futures_util::TryStreamExt, Context, Object, SimpleObject};
 use chrono::{DateTime, Utc};
 use sqlx::{query, Pool, Postgres, Row};
-use crate::telemetry;
 
 pub struct Analytics {}
 
@@ -23,10 +22,8 @@ impl Analytics {
         world_id: Option<i32>,
         #[graphql(default = false)] hi_precision: bool,
     ) -> Vec<Event> {
-        telemetry::graphql_query("Analytics", "events");
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("analytics", "events");
         let sql = format!("
             SELECT
                 time_bucket_gapfill('{} seconds', time, start => now() - '{}'::interval, finish => now()) AS bucket,
services/api/src/classes.rs

@@ -1,7 +1,6 @@
 use crate::{
     factions::{NC, TR, VS},
     utils::{Filters, IdOrNameBy},
-    telemetry
 };
 use async_graphql::{Context, Object};
 use sqlx::{Pool, Postgres, Row};
@@ -14,7 +13,6 @@ pub struct Class {
 
 impl Class {
     async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
-        telemetry::db_read("players", "fetch");
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
         let sql = format!(
@@ -38,12 +36,9 @@ impl Class {
 #[Object]
 impl Class {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Class", "total");
-
         self.fetch(ctx, self.filters.clone()).await
     }
     async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Class", "nc");
         self.fetch(
             ctx,
             Filters {
@@ -54,7 +49,6 @@
         .await
     }
     async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Class", "tr");
         self.fetch(
             ctx,
             Filters {
@@ -65,7 +59,6 @@
         .await
     }
     async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Class", "vs");
         self.fetch(
             ctx,
             Filters {
@@ -93,42 +86,36 @@ impl Classes {
 #[Object]
 impl Classes {
     async fn infiltrator(&self) -> Class {
-        telemetry::graphql_query("Classes", "infiltrator");
         Class {
             filters: self.filters.clone(),
             class_name: "infiltrator".to_string(),
         }
     }
     async fn light_assault(&self) -> Class {
-        telemetry::graphql_query("Classes", "light_assault");
         Class {
             filters: self.filters.clone(),
             class_name: "light_assault".to_string(),
         }
     }
     async fn combat_medic(&self) -> Class {
-        telemetry::graphql_query("Classes", "combat_medic");
         Class {
             filters: self.filters.clone(),
             class_name: "combat_medic".to_string(),
         }
     }
     async fn engineer(&self) -> Class {
-        telemetry::graphql_query("Classes", "engineer");
         Class {
             filters: self.filters.clone(),
             class_name: "engineer".to_string(),
         }
     }
     async fn heavy_assault(&self) -> Class {
-        telemetry::graphql_query("Classes", "heavy_assault");
         Class {
             filters: self.filters.clone(),
             class_name: "heavy_assault".to_string(),
         }
     }
     async fn max(&self) -> Class {
-        telemetry::graphql_query("Classes", "max");
         Class {
             filters: self.filters.clone(),
             class_name: "max".to_string(),
@@ -148,7 +135,6 @@ impl ClassesQuery {
 
     /// Get a specific class
     pub async fn class(&self, filter: Option<Filters>, class_name: String) -> Class {
-        telemetry::graphql_query("Classes", "");
         Class {
             filters: filter.unwrap_or_default(),
             class_name,
services/api/src/health.rs

@@ -1,13 +1,10 @@
-use crate::{telemetry, utils::ID_TO_WORLD};
+use crate::utils::ID_TO_WORLD;
 use async_graphql::{Context, Enum, Object, SimpleObject};
 use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
 use chrono::{DateTime, Utc};
 use sqlx::{query, Pool, Postgres, Row};
 
 pub async fn get_health(Extension(pool): Extension<Pool<Postgres>>) -> impl IntoResponse {
-    telemetry::http_request("/health", "GET");
-
-    telemetry::db_read("analytics", "get_health");
     let events_resp =
         query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
             .fetch_one(&pool)
@@ -66,7 +63,6 @@ impl Health {
     ) -> (UpDown, Option<DateTime<Utc>>) {
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("analytics", "most_recent_event_time");
         let events_resp =
             query("SELECT time FROM analytics WHERE world_id = $1 ORDER BY time DESC LIMIT 1")
                 .bind(world_id)
@@ -95,11 +91,8 @@ impl Health {
 impl Health {
     /// Did a ping to Postgres (our main datastore) succeed?
     async fn database<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
-        telemetry::graphql_query("Health", "database");
-
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("analytics", "database_health");
         let events_resp =
             query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
                 .fetch_one(pool)
@@ -113,11 +106,8 @@ impl Health {
 
     /// Is the websocket processing jobs?
     async fn ingest<'ctx>(&self, ctx: &Context<'ctx>) -> UpDown {
-        telemetry::graphql_query("Health", "ingest");
-
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("analytics", "ingest_health");
         let events_resp =
             query("SELECT count(*) FROM analytics WHERE time > now() - interval '5 minutes'")
                 .fetch_one(pool)
@@ -139,11 +129,9 @@ impl Health {
 
     /// Is the websocket actually turned on?
     async fn ingest_reachable(&self) -> UpDown {
-        telemetry::graphql_query("Health", "ingest_reachable");
-
         reqwest::get(
             std::env::var("WEBSOCKET_HEALTHCHECK")
-                .unwrap_or("http://127.0.0.1:8999/healthz".to_string()),
+                .unwrap_or("http://localhost:8999/health".to_string()),
         )
         .await
         .map(|_| UpDown::Up)
@@ -157,8 +145,6 @@ impl Health {
 
     /// Checks if a world has had any events for the last 5 minutes
     async fn worlds<'ctx>(&self, ctx: &Context<'ctx>) -> Vec<WorldUpDown> {
-        telemetry::graphql_query("Health", "worlds");
-
         let mut worlds = Vec::new();
         for (id, name) in ID_TO_WORLD.iter() {
             let (status, last_event) = self.most_recent_event_time(ctx, *id).await;
services/api/src/main.rs

@@ -4,7 +4,6 @@ mod factions;
 mod health;
 mod population;
 mod query;
-mod telemetry;
 mod utils;
 mod vehicles;
 mod world;
@@ -27,12 +26,10 @@ use tower_http::cors::{Any, CorsLayer};
 extern crate serde_json;
 
 async fn index() -> Html<&'static str> {
-    telemetry::http_request("/", "GET");
     Html(include_str!("html/index.html"))
 }
 
 async fn ingest() -> Html<&'static str> {
-    telemetry::http_request("/ingest", "GET");
     Html(include_str!("html/ingest.html"))
 }
 
@@ -44,7 +41,6 @@ async fn graphql_handler_post(
     Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
     Json(query): Json<Request>,
 ) -> Json<Response> {
-    telemetry::http_request("/graphql", "POST");
     Json(schema.execute(query).await)
 }
 
@@ -52,8 +48,6 @@ async fn graphql_handler_get(
     Extension(schema): Extension<Schema<query::Query, EmptyMutation, EmptySubscription>>,
     query: Query<Request>,
 ) -> axum::response::Response {
-    telemetry::http_request("/graphql", "GET");
-
     if query.query == "" {
         return Redirect::to("/graphiql").into_response();
     }
@@ -62,8 +56,6 @@
 }
 
 async fn graphiql() -> impl IntoResponse {
-    telemetry::http_request("/graphiql", "GET");
-
     Html(
         GraphiQLSource::build()
             .endpoint("/graphql")
@@ -75,7 +67,7 @@ async fn graphiql() -> impl IntoResponse {
 #[tokio::main]
 async fn main() {
     let db_url = std::env::var("DATABASE_URL")
-        .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
+        .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
     let db = sqlx::PgPool::connect(&db_url).await.unwrap();
 
     let schema = Schema::build(query::Query::default(), EmptyMutation, EmptySubscription)
@@ -91,8 +83,6 @@ async fn main() {
             post(graphql_handler_post).get(graphql_handler_get),
         )
         .route("/graphiql", get(graphiql))
-        .route("/metrics", get(telemetry::handler))
-        .route("/metrics/combined", get(telemetry::handler_combined))
         .fallback(handle_404)
         .layer(Extension(db))
         .layer(Extension(schema))
services/api/src/population.rs

@@ -1,7 +1,6 @@
 use crate::{
     factions::{NC, NSO, TR, VS},
     utils::Filters,
-    telemetry,
 };
 use async_graphql::{Context, Object};
 use sqlx::{Pool, Postgres, Row};
@@ -23,7 +22,6 @@ impl Population {
     async fn by_faction<'ctx>(&self, ctx: &Context<'ctx>, faction: i32) -> i64 {
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("players", "population_by_faction");
         let sql = format!(
             "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' AND faction_id = $1 {};",
             self.filters.sql(),
@@ -45,11 +43,8 @@ impl Population {
 #[Object]
 impl Population {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Population", "total");
-
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("players", "population_total");
         let sql = format!(
             "SELECT count(*) FROM players WHERE last_updated > now() - interval '15 minutes' {};",
             self.filters.sql(),
@@ -62,23 +57,19 @@ impl Population {
             .await
             .unwrap()
             .get(0);
 
         query
     }
     async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Population", "nc");
         self.by_faction(ctx, NC).await
     }
     async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Population", "vs");
         self.by_faction(ctx, VS).await
     }
     async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Population", "tr");
         self.by_faction(ctx, TR).await
     }
     async fn ns<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Population", "ns");
         self.by_faction(ctx, NSO).await
     }
 }
services/api/src/telemetry.rs (deleted, 138 lines)

@@ -1,138 +0,0 @@
-use lazy_static::lazy_static;
-use prometheus::{
-    IntGauge,
-    IntGaugeVec,
-    register_int_gauge_vec,
-    register_int_gauge,
-    TextEncoder,
-    gather
-};
-use sqlx::{Pool, Postgres, Row};
-use axum::Extension;
-use chrono::{DateTime, Utc};
-
-lazy_static! {
-    // http
-    pub static ref HTTP_REQUEST: IntGaugeVec = register_int_gauge_vec!("saerro_api_http_requests", "HTTP requests", &[
-        "route", "method"
-    ]).unwrap();
-    pub static ref GRAPHQL_QUERY: IntGaugeVec = register_int_gauge_vec!("saerro_api_graphql_query", "GraphQL queries", &[
-        "major", "minor"
-    ]).unwrap();
-
-    // counters
-    pub static ref PLAYERS_TRACKED: IntGauge = register_int_gauge!("saerro_players_tracked", "All players tracked by Saerro right now").unwrap();
-    pub static ref VEHICLES_TRACKED: IntGauge = register_int_gauge!("saerro_vehicles_tracked", "All vehicles tracked by Saerro right now").unwrap();
-    pub static ref OLDEST_PLAYER: IntGauge = register_int_gauge!("saerro_oldest_player", "Oldest player tracked").unwrap();
-    pub static ref NEWEST_PLAYER: IntGauge = register_int_gauge!("saerro_newest_player", "Newest player tracked").unwrap();
-    pub static ref OLDEST_VEHICLE: IntGauge = register_int_gauge!("saerro_oldest_vehicle", "Oldest vehicle tracked").unwrap();
-    pub static ref NEWEST_VEHICLE: IntGauge = register_int_gauge!("saerro_newest_vehicle", "Newest vehicle tracked").unwrap();
-
-    // database stuff
-    pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_writes", "Writes to Postgres", &[
-        "table", "op"
-    ]).unwrap();
-    pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_api_db_reads", "Reads from Postgres", &[
-        "table", "op"
-    ]).unwrap();
-    // static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
-    //     "table", "op"
-    // ]).unwrap();
-    // static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
-    //     "table", "op"
-    // ]).unwrap();
-}
-
-pub async fn handler(Extension(pool): Extension<Pool<Postgres>>) -> String {
-    update_data_gauges(pool).await;
-
-    // Final output
-    let encoder = TextEncoder::new();
-    let mut buffer = String::new();
-
-    let metrics = gather();
-    encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
-
-    buffer
-}
-
-pub async fn handler_combined(Extension(pool): Extension<Pool<Postgres>>) -> String {
-    let url = std::env::var("WEBSOCKET_HEALTHCHECK")
-        .unwrap_or("http://127.0.0.1:8999/healthz".to_string()).replace("/healthz", "/metrics");
-
-    let local = handler(Extension(pool)).await;
-    let remote = match reqwest::get(url).await {
-        Ok(r) => r.text().await.expect("failed to text lol"),
-        Err(_) => String::from("")
-    };
-
-
-    format!("{}{}", local, remote)
-}
-
-// pub fn db_write(table: &str, op: &str) {
-//     DB_WRITES.with_label_values(&[table, op]).inc();
-// }
-
-pub fn db_read(table: &str, op: &str) {
-    DB_READS.with_label_values(&[table, op]).inc();
-}
-
-pub fn http_request(route: &str, method: &str) {
-    HTTP_REQUEST.with_label_values(&[route, method]).inc();
-}
-
-pub fn graphql_query(major: &str, minor: &str) {
-    GRAPHQL_QUERY.with_label_values(&[major, minor]).inc();
-}
-
-async fn update_data_gauges(pool: Pool<Postgres>) {
-    // Do some easy queries to fill our non-cumulative gauges
-    db_read("players", "count_all");
-    let player_count: i64 = sqlx::query("SELECT count(*) FROM players")
-        .fetch_one(&pool)
-        .await
-        .unwrap()
-        .get(0);
-    PLAYERS_TRACKED.set(player_count);
-
-    db_read("players", "get_newest");
-    let player_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated DESC LIMIT 1")
-        .fetch_one(&pool)
-        .await
-        .unwrap()
-        .get(0);
-    NEWEST_PLAYER.set(player_newest.timestamp());
-
-    db_read("players", "get_oldest");
-    let player_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM players ORDER BY last_updated ASC LIMIT 1")
-        .fetch_one(&pool)
-        .await
-        .unwrap()
-        .get(0);
-    OLDEST_PLAYER.set(player_oldest.timestamp());
-
-    db_read("vehicles", "count_all");
-    let vehicle_count: i64 = sqlx::query("SELECT count(*) FROM vehicles")
-        .fetch_one(&pool)
-        .await
-        .unwrap()
-        .get(0);
-    VEHICLES_TRACKED.set(vehicle_count);
-
-    db_read("vehicles", "get_newest");
-    let vehicle_newest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated DESC LIMIT 1")
-        .fetch_one(&pool)
-        .await
-        .unwrap()
-        .get(0);
-    NEWEST_VEHICLE.set(vehicle_newest.timestamp());
-
-    db_read("vehicles", "get_oldest");
-    let vehicle_oldest: DateTime<Utc> = sqlx::query("SELECT last_updated FROM vehicles ORDER BY last_updated ASC LIMIT 1")
-        .fetch_one(&pool)
-        .await
-        .unwrap()
-        .get(0);
-    OLDEST_VEHICLE.set(vehicle_oldest.timestamp());
-}
services/api/src/vehicles.rs

@@ -1,7 +1,6 @@
 use crate::{
     factions::{NC, TR, VS},
     utils::{Filters, IdOrNameBy},
-    telemetry,
 };
 use async_graphql::{Context, Object};
 use sqlx::{Pool, Postgres, Row};
@@ -16,7 +15,6 @@ impl Vehicle {
     async fn fetch<'ctx>(&self, ctx: &Context<'ctx>, filters: Filters) -> i64 {
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("vehicles", "fetch");
         let sql = format!(
             "SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' AND vehicle_name = $1 {};",
             filters.sql(),
@@ -38,13 +36,9 @@ impl Vehicle {
 #[Object]
 impl Vehicle {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Vehicle", "total");
-
         self.fetch(ctx, self.filters.clone()).await
     }
     async fn nc<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Vehicle", "nc");
-
         self.fetch(
             ctx,
             Filters {
@@ -55,8 +49,6 @@ impl Vehicle {
         .await
     }
     async fn tr<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Vehicle", "tr");
-
         self.fetch(
             ctx,
             Filters {
@@ -67,8 +59,6 @@ impl Vehicle {
         .await
     }
     async fn vs<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Vehicle", "vs");
-
         self.fetch(
             ctx,
             Filters {
@@ -96,11 +86,8 @@ impl Vehicles {
 #[Object]
 impl Vehicles {
     async fn total<'ctx>(&self, ctx: &Context<'ctx>) -> i64 {
-        telemetry::graphql_query("Vehicles", "total");
-
         let pool = ctx.data::<Pool<Postgres>>().unwrap();
 
-        telemetry::db_read("players", "vehicles_total");
         let sql = format!(
             "SELECT count(*) FROM vehicles WHERE last_updated > now() - interval '15 minutes' {};",
             self.filters.sql(),
@@ -119,48 +106,36 @@ impl Vehicles {
 
     // Transport
     async fn flash(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "flash");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "flash".to_string(),
         }
     }
     async fn sunderer(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "sunderer");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "sunderer".to_string(),
         }
     }
     async fn ant(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "ant");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "ant".to_string(),
         }
     }
     async fn harasser(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "harasser");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "harasser".to_string(),
         }
     }
     async fn javelin(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "javelin");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "javelin".to_string(),
         }
     }
     async fn corsair(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "corsair");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "corsair".to_string(),
@@ -169,40 +144,30 @@ impl Vehicles {
 
     // Tanks
     async fn lightning(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "lightning");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "lightning".to_string(),
         }
     }
     async fn prowler(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "prowler");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "prowler".to_string(),
         }
     }
     async fn vanguard(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "vanguard");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "vanguard".to_string(),
         }
     }
     async fn magrider(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "magrider");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "magrider".to_string(),
         }
     }
     async fn chimera(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "chimera");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "chimera".to_string(),
@@ -211,56 +176,42 @@ impl Vehicles {
 
     // Air
     async fn mosquito(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "mosquito");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "mosquito".to_string(),
         }
     }
     async fn liberator(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "liberator");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "liberator".to_string(),
         }
     }
     async fn galaxy(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "galaxy");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "galaxy".to_string(),
         }
     }
     async fn valkyrie(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "valkyrie");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "valkyrie".to_string(),
         }
     }
     async fn reaver(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "reaver");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "reaver".to_string(),
         }
     }
     async fn scythe(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "scythe");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "scythe".to_string(),
         }
     }
     async fn dervish(&self) -> Vehicle {
-        telemetry::graphql_query("Vehicle", "dervish");
-
         Vehicle {
             filters: self.filters.clone(),
             vehicle_name: "dervish".to_string(),
services/api/src/world.rs

@@ -4,7 +4,6 @@ use crate::{
     utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_WORLD, WORLD_IDS},
     vehicles::Vehicles,
     zone::Zones,
-    telemetry,
 };
 use async_graphql::Object;
 
@@ -34,15 +33,11 @@ impl World {
 impl World {
     /// The ID of the world.
     async fn id(&self) -> i32 {
-        telemetry::graphql_query("World", "id");
-
         id_or_name_to_id(&WORLD_IDS, self.filter.world.as_ref().unwrap()).unwrap()
     }
 
     /// The name of the world, in official game capitalization.
     async fn name(&self) -> String {
-        telemetry::graphql_query("World", "name");
-
         let name = id_or_name_to_name(&ID_TO_WORLD, self.filter.world.as_ref().unwrap()).unwrap();
 
         // Special case for SolTech, lol.
@@ -56,8 +51,6 @@ impl World {
 
     /// Population filtered to this world.
     async fn population(&self) -> Population {
-        telemetry::graphql_query("World", "population");
-
         Population::new(Some(Filters {
             world: self.filter.world.clone(),
             faction: None,
@@ -67,8 +60,6 @@ impl World {
 
     /// Vehicles filtered to this world.
     async fn vehicles(&self) -> Vehicles {
-        telemetry::graphql_query("World", "vehicles");
-
         Vehicles::new(Some(Filters {
             world: self.filter.world.clone(),
             faction: None,
@@ -78,8 +69,6 @@ impl World {
 
     /// Classes filtered to this world.
    async fn classes(&self) -> Classes {
-        telemetry::graphql_query("World", "classes");
-
         Classes::new(Some(Filters {
             world: self.filter.world.clone(),
             faction: None,
@@ -89,8 +78,6 @@ impl World {
 
     /// Get a specific zone/continent on this world.
     async fn zones(&self) -> Zones {
-        telemetry::graphql_query("World", "zones");
-
         Zones::new(Some(self.filter.clone()))
     }
 }
services/api/src/zone.rs

@@ -3,7 +3,6 @@ use crate::{
     population::Population,
     utils::{id_or_name_to_id, id_or_name_to_name, Filters, IdOrNameBy, ID_TO_ZONE, ZONE_IDS},
     vehicles::Vehicles,
-    telemetry,
 };
 use async_graphql::Object;
 
@@ -24,15 +23,11 @@ impl Zone {
 impl Zone {
     /// The ID of the zone/continent.
     async fn id(&self) -> i32 {
-        telemetry::graphql_query("Zone", "id");
-
         id_or_name_to_id(&ZONE_IDS, self.filters.zone.as_ref().unwrap()).unwrap()
     }
 
     /// The name of the continent, in official game capitalization.
     async fn name(&self) -> String {
-        telemetry::graphql_query("Zone", "name");
-
         let name = id_or_name_to_name(&ID_TO_ZONE, self.filters.zone.as_ref().unwrap()).unwrap();
 
         // Capitalize the first letter
@@ -40,20 +35,14 @@ impl Zone {
     }
 
     async fn population(&self) -> Population {
-        telemetry::graphql_query("Zone", "population");
-
         Population::new(Some(self.filters.clone()))
     }
 
     async fn vehicles(&self) -> Vehicles {
-        telemetry::graphql_query("Zone", "vehicles");
-
         Vehicles::new(Some(self.filters.clone()))
     }
 
     async fn classes(&self) -> Classes {
-        telemetry::graphql_query("Zone", "classes");
-
         Classes::new(Some(self.filters.clone()))
     }
 }
services/ess-demux/Cargo.toml (new file, 16 lines)

@@ -0,0 +1,16 @@
+[package]
+name = "ess-demux"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+tokio = { version = "1", features = ["full"] }
+tokio-tungstenite = { version = "0.19", features = ["rustls-tls-webpki-roots"] }
+tracing = "0.1"
+tracing-subscriber = "0.3"
+future-utils = "0.12"
+futures-channel = "0.3"
+futures = "0.3"
+serde_json = "1.0"
services/ess-demux/README.md (new file, 49 lines)

@@ -0,0 +1,49 @@
+# ESS Demux
+
+This service guarantees one thing to you; it will have a websocket connected with ESS events.
+
+The specific flow is as follows:
+
+1. If https://push.nanite-systems.net/ is up, the client websocket is wired to that.
+2. Else, connect to https://push.planetside2.com/ based on `?environment={}`, and the client websocket is wired to either 1 or 3 of those.
+
+   - If environment = `all`, it will connect 3 times to `pc`, `ps4us`, and `ps4eu`.
+   - Else, connect to specified environment.
+   - Also, try reconnecting to the main socket every minute.
+
+3. If that fails, the client websocket will never respond.
+
+## Why would you want this?
+
+NSS helps be resilient to ESS failures, but NSS isn't failure-proof itself. This acts as a proxy that'll gracefully select one source or another.
+
+### Alternatives
+
+If you can accept the loss of PS4 data, you may use nginx or HAProxy to achieve the same effect...
+
+[**nginx example.conf**](./docs/alternatives/ess.nginx.conf)
+
+The above may not work entirely correctly... ymmv.
+
+Saerro **does** want PS4 data, so we use the ess-demux service.
+
+## How to use this
+
+The service runs on port 8007 by default, you can change it to whatever via `PORT`, if you're using this as a bare service. You may also change the `DEFAULT_SERVICE_ID` from `s:example`; allowing you to omit this from the URL.
+
+`docker run -d -p 8007:8007 ghcr.io/genudine/saerro/ess-demux:latest`
+
+Connect to `ws://localhost:8007/streaming?environment=all&service-id=s:example`
+
+Send subscriptions like any other ESS-compatible websocket.
+
+Upon connection, you can expect an event like this:
+
+```json
+{
+  "connected": true,
+  "service": "ess-demux",
+  "type": "essDemuxConnectionStateChanged",
+  "upstream": "nss" // or "ess"
+}
+```
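Not part of the commit, but to make the README's usage section concrete: a minimal consumer sketch, assuming the service is running locally on the default port and using the same `tokio-tungstenite`/`futures` crates the service itself depends on. The subscription payload follows the ordinary public ESS subscribe shape; the event names and world list here are illustrative.

```rust
// Hypothetical ess-demux client (illustration only, not shipped code).
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() {
    // Default port and service-id taken from the README above.
    let url = "ws://localhost:8007/streaming?environment=all&service-id=s:example";
    let (mut ws, _response) = connect_async(url).await.expect("connect failed");

    // Subscribe like any other ESS-compatible websocket.
    let subscribe =
        r#"{"service":"event","action":"subscribe","worlds":["all"],"eventNames":["Death"]}"#;
    ws.send(Message::Text(subscribe.to_string()))
        .await
        .expect("subscribe failed");

    // The first message should be the essDemuxConnectionStateChanged event shown above.
    while let Some(Ok(msg)) = ws.next().await {
        println!("{}", msg);
    }
}
```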
services/ess-demux/docs/alternatives/ess.nginx.conf (new file, 49 lines)

@@ -0,0 +1,49 @@
+upstream ess-demux {
+    server localhost:8008;
+    server localhost:8009 backup;
+}
+
+resolver 1.1.1.1 1.0.0.1;
+
+server {
+    listen 8007 default_server;
+    server_name _;
+    location /streaming {
+        proxy_pass http://ess-demux;
+        proxy_http_version 1.1;
+        proxy_set_header Upgrade $http_upgrade;
+        proxy_set_header Connection "upgrade";
+    }
+}
+
+server {
+    listen 8008;
+    add_header ess-demux-server "nss" always;
+    location / {
+        proxy_pass https://push.nanite-systems.net;
+        proxy_set_header Host push.nanite-systems.net;
+        proxy_ssl_name push.nanite-systems.net;
+        proxy_ssl_server_name on;
+        proxy_ssl_protocols TLSv1.3;
+        proxy_ssl_verify off;
+        proxy_http_version 1.1;
+        proxy_set_header Upgrade $http_upgrade;
+        proxy_set_header Connection "upgrade";
+    }
+}
+
+server {
+    listen 8009;
+    add_header ess-demux-server "ess" always;
+    location / {
+        proxy_pass https://push.planetside2.com;
+        proxy_set_header Host push.planetside2.com;
+        proxy_ssl_name push.planetside2.com;
+        proxy_ssl_server_name on;
+        proxy_ssl_protocols TLSv1.2 TLSv1.3;
+        proxy_ssl_verify off;
+        proxy_http_version 1.1;
+        proxy_set_header Upgrade $http_upgrade;
+        proxy_set_header Connection "upgrade";
+    }
+}
services/ess-demux/src/main.rs (new file, 61 lines)

@@ -0,0 +1,61 @@
+use futures::{pin_mut, select, FutureExt, StreamExt, TryStreamExt};
+use futures_channel::mpsc::unbounded;
+use std::net::SocketAddr;
+use tokio::net::{TcpListener, TcpStream};
+use tracing::{debug, info};
+
+use crate::remote_manager::RemoteManager;
+
+mod remote_manager;
+
+async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
+    info!("Incoming TCP connection from: {}", addr);
+
+    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
+        .await
+        .expect("Error during the websocket handshake occurred");
+
+    info!("New WebSocket connection: {}", addr);
+
+    let (local_to_remote_tx, local_to_remote_rx) = unbounded();
+    let (remote_to_local_tx, remote_to_local_rx) = unbounded();
+    let (local_outgoing, local_incoming) = ws_stream.split();
+
+    // Our client sent us a message, forward to ESS
+    let local_to_remote = local_incoming.map(Ok).forward(local_to_remote_tx);
+
+    // ESS sent us a message, forward to our client
+    let remote_to_local = remote_to_local_rx.map(Ok).forward(local_outgoing);
+
+    let upstream_connection = tokio::spawn(async move {
+        let mut remote = RemoteManager::new(local_to_remote_rx, remote_to_local_tx.clone());
+        remote.connect().await;
+    })
+    .fuse();
+
+    pin_mut!(local_to_remote, remote_to_local, upstream_connection);
+    select! {
+        _ = local_to_remote => debug!("local_to_remote exited"),
+        _ = remote_to_local => debug!("remote_to_local exited"),
+        _ = upstream_connection => debug!("upstream_connection exited"),
+    }
+
+    info!("Client {} disconnected", addr);
+}
+
+#[tokio::main]
+async fn main() {
+    tracing_subscriber::fmt::init();
+    let addr = format!(
+        "0.0.0.0:{}",
+        std::env::var("PORT").unwrap_or("8007".to_string())
+    );
+
+    let try_socket = TcpListener::bind(&addr).await;
+    let listener = try_socket.expect("Failed to bind");
+    info!("Listening on: {}", addr);
+
+    while let Ok((stream, addr)) = listener.accept().await {
+        tokio::spawn(handle_connection(stream, addr));
+    }
+}
97
services/ess-demux/src/remote_manager.rs
Normal file
97
services/ess-demux/src/remote_manager.rs
Normal file
@ -0,0 +1,97 @@
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use serde_json::json;
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, warn};

pub struct RemoteManager {
    recv: UnboundedReceiver<Result<Message, tokio_tungstenite::tungstenite::Error>>,
    send: UnboundedSender<Message>,

    current_upstream: Option<String>,
    nss_failed: bool,
}

impl RemoteManager {
    pub fn new(
        recv: UnboundedReceiver<Result<Message, tokio_tungstenite::tungstenite::Error>>,
        send: UnboundedSender<Message>,
    ) -> Self {
        Self {
            recv,
            send,
            current_upstream: None,
            nss_failed: false,
        }
    }

    pub async fn connect(&mut self) {
        self.send_connection_state_changed().await;

        loop {
            self.connect_loop().await;
        }
    }

    async fn connect_loop(&mut self) {
        if self.nss_failed {
            self.connect_ess().await.expect("connect_ess failed");
            return;
        }
        match self.connect_nss().await {
            Ok(_) => {
                self.nss_failed = false;
                warn!("nss connection closed")
            }
            Err(e) => {
                warn!("Failed to connect to NSS: {}", e);
                self.nss_failed = true;
                match self.connect_ess().await {
                    Ok(_) => {
                        warn!("ess connection closed")
                    }
                    Err(e) => {
                        error!("Failed to connect to ESS: {}", e);
                        self.current_upstream = None;
                    }
                }
            }
        }
    }

    async fn connect_nss(&mut self) -> Result<(), tokio_tungstenite::tungstenite::Error> {
        self.current_upstream = Some("nss".to_string());
        self.ws_connect(
            "wss://push.nanite-systems.net/streaming?environment=all&service-id=s:medkit2",
        )
        .await?;
        self.send_connection_state_changed().await;
        Ok(())
    }

    async fn connect_ess(&mut self) -> Result<(), tokio_tungstenite::tungstenite::Error> {
        self.current_upstream = Some("ess".to_string());
        self.ws_connect("wss://push.planetside2.com/streaming?environment=pc&service-id=s:medkit2")
            .await?;
        self.send_connection_state_changed().await;
        Ok(())
    }

    async fn ws_connect(&mut self, url: &str) -> Result<(), tokio_tungstenite::tungstenite::Error> {
        todo!()
    }

    async fn send_connection_state_changed(&self) {
        self.send
            .unbounded_send(
                json!({
                    "connected": self.current_upstream.is_some(),
                    "service": "ess-demux",
                    "type": "essDemuxConnectionStateChanged",
                    "upstream": self.current_upstream,
                })
                .to_string()
                .into(),
            )
            .expect("send_connection_state_changed failed");
    }
}
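The ws_connect body lands as todo!() in this commit, so the NSS-to-ESS failover above cannot actually run yet. A sketch of what a bridge implementation could look like, assuming the intent is to pump the two unbounded channels through the upstream socket until either side closes (hypothetical, not from this commit):

async fn ws_connect(&mut self, url: &str) -> Result<(), tokio_tungstenite::tungstenite::Error> {
    use futures::{FutureExt, SinkExt, StreamExt};

    let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
    let (mut write, mut read) = ws_stream.split();

    loop {
        futures::select! {
            // The local client sent something; push it upstream.
            msg = self.recv.next() => match msg {
                Some(Ok(msg)) => write.send(msg).await?,
                _ => return Ok(()), // local side closed or errored
            },
            // Upstream sent something; hand it to the local client.
            msg = read.next().fuse() => match msg {
                Some(Ok(msg)) => {
                    if self.send.unbounded_send(msg).is_err() {
                        return Ok(()); // local side hung up
                    }
                }
                Some(Err(e)) => return Err(e),
                None => return Ok(()), // upstream closed
            },
        }
    }
}

Returning Ok(()) here would feed connect_loop's retry logic, which then decides between the NSS mirror and the direct ESS endpoint.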
@ -6,10 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
-sqlx = { version = "0.7.1", default_features = false, features = [
-    "runtime-tokio-rustls",
-    "postgres",
-] }
+tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
+sqlx = { version = "0.6.3", default_features = false, features = [ "runtime-tokio-rustls" , "postgres" ] }
 lazy_static = "1.4.0"
 async_once = "0.2.6"
@ -9,7 +9,7 @@ mod migrations;
 lazy_static! {
     pub static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
         let db_url = std::env::var("DATABASE_URL")
-            .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
+            .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
         sqlx::PgPool::connect(&db_url).await.unwrap()
     });
 }
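Both this service and the websocket service further down share this AsyncOnce pattern: the pool is built once on the first .get().await, and every later caller gets the same &PgPool back. A usage sketch (the query itself is hypothetical):

// Hypothetical caller of the PG lazy_static above.
async fn player_count() -> i64 {
    let pool = PG.get().await;
    sqlx::query_scalar("SELECT COUNT(*) FROM players;")
        .fetch_one(pool)
        .await
        .unwrap()
}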
@ -71,20 +71,7 @@ async fn main() {
             cmd_prune().await;
             println!("Done!");
         }
-        "auto-maintenance" => loop {
-            println!("Running maintenance tasks...");
-            if !migrations::is_migrated().await {
-                println!("DB is not migrated, running migrations...");
-                cmd_migrate().await;
-            }
-
-            cmd_prune().await;
-            tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
-        },
         "migrate" => cmd_migrate().await,
-        "print-env" => {
-            std::env::vars().for_each(|(key, value)| println!("{}={}", key, value));
-        }
         _ => {
             println!("Unknown command: {}", command);
             cmd_help();
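For reference, the removed "auto-maintenance" arm slept five minutes after each pass; tokio::time::interval expresses the same cadence on a fixed schedule instead. A sketch of that alternative arm (hypothetical, not what either side of this diff does):

// Hypothetical replacement arm: tick on a fixed five-minute schedule
// rather than sleeping after each pass completes.
"auto-maintenance" => {
    let mut every = tokio::time::interval(tokio::time::Duration::from_secs(60 * 5));
    loop {
        every.tick().await; // the first tick resolves immediately
        if !migrations::is_migrated().await {
            cmd_migrate().await;
        }
        cmd_prune().await;
    }
}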
@ -7,21 +7,14 @@ edition = "2021"

 [dependencies]
 lazy_static = "1.4.0"
-tokio-tungstenite = { version = "0.20.0", features = [
-    "rustls-tls-webpki-roots",
-] }
-serde = { version = "1.0.188", features = ["derive"] }
-serde_json = "1.0.105"
-tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
-sqlx = { version = "0.7.1", default_features = false, features = [
-    "runtime-tokio-rustls",
-    "postgres",
-] }
-url = "2.4.1"
+tokio-tungstenite = { version = "0.19.0", features=["rustls-tls-webpki-roots"] }
+serde = { version = "1.0.163", features = ["derive"] }
+serde_json = "1.0.96"
+tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }
+sqlx = { version = "0.6.3", default_features = false, features = [ "runtime-tokio-rustls" , "postgres" ] }
+url = "2.3.1"
 futures-util = "0.3.28"
 futures = "0.3.28"
 async_once = "0.2.6"
 serde-aux = "4.2.0"
-axum = "0.6.20"
-prometheus = "0.13.3"
-prometheus-static-metric = "0.5.1"
+axum = "0.6.18"
@ -12,13 +12,12 @@ use tokio::task::JoinSet;
 use tokio_tungstenite::{connect_async, tungstenite::Message};

 mod translators;
-mod telemetry;

 lazy_static! {
     static ref WS_ADDR: String = env::var("WS_ADDR").unwrap_or_default();
     static ref PG: AsyncOnce<sqlx::PgPool> = AsyncOnce::new(async {
         let db_url = std::env::var("DATABASE_URL")
-            .unwrap_or("postgres://saerrouser:saerro321@127.0.0.1:5432/data".to_string());
+            .unwrap_or("postgres://saerrouser:saerro321@localhost:5432/data".to_string());
         PgPoolOptions::new().connect(&db_url).await.unwrap()
     });
 }
@ -76,7 +75,6 @@ struct AnalyticsEvent {
 async fn get_team_id(character_id: String) -> Result<i32, sqlx::Error> {
     let pool = PG.get().await;

-    telemetry::db_read("players", "get_team_id");
     let team_id: i32 = query("SELECT faction_id FROM players WHERE character_id = $1 LIMIT 1;")
         .bind(character_id.clone())
         .fetch_one(pool)
@ -110,7 +108,6 @@ async fn track_pop(pop_event: PopEvent) {
         translators::vehicle_to_name(vehicle_id.as_str())
     };

-    telemetry::db_write("players", "track_pop");
     query(
         "
     INSERT INTO players (last_updated, character_id, world_id, faction_id, zone_id, class_name)
@ -133,7 +130,6 @@ async fn track_pop(pop_event: PopEvent) {
     .unwrap();

     if vehicle_name != "unknown" {
-        telemetry::db_write("vehicles", "track_pop");
         query("INSERT INTO vehicles (last_updated, character_id, world_id, faction_id, zone_id, vehicle_name)
         VALUES (now(), $1, $2, $3, $4, $5)
         ON CONFLICT (character_id) DO UPDATE SET
@ -163,7 +159,6 @@ async fn track_analytics(analytics_event: AnalyticsEvent) {
         event_name,
     } = analytics_event;

-    telemetry::db_write("analytics", "track_analytics");
     match query("INSERT INTO analytics (time, world_id, event_name) VALUES (now(), $1, $2);")
         .bind(world_id)
         .bind(event_name)
@ -215,7 +210,6 @@ async fn process_death_event(event: &Event) {
 }

 async fn process_exp_event(event: &Event) {
-    telemetry::experience_event(&event.world_id, &event.experience_id);
     let mut set = JoinSet::new();
     // println!("[ws/process_event] EVENT: {:?}", event);

@ -293,9 +287,6 @@ async fn healthz() {
                 "status": "ok",
             }))
         }),
-    ).route(
-        "/metrics",
-        get(telemetry::handler)
     );

     let port: u16 = std::env::var("PORT")
@ -324,7 +315,13 @@ async fn main() {
     println!("[ws] Connecting to {}", url);

     let (tx, rx) = futures::channel::mpsc::unbounded();
-    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
+    let ws_stream = match connect_async(url).await {
+        Ok((ws_stream, _)) => ws_stream,
+        Err(e) => {
+            println!("Error: {}", e);
+            return;
+        }
+    };
     let (write, read) = ws_stream.split();

     let fused_writer = rx.map(Ok).forward(write).fuse();
@ -334,20 +331,16 @@

             let mut data: Payload = match serde_json::from_str(body) {
                 Ok(data) => data,
-                Err(_e) => {
+                Err(_) => {
                     // println!("Error: {}; body: {}", e, body.clone());
-                    telemetry::event_dropped(&0, &"".to_string(), "decoding failure");
                     return;
                 }
             };

             if data.payload.event_name == "" {
-                telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "not event");
                 return;
             }

-            telemetry::event(&data.payload.world_id, &data.payload.event_name);
-
             if data.payload.event_name == "Death" || data.payload.event_name == "VehicleDestroy" {
                 process_death_event(&data.payload).await;
                 return;
@ -359,16 +352,12 @@
                     Ok(team_id) => {
                         data.payload.team_id = team_id;
                     }
-                    Err(_) => {
-                        telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "team_id missing");
-                    }
+                    Err(_) => {}
                 }
             }
             process_exp_event(&data.payload).await;
             return;
         }
-
-        telemetry::event_dropped(&data.payload.world_id, &data.payload.event_name, "unprocessable");
     })
     .fuse();

@ -1,75 +0,0 @@
-use lazy_static::lazy_static;
-use prometheus::{
-    IntGaugeVec,
-    register_int_gauge_vec,
-    TextEncoder,
-    gather
-};
-
-lazy_static! {
-    // incoming events
-    pub static ref EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_count", "Events processed", &[
-        "world_id", "event_name"
-    ]).unwrap();
-    pub static ref EVENTS_DROPPED: IntGaugeVec = register_int_gauge_vec!("saerro_ws_events_dropped_count", "Events dropped", &[
-        "world_id", "event_name", "reason"
-    ]).unwrap();
-
-    pub static ref EXPERIENCE_EVENTS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_experience_events_count", "Experience Events processed by Exp ID", &[
-        "world_id", "experience_id"
-    ]).unwrap();
-
-    // database stuff
-    pub static ref DB_WRITES: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_writes", "Writes to Postgres", &[
-        "table", "op"
-    ]).unwrap();
-    pub static ref DB_READS: IntGaugeVec = register_int_gauge_vec!("saerro_ws_db_reads", "Reads from Postgres", &[
-        "table", "op"
-    ]).unwrap();
-    // static ref DB_WTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_write_time", &[
-    //     "table", "op"
-    // ]).unwrap();
-    // static ref DB_RTIME: HistogramVec = register_histogram_vec!("saerro_ws_db_read_time", &[
-    //     "table", "op"
-    // ]).unwrap();
-}
-
-pub async fn handler() -> String {
-    let encoder = TextEncoder::new();
-    let mut buffer = String::new();
-
-    let metrics = gather();
-    encoder.encode_utf8(&metrics, &mut buffer).expect("prometheus metrics failed to render");
-
-    buffer
-}
-
-pub fn event(world_id: &i32, event_name: &String) {
-    EVENTS.with_label_values(&[
-        &world_id.to_string(),
-        &event_name,
-    ]).inc();
-}
-
-pub fn event_dropped(world_id: &i32, event_name: &String, reason: &str) {
-    EVENTS_DROPPED.with_label_values(&[
-        &world_id.to_string(),
-        &event_name,
-        reason,
-    ]).inc();
-}
-
-pub fn experience_event(world_id: &i32, experience_id: &i32) {
-    EXPERIENCE_EVENTS.with_label_values(&[
-        &world_id.to_string(),
-        &experience_id.to_string(),
-    ]).inc();
-}
-
-pub fn db_write(table: &str, op: &str) {
-    DB_WRITES.with_label_values(&[table, op]).inc();
-}
-
-pub fn db_read(table: &str, op: &str) {
-    DB_READS.with_label_values(&[table, op]).inc();
-}
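For context on what this deletion drops: the module registered labeled gauges and rendered them through the /metrics route removed earlier in this diff. A usage sketch of the prometheus crate pattern the deleted code relied on (hypothetical label values):

// Hypothetical: bump a labeled gauge the way the deleted helpers did,
// then render the default registry to Prometheus text format.
use prometheus::{gather, register_int_gauge_vec, TextEncoder};

fn main() {
    let events = register_int_gauge_vec!(
        "saerro_ws_events_count",
        "Events processed",
        &["world_id", "event_name"]
    )
    .unwrap();
    events.with_label_values(&["1", "Death"]).inc();

    let mut buffer = String::new();
    TextEncoder::new()
        .encode_utf8(&gather(), &mut buffer)
        .unwrap();
    print!("{}", buffer); // saerro_ws_events_count{event_name="Death",world_id="1"} 1
}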