master
Marcus Gosselin 2018-10-29 11:07:34 -04:00
parent f1a1973a1e
commit d3d2eca6ce
3 changed files with 74 additions and 52 deletions

1
.gitignore vendored 100644
View File

@ -0,0 +1 @@
node_modules

122
index.js
View File

@ -1,7 +1,9 @@
// #sourceURL=xGraphAdapter.js // #sourceURL=xGraphAdapter.js
module.exports.xGraphAdapter = class xGraphAdapter { let {StreamParser} = require('xgmp');
module.exports = class xGraphAdapter {
constructor(host, port) { constructor(host, port) {
this._status = 'CLOSED'; this._status = 'CLOSED';
this._sendQueue = []; this._sendQueue = [];
@ -18,87 +20,103 @@ module.exports.xGraphAdapter = class xGraphAdapter {
this._socket.onclose = this._closed.bind(this); this._socket.onclose = this._closed.bind(this);
this._socket.onerror = this._error.bind(this); this._socket.onerror = this._error.bind(this);
this._socket.onmessage = this._message.bind(this); this._socket.onmessage = this._message.bind(this);
// create a new parser for this connection.
// its write stream
this._parser = new StreamParser({
write: {
writable: true,
on: (_, close_cb) => {
this._closedListener = close_cb;
},
//redirect write to send
write: this._socket.send.bind(this._socket)
}
});
this._parser.on('reply', ({err, cmd}) => {
let id = cmd.Passport.Pid;
this._callbacks[id](err, cmd);
})
this._parser.on('query', (cmd) => {
this._dispatch(cmd);
})
} }
// when we get a message on the line, pass it to the message parser.
_message(evt) { _message(evt) {
let parts = evt.data.split(/([\x02\x03])/); this._parser.data(evt.data.toString());
for(let part of parts) {
if(part == '\x02') {
this._buffer = '';
} else if(part == '\x03') {
let response = JSON.parse(this._buffer);
if(Array.isArray(response)) {
this._callbacks[response[1].Passport.Pid](response[0], response[1]);
} else {
this._dispatch(response);
}
} else {
this._buffer += part;
}
}
} }
_dispatch(command) { _dispatch(command) {
console.log(command);
if(command.Cmd in this) { if(command.Cmd in this) {
this[command.Cmd](command, (err, cmd) => { this[command.Cmd](command, (err, cmd) => {
//TODO do something with the callback idfk this._parser.reply(err, cmd);
}) });
} }
} }
_error(evt) { _error(evt) {
let fart = 6; console.log(evt);
// this._closed(evt);
} }
_closed(evt) { _closed(evt) {
this._status = 'CLOSED'; this._status = 'CLOSED';
// send a closed event to the parser
this._closedListener(evt);
console.log('setTimeout')
setTimeout(this._connect.bind(this), 0); setTimeout(this._connect.bind(this), 0);
} }
_opened(evt) { _opened(evt) {
this._status = 'OPEN'; this._status = 'OPEN';
for(let pid in this._sendQueue) {
for(let obj of this._sendQueue) { let [messageType, obj] = this._sendQueue[pid];
this.send(obj) this._send(messageType, obj);
}
this._sendQueue = [];
} }
} async send(cmd, opt, fun) {
async ping (obj) {
this.send(obj);
}
async send(obj) {
let id = ++this._messageCount; let id = ++this._messageCount;
if(!('Passport' in obj)) { let obj = Object.assign(opt, {
obj = Object.assign(obj, { Cmd: cmd,
Passport: { Passport: {
Query: true, Query: true,
Pid: "" + id Pid: '' + id
} }
}); });
let messageType = fun ? 'query' : 'ping';
this._send(messageType, obj);
if(!fun) {
return await new Promise(resolve => {
this._callbacks[id] = (err, cmd) => {
delete this._callbacks[id];
// console.log(_)
if(err) resolve([err, cmd]);
else resolve(cmd);
};
});
} else {
this._callbacks[id] = (err, cmd) => {
delete this._callbacks[id];
// console.log(_)
fun(err, cmd);
};
} }
}
_send(messageType, obj) {
if(this._status === 'CLOSED') { if(this._status === 'CLOSED') {
this._sendQueue.push(obj); this._sendQueue[obj.Passport.Pid] = ([messageType, obj]);
return await new Promise(resolve => { return;
this._callbacks[id] = (err, cmd) => {
delete this._callbacks[id];
// console.log(_)
resolve([err, cmd]);
} }
}) this._parser[messageType](obj);
}
let message = JSON.stringify(obj);
this._socket.send(`\x02${message}\x03`);
return await new Promise(resolve => {
this._callbacks[id] = (err, cmd) => {
delete this._callbacks[id];
// console.log(_)
resolve([err, cmd]);
}
})
} }
} };

View File

@ -11,5 +11,8 @@
"xgmp" "xgmp"
], ],
"author": "Introspective Systems LLC", "author": "Introspective Systems LLC",
"license": "ISC" "license": "ISC",
"dependencies": {
"xgmp": "^1.2.0"
}
} }