pingo pongo

canary
Marcus 2021-04-01 21:15:41 -04:00
parent 7b7f728740
commit 21a2d597dd
6 changed files with 182 additions and 7 deletions

135
lib/Gateway.js 100644
View File

@ -0,0 +1,135 @@
const { config } = require('./config');
const Keyv = require('keyv');
const { KeyvFile } = require('keyv-file');
const { Signale } = require('signale');
const log = new Signale().scope('GTWY');
const interactive = new Signale({interactive: true, scope: 'GTWY'});
const stp = require('./STP');
class Gateway {
constructor(identity, endpoints) {
this.identity = identity;
const appdata = process.env.APPDATA || (process.platform == 'darwin' ? process.env.HOME + '/Library/Preferences' : process.env.HOME + "/.local/share")
this.endpoints = new Keyv({
store: new KeyvFile({
filename: `${appdata}/valnet/relay/${this.identity.name}-endpoints.json`
})
});
this.ready = this.insertEndpoints(endpoints)
.then(this.networkTest.bind(this));
}
async insertEndpoints(endpoints) {
for (const endpoint of endpoints) {
const storeValue = await this.endpoints.get(endpoint);
if (storeValue) continue;
const [host, port] = endpoint.split(':');
const record = new EndpointRecord(host, port, null, 'unknown');
const currentEnpoints = await this.endpoints.get('cache') || [];
if (currentEnpoints.indexOf(endpoint) === -1) {
currentEnpoints.push(endpoint);
await this.endpoints.set('cache', currentEnpoints);
}
await this.endpoints.set(endpoint, record);
}
log.info('gateway endpoints:');
log.info('\t' + (await this.endpoints.get('cache')).join('\n\t'));
}
async networkTest() {
const endpoints = (await Promise.all(
(await this.endpoints.get('cache'))
.map(endpoint => this.endpoints.get(endpoint))
)).map(EndpointRecord.fromJson);
for(const endpoint of endpoints) {
await this.testEndpoint(endpoint.host, endpoint.port);
}
}
async testEndpoint(host, port) {
await new Promise(async (res, rej) => {
let pings = [];
let maxPings = 10;
let connectionAttempts = 0;
log.info(`Conencting to ${host}:${port}...`);
while(connectionAttempts < 10 && pings.length < maxPings) {
await new Promise(async (res) => {
const client = stp.connect({
identity: this.identity,
ip: host,
port: parseInt(port)
});
client.on('error', _ => _);
client.on('ready', async () => {
while(pings.length < maxPings) {
interactive.info(`[${pings.length + 1}/${maxPings}] Testing connection...`);
pings.push(await client.ping());
// pings.push(10);
await new Promise(res => setTimeout(res, 1000));
}
interactive.success(`Test complete. Average Ping: ${pings.reduce((a, v) => a += (v / maxPings), 0)}`)
client.tcpSocket.destroy();
res();
});
client.on('close', () => {
res();
});
});
}
});
}
}
class EndpointRecord {
/**
* @param {Object|string} json string / object representation
* @returns {EndpointRecord}
*/
static fromJson(obj) {
if (typeof obj === 'string')
return EndpointRecord.fromJson(JSON.parse(obj));
return new EndpointRecord(
obj.host,
obj.port,
obj.lastPing ? new PingRecord(
obj.lastPing.average,
obj.lastPing.tests,
obj.lastPing.date
) : null,
obj.status
);
}
constructor(host, port, lastPing, status) {
this.host = host;
this.port = port;
this.lastPing = lastPing;
this.status = status;
}
}
class PingRecord {
constructor(average, tests, date) {
this.average = average;
this.tests = tests;
this.date = date;
}
}
module.exports = Gateway

View File

@ -4,7 +4,9 @@ const NodeRSA = require('node-rsa');
const log = require('signale').scope('_STP');
const {
KeyExchangePacket,
AckPacket
AckPacket,
PingPacket,
PongPacket
} = require('./packets');
module.exports.createServer = function({identity = {}, port = 5000} = {}, cb = _ => _) {
@ -64,6 +66,8 @@ class STPSocket extends EventEmitter {
SECURED = Symbol('secured');
readyState = this.CONNECTING;
pingCallbacks = new Map();
get loopback() {
return this.identity.publicKey ===
this.externalKey.exportKey('pkcs8-public-pem');
@ -131,6 +135,11 @@ class STPSocket extends EventEmitter {
return;
}
if (this.readyState === this.SECURED && obj.cmd === 'PING') {
this.tcpSocket.write(new PongPacket(obj.data.id).toBuffer());
return;
}
}
handshake() {
@ -139,4 +148,17 @@ class STPSocket extends EventEmitter {
const buffer = packet.toBuffer();
this.tcpSocket.write(buffer);
}
async ping() {
const startTime = new Date().getTime();
return await new Promise(async (res) => {
const packet = new PingPacket();
this.pingCallbacks.set(packet.data.id, _ => {
res(new Date().getTime() - startTime);
this.pingCallbacks.delete(packet.data.id);
});
this.tcpSocket.write(packet.toBuffer());
})
}
}

View File

@ -1,3 +1,4 @@
const md5 = require('md5');
// #region === [ private lib functions ] ===
@ -45,6 +46,22 @@ class ClientsPacket extends STPPacket {
}
}
class PingPacket extends STPPacket {
constructor() {
super();
this.cmd = 'PING';
this.data.id = md5(Date());
}
}
class PongPacket extends STPPacket {
constructor(id) {
super();
this.cmd = 'PONG';
this.data.id = id;
}
}
// #endregion
// #region === [ ordinary packet classes ] ===
@ -83,6 +100,8 @@ function reconstructPacket(packet) {
module.exports.KeyExchangePacket = KeyExchangePacket;
module.exports.ClientsPacket = ClientsPacket;
module.exports.PingPacket = PingPacket;
module.exports.PongPacket = PongPacket;
module.exports.AckPacket = AckPacket;
module.exports.GetClientsPacket = GetClientsPacket;

View File

@ -6,6 +6,7 @@ const pkg = require('./../package.json');
const { config, write } = require('./config.js');
const log = require('signale').scope('NODE');
const bonjour = require('bonjour')();
const Gateway = require('./Gateway');
class Node extends EventEmitter {
clients = [];
@ -31,7 +32,7 @@ class Node extends EventEmitter {
}
async connectNetwork() {
const gateway = new Gateway(this.identity, config.endpoints);
}
async serverStartupFailed(error) {
@ -64,7 +65,7 @@ class Node extends EventEmitter {
}
async serviceUp({name, address, port, protocol}) {
log.debug(`Found ${name} @ ${address}:${port} using ${protocol}`);
// log.debug(`Found ${name} @ ${address}:${port} using ${protocol}`);
}
async serviceDown(service) {

View File

@ -39,10 +39,10 @@ function connectNetwork(t = 1000) {
t *= 2;
setTimeout(connectNetwork.bind(global, t), t);
log.warn('disconnected from relay');
log.warn('retrying connection... ' + (t/1000) + 's')
log.warn('retrying connection... ' + (t / 1000) + 's');
});
}
connectNetwork();
// connectNetwork();
// ==================================== [EXPRESS]
const express = require('express');

View File

@ -41,8 +41,6 @@ function searchGateway(timeout, callback) {
}
var onlistening = function() {
console.log('listening?')
socket.setBroadcast(socket.fd, true);
// send a few packets just in case.