[rpc]: implement both HTTP and NATS transports

This commit is contained in:
41666 2019-06-03 07:22:00 -05:00
parent 30d08a630f
commit 9de1b93a7f
No known key found for this signature in database
GPG key ID: BC51D07640DC10AF
13 changed files with 397 additions and 24 deletions

View file

@ -2,6 +2,8 @@
"name": "@roleypoly/rpc",
"version": "2.0.0",
"devDependencies": {
"@types/superagent": "^4.1.1",
"@types/cookie": "^0.3.3",
"lint-staged": "^8.1.7",
"tslint": "^5.17.0",
"typescript": "^3.5.1"
@ -13,6 +15,10 @@
},
"private": true,
"dependencies": {
"@kayteh/bento": "^0.1.1"
"@kayteh/bento": "^0.2.2",
"cookie": "^0.4.0",
"cross-fetch": "^3.0.3",
"nats": "^1.2.10",
"superagent": "^5.0.5"
}
}

View file

@ -0,0 +1,2 @@
export { default as NATSTransport } from './utils/NATSTransport'
export { default as HTTPTransport } from './utils/HTTPTransport'

View file

@ -1,11 +0,0 @@
/**
* GENERATED FILE. This file was generated by @kayteh/bento. Editing it is a bad idea.
* @generated
*/
import Bento, { IBentoTransport } from '@kayteh/bento'
export type ServerSlug = {
id?: string
name?: string
ownerID?: string
icon?: string
}

View file

@ -1,5 +1,7 @@
syntax = "proto3";
// @bento-exclude
message ServerSlug {
string id = 1;
string name = 2;

View file

@ -0,0 +1,155 @@
import Bento, { Transport, IBentoSerializer } from '@kayteh/bento'
import http from 'http'
import superagent from 'superagent'
import cookie from 'cookie'
const txtEnc = new TextEncoder()
const txtDec = new TextDecoder()
const castString = (val: string | string[] | undefined): string => {
if (typeof val === 'string') {
return val
}
if (Array.isArray(val)) {
return val[0]
}
return val || ''
}
export type HTTPContext = {
cookies: { [x: string]: string }
headers: http.IncomingHttpHeaders
requestor: {
userAgent: string
clientIp: string
}
}
export default class HTTPTransport extends Transport {
constructor (
bento: Bento,
serializer: IBentoSerializer,
private endpoint: string,
private injectHeaders: { [x: string]: string } = {}
) {
super(bento, serializer)
}
handler = () => (req: http.IncomingMessage, res: http.ServerResponse) => {
// we're using bare http, so this can get a little dicey
// we do not assume we are routing in any special way here.
// a standardized approach would be POST /api/_rpc
if (req.method !== 'POST') {
res.statusCode = 405
res.end('Method not acceptable.')
return
}
return this.run(req, res)
}
run = (req: http.IncomingMessage, res: http.ServerResponse) => {
let buf = ''
req.on('data', (chunk: string) => {
buf += chunk
})
req.on('end', async () => {
const o = await this.receiver({ buffer: txtEnc.encode(buf), ctx: this.getContext(req, res) })
res.statusCode = 200
res.end(o)
})
}
/**
* get real client IP from headers or fallback to a default.
* since proxies add headers to tell a backend what is relevant,
* we use this failover pattern:
* - True-Client-IP (from Cloudflare)
* - X-Forwarded-For (0 position is true client)
* - X-Client-IP (from Cloudfront, or even HAProxy by hand)
* - default
* @param h http headers
* @param def fallback (usually socket remoteAddr)
*/
getClientIP (h: http.IncomingHttpHeaders, def: string): string {
// we cast all of these to string because there will literally never be another.
if (h['true-client-ip'] !== undefined) {
return castString(h['true-client-ip'])
}
if (h['x-client-ip'] !== undefined) {
return castString(h['x-client-ip'])
}
if (h['x-forwarded-for'] !== undefined) {
return castString(h['x-forwarded-for']).split(', ')[0]
}
return def
}
// overridable
getContext = (req: http.IncomingMessage, res: http.ServerResponse): HTTPContext => {
return {
headers: req.headers,
cookies: cookie.parse(req.headers.cookie || ''),
requestor: {
clientIp: this.getClientIP(req.headers, req.socket.remoteAddress || ''),
userAgent: req.headers['user-agent'] || ''
}
}
}
/**
* creates a fake header that we extract JSON from to properly pass cookies in a server->server environment.
* @param o cookie object
*/
withCookies (o: { [x: string]: string }) {
const out: string[] = []
for (const [key, val] of Object.entries(o)) {
out.push(cookie.serialize(key, val))
}
this.injectHeaders['@@-Set-Cookie'] = JSON.stringify(out)
}
withAuthorization (token: string) {
this.injectHeaders['Authorization'] = token
}
/**
* parses and removes the synthetic cookie header
* @param o headers
*/
cookiesFromSyntheticHeaders (o: { '@@-Set-Cookie'?: string }): string[] {
if (o['@@-Set-Cookie'] !== undefined) {
const out = JSON.parse(o['@@-Set-Cookie'])
delete o['@@-Set-Cookie']
return out
}
return []
}
async sender (data: ArrayBuffer, _: { service: string, fn: string }): Promise<ArrayBuffer> {
const c = this.cookiesFromSyntheticHeaders(this.injectHeaders)
const r = superagent.post(this.endpoint)
.type('')
.send(txtDec.decode(data))
.set('User-Agent', 'roleypoly/2.0 bento http client (+https://roleypoly.com)')
.withCredentials()
.set(this.injectHeaders)
if (c.length > 0) {
r.set('Cookie', c)
}
const res = await r
return Buffer.from(res.body)
}
}

View file

@ -0,0 +1,47 @@
import NATS, { connect, NatsError } from 'nats'
import Bento, { Transport, IBentoSerializer } from '@kayteh/bento'
export default class NATSTransport extends Transport {
NATS: NATS.Client
constructor (
bento: Bento,
serializer: IBentoSerializer,
addr: string = 'nats://localhsot:4222/',
private prefix: string = ''
) {
super(bento, serializer)
this.NATS = connect({
url: addr,
preserveBuffers: true
})
}
public hookHandlers = () => {
for (const svc in this.bento.serviceRegistry.keys) {
this.NATS.subscribe(`${this.prefix}-rpc:${svc}`, this.rpcHandler)
}
}
rpcHandler = async (request: ArrayBuffer, replyTo: string) => {
this.NATS.publish(replyTo, await this.receiver({
ctx: {},
buffer: request
}))
}
sender (data: ArrayBuffer, { service }: { service: string }): Promise<ArrayBuffer> {
return new Promise((resolve, reject) => {
this.NATS.requestOne(`${this.prefix}-rpc:${service}`, data, 5000, (incoming: NatsError | Buffer) => {
if (incoming instanceof NatsError) {
reject(incoming)
return
}
resolve(incoming)
return
})
})
}
}

View file

@ -0,0 +1,34 @@
import HTTPTransport from '../HTTPTransport'
import Bento, { JSONSerializer } from '@kayteh/bento'
import { MockBackendClient } from './mock.bento'
import MockBackendServer from './mock-server'
import * as http from 'http'
import * as sinon from 'sinon'
describe('HTTPTransport', () => {
const NOW = Date.now()
const PORT = 20000 + (+(('' + NOW).slice(-4)))
const bento = new Bento()
const tt = new HTTPTransport(
bento,
new JSONSerializer(),
`https://localhost:${PORT}/api/_rpc`,
{}
)
const h = tt.handler()
const spy = sinon.spy(h)
const s = http.createServer(spy)
s.listen(PORT)
bento.transport = tt
bento.service(MockBackendClient.__SERVICE__, MockBackendServer)
const cc = bento.client(MockBackendClient)
it('handles full flow properly', async () => {
const out = await cc.helloBackend({ hello: 'yes!' })
expect(out.message).toBe(`hello, yes!! i'm bot!!`)
})
s.close()
})

View file

@ -0,0 +1,26 @@
import NATSTransport from '../NATSTransport'
import Bento, { JSONSerializer } from '@kayteh/bento'
import { MockBackendClient } from './mock.bento'
import MockBackendServer from './mock-server'
describe('NATSTransport', () => {
const NOW = Date.now()
const bento = new Bento()
const tt = new NATSTransport(
bento,
new JSONSerializer(),
'nats://localhost:4222/',
'' + NOW
)
bento.transport = tt
bento.service(MockBackendClient.__SERVICE__, MockBackendServer)
const cc = bento.client(MockBackendClient)
it('handles full flow properly', async () => {
const out = await cc.helloBackend({ hello: 'yes!' })
expect(out.message).toBe(`hello, yes!! i'm bot!!`)
})
})

View file

@ -0,0 +1,9 @@
import { IMockBackendService, HelloMsg, HelloReply } from './mock.bento'
export default class MockBackendService implements IMockBackendService {
helloBackend (ctx: any, msg: HelloMsg): HelloReply {
return {
message: `hello, ${msg.hello}! i'm bot!!`
}
}
}

View file

@ -0,0 +1,34 @@
/**
* GENERATED FILE. This file was generated by @kayteh/bento. Editing it is a bad idea.
* @generated
*/
import Bento, { IBentoTransport } from '@kayteh/bento'
export type HelloMsg = {
hello?: string
}
export type HelloReply = {
message?: string
}
export interface IMockBotService {
helloBot (ctx: any, request: HelloMsg): Promise<HelloReply> | HelloReply
}
export class MockBotClient {
static __SERVICE__: string = 'MockBot'
constructor (private bento: Bento, private transport?: IBentoTransport) {}
async helloBot (request: HelloMsg): Promise<HelloReply> {
return this.bento.makeRequest(this.transport || undefined, 'MockBot', 'HelloBot', request)
}
}
export interface IMockBackendService {
helloBackend (ctx: any, request: HelloMsg): Promise<HelloReply> | HelloReply
}
export class MockBackendClient {
static __SERVICE__: string = 'MockBackend'
constructor (private bento: Bento, private transport?: IBentoTransport) {}
async helloBackend (request: HelloMsg): Promise<HelloReply> {
return this.bento.makeRequest(this.transport || undefined, 'MockBackend', 'HelloBackend', request)
}
}

View file

@ -0,0 +1,17 @@
syntax = "proto3";
service MockBot {
rpc HelloBot (HelloMsg) returns (HelloReply) {};
}
service MockBackend {
rpc HelloBackend (HelloMsg) returns (HelloReply) {};
}
message HelloMsg {
string hello = 1;
}
message HelloReply {
string message = 1;
}