/*! * socket.io-node * Copyright(c) 2011 LearnBoost * MIT Licensed */ /** * Module dependencies. */ var crypto = require('crypto') , Store = require('../store') , assert = require('assert'); /** * Exports the constructor. */ exports = module.exports = Redis; Redis.Client = Client; /** * Redis store. * Options: * - nodeId (fn) gets an id that uniquely identifies this node * - redis (fn) redis constructor, defaults to redis * - redisPub (object) options to pass to the pub redis client * - redisSub (object) options to pass to the sub redis client * - redisClient (object) options to pass to the general redis client * - pack (fn) custom packing, defaults to JSON or msgpack if installed * - unpack (fn) custom packing, defaults to JSON or msgpack if installed * * @api public */ function Redis (opts) { opts = opts || {}; // node id to uniquely identify this node var nodeId = opts.nodeId || function () { // by default, we generate a random id return Math.abs(Math.random() * Math.random() * Date.now() | 0); }; this.nodeId = nodeId(); // packing / unpacking mechanism if (opts.pack) { this.pack = opts.pack; this.unpack = opts.unpack; } else { try { var msgpack = require('msgpack'); this.pack = msgpack.pack; this.unpack = msgpack.unpack; } catch (e) { this.pack = JSON.stringify; this.unpack = JSON.parse; } } var redis = opts.redis || require('redis') , RedisClient = redis.RedisClient; // initialize a pubsub client and a regular client if (opts.redisPub instanceof RedisClient) { this.pub = opts.redisPub; } else { opts.redisPub || (opts.redisPub = {}); this.pub = redis.createClient(opts.redisPub.port, opts.redisPub.host, opts.redisPub); } if (opts.redisSub instanceof RedisClient) { this.sub = opts.redisSub; } else { opts.redisSub || (opts.redisSub = {}); this.sub = redis.createClient(opts.redisSub.port, opts.redisSub.host, opts.redisSub); } if (opts.redisClient instanceof RedisClient) { this.cmd = opts.redisClient; } else { opts.redisClient || (opts.redisClient = {}); this.cmd = redis.createClient(opts.redisClient.port, opts.redisClient.host, opts.redisClient); } Store.call(this, opts); this.sub.setMaxListeners(0); this.setMaxListeners(0); }; /** * Inherits from Store. */ Redis.prototype.__proto__ = Store.prototype; /** * Publishes a message. * * @api private */ Redis.prototype.publish = function (name) { var args = Array.prototype.slice.call(arguments, 1); this.pub.publish(name, this.pack({ nodeId: this.nodeId, args: args })); this.emit.apply(this, ['publish', name].concat(args)); }; /** * Subscribes to a channel * * @api private */ Redis.prototype.subscribe = function (name, consumer, fn) { this.sub.subscribe(name); if (consumer || fn) { var self = this; self.sub.on('subscribe', function subscribe (ch) { if (name == ch) { function message (ch, msg) { if (name == ch) { msg = self.unpack(msg); // we check that the message consumed wasnt emitted by this node if (self.nodeId != msg.nodeId) { consumer.apply(null, msg.args); } } }; self.sub.on('message', message); self.on('unsubscribe', function unsubscribe (ch) { if (name == ch) { self.sub.removeListener('message', message); self.removeListener('unsubscribe', unsubscribe); } }); self.sub.removeListener('subscribe', subscribe); fn && fn(); } }); } this.emit('subscribe', name, consumer, fn); }; /** * Unsubscribes * * @api private */ Redis.prototype.unsubscribe = function (name, fn) { this.sub.unsubscribe(name); if (fn) { var client = this.sub; client.on('unsubscribe', function unsubscribe (ch) { if (name == ch) { fn(); client.removeListener('unsubscribe', unsubscribe); } }); } this.emit('unsubscribe', name, fn); }; /** * Destroys the store * * @api public */ Redis.prototype.destroy = function () { Store.prototype.destroy.call(this); this.pub.end(); this.sub.end(); this.cmd.end(); }; /** * Client constructor * * @api private */ function Client (store, id) { Store.Client.call(this, store, id); }; /** * Inherits from Store.Client */ Client.prototype.__proto__ = Store.Client; /** * Redis hash get * * @api private */ Client.prototype.get = function (key, fn) { this.store.cmd.hget(this.id, key, fn); return this; }; /** * Redis hash set * * @api private */ Client.prototype.set = function (key, value, fn) { this.store.cmd.hset(this.id, key, value, fn); return this; }; /** * Redis hash del * * @api private */ Client.prototype.del = function (key, fn) { this.store.cmd.hdel(this.id, key, fn); return this; }; /** * Redis hash has * * @api private */ Client.prototype.has = function (key, fn) { this.store.cmd.hexists(this.id, key, function (err, has) { if (err) return fn(err); fn(null, !!has); }); return this; }; /** * Destroys client * * @param {Number} number of seconds to expire data * @api private */ Client.prototype.destroy = function (expiration) { if ('number' != typeof expiration) { this.store.cmd.del(this.id); } else { this.store.cmd.expire(this.id, expiration); } return this; };