/** * Module dependencies. */ var util = require('./util') , transports = require('./transports') , Emitter = require('./emitter') , debug = require('debug')('engine-client:socket'); /** * Module exports. */ module.exports = Socket; /** * Global reference. */ var global = 'undefined' != typeof window ? window : global; /** * Socket constructor. * * @param {Object} options * @api public */ function Socket(opts){ if (!(this instanceof Socket)) return new Socket(opts); if ('string' == typeof opts) { var uri = util.parseUri(opts); opts = arguments[1] || {}; opts.host = uri.host; opts.secure = uri.protocol == 'https' || uri.protocol == 'wss'; opts.port = uri.port; } opts = opts || {}; this.secure = null != opts.secure ? opts.secure : (global.location && 'https:' == location.protocol); this.host = opts.host || opts.hostname || (global.location ? location.hostname : 'localhost'); this.port = opts.port || (global.location && location.port ? location.port : (this.secure ? 443 : 80)); this.query = opts.query || {}; this.query.uid = rnd(); this.upgrade = false !== opts.upgrade; this.resource = opts.resource || 'default'; this.path = (opts.path || '/engine.io').replace(/\/$/, ''); this.path += '/' + this.resource + '/'; this.forceJSONP = !!opts.forceJSONP; this.timestampParam = opts.timestampParam || 't'; this.timestampRequests = !!opts.timestampRequests; this.flashPath = opts.flashPath || ''; this.transports = opts.transports || ['polling', 'websocket', 'flashsocket']; this.readyState = ''; this.writeBuffer = []; this.policyPort = opts.policyPort || 843; this.open(); Socket.sockets.push(this); Socket.sockets.evs.emit('add', this); }; /** * Mix in `Emitter`. */ Emitter(Socket.prototype); /** * Protocol version. * * @api public */ Socket.protocol = 1; /** * Static EventEmitter. */ Socket.sockets = []; Socket.sockets.evs = new Emitter; /** * Expose deps for legacy compatibility * and standalone browser access. */ Socket.Socket = Socket; Socket.Transport = require('./transport'); Socket.Emitter = require('./emitter'); Socket.transports = require('./transports'); Socket.util = require('./util'); Socket.parser = require('./parser'); /** * Creates transport of the given type. * * @param {String} transport name * @return {Transport} * @api private */ Socket.prototype.createTransport = function (name) { debug('creating transport "%s"', name); var query = clone(this.query); query.transport = name; if (this.id) { query.sid = this.id; } var transport = new transports[name]({ host: this.host , port: this.port , secure: this.secure , path: this.path , query: query , forceJSONP: this.forceJSONP , timestampRequests: this.timestampRequests , timestampParam: this.timestampParam , flashPath: this.flashPath , policyPort: this.policyPort }); return transport; }; function clone (obj) { var o = {}; for (var i in obj) { if (obj.hasOwnProperty(i)) { o[i] = obj[i]; } } return o; } /** * Initializes transport to use and starts probe. * * @api private */ Socket.prototype.open = function () { this.readyState = 'opening'; var transport = this.createTransport(this.transports[0]); transport.open(); this.setTransport(transport); }; /** * Sets the current transport. Disables the existing one (if any). * * @api private */ Socket.prototype.setTransport = function (transport) { var self = this; if (this.transport) { debug('clearing existing transport'); this.transport.removeAllListeners(); } // set up transport this.transport = transport; // set up transport listeners transport .on('drain', function () { self.flush(); }) .on('packet', function (packet) { self.onPacket(packet); }) .on('error', function (e) { self.onError(e); }) .on('close', function () { self.onClose('transport close'); }); }; /** * Probes a transport. * * @param {String} transport name * @api private */ Socket.prototype.probe = function (name) { debug('probing transport "%s"', name); var transport = this.createTransport(name, { probe: 1 }) , failed = false , self = this; transport.once('open', function () { if (failed) return; debug('probe transport "%s" opened', name); transport.send([{ type: 'ping', data: 'probe' }]); transport.once('packet', function (msg) { if (failed) return; if ('pong' == msg.type && 'probe' == msg.data) { debug('probe transport "%s" pong', name); self.upgrading = true; self.emit('upgrading', transport); debug('pausing current transport "%s"', self.transport.name); self.transport.pause(function () { if (failed) return; if ('closed' == self.readyState || 'closing' == self.readyState) { return; } debug('changing transport and sending upgrade packet'); transport.removeListener('error', onerror); self.emit('upgrade', transport); self.setTransport(transport); transport.send([{ type: 'upgrade' }]); transport = null; self.upgrading = false; self.flush(); }); } else { debug('probe transport "%s" failed', name); var err = new Error('probe error'); err.transport = transport.name; self.emit('error', err); } }); }); transport.once('error', onerror); function onerror(err) { if (failed) return; // Any callback called by transport should be ignored since now failed = true; var error = new Error('probe error: ' + err); error.transport = transport.name; transport.close(); transport = null; debug('probe transport "%s" failed because of error: %s', name, err); self.emit('error', error); }; transport.open(); this.once('close', function () { if (transport) { debug('socket closed prematurely - aborting probe'); failed = true; transport.close(); transport = null; } }); this.once('upgrading', function (to) { if (transport && to.name != transport.name) { debug('"%s" works - aborting "%s"', to.name, transport.name); transport.close(); transport = null; } }); }; /** * Called when connection is deemed open. * * @api public */ Socket.prototype.onOpen = function () { debug('socket open'); this.readyState = 'open'; this.emit('open'); this.onopen && this.onopen.call(this); this.flush(); // we check for `readyState` in case an `open` // listener alreay closed the socket if ('open' == this.readyState && this.upgrade && this.transport.pause) { debug('starting upgrade probes'); for (var i = 0, l = this.upgrades.length; i < l; i++) { this.probe(this.upgrades[i]); } } }; /** * Handles a packet. * * @api private */ Socket.prototype.onPacket = function (packet) { if ('opening' == this.readyState || 'open' == this.readyState) { debug('socket receive: type "%s", data "%s"', packet.type, packet.data); this.emit('packet', packet); // Socket is live - any packet counts this.emit('heartbeat'); switch (packet.type) { case 'open': this.onHandshake(util.parseJSON(packet.data)); break; case 'pong': this.ping(); break; case 'error': var err = new Error('server error'); err.code = packet.data; this.emit('error', err); break; case 'message': this.emit('message', packet.data); var event = { data: packet.data }; event.toString = function () { return packet.data; }; this.onmessage && this.onmessage.call(this, event); break; } } else { debug('packet received with socket readyState "%s"', this.readyState); } }; /** * Called upon handshake completion. * * @param {Object} handshake obj * @api private */ Socket.prototype.onHandshake = function (data) { this.emit('handshake', data); this.id = data.sid; this.transport.query.sid = data.sid; this.upgrades = data.upgrades; this.pingInterval = data.pingInterval; this.pingTimeout = data.pingTimeout; this.onOpen(); this.ping(); // Prolong liveness of socket on heartbeat this.removeListener('heartbeat', this.onHeartbeat); this.on('heartbeat', this.onHeartbeat); }; /** * Resets ping timeout. * * @api private */ Socket.prototype.onHeartbeat = function (timeout) { clearTimeout(this.pingTimeoutTimer); var self = this; self.pingTimeoutTimer = setTimeout(function () { if ('closed' == self.readyState) return; self.onClose('ping timeout'); }, timeout || (self.pingInterval + self.pingTimeout)); }; /** * Pings server every `this.pingInterval` and expects response * within `this.pingTimeout` or closes connection. * * @api private */ Socket.prototype.ping = function () { var self = this; clearTimeout(self.pingIntervalTimer); self.pingIntervalTimer = setTimeout(function () { debug('writing ping packet - expecting pong within %sms', self.pingTimeout); self.sendPacket('ping'); self.onHeartbeat(self.pingTimeout); }, self.pingInterval); }; /** * Flush write buffers. * * @api private */ Socket.prototype.flush = function () { if ('closed' != this.readyState && this.transport.writable && !this.upgrading && this.writeBuffer.length) { debug('flushing %d packets in socket', this.writeBuffer.length); this.transport.send(this.writeBuffer); this.writeBuffer = []; } }; /** * Sends a message. * * @param {String} message. * @return {Socket} for chaining. * @api public */ Socket.prototype.write = Socket.prototype.send = function (msg) { this.sendPacket('message', msg); return this; }; /** * Sends a packet. * * @param {String} packet type. * @param {String} data. * @api private */ Socket.prototype.sendPacket = function (type, data) { var packet = { type: type, data: data }; this.emit('packetCreate', packet); this.writeBuffer.push(packet); this.flush(); }; /** * Closes the connection. * * @api private */ Socket.prototype.close = function () { if ('opening' == this.readyState || 'open' == this.readyState) { this.onClose('forced close'); debug('socket closing - telling transport to close'); this.transport.close(); this.transport.removeAllListeners(); } return this; }; /** * Called upon transport error * * @api private */ Socket.prototype.onError = function (err) { this.emit('error', err); this.onClose('transport error', err); }; /** * Called upon transport close. * * @api private */ Socket.prototype.onClose = function (reason, desc) { if ('closed' != this.readyState) { debug('socket close with reason: "%s"', reason); clearTimeout(this.pingIntervalTimer); clearTimeout(this.pingTimeoutTimer); this.readyState = 'closed'; this.emit('close', reason, desc); this.onclose && this.onclose.call(this); this.id = null; } }; /** * Generates a random uid. * * @api private */ function rnd () { return String(Math.random()).substr(5) + String(Math.random()).substr(5); }