ws and pruner done!!!

This commit is contained in:
41666 2024-10-28 13:46:52 -07:00
parent c5cc245e25
commit 74add408e6
34 changed files with 1455 additions and 221 deletions

View file

@ -1,72 +0,0 @@
package main
import (
"context"
"database/sql"
"testing"
"time"
"github.com/genudine/saerro-go/translators"
"github.com/genudine/saerro-go/types"
"github.com/stretchr/testify/assert"
_ "modernc.org/sqlite"
)
func getEventHandlerTestShim(t *testing.T) (EventHandler, *sql.DB) {
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
t.Fatalf("test shim: sqlite open failed, %v", err)
}
return EventHandler{
Ingest: &Ingest{
DB: db,
},
}, db
}
func TestHandleDeath(t *testing.T) {
eh, db := getEventHandlerTestShim(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
event := types.ESSEvent{
EventName: "Death",
WorldID: 17,
ZoneID: 2,
CharacterID: "DollNC",
LoadoutID: "3",
TeamID: types.NC,
AttackerCharacterID: "Lyyti",
AttackerLoadoutID: "3",
AttackerTeamID: types.TR,
}
eh.HandleDeath(ctx, event)
type player struct {
CharacterID string `json:"character_id"`
ClassName string `json:"class_name"`
}
var player1 player
err := db.QueryRowContext(ctx, "SELECT * FROM players WHERE character_id = ?", event.CharacterID).Scan(&player1)
if err != nil {
t.Error(err)
}
assert.Equal(t, event.CharacterID, player1.CharacterID)
assert.Equal(t, translators.ClassFromLoadout(event.LoadoutID), player1.ClassName)
var player2 player
err = db.QueryRowContext(ctx, "SELECT * FROM players WHERE character_id = ?", event.AttackerCharacterID).Scan(&player2)
if err != nil {
t.Error(err)
}
assert.Equal(t, event.AttackerCharacterID, player2.CharacterID)
assert.Equal(t, translators.ClassFromLoadout(event.AttackerLoadoutID), player2.ClassName)
}

View file

@ -1,19 +1,30 @@
package main
package eventhandler
import (
"context"
"log"
"database/sql"
"github.com/genudine/saerro-go/cmd/ws/ingest"
"github.com/genudine/saerro-go/store"
"github.com/genudine/saerro-go/types"
)
type EventHandler struct {
Ingest *Ingest
Ingest *ingest.Ingest
}
func NewEventHandler(db *sql.DB) EventHandler {
return EventHandler{
Ingest: &ingest.Ingest{
PlayerStore: store.NewPlayerStore(db),
},
}
}
func (eh *EventHandler) HandleEvent(ctx context.Context, event types.ESSEvent) {
if event.EventName == "" {
log.Println("invalid event; dropping")
// log.Println("invalid event; dropping")
return
}
if event.EventName == "Death" || event.EventName == "VehicleDestroy" {
@ -27,14 +38,14 @@ func (eh *EventHandler) HandleEvent(ctx context.Context, event types.ESSEvent) {
func (eh *EventHandler) HandleDeath(ctx context.Context, event types.ESSEvent) {
if event.CharacterID != "" && event.CharacterID != "0" {
log.Println("got pop event")
pe := PopEventFromESSEvent(event, false)
// log.Println("got pop event")
pe := types.PopEventFromESSEvent(event, false)
eh.Ingest.TrackPop(ctx, pe)
}
if event.AttackerCharacterID != "" && event.AttackerCharacterID != "0" && event.AttackerTeamID != 0 {
log.Println("got attacker pop event")
pe := PopEventFromESSEvent(event, true)
pe := types.PopEventFromESSEvent(event, true)
// fmt.Println("got attacker pop event", event)
eh.Ingest.TrackPop(ctx, pe)
}
}
@ -45,18 +56,16 @@ func (eh *EventHandler) HandleExperience(ctx context.Context, event types.ESSEve
switch event.ExperienceID {
case 201: // Galaxy Spawn Bonus
vehicleID = "11"
break
case 233: // Sunderer Spawn Bonus
vehicleID = "2"
break
case 674: // ANT stuff
case 674:
fallthrough // ANT stuff
case 675:
vehicleID = "160"
break
}
event.VehicleID = vehicleID
pe := PopEventFromESSEvent(event, false)
pe := types.PopEventFromESSEvent(event, false)
eh.Ingest.TrackPop(ctx, pe)
}

View file

@ -0,0 +1,144 @@
package eventhandler
import (
"context"
"testing"
"time"
"github.com/avast/retry-go"
"github.com/stretchr/testify/assert"
"github.com/genudine/saerro-go/cmd/ws/ingest"
"github.com/genudine/saerro-go/store"
"github.com/genudine/saerro-go/translators"
"github.com/genudine/saerro-go/types"
"github.com/genudine/saerro-go/util/testutil"
)
func getEventHandlerTestShim(t *testing.T) (EventHandler, context.Context) {
t.Helper()
db := testutil.GetTestDB(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t.Cleanup(cancel)
return EventHandler{
Ingest: &ingest.Ingest{
PlayerStore: store.NewPlayerStore(db),
},
}, ctx
}
func TestHandleDeath(t *testing.T) {
eh, ctx := getEventHandlerTestShim(t)
event := types.ESSEvent{
EventName: "Death",
WorldID: 17,
ZoneID: 2,
CharacterID: "LyytisDoll",
LoadoutID: 3,
TeamID: types.NC,
AttackerCharacterID: "Lyyti",
AttackerLoadoutID: 3,
AttackerTeamID: types.TR,
}
eh.HandleDeath(ctx, event)
player1, err := eh.Ingest.PlayerStore.GetOne(ctx, event.CharacterID)
assert.NoError(t, err, "player1 fetch failed")
assert.Equal(t, event.CharacterID, player1.CharacterID)
assert.Equal(t, string(translators.ClassFromLoadout(event.LoadoutID)), player1.ClassName)
player2, err := eh.Ingest.PlayerStore.GetOne(ctx, event.AttackerCharacterID)
assert.NoError(t, err, "player2 fetch failed")
assert.Equal(t, event.AttackerCharacterID, player2.CharacterID)
assert.Equal(t, string(translators.ClassFromLoadout(event.AttackerLoadoutID)), player2.ClassName)
}
func TestHandleExperience(t *testing.T) {
eh, ctx := getEventHandlerTestShim(t)
event := types.ESSEvent{
EventName: "GainExperience",
WorldID: 17,
ZoneID: 2,
CharacterID: "LyytisDoll",
LoadoutID: 3,
TeamID: types.NC,
ExperienceID: 674,
}
eh.HandleExperience(ctx, event)
player, err := eh.Ingest.PlayerStore.GetOne(ctx, event.CharacterID)
assert.NoError(t, err, "player fetch check failed")
assert.Equal(t, event.CharacterID, player.CharacterID)
assert.Equal(t, string(translators.ClassFromLoadout(event.LoadoutID)), player.ClassName)
}
func TestHandleAnalytics(t *testing.T) {
eh, ctx := getEventHandlerTestShim(t)
event := types.ESSEvent{
EventName: "GainExperience",
WorldID: 17,
ZoneID: 2,
CharacterID: "LyytisDoll",
LoadoutID: 3,
TeamID: types.NC,
ExperienceID: 674,
}
eh.HandleAnalytics(ctx, event)
}
func TestHandleEvent(t *testing.T) {
eh, ctx := getEventHandlerTestShim(t)
events := []types.ESSEvent{
{
EventName: "Death",
WorldID: 17,
ZoneID: 2,
CharacterID: "LyytisDoll",
LoadoutID: 3,
TeamID: types.NC,
AttackerCharacterID: "Lyyti",
AttackerLoadoutID: 3,
AttackerTeamID: types.TR,
},
{
EventName: "GainExperience",
WorldID: 17,
ZoneID: 2,
CharacterID: "DollNC",
LoadoutID: 3,
TeamID: types.NC,
ExperienceID: 201,
},
}
for _, event := range events {
eh.HandleEvent(ctx, event)
}
checkPlayers := []string{"LyytisDoll", "Lyyti", "DollNC"}
for _, id := range checkPlayers {
// eventual consistency <333
err := retry.Do(func() error {
_, err := eh.Ingest.PlayerStore.GetOne(ctx, id)
return err
})
assert.NoError(t, err)
}
}

View file

@ -1,14 +0,0 @@
package main
import (
"context"
"database/sql"
)
type Ingest struct {
DB *sql.DB
}
func (i *Ingest) TrackPop(ctx context.Context, event PopEvent) {
}

55
cmd/ws/ingest/ingest.go Normal file
View file

@ -0,0 +1,55 @@
package ingest
import (
"context"
"fmt"
"log"
"github.com/genudine/saerro-go/store"
"github.com/genudine/saerro-go/types"
)
type Ingest struct {
PlayerStore store.IPlayerStore
}
func (i *Ingest) TrackPop(ctx context.Context, event types.PopEvent) {
player := event.ToPlayer()
err := i.fixupPlayer(ctx, player)
if err != nil {
log.Println("ingest: player fixup failed, dropping event", err)
return
}
err = i.PlayerStore.Insert(ctx, player)
if err != nil {
log.Println("TrackPop Insert failed", err)
}
}
func (i *Ingest) fixupPlayer(ctx context.Context, player *types.Player) error {
if player.ClassName != "unknown" && player.FactionID != 0 {
// all fixups are done
return nil
}
storedPlayer, err := i.PlayerStore.GetOne(ctx, player.CharacterID)
if err != nil {
return fmt.Errorf("ingest: fixupPlayer: fetching player %s failed: %w", player.CharacterID, err)
}
// probably VehicleDestroy
if player.ClassName == "unknown" {
// TODO: maybe get this from census, profile_id
player.ClassName = storedPlayer.ClassName
}
// probably PS4
if player.FactionID == 0 {
// TODO: get this from census
player.FactionID = storedPlayer.FactionID
}
return nil
}

View file

@ -0,0 +1,84 @@
package ingest_test
import (
"context"
"errors"
"testing"
"time"
"github.com/genudine/saerro-go/cmd/ws/ingest"
"github.com/genudine/saerro-go/store/storemock"
"github.com/genudine/saerro-go/translators"
"github.com/genudine/saerro-go/types"
)
func mkIngest(t *testing.T) (context.Context, *ingest.Ingest, *storemock.MockPlayerStore) {
t.Helper()
ps := new(storemock.MockPlayerStore)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t.Cleanup(cancel)
i := &ingest.Ingest{
PlayerStore: ps,
}
return ctx, i, ps
}
func TestTrackPopHappyPath(t *testing.T) {
ctx, i, ps := mkIngest(t)
// Combat Medic on Emerald
event := types.PopEvent{
WorldID: 17,
ZoneID: 4,
TeamID: types.TR,
LoadoutID: 4,
ClassName: translators.CombatMedic,
CharacterID: "aaaa",
}
eventPlayer := event.ToPlayer()
ps.On("Insert", ctx, eventPlayer).Return(nil).Once()
i.TrackPop(ctx, event)
}
func TestTrackPopFixup(t *testing.T) {
ctx, i, ps := mkIngest(t)
event := types.PopEvent{
WorldID: 17,
ZoneID: 4,
TeamID: 0,
ClassName: "unknown",
CharacterID: "bbbb",
}
pastEventPlayer := event.ToPlayer()
pastEventPlayer.ClassName = "light_assault"
pastEventPlayer.FactionID = types.VS
ps.On("GetOne", ctx, event.CharacterID).Return(pastEventPlayer, nil).Once()
ps.On("Insert", ctx, pastEventPlayer).Return(nil).Once()
i.TrackPop(ctx, event)
}
func TestTrackPopFixupFailed(t *testing.T) {
ctx, i, ps := mkIngest(t)
event := types.PopEvent{
WorldID: 17,
ZoneID: 4,
TeamID: 0,
ClassName: "unknown",
CharacterID: "bbbb",
}
ps.On("GetOne", ctx, event.CharacterID).Return(nil, errors.New("ingest fixup failed")).Once()
i.TrackPop(ctx, event)
}

View file

@ -1,42 +0,0 @@
package main
import (
"github.com/genudine/saerro-go/translators"
"github.com/genudine/saerro-go/types"
)
type PopEvent struct {
WorldID uint16
ZoneID uint32
CharacterID string
LoadoutID string
TeamID types.Faction
VehicleID string
VehicleName translators.Vehicle
ClassName translators.Class
}
func PopEventFromESSEvent(event types.ESSEvent, attacker bool) PopEvent {
pe := PopEvent{
WorldID: event.WorldID,
ZoneID: event.ZoneID,
}
if !attacker {
pe.CharacterID = event.CharacterID
pe.LoadoutID = event.LoadoutID
pe.TeamID = event.TeamID
pe.VehicleID = event.VehicleID
} else {
pe.CharacterID = event.AttackerCharacterID
pe.LoadoutID = event.AttackerLoadoutID
pe.TeamID = event.AttackerTeamID
pe.VehicleID = event.AttackerVehicleID
}
pe.ClassName = translators.ClassFromLoadout(pe.LoadoutID)
pe.VehicleName = translators.VehicleNameFromID(pe.VehicleID)
return pe
}

View file

@ -2,14 +2,15 @@ package main
import (
"context"
"database/sql"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/genudine/saerro-go/types"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"github.com/genudine/saerro-go/cmd/ws/eventhandler"
"github.com/genudine/saerro-go/cmd/ws/wsmanager"
"github.com/genudine/saerro-go/util"
)
func main() {
@ -18,54 +19,41 @@ func main() {
log.Fatalln("WS_ADDR is not set.")
}
db, err := sql.Open("sqlite", ":memory:")
db, err := util.GetDBConnection(os.Getenv("DB_ADDR"))
if err != nil {
log.Fatalln("database connection failed", err)
log.Fatalln(err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
eventHandler := eventhandler.NewEventHandler(db)
wsm := wsmanager.NewWebsocketManager(eventHandler)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
wsConn, _, err := websocket.Dial(ctx, wsAddr, nil)
err = wsm.Connect(ctx, wsAddr)
if err != nil {
log.Fatalln("Connection to ESS failed.", err)
}
defer wsConn.Close(websocket.StatusInternalError, "internal error. bye")
err = wsjson.Write(ctx, wsConn, map[string]interface{}{
"action": "subscribe",
"worlds": "all",
"eventNames": getEventNames(),
"characters": []string{"all"},
"service": "event",
"logicalAndCharactersWithWorlds": true,
})
if err != nil {
log.Fatalln("subscription write failed", err)
log.Fatalln(err)
}
log.Println("subscribe done")
go wsm.Start()
eventHandler := EventHandler{
Ingest: &Ingest{
DB: db,
},
}
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
var event types.ESSData
err := wsjson.Read(ctx, wsConn, &event)
go func() {
time.Sleep(time.Second * 1)
err = wsm.Subscribe(ctx)
if err != nil {
log.Println("wsjson read failed", err)
cancel()
continue
wsm.FailClose()
log.Fatalln("subscribe failed", err)
}
log.Println("sent subscribe")
}()
go eventHandler.HandleEvent(ctx, event.Payload)
exitSignal := make(chan os.Signal, 1)
signal.Notify(exitSignal, syscall.SIGINT, syscall.SIGTERM)
select {
case <-exitSignal:
log.Println("got interrupt, exiting...")
case <-wsm.Closed:
log.Println("websocket closed, bailing...")
}
wsConn.Close(websocket.StatusNormalClosure, "")
}

View file

@ -1,4 +1,4 @@
package main
package wsmanager
import (
"fmt"

View file

@ -1,4 +1,4 @@
package main
package wsmanager
import (
"testing"

View file

@ -0,0 +1,124 @@
package wsmanager
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/coder/websocket"
"github.com/genudine/saerro-go/cmd/ws/eventhandler"
"github.com/genudine/saerro-go/types"
)
type WebsocketManager struct {
Conn *websocket.Conn
EventHandler eventhandler.EventHandler
Closed chan bool
}
func NewWebsocketManager(eh eventhandler.EventHandler) WebsocketManager {
return WebsocketManager{
EventHandler: eh,
Closed: make(chan bool, 1),
}
}
func (wsm *WebsocketManager) Connect(ctx context.Context, addr string) (err error) {
wsm.Conn, _, err = websocket.Dial(ctx, addr, nil)
if err != nil {
return fmt.Errorf("wsm: connect failed: %w", err)
}
log.Println("wsm: connected to", addr)
return
}
type ESSSubscription struct {
Action string `json:"action,omitempty"`
Worlds []string `json:"worlds,omitempty"`
EventNames []string `json:"eventNames,omitempty"`
Characters []string `json:"characters,omitempty"`
Service string `json:"service,omitempty"`
LogicalAndCharactersWithWorlds bool `json:"logicalAndCharactersWithWorlds,omitempty"`
}
func (wsm *WebsocketManager) Subscribe(ctx context.Context) error {
sub := ESSSubscription{
Action: "subscribe",
Service: "event",
Worlds: []string{"all"},
EventNames: getEventNames(),
Characters: []string{"all"},
LogicalAndCharactersWithWorlds: true,
}
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(sub)
if err != nil {
return fmt.Errorf("wsm: subscribe: json encode failed: %w", err)
}
log.Printf("wsm: subscribe message: %s", buf.String())
err = wsm.Conn.Write(ctx, websocket.MessageText, buf.Bytes())
if err != nil {
return fmt.Errorf("wsm: subscribe: ws write failed: %w", err)
}
return nil
}
func (wsm *WebsocketManager) Start() {
go wsm.startWatchdog()
for {
ctx := context.Background()
var event types.ESSData
_, data, err := wsm.Conn.Read(ctx)
if err != nil {
log.Fatalln("wsm: read failed:", err)
}
// log.Printf("raw event: %s", string(data))
err = json.Unmarshal(data, &event)
if err != nil {
log.Println("wsm: json unmarshal failed:", err)
log.Println("wsm: json unmarshal failed (payload)", string(data))
}
go wsm.EventHandler.HandleEvent(ctx, event.Payload)
}
}
func (wsm *WebsocketManager) startWatchdog() {
for {
time.Sleep(time.Second * 30)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
err := wsm.Conn.Ping(ctx)
if err != nil {
log.Println("wsm: watchdog failed")
wsm.Closed <- true
}
cancel()
}
}
func (wsm *WebsocketManager) Close() {
wsm.Conn.Close(websocket.StatusNormalClosure, "")
wsm.Closed <- true
}
func (wsm *WebsocketManager) FailClose() {
wsm.Conn.Close(websocket.StatusAbnormalClosure, "")
wsm.Closed <- true
}