var events = require("events"), util = require("../util"); function Packet(type, size) { this.type = type; this.size = +size; } exports.name = "javascript"; exports.debug_mode = false; function ReplyParser(options) { this.name = exports.name; this.options = options || { }; this._buffer = null; this._offset = 0; this._encoding = "utf-8"; this._debug_mode = options.debug_mode; this._reply_type = null; } util.inherits(ReplyParser, events.EventEmitter); exports.Parser = ReplyParser; function IncompleteReadBuffer(message) { this.name = "IncompleteReadBuffer"; this.message = message; } util.inherits(IncompleteReadBuffer, Error); // Buffer.toString() is quite slow for small strings function small_toString(buf, start, end) { var tmp = "", i; for (i = start; i < end; i++) { tmp += String.fromCharCode(buf[i]); } return tmp; } ReplyParser.prototype._parseResult = function (type) { var start, end, offset, packetHeader; if (type === 43 || type === 45) { // + or - // up to the delimiter end = this._packetEndOffset() - 1; start = this._offset; // include the delimiter this._offset = end + 2; if (end > this._buffer.length) { this._offset = start; throw new IncompleteReadBuffer("Wait for more data."); } if (this.options.return_buffers) { return this._buffer.slice(start, end); } else { if (end - start < 65536) { // completely arbitrary return small_toString(this._buffer, start, end); } else { return this._buffer.toString(this._encoding, start, end); } } } else if (type === 58) { // : // up to the delimiter end = this._packetEndOffset() - 1; start = this._offset; // include the delimiter this._offset = end + 2; if (end > this._buffer.length) { this._offset = start; throw new IncompleteReadBuffer("Wait for more data."); } if (this.options.return_buffers) { return this._buffer.slice(start, end); } // return the coerced numeric value return +small_toString(this._buffer, start, end); } else if (type === 36) { // $ // set a rewind point, as the packet could be larger than the // buffer in memory offset = this._offset - 1; packetHeader = new Packet(type, this.parseHeader()); // packets with a size of -1 are considered null if (packetHeader.size === -1) { return undefined; } end = this._offset + packetHeader.size; start = this._offset; // set the offset to after the delimiter this._offset = end + 2; if (end > this._buffer.length) { this._offset = offset; throw new IncompleteReadBuffer("Wait for more data."); } if (this.options.return_buffers) { return this._buffer.slice(start, end); } else { return this._buffer.toString(this._encoding, start, end); } } else if (type === 42) { // * offset = this._offset; packetHeader = new Packet(type, this.parseHeader()); if (packetHeader.size < 0) { return null; } if (packetHeader.size > this._bytesRemaining()) { this._offset = offset - 1; throw new IncompleteReadBuffer("Wait for more data."); } var reply = [ ]; var ntype, i, res; offset = this._offset - 1; for (i = 0; i < packetHeader.size; i++) { ntype = this._buffer[this._offset++]; if (this._offset > this._buffer.length) { throw new IncompleteReadBuffer("Wait for more data."); } res = this._parseResult(ntype); if (res === undefined) { res = null; } reply.push(res); } return reply; } }; ReplyParser.prototype.execute = function (buffer) { this.append(buffer); var type, ret, offset; while (true) { offset = this._offset; try { // at least 4 bytes: :1\r\n if (this._bytesRemaining() < 4) { break; } type = this._buffer[this._offset++]; if (type === 43) { // + ret = this._parseResult(type); if (ret === null) { break; } this.send_reply(ret); } else if (type === 45) { // - ret = this._parseResult(type); if (ret === null) { break; } this.send_error(ret); } else if (type === 58) { // : ret = this._parseResult(type); if (ret === null) { break; } this.send_reply(ret); } else if (type === 36) { // $ ret = this._parseResult(type); if (ret === null) { break; } // check the state for what is the result of // a -1, set it back up for a null reply if (ret === undefined) { ret = null; } this.send_reply(ret); } else if (type === 42) { // * // set a rewind point. if a failure occurs, // wait for the next execute()/append() and try again offset = this._offset - 1; ret = this._parseResult(type); this.send_reply(ret); } } catch (err) { // catch the error (not enough data), rewind, and wait // for the next packet to appear if (! (err instanceof IncompleteReadBuffer)) { throw err; } this._offset = offset; break; } } }; ReplyParser.prototype.append = function (newBuffer) { if (!newBuffer) { return; } // first run if (this._buffer === null) { this._buffer = newBuffer; return; } // out of data if (this._offset >= this._buffer.length) { this._buffer = newBuffer; this._offset = 0; return; } // very large packet // check for concat, if we have it, use it if (Buffer.concat !== undefined) { this._buffer = Buffer.concat([this._buffer.slice(this._offset), newBuffer]); } else { var remaining = this._bytesRemaining(), newLength = remaining + newBuffer.length, tmpBuffer = new Buffer(newLength); this._buffer.copy(tmpBuffer, 0, this._offset); newBuffer.copy(tmpBuffer, remaining, 0); this._buffer = tmpBuffer; } this._offset = 0; }; ReplyParser.prototype.parseHeader = function () { var end = this._packetEndOffset(), value = small_toString(this._buffer, this._offset, end - 1); this._offset = end + 1; return value; }; ReplyParser.prototype._packetEndOffset = function () { var offset = this._offset; while (this._buffer[offset] !== 0x0d && this._buffer[offset + 1] !== 0x0a) { offset++; if (offset >= this._buffer.length) { throw new IncompleteReadBuffer("didn't see LF after NL reading multi bulk count (" + offset + " => " + this._buffer.length + ", " + this._offset + ")"); } } offset++; return offset; }; ReplyParser.prototype._bytesRemaining = function () { return (this._buffer.length - this._offset) < 0 ? 0 : (this._buffer.length - this._offset); }; ReplyParser.prototype.parser_error = function (message) { this.emit("error", message); }; ReplyParser.prototype.send_error = function (reply) { this.emit("reply error", reply); }; ReplyParser.prototype.send_reply = function (reply) { this.emit("reply", reply); };