Advertisement
thepirat000

Modified t_zset.c for INRANGE on ZUNIONSTORE and ZINTERSTORE

Oct 8th, 2015
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 98.18 KB | None | 0 0
  1. /*
  2.  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
  3.  * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
  4.  * All rights reserved.
  5.  *
  6.  * Redistribution and use in source and binary forms, with or without
  7.  * modification, are permitted provided that the following conditions are met:
  8.  *
  9.  *   * Redistributions of source code must retain the above copyright notice,
  10.  *     this list of conditions and the following disclaimer.
  11.  *   * Redistributions in binary form must reproduce the above copyright
  12.  *     notice, this list of conditions and the following disclaimer in the
  13.  *     documentation and/or other materials provided with the distribution.
  14.  *   * Neither the name of Redis nor the names of its contributors may be used
  15.  *     to endorse or promote products derived from this software without
  16.  *     specific prior written permission.
  17.  *
  18.  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  19.  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  20.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  21.  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  22.  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  23.  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  24.  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  25.  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  26.  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  27.  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  28.  * POSSIBILITY OF SUCH DAMAGE.
  29.  */
  30.  
  31. /*-----------------------------------------------------------------------------
  32.  * Sorted set API
  33.  *----------------------------------------------------------------------------*/
  34.  
  35. /* ZSETs are ordered sets using two data structures to hold the same elements
  36.  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
  37.  * data structure.
  38.  *
  39.  * The elements are added to a hash table mapping Redis objects to scores.
  40.  * At the same time the elements are added to a skip list mapping scores
  41.  * to Redis objects (so objects are sorted by scores in this "view").
  42.  *
  43.  * Note that the SDS string representing the element is the same in both
  44.  * the hash table and skiplist in order to save memory. What we do in order
  45.  * to manage the shared SDS string more easily is to free the SDS string
  46.  * only in zslFreeNode(). The dictionary has no value free method set.
  47.  * So we should always remove an element from the dictionary, and later from
  48.  * the skiplist.
  49.  *
  50.  * This skiplist implementation is almost a C translation of the original
  51.  * algorithm described by William Pugh in "Skip Lists: A Probabilistic
  52.  * Alternative to Balanced Trees", modified in three ways:
  53.  * a) this implementation allows for repeated scores.
  54.  * b) the comparison is not just by key (our 'score') but by satellite data.
  55.  * c) there is a back pointer, so it's a doubly linked list with the back
  56.  * pointers being only at "level 1". This allows to traverse the list
  57.  * from tail to head, useful for ZREVRANGE. */
  58.  
  59. #include "server.h"
  60. #include <math.h>
  61.  
  62. /*-----------------------------------------------------------------------------
  63.  * Skiplist implementation of the low level API
  64.  *----------------------------------------------------------------------------*/
  65.  
  66. static int zslLexValueGteMin(sds value, zlexrangespec *spec);
  67. static int zslLexValueLteMax(sds value, zlexrangespec *spec);
  68.  
  69. /* Create a skiplist node with the specified number of levels.
  70.  * The SDS string 'ele' is referenced by the node after the call. */
  71. zskiplistNode *zslCreateNode(int level, double score, sds ele) {
  72.     zskiplistNode *zn =
  73.         zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
  74.     zn->score = score;
  75.     zn->ele = ele;
  76.     return zn;
  77. }
  78.  
  79. /* Create a new skiplist. */
  80. zskiplist *zslCreate(void) {
  81.     int j;
  82.     zskiplist *zsl;
  83.  
  84.     zsl = zmalloc(sizeof(*zsl));
  85.     zsl->level = 1;
  86.     zsl->length = 0;
  87.     zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
  88.     for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
  89.         zsl->header->level[j].forward = NULL;
  90.         zsl->header->level[j].span = 0;
  91.     }
  92.     zsl->header->backward = NULL;
  93.     zsl->tail = NULL;
  94.     return zsl;
  95. }
  96.  
  97. /* Free the specified skiplist node. The referenced SDS string representation
  98.  * of the element is freed too, unless node->ele is set to NULL before calling
  99.  * this function. */
  100. void zslFreeNode(zskiplistNode *node) {
  101.     sdsfree(node->ele);
  102.     zfree(node);
  103. }
  104.  
  105. /* Free a whole skiplist. */
  106. void zslFree(zskiplist *zsl) {
  107.     zskiplistNode *node = zsl->header->level[0].forward, *next;
  108.  
  109.     zfree(zsl->header);
  110.     while(node) {
  111.         next = node->level[0].forward;
  112.         zslFreeNode(node);
  113.         node = next;
  114.     }
  115.     zfree(zsl);
  116. }
  117.  
  118. /* Returns a random level for the new skiplist node we are going to create.
  119.  * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
  120.  * (both inclusive), with a powerlaw-alike distribution where higher
  121.  * levels are less likely to be returned. */
  122. int zslRandomLevel(void) {
  123.     int level = 1;
  124.     while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
  125.         level += 1;
  126.     return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
  127. }
  128.  
  129. /* Insert a new node in the skiplist. Assumes the element does not already
  130.  * exist (up to the caller to enforce that). The skiplist takes ownership
  131.  * of the passed SDS string 'ele'. */
  132. zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
  133.     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  134.     unsigned int rank[ZSKIPLIST_MAXLEVEL];
  135.     int i, level;
  136.  
  137.     serverAssert(!isnan(score));
  138.     x = zsl->header;
  139.     for (i = zsl->level-1; i >= 0; i--) {
  140.         /* store rank that is crossed to reach the insert position */
  141.         rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
  142.         while (x->level[i].forward &&
  143.                 (x->level[i].forward->score < score ||
  144.                     (x->level[i].forward->score == score &&
  145.                     sdscmp(x->level[i].forward->ele,ele) < 0)))
  146.         {
  147.             rank[i] += x->level[i].span;
  148.             x = x->level[i].forward;
  149.         }
  150.         update[i] = x;
  151.     }
  152.     /* we assume the element is not already inside, since we allow duplicated
  153.      * scores, reinserting the same element should never happen since the
  154.      * caller of zslInsert() should test in the hash table if the element is
  155.      * already inside or not. */
  156.     level = zslRandomLevel();
  157.     if (level > zsl->level) {
  158.         for (i = zsl->level; i < level; i++) {
  159.             rank[i] = 0;
  160.             update[i] = zsl->header;
  161.             update[i]->level[i].span = zsl->length;
  162.         }
  163.         zsl->level = level;
  164.     }
  165.     x = zslCreateNode(level,score,ele);
  166.     for (i = 0; i < level; i++) {
  167.         x->level[i].forward = update[i]->level[i].forward;
  168.         update[i]->level[i].forward = x;
  169.  
  170.         /* update span covered by update[i] as x is inserted here */
  171.         x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
  172.         update[i]->level[i].span = (rank[0] - rank[i]) + 1;
  173.     }
  174.  
  175.     /* increment span for untouched levels */
  176.     for (i = level; i < zsl->level; i++) {
  177.         update[i]->level[i].span++;
  178.     }
  179.  
  180.     x->backward = (update[0] == zsl->header) ? NULL : update[0];
  181.     if (x->level[0].forward)
  182.         x->level[0].forward->backward = x;
  183.     else
  184.         zsl->tail = x;
  185.     zsl->length++;
  186.     return x;
  187. }
  188.  
  189. /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
  190. void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
  191.     int i;
  192.     for (i = 0; i < zsl->level; i++) {
  193.         if (update[i]->level[i].forward == x) {
  194.             update[i]->level[i].span += x->level[i].span - 1;
  195.             update[i]->level[i].forward = x->level[i].forward;
  196.         } else {
  197.             update[i]->level[i].span -= 1;
  198.         }
  199.     }
  200.     if (x->level[0].forward) {
  201.         x->level[0].forward->backward = x->backward;
  202.     } else {
  203.         zsl->tail = x->backward;
  204.     }
  205.     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
  206.         zsl->level--;
  207.     zsl->length--;
  208. }
  209.  
  210. /* Delete an element with matching score/element from the skiplist.
  211.  * The function returns 1 if the node was found and deleted, otherwise
  212.  * 0 is returned.
  213.  *
  214.  * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
  215.  * it is not freed (but just unlinked) and *node is set to the node pointer,
  216.  * so that it is possible for the caller to reuse the node (including the
  217.  * referenced SDS string at node->ele). */
  218. int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
  219.     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  220.     int i;
  221.  
  222.     x = zsl->header;
  223.     for (i = zsl->level-1; i >= 0; i--) {
  224.         while (x->level[i].forward &&
  225.                 (x->level[i].forward->score < score ||
  226.                     (x->level[i].forward->score == score &&
  227.                      sdscmp(x->level[i].forward->ele,ele) < 0)))
  228.         {
  229.             x = x->level[i].forward;
  230.         }
  231.         update[i] = x;
  232.     }
  233.     /* We may have multiple elements with the same score, what we need
  234.      * is to find the element with both the right score and object. */
  235.     x = x->level[0].forward;
  236.     if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
  237.         zslDeleteNode(zsl, x, update);
  238.         if (!node)
  239.             zslFreeNode(x);
  240.         else
  241.             *node = x;
  242.         return 1;
  243.     }
  244.     return 0; /* not found */
  245. }
  246.  
  247. static int zslValueGteMin(double value, zrangespec *spec) {
  248.     return spec->minex ? (value > spec->min) : (value >= spec->min);
  249. }
  250.  
  251. int zslValueLteMax(double value, zrangespec *spec) {
  252.     return spec->maxex ? (value < spec->max) : (value <= spec->max);
  253. }
  254.  
  255. /* Returns if there is a part of the zset is in range. */
  256. int zslIsInRange(zskiplist *zsl, zrangespec *range) {
  257.     zskiplistNode *x;
  258.  
  259.     /* Test for ranges that will always be empty. */
  260.     if (range->min > range->max ||
  261.             (range->min == range->max && (range->minex || range->maxex)))
  262.         return 0;
  263.     x = zsl->tail;
  264.     if (x == NULL || !zslValueGteMin(x->score,range))
  265.         return 0;
  266.     x = zsl->header->level[0].forward;
  267.     if (x == NULL || !zslValueLteMax(x->score,range))
  268.         return 0;
  269.     return 1;
  270. }
  271.  
  272. /* Find the first node that is contained in the specified range.
  273.  * Returns NULL when no element is contained in the range. */
  274. zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
  275.     zskiplistNode *x;
  276.     int i;
  277.  
  278.     /* If everything is out of range, return early. */
  279.     if (!zslIsInRange(zsl,range)) return NULL;
  280.  
  281.     x = zsl->header;
  282.     for (i = zsl->level-1; i >= 0; i--) {
  283.         /* Go forward while *OUT* of range. */
  284.         while (x->level[i].forward &&
  285.             !zslValueGteMin(x->level[i].forward->score,range))
  286.                 x = x->level[i].forward;
  287.     }
  288.  
  289.     /* This is an inner range, so the next node cannot be NULL. */
  290.     x = x->level[0].forward;
  291.     serverAssert(x != NULL);
  292.  
  293.     /* Check if score <= max. */
  294.     if (!zslValueLteMax(x->score,range)) return NULL;
  295.     return x;
  296. }
  297.  
  298. /* Find the last node that is contained in the specified range.
  299.  * Returns NULL when no element is contained in the range. */
  300. zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
  301.     zskiplistNode *x;
  302.     int i;
  303.  
  304.     /* If everything is out of range, return early. */
  305.     if (!zslIsInRange(zsl,range)) return NULL;
  306.  
  307.     x = zsl->header;
  308.     for (i = zsl->level-1; i >= 0; i--) {
  309.         /* Go forward while *IN* range. */
  310.         while (x->level[i].forward &&
  311.             zslValueLteMax(x->level[i].forward->score,range))
  312.                 x = x->level[i].forward;
  313.     }
  314.  
  315.     /* This is an inner range, so this node cannot be NULL. */
  316.     serverAssert(x != NULL);
  317.  
  318.     /* Check if score >= min. */
  319.     if (!zslValueGteMin(x->score,range)) return NULL;
  320.     return x;
  321. }
  322.  
  323. /* Delete all the elements with score between min and max from the skiplist.
  324.  * Min and max are inclusive, so a score >= min || score <= max is deleted.
  325.  * Note that this function takes the reference to the hash table view of the
  326.  * sorted set, in order to remove the elements from the hash table too. */
  327. unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
  328.     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  329.     unsigned long removed = 0;
  330.     int i;
  331.  
  332.     x = zsl->header;
  333.     for (i = zsl->level-1; i >= 0; i--) {
  334.         while (x->level[i].forward && (range->minex ?
  335.             x->level[i].forward->score <= range->min :
  336.             x->level[i].forward->score < range->min))
  337.                 x = x->level[i].forward;
  338.         update[i] = x;
  339.     }
  340.  
  341.     /* Current node is the last with score < or <= min. */
  342.     x = x->level[0].forward;
  343.  
  344.     /* Delete nodes while in range. */
  345.     while (x &&
  346.            (range->maxex ? x->score < range->max : x->score <= range->max))
  347.     {
  348.         zskiplistNode *next = x->level[0].forward;
  349.         zslDeleteNode(zsl,x,update);
  350.         dictDelete(dict,x->ele);
  351.         zslFreeNode(x); /* Here is where x->ele is actually released. */
  352.         removed++;
  353.         x = next;
  354.     }
  355.     return removed;
  356. }
  357.  
  358. unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
  359.     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  360.     unsigned long removed = 0;
  361.     int i;
  362.  
  363.  
  364.     x = zsl->header;
  365.     for (i = zsl->level-1; i >= 0; i--) {
  366.         while (x->level[i].forward &&
  367.             !zslLexValueGteMin(x->level[i].forward->ele,range))
  368.                 x = x->level[i].forward;
  369.         update[i] = x;
  370.     }
  371.  
  372.     /* Current node is the last with score < or <= min. */
  373.     x = x->level[0].forward;
  374.  
  375.     /* Delete nodes while in range. */
  376.     while (x && zslLexValueLteMax(x->ele,range)) {
  377.         zskiplistNode *next = x->level[0].forward;
  378.         zslDeleteNode(zsl,x,update);
  379.         dictDelete(dict,x->ele);
  380.         zslFreeNode(x); /* Here is where x->ele is actually released. */
  381.         removed++;
  382.         x = next;
  383.     }
  384.     return removed;
  385. }
  386.  
  387. /* Delete all the elements with rank between start and end from the skiplist.
  388.  * Start and end are inclusive. Note that start and end need to be 1-based */
  389. unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
  390.     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  391.     unsigned long traversed = 0, removed = 0;
  392.     int i;
  393.  
  394.     x = zsl->header;
  395.     for (i = zsl->level-1; i >= 0; i--) {
  396.         while (x->level[i].forward && (traversed + x->level[i].span) < start) {
  397.             traversed += x->level[i].span;
  398.             x = x->level[i].forward;
  399.         }
  400.         update[i] = x;
  401.     }
  402.  
  403.     traversed++;
  404.     x = x->level[0].forward;
  405.     while (x && traversed <= end) {
  406.         zskiplistNode *next = x->level[0].forward;
  407.         zslDeleteNode(zsl,x,update);
  408.         dictDelete(dict,x->ele);
  409.         zslFreeNode(x);
  410.         removed++;
  411.         traversed++;
  412.         x = next;
  413.     }
  414.     return removed;
  415. }
  416.  
  417. /* Find the rank for an element by both score and key.
  418.  * Returns 0 when the element cannot be found, rank otherwise.
  419.  * Note that the rank is 1-based due to the span of zsl->header to the
  420.  * first element. */
  421. unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
  422.     zskiplistNode *x;
  423.     unsigned long rank = 0;
  424.     int i;
  425.  
  426.     x = zsl->header;
  427.     for (i = zsl->level-1; i >= 0; i--) {
  428.         while (x->level[i].forward &&
  429.             (x->level[i].forward->score < score ||
  430.                 (x->level[i].forward->score == score &&
  431.                 sdscmp(x->level[i].forward->ele,ele) <= 0))) {
  432.             rank += x->level[i].span;
  433.             x = x->level[i].forward;
  434.         }
  435.  
  436.         /* x might be equal to zsl->header, so test if obj is non-NULL */
  437.         if (x->ele && sdscmp(x->ele,ele) == 0) {
  438.             return rank;
  439.         }
  440.     }
  441.     return 0;
  442. }
  443.  
  444. /* Finds an element by its rank. The rank argument needs to be 1-based. */
  445. zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
  446.     zskiplistNode *x;
  447.     unsigned long traversed = 0;
  448.     int i;
  449.  
  450.     x = zsl->header;
  451.     for (i = zsl->level-1; i >= 0; i--) {
  452.         while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
  453.         {
  454.             traversed += x->level[i].span;
  455.             x = x->level[i].forward;
  456.         }
  457.         if (traversed == rank) {
  458.             return x;
  459.         }
  460.     }
  461.     return NULL;
  462. }
  463.  
  464. /* Populate the rangespec according to the objects min and max. */
  465. static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
  466.     char *eptr;
  467.     spec->minex = spec->maxex = 0;
  468.  
  469.     /* Parse the min-max interval. If one of the values is prefixed
  470.      * by the "(" character, it's considered "open". For instance
  471.      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
  472.      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
  473.     if (min->encoding == OBJ_ENCODING_INT) {
  474.         spec->min = (long)min->ptr;
  475.     } else {
  476.         if (((char*)min->ptr)[0] == '(') {
  477.             spec->min = strtod((char*)min->ptr+1,&eptr);
  478.             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
  479.             spec->minex = 1;
  480.         } else {
  481.             spec->min = strtod((char*)min->ptr,&eptr);
  482.             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
  483.         }
  484.     }
  485.     if (max->encoding == OBJ_ENCODING_INT) {
  486.         spec->max = (long)max->ptr;
  487.     } else {
  488.         if (((char*)max->ptr)[0] == '(') {
  489.             spec->max = strtod((char*)max->ptr+1,&eptr);
  490.             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
  491.             spec->maxex = 1;
  492.         } else {
  493.             spec->max = strtod((char*)max->ptr,&eptr);
  494.             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
  495.         }
  496.     }
  497.  
  498.     return C_OK;
  499. }
  500.  
  501. /* ------------------------ Lexicographic ranges ---------------------------- */
  502.  
  503. /* Parse max or min argument of ZRANGEBYLEX.
  504.   * (foo means foo (open interval)
  505.   * [foo means foo (closed interval)
  506.   * - means the min string possible
  507.   * + means the max string possible
  508.   *
  509.   * If the string is valid the *dest pointer is set to the redis object
  510.   * that will be used for the comparision, and ex will be set to 0 or 1
  511.   * respectively if the item is exclusive or inclusive. C_OK will be
  512.   * returned.
  513.   *
  514.   * If the string is not a valid range C_ERR is returned, and the value
  515.   * of *dest and *ex is undefined. */
  516. int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
  517.     char *c = item->ptr;
  518.  
  519.     switch(c[0]) {
  520.     case '+':
  521.         if (c[1] != '\0') return C_ERR;
  522.         *ex = 0;
  523.         *dest = shared.maxstring;
  524.         return C_OK;
  525.     case '-':
  526.         if (c[1] != '\0') return C_ERR;
  527.         *ex = 0;
  528.         *dest = shared.minstring;
  529.         return C_OK;
  530.     case '(':
  531.         *ex = 1;
  532.         *dest = sdsnewlen(c+1,sdslen(c)-1);
  533.         return C_OK;
  534.     case '[':
  535.         *ex = 0;
  536.         *dest = sdsnewlen(c+1,sdslen(c)-1);
  537.         return C_OK;
  538.     default:
  539.         return C_ERR;
  540.     }
  541. }
  542.  
  543. /* Free a lex range structure, must be called only after zelParseLexRange()
  544.  * populated the structure with success (C_OK returned). */
  545. void zslFreeLexRange(zlexrangespec *spec) {
  546.     if (spec->min != shared.minstring &&
  547.         spec->min != shared.maxstring) sdsfree(spec->min);
  548.     if (spec->max != shared.minstring &&
  549.         spec->max != shared.maxstring) sdsfree(spec->max);
  550. }
  551.  
  552. /* Populate the rangespec according to the objects min and max.
  553.  *
  554.  * Return C_OK on success. On error C_ERR is returned.
  555.  * When OK is returned the structure must be freed with zslFreeLexRange(),
  556.  * otherwise no release is needed. */
  557. static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
  558.     /* The range can't be valid if objects are integer encoded.
  559.      * Every item must start with ( or [. */
  560.     if (min->encoding == OBJ_ENCODING_INT ||
  561.         max->encoding == OBJ_ENCODING_INT) return C_ERR;
  562.  
  563.     spec->min = spec->max = NULL;
  564.     if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
  565.         zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
  566.         zslFreeLexRange(spec);
  567.         return C_ERR;
  568.     } else {
  569.         return C_OK;
  570.     }
  571. }
  572.  
  573. /* This is just a wrapper to sdscmp() that is able to
  574.  * handle shared.minstring and shared.maxstring as the equivalent of
  575.  * -inf and +inf for strings */
  576. int sdscmplex(sds a, sds b) {
  577.     if (a == b) return 0;
  578.     if (a == shared.minstring || b == shared.maxstring) return -1;
  579.     if (a == shared.maxstring || b == shared.minstring) return 1;
  580.     return sdscmp(a,b);
  581. }
  582.  
  583. static int zslLexValueGteMin(sds value, zlexrangespec *spec) {
  584.     return spec->minex ?
  585.         (sdscmplex(value,spec->min) > 0) :
  586.         (sdscmplex(value,spec->min) >= 0);
  587. }
  588.  
  589. static int zslLexValueLteMax(sds value, zlexrangespec *spec) {
  590.     return spec->maxex ?
  591.         (sdscmplex(value,spec->max) < 0) :
  592.         (sdscmplex(value,spec->max) <= 0);
  593. }
  594.  
  595. /* Returns if there is a part of the zset is in the lex range. */
  596. int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
  597.     zskiplistNode *x;
  598.  
  599.     /* Test for ranges that will always be empty. */
  600.     if (sdscmplex(range->min,range->max) > 1 ||
  601.             (sdscmp(range->min,range->max) == 0 &&
  602.             (range->minex || range->maxex)))
  603.         return 0;
  604.     x = zsl->tail;
  605.     if (x == NULL || !zslLexValueGteMin(x->ele,range))
  606.         return 0;
  607.     x = zsl->header->level[0].forward;
  608.     if (x == NULL || !zslLexValueLteMax(x->ele,range))
  609.         return 0;
  610.     return 1;
  611. }
  612.  
  613. /* Find the first node that is contained in the specified lex range.
  614.  * Returns NULL when no element is contained in the range. */
  615. zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
  616.     zskiplistNode *x;
  617.     int i;
  618.  
  619.     /* If everything is out of range, return early. */
  620.     if (!zslIsInLexRange(zsl,range)) return NULL;
  621.  
  622.     x = zsl->header;
  623.     for (i = zsl->level-1; i >= 0; i--) {
  624.         /* Go forward while *OUT* of range. */
  625.         while (x->level[i].forward &&
  626.             !zslLexValueGteMin(x->level[i].forward->ele,range))
  627.                 x = x->level[i].forward;
  628.     }
  629.  
  630.     /* This is an inner range, so the next node cannot be NULL. */
  631.     x = x->level[0].forward;
  632.     serverAssert(x != NULL);
  633.  
  634.     /* Check if score <= max. */
  635.     if (!zslLexValueLteMax(x->ele,range)) return NULL;
  636.     return x;
  637. }
  638.  
  639. /* Find the last node that is contained in the specified range.
  640.  * Returns NULL when no element is contained in the range. */
  641. zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
  642.     zskiplistNode *x;
  643.     int i;
  644.  
  645.     /* If everything is out of range, return early. */
  646.     if (!zslIsInLexRange(zsl,range)) return NULL;
  647.  
  648.     x = zsl->header;
  649.     for (i = zsl->level-1; i >= 0; i--) {
  650.         /* Go forward while *IN* range. */
  651.         while (x->level[i].forward &&
  652.             zslLexValueLteMax(x->level[i].forward->ele,range))
  653.                 x = x->level[i].forward;
  654.     }
  655.  
  656.     /* This is an inner range, so this node cannot be NULL. */
  657.     serverAssert(x != NULL);
  658.  
  659.     /* Check if score >= min. */
  660.     if (!zslLexValueGteMin(x->ele,range)) return NULL;
  661.     return x;
  662. }
  663.  
  664. /*-----------------------------------------------------------------------------
  665.  * Ziplist-backed sorted set API
  666.  *----------------------------------------------------------------------------*/
  667.  
  668. double zzlGetScore(unsigned char *sptr) {
  669.     unsigned char *vstr;
  670.     unsigned int vlen;
  671.     long long vlong;
  672.     char buf[128];
  673.     double score;
  674.  
  675.     serverAssert(sptr != NULL);
  676.     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
  677.  
  678.     if (vstr) {
  679.         memcpy(buf,vstr,vlen);
  680.         buf[vlen] = '\0';
  681.         score = strtod(buf,NULL);
  682.     } else {
  683.         score = vlong;
  684.     }
  685.  
  686.     return score;
  687. }
  688.  
  689. /* Return a ziplist element as an SDS string. */
  690. sds ziplistGetObject(unsigned char *sptr) {
  691.     unsigned char *vstr;
  692.     unsigned int vlen;
  693.     long long vlong;
  694.  
  695.     serverAssert(sptr != NULL);
  696.     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
  697.  
  698.     if (vstr) {
  699.         return sdsnewlen((char*)vstr,vlen);
  700.     } else {
  701.         return sdsfromlonglong(vlong);
  702.     }
  703. }
  704.  
  705. /* Compare element in sorted set with given element. */
  706. int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
  707.     unsigned char *vstr;
  708.     unsigned int vlen;
  709.     long long vlong;
  710.     unsigned char vbuf[32];
  711.     int minlen, cmp;
  712.  
  713.     serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
  714.     if (vstr == NULL) {
  715.         /* Store string representation of long long in buf. */
  716.         vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
  717.         vstr = vbuf;
  718.     }
  719.  
  720.     minlen = (vlen < clen) ? vlen : clen;
  721.     cmp = memcmp(vstr,cstr,minlen);
  722.     if (cmp == 0) return vlen-clen;
  723.     return cmp;
  724. }
  725.  
  726. unsigned int zzlLength(unsigned char *zl) {
  727.     return ziplistLen(zl)/2;
  728. }
  729.  
  730. /* Move to next entry based on the values in eptr and sptr. Both are set to
  731.  * NULL when there is no next entry. */
  732. void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
  733.     unsigned char *_eptr, *_sptr;
  734.     serverAssert(*eptr != NULL && *sptr != NULL);
  735.  
  736.     _eptr = ziplistNext(zl,*sptr);
  737.     if (_eptr != NULL) {
  738.         _sptr = ziplistNext(zl,_eptr);
  739.         serverAssert(_sptr != NULL);
  740.     } else {
  741.         /* No next entry. */
  742.         _sptr = NULL;
  743.     }
  744.  
  745.     *eptr = _eptr;
  746.     *sptr = _sptr;
  747. }
  748.  
  749. /* Move to the previous entry based on the values in eptr and sptr. Both are
  750.  * set to NULL when there is no next entry. */
  751. void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
  752.     unsigned char *_eptr, *_sptr;
  753.     serverAssert(*eptr != NULL && *sptr != NULL);
  754.  
  755.     _sptr = ziplistPrev(zl,*eptr);
  756.     if (_sptr != NULL) {
  757.         _eptr = ziplistPrev(zl,_sptr);
  758.         serverAssert(_eptr != NULL);
  759.     } else {
  760.         /* No previous entry. */
  761.         _eptr = NULL;
  762.     }
  763.  
  764.     *eptr = _eptr;
  765.     *sptr = _sptr;
  766. }
  767.  
  768. /* Returns if there is a part of the zset is in range. Should only be used
  769.  * internally by zzlFirstInRange and zzlLastInRange. */
  770. int zzlIsInRange(unsigned char *zl, zrangespec *range) {
  771.     unsigned char *p;
  772.     double score;
  773.  
  774.     /* Test for ranges that will always be empty. */
  775.     if (range->min > range->max ||
  776.             (range->min == range->max && (range->minex || range->maxex)))
  777.         return 0;
  778.  
  779.     p = ziplistIndex(zl,-1); /* Last score. */
  780.     if (p == NULL) return 0; /* Empty sorted set */
  781.     score = zzlGetScore(p);
  782.     if (!zslValueGteMin(score,range))
  783.         return 0;
  784.  
  785.     p = ziplistIndex(zl,1); /* First score. */
  786.     serverAssert(p != NULL);
  787.     score = zzlGetScore(p);
  788.     if (!zslValueLteMax(score,range))
  789.         return 0;
  790.  
  791.     return 1;
  792. }
  793.  
  794. /* Find pointer to the first element contained in the specified range.
  795.  * Returns NULL when no element is contained in the range. */
  796. unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
  797.     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
  798.     double score;
  799.  
  800.     /* If everything is out of range, return early. */
  801.     if (!zzlIsInRange(zl,range)) return NULL;
  802.  
  803.     while (eptr != NULL) {
  804.         sptr = ziplistNext(zl,eptr);
  805.         serverAssert(sptr != NULL);
  806.  
  807.         score = zzlGetScore(sptr);
  808.         if (zslValueGteMin(score,range)) {
  809.             /* Check if score <= max. */
  810.             if (zslValueLteMax(score,range))
  811.                 return eptr;
  812.             return NULL;
  813.         }
  814.  
  815.         /* Move to next element. */
  816.         eptr = ziplistNext(zl,sptr);
  817.     }
  818.  
  819.     return NULL;
  820. }
  821.  
  822. /* Find pointer to the last element contained in the specified range.
  823.  * Returns NULL when no element is contained in the range. */
  824. unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
  825.     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
  826.     double score;
  827.  
  828.     /* If everything is out of range, return early. */
  829.     if (!zzlIsInRange(zl,range)) return NULL;
  830.  
  831.     while (eptr != NULL) {
  832.         sptr = ziplistNext(zl,eptr);
  833.         serverAssert(sptr != NULL);
  834.  
  835.         score = zzlGetScore(sptr);
  836.         if (zslValueLteMax(score,range)) {
  837.             /* Check if score >= min. */
  838.             if (zslValueGteMin(score,range))
  839.                 return eptr;
  840.             return NULL;
  841.         }
  842.  
  843.         /* Move to previous element by moving to the score of previous element.
  844.          * When this returns NULL, we know there also is no element. */
  845.         sptr = ziplistPrev(zl,eptr);
  846.         if (sptr != NULL)
  847.             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
  848.         else
  849.             eptr = NULL;
  850.     }
  851.  
  852.     return NULL;
  853. }
  854.  
  855. static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
  856.     sds value = ziplistGetObject(p);
  857.     int res = zslLexValueGteMin(value,spec);
  858.     sdsfree(value);
  859.     return res;
  860. }
  861.  
  862. static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
  863.     sds value = ziplistGetObject(p);
  864.     int res = zslLexValueLteMax(value,spec);
  865.     sdsfree(value);
  866.     return res;
  867. }
  868.  
  869. /* Returns if there is a part of the zset is in range. Should only be used
  870.  * internally by zzlFirstInRange and zzlLastInRange. */
  871. int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
  872.     unsigned char *p;
  873.  
  874.     /* Test for ranges that will always be empty. */
  875.     if (sdscmplex(range->min,range->max) > 1 ||
  876.             (sdscmp(range->min,range->max) == 0 &&
  877.             (range->minex || range->maxex)))
  878.         return 0;
  879.  
  880.     p = ziplistIndex(zl,-2); /* Last element. */
  881.     if (p == NULL) return 0;
  882.     if (!zzlLexValueGteMin(p,range))
  883.         return 0;
  884.  
  885.     p = ziplistIndex(zl,0); /* First element. */
  886.     serverAssert(p != NULL);
  887.     if (!zzlLexValueLteMax(p,range))
  888.         return 0;
  889.  
  890.     return 1;
  891. }
  892.  
  893. /* Find pointer to the first element contained in the specified lex range.
  894.  * Returns NULL when no element is contained in the range. */
  895. unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
  896.     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
  897.  
  898.     /* If everything is out of range, return early. */
  899.     if (!zzlIsInLexRange(zl,range)) return NULL;
  900.  
  901.     while (eptr != NULL) {
  902.         if (zzlLexValueGteMin(eptr,range)) {
  903.             /* Check if score <= max. */
  904.             if (zzlLexValueLteMax(eptr,range))
  905.                 return eptr;
  906.             return NULL;
  907.         }
  908.  
  909.         /* Move to next element. */
  910.         sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
  911.         serverAssert(sptr != NULL);
  912.         eptr = ziplistNext(zl,sptr); /* Next element. */
  913.     }
  914.  
  915.     return NULL;
  916. }
  917.  
  918. /* Find pointer to the last element contained in the specified lex range.
  919.  * Returns NULL when no element is contained in the range. */
  920. unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
  921.     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
  922.  
  923.     /* If everything is out of range, return early. */
  924.     if (!zzlIsInLexRange(zl,range)) return NULL;
  925.  
  926.     while (eptr != NULL) {
  927.         if (zzlLexValueLteMax(eptr,range)) {
  928.             /* Check if score >= min. */
  929.             if (zzlLexValueGteMin(eptr,range))
  930.                 return eptr;
  931.             return NULL;
  932.         }
  933.  
  934.         /* Move to previous element by moving to the score of previous element.
  935.          * When this returns NULL, we know there also is no element. */
  936.         sptr = ziplistPrev(zl,eptr);
  937.         if (sptr != NULL)
  938.             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
  939.         else
  940.             eptr = NULL;
  941.     }
  942.  
  943.     return NULL;
  944. }
  945.  
  946. unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
  947.     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
  948.  
  949.     while (eptr != NULL) {
  950.         sptr = ziplistNext(zl,eptr);
  951.         serverAssert(sptr != NULL);
  952.  
  953.         if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
  954.             /* Matching element, pull out score. */
  955.             if (score != NULL) *score = zzlGetScore(sptr);
  956.             return eptr;
  957.         }
  958.  
  959.         /* Move to next element. */
  960.         eptr = ziplistNext(zl,sptr);
  961.     }
  962.     return NULL;
  963. }
  964.  
  965. /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
  966.  * don't want to modify the one given as argument. */
  967. unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
  968.     unsigned char *p = eptr;
  969.  
  970.     /* TODO: add function to ziplist API to delete N elements from offset. */
  971.     zl = ziplistDelete(zl,&p);
  972.     zl = ziplistDelete(zl,&p);
  973.     return zl;
  974. }
  975.  
  976. unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
  977.     unsigned char *sptr;
  978.     char scorebuf[128];
  979.     int scorelen;
  980.     size_t offset;
  981.  
  982.     scorelen = d2string(scorebuf,sizeof(scorebuf),score);
  983.     if (eptr == NULL) {
  984.         zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
  985.         zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
  986.     } else {
  987.         /* Keep offset relative to zl, as it might be re-allocated. */
  988.         offset = eptr-zl;
  989.         zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
  990.         eptr = zl+offset;
  991.  
  992.         /* Insert score after the element. */
  993.         serverAssert((sptr = ziplistNext(zl,eptr)) != NULL);
  994.         zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
  995.     }
  996.     return zl;
  997. }
  998.  
  999. /* Insert (element,score) pair in ziplist. This function assumes the element is
  1000.  * not yet present in the list. */
  1001. unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
  1002.     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
  1003.     double s;
  1004.  
  1005.     while (eptr != NULL) {
  1006.         sptr = ziplistNext(zl,eptr);
  1007.         serverAssert(sptr != NULL);
  1008.         s = zzlGetScore(sptr);
  1009.  
  1010.         if (s > score) {
  1011.             /* First element with score larger than score for element to be
  1012.              * inserted. This means we should take its spot in the list to
  1013.              * maintain ordering. */
  1014.             zl = zzlInsertAt(zl,eptr,ele,score);
  1015.             break;
  1016.         } else if (s == score) {
  1017.             /* Ensure lexicographical ordering for elements. */
  1018.             if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
  1019.                 zl = zzlInsertAt(zl,eptr,ele,score);
  1020.                 break;
  1021.             }
  1022.         }
  1023.  
  1024.         /* Move to next element. */
  1025.         eptr = ziplistNext(zl,sptr);
  1026.     }
  1027.  
  1028.     /* Push on tail of list when it was not yet inserted. */
  1029.     if (eptr == NULL)
  1030.         zl = zzlInsertAt(zl,NULL,ele,score);
  1031.     return zl;
  1032. }
  1033.  
  1034. unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
  1035.     unsigned char *eptr, *sptr;
  1036.     double score;
  1037.     unsigned long num = 0;
  1038.  
  1039.     if (deleted != NULL) *deleted = 0;
  1040.  
  1041.     eptr = zzlFirstInRange(zl,range);
  1042.     if (eptr == NULL) return zl;
  1043.  
  1044.     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
  1045.      * byte and ziplistNext will return NULL. */
  1046.     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
  1047.         score = zzlGetScore(sptr);
  1048.         if (zslValueLteMax(score,range)) {
  1049.             /* Delete both the element and the score. */
  1050.             zl = ziplistDelete(zl,&eptr);
  1051.             zl = ziplistDelete(zl,&eptr);
  1052.             num++;
  1053.         } else {
  1054.             /* No longer in range. */
  1055.             break;
  1056.         }
  1057.     }
  1058.  
  1059.     if (deleted != NULL) *deleted = num;
  1060.     return zl;
  1061. }
  1062.  
  1063. unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
  1064.     unsigned char *eptr, *sptr;
  1065.     unsigned long num = 0;
  1066.  
  1067.     if (deleted != NULL) *deleted = 0;
  1068.  
  1069.     eptr = zzlFirstInLexRange(zl,range);
  1070.     if (eptr == NULL) return zl;
  1071.  
  1072.     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
  1073.      * byte and ziplistNext will return NULL. */
  1074.     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
  1075.         if (zzlLexValueLteMax(eptr,range)) {
  1076.             /* Delete both the element and the score. */
  1077.             zl = ziplistDelete(zl,&eptr);
  1078.             zl = ziplistDelete(zl,&eptr);
  1079.             num++;
  1080.         } else {
  1081.             /* No longer in range. */
  1082.             break;
  1083.         }
  1084.     }
  1085.  
  1086.     if (deleted != NULL) *deleted = num;
  1087.     return zl;
  1088. }
  1089.  
  1090. /* Delete all the elements with rank between start and end from the skiplist.
  1091.  * Start and end are inclusive. Note that start and end need to be 1-based */
  1092. unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
  1093.     unsigned int num = (end-start)+1;
  1094.     if (deleted) *deleted = num;
  1095.     zl = ziplistDeleteRange(zl,2*(start-1),2*num);
  1096.     return zl;
  1097. }
  1098.  
  1099. /*-----------------------------------------------------------------------------
  1100.  * Common sorted set API
  1101.  *----------------------------------------------------------------------------*/
  1102.  
  1103. unsigned int zsetLength(robj *zobj) {
  1104.     int length = -1;
  1105.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  1106.         length = zzlLength(zobj->ptr);
  1107.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  1108.         length = ((zset*)zobj->ptr)->zsl->length;
  1109.     } else {
  1110.         serverPanic("Unknown sorted set encoding");
  1111.     }
  1112.     return length;
  1113. }
  1114.  
  1115. void zsetConvert(robj *zobj, int encoding) {
  1116.     zset *zs;
  1117.     zskiplistNode *node, *next;
  1118.     sds ele;
  1119.     double score;
  1120.  
  1121.     if (zobj->encoding == encoding) return;
  1122.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  1123.         unsigned char *zl = zobj->ptr;
  1124.         unsigned char *eptr, *sptr;
  1125.         unsigned char *vstr;
  1126.         unsigned int vlen;
  1127.         long long vlong;
  1128.  
  1129.         if (encoding != OBJ_ENCODING_SKIPLIST)
  1130.             serverPanic("Unknown target encoding");
  1131.  
  1132.         zs = zmalloc(sizeof(*zs));
  1133.         zs->dict = dictCreate(&zsetDictType,NULL);
  1134.         zs->zsl = zslCreate();
  1135.  
  1136.         eptr = ziplistIndex(zl,0);
  1137.         serverAssertWithInfo(NULL,zobj,eptr != NULL);
  1138.         sptr = ziplistNext(zl,eptr);
  1139.         serverAssertWithInfo(NULL,zobj,sptr != NULL);
  1140.  
  1141.         while (eptr != NULL) {
  1142.             score = zzlGetScore(sptr);
  1143.             serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
  1144.             if (vstr == NULL)
  1145.                 ele = sdsfromlonglong(vlong);
  1146.             else
  1147.                 ele = sdsnewlen((char*)vstr,vlen);
  1148.  
  1149.             node = zslInsert(zs->zsl,score,ele);
  1150.             serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
  1151.             zzlNext(zl,&eptr,&sptr);
  1152.         }
  1153.  
  1154.         zfree(zobj->ptr);
  1155.         zobj->ptr = zs;
  1156.         zobj->encoding = OBJ_ENCODING_SKIPLIST;
  1157.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  1158.         unsigned char *zl = ziplistNew();
  1159.  
  1160.         if (encoding != OBJ_ENCODING_ZIPLIST)
  1161.             serverPanic("Unknown target encoding");
  1162.  
  1163.         /* Approach similar to zslFree(), since we want to free the skiplist at
  1164.          * the same time as creating the ziplist. */
  1165.         zs = zobj->ptr;
  1166.         dictRelease(zs->dict);
  1167.         node = zs->zsl->header->level[0].forward;
  1168.         zfree(zs->zsl->header);
  1169.         zfree(zs->zsl);
  1170.  
  1171.         while (node) {
  1172.             zl = zzlInsertAt(zl,NULL,node->ele,node->score);
  1173.             next = node->level[0].forward;
  1174.             zslFreeNode(node);
  1175.             node = next;
  1176.         }
  1177.  
  1178.         zfree(zs);
  1179.         zobj->ptr = zl;
  1180.         zobj->encoding = OBJ_ENCODING_ZIPLIST;
  1181.     } else {
  1182.         serverPanic("Unknown sorted set encoding");
  1183.     }
  1184. }
  1185.  
  1186. /* Return (by reference) the score of the specified member of the sorted set
  1187.  * storing it into *score. If the element does not exist C_ERR is returned
  1188.  * otherwise C_OK is returned and *score is correctly populated.
  1189.  * If 'zobj' or 'member' is NULL, C_ERR is returned. */
  1190. int zsetScore(robj *zobj, sds member, double *score) {
  1191.     if (!zobj || !member) return C_ERR;
  1192.  
  1193.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  1194.         if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
  1195.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  1196.         zset *zs = zobj->ptr;
  1197.         dictEntry *de = dictFind(zs->dict, member);
  1198.         if (de == NULL) return C_ERR;
  1199.         *score = *(double*)dictGetVal(de);
  1200.     } else {
  1201.         serverPanic("Unknown sorted set encoding");
  1202.     }
  1203.     return C_OK;
  1204. }
  1205.  
  1206. /*-----------------------------------------------------------------------------
  1207.  * Sorted set commands
  1208.  *----------------------------------------------------------------------------*/
  1209.  
  1210. /* This generic command implements both ZADD and ZINCRBY. */
  1211. #define ZADD_NONE 0
  1212. #define ZADD_INCR (1<<0)    /* Increment the score instead of setting it. */
  1213. #define ZADD_NX (1<<1)      /* Don't touch elements not already existing. */
  1214. #define ZADD_XX (1<<2)      /* Only touch elements already exisitng. */
  1215. #define ZADD_CH (1<<3)      /* Return num of elements added or updated. */
  1216. void zaddGenericCommand(client *c, int flags) {
  1217.     static char *nanerr = "resulting score is not a number (NaN)";
  1218.     robj *key = c->argv[1];
  1219.     robj *zobj;
  1220.     sds ele;
  1221.     double score = 0, *scores = NULL, curscore = 0.0;
  1222.     int j, elements;
  1223.     int scoreidx = 0;
  1224.     /* The following vars are used in order to track what the command actually
  1225.      * did during the execution, to reply to the client and to trigger the
  1226.      * notification of keyspace change. */
  1227.     int added = 0;      /* Number of new elements added. */
  1228.     int updated = 0;    /* Number of elements with updated score. */
  1229.     int processed = 0;  /* Number of elements processed, may remain zero with
  1230.                            options like XX. */
  1231.  
  1232.     /* Parse options. At the end 'scoreidx' is set to the argument position
  1233.      * of the score of the first score-element pair. */
  1234.     scoreidx = 2;
  1235.     while(scoreidx < c->argc) {
  1236.         char *opt = c->argv[scoreidx]->ptr;
  1237.         if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
  1238.         else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
  1239.         else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
  1240.         else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
  1241.         else break;
  1242.         scoreidx++;
  1243.     }
  1244.  
  1245.     /* Turn options into simple to check vars. */
  1246.     int incr = (flags & ZADD_INCR) != 0;
  1247.     int nx = (flags & ZADD_NX) != 0;
  1248.     int xx = (flags & ZADD_XX) != 0;
  1249.     int ch = (flags & ZADD_CH) != 0;
  1250.  
  1251.     /* After the options, we expect to have an even number of args, since
  1252.      * we expect any number of score-element pairs. */
  1253.     elements = c->argc-scoreidx;
  1254.     if (elements % 2) {
  1255.         addReply(c,shared.syntaxerr);
  1256.         return;
  1257.     }
  1258.     elements /= 2; /* Now this holds the number of score-element pairs. */
  1259.  
  1260.     /* Check for incompatible options. */
  1261.     if (nx && xx) {
  1262.         addReplyError(c,
  1263.             "XX and NX options at the same time are not compatible");
  1264.         return;
  1265.     }
  1266.  
  1267.     if (incr && elements > 1) {
  1268.         addReplyError(c,
  1269.             "INCR option supports a single increment-element pair");
  1270.         return;
  1271.     }
  1272.  
  1273.     /* Start parsing all the scores, we need to emit any syntax error
  1274.      * before executing additions to the sorted set, as the command should
  1275.      * either execute fully or nothing at all. */
  1276.     scores = zmalloc(sizeof(double)*elements);
  1277.     for (j = 0; j < elements; j++) {
  1278.         if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
  1279.             != C_OK) goto cleanup;
  1280.     }
  1281.  
  1282.     /* Lookup the key and create the sorted set if does not exist. */
  1283.     zobj = lookupKeyWrite(c->db,key);
  1284.     if (zobj == NULL) {
  1285.         if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
  1286.         if (server.zset_max_ziplist_entries == 0 ||
  1287.             server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
  1288.         {
  1289.             zobj = createZsetObject();
  1290.         } else {
  1291.             zobj = createZsetZiplistObject();
  1292.         }
  1293.         dbAdd(c->db,key,zobj);
  1294.     } else {
  1295.         if (zobj->type != OBJ_ZSET) {
  1296.             addReply(c,shared.wrongtypeerr);
  1297.             goto cleanup;
  1298.         }
  1299.     }
  1300.  
  1301.     for (j = 0; j < elements; j++) {
  1302.         score = scores[j];
  1303.  
  1304.         ele = c->argv[scoreidx+1+j*2]->ptr;
  1305.         if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  1306.             unsigned char *eptr;
  1307.  
  1308.             if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
  1309.                 if (nx) continue;
  1310.                 if (incr) {
  1311.                     score += curscore;
  1312.                     if (isnan(score)) {
  1313.                         addReplyError(c,nanerr);
  1314.                         goto cleanup;
  1315.                     }
  1316.                 }
  1317.  
  1318.                 /* Remove and re-insert when score changed. */
  1319.                 if (score != curscore) {
  1320.                     zobj->ptr = zzlDelete(zobj->ptr,eptr);
  1321.                     zobj->ptr = zzlInsert(zobj->ptr,ele,score);
  1322.                     server.dirty++;
  1323.                     updated++;
  1324.                 }
  1325.                 processed++;
  1326.             } else if (!xx) {
  1327.                 /* Optimize: check if the element is too large or the list
  1328.                  * becomes too long *before* executing zzlInsert. */
  1329.                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
  1330.                 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
  1331.                     zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
  1332.                 if (sdslen(ele) > server.zset_max_ziplist_value)
  1333.                     zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
  1334.                 server.dirty++;
  1335.                 added++;
  1336.                 processed++;
  1337.             }
  1338.         } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  1339.             zset *zs = zobj->ptr;
  1340.             zskiplistNode *znode;
  1341.             dictEntry *de;
  1342.  
  1343.             de = dictFind(zs->dict,ele);
  1344.             if (de != NULL) {
  1345.                 if (nx) continue;
  1346.                 curscore = *(double*)dictGetVal(de);
  1347.  
  1348.                 if (incr) {
  1349.                     score += curscore;
  1350.                     if (isnan(score)) {
  1351.                         addReplyError(c,nanerr);
  1352.                         /* Don't need to check if the sorted set is empty
  1353.                          * because we know it has at least one element. */
  1354.                         goto cleanup;
  1355.                     }
  1356.                 }
  1357.  
  1358.                 /* Remove and re-insert when score changes. */
  1359.                 if (score != curscore) {
  1360.                     zskiplistNode *node;
  1361.                     serverAssert(zslDelete(zs->zsl,curscore,ele,&node));
  1362.                     znode = zslInsert(zs->zsl,score,node->ele);
  1363.                     /* We reused the node->ele SDS string, free the node now
  1364.                      * since zslInsert created a new one. */
  1365.                     node->ele = NULL;
  1366.                     zslFreeNode(node);
  1367.                     /* Note that we did not removed the original element from
  1368.                      * the hash table representing the sorted set, so we just
  1369.                      * update the score. */
  1370.                     dictGetVal(de) = &znode->score; /* Update score ptr. */
  1371.                     server.dirty++;
  1372.                     updated++;
  1373.                 }
  1374.                 processed++;
  1375.             } else if (!xx) {
  1376.                 ele = sdsdup(ele);
  1377.                 znode = zslInsert(zs->zsl,score,ele);
  1378.                 serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
  1379.                 server.dirty++;
  1380.                 added++;
  1381.                 processed++;
  1382.             }
  1383.         } else {
  1384.             serverPanic("Unknown sorted set encoding");
  1385.         }
  1386.     }
  1387.  
  1388. reply_to_client:
  1389.     if (incr) { /* ZINCRBY or INCR option. */
  1390.         if (processed)
  1391.             addReplyDouble(c,score);
  1392.         else
  1393.             addReply(c,shared.nullbulk);
  1394.     } else { /* ZADD. */
  1395.         addReplyLongLong(c,ch ? added+updated : added);
  1396.     }
  1397.  
  1398. cleanup:
  1399.     zfree(scores);
  1400.     if (added || updated) {
  1401.         signalModifiedKey(c->db,key);
  1402.         notifyKeyspaceEvent(NOTIFY_ZSET,
  1403.             incr ? "zincr" : "zadd", key, c->db->id);
  1404.     }
  1405. }
  1406.  
  1407. void zaddCommand(client *c) {
  1408.     zaddGenericCommand(c,ZADD_NONE);
  1409. }
  1410.  
  1411. void zincrbyCommand(client *c) {
  1412.     zaddGenericCommand(c,ZADD_INCR);
  1413. }
  1414.  
  1415. void zremCommand(client *c) {
  1416.     robj *key = c->argv[1];
  1417.     robj *zobj;
  1418.     int deleted = 0, keyremoved = 0, j;
  1419.  
  1420.     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
  1421.         checkType(c,zobj,OBJ_ZSET)) return;
  1422.  
  1423.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  1424.         unsigned char *eptr;
  1425.  
  1426.         for (j = 2; j < c->argc; j++) {
  1427.             if ((eptr = zzlFind(zobj->ptr,c->argv[j]->ptr,NULL)) != NULL) {
  1428.                 deleted++;
  1429.                 zobj->ptr = zzlDelete(zobj->ptr,eptr);
  1430.                 if (zzlLength(zobj->ptr) == 0) {
  1431.                     dbDelete(c->db,key);
  1432.                     keyremoved = 1;
  1433.                     break;
  1434.                 }
  1435.             }
  1436.         }
  1437.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  1438.         zset *zs = zobj->ptr;
  1439.         dictEntry *de;
  1440.         double score;
  1441.  
  1442.         for (j = 2; j < c->argc; j++) {
  1443.             de = dictFind(zs->dict,c->argv[j]->ptr);
  1444.             if (de != NULL) {
  1445.                 deleted++;
  1446.  
  1447.                 /* Get the score in order to delete from the skiplist later. */
  1448.                 score = *(double*)dictGetVal(de);
  1449.  
  1450.                 /* Delete from the hash table and later from the skiplist.
  1451.                  * Note that the order is important: deleting from the skiplist
  1452.                  * actually releases the SDS string representing the element,
  1453.                  * which is shared between the skiplist and the hash table, so
  1454.                  * we need to delete from the skiplist as the final step. */
  1455.                 int retval1 = dictDelete(zs->dict,c->argv[j]->ptr);
  1456.  
  1457.                 /* Delete from skiplist. */
  1458.                 int retval2 = zslDelete(zs->zsl,score,c->argv[j]->ptr,NULL);
  1459.  
  1460.                 serverAssertWithInfo(c,c->argv[j],
  1461.                     retval1 == DICT_OK && retval2);
  1462.  
  1463.                 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
  1464.                 if (dictSize(zs->dict) == 0) {
  1465.                     dbDelete(c->db,key);
  1466.                     keyremoved = 1;
  1467.                     break;
  1468.                 }
  1469.             }
  1470.         }
  1471.     } else {
  1472.         serverPanic("Unknown sorted set encoding");
  1473.     }
  1474.  
  1475.     if (deleted) {
  1476.         notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
  1477.         if (keyremoved)
  1478.             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
  1479.         signalModifiedKey(c->db,key);
  1480.         server.dirty += deleted;
  1481.     }
  1482.     addReplyLongLong(c,deleted);
  1483. }
  1484.  
  1485. /* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
  1486. #define ZRANGE_RANK 0
  1487. #define ZRANGE_SCORE 1
  1488. #define ZRANGE_LEX 2
  1489. void zremrangeGenericCommand(client *c, int rangetype) {
  1490.     robj *key = c->argv[1];
  1491.     robj *zobj;
  1492.     int keyremoved = 0;
  1493.     unsigned long deleted = 0;
  1494.     zrangespec range;
  1495.     zlexrangespec lexrange;
  1496.     long start, end, llen;
  1497.  
  1498.     /* Step 1: Parse the range. */
  1499.     if (rangetype == ZRANGE_RANK) {
  1500.         if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
  1501.             (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
  1502.             return;
  1503.     } else if (rangetype == ZRANGE_SCORE) {
  1504.         if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
  1505.             addReplyError(c,"min or max is not a float");
  1506.             return;
  1507.         }
  1508.     } else if (rangetype == ZRANGE_LEX) {
  1509.         if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
  1510.             addReplyError(c,"min or max not valid string range item");
  1511.             return;
  1512.         }
  1513.     }
  1514.  
  1515.     /* Step 2: Lookup & range sanity checks if needed. */
  1516.     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
  1517.         checkType(c,zobj,OBJ_ZSET)) goto cleanup;
  1518.  
  1519.     if (rangetype == ZRANGE_RANK) {
  1520.         /* Sanitize indexes. */
  1521.         llen = zsetLength(zobj);
  1522.         if (start < 0) start = llen+start;
  1523.         if (end < 0) end = llen+end;
  1524.         if (start < 0) start = 0;
  1525.  
  1526.         /* Invariant: start >= 0, so this test will be true when end < 0.
  1527.          * The range is empty when start > end or start >= length. */
  1528.         if (start > end || start >= llen) {
  1529.             addReply(c,shared.czero);
  1530.             goto cleanup;
  1531.         }
  1532.         if (end >= llen) end = llen-1;
  1533.     }
  1534.  
  1535.     /* Step 3: Perform the range deletion operation. */
  1536.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  1537.         switch(rangetype) {
  1538.         case ZRANGE_RANK:
  1539.             zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
  1540.             break;
  1541.         case ZRANGE_SCORE:
  1542.             zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
  1543.             break;
  1544.         case ZRANGE_LEX:
  1545.             zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
  1546.             break;
  1547.         }
  1548.         if (zzlLength(zobj->ptr) == 0) {
  1549.             dbDelete(c->db,key);
  1550.             keyremoved = 1;
  1551.         }
  1552.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  1553.         zset *zs = zobj->ptr;
  1554.         switch(rangetype) {
  1555.         case ZRANGE_RANK:
  1556.             deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
  1557.             break;
  1558.         case ZRANGE_SCORE:
  1559.             deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
  1560.             break;
  1561.         case ZRANGE_LEX:
  1562.             deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
  1563.             break;
  1564.         }
  1565.         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
  1566.         if (dictSize(zs->dict) == 0) {
  1567.             dbDelete(c->db,key);
  1568.             keyremoved = 1;
  1569.         }
  1570.     } else {
  1571.         serverPanic("Unknown sorted set encoding");
  1572.     }
  1573.  
  1574.     /* Step 4: Notifications and reply. */
  1575.     if (deleted) {
  1576.         char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
  1577.         signalModifiedKey(c->db,key);
  1578.         notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
  1579.         if (keyremoved)
  1580.             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
  1581.     }
  1582.     server.dirty += deleted;
  1583.     addReplyLongLong(c,deleted);
  1584.  
  1585. cleanup:
  1586.     if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
  1587. }
  1588.  
  1589. void zremrangebyrankCommand(client *c) {
  1590.     zremrangeGenericCommand(c,ZRANGE_RANK);
  1591. }
  1592.  
  1593. void zremrangebyscoreCommand(client *c) {
  1594.     zremrangeGenericCommand(c,ZRANGE_SCORE);
  1595. }
  1596.  
  1597. void zremrangebylexCommand(client *c) {
  1598.     zremrangeGenericCommand(c,ZRANGE_LEX);
  1599. }
  1600.  
  1601. typedef struct {
  1602.     robj *subject;
  1603.     int type; /* Set, sorted set */
  1604.     int encoding;
  1605.     double weight;
  1606.    
  1607.     union {
  1608.         /* Set iterators. */
  1609.         union _iterset {
  1610.             struct {
  1611.                 intset *is;
  1612.                 int ii;
  1613.             } is;
  1614.             struct {
  1615.                 dict *dict;
  1616.                 dictIterator *di;
  1617.                 dictEntry *de;
  1618.             } ht;
  1619.         } set;
  1620.  
  1621.         /* Sorted set iterators. */
  1622.         union _iterzset {
  1623.             struct {
  1624.                 unsigned char *zl;
  1625.                 unsigned char *eptr, *sptr;
  1626.             } zl;
  1627.             struct {
  1628.                 zset *zs;
  1629.                 zskiplistNode *node;
  1630.             } sl;
  1631.         } zset;
  1632.     } iter;
  1633. } zsetopsrc;
  1634.  
  1635.  
  1636. /* Use dirty flags for pointers that need to be cleaned up in the next
  1637.  * iteration over the zsetopval. The dirty flag for the long long value is
  1638.  * special, since long long values don't need cleanup. Instead, it means that
  1639.  * we already checked that "ell" holds a long long, or tried to convert another
  1640.  * representation into a long long value. When this was successful,
  1641.  * OPVAL_VALID_LL is set as well. */
  1642. #define OPVAL_DIRTY_SDS 1
  1643. #define OPVAL_DIRTY_LL 2
  1644. #define OPVAL_VALID_LL 4
  1645.  
  1646. /* Store value retrieved from the iterator. */
  1647. typedef struct {
  1648.     int flags;
  1649.     unsigned char _buf[32]; /* Private buffer. */
  1650.     sds ele;
  1651.     unsigned char *estr;
  1652.     unsigned int elen;
  1653.     long long ell;
  1654.     double score;
  1655. } zsetopval;
  1656.  
  1657. typedef union _iterset iterset;
  1658. typedef union _iterzset iterzset;
  1659.  
  1660. void zuiInitIterator(zsetopsrc *op) {
  1661.     if (op->subject == NULL)
  1662.         return;
  1663.  
  1664.     if (op->type == OBJ_SET) {
  1665.         iterset *it = &op->iter.set;
  1666.         if (op->encoding == OBJ_ENCODING_INTSET) {
  1667.             it->is.is = op->subject->ptr;
  1668.             it->is.ii = 0;
  1669.         } else if (op->encoding == OBJ_ENCODING_HT) {
  1670.             it->ht.dict = op->subject->ptr;
  1671.             it->ht.di = dictGetIterator(op->subject->ptr);
  1672.             it->ht.de = dictNext(it->ht.di);
  1673.         } else {
  1674.             serverPanic("Unknown set encoding");
  1675.         }
  1676.     } else if (op->type == OBJ_ZSET) {
  1677.         iterzset *it = &op->iter.zset;
  1678.         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
  1679.             it->zl.zl = op->subject->ptr;
  1680.             it->zl.eptr = ziplistIndex(it->zl.zl,0);
  1681.             if (it->zl.eptr != NULL) {
  1682.                 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
  1683.                 serverAssert(it->zl.sptr != NULL);
  1684.             }
  1685.         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
  1686.             it->sl.zs = op->subject->ptr;
  1687.             it->sl.node = it->sl.zs->zsl->header->level[0].forward;
  1688.         } else {
  1689.             serverPanic("Unknown sorted set encoding");
  1690.         }
  1691.     } else {
  1692.         serverPanic("Unsupported type");
  1693.     }
  1694. }
  1695.  
  1696. void zuiClearIterator(zsetopsrc *op) {
  1697.     if (op->subject == NULL)
  1698.         return;
  1699.  
  1700.     if (op->type == OBJ_SET) {
  1701.         iterset *it = &op->iter.set;
  1702.         if (op->encoding == OBJ_ENCODING_INTSET) {
  1703.             UNUSED(it); /* skip */
  1704.         } else if (op->encoding == OBJ_ENCODING_HT) {
  1705.             dictReleaseIterator(it->ht.di);
  1706.         } else {
  1707.             serverPanic("Unknown set encoding");
  1708.         }
  1709.     } else if (op->type == OBJ_ZSET) {
  1710.         iterzset *it = &op->iter.zset;
  1711.         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
  1712.             UNUSED(it); /* skip */
  1713.         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
  1714.             UNUSED(it); /* skip */
  1715.         } else {
  1716.             serverPanic("Unknown sorted set encoding");
  1717.         }
  1718.     } else {
  1719.         serverPanic("Unsupported type");
  1720.     }
  1721. }
  1722.  
  1723. int zuiLength(zsetopsrc *op) {
  1724.     if (op->subject == NULL)
  1725.         return 0;
  1726.  
  1727.     if (op->type == OBJ_SET) {
  1728.         if (op->encoding == OBJ_ENCODING_INTSET) {
  1729.             return intsetLen(op->subject->ptr);
  1730.         } else if (op->encoding == OBJ_ENCODING_HT) {
  1731.             dict *ht = op->subject->ptr;
  1732.             return dictSize(ht);
  1733.         } else {
  1734.             serverPanic("Unknown set encoding");
  1735.         }
  1736.     } else if (op->type == OBJ_ZSET) {
  1737.         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
  1738.             return zzlLength(op->subject->ptr);
  1739.         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
  1740.             zset *zs = op->subject->ptr;
  1741.             return zs->zsl->length;
  1742.         } else {
  1743.             serverPanic("Unknown sorted set encoding");
  1744.         }
  1745.     } else {
  1746.         serverPanic("Unsupported type");
  1747.     }
  1748. }
  1749.  
  1750. /* Check if the current value is valid. If so, store it in the passed structure
  1751.  * and move to the next element. If not valid, this means we have reached the
  1752.  * end of the structure and can abort. */
  1753. int zuiNext(zsetopsrc *op, zsetopval *val) {
  1754.     if (op->subject == NULL)
  1755.         return 0;
  1756.  
  1757.     if (val->flags & OPVAL_DIRTY_SDS)
  1758.         sdsfree(val->ele);
  1759.  
  1760.     memset(val,0,sizeof(zsetopval));
  1761.  
  1762.     if (op->type == OBJ_SET) {
  1763.         iterset *it = &op->iter.set;
  1764.         if (op->encoding == OBJ_ENCODING_INTSET) {
  1765.             int64_t ell;
  1766.  
  1767.             if (!intsetGet(it->is.is,it->is.ii,&ell))
  1768.                 return 0;
  1769.             val->ell = ell;
  1770.             val->score = 1.0;
  1771.  
  1772.             /* Move to next element. */
  1773.             it->is.ii++;
  1774.         } else if (op->encoding == OBJ_ENCODING_HT) {
  1775.             if (it->ht.de == NULL)
  1776.                 return 0;
  1777.             val->ele = dictGetKey(it->ht.de);
  1778.             val->score = 1.0;
  1779.  
  1780.             /* Move to next element. */
  1781.             it->ht.de = dictNext(it->ht.di);
  1782.         } else {
  1783.             serverPanic("Unknown set encoding");
  1784.         }
  1785.     } else if (op->type == OBJ_ZSET) {
  1786.         iterzset *it = &op->iter.zset;
  1787.         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
  1788.             /* No need to check both, but better be explicit. */
  1789.             if (it->zl.eptr == NULL || it->zl.sptr == NULL)
  1790.                 return 0;
  1791.             serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
  1792.             val->score = zzlGetScore(it->zl.sptr);
  1793.  
  1794.             /* Move to next element. */
  1795.             zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
  1796.         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
  1797.             if (it->sl.node == NULL)
  1798.                 return 0;
  1799.             val->ele = it->sl.node->ele;
  1800.             val->score = it->sl.node->score;
  1801.  
  1802.             /* Move to next element. */
  1803.             it->sl.node = it->sl.node->level[0].forward;
  1804.         } else {
  1805.             serverPanic("Unknown sorted set encoding");
  1806.         }
  1807.     } else {
  1808.         serverPanic("Unsupported type");
  1809.     }
  1810.     return 1;
  1811. }
  1812.  
  1813. int zuiLongLongFromValue(zsetopval *val) {
  1814.     if (!(val->flags & OPVAL_DIRTY_LL)) {
  1815.         val->flags |= OPVAL_DIRTY_LL;
  1816.  
  1817.         if (val->ele != NULL) {
  1818.             if (string2ll(val->ele,sdslen(val->ele),&val->ell))
  1819.                 val->flags |= OPVAL_VALID_LL;
  1820.         } else if (val->estr != NULL) {
  1821.             if (string2ll((char*)val->estr,val->elen,&val->ell))
  1822.                 val->flags |= OPVAL_VALID_LL;
  1823.         } else {
  1824.             /* The long long was already set, flag as valid. */
  1825.             val->flags |= OPVAL_VALID_LL;
  1826.         }
  1827.     }
  1828.     return val->flags & OPVAL_VALID_LL;
  1829. }
  1830.  
  1831. sds zuiSdsFromValue(zsetopval *val) {
  1832.     if (val->ele == NULL) {
  1833.         if (val->estr != NULL) {
  1834.             val->ele = sdsnewlen((char*)val->estr,val->elen);
  1835.         } else {
  1836.             val->ele = sdsfromlonglong(val->ell);
  1837.         }
  1838.         val->flags |= OPVAL_DIRTY_SDS;
  1839.     }
  1840.     return val->ele;
  1841. }
  1842.  
  1843. /* This is different from zuiSdsFromValue since returns a new SDS string
  1844.  * which is up to the caller to free. */
  1845. sds zuiNewSdsFromValue(zsetopval *val) {
  1846.     if (val->flags & OPVAL_DIRTY_SDS) {
  1847.         /* We have already one to return! */
  1848.         sds ele = val->ele;
  1849.         val->flags &= ~OPVAL_DIRTY_SDS;
  1850.         val->ele = NULL;
  1851.         return ele;
  1852.     } else if (val->ele) {
  1853.         return sdsdup(val->ele);
  1854.     } else if (val->estr) {
  1855.         return sdsnewlen((char*)val->estr,val->elen);
  1856.     } else {
  1857.         return sdsfromlonglong(val->ell);
  1858.     }
  1859. }
  1860.  
  1861. int zuiBufferFromValue(zsetopval *val) {
  1862.     if (val->estr == NULL) {
  1863.         if (val->ele != NULL) {
  1864.             val->elen = sdslen(val->ele);
  1865.             val->estr = (unsigned char*)val->ele;
  1866.         } else {
  1867.             val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
  1868.             val->estr = val->_buf;
  1869.         }
  1870.     }
  1871.     return 1;
  1872. }
  1873.  
  1874. /* Find value pointed to by val in the source pointer to by op. When found,
  1875.  * return 1 and store its score in target. Return 0 otherwise. */
  1876. int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
  1877.     if (op->subject == NULL)
  1878.         return 0;
  1879.  
  1880.     if (op->type == OBJ_SET) {
  1881.         if (op->encoding == OBJ_ENCODING_INTSET) {
  1882.             if (zuiLongLongFromValue(val) &&
  1883.                 intsetFind(op->subject->ptr,val->ell))
  1884.             {
  1885.                 *score = 1.0;
  1886.                 return 1;
  1887.             } else {
  1888.                 return 0;
  1889.             }
  1890.         } else if (op->encoding == OBJ_ENCODING_HT) {
  1891.             dict *ht = op->subject->ptr;
  1892.             zuiSdsFromValue(val);
  1893.             if (dictFind(ht,val->ele) != NULL) {
  1894.                 *score = 1.0;
  1895.                 return 1;
  1896.             } else {
  1897.                 return 0;
  1898.             }
  1899.         } else {
  1900.             serverPanic("Unknown set encoding");
  1901.         }
  1902.     } else if (op->type == OBJ_ZSET) {
  1903.         zuiSdsFromValue(val);
  1904.  
  1905.         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
  1906.             if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
  1907.                 /* Score is already set by zzlFind. */
  1908.                 return 1;
  1909.             } else {
  1910.                 return 0;
  1911.             }
  1912.         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
  1913.             zset *zs = op->subject->ptr;
  1914.             dictEntry *de;
  1915.             if ((de = dictFind(zs->dict,val->ele)) != NULL) {
  1916.                 *score = *(double*)dictGetVal(de);
  1917.                 return 1;
  1918.             } else {
  1919.                 return 0;
  1920.             }
  1921.         } else {
  1922.             serverPanic("Unknown sorted set encoding");
  1923.         }
  1924.     } else {
  1925.         serverPanic("Unsupported type");
  1926.     }
  1927. }
  1928.  
  1929. int zuiCompareByCardinality(const void *s1, const void *s2) {
  1930.     return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
  1931. }
  1932.  
  1933. #define REDIS_AGGR_SUM 1
  1934. #define REDIS_AGGR_MIN 2
  1935. #define REDIS_AGGR_MAX 3
  1936. #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
  1937.  
  1938. inline static void zunionInterAggregate(double *target, double val, int aggregate) {
  1939.     if (aggregate == REDIS_AGGR_SUM) {
  1940.         *target = *target + val;
  1941.         /* The result of adding two doubles is NaN when one variable
  1942.          * is +inf and the other is -inf. When these numbers are added,
  1943.          * we maintain the convention of the result being 0.0. */
  1944.         if (isnan(*target)) *target = 0.0;
  1945.     } else if (aggregate == REDIS_AGGR_MIN) {
  1946.         *target = val < *target ? val : *target;
  1947.     } else if (aggregate == REDIS_AGGR_MAX) {
  1948.         *target = val > *target ? val : *target;
  1949.     } else {
  1950.         /* safety net */
  1951.         serverPanic("Unknown ZUNION/INTER aggregate type");
  1952.     }
  1953. }
  1954.  
  1955. unsigned int dictSdsHash(const void *key);
  1956. int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
  1957.  
  1958. dictType setAccumulatorDictType = {
  1959.     dictSdsHash,               /* hash function */
  1960.     NULL,                      /* key dup */
  1961.     NULL,                      /* val dup */
  1962.     dictSdsKeyCompare,         /* key compare */
  1963.     NULL,                      /* key destructor */
  1964.     NULL                       /* val destructor */
  1965. };
  1966.  
  1967. void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
  1968.     int i, j;
  1969.     zrangespec range;
  1970.     int filterbyrange = 0;
  1971.     long setnum;
  1972.     int aggregate = REDIS_AGGR_SUM;
  1973.     zsetopsrc *src;
  1974.     zsetopval zval;
  1975.     sds tmp;
  1976.     unsigned int maxelelen = 0;
  1977.     robj *dstobj;
  1978.     zset *dstzset;
  1979.     zskiplistNode *znode;
  1980.     int touched = 0;
  1981.    
  1982.     /* expect setnum input keys to be given */
  1983.     if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != C_OK))
  1984.         return;
  1985.  
  1986.     if (setnum < 1) {
  1987.         addReplyError(c,
  1988.             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
  1989.         return;
  1990.     }
  1991.  
  1992.     /* test if the expected number of keys would overflow */
  1993.     if (setnum > c->argc-3) {
  1994.         addReply(c,shared.syntaxerr);
  1995.         return;
  1996.     }
  1997.  
  1998.     /* read keys to be used for input */
  1999.     src = zcalloc(sizeof(zsetopsrc) * setnum);
  2000.     for (i = 0, j = 3; i < setnum; i++, j++) {
  2001.         robj *obj = lookupKeyWrite(c->db,c->argv[j]);
  2002.         if (obj != NULL) {
  2003.             if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) {
  2004.                 zfree(src);
  2005.                 addReply(c,shared.wrongtypeerr);
  2006.                 return;
  2007.             }
  2008.  
  2009.             src[i].subject = obj;
  2010.             src[i].type = obj->type;
  2011.             src[i].encoding = obj->encoding;
  2012.         } else {
  2013.             src[i].subject = NULL;
  2014.         }
  2015.  
  2016.         /* Default all weights to 1. */
  2017.         src[i].weight = 1.0;
  2018.     }
  2019.  
  2020.     /* parse optional extra arguments */
  2021.     if (j < c->argc) {
  2022.         int remaining = c->argc - j;
  2023.  
  2024.         while (remaining) {
  2025.             if (remaining >= (setnum + 1) &&
  2026.                 !strcasecmp(c->argv[j]->ptr,"weights"))
  2027.             {
  2028.                 j++; remaining--;
  2029.                 for (i = 0; i < setnum; i++, j++, remaining--) {
  2030.                     if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
  2031.                             "weight value is not a float") != C_OK)
  2032.                     {
  2033.                         zfree(src);
  2034.                         return;
  2035.                     }
  2036.                 }
  2037.             } else if (remaining >= 2 &&
  2038.                        !strcasecmp(c->argv[j]->ptr,"aggregate"))
  2039.             {
  2040.                 j++; remaining--;
  2041.                 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
  2042.                     aggregate = REDIS_AGGR_SUM;
  2043.                 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
  2044.                     aggregate = REDIS_AGGR_MIN;
  2045.                 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
  2046.                     aggregate = REDIS_AGGR_MAX;
  2047.                 } else {
  2048.                     zfree(src);
  2049.                     addReply(c,shared.syntaxerr);
  2050.                     return;
  2051.                 }
  2052.                 j++; remaining--;
  2053.             } else if (remaining > 2 && !strcasecmp(c->argv[j]->ptr,"inrange"))
  2054.         {
  2055.           filterbyrange = 1;
  2056.           j++; remaining--;
  2057.           if (zslParseRange(c->argv[j],c->argv[j+1],&range) != C_OK) {
  2058.         zfree(src);
  2059.                 addReply(c,shared.syntaxerr);
  2060.                 return;
  2061.           }
  2062.           j+=2; remaining-=2;
  2063.         } else {
  2064.                 zfree(src);
  2065.                 addReply(c,shared.syntaxerr);
  2066.                 return;
  2067.             }
  2068.         }
  2069.     }
  2070.  
  2071.     /* sort sets from the smallest to largest, this will improve our
  2072.      * algorithm's performance */
  2073.     qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
  2074.    
  2075.     dstobj = createZsetObject();
  2076.     dstzset = dstobj->ptr;
  2077.     memset(&zval, 0, sizeof(zval));
  2078.  
  2079.     if (op == SET_OP_INTER) {
  2080.         /* Skip everything if the smallest input is empty. */
  2081.         if (zuiLength(&src[0]) > 0) {
  2082.             /* Precondition: as src[0] is non-empty and the inputs are ordered
  2083.              * by size, all src[i > 0] are non-empty too. */
  2084.             zuiInitIterator(&src[0]);
  2085.             while (zuiNext(&src[0],&zval)) {
  2086.                 double score, value;
  2087.                 score = src[0].weight * zval.score;
  2088.                 if (isnan(score)) score = 0;
  2089.  
  2090.                 for (j = 1; j < setnum; j++) {
  2091.                     /* It is not safe to access the zset we are
  2092.                      * iterating, so explicitly check for equal object. */
  2093.                     if (src[j].subject == src[0].subject) {
  2094.                         value = zval.score*src[j].weight;
  2095.                         zunionInterAggregate(&score,value,aggregate);
  2096.                     } else if (zuiFind(&src[j],&zval,&value)) {
  2097.                         value *= src[j].weight;
  2098.                         zunionInterAggregate(&score,value,aggregate);
  2099.                     } else {
  2100.                         break;
  2101.                     }
  2102.                 }
  2103.  
  2104.                 /* Only continue when present in every input. */
  2105.                 if (j == setnum) {
  2106.             if (!filterbyrange || ( zslValueGteMin(score, &range) && zslValueLteMax(score, &range) )) {
  2107.               tmp = zuiNewSdsFromValue(&zval);
  2108.               znode = zslInsert(dstzset->zsl,score,tmp);
  2109.               dictAdd(dstzset->dict,tmp,&znode->score);
  2110.               if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
  2111.             }
  2112.                 }
  2113.             }
  2114.             zuiClearIterator(&src[0]);
  2115.         }
  2116.     } else if (op == SET_OP_UNION) {
  2117.         dict *accumulator = dictCreate(&setAccumulatorDictType,NULL);
  2118.         dictIterator *di;
  2119.         dictEntry *de;
  2120.         double score;
  2121.  
  2122.         if (setnum) {
  2123.             /* Our union is at least as large as the largest set.
  2124.              * Resize the dictionary ASAP to avoid useless rehashing. */
  2125.             dictExpand(accumulator,zuiLength(&src[setnum-1]));
  2126.         }
  2127.  
  2128.         /* Step 1: Create a dictionary of elements -> aggregated-scores
  2129.          * by iterating one sorted set after the other. */
  2130.         for (i = 0; i < setnum; i++) {
  2131.             if (zuiLength(&src[i]) == 0) continue;
  2132.  
  2133.             zuiInitIterator(&src[i]);
  2134.             while (zuiNext(&src[i],&zval)) {
  2135.                 /* Initialize value */
  2136.                 score = src[i].weight * zval.score;
  2137.                 if (isnan(score)) score = 0;
  2138.  
  2139.                 /* Search for this element in the accumulating dictionary. */
  2140.                 de = dictFind(accumulator,zuiSdsFromValue(&zval));
  2141.                 /* If we don't have it, we need to create a new entry. */
  2142.                 if (de == NULL) {
  2143.                     tmp = zuiNewSdsFromValue(&zval);
  2144.                     /* Remember the longest single element encountered,
  2145.                      * to understand if it's possible to convert to ziplist
  2146.                      * at the end. */
  2147.                      if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
  2148.                     /* Add the element with its initial score. */
  2149.                     de = dictAddRaw(accumulator,tmp);
  2150.                     dictSetDoubleVal(de,score);
  2151.                 } else {
  2152.                     /* Update the score with the score of the new instance
  2153.                      * of the element found in the current sorted set.
  2154.                      *
  2155.                      * Here we access directly the dictEntry double
  2156.                      * value inside the union as it is a big speedup
  2157.                      * compared to using the getDouble/setDouble API. */
  2158.                     zunionInterAggregate(&de->v.d,score,aggregate);
  2159.                 }
  2160.             }
  2161.             zuiClearIterator(&src[i]);
  2162.         }
  2163.  
  2164.         /* Step 2: convert the dictionary into the final sorted set. */
  2165.         di = dictGetIterator(accumulator);
  2166.  
  2167.     if (!filterbyrange) {
  2168.       /* We now are aware of the final size of the resulting sorted set,
  2169.       * let's resize the dictionary embedded inside the sorted set to the
  2170.       * right size, in order to save rehashing time. */
  2171.       dictExpand(dstzset->dict,dictSize(accumulator));
  2172.     }
  2173.         while((de = dictNext(di)) != NULL) {
  2174.         score = dictGetDoubleVal(de);
  2175.         if (!filterbyrange || ( zslValueGteMin(score, &range) && zslValueLteMax(score, &range) )) {
  2176.           sds ele = dictGetKey(de);
  2177.           znode = zslInsert(dstzset->zsl,score,ele);
  2178.           dictAdd(dstzset->dict,ele,&znode->score);
  2179.         }
  2180.         }
  2181.         dictReleaseIterator(di);
  2182.         dictRelease(accumulator);
  2183.     } else {
  2184.         serverPanic("Unknown operator");
  2185.     }
  2186.  
  2187.     if (dbDelete(c->db,dstkey)) {
  2188.         signalModifiedKey(c->db,dstkey);
  2189.         touched = 1;
  2190.         server.dirty++;
  2191.     }
  2192.     if (dstzset->zsl->length) {
  2193.         /* Convert to ziplist when in limits. */
  2194.         if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
  2195.             maxelelen <= server.zset_max_ziplist_value)
  2196.                 zsetConvert(dstobj,OBJ_ENCODING_ZIPLIST);
  2197.  
  2198.         dbAdd(c->db,dstkey,dstobj);
  2199.         addReplyLongLong(c,zsetLength(dstobj));
  2200.         if (!touched) signalModifiedKey(c->db,dstkey);
  2201.         notifyKeyspaceEvent(NOTIFY_ZSET,
  2202.             (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
  2203.             dstkey,c->db->id);
  2204.         server.dirty++;
  2205.     } else {
  2206.         decrRefCount(dstobj);
  2207.         addReply(c,shared.czero);
  2208.         if (touched)
  2209.             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
  2210.     }
  2211.     zfree(src);
  2212. }
  2213.  
  2214. void zunionstoreCommand(client *c) {
  2215.     zunionInterGenericCommand(c,c->argv[1], SET_OP_UNION);
  2216. }
  2217.  
  2218. void zinterstoreCommand(client *c) {
  2219.     zunionInterGenericCommand(c,c->argv[1], SET_OP_INTER);
  2220. }
  2221.  
  2222. void zrangeGenericCommand(client *c, int reverse) {
  2223.     robj *key = c->argv[1];
  2224.     robj *zobj;
  2225.     int withscores = 0;
  2226.     long start;
  2227.     long end;
  2228.     int llen;
  2229.     int rangelen;
  2230.  
  2231.     if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
  2232.         (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
  2233.  
  2234.     if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
  2235.         withscores = 1;
  2236.     } else if (c->argc >= 5) {
  2237.         addReply(c,shared.syntaxerr);
  2238.         return;
  2239.     }
  2240.  
  2241.     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
  2242.          || checkType(c,zobj,OBJ_ZSET)) return;
  2243.  
  2244.     /* Sanitize indexes. */
  2245.     llen = zsetLength(zobj);
  2246.     if (start < 0) start = llen+start;
  2247.     if (end < 0) end = llen+end;
  2248.     if (start < 0) start = 0;
  2249.  
  2250.     /* Invariant: start >= 0, so this test will be true when end < 0.
  2251.      * The range is empty when start > end or start >= length. */
  2252.     if (start > end || start >= llen) {
  2253.         addReply(c,shared.emptymultibulk);
  2254.         return;
  2255.     }
  2256.     if (end >= llen) end = llen-1;
  2257.     rangelen = (end-start)+1;
  2258.  
  2259.     /* Return the result in form of a multi-bulk reply */
  2260.     addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
  2261.  
  2262.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  2263.         unsigned char *zl = zobj->ptr;
  2264.         unsigned char *eptr, *sptr;
  2265.         unsigned char *vstr;
  2266.         unsigned int vlen;
  2267.         long long vlong;
  2268.  
  2269.         if (reverse)
  2270.             eptr = ziplistIndex(zl,-2-(2*start));
  2271.         else
  2272.             eptr = ziplistIndex(zl,2*start);
  2273.  
  2274.         serverAssertWithInfo(c,zobj,eptr != NULL);
  2275.         sptr = ziplistNext(zl,eptr);
  2276.  
  2277.         while (rangelen--) {
  2278.             serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
  2279.             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
  2280.             if (vstr == NULL)
  2281.                 addReplyBulkLongLong(c,vlong);
  2282.             else
  2283.                 addReplyBulkCBuffer(c,vstr,vlen);
  2284.  
  2285.             if (withscores)
  2286.                 addReplyDouble(c,zzlGetScore(sptr));
  2287.  
  2288.             if (reverse)
  2289.                 zzlPrev(zl,&eptr,&sptr);
  2290.             else
  2291.                 zzlNext(zl,&eptr,&sptr);
  2292.         }
  2293.  
  2294.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  2295.         zset *zs = zobj->ptr;
  2296.         zskiplist *zsl = zs->zsl;
  2297.         zskiplistNode *ln;
  2298.         sds ele;
  2299.  
  2300.         /* Check if starting point is trivial, before doing log(N) lookup. */
  2301.         if (reverse) {
  2302.             ln = zsl->tail;
  2303.             if (start > 0)
  2304.                 ln = zslGetElementByRank(zsl,llen-start);
  2305.         } else {
  2306.             ln = zsl->header->level[0].forward;
  2307.             if (start > 0)
  2308.                 ln = zslGetElementByRank(zsl,start+1);
  2309.         }
  2310.  
  2311.         while(rangelen--) {
  2312.             serverAssertWithInfo(c,zobj,ln != NULL);
  2313.             ele = ln->ele;
  2314.             addReplyBulkCBuffer(c,ele,sdslen(ele));
  2315.             if (withscores)
  2316.                 addReplyDouble(c,ln->score);
  2317.             ln = reverse ? ln->backward : ln->level[0].forward;
  2318.         }
  2319.     } else {
  2320.         serverPanic("Unknown sorted set encoding");
  2321.     }
  2322. }
  2323.  
  2324. void zrangeCommand(client *c) {
  2325.     zrangeGenericCommand(c,0);
  2326. }
  2327.  
  2328. void zrevrangeCommand(client *c) {
  2329.     zrangeGenericCommand(c,1);
  2330. }
  2331.  
  2332. /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
  2333. void genericZrangebyscoreCommand(client *c, int reverse) {
  2334.     zrangespec range;
  2335.     robj *key = c->argv[1];
  2336.     robj *zobj;
  2337.     long offset = 0, limit = -1;
  2338.     int withscores = 0;
  2339.     unsigned long rangelen = 0;
  2340.     void *replylen = NULL;
  2341.     int minidx, maxidx;
  2342.  
  2343.     /* Parse the range arguments. */
  2344.     if (reverse) {
  2345.         /* Range is given as [max,min] */
  2346.         maxidx = 2; minidx = 3;
  2347.     } else {
  2348.         /* Range is given as [min,max] */
  2349.         minidx = 2; maxidx = 3;
  2350.     }
  2351.  
  2352.     if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
  2353.         addReplyError(c,"min or max is not a float");
  2354.         return;
  2355.     }
  2356.  
  2357.     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
  2358.      * 4 arguments, so we'll never enter the following code path. */
  2359.     if (c->argc > 4) {
  2360.         int remaining = c->argc - 4;
  2361.         int pos = 4;
  2362.  
  2363.         while (remaining) {
  2364.             if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
  2365.                 pos++; remaining--;
  2366.                 withscores = 1;
  2367.             } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
  2368.                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
  2369.                         != C_OK) ||
  2370.                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
  2371.                         != C_OK))
  2372.                 {
  2373.                     return;
  2374.                 }
  2375.                 pos += 3; remaining -= 3;
  2376.             } else {
  2377.                 addReply(c,shared.syntaxerr);
  2378.                 return;
  2379.             }
  2380.         }
  2381.     }
  2382.  
  2383.     /* Ok, lookup the key and get the range */
  2384.     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
  2385.         checkType(c,zobj,OBJ_ZSET)) return;
  2386.  
  2387.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  2388.         unsigned char *zl = zobj->ptr;
  2389.         unsigned char *eptr, *sptr;
  2390.         unsigned char *vstr;
  2391.         unsigned int vlen;
  2392.         long long vlong;
  2393.         double score;
  2394.  
  2395.         /* If reversed, get the last node in range as starting point. */
  2396.         if (reverse) {
  2397.             eptr = zzlLastInRange(zl,&range);
  2398.         } else {
  2399.             eptr = zzlFirstInRange(zl,&range);
  2400.         }
  2401.  
  2402.         /* No "first" element in the specified interval. */
  2403.         if (eptr == NULL) {
  2404.             addReply(c, shared.emptymultibulk);
  2405.             return;
  2406.         }
  2407.  
  2408.         /* Get score pointer for the first element. */
  2409.         serverAssertWithInfo(c,zobj,eptr != NULL);
  2410.         sptr = ziplistNext(zl,eptr);
  2411.  
  2412.         /* We don't know in advance how many matching elements there are in the
  2413.          * list, so we push this object that will represent the multi-bulk
  2414.          * length in the output buffer, and will "fix" it later */
  2415.         replylen = addDeferredMultiBulkLength(c);
  2416.  
  2417.         /* If there is an offset, just traverse the number of elements without
  2418.          * checking the score because that is done in the next loop. */
  2419.         while (eptr && offset--) {
  2420.             if (reverse) {
  2421.                 zzlPrev(zl,&eptr,&sptr);
  2422.             } else {
  2423.                 zzlNext(zl,&eptr,&sptr);
  2424.             }
  2425.         }
  2426.  
  2427.         while (eptr && limit--) {
  2428.             score = zzlGetScore(sptr);
  2429.  
  2430.             /* Abort when the node is no longer in range. */
  2431.             if (reverse) {
  2432.                 if (!zslValueGteMin(score,&range)) break;
  2433.             } else {
  2434.                 if (!zslValueLteMax(score,&range)) break;
  2435.             }
  2436.  
  2437.             /* We know the element exists, so ziplistGet should always succeed */
  2438.             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
  2439.  
  2440.             rangelen++;
  2441.             if (vstr == NULL) {
  2442.                 addReplyBulkLongLong(c,vlong);
  2443.             } else {
  2444.                 addReplyBulkCBuffer(c,vstr,vlen);
  2445.             }
  2446.  
  2447.             if (withscores) {
  2448.                 addReplyDouble(c,score);
  2449.             }
  2450.  
  2451.             /* Move to next node */
  2452.             if (reverse) {
  2453.                 zzlPrev(zl,&eptr,&sptr);
  2454.             } else {
  2455.                 zzlNext(zl,&eptr,&sptr);
  2456.             }
  2457.         }
  2458.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  2459.         zset *zs = zobj->ptr;
  2460.         zskiplist *zsl = zs->zsl;
  2461.         zskiplistNode *ln;
  2462.  
  2463.         /* If reversed, get the last node in range as starting point. */
  2464.         if (reverse) {
  2465.             ln = zslLastInRange(zsl,&range);
  2466.         } else {
  2467.             ln = zslFirstInRange(zsl,&range);
  2468.         }
  2469.  
  2470.         /* No "first" element in the specified interval. */
  2471.         if (ln == NULL) {
  2472.             addReply(c, shared.emptymultibulk);
  2473.             return;
  2474.         }
  2475.  
  2476.         /* We don't know in advance how many matching elements there are in the
  2477.          * list, so we push this object that will represent the multi-bulk
  2478.          * length in the output buffer, and will "fix" it later */
  2479.         replylen = addDeferredMultiBulkLength(c);
  2480.  
  2481.         /* If there is an offset, just traverse the number of elements without
  2482.          * checking the score because that is done in the next loop. */
  2483.         while (ln && offset--) {
  2484.             if (reverse) {
  2485.                 ln = ln->backward;
  2486.             } else {
  2487.                 ln = ln->level[0].forward;
  2488.             }
  2489.         }
  2490.  
  2491.         while (ln && limit--) {
  2492.             /* Abort when the node is no longer in range. */
  2493.             if (reverse) {
  2494.                 if (!zslValueGteMin(ln->score,&range)) break;
  2495.             } else {
  2496.                 if (!zslValueLteMax(ln->score,&range)) break;
  2497.             }
  2498.  
  2499.             rangelen++;
  2500.             addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
  2501.  
  2502.             if (withscores) {
  2503.                 addReplyDouble(c,ln->score);
  2504.             }
  2505.  
  2506.             /* Move to next node */
  2507.             if (reverse) {
  2508.                 ln = ln->backward;
  2509.             } else {
  2510.                 ln = ln->level[0].forward;
  2511.             }
  2512.         }
  2513.     } else {
  2514.         serverPanic("Unknown sorted set encoding");
  2515.     }
  2516.  
  2517.     if (withscores) {
  2518.         rangelen *= 2;
  2519.     }
  2520.  
  2521.     setDeferredMultiBulkLength(c, replylen, rangelen);
  2522. }
  2523.  
  2524. void zrangebyscoreCommand(client *c) {
  2525.     genericZrangebyscoreCommand(c,0);
  2526. }
  2527.  
  2528. void zrevrangebyscoreCommand(client *c) {
  2529.     genericZrangebyscoreCommand(c,1);
  2530. }
  2531.  
  2532. void zcountCommand(client *c) {
  2533.     robj *key = c->argv[1];
  2534.     robj *zobj;
  2535.     zrangespec range;
  2536.     int count = 0;
  2537.  
  2538.     /* Parse the range arguments */
  2539.     if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
  2540.         addReplyError(c,"min or max is not a float");
  2541.         return;
  2542.     }
  2543.  
  2544.     /* Lookup the sorted set */
  2545.     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
  2546.         checkType(c, zobj, OBJ_ZSET)) return;
  2547.  
  2548.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  2549.         unsigned char *zl = zobj->ptr;
  2550.         unsigned char *eptr, *sptr;
  2551.         double score;
  2552.  
  2553.         /* Use the first element in range as the starting point */
  2554.         eptr = zzlFirstInRange(zl,&range);
  2555.  
  2556.         /* No "first" element */
  2557.         if (eptr == NULL) {
  2558.             addReply(c, shared.czero);
  2559.             return;
  2560.         }
  2561.  
  2562.         /* First element is in range */
  2563.         sptr = ziplistNext(zl,eptr);
  2564.         score = zzlGetScore(sptr);
  2565.         serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
  2566.  
  2567.         /* Iterate over elements in range */
  2568.         while (eptr) {
  2569.             score = zzlGetScore(sptr);
  2570.  
  2571.             /* Abort when the node is no longer in range. */
  2572.             if (!zslValueLteMax(score,&range)) {
  2573.                 break;
  2574.             } else {
  2575.                 count++;
  2576.                 zzlNext(zl,&eptr,&sptr);
  2577.             }
  2578.         }
  2579.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  2580.         zset *zs = zobj->ptr;
  2581.         zskiplist *zsl = zs->zsl;
  2582.         zskiplistNode *zn;
  2583.         unsigned long rank;
  2584.  
  2585.         /* Find first element in range */
  2586.         zn = zslFirstInRange(zsl, &range);
  2587.  
  2588.         /* Use rank of first element, if any, to determine preliminary count */
  2589.         if (zn != NULL) {
  2590.             rank = zslGetRank(zsl, zn->score, zn->ele);
  2591.             count = (zsl->length - (rank - 1));
  2592.  
  2593.             /* Find last element in range */
  2594.             zn = zslLastInRange(zsl, &range);
  2595.  
  2596.             /* Use rank of last element, if any, to determine the actual count */
  2597.             if (zn != NULL) {
  2598.                 rank = zslGetRank(zsl, zn->score, zn->ele);
  2599.                 count -= (zsl->length - rank);
  2600.             }
  2601.         }
  2602.     } else {
  2603.         serverPanic("Unknown sorted set encoding");
  2604.     }
  2605.  
  2606.     addReplyLongLong(c, count);
  2607. }
  2608.  
  2609. void zlexcountCommand(client *c) {
  2610.     robj *key = c->argv[1];
  2611.     robj *zobj;
  2612.     zlexrangespec range;
  2613.     int count = 0;
  2614.  
  2615.     /* Parse the range arguments */
  2616.     if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) {
  2617.         addReplyError(c,"min or max not valid string range item");
  2618.         return;
  2619.     }
  2620.  
  2621.     /* Lookup the sorted set */
  2622.     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
  2623.         checkType(c, zobj, OBJ_ZSET))
  2624.     {
  2625.         zslFreeLexRange(&range);
  2626.         return;
  2627.     }
  2628.  
  2629.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  2630.         unsigned char *zl = zobj->ptr;
  2631.         unsigned char *eptr, *sptr;
  2632.  
  2633.         /* Use the first element in range as the starting point */
  2634.         eptr = zzlFirstInLexRange(zl,&range);
  2635.  
  2636.         /* No "first" element */
  2637.         if (eptr == NULL) {
  2638.             zslFreeLexRange(&range);
  2639.             addReply(c, shared.czero);
  2640.             return;
  2641.         }
  2642.  
  2643.         /* First element is in range */
  2644.         sptr = ziplistNext(zl,eptr);
  2645.         serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));
  2646.  
  2647.         /* Iterate over elements in range */
  2648.         while (eptr) {
  2649.             /* Abort when the node is no longer in range. */
  2650.             if (!zzlLexValueLteMax(eptr,&range)) {
  2651.                 break;
  2652.             } else {
  2653.                 count++;
  2654.                 zzlNext(zl,&eptr,&sptr);
  2655.             }
  2656.         }
  2657.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  2658.         zset *zs = zobj->ptr;
  2659.         zskiplist *zsl = zs->zsl;
  2660.         zskiplistNode *zn;
  2661.         unsigned long rank;
  2662.  
  2663.         /* Find first element in range */
  2664.         zn = zslFirstInLexRange(zsl, &range);
  2665.  
  2666.         /* Use rank of first element, if any, to determine preliminary count */
  2667.         if (zn != NULL) {
  2668.             rank = zslGetRank(zsl, zn->score, zn->ele);
  2669.             count = (zsl->length - (rank - 1));
  2670.  
  2671.             /* Find last element in range */
  2672.             zn = zslLastInLexRange(zsl, &range);
  2673.  
  2674.             /* Use rank of last element, if any, to determine the actual count */
  2675.             if (zn != NULL) {
  2676.                 rank = zslGetRank(zsl, zn->score, zn->ele);
  2677.                 count -= (zsl->length - rank);
  2678.             }
  2679.         }
  2680.     } else {
  2681.         serverPanic("Unknown sorted set encoding");
  2682.     }
  2683.  
  2684.     zslFreeLexRange(&range);
  2685.     addReplyLongLong(c, count);
  2686. }
  2687.  
  2688. /* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
  2689. void genericZrangebylexCommand(client *c, int reverse) {
  2690.     zlexrangespec range;
  2691.     robj *key = c->argv[1];
  2692.     robj *zobj;
  2693.     long offset = 0, limit = -1;
  2694.     unsigned long rangelen = 0;
  2695.     void *replylen = NULL;
  2696.     int minidx, maxidx;
  2697.  
  2698.     /* Parse the range arguments. */
  2699.     if (reverse) {
  2700.         /* Range is given as [max,min] */
  2701.         maxidx = 2; minidx = 3;
  2702.     } else {
  2703.         /* Range is given as [min,max] */
  2704.         minidx = 2; maxidx = 3;
  2705.     }
  2706.  
  2707.     if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
  2708.         addReplyError(c,"min or max not valid string range item");
  2709.         return;
  2710.     }
  2711.  
  2712.     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
  2713.      * 4 arguments, so we'll never enter the following code path. */
  2714.     if (c->argc > 4) {
  2715.         int remaining = c->argc - 4;
  2716.         int pos = 4;
  2717.  
  2718.         while (remaining) {
  2719.             if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
  2720.                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
  2721.                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return;
  2722.                 pos += 3; remaining -= 3;
  2723.             } else {
  2724.                 zslFreeLexRange(&range);
  2725.                 addReply(c,shared.syntaxerr);
  2726.                 return;
  2727.             }
  2728.         }
  2729.     }
  2730.  
  2731.     /* Ok, lookup the key and get the range */
  2732.     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
  2733.         checkType(c,zobj,OBJ_ZSET))
  2734.     {
  2735.         zslFreeLexRange(&range);
  2736.         return;
  2737.     }
  2738.  
  2739.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  2740.         unsigned char *zl = zobj->ptr;
  2741.         unsigned char *eptr, *sptr;
  2742.         unsigned char *vstr;
  2743.         unsigned int vlen;
  2744.         long long vlong;
  2745.  
  2746.         /* If reversed, get the last node in range as starting point. */
  2747.         if (reverse) {
  2748.             eptr = zzlLastInLexRange(zl,&range);
  2749.         } else {
  2750.             eptr = zzlFirstInLexRange(zl,&range);
  2751.         }
  2752.  
  2753.         /* No "first" element in the specified interval. */
  2754.         if (eptr == NULL) {
  2755.             addReply(c, shared.emptymultibulk);
  2756.             zslFreeLexRange(&range);
  2757.             return;
  2758.         }
  2759.  
  2760.         /* Get score pointer for the first element. */
  2761.         serverAssertWithInfo(c,zobj,eptr != NULL);
  2762.         sptr = ziplistNext(zl,eptr);
  2763.  
  2764.         /* We don't know in advance how many matching elements there are in the
  2765.          * list, so we push this object that will represent the multi-bulk
  2766.          * length in the output buffer, and will "fix" it later */
  2767.         replylen = addDeferredMultiBulkLength(c);
  2768.  
  2769.         /* If there is an offset, just traverse the number of elements without
  2770.          * checking the score because that is done in the next loop. */
  2771.         while (eptr && offset--) {
  2772.             if (reverse) {
  2773.                 zzlPrev(zl,&eptr,&sptr);
  2774.             } else {
  2775.                 zzlNext(zl,&eptr,&sptr);
  2776.             }
  2777.         }
  2778.  
  2779.         while (eptr && limit--) {
  2780.             /* Abort when the node is no longer in range. */
  2781.             if (reverse) {
  2782.                 if (!zzlLexValueGteMin(eptr,&range)) break;
  2783.             } else {
  2784.                 if (!zzlLexValueLteMax(eptr,&range)) break;
  2785.             }
  2786.  
  2787.             /* We know the element exists, so ziplistGet should always
  2788.              * succeed. */
  2789.             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
  2790.  
  2791.             rangelen++;
  2792.             if (vstr == NULL) {
  2793.                 addReplyBulkLongLong(c,vlong);
  2794.             } else {
  2795.                 addReplyBulkCBuffer(c,vstr,vlen);
  2796.             }
  2797.  
  2798.             /* Move to next node */
  2799.             if (reverse) {
  2800.                 zzlPrev(zl,&eptr,&sptr);
  2801.             } else {
  2802.                 zzlNext(zl,&eptr,&sptr);
  2803.             }
  2804.         }
  2805.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  2806.         zset *zs = zobj->ptr;
  2807.         zskiplist *zsl = zs->zsl;
  2808.         zskiplistNode *ln;
  2809.  
  2810.         /* If reversed, get the last node in range as starting point. */
  2811.         if (reverse) {
  2812.             ln = zslLastInLexRange(zsl,&range);
  2813.         } else {
  2814.             ln = zslFirstInLexRange(zsl,&range);
  2815.         }
  2816.  
  2817.         /* No "first" element in the specified interval. */
  2818.         if (ln == NULL) {
  2819.             addReply(c, shared.emptymultibulk);
  2820.             zslFreeLexRange(&range);
  2821.             return;
  2822.         }
  2823.  
  2824.         /* We don't know in advance how many matching elements there are in the
  2825.          * list, so we push this object that will represent the multi-bulk
  2826.          * length in the output buffer, and will "fix" it later */
  2827.         replylen = addDeferredMultiBulkLength(c);
  2828.  
  2829.         /* If there is an offset, just traverse the number of elements without
  2830.          * checking the score because that is done in the next loop. */
  2831.         while (ln && offset--) {
  2832.             if (reverse) {
  2833.                 ln = ln->backward;
  2834.             } else {
  2835.                 ln = ln->level[0].forward;
  2836.             }
  2837.         }
  2838.  
  2839.         while (ln && limit--) {
  2840.             /* Abort when the node is no longer in range. */
  2841.             if (reverse) {
  2842.                 if (!zslLexValueGteMin(ln->ele,&range)) break;
  2843.             } else {
  2844.                 if (!zslLexValueLteMax(ln->ele,&range)) break;
  2845.             }
  2846.  
  2847.             rangelen++;
  2848.             addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
  2849.  
  2850.             /* Move to next node */
  2851.             if (reverse) {
  2852.                 ln = ln->backward;
  2853.             } else {
  2854.                 ln = ln->level[0].forward;
  2855.             }
  2856.         }
  2857.     } else {
  2858.         serverPanic("Unknown sorted set encoding");
  2859.     }
  2860.  
  2861.     zslFreeLexRange(&range);
  2862.     setDeferredMultiBulkLength(c, replylen, rangelen);
  2863. }
  2864.  
  2865. void zrangebylexCommand(client *c) {
  2866.     genericZrangebylexCommand(c,0);
  2867. }
  2868.  
  2869. void zrevrangebylexCommand(client *c) {
  2870.     genericZrangebylexCommand(c,1);
  2871. }
  2872.  
  2873. void zcardCommand(client *c) {
  2874.     robj *key = c->argv[1];
  2875.     robj *zobj;
  2876.  
  2877.     if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
  2878.         checkType(c,zobj,OBJ_ZSET)) return;
  2879.  
  2880.     addReplyLongLong(c,zsetLength(zobj));
  2881. }
  2882.  
  2883. void zscoreCommand(client *c) {
  2884.     robj *key = c->argv[1];
  2885.     robj *zobj;
  2886.     double score;
  2887.  
  2888.     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
  2889.         checkType(c,zobj,OBJ_ZSET)) return;
  2890.  
  2891.     if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) {
  2892.         addReply(c,shared.nullbulk);
  2893.     } else {
  2894.         addReplyDouble(c,score);
  2895.     }
  2896. }
  2897.  
  2898. void zrankGenericCommand(client *c, int reverse) {
  2899.     robj *key = c->argv[1];
  2900.     robj *ele = c->argv[2];
  2901.     robj *zobj;
  2902.     unsigned long llen;
  2903.     unsigned long rank;
  2904.  
  2905.     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
  2906.         checkType(c,zobj,OBJ_ZSET)) return;
  2907.     llen = zsetLength(zobj);
  2908.  
  2909.     serverAssertWithInfo(c,ele,sdsEncodedObject(ele));
  2910.  
  2911.     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
  2912.         unsigned char *zl = zobj->ptr;
  2913.         unsigned char *eptr, *sptr;
  2914.  
  2915.         eptr = ziplistIndex(zl,0);
  2916.         serverAssertWithInfo(c,zobj,eptr != NULL);
  2917.         sptr = ziplistNext(zl,eptr);
  2918.         serverAssertWithInfo(c,zobj,sptr != NULL);
  2919.  
  2920.         rank = 1;
  2921.         while(eptr != NULL) {
  2922.             if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
  2923.                 break;
  2924.             rank++;
  2925.             zzlNext(zl,&eptr,&sptr);
  2926.         }
  2927.  
  2928.         if (eptr != NULL) {
  2929.             if (reverse)
  2930.                 addReplyLongLong(c,llen-rank);
  2931.             else
  2932.                 addReplyLongLong(c,rank-1);
  2933.         } else {
  2934.             addReply(c,shared.nullbulk);
  2935.         }
  2936.     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  2937.         zset *zs = zobj->ptr;
  2938.         zskiplist *zsl = zs->zsl;
  2939.         dictEntry *de;
  2940.         double score;
  2941.  
  2942.         ele = c->argv[2];
  2943.         de = dictFind(zs->dict,ele->ptr);
  2944.         if (de != NULL) {
  2945.             score = *(double*)dictGetVal(de);
  2946.             rank = zslGetRank(zsl,score,ele->ptr);
  2947.             /* Existing elements always have a rank. */
  2948.             serverAssertWithInfo(c,ele,rank);
  2949.             if (reverse)
  2950.                 addReplyLongLong(c,llen-rank);
  2951.             else
  2952.                 addReplyLongLong(c,rank-1);
  2953.         } else {
  2954.             addReply(c,shared.nullbulk);
  2955.         }
  2956.     } else {
  2957.         serverPanic("Unknown sorted set encoding");
  2958.     }
  2959. }
  2960.  
  2961. void zrankCommand(client *c) {
  2962.     zrankGenericCommand(c, 0);
  2963. }
  2964.  
  2965. void zrevrankCommand(client *c) {
  2966.     zrankGenericCommand(c, 1);
  2967. }
  2968.  
  2969. void zscanCommand(client *c) {
  2970.     robj *o;
  2971.     unsigned long cursor;
  2972.  
  2973.     if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
  2974.     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
  2975.         checkType(c,o,OBJ_ZSET)) return;
  2976.     scanGenericCommand(c,o,cursor);
  2977. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement