ess-demux

This commit is contained in:
41666 2023-07-08 23:31:18 -04:00
parent 9ebeeb47f4
commit 5fa05ac73f
9 changed files with 1498 additions and 596 deletions

1807
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,5 @@
[workspace]
resolver = "2"
members = [
"services/*",
]

View file

@ -6,9 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde_json = "1.0.96"
serde = "1.0.163"
async-graphql = { version = "5.0.8", features = ["chrono"] }
serde_json = "1.0"
serde = "1.0"
async-graphql = { version = "5.0", features = ["chrono"] }
axum = "0.6.18"
sqlx = { version = "0.6.3", default_features = false, features = [ "runtime-tokio-rustls", "postgres", "chrono" ] }
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread"] }

View file

@ -0,0 +1,16 @@
[package]
name = "ess-demux"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.19", features = ["rustls-tls-webpki-roots"] }
tracing = "0.1"
tracing-subscriber = "0.3"
future-utils = "0.12"
futures-channel = "0.3"
futures = "0.3"
serde_json = "1.0"

View file

@ -0,0 +1,49 @@
# ESS Demux
This service guarantees one thing to you; it will have a websocket connected with ESS events.
The specific flow is as follows:
1. If https://push.nanite-systems.net/ is up, the client websocket is wired to that.
2. Else, connect to https://push.planetside2.com/ based on `?environment={}`, and the client websocket is wired to either 1 or 3 of those.
- If environment = `all`, it will connect 3 times to `pc`, `ps4us`, and `ps4eu`.
- Else, connect to specified environment.
- Also, try reconnecting to the main socket every minute.
3. If that fails, the client websocket will never respond.
## Why would you want this?
NSS helps be resilient to ESS failures, but NSS isn't failure-proof itself. This acts as a proxy that'll gracefully select one source or another.
### Alternatives
If you can accept the loss of PS4 data, you may use nginx or HAProxy to achieve the same effect...
[**nginx example.conf**](./docs/alternatives/ess.nginx.conf)
The above may not work entirely correctly... ymmv.
Saerro **does** want PS4 data, so we use the ess-demux service.
## How to use this
The service runs on port 8007 by default, you can change it to whatever via `PORT`, if you're using this as a bare service. You may also change the `DEFAULT_SERVICE_ID` from `s:example`; allowing you to omit this from the URL.
`docker run -d -p 8007:8007 ghcr.io/genudine/saerro/ess-demux:latest`
Connect to `ws://localhost:8007/streaming?environment=all&service-id=s:example`
Send subscriptions like any other ESS-compatible websocket.
Upon connection, you can expect an event like this:
```json
{
"connected": true,
"service": "ess-demux",
"type": "essDemuxConnectionStateChanged",
"upstream": "nss" // or "ess"
}
```

View file

