Compare commits

..

No commits in common. "main" and "rustify" have entirely different histories.

7 changed files with 517 additions and 610 deletions

908
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -12,11 +12,9 @@ serde_json = "1.0.96"
tokio = { version = "1.0", features = ["full"] } tokio = { version = "1.0", features = ["full"] }
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
sled = "0.34"
reqwest = { version = "0.11", features = ["rustls-tls-webpki-roots", "rustls", "json"] } reqwest = { version = "0.11", features = ["rustls-tls-webpki-roots", "rustls", "json"] }
openssl = { version = "0.10", features = ["vendored"] } openssl = { version = "0.10", features = ["vendored"] }
bincode = "1.3" bincode = "1.3"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
tower-http = { version = "0.4", features = ["trace"] } tower-http = { version = "0.4", features = ["trace"] }
r2d2_sqlite = "0.22"
r2d2 = "0.8"
rusqlite = { version = "0.29.0", features = ["bundled"] }

View file

@ -0,0 +1,27 @@
job "agg-population" {
type = "service"
update {
max_parallel = 1
stagger = "10s"
}
group "api" {
count = 1
network {
port "http" {
static = 3000
}
}
task "api" {
driver = "docker"
config {
image = "ghcr.io/genudine/agg-population/agg-population:latest"
ports = ["http"]
}
}
}
}

View file

@ -1,29 +1,23 @@
use crate::{ use crate::{
sources::{fisu, honu, saerro, sanctuary, voidwell}, sources::{fisu, honu, saerro, sanctuary, voidwell},
types::{Population, Response}, types::{AllResponse, Population, Response},
}; };
use axum::{ use axum::{
extract::{Path, State}, extract::{Path, State},
Json, Json,
}; };
use r2d2_sqlite::{rusqlite::params, SqliteConnectionManager};
use tokio::task::JoinSet; use tokio::task::JoinSet;
pub async fn get_one_world( pub async fn get_one_world(State(db): State<sled::Db>, Path(world): Path<i32>) -> Json<Response> {
State(db): State<r2d2::Pool<SqliteConnectionManager>>, Json(get_world(db, world).await)
Path(world): Path<i32>,
) -> Json<Response> {
Json(get_world(db, world, false).await)
} }
pub async fn get_all_worlds( pub async fn get_all_worlds(State(db): State<sled::Db>) -> Json<AllResponse> {
State(db): State<r2d2::Pool<SqliteConnectionManager>>,
) -> Json<Vec<Response>> {
let mut set = JoinSet::new(); let mut set = JoinSet::new();
let mut worlds = vec![Response::default(); 8]; let mut worlds = vec![Response::default(); 8];
for world in vec![1, 10, 13, 17, 19, 40, 1000, 2000] { for world in vec![1, 10, 13, 17, 19, 40, 1000, 2000] {
set.spawn(get_world(db.clone(), world, false)); set.spawn(get_world(db.clone(), world));
} }
let mut i = 0; let mut i = 0;
@ -32,18 +26,12 @@ pub async fn get_all_worlds(
i += 1; i += 1;
} }
Json(worlds) Json(AllResponse { worlds })
} }
pub async fn get_world( async fn get_world(db: sled::Db, world: i32) -> Response {
db: r2d2::Pool<SqliteConnectionManager>, if let Ok(data) = world_from_cache(db.clone(), world) {
world: i32, return data;
skip_cache: bool,
) -> Response {
if !skip_cache {
if let Ok(data) = world_from_cache(db.clone(), world) {
return data;
}
} }
let mut response = Response::default(); let mut response = Response::default();
@ -84,40 +72,26 @@ pub async fn get_world(
response response
} }
#[tracing::instrument(skip(db))] fn world_from_cache(db: sled::Db, world: i32) -> Result<Response, ()> {
fn world_from_cache(db: r2d2::Pool<SqliteConnectionManager>, world: i32) -> Result<Response, ()> { let key = format!("world:{}", world);
let db = db.get().unwrap(); let value = match db.get(key) {
let mut query = db.prepare("SELECT data FROM worlds WHERE id = ?").unwrap(); Ok(Some(value)) => value,
let value: Result<Vec<u8>, _> = query.query_row(params![world], |r| r.get(0)); _ => return Err(()),
};
if value.is_err() { match bincode::deserialize::<Response>(&value) {
tracing::debug!("Cache miss (non-exist) for world {}", world);
return Err(());
}
match bincode::deserialize::<Response>(value.unwrap().as_slice()) {
Ok(response) => { Ok(response) => {
if response.cached_at + chrono::Duration::minutes(5) < chrono::Utc::now() { if response.cached_at + chrono::Duration::minutes(3) < chrono::Utc::now() {
tracing::debug!("Cache miss (expired) for world {}", world);
return Err(()); return Err(());
} }
tracing::debug!("Cache hit for world {}", world);
Ok(response) Ok(response)
} }
_ => { _ => Err(()),
tracing::debug!("Cache miss (corrupt) for world {}", world);
Err(())
}
} }
} }
#[tracing::instrument(skip(db, response))] fn world_to_cache(db: sled::Db, world: i32, response: &Response) {
fn world_to_cache(db: r2d2::Pool<SqliteConnectionManager>, world: i32, response: &Response) { let key = format!("world:{}", world);
let value = bincode::serialize(response).unwrap(); let value = bincode::serialize(response).unwrap();
let db = db.get().unwrap(); db.insert(key, value).unwrap();
let mut query = db
.prepare("INSERT OR REPLACE INTO worlds (id, data) VALUES (?, ?)")
.unwrap();
query.execute(params![world, value]).unwrap();
} }

