Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added BZMPOP, BZPOPMAX and BZPOPMIN commands #884

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
104 changes: 104 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,110 @@
}
]
},
{
"Command": "BZMPOP",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here about running CommentInfoUpdater - #864 (comment)

"Name": "BZMPOP",
"Summary": "Removes and returns a member by score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.",
"Group": "SortedSet",
"Complexity": "O(K) \u002B O(M*log(N)) where K is the number of provided keys, N being the number of elements in the sorted set, and M being the number of elements popped.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "Double"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "NUMKEYS",
"DisplayText": "numkeys",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "WHERE",
"Type": "OneOf",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MIN",
"DisplayText": "min",
"Type": "PureToken",
"Token": "MIN"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MAX",
"DisplayText": "max",
"Type": "PureToken",
"Token": "MAX"
}
]
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "COUNT",
"DisplayText": "count",
"Type": "Integer",
"Token": "COUNT",
"ArgumentFlags": "Optional"
}
]
},
{
"Command": "BZPOPMAX",
"Name": "BZPOPMAX",
"Summary": "Removes and returns the member with the highest score from one or more sorted sets. Blocks until a member available otherwise. Deletes the sorted set if the last element was popped.",
"Group": "SortedSet",
"Complexity": "O(log(N)) with N being the number of elements in the sorted set.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "Double"
}
]
},
{
"Command": "BZPOPMIN",
"Name": "BZPOPMIN",
"Summary": "Removes and returns the member with the lowest score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.",
"Group": "SortedSet",
"Complexity": "O(log(N)) with N being the number of elements in the sorted set.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "Double"
}
]
},
{
"Command": "CLIENT",
"Name": "CLIENT",
Expand Down
72 changes: 72 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,78 @@
}
]
},
{
"Command": "BZMPOP",
"Name": "BZMPOP",
"Arity": -5,
"Flags": "Blocking, MovableKeys, Write",
"AclCategories": "Blocking, SortedSet, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysKeyNum",
"KeyNumIdx": 0,
"FirstKey": 1,
"KeyStep": 1
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "BZPOPMAX",
"Name": "BZPOPMAX",
"Arity": -3,
"Flags": "Blocking, Fast, Write",
"FirstKey": 1,
"LastKey": -2,
"Step": 1,
"AclCategories": "Blocking, Fast, SortedSet, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -2,
"KeyStep": 1,
"Limit": 0
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "BZPOPMIN",
"Name": "BZPOPMIN",
"Arity": -3,
"Flags": "Blocking, Fast, Write",
"FirstKey": 1,
"LastKey": -2,
"Step": 1,
"AclCategories": "Blocking, Fast, SortedSet, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": -2,
"KeyStep": 1,
"Limit": 0
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "CLIENT",
"Name": "CLIENT",
Expand Down
59 changes: 36 additions & 23 deletions libs/server/Objects/ItemBroker/CollectionItemBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -383,37 +383,47 @@ private static bool TryMoveNextListItem(ListObject srcListObj, ListObject dstLis
}

/// <summary>
/// Try to get next available item from sorted set object
/// Try to get next available item from sorted set object based on command type
/// BZPOPMIN and BZPOPMAX share same implementation since Dictionary.First() and Last()
/// handle the ordering automatically based on sorted set scores
/// </summary>
/// <param name="sortedSetObj">Sorted set object</param>
/// <param name="command">RESP command</param>
/// <param name="nextItem">Item retrieved</param>
/// <returns>True if found available item</returns>
private static bool TryGetNextSetObject(SortedSetObject sortedSetObj, RespCommand command, out byte[] nextItem)
private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sortedSetObj, RespCommand command, ArgSlice[] cmdArgs, out CollectionItemResult result)
{
nextItem = default;
result = default;

// If object has no items, return
if (sortedSetObj.Dictionary.Count == 0) return false;

// Get the next object according to operation type
switch (command)
{
case RespCommand.BZPOPMIN:
case RespCommand.BZPOPMAX:
var element = sortedSetObj.PopMinOrMax(command == RespCommand.BZPOPMAX);
result = new CollectionItemResult(key, element.Score, element.Element);
return true;

case RespCommand.BZMPOP:
var lowScoresFirst = *(bool*)cmdArgs[0].ptr;
var popCount = *(int*)cmdArgs[1].ptr;
popCount = Math.Min(popCount, sortedSetObj.Dictionary.Count);

var scores = new double[popCount];
var items = new byte[popCount][];

for (int i = 0; i < popCount; i++)
{
var popResult = sortedSetObj.PopMinOrMax(!lowScoresFirst);
scores[i] = popResult.Score;
items[i] = popResult.Element;
}

result = new CollectionItemResult(key, scores, items);
return true;

default:
return false;
}
}

/// <summary>
/// Try to get next available item from object
/// </summary>
/// <param name="key">Key of object</param>
/// <param name="storageSession">Current storage session</param>
/// <param name="command">RESP command</param>
/// <param name="cmdArgs">Additional command arguments</param>
/// <param name="currCount">Collection size</param>
/// <param name="result">Result of command</param>
/// <returns>True if found available item</returns>
private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result)
{
currCount = default;
Expand All @@ -423,6 +433,7 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
var objectType = command switch
{
RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE or RespCommand.BLMPOP => GarnetObjectType.List,
RespCommand.BZPOPMIN or RespCommand.BZPOPMAX or RespCommand.BZMPOP => GarnetObjectType.SortedSet,
_ => throw new NotSupportedException()
};

Expand Down Expand Up @@ -524,11 +535,13 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
}
case SortedSetObject setObj:
currCount = setObj.Dictionary.Count;
if (objectType != GarnetObjectType.SortedSet) return false;
if (objectType != GarnetObjectType.SortedSet)
return false;
if (currCount == 0)
return false;

return TryGetNextSetObjects(key, setObj, command, cmdArgs, out result);

var hasValue = TryGetNextSetObject(setObj, command, out var sortedSetNextItem);
result = new CollectionItemResult(key, sortedSetNextItem);
return hasValue;
default:
return false;
}
Expand Down
24 changes: 24 additions & 0 deletions libs/server/Objects/ItemBroker/CollectionItemResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ public CollectionItemResult(byte[] key, byte[][] items)
Items = items;
}

public CollectionItemResult(byte[] key, double score, byte[] item)
{
Key = key;
Score = score;
Item = item;
}

public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
{
Key = key;
Scores = scores;
Items = items;
}

/// <summary>
/// True if item was found
/// </summary>
Expand All @@ -35,11 +49,21 @@ public CollectionItemResult(byte[] key, byte[][] items)
/// </summary>
internal byte[] Item { get; }

/// <summary>
/// Score associated with the item retrieved from the collection
/// </summary>
internal double Score { get; }

/// <summary>
/// Item retrieved from collection
/// </summary>
internal byte[][] Items { get; }

/// <summary>
/// Scores associated with the items retrieved from the collection
/// </summary>
internal double[] Scores { get; }

/// <summary>
/// Instance of empty result
/// </summary>
Expand Down
18 changes: 18 additions & 0 deletions libs/server/Objects/SortedSet/SortedSetObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,24 @@ private void SortedSetRank(ref ObjectInput input, ref SpanByteAndMemory output,
}
}

/// <summary>
/// Removes and returns the element with the highest or lowest score from the sorted set.
/// </summary>
/// <param name="popMaxScoreElement">If true, pops the element with the highest score; otherwise, pops the element with the lowest score.</param>
/// <returns>A tuple containing the score and the element as a byte array.</returns>
public (double Score, byte[] Element) PopMinOrMax(bool popMaxScoreElement = false)
{
if (sortedSet.Count == 0)
return default;

var element = popMaxScoreElement ? sortedSet.Max : sortedSet.Min;
sortedSet.Remove(element);
sortedSetDict.Remove(element.Element);
this.UpdateSize(element.Element, false);

return element;
}

/// <summary>
/// Removes and returns up to COUNT members with the low or high score
/// </summary>
Expand Down
Loading
Loading