@ -0,0 +1,49 @@
upstream ess-demux {
server localhost:8008;
server localhost:8009 backup;
}
resolver 1.1.1.1 1.0.0.1;
server {
listen 8007 default_server;
server_name _;
location /streaming {
proxy_pass http://ess-demux;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
server {
listen 8008;
add_header ess-demux-server "nss" always;
location / {
proxy_pass https://push.nanite-systems.net;
proxy_set_header Host push.nanite-systems.net;
proxy_ssl_name push.nanite-systems.net;
proxy_ssl_server_name on;
proxy_ssl_protocols TLSv1.3;
proxy_ssl_verify off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
server {
listen 8009;
add_header ess-demux-server "ess" always;
location / {
proxy_pass https://push.planetside2.com;
proxy_set_header Host push.planetside2.com;
proxy_ssl_name push.planetside2.com;
proxy_ssl_server_name on;
proxy_ssl_protocols TLSv1.2 TLSv1.3;
proxy_ssl_verify off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}

View file

@ -0,0 +1,61 @@
use futures::{pin_mut, select, FutureExt, StreamExt, TryStreamExt};
use futures_channel::mpsc::unbounded;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tracing::{debug, info};
use crate::remote_manager::RemoteManager;
mod remote_manager;
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
info!("Incoming TCP connection from: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
info!("New WebSocket connection: {}", addr);
let (local_to_remote_tx, local_to_remote_rx) = unbounded();
let (remote_to_local_tx, remote_to_local_rx) = unbounded();
let (local_outgoing, local_incoming) = ws_stream.split();
// Our client sent us a message, forward to ESS
let local_to_remote = local_incoming.map(Ok).forward(local_to_remote_tx);
// ESS sent us a message, forward to our client
let remote_to_local = remote_to_local_rx.map(Ok).forward(local_outgoing);
let upstream_connection = tokio::spawn(async move {
let mut remote = RemoteManager::new(local_to_remote_rx, remote_to_local_tx.clone());
remote.connect().await;
})
.fuse();
pin_mut!(local_to_remote, remote_to_local, upstream_connection);
select! {
_ = local_to_remote => debug!("local_to_remote exited"),
_ = remote_to_local => debug!("remote_to_local exited"),
_ = upstream_connection => debug!("upstream_connection exited"),
}
info!("Client {} disconnected", addr);
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let addr = format!(
"0.0.0.0:{}",
std::env::var("PORT").unwrap_or("8007".to_string())
);
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", addr);
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(stream, addr));
}
}

View file

@ -0,0 +1,97 @@
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use serde_json::json;
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, warn};
pub struct RemoteManager {
recv: UnboundedReceiver<Result<Message, tokio_tungstenite::tungstenite::Error>>,
send: UnboundedSender<Message>,
current_upstream: Option<String>,
nss_failed: bool,
}
impl RemoteManager {
pub fn new(
recv: UnboundedReceiver<Result<Message, tokio_tungstenite::tungstenite::Error>>,
send: UnboundedSender<Message>,
) -> Self {
Self {
recv,
send,
current_upstream: None,
nss_failed: false,
}
}
pub async fn connect(&mut self) {
self.send_connection_state_changed().await;
loop {
self.connect_loop().await;
}
}
async fn connect_loop(&mut self) {
if self.nss_failed {
self.connect_ess().await.expect("connect_ess failed");
return;
}
match self.connect_nss().await {
Ok(_) => {
self.nss_failed = false;
warn!("nss connection closed")
}
Err(e) => {
warn!("Failed to connect to NSS: {}", e);
self.nss_failed = true;
match self.connect_ess().await {
Ok(_) => {
warn!("ess connection closed")
}
Err(e) => {
error!("Failed to connect to ESS: {}", e);
self.current_upstream = None;
}
}
}
}
}
async fn connect_nss(&mut self) -> Result<(), tokio_tungstenite::tungstenite::Error> {
self.current_upstream = Some("nss".to_string());
self.ws_connect(
"wss://push.nanite-systems.net/streaming?environment=all&service-id=s:medkit2",
)
.await?;
self.send_connection_state_changed().await;
Ok(())
}
async fn connect_ess(&mut self) -> Result<(), tokio_tungstenite::tungstenite::Error> {
self.current_upstream = Some("ess".to_string());
self.ws_connect("wss://push.planetside2.com/streaming?environment=pc&service-id=s:medkit2")
.await?;
self.send_connection_state_changed().await;
Ok(())
}
async fn ws_connect(&mut self, url: &str) -> Result<(), tokio_tungstenite::tungstenite::Error> {
todo!()
}
async fn send_connection_state_changed(&self) {
self.send
.unbounded_send(
json!({
"connected": self.current_upstream.is_some(),
"service": "ess-demux",
"type": "essDemuxConnectionStateChanged",
"upstream": self.current_upstream,
})
.to_string()
.into(),
)
.expect("send_connection_state_changed failed");
}
}

View file

@ -315,7 +315,13 @@ async fn main() {
println!("[ws] Connecting to {}", url);
let (tx, rx) = futures::channel::mpsc::unbounded();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
let ws_stream = match connect_async(url).await {
Ok((ws_stream, _)) => ws_stream,
Err(e) => {
println!("Error: {}", e);
return;
}
};
let (write, read) = ws_stream.split();
let fused_writer = rx.map(Ok).forward(write).fuse();