Compare commits

..

No commits in common. "main" and "rustify" have entirely different histories.

7 changed files with 517 additions and 610 deletions

908
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -12,11 +12,9 @@ serde_json = "1.0.96"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
sled = "0.34"
reqwest = { version = "0.11", features = ["rustls-tls-webpki-roots", "rustls", "json"] }
openssl = { version = "0.10", features = ["vendored"] }
bincode = "1.3"
chrono = { version = "0.4", features = ["serde"] }
tower-http = { version = "0.4", features = ["trace"] }
r2d2_sqlite = "0.22"
r2d2 = "0.8"
rusqlite = { version = "0.29.0", features = ["bundled"] }

View file

@ -0,0 +1,27 @@
job "agg-population" {
type = "service"
update {
max_parallel = 1
stagger = "10s"
}
group "api" {
count = 1
network {
port "http" {
static = 3000
}
}
task "api" {
driver = "docker"
config {
image = "ghcr.io/genudine/agg-population/agg-population:latest"
ports = ["http"]
}
}
}
}

View file

@ -1,29 +1,23 @@
use crate::{
sources::{fisu, honu, saerro, sanctuary, voidwell},
types::{Population, Response},
types::{AllResponse, Population, Response},
};
use axum::{
extract::{Path, State},
Json,
};
use r2d2_sqlite::{rusqlite::params, SqliteConnectionManager};
use tokio::task::JoinSet;
pub async fn get_one_world(
State(db): State<r2d2::Pool<SqliteConnectionManager>>,
Path(world): Path<i32>,
) -> Json<Response> {
Json(get_world(db, world, false).await)
pub async fn get_one_world(State(db): State<sled::Db>, Path(world): Path<i32>) -> Json<Response> {
Json(get_world(db, world).await)
}
pub async fn get_all_worlds(
State(db): State<r2d2::Pool<SqliteConnectionManager>>,
) -> Json<Vec<Response>> {
pub async fn get_all_worlds(State(db): State<sled::Db>) -> Json<AllResponse> {
let mut set = JoinSet::new();
let mut worlds = vec![Response::default(); 8];
for world in vec![1, 10, 13, 17, 19, 40, 1000, 2000] {
set.spawn(get_world(db.clone(), world, false));
set.spawn(get_world(db.clone(), world));
}
let mut i = 0;
@ -32,18 +26,12 @@ pub async fn get_all_worlds(
i += 1;
}
Json(worlds)
Json(AllResponse { worlds })
}
pub async fn get_world(
db: r2d2::Pool<SqliteConnectionManager>,
world: i32,
skip_cache: bool,
) -> Response {
if !skip_cache {
if let Ok(data) = world_from_cache(db.clone(), world) {
return data;
}
async fn get_world(db: sled::Db, world: i32) -> Response {
if let Ok(data) = world_from_cache(db.clone(), world) {
return data;
}
let mut response = Response::default();
@ -84,40 +72,26 @@ pub async fn get_world(
response
}
#[tracing::instrument(skip(db))]
fn world_from_cache(db: r2d2::Pool<SqliteConnectionManager>, world: i32) -> Result<Response, ()> {
let db = db.get().unwrap();
let mut query = db.prepare("SELECT data FROM worlds WHERE id = ?").unwrap();
let value: Result<Vec<u8>, _> = query.query_row(params![world], |r| r.get(0));
fn world_from_cache(db: sled::Db, world: i32) -> Result<Response, ()> {
let key = format!("world:{}", world);
let value = match db.get(key) {
Ok(Some(value)) => value,
_ => return Err(()),
};
if value.is_err() {
tracing::debug!("Cache miss (non-exist) for world {}", world);
return Err(());
}
match bincode::deserialize::<Response>(value.unwrap().as_slice()) {
match bincode::deserialize::<Response>(&value) {
Ok(response) => {
if response.cached_at + chrono::Duration::minutes(5) < chrono::Utc::now() {
tracing::debug!("Cache miss (expired) for world {}", world);
if response.cached_at + chrono::Duration::minutes(3) < chrono::Utc::now() {
return Err(());
}
tracing::debug!("Cache hit for world {}", world);
Ok(response)
}
_ => {
tracing::debug!("Cache miss (corrupt) for world {}", world);
Err(())
}
_ => Err(()),
}
}
#[tracing::instrument(skip(db, response))]
fn world_to_cache(db: r2d2::Pool<SqliteConnectionManager>, world: i32, response: &Response) {
fn world_to_cache(db: sled::Db, world: i32, response: &Response) {
let key = format!("world:{}", world);
let value = bincode::serialize(response).unwrap();
let db = db.get().unwrap();
let mut query = db
.prepare("INSERT OR REPLACE INTO worlds (id, data) VALUES (?, ?)")
.unwrap();
query.execute(params![world, value]).unwrap();
db.insert(key, value).unwrap();
}

View file

@ -1,8 +1,6 @@
use crate::handlers::{get_all_worlds, get_one_world, get_world};
use crate::handlers::{get_all_worlds, get_one_world};
use axum::{response::Html, routing::get, Router};
use r2d2_sqlite::{rusqlite::params, SqliteConnectionManager};
use std::net::SocketAddr;
use tokio::task::JoinSet;
use tower_http::trace::TraceLayer;
mod handlers;
@ -12,21 +10,10 @@ mod types;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("tower_http=trace".parse().unwrap()),
)
.with_env_filter("tower_http=trace")
.init();
let sqlite_manager = SqliteConnectionManager::memory();
let pool = r2d2::Pool::new(sqlite_manager).unwrap();
pool.get()
.unwrap()
.execute(
"CREATE TABLE worlds (id INTEGER NOT NULL PRIMARY KEY, data BLOB);",
params![],
)
.unwrap();
let db = sled::open("/tmp/agg-population").expect("open");
let app = Router::new()
.route("/", get(root))
@ -34,29 +21,10 @@ async fn main() {
.route("/population/all", get(get_all_worlds))
.route("/population/:world", get(get_one_world))
.layer(TraceLayer::new_for_http())
.with_state(pool.clone());
.with_state(db);
tokio::spawn(async move {
loop {
let mut set = JoinSet::new();
for world in vec![1, 10, 13, 17, 19, 40, 1000, 2000] {
set.spawn(get_world(pool.clone(), world, true));
}
while let Some(_) = set.join_next().await {}
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 3)).await;
}
});
let addr = SocketAddr::from((
[0, 0, 0, 0],
std::env::var("PORT")
.unwrap_or("3000".to_string())
.parse()
.unwrap(),
));
tracing::debug!("listening on http://{}", addr);
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await

View file

@ -1,5 +1,3 @@
use chrono::{Duration, Utc};
use crate::types::Population;
pub async fn saerro(world: i32) -> Result<Population, ()> {
@ -118,8 +116,6 @@ pub async fn voidwell(world: i32) -> Result<Population, ()> {
#[derive(serde::Deserialize)]
struct Root {
#[serde(rename = "onlineCharacters")]
pub online_characters: i32,
#[serde(rename = "zoneStates")]
pub zone_states: Vec<ZoneState>,
}
@ -154,39 +150,29 @@ pub async fn voidwell(world: i32) -> Result<Population, ()> {
.await
.unwrap();
let ns_avg: i32 = response
.zone_states
.iter()
.map(|zone| zone.population.ns)
.sum::<i32>()
/ 3;
let nc = response
.zone_states
.iter()
.map(|zone| zone.population.nc)
.sum::<i32>()
+ ns_avg;
let tr = response
.zone_states
.iter()
.map(|zone| zone.population.tr)
.sum::<i32>()
+ ns_avg;
let vs = response
.zone_states
.iter()
.map(|zone| zone.population.vs)
.sum::<i32>()
+ ns_avg;
Ok(Population {
nc,
tr,
vs,
total: response.online_characters,
nc: response
.zone_states
.iter()
.map(|zone| zone.population.nc)
.sum(),
tr: response
.zone_states
.iter()
.map(|zone| zone.population.tr)
.sum(),
vs: response
.zone_states
.iter()
.map(|zone| zone.population.vs)
.sum(),
total: response
.zone_states
.iter()
.map(|zone| {
zone.population.nc + zone.population.tr + zone.population.vs + zone.population.ns
})
.sum(),
})
}
@ -204,7 +190,6 @@ pub async fn sanctuary(world: i32) -> Result<Population, ()> {
#[derive(serde::Deserialize)]
struct World {
pub population: SanctuaryPopulation,
pub timestamp: i64,
}
#[derive(serde::Deserialize)]
@ -230,13 +215,6 @@ pub async fn sanctuary(world: i32) -> Result<Population, ()> {
.await
.unwrap();
// error if over 15 minutes old
if response.world_population_list[0].timestamp
< (Utc::now() - Duration::minutes(15)).timestamp()
{
return Err(());
}
Ok(Population {
nc: response.world_population_list[0].population.nc,
tr: response.world_population_list[0].population.tr,

View file

@ -1,6 +1,12 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct AllResponse {
pub worlds: Vec<Response>,
}
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct Response {