diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b512c09 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules \ No newline at end of file diff --git a/index.js b/index.js index c0798f5..68af506 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,9 @@ // #sourceURL=xGraphAdapter.js -module.exports.xGraphAdapter = class xGraphAdapter { +let {StreamParser} = require('xgmp'); + +module.exports = class xGraphAdapter { constructor(host, port) { this._status = 'CLOSED'; this._sendQueue = []; @@ -18,87 +20,103 @@ module.exports.xGraphAdapter = class xGraphAdapter { this._socket.onclose = this._closed.bind(this); this._socket.onerror = this._error.bind(this); this._socket.onmessage = this._message.bind(this); + + // create a new parser for this connection. + // its write stream + this._parser = new StreamParser({ + write: { + writable: true, + on: (_, close_cb) => { + this._closedListener = close_cb; + }, + //redirect write to send + write: this._socket.send.bind(this._socket) + } + }); + + this._parser.on('reply', ({err, cmd}) => { + let id = cmd.Passport.Pid; + this._callbacks[id](err, cmd); + }) + + this._parser.on('query', (cmd) => { + this._dispatch(cmd); + }) } + // when we get a message on the line, pass it to the message parser. _message(evt) { - let parts = evt.data.split(/([\x02\x03])/); - for(let part of parts) { - if(part == '\x02') { - this._buffer = ''; - } else if(part == '\x03') { - let response = JSON.parse(this._buffer); - if(Array.isArray(response)) { - this._callbacks[response[1].Passport.Pid](response[0], response[1]); - } else { - this._dispatch(response); - } - } else { - this._buffer += part; - } - } + this._parser.data(evt.data.toString()); } _dispatch(command) { - console.log(command); if(command.Cmd in this) { this[command.Cmd](command, (err, cmd) => { - //TODO do something with the callback idfk - }) + this._parser.reply(err, cmd); + }); } } _error(evt) { - let fart = 6; + console.log(evt); + // this._closed(evt); } _closed(evt) { this._status = 'CLOSED'; + // send a closed event to the parser + this._closedListener(evt); + console.log('setTimeout') setTimeout(this._connect.bind(this), 0); } _opened(evt) { this._status = 'OPEN'; - - for(let obj of this._sendQueue) { - this.send(obj) + for(let pid in this._sendQueue) { + let [messageType, obj] = this._sendQueue[pid]; + this._send(messageType, obj); } - + this._sendQueue = []; } - async ping (obj) { - this.send(obj); - } - - async send(obj) { + async send(cmd, opt, fun) { let id = ++this._messageCount; - if(!('Passport' in obj)) { - obj = Object.assign(obj, { - Passport: { - Query: true, - Pid: "" + id - } - }); - } - if(this._status === 'CLOSED') { - this._sendQueue.push(obj); + let obj = Object.assign(opt, { + Cmd: cmd, + Passport: { + Query: true, + Pid: '' + id + } + }); + let messageType = fun ? 'query' : 'ping'; + + this._send(messageType, obj); + + if(!fun) { return await new Promise(resolve => { this._callbacks[id] = (err, cmd) => { delete this._callbacks[id]; // console.log(_) - resolve([err, cmd]); - } - }) - } - let message = JSON.stringify(obj); - this._socket.send(`\x02${message}\x03`); - - return await new Promise(resolve => { + if(err) resolve([err, cmd]); + else resolve(cmd); + }; + }); + } else { this._callbacks[id] = (err, cmd) => { delete this._callbacks[id]; // console.log(_) - resolve([err, cmd]); - } - }) + fun(err, cmd); + }; + } + } -} + _send(messageType, obj) { + if(this._status === 'CLOSED') { + this._sendQueue[obj.Passport.Pid] = ([messageType, obj]); + return; + } + this._parser[messageType](obj); + } + +}; diff --git a/package.json b/package.json index 798cf5b..be21580 100644 --- a/package.json +++ b/package.json @@ -11,5 +11,8 @@ "xgmp" ], "author": "Introspective Systems LLC", - "license": "ISC" + "license": "ISC", + "dependencies": { + "xgmp": "^1.2.0" + } }