View file

@ -1,8 +1,6 @@
use crate::handlers::{get_all_worlds, get_one_world, get_world}; use crate::handlers::{get_all_worlds, get_one_world};
use axum::{response::Html, routing::get, Router}; use axum::{response::Html, routing::get, Router};
use r2d2_sqlite::{rusqlite::params, SqliteConnectionManager};
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::task::JoinSet;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
mod handlers; mod handlers;
@ -12,21 +10,10 @@ mod types;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_env_filter( .with_env_filter("tower_http=trace")
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("tower_http=trace".parse().unwrap()),
)
.init(); .init();
let sqlite_manager = SqliteConnectionManager::memory(); let db = sled::open("/tmp/agg-population").expect("open");
let pool = r2d2::Pool::new(sqlite_manager).unwrap();
pool.get()
.unwrap()
.execute(
"CREATE TABLE worlds (id INTEGER NOT NULL PRIMARY KEY, data BLOB);",
params![],
)
.unwrap();
let app = Router::new() let app = Router::new()
.route("/", get(root)) .route("/", get(root))
@ -34,29 +21,10 @@ async fn main() {
.route("/population/all", get(get_all_worlds)) .route("/population/all", get(get_all_worlds))
.route("/population/:world", get(get_one_world)) .route("/population/:world", get(get_one_world))
.layer(TraceLayer::new_for_http()) .layer(TraceLayer::new_for_http())
.with_state(pool.clone()); .with_state(db);
tokio::spawn(async move { let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
loop { tracing::debug!("listening on {}", addr);
let mut set = JoinSet::new();
for world in vec![1, 10, 13, 17, 19, 40, 1000, 2000] {
set.spawn(get_world(pool.clone(), world, true));
}
while let Some(_) = set.join_next().await {}
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 3)).await;
}
});
let addr = SocketAddr::from((
[0, 0, 0, 0],
std::env::var("PORT")
.unwrap_or("3000".to_string())
.parse()
.unwrap(),
));
tracing::debug!("listening on http://{}", addr);
axum::Server::bind(&addr) axum::Server::bind(&addr)
.serve(app.into_make_service()) .serve(app.into_make_service())
.await .await

View file

@ -1,5 +1,3 @@
use chrono::{Duration, Utc};
use crate::types::Population; use crate::types::Population;
pub async fn saerro(world: i32) -> Result<Population, ()> { pub async fn saerro(world: i32) -> Result<Population, ()> {
@ -118,8 +116,6 @@ pub async fn voidwell(world: i32) -> Result<Population, ()> {
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct Root { struct Root {
#[serde(rename = "onlineCharacters")]
pub online_characters: i32,
#[serde(rename = "zoneStates")] #[serde(rename = "zoneStates")]
pub zone_states: Vec<ZoneState>, pub zone_states: Vec<ZoneState>,
} }
@ -154,39 +150,29 @@ pub async fn voidwell(world: i32) -> Result<Population, ()> {
.await .await
.unwrap(); .unwrap();
let ns_avg: i32 = response
.zone_states
.iter()
.map(|zone| zone.population.ns)
.sum::<i32>()
/ 3;
let nc = response
.zone_states
.iter()
.map(|zone| zone.population.nc)
.sum::<i32>()
+ ns_avg;
let tr = response
.zone_states
.iter()
.map(|zone| zone.population.tr)
.sum::<i32>()
+ ns_avg;
let vs = response
.zone_states
.iter()
.map(|zone| zone.population.vs)
.sum::<i32>()
+ ns_avg;
Ok(Population { Ok(Population {
nc, nc: response
tr, .zone_states
vs, .iter()
total: response.online_characters, .map(|zone| zone.population.nc)
.sum(),
tr: response
.zone_states
.iter()
.map(|zone| zone.population.tr)
.sum(),
vs: response
.zone_states
.iter()
.map(|zone| zone.population.vs)
.sum(),
total: response
.zone_states
.iter()
.map(|zone| {
zone.population.nc + zone.population.tr + zone.population.vs + zone.population.ns
})
.sum(),
}) })
} }
@ -204,7 +190,6 @@ pub async fn sanctuary(world: i32) -> Result<Population, ()> {
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct World { struct World {
pub population: SanctuaryPopulation, pub population: SanctuaryPopulation,
pub timestamp: i64,
} }
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
@ -230,13 +215,6 @@ pub async fn sanctuary(world: i32) -> Result<Population, ()> {
.await .await
.unwrap(); .unwrap();
// error if over 15 minutes old
if response.world_population_list[0].timestamp
< (Utc::now() - Duration::minutes(15)).timestamp()
{
return Err(());
}
Ok(Population { Ok(Population {
nc: response.world_population_list[0].population.nc, nc: response.world_population_list[0].population.nc,
tr: response.world_population_list[0].population.tr, tr: response.world_population_list[0].population.tr,

View file

@ -1,6 +1,12 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct AllResponse {
pub worlds: Vec<Response>,
}
#[derive(Deserialize, Serialize, Debug, Clone, Default)] #[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct Response { pub struct Response {