Advertisement
dizballanze

Nodejs Memcache

Oct 8th, 2011
1,078
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. var EventEmitter = require('events').EventEmitter
  2.   , Stream = require('net').Stream
  3.   , Buffer = require('buffer').Buffer;
  4.  
  5. var HashRing = require('hashring')
  6.   , Connection = require('./connection')
  7.   , Utils = require('./utils')
  8.   , Manager = Connection.Manager
  9.   , IssueLog = Connection.IssueLog;
  10.  
  11. // The constructor
  12. function Client(args, options){
  13.   if(!(this && this.hasOwnProperty && (this instanceof Client))) this = new Client();
  14.  
  15.   var servers = []
  16.     , weights = {}
  17.     , key;
  18.  
  19.   // Parse down the connection arguments  
  20.   switch (Object.prototype.toString.call(args)){
  21.     case '[object String]':
  22.       servers.push(args);
  23.       break;
  24.     case '[object Object]':
  25.       weights = args;
  26.       servers = Object.keys(args);
  27.     case '[object Array]':
  28.     default:
  29.       servers = args;
  30.       break;
  31.   }
  32.  
  33.   if (!servers.length) throw new Error('No servers where supplied in the arguments');
  34.  
  35.   // merge with global and user config
  36.   Utils.merge(this, Client.config);
  37.   Utils.merge(this, options);
  38.   EventEmitter.call(this);
  39.  
  40.   this.servers = servers;
  41.   this.HashRing = new HashRing(args, this.algorithm);
  42.   this.connections = {};
  43.   this.issues = [];
  44. };
  45.  
  46. // Allows users to configure the memcached globally or per memcached client
  47. Client.config = {
  48.   maxKeySize: 251         // max key size allowed by Memcached
  49. , maxExpiration: 2592000  // max expiration duration allowed by Memcached
  50. , maxValue: 1048576       // max length of value allowed by Memcached
  51.  
  52. , algorithm: 'crc32'      // hashing algorithm that is used for key mapping  
  53.  
  54. , poolSize: 10            // maximal parallel connections
  55. , reconnect: 18000000     // if dead, attempt reconnect each xx ms
  56. , timeout: 5000           // after x ms the server should send a timeout if we can't connect
  57. , retries: 5              // amount of retries before server is dead
  58. , retry: 30000            // timeout between retries, all call will be marked as cache miss
  59. , remove: false           // remove server if dead if false, we will attempt to reconnect
  60. , redundancy: false       // allows you do re-distribute the keys over a x amount of servers
  61. , keyCompression: true    // compress keys if they are to large (md5)
  62. , debug: false            // Output the commands and responses
  63. };
  64.  
  65. // There some functions we don't want users to touch so we scope them
  66. (function(nMemcached){
  // Protocol text fragments.
  const LINEBREAK = '\r\n';
  const NOREPLY = ' noreply';

  // Markers returned by the response parsers as the first element of their result; they tell
  // rawDataReceived whether to flush the queue, keep buffering, or answer the callback directly.
  const FLUSH = 1E3;
  const BUFFER = 1E2;
  const CONTINUE = 1E1;

  // Flag values stored alongside a value to remember how it was serialized.
  const FLAG_JSON = 1<<1;
  const FLAG_BINARY = 2<<1;
  74.  
  75.   var memcached = nMemcached.prototype = new EventEmitter
  76.     , private = {}
  77.     , undefined;
  78.  
  79.   // Creates or generates a new connection for the give server, the callback will receive the connection
  80.   // if the operation was successful
  81.   memcached.connect = function connect(server, callback){
  82.     // server is dead, bail out
  83.     if (server in this.issues && this.issues[server].failed) return callback(false, false);
  84.    
  85.     // fetch from connection pool
  86.     if (server in this.connections) return this.connections[server].allocate(callback);
  87.    
  88.     // No connection factory created yet, so we must build one
  89.     var serverTokens = /(.*):(\d+){1,}$/.exec(server).reverse()
  90.       , memcached = this;
  91.    
  92.     serverTokens.pop();
  93.  
  94.     var sid = 0;
  95.     this.connections[server] = new Manager(server, this.poolSize, function(callback){
  96.       var S = new Stream
  97.         , Manager = this;
  98.      
  99.       // config the Stream
  100.       S.streamID = sid++;
  101.       S.setTimeout(memcached.timeout);
  102.       S.setNoDelay(true);
  103.       S.metaData = [];
  104.       S.responseBuffer = "";
  105.       S.bufferArray = [];
  106.       S.server = server;
  107.       S.tokens = serverTokens;
  108.      
  109.       // Add the event listeners
  110.       Utils.fuse(S, {
  111.         connect: function streamConnect(){ callback(false, this) }
  112.       , close: function streamClose(){ Manager.remove(this) }
  113.       , error: function streamError(err){ memcached.connectionIssue(err, S, callback) }
  114.       , data: Utils.curry(memcached, private.buffer, S)
  115.       , timeout: function streamTimeout(){ Manager.remove(this) }
  116.       , end: S.end
  117.       });
  118.      
  119.       // connect the net.Stream [port, hostname]
  120.       S.connect.apply(S, serverTokens);
  121.       return S;
  122.     });
  123.    
  124.     // now that we have setup our connection factory we can allocate a new connection
  125.     this.connections[server].allocate(callback);
  126.   };
  127.  
  // Fans a request out over several servers.
  //
  // With `keys`, every key is mapped to its server through the HashRing and the callback runs once
  // per server that owns at least one key. Without `keys`, the callback runs once per configured
  // server. The callback is invoked with the client as `this` and the arguments
  // (server, keysForThatServer, index, totalServers), walking the server list from last to first.
  memcached.multi = function memcachedMulti(keys, callback){
    var client = this
      , keysByServer = {}
      , targets;

    if (!keys){
      targets = client.servers;
    } else {
      keys.forEach(function groupKeyByServer(key){
        var node = client.HashRing.getNode(key);

        if (!keysByServer[node]) keysByServer[node] = [key];
        else keysByServer[node].push(key);
      });

      targets = Object.keys(keysByServer);
    }

    for (var index = targets.length - 1; index >= 0; index--){
      callback.call(client, targets[index], keysByServer[targets[index]], index, targets.length);
    }
  };
  158.  
  // Executes a command on a server connection.
  //
  //   queryCompiler - function(noreply) returning the query object; `command` is the text to
  //                   send, `callback` (optional) receives the outcome, `key` selects the server
  //   server        - optional server string; when omitted the server is looked up in the
  //                   HashRing using `query.key`
  //
  // Connection problems are reported to `query.callback` as (false, false) (a cache miss), as
  // (error), or as a message string when the stream is not open.
  memcached.command = function memcachedCommand(queryCompiler, server){

    // generate a regular query
    var query = queryCompiler()
      , redundancy = this.redundancy && this.redundancy < this.servers.length
      , queryRedundancy = query.redundancyEnabled
      , memcached = this;

    // validate the arguments
    // NOTE(review): the query objects built in this file carry `validate`, not `validation`, so
    // this guard looks like it never fires. Left unchanged on purpose: switching it on would start
    // rejecting calls that are accepted today. Confirm against Utils.validateArg before changing.
    if (query.validation && !Utils.validateArg(query, this)) return;

    // fetch servers
    if (!server){
      if (redundancy && queryRedundancy){
        // the first node of the range serves the regular query, the rest receive the copies
        redundancy = this.HashRing.createRange(query.key, (this.redundancy + 1), true);
        server = redundancy.shift();
      } else {
        server = this.HashRing.getNode(query.key);
      }
    }

    // check if the server is still alive
    if (server in this.issues && this.issues[server].failed) return query.callback && query.callback(false, false);

    this.connect(server, function allocateMemcachedConnection(error, S){
      // check for issues
      if (!S) return query.callback && query.callback(false, false);

      // log only once we know there is a stream to read the id from
      if (Client.config.debug)
        query.command.split(LINEBREAK).forEach(function(line) { console.log(S.streamID + ' \x1b[34m<<\x1b[0m ' + line); });

      if (error) return query.callback && query.callback(error);
      if (S.readyState !== 'open') return query.callback && query.callback('Connection readyState is set to ' + S.readyState);

      // used for request timing
      query.start = Date.now();
      S.metaData.push(query);
      S.write(query.command + LINEBREAK);
    });

    // if we have redundancy enabled and the query is used for redundancy, then we loop over the
    // remaining servers, check if we can reach them, and write the query to each of them.
    // The redundancy query is compiled with a truthy `noreply` argument and no query is pushed on
    // the stream's metaData, so no callback is tied to these writes.
    // `redundancy` is only an array when the range was created above; with an explicit `server`
    // it is still a boolean and must be skipped.
    if (Array.isArray(redundancy) && queryRedundancy){
      queryRedundancy = queryCompiler(queryRedundancy);
      redundancy.forEach(function(server){
        if (server in memcached.issues && memcached.issues[server].failed) return;

        memcached.connect(server, function allocateMemcachedConnection(error, S){
          if (!S || error || S.readyState !== 'open') return;
          S.write(queryRedundancy.command + LINEBREAK);
        });
      });
    }
  };
  209.  
  // Records a connection problem for the server behind stream `S`.
  //
  // The stream is ended (when it has an `end` method) and the optional callback is answered with
  // (false, false), i.e. a cache miss. The error is then handed to the server's IssueLog, which is
  // created on first use and whose events are re-emitted on the client.
  memcached.connectionIssue = function connectionIssue(error, S, callback){
    var client = this
      , server
      , log;

    // end connection and mark callback as cache miss
    if (S && S.end) S.end();
    if (callback) callback(false, false);

    server = S.server;

    // returns a listener that re-emits the IssueLog event on the client under `eventName`
    function forward(eventName){
      return function(details){ client.emit(eventName, details) };
    }

    if (server in client.issues){
      // reuse the existing issue log
      log = client.issues[server];
    } else {
      log = client.issues[server] = new IssueLog({
        server: server,
        tokens: S.tokens,
        reconnect: client.reconnect,
        retries: client.retries,
        retry: client.retry,
        remove: client.remove
      });

      // proxy the events
      Utils.fuse(log, {
        issue: forward('issue'),
        failure: forward('failure'),
        reconnecting: forward('reconnecting'),
        reconnected: forward('reconnect'),
        remove: function(details){
          // emit event and remove servers
          client.emit('remove', details);
          client.connections[server].end();

          // NOTE(review): `this` here is whatever Utils.fuse binds the listener to - confirm
          // that it is the object carrying `failOverServers`.
          if (this.failOverServers && this.failOverServers.length){
            client.HashRing.replaceServer(server, this.failOverServers.shift());
          } else {
            client.HashRing.removeServer(server);
          }
        }
      });
    }

    // log the issue
    log.log(error);
  };
  256.  
  // Kills all active connections by calling `free(0)` on every entry of `this.connections`.
  memcached.end = function endMemcached(){
    var names = Object.keys(this.connections);

    for (var i = 0; i < names.length; i++){
      this.connections[names[i]].free(0);
    }
  };
  264.  
  265.   // These do not need to be publicly available as it's one of the most important
  266.   // parts of the whole client, the parser commands:
  267.   private.parsers = {
  268.     // handle error responses
  269.     'NOT_FOUND': function(tokens, dataSet, err){ return [CONTINUE, false] }
  270.   , 'NOT_STORED': function(tokens, dataSet, err){ return [CONTINUE, false] }
  271.   , 'ERROR': function(tokens, dataSet, err){ err.push('Received an ERROR response'); return [FLUSH, false] }
  272.   , 'CLIENT_ERROR': function(tokens, dataSet, err){ err.push(tokens.splice(1).join(' ')); return [CONTINUE, false] }
  273.   , 'SERVER_ERROR': function(tokens, dataSet, err, queue, S, memcached){ memcached.connectionIssue(tokens.splice(1).join(' '), S); return [CONTINUE, false] }
  274.    
  275.     // keyword based responses
  276.   , 'STORED': function(tokens, dataSet){ return [CONTINUE, true] }
  277.   , 'DELETED': function(tokens, dataSet){ return [CONTINUE, true] }
  278.   , 'OK': function(tokens, dataSet){ return [CONTINUE, true] }
  279.   , 'EXISTS': function(tokens, dataSet){ return [CONTINUE, false] }
  280.   , 'END': function(tokens, dataSet, err, queue){ if (!queue.length) queue.push(false); return [FLUSH, true] }
  281.    
  282.     // value parsing:
  283.   , 'VALUE': function(tokens, dataSet, err, queue){
  284.       var key = tokens[1]
  285.         , flag = +tokens[2]
  286.         , expire = tokens[3]
  287.         , cas = tokens[4]
  288.         , multi = this.metaData[0] && this.metaData[0].multi || cas ? {} : false
  289.         , tmp;
  290.      
  291.       switch (flag){
  292.         case FLAG_JSON:
  293.           //dataSet = JSON.parse(dataSet);
  294.           break;
  295.         case FLAG_BINARY:
  296.           tmp = new Buffer(dataSet.length);
  297.           tmp.write(dataSet, 0, 'binary');
  298.           dataSet = tmp;
  299.           break;
  300.         }
  301.      
  302.       // Add to queue as multiple get key key key key key returns multiple values
  303.       if (!multi){
  304.         queue.push(dataSet);
  305.       } else {
  306.         multi[key] = dataSet;
  307.         if (cas) multi.cas = cas;
  308.         queue.push(multi);
  309.       }
  310.      
  311.       return [BUFFER, false]
  312.     }
  313.   , 'INCRDECR': function(tokens){ return [CONTINUE, +tokens[1]] }
  314.   , 'STAT': function(tokens, dataSet, err, queue){
  315.       queue.push([tokens[1], /^\d+$/.test(tokens[2]) ? +tokens[2] : tokens[2]]);
  316.       return [BUFFER, true]
  317.     }
  318.   , 'VERSION': function(tokens, dataSet){
  319.       var versionTokens = /(\d+)(?:\.)(\d+)(?:\.)(\d+)$/.exec(tokens.pop());
  320.      
  321.       return [CONTINUE, {
  322.         server: this.server
  323.       , version: versionTokens[0]
  324.       , major: versionTokens[1] || 0
  325.       , minor: versionTokens[2] || 0
  326.       , bugfix: versionTokens[3] || 0
  327.       }];
  328.     }
  329.   , 'ITEM': function(tokens, dataSet, err, queue){
  330.       queue.push({
  331.         key: tokens[1]
  332.       , b: +tokens[2].substr(1)
  333.       , s: +tokens[4]
  334.       });
  335.       return [BUFFER, false]
  336.     }
  337.   };
  338.  
  339.   // Parses down result sets
  340.   private.resultParsers = {
  341.     // combines the stats array, in to an object
  342.     'stats': function(resultSet){
  343.       var response = {};
  344.      
  345.       // add references to the retrieved server
  346.       response.server = this.server;
  347.      
  348.       // Fill the object
  349.       resultSet.forEach(function(statSet){
  350.         response[statSet[0]] = statSet[1];
  351.       });
  352.      
  353.       return response;
  354.     }
  355.    
  356.     // the settings uses the same parse format as the regular stats
  357.   , 'stats settings': function(){ return private.resultParsers.stats.apply(this, arguments) }
  358.     // Group slabs by slab id
  359.   , 'stats slabs': function(resultSet){
  360.       var response = {};
  361.      
  362.       // add references to the retrieved server
  363.       response.server = this.server;
  364.      
  365.       // Fill the object
  366.       resultSet.forEach(function(statSet){
  367.         var identifier = statSet[0].split(':');
  368.        
  369.         if (!response[identifier[0]]) response[identifier[0]] = {};
  370.         response[identifier[0]][identifier[1]] = statSet[1];
  371.        
  372.       });
  373.      
  374.       return response;
  375.     }
  376.   , 'stats items': function(resultSet){
  377.       var response = {};
  378.      
  379.       // add references to the retrieved server
  380.       response.server = this.server;
  381.      
  382.       // Fill the object
  383.       resultSet.forEach(function(statSet){
  384.         var identifier = statSet[0].split(':');
  385.        
  386.         if (!response[identifier[1]]) response[identifier[1]] = {};
  387.         response[identifier[1]][identifier[2]] = statSet[1];
  388.        
  389.       });
  390.      
  391.       return response;
  392.     }
  393.   };
  394.  
  395.   // Generates a RegExp that can be used to check if a chunk is memcached response identifier
  396.   private.allCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + '|\\d' + ')');
  397.   private.bufferedCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + ')');
  398.  
  399.   // When working with large chunks of responses, node chunks it in to pieces. So we might have
  400.   // half responses. So we are going to buffer up the buffer and user our buffered buffer to query
  401.   // against. Also when you execute allot of .writes to the same stream, node will combine the responses
  402.   // in to one response stream. With no indication where it had cut the data. So it can be it cuts inside the value response,
  403.   // or even right in the middle of a line-break, so we need to make sure, the last piece in the buffer is a LINEBREAK
  404.   // because that is all what is sure about the Memcached Protocol, all responds end with them.
  405.   private.buffer = function BufferBuffer(S, BufferStream){
  406.     S.responseBuffer += BufferStream;
  407.  
  408.     // only call transform the data once we are sure, 100% sure, that we valid response ending
  409.     if (S.responseBuffer.substr(S.responseBuffer.length - 2) === LINEBREAK){
  410.       var chunks = S.responseBuffer.split(LINEBREAK);
  411.  
  412.       if (Client.config.debug)
  413.         chunks.forEach(function(line) { console.log(S.streamID + ' \033[35m>>\033[0m ' + line); });
  414.  
  415.       S.responseBuffer = ""; // clear!
  416.       this.rawDataReceived(S, S.bufferArray = S.bufferArray.concat(chunks));
  417.     }
  418.   };
  419.  
  420.   // The actual parsers function that scan over the responseBuffer in search of Memcached response
  421.   // identifiers. Once we have found one, we will send it to the dedicated parsers that will transform
  422.   // the data in a human readable format, deciding if we should queue it up, or send it to a callback fn.
  423.   memcached.rawDataReceived = function rawDataReceived(S){
  424.     var queue = []
  425.       , token
  426.       , tokenSet
  427.       , dataSet = ''
  428.       , resultSet
  429.       , metaData
  430.       , err = []
  431.       , tmp;
  432.    
  433.     while(S.bufferArray.length && private.allCommands.test(S.bufferArray[0])){
  434.      
  435.       token = S.bufferArray.shift();
  436.       tokenSet = token.split(' ');
  437.      
  438.       // special case for digit only's these are responses from INCR and DECR
  439.       if (/^\d+$/.test(tokenSet[0])) tokenSet.unshift('INCRDECR');
  440.  
  441.       // special case for value, it's required that it has a second response!
  442.       // add the token back, and wait for the next response, we might be handling a big
  443.       // ass response here.
  444.       if (tokenSet[0] == 'VALUE' && S.bufferArray.indexOf('END') == -1){
  445.         return S.bufferArray.unshift(token);
  446.       }
  447.      
  448.       // check for dedicated parser
  449.       if (private.parsers[tokenSet[0]]){
  450.        
  451.         // fetch the response content
  452.         if (tokenSet[0] == 'VALUE') {
  453.           while(S.bufferArray.length){
  454.             if (private.bufferedCommands.test(S.bufferArray[0])) break;
  455.  
  456.             dataSet += S.bufferArray.shift();
  457.           };
  458.         }
  459.  
  460.         resultSet = private.parsers[tokenSet[0]].call(S, tokenSet, dataSet || token, err, queue, this);
  461.        
  462.         // check how we need to handle the resultSet response
  463.         switch(resultSet.shift()){
  464.           case BUFFER:
  465.             break;
  466.            
  467.           case FLUSH:
  468.             metaData = S.metaData.shift();
  469.             resultSet = queue;
  470.            
  471.             // if we have a callback, call it
  472.             if (metaData && metaData.callback){
  473.               metaData.execution = Date.now() - metaData.start;
  474.               metaData.callback.call(
  475.                 metaData, err.length ? err : err[0],
  476.                
  477.                 // see if optional parsing needs to be applied to make the result set more readable
  478.                 private.resultParsers[metaData.type] ? private.resultParsers[metaData.type].call(S, resultSet, err) :
  479.                 !Array.isArray(queue) || queue.length > 1 ? queue : queue[0]
  480.              );
  481.             }
  482.              
  483.             queue.length = err.length = 0;
  484.             break;
  485.            
  486.           case CONTINUE:
  487.           default:
  488.             metaData = S.metaData.shift();
  489.            
  490.             if (metaData && metaData.callback){
  491.               metaData.execution = Date.now() - metaData.start;
  492.               metaData.callback.call(metaData, err.length > 1 ? err : err[0], resultSet[0]);
  493.             }
  494.              
  495.             err.length = 0;
  496.             break;
  497.         }
  498.       } else {
  499.         // handle unkown responses
  500.         metaData = S.metaData.shift();
  501.         if (metaData && metaData.callback){
  502.           metaData.execution = Date.now() - metaData.start;
  503.           metaData.callback.call(metaData, 'Unknown response from the memcached server: "' + token + '"', false);
  504.         }
  505.       }
  506.      
  507.       // cleanup
  508.       dataSet = ''
  509.       tokenSet = metaData = undefined;
  510.      
  511.       // check if we need to remove an empty item from the array, as splitting on /r/n might cause an empty
  512.       // item at the end..
  513.       if (S.bufferArray[0] === '') S.bufferArray.shift();
  514.     };
  515.   };
  516.  
  517.   // Small wrapper function that only executes errors when we have a callback
  518.   private.errorResponse = function errorResponse(error, callback){
  519.     if (typeof callback == 'function') callback(error, false);
  520.    
  521.     return false;
  522.   };
  523.  
  // This is where the actual Memcached API layer begins.
  //
  // Fetches the value stored under `key`. When `key` is an array the call is handed over to
  // getMulti with the same arguments.
  memcached.get = function get(key, callback){
    if (Array.isArray(key)){
      return this.getMulti.apply(this, arguments);
    }

    this.command(function getCommand(noreply){
      var query = {
        key: key,
        callback: callback,
        validate: [['key', String], ['callback', Function]],
        type: 'get',
        command: 'get ' + key
      };

      return query;
    });
  };
  536.  
  // Same as get, except that the "gets" command also returns a cas value.
  // gets doesn't support multi-gets at this moment.
  memcached.gets = function get(key, callback){
    this.command(function getCommand(noreply){
      var query = {
        key: key,
        callback: callback,
        validate: [['key', String], ['callback', Function]],
        type: 'gets',
        command: 'gets ' + key
      };

      return query;
    });
  };
  548.  
  // Handles gets with multiple keys.
  //
  // The keys are grouped per server (see `multi`), one "get" is sent to every server involved and
  // the answers are merged into a single object. Once every server has answered, the callback
  // receives (errors, responses), where `errors` is false when nothing went wrong and an array of
  // the reported errors otherwise.
  memcached.getMulti = function getMulti(keys, callback){
    var memcached = this
      , responses = {}
      , errors = []
      , calls

      // handle multiple responses and cache them until we receive all.
      , handle = function(err, results){
          if (err) errors.push(err);

          // add all responses to the result object
          (Array.isArray(results) ? results : [results]).forEach(function(value){ Utils.merge(responses, value) });

          if (!--calls) callback(errors.length ? errors : false, responses);
        };

    // Without keys no server is queried, so nothing would ever answer the callback: report the
    // empty result right away.
    if (!keys.length){
      callback(false, responses);
      return;
    }

    this.multi(keys, function(server, key, index, totals){
      // the first invocation tells us how many answers to wait for
      if (!calls) calls = totals;

      memcached.command(function getMultiCommand(noreply){ return {
          callback: handle
        , multi: true
        , type: 'get'
        , command: 'get ' + key.join(' ')
        }},
        server
      );
    });
  };
  579.  
  580.   // As all command nearly use the same syntax we are going to proxy them all to this
  581.   // function to ease maintenance. This is possible because most set commands will use the same
  582.   // syntax for the Memcached server. Some commands do not require a lifetime and a flag, but the
  583.   // memcached server is smart enough to ignore those.
  584.   private.setters = function setters(type, validate, key, value, lifetime, callback, cas){
  585.     var flag = 0
  586.       , memcached = this
  587.       , valuetype = typeof value
  588.       , length;
  589.    
  590.     if (Buffer.isBuffer(value)){
  591.       flag = FLAG_BINARY;
  592.       value = value.toString('binary');
  593.     } else if (valuetype !== 'string' && valuetype !== 'number'){
  594.       flag = FLAG_JSON;
  595.       value = JSON.stringify(value);
  596.     } else {
  597.       value = value.toString();
  598.     }
  599.    
  600.     length = Buffer.byteLength(value);
  601.     if (length > memcached.maxValue) return private.errorResponse('The length of the value is greater than ' + memcached.maxValue, callback);
  602.        
  603.     memcached.command(function settersCommand(noreply){ return {
  604.       key: key
  605.     , callback: callback
  606.     , lifetime: lifetime
  607.     , value: value
  608.     , cas: cas
  609.     , validate: validate
  610.     , type: type
  611.     , redundancyEnabled: true
  612.     , command: [type, key, flag, lifetime, length].join(' ') +
  613.            (cas ? ' ' + cas : '') +
  614.            (noreply ? NOREPLY : '') +
  615.            LINEBREAK + value
  616.     }});
  617.   };
  618.  
  619.   // Curry the function and so we can tell the type our private set function
  620.   memcached.set = Utils.curry(false, private.setters, 'set', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
  621.   memcached.replace = Utils.curry(false, private.setters, 'replace', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
  622.   memcached.add = Utils.curry(false, private.setters, 'add', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
  623.  
  624.   memcached.cas = function checkandset(key, value, cas, lifetime, callback){
  625.     private.setters.call(this, 'cas', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, lifetime, callback, cas);
  626.   };
  627.  
  628.   memcached.append = function append(key, value, callback){
  629.     private.setters.call(this, 'append', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
  630.   };
  631.  
  632.   memcached.prepend = function prepend(key, value, callback){
  633.     private.setters.call(this, 'prepend', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
  634.   };
  635.  
  636.   // Small handler for incr and decr's
  637.   private.incrdecr = function incrdecr(type, key, value, callback){
  638.     this.command(function incredecrCommand(noreply){ return {
  639.       key: key
  640.     , callback: callback
  641.     , value: value
  642.     , validate: [['key', String], ['value', Number], ['callback', Function]]
  643.     , type: type
  644.     , redundancyEnabled: true
  645.     , command: [type, key, value].join(' ') +
  646.            (noreply ? NOREPLY : '')
  647.     }});
  648.   };
  649.  
  650.   // Curry the function and so we can tell the type our private incrdecr
  651.   memcached.increment = memcached.incr = Utils.curry(false, private.incrdecr, 'incr');
  652.   memcached.decrement = memcached.decr = Utils.curry(false, private.incrdecr, 'decr');
  653.  
  // Deletes the key from the servers. Also available as memcached['delete'].
  memcached.del = function del(key, callback){
    this.command(function deleteCommand(noreply){
      var text = 'delete ' + key;

      if (noreply) text += NOREPLY;

      return {
        key: key,
        callback: callback,
        validate: [['key', String], ['callback', Function]],
        type: 'delete',
        redundancyEnabled: true,
        command: text
      };
    });
  };
  memcached['delete'] = memcached.del;
  667.  
  668.   // Small wrapper that handle single keyword commands such as FLUSH ALL, VERSION and STAT
  669.   private.singles = function singles(type, callback){
  670.     var memcached = this
  671.       , responses = []
  672.       , errors = []
  673.       , calls
  674.      
  675.       // handle multiple servers
  676.       , handle = function(err, results){
  677.         if (err) errors.push(err);
  678.         if (results) responses = responses.concat(results);
  679.        
  680.         // multi calls should ALWAYS return an array!
  681.         if (!--calls) callback(errors, responses);
  682.       };
  683.    
  684.     this.multi(false, function(server, keys, index, totals){
  685.       if (!calls) calls = totals;
  686.      
  687.       memcached.command(function singlesCommand(noreply){ return {
  688.           callback: handle
  689.         , type: type
  690.         , command: type
  691.         }},
  692.         server
  693.      );
  694.     });
  695.   };
  696.  
  697.   // Curry the function and so we can tell the type our private singles
  698.   memcached.version = Utils.curry(false, private.singles, 'version');
  699.   memcached.flush = Utils.curry(false, private.singles, 'flush_all');
  700.   memcached.stats = Utils.curry(false, private.singles, 'stats');
  701.   memcached.queue = Utils.curry(false, private.singles, 'stats queue');
  702.   memcached.settings = Utils.curry(false, private.singles, 'stats settings');
  703.   memcached.slabs = Utils.curry(false, private.singles, 'stats slabs');
  704.   memcached.items = Utils.curry(false, private.singles, 'stats items');
  705.  
  // Sends "stats cachedump <slabid> <number>" to one specific server.
  // You need to use the items dump to get the correct server and slab settings,
  // see simple_cachedump.js for an example.
  memcached.cachedump = function cachedump(server, slabid, number, callback){
    this.command(function cachedumpCommand(noreply){
      var query = {
        callback: callback,
        number: number,
        slabid: slabid,
        validate: [['number', Number], ['slabid', Number], ['callback', Function]],
        type: 'stats cachedump',
        command: 'stats cachedump ' + slabid + ' ' + number
      };

      return query;
    }, server);
  };
  720.  
  721. })(Client);
  722.  
  723. module.exports = Client;
  724.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement