Compare commits
7 commits
Author | SHA1 | Date | |
---|---|---|---|
d22a0762ab | |||
6b9c879ba7 | |||
5fef497bfc | |||
315a05d59a | |||
30266b294d | |||
ae2c32e5ca | |||
65849ab8bd |
7 changed files with 613 additions and 520 deletions
914
Cargo.lock
generated
914
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -12,9 +12,11 @@ serde_json = "1.0.96"
|
|||
tokio = { version = "1.0", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
sled = "0.34"
|
||||
reqwest = { version = "0.11", features = ["rustls-tls-webpki-roots", "rustls", "json"] }
|
||||
openssl = { version = "0.10", features = ["vendored"] }
|
||||
bincode = "1.3"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tower-http = { version = "0.4", features = ["trace"] }
|
||||
r2d2_sqlite = "0.22"
|
||||
r2d2 = "0.8"
|
||||
rusqlite = { version = "0.29.0", features = ["bundled"] }
|
|
@ -1,27 +0,0 @@
|
|||
job "agg-population" {
|
||||
type = "service"
|
||||
|
||||
update {
|
||||
max_parallel = 1
|
||||
stagger = "10s"
|
||||
}
|
||||
|
||||
group "api" {
|
||||
count = 1
|
||||
|
||||
network {
|
||||
port "http" {
|
||||
static = 3000
|
||||
}
|
||||
}
|
||||
|
||||
task "api" {
|
||||
driver = "docker"
|
||||
|
||||
config {
|
||||
image = "ghcr.io/genudine/agg-population/agg-population:latest"
|
||||
ports = ["http"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,23 +1,29 @@
|
|||
use crate::{
|
||||
sources::{fisu, honu, saerro, sanctuary, voidwell},
|
||||
types::{AllResponse, Population, Response},
|
||||
types::{Population, Response},
|
||||
};
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
Json,
|
||||
};
|
||||
use r2d2_sqlite::{rusqlite::params, SqliteConnectionManager};
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
pub async fn get_one_world(State(db): State<sled::Db>, Path(world): Path<i32>) -> Json<Response> {
|
||||
Json(get_world(db, world).await)
|
||||
pub async fn get_one_world(
|
||||
State(db): State<r2d2::Pool<SqliteConnectionManager>>,
|
||||
Path(world): Path<i32>,
|
||||
) -> Json<Response> {
|
||||
Json(get_world(db, world, false).await)
|
||||
}
|
||||
|
||||
pub async fn get_all_worlds(State(db): State<sled::Db>) -> Json<AllResponse> {
|
||||
pub async fn get_all_worlds(
|
||||
State(db): State<r2d2::Pool<SqliteConnectionManager>>,
|
||||
) -> Json<Vec<Response>> {
|
||||
let mut set = JoinSet::new();
|
||||
let mut worlds = vec![Response::default(); 8];
|
||||
|
||||
for world in vec![1, 10, 13, 17, 19, 40, 1000, 2000] {
|
||||
set.spawn(get_world(db.clone(), world));
|
||||
set.spawn(get_world(db.clone(), world, false));
|
||||
}
|
||||
|
||||
let mut i = 0;
|
||||
|
@ -26,13 +32,19 @@ pub async fn get_all_worlds(State(db): State<sled::Db>) -> Json<AllResponse> {
|
|||
i += 1;
|
||||
}
|
||||
|
||||
Json(AllResponse { worlds })
|
||||
Json(worlds)
|
||||
}
|
||||
|
||||
async fn get_world(db: sled::Db, world: i32) -> Response {
|
||||
pub async fn get_world(
|
||||
db: r2d2::Pool<SqliteConnectionManager>,
|
||||
world: i32,
|
||||
skip_cache: bool,
|
||||
) -> Response {
|
||||
if !skip_cache {
|
||||
if let Ok(data) = world_from_cache(db.clone(), world) {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
let mut response = Response::default();
|
||||
response.id = world;
|
||||
|
@ -72,26 +84,40 @@ async fn get_world(db: sled::Db, world: i32) -> Response {
|
|||
response
|
||||
}
|
||||
|
||||
fn world_from_cache(db: sled::Db, world: i32) -> Result<Response, ()> {
|
||||
let key = format!("world:{}", world);
|
||||
let value = match db.get(key) {
|
||||
Ok(Some(value)) => value,
|
||||
_ => return Err(()),
|
||||
};
|
||||
#[tracing::instrument(skip(db))]
|
||||
fn world_from_cache(db: r2d2::Pool<SqliteConnectionManager>, world: i32) -> Result<Response, ()> {
|
||||
let db = db.get().unwrap();
|
||||
let mut query = db.prepare("SELECT data FROM worlds WHERE id = ?").unwrap();
|
||||
let value: Result<Vec<u8>, _> = query.query_row(params![world], |r| r.get(0));
|
||||
|
||||
match bincode::deserialize::<Response>(&value) {
|
||||
Ok(response) => {
|
||||
if response.cached_at + chrono::Duration::minutes(3) < chrono::Utc::now() {
|
||||
if value.is_err() {
|
||||
tracing::debug!("Cache miss (non-exist) for world {}", world);
|
||||
return Err(());
|
||||
}
|
||||
|
||||
match bincode::deserialize::<Response>(value.unwrap().as_slice()) {
|
||||
Ok(response) => {
|
||||
if response.cached_at + chrono::Duration::minutes(5) < chrono::Utc::now() {
|
||||
tracing::debug!("Cache miss (expired) for world {}", world);
|
||||
return Err(());
|
||||
}
|
||||
|
||||
tracing::debug!("Cache hit for world {}", world);
|
||||
Ok(response)
|
||||
}
|
||||
_ => Err(()),
|
||||
_ => {
|
||||
tracing::debug!("Cache miss (corrupt) for world {}", world);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn world_to_cache(db: sled::Db, world: i32, response: &Response) {
|
||||
let key = format!("world:{}", world);
|
||||
#[tracing::instrument(skip(db, response))]
|
||||
fn world_to_cache(db: r2d2::Pool<SqliteConnectionManager>, world: i32, response: &Response) {
|
||||
let value = bincode::serialize(response).unwrap();
|
||||
db.insert(key, value).unwrap();
|
||||
let db = db.get().unwrap();
|
||||
let mut query = db
|
||||
.prepare("INSERT OR REPLACE INTO worlds (id, data) VALUES (?, ?)")
|
||||
.unwrap();
|
||||
query.execute(params![world, value]).unwrap();
|
||||
}
|
||||
|
|
44
src/main.rs
44
src/main.rs
|
@ -1,6 +1,8 @@
|
|||
use crate::handlers::{get_all_worlds, get_one_world};
|
||||
use crate::handlers::{get_all_worlds, get_one_world, get_world};
|
||||
use axum::{response::Html, routing::get, Router};
|
||||
use r2d2_sqlite::{rusqlite::params, SqliteConnectionManager};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::task::JoinSet;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
mod handlers;
|
||||
|
@ -10,10 +12,21 @@ mod types;
|
|||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter("tower_http=trace")
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::from_default_env()
|
||||
.add_directive("tower_http=trace".parse().unwrap()),
|
||||
)
|
||||
.init();
|
||||
|
||||
let db = sled::open("/tmp/agg-population").expect("open");
|
||||
let sqlite_manager = SqliteConnectionManager::memory();
|
||||
let pool = r2d2::Pool::new(sqlite_manager).unwrap();
|
||||
pool.get()
|
||||
.unwrap()
|
||||
.execute(
|
||||
"CREATE TABLE worlds (id INTEGER NOT NULL PRIMARY KEY, data BLOB);",
|
||||
params![],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let app = Router::new()
|
||||
.route("/", get(root))
|
||||
|
@ -21,10 +34,29 @@ async fn main() {
|
|||
.route("/population/all", get(get_all_worlds))
|
||||
.route("/population/:world", get(get_one_world))
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.with_state(db);
|
||||
.with_state(pool.clone());
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
|
||||
tracing::debug!("listening on {}", addr);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let mut set = JoinSet::new();
|
||||
for world in vec![1, 10, 13, 17, 19, 40, 1000, 2000] {
|
||||
set.spawn(get_world(pool.clone(), world, true));
|
||||
}
|
||||
|
||||
while let Some(_) = set.join_next().await {}
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 3)).await;
|
||||
}
|
||||
});
|
||||
|
||||
let addr = SocketAddr::from((
|
||||
[0, 0, 0, 0],
|
||||
std::env::var("PORT")
|
||||
.unwrap_or("3000".to_string())
|
||||
.parse()
|
||||
.unwrap(),
|
||||
));
|
||||
tracing::debug!("listening on http://{}", addr);
|
||||
axum::Server::bind(&addr)
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use chrono::{Duration, Utc};
|
||||
|
||||
use crate::types::Population;
|
||||
|
||||
pub async fn saerro(world: i32) -> Result<Population, ()> {
|
||||
|
@ -116,6 +118,8 @@ pub async fn voidwell(world: i32) -> Result<Population, ()> {
|
|||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct Root {
|
||||
#[serde(rename = "onlineCharacters")]
|
||||
pub online_characters: i32,
|
||||
#[serde(rename = "zoneStates")]
|
||||
pub zone_states: Vec<ZoneState>,
|
||||
}
|
||||
|
@ -150,29 +154,39 @@ pub async fn voidwell(world: i32) -> Result<Population, ()> {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
Ok(Population {
|
||||
nc: response
|
||||
let ns_avg: i32 = response
|
||||
.zone_states
|
||||
.iter()
|
||||
.map(|zone| zone.population.ns)
|
||||
.sum::<i32>()
|
||||
/ 3;
|
||||
|
||||
let nc = response
|
||||
.zone_states
|
||||
.iter()
|
||||
.map(|zone| zone.population.nc)
|
||||
.sum(),
|
||||
tr: response
|
||||
.sum::<i32>()
|
||||
+ ns_avg;
|
||||
|
||||
let tr = response
|
||||
.zone_states
|
||||
.iter()
|
||||
.map(|zone| zone.population.tr)
|
||||
.sum(),
|
||||
vs: response
|
||||
.sum::<i32>()
|
||||
+ ns_avg;
|
||||
|
||||
let vs = response
|
||||
.zone_states
|
||||
.iter()
|
||||
.map(|zone| zone.population.vs)
|
||||
.sum(),
|
||||
total: response
|
||||
.zone_states
|
||||
.iter()
|
||||
.map(|zone| {
|
||||
zone.population.nc + zone.population.tr + zone.population.vs + zone.population.ns
|
||||
})
|
||||
.sum(),
|
||||
.sum::<i32>()
|
||||
+ ns_avg;
|
||||
|
||||
Ok(Population {
|
||||
nc,
|
||||
tr,
|
||||
vs,
|
||||
total: response.online_characters,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -190,6 +204,7 @@ pub async fn sanctuary(world: i32) -> Result<Population, ()> {
|
|||
#[derive(serde::Deserialize)]
|
||||
struct World {
|
||||
pub population: SanctuaryPopulation,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
|
@ -215,6 +230,13 @@ pub async fn sanctuary(world: i32) -> Result<Population, ()> {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
// error if over 15 minutes old
|
||||
if response.world_population_list[0].timestamp
|
||||
< (Utc::now() - Duration::minutes(15)).timestamp()
|
||||
{
|
||||
return Err(());
|
||||
}
|
||||
|
||||
Ok(Population {
|
||||
nc: response.world_population_list[0].population.nc,
|
||||
tr: response.world_population_list[0].population.tr,
|
||||
|
|
|
@ -1,12 +1,6 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
|
||||
pub struct AllResponse {
|
||||
pub worlds: Vec<Response>,
|
||||
}
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
|
||||
pub struct Response {
|
||||
|
|
Loading…
Add table
Reference in a new issue