From 21a2d597ddd926dfa36045036b592100741881c6 Mon Sep 17 00:00:00 2001 From: Marcus Date: Thu, 1 Apr 2021 21:15:41 -0400 Subject: [PATCH] pingo pongo --- lib/Gateway.js | 135 +++++++++++++++++++++++++++++++++++++++++++++ lib/STP/index.js | 24 +++++++- lib/STP/packets.js | 19 +++++++ lib/node.js | 5 +- relay/index.js | 4 +- upnp.js | 2 - 6 files changed, 182 insertions(+), 7 deletions(-) create mode 100644 lib/Gateway.js diff --git a/lib/Gateway.js b/lib/Gateway.js new file mode 100644 index 0000000..3c567b7 --- /dev/null +++ b/lib/Gateway.js @@ -0,0 +1,135 @@ +const { config } = require('./config'); +const Keyv = require('keyv'); +const { KeyvFile } = require('keyv-file'); +const { Signale } = require('signale'); +const log = new Signale().scope('GTWY'); +const interactive = new Signale({interactive: true, scope: 'GTWY'}); +const stp = require('./STP'); + +class Gateway { + constructor(identity, endpoints) { + this.identity = identity; + const appdata = process.env.APPDATA || (process.platform == 'darwin' ? process.env.HOME + '/Library/Preferences' : process.env.HOME + "/.local/share") + + this.endpoints = new Keyv({ + store: new KeyvFile({ + filename: `${appdata}/valnet/relay/${this.identity.name}-endpoints.json` + }) + }); + + this.ready = this.insertEndpoints(endpoints) + .then(this.networkTest.bind(this)); + } + + async insertEndpoints(endpoints) { + for (const endpoint of endpoints) { + const storeValue = await this.endpoints.get(endpoint); + if (storeValue) continue; + + const [host, port] = endpoint.split(':'); + const record = new EndpointRecord(host, port, null, 'unknown'); + const currentEnpoints = await this.endpoints.get('cache') || []; + + if (currentEnpoints.indexOf(endpoint) === -1) { + currentEnpoints.push(endpoint); + await this.endpoints.set('cache', currentEnpoints); + } + + await this.endpoints.set(endpoint, record); + } + + log.info('gateway endpoints:'); + log.info('\t' + (await this.endpoints.get('cache')).join('\n\t')); + } + + async networkTest() { + const endpoints = (await Promise.all( + (await this.endpoints.get('cache')) + .map(endpoint => this.endpoints.get(endpoint)) + )).map(EndpointRecord.fromJson); + + for(const endpoint of endpoints) { + await this.testEndpoint(endpoint.host, endpoint.port); + } + } + + async testEndpoint(host, port) { + await new Promise(async (res, rej) => { + let pings = []; + let maxPings = 10; + let connectionAttempts = 0; + log.info(`Conencting to ${host}:${port}...`); + + while(connectionAttempts < 10 && pings.length < maxPings) { + + await new Promise(async (res) => { + const client = stp.connect({ + identity: this.identity, + ip: host, + port: parseInt(port) + }); + + client.on('error', _ => _); + + client.on('ready', async () => { + while(pings.length < maxPings) { + interactive.info(`[${pings.length + 1}/${maxPings}] Testing connection...`); + pings.push(await client.ping()); + // pings.push(10); + await new Promise(res => setTimeout(res, 1000)); + } + interactive.success(`Test complete. Average Ping: ${pings.reduce((a, v) => a += (v / maxPings), 0)}`) + client.tcpSocket.destroy(); + res(); + }); + + client.on('close', () => { + res(); + }); + + }); + + } + }); + } +} + +class EndpointRecord { + + /** + * @param {Object|string} json string / object representation + * @returns {EndpointRecord} + */ + static fromJson(obj) { + if (typeof obj === 'string') + return EndpointRecord.fromJson(JSON.parse(obj)); + + return new EndpointRecord( + obj.host, + obj.port, + obj.lastPing ? new PingRecord( + obj.lastPing.average, + obj.lastPing.tests, + obj.lastPing.date + ) : null, + obj.status + ); + } + + constructor(host, port, lastPing, status) { + this.host = host; + this.port = port; + this.lastPing = lastPing; + this.status = status; + } +} + +class PingRecord { + constructor(average, tests, date) { + this.average = average; + this.tests = tests; + this.date = date; + } +} + +module.exports = Gateway \ No newline at end of file diff --git a/lib/STP/index.js b/lib/STP/index.js index 64c2517..16d8dea 100644 --- a/lib/STP/index.js +++ b/lib/STP/index.js @@ -4,7 +4,9 @@ const NodeRSA = require('node-rsa'); const log = require('signale').scope('_STP'); const { KeyExchangePacket, - AckPacket + AckPacket, + PingPacket, + PongPacket } = require('./packets'); module.exports.createServer = function({identity = {}, port = 5000} = {}, cb = _ => _) { @@ -64,6 +66,8 @@ class STPSocket extends EventEmitter { SECURED = Symbol('secured'); readyState = this.CONNECTING; + pingCallbacks = new Map(); + get loopback() { return this.identity.publicKey === this.externalKey.exportKey('pkcs8-public-pem'); @@ -131,6 +135,11 @@ class STPSocket extends EventEmitter { return; } + if (this.readyState === this.SECURED && obj.cmd === 'PING') { + this.tcpSocket.write(new PongPacket(obj.data.id).toBuffer()); + return; + } + } handshake() { @@ -139,4 +148,17 @@ class STPSocket extends EventEmitter { const buffer = packet.toBuffer(); this.tcpSocket.write(buffer); } + + async ping() { + const startTime = new Date().getTime(); + return await new Promise(async (res) => { + const packet = new PingPacket(); + this.pingCallbacks.set(packet.data.id, _ => { + res(new Date().getTime() - startTime); + this.pingCallbacks.delete(packet.data.id); + }); + this.tcpSocket.write(packet.toBuffer()); + }) + + } } diff --git a/lib/STP/packets.js b/lib/STP/packets.js index 73d49fd..58b1832 100644 --- a/lib/STP/packets.js +++ b/lib/STP/packets.js @@ -1,3 +1,4 @@ +const md5 = require('md5'); // #region === [ private lib functions ] === @@ -45,6 +46,22 @@ class ClientsPacket extends STPPacket { } } +class PingPacket extends STPPacket { + constructor() { + super(); + this.cmd = 'PING'; + this.data.id = md5(Date()); + } +} + +class PongPacket extends STPPacket { + constructor(id) { + super(); + this.cmd = 'PONG'; + this.data.id = id; + } +} + // #endregion // #region === [ ordinary packet classes ] === @@ -83,6 +100,8 @@ function reconstructPacket(packet) { module.exports.KeyExchangePacket = KeyExchangePacket; module.exports.ClientsPacket = ClientsPacket; +module.exports.PingPacket = PingPacket; +module.exports.PongPacket = PongPacket; module.exports.AckPacket = AckPacket; module.exports.GetClientsPacket = GetClientsPacket; diff --git a/lib/node.js b/lib/node.js index e498614..233e1d7 100644 --- a/lib/node.js +++ b/lib/node.js @@ -6,6 +6,7 @@ const pkg = require('./../package.json'); const { config, write } = require('./config.js'); const log = require('signale').scope('NODE'); const bonjour = require('bonjour')(); +const Gateway = require('./Gateway'); class Node extends EventEmitter { clients = []; @@ -31,7 +32,7 @@ class Node extends EventEmitter { } async connectNetwork() { - + const gateway = new Gateway(this.identity, config.endpoints); } async serverStartupFailed(error) { @@ -64,7 +65,7 @@ class Node extends EventEmitter { } async serviceUp({name, address, port, protocol}) { - log.debug(`Found ${name} @ ${address}:${port} using ${protocol}`); + // log.debug(`Found ${name} @ ${address}:${port} using ${protocol}`); } async serviceDown(service) { diff --git a/relay/index.js b/relay/index.js index 031c55f..9d0ee50 100644 --- a/relay/index.js +++ b/relay/index.js @@ -39,10 +39,10 @@ function connectNetwork(t = 1000) { t *= 2; setTimeout(connectNetwork.bind(global, t), t); log.warn('disconnected from relay'); - log.warn('retrying connection... ' + (t/1000) + 's') + log.warn('retrying connection... ' + (t / 1000) + 's'); }); } -connectNetwork(); +// connectNetwork(); // ==================================== [EXPRESS] const express = require('express'); diff --git a/upnp.js b/upnp.js index 5d638a9..f1ae2fb 100644 --- a/upnp.js +++ b/upnp.js @@ -41,8 +41,6 @@ function searchGateway(timeout, callback) { } var onlistening = function() { - - console.log('listening?') socket.setBroadcast(socket.fd, true); // send a few packets just in case.