// EXAMPLE: stream_tutorial
// HIDE_START

using System.Runtime.CompilerServices;
using NRedisStack.Tests;
using StackExchange.Redis;

// HIDE_END

// REMOVE_START
namespace Doc;

/// <summary>
/// Documentation example that walks through the Redis stream commands exposed by
/// StackExchange.Redis (add, range, read, consumer groups, pending/claim, info, trim, delete).
/// The HIDE/REMOVE/STEP marker comments delimit regions of this file and must be kept intact.
/// </summary>
[Collection("DocsTests")]
// REMOVE_END

// HIDE_START
public class StreamTutorial
// REMOVE_START
: AbstractNRedisStackTest, IDisposable
// REMOVE_END
{
    // REMOVE_START
    public StreamTutorial(EndpointsFixture fixture) : base(fixture) { }

    /// <summary>
    /// Runs every tutorial step in order against the server at localhost:6379, writing each
    /// result to the console. The "race:france", "race:italy" and "race:usa" keys are deleted
    /// first so the assertions start from empty streams.
    /// </summary>
    [SkippableFact]
    // REMOVE_END
    public void run()
    {
        //REMOVE_START
        // This is needed because we're constructing ConfigurationOptions in the test before calling GetConnection
        SkipIfTargetConnectionDoesNotExist(EndpointsFixture.Env.Standalone);
        var _ = GetCleanDatabase(EndpointsFixture.Env.Standalone);
        //REMOVE_END
        // The multiplexer is disposable; release the connection when the method exits.
        using var muxer = ConnectionMultiplexer.Connect("localhost:6379");
        var db = muxer.GetDatabase();
        //REMOVE_START
        // Clear any keys here before using them in tests.
        db.KeyDelete("race:france");
        db.KeyDelete("race:italy");
        db.KeyDelete("race:usa");
        //REMOVE_END
        // HIDE_END


        // STEP_START xadd
        RedisValue res1 = db.StreamAdd(
            "race:france",
            new NameValueEntry[] {
                new NameValueEntry("rider", "Castilla"),
                new NameValueEntry("speed", 30.2),
                new NameValueEntry("position", 1),
                new NameValueEntry("location_id", 1)
            }
        );
        Console.WriteLine(res1);    // >>> 1712668482289-0

        RedisValue res2 = db.StreamAdd(
            "race:france",
            new NameValueEntry[] {
                new NameValueEntry("rider", "Norem"),
                new NameValueEntry("speed", 28.8),
                new NameValueEntry("position", 3),
                new NameValueEntry("location_id", 1)
            }
        );
        Console.WriteLine(res2);    // >>> 1712668766534-1

        RedisValue res3 = db.StreamAdd(
            "race:france",
            new NameValueEntry[]{
                new NameValueEntry("rider", "Prickett"),
                new NameValueEntry("speed", 29.7),
                new NameValueEntry("position", 2),
                new NameValueEntry("location_id", 1)
            }
        );
        Console.WriteLine(res3);    // >>> 1712669055705-0
        // STEP_END

        // Tests for 'xadd' step.
        // REMOVE_START
        Assert.Equal(3, db.StreamLength("race:france"));
        // REMOVE_END


        // STEP_START xrange
        StreamEntry[] res4 = db.StreamRange("race:france", "1712668482289-0", "+", 2);

        foreach (StreamEntry entry in res4)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
        // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
        // STEP_END

        // Tests for 'xrange' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xread_block
        StreamEntry[] res5 = db.StreamRead("race:france", 0, 100);

        foreach (StreamEntry entry in res5)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
        // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
        // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1]
        // STEP_END

        // Tests for 'xread_block' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xadd_2
        RedisValue res6 = db.StreamAdd(
            "race:france",
            new NameValueEntry[]{
                new NameValueEntry("rider", "Castilla"),
                new NameValueEntry("speed", 29.9),
                new NameValueEntry("position", 1),
                new NameValueEntry("location_id", 2)
            }
        );

        Console.WriteLine(res6);    // >>> 1712675674750-0
        // STEP_END

        // Tests for 'xadd_2' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xlen
        long res7 = db.StreamLength("race:france");
        Console.WriteLine(res7);    // >>> 4
        // STEP_END

        // Tests for 'xlen' step.
        // REMOVE_START
        Assert.Equal(4, res7);
        // REMOVE_END


        // STEP_START xadd_id
        RedisValue res8 = db.StreamAdd(
            "race:usa",
            new NameValueEntry[] {
                new NameValueEntry("racer", "Castilla")
            },
            "0-1"
        );
        Console.WriteLine(res8);    // >>> 0-1

        RedisValue res9 = db.StreamAdd(
            "race:usa",
            new NameValueEntry[]{
                new NameValueEntry("racer", "Norem")
            },
            "0-2"
        );
        Console.WriteLine(res9);    // >>> 0-2
        // STEP_END

        // Tests for 'xadd_id' step.
        // REMOVE_START
        Assert.Equal(2, db.StreamLength("race:usa"));
        // REMOVE_END


        // STEP_START xadd_bad_id
        try
        {
            RedisValue res10 = db.StreamAdd(
                "race:usa",
                new NameValueEntry[]{
                    new NameValueEntry("racer", "Prickett")
                },
                "0-1"
            );
        }
        catch (RedisServerException ex)
        {
            // Print only the server's error text rather than the full exception dump.
            Console.WriteLine(ex.Message);    // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item
        }
        // STEP_END

        // Tests for 'xadd_bad_id' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xadd_7
        RedisValue res11 = "";
        Version version = muxer.GetServer("localhost:6379").Version;

        // The "0-*" ID form is only attempted when the server reports major version 7 or later.
        if (version.Major >= 7)
        {
            res11 = db.StreamAdd(
                "race:usa",
                new NameValueEntry[]{
                    new NameValueEntry("rider", "Norem")
                },
                "0-*"
            );

            Console.WriteLine(res11);   // >>> "0-3"
        }
        // STEP_END

        // Tests for 'xadd_7' step.
        // REMOVE_START
        if (version.Major >= 7)
        {
            Assert.Equal("0-3", res11);
        }
        // REMOVE_END


        // STEP_START xrange_all
        StreamEntry[] res12 = db.StreamRange("race:france", "-", "+");

        foreach (StreamEntry entry in res12)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
        // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
        // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1]
        // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2]
        // STEP_END

        // Tests for 'xrange_all' step.
        // REMOVE_START
        Assert.Equal(4, res12.Length);
        // REMOVE_END


        // STEP_START xrange_time
        StreamEntry[] res13 = db.StreamRange("race:france", 1712668482289, 1712668482291);

        foreach (StreamEntry entry in res13)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
        // STEP_END

        // Tests for 'xrange_time' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xrange_step_1
        StreamEntry[] res14 = db.StreamRange("race:france", "-", "+", 2);

        foreach (StreamEntry entry in res14)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
        // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
        // STEP_END

        // Tests for 'xrange_step_1' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xrange_step_2
        StreamEntry[] res15 = db.StreamRange("race:france", "(1712668766534-1", "+", 2);

        foreach (StreamEntry entry in res15)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1]
        // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2]
        // STEP_END

        // Tests for 'xrange_step_2' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xrange_empty
        StreamEntry[] res16 = db.StreamRange("race:france", "(1712675674750-0", "+", 2);

        foreach (StreamEntry entry in res16)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> <empty array>
        // STEP_END

        // Tests for 'xrange_empty' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xrevrange
        StreamEntry[] res17 = db.StreamRange("race:france", "+", "-", 1, Order.Descending);

        foreach (StreamEntry entry in res17)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2]
        // STEP_END

        // Tests for 'xrevrange' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xread
        StreamEntry[] res18 = db.StreamRead("race:france", 0, 2);

        foreach (StreamEntry entry in res18)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1]
        // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1]
        // STEP_END

        // Tests for 'xread' step.
        // REMOVE_START
        Assert.Equal(2, res18.Length);
        // REMOVE_END


        // STEP_START xgroup_create
        bool res19 = db.StreamCreateConsumerGroup("race:france", "france_riders", "$");
        Console.WriteLine(res19);   // >>> true
        // STEP_END

        // Tests for 'xgroup_create' step.
        // REMOVE_START
        Assert.True(res19);
        // REMOVE_END


        // STEP_START xgroup_create_mkstream
        bool res20 = db.StreamCreateConsumerGroup("race:italy", "italy_riders", "$", true);
        Console.WriteLine(res20);   // >>> true
        // STEP_END

        // Tests for 'xgroup_create_mkstream' step.
        // REMOVE_START
        Assert.True(res20);
        // REMOVE_END


        // STEP_START xgroup_read
        RedisValue groupRes = db.StreamAdd(
            "race:italy",
            new NameValueEntry[] { new NameValueEntry("rider", "Castilla") }
        ); // 1712744323758-0

        groupRes = db.StreamAdd(
            "race:italy",
            new NameValueEntry[] { new NameValueEntry("rider", "Royce") }
        ); // 1712744358384-0

        groupRes = db.StreamAdd(
            "race:italy",
            new NameValueEntry[] { new NameValueEntry("rider", "Sam-Bodden") }
        ); // 1712744379676-0

        groupRes = db.StreamAdd(
            "race:italy",
            new NameValueEntry[] { new NameValueEntry("rider", "Prickett") }
        ); // 1712744399401-0

        groupRes = db.StreamAdd(
            "race:italy",
            new NameValueEntry[] { new NameValueEntry("rider", "Norem") }
        ); // 1712744413117-0

        StreamEntry[] res21 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", ">", 1);

        foreach (StreamEntry entry in res21)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712744323758-0: [rider: Castilla]
        // STEP_END

        // Tests for 'xgroup_read' step.
        // REMOVE_START
        Assert.Single(res21);
        // REMOVE_END


        // STEP_START xgroup_read_id
        StreamEntry[] res22 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", "0");

        foreach (StreamEntry entry in res22)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712744323758-0: [rider: Castilla]
        // STEP_END

        // Tests for 'xgroup_read_id' step.
        // REMOVE_START
        Assert.Single(res22);
        // REMOVE_END


        // STEP_START xack
        long res23 = db.StreamAcknowledge("race:italy", "italy_riders", "1712744323758-0");
        Console.WriteLine(res23);   // >>> 1

        StreamEntry[] res24 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", "0");

        foreach (StreamEntry entry in res24)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> <empty array>
        // STEP_END

        // Tests for 'xack' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xgroup_read_bob
        StreamEntry[] res25 = db.StreamReadGroup("race:italy", "italy_riders", "Bob", ">", 2);

        foreach (StreamEntry entry in res25)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712744358384-0: [rider: Royce]
        // >>> 1712744379676-0: [rider: Sam-Bodden]
        // STEP_END

        // Tests for 'xgroup_read_bob' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xpending
        StreamPendingInfo res26 = db.StreamPending("race:italy", "italy_riders");
        Console.WriteLine($"pending: {res26.PendingMessageCount}, min: {res26.LowestPendingMessageId}, max: {res26.HighestPendingMessageId}, consumers:[{string.Join(", ", res26.Consumers.Select(c => $"{c.Name}: {c.PendingMessageCount}"))}]");
        // >>> pending: 2, min: 1712747506906-0, max: 1712747506907-0, consumers:[Bob: 2]
        // STEP_END

        // Tests for 'xpending' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xpending_plus_minus
        StreamPendingMessageInfo[] res27 = db.StreamPendingMessages(
            "race:italy", "italy_riders", 10, "", "-", "+"
        );

        foreach (StreamPendingMessageInfo info in res27)
        {
            Console.WriteLine($"message_id: {info.MessageId}, consumer: {info.ConsumerName}, time_since_delivered: {info.IdleTimeInMilliseconds}, times_delivered: {info.DeliveryCount}");
        }
        // >>> message_id: 1712747506906-0, consumer: Bob, time_since_delivered: 31084, times_delivered: 1
        // >>> message_id: 1712747506907-0, consumer: Bob, time_since_delivered: 31084, times_delivered: 1
        // STEP_END

        // Tests for 'xpending_plus_minus' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xrange_pending
        StreamEntry[] res28 = db.StreamRange("race:italy", "1712744358384-0", "1712744358384-0");

        foreach (StreamEntry entry in res28)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712744358384-0: [rider: Royce]
        // STEP_END

        // Tests for 'xrange_pending' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xclaim
        // The message ID is passed as a string; an unquoted 1712744358384 - 0 would be
        // evaluated as integer subtraction.
        StreamEntry[] res29 = db.StreamClaim(
            "race:italy", "italy_riders", "Alice", 60000, new RedisValue[] { "1712744358384-0" }
        );

        foreach (StreamEntry entry in res29)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712744358384-0: [rider: Royce]
        // STEP_END

        // Tests for 'xclaim' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xautoclaim
        StreamAutoClaimResult res30 = db.StreamAutoClaim(
            "race:italy", "italy_riders", "Alice", 1, "0-0", 1
        );

        Console.WriteLine($"{res30.NextStartId}, ({string.Join(", ", res30.ClaimedEntries.Select(entry => $"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"))})");
        // >>> 1712744379676-0, (1712744358384-0: [rider: Royce])
        // STEP_END

        // Tests for 'xautoclaim' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xautoclaim_cursor
        StreamAutoClaimResult res31 = db.StreamAutoClaim(
            "race:italy", "italy_riders", "Alice", 1, "(1712744358384-0", 1
        );

        Console.WriteLine($"{res31.NextStartId}, ({string.Join(", ", res31.ClaimedEntries.Select(entry => $"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"))})");
        // >>> 0-0, (1712744379676-0: [rider: Sam-Bodden])
        // STEP_END

        // Tests for 'xautoclaim_cursor' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xinfo
        StreamInfo res32 = db.StreamInfo("race:italy");
        Console.WriteLine($"length: {res32.Length}, radix-tree-keys: {res32.RadixTreeKeys}, radix-tree-nodes: {res32.RadixTreeNodes}, last-generated-id: {res32.LastGeneratedId}, first-entry: {$"{res32.FirstEntry.Id}: [{string.Join(", ", res32.FirstEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}, last-entry: {$"{res32.LastEntry.Id}: [{string.Join(", ", res32.LastEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}");
        // >>> length: 5, radix-tree-keys: 1, radix-tree-nodes: 2, last-generated-id: 1712756762686-1, first-entry: 1712756762685-0: [rider: Castilla], last-entry: 1712756762686-1: [rider: Norem]
        // STEP_END

        // Tests for 'xinfo' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xinfo_groups
        StreamGroupInfo[] res33 = db.StreamGroupInfo("race:italy");

        foreach (StreamGroupInfo info in res33)
        {
            Console.WriteLine($"name: {info.Name}, consumers: {info.ConsumerCount}, pending: {info.PendingMessageCount}, last-delivered-id: {info.LastDeliveredId}");
        }
        // >>> name: italy_riders, consumers: 2, pending: 2, last-delivered-id: 1712757192730-2
        // STEP_END

        // Tests for 'xinfo_groups' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xinfo_consumers
        StreamConsumerInfo[] res34 = db.StreamConsumerInfo("race:italy", "italy_riders");

        foreach (StreamConsumerInfo info in res34)
        {
            Console.WriteLine($"name: {info.Name}, pending: {info.PendingMessageCount}, idle: {info.IdleTimeInMilliseconds}");
        }
        // >>> name: Alice, pending: 1, idle: 7717
        // >>> name: Bob, pending: 0, idle: 7722
        // STEP_END

        // Tests for 'xinfo_consumers' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START maxlen
        db.StreamAdd(
            "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Jones") }, null, 2, true
        );

        db.StreamAdd(
            "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Wood") }, null, 2, true
        );

        db.StreamAdd(
            "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Henshaw") }, null, 2, true
        );

        long res35 = db.StreamLength("race:italy");
        Console.WriteLine(res35); // >>> 8

        StreamEntry[] res36 = db.StreamRange("race:italy", "-", "+");

        foreach (StreamEntry entry in res36)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712758336128-0: [rider: Castilla]
        // >>> 1712758336128-1: [rider: Royce]
        // >>> 1712758336128-2: [rider: Sam-Bodden]
        // >>> 1712758336129-0: [rider: Prickett]
        // >>> 1712758336139-0: [rider: Norem]
        // >>> 1712758340854-0: [rider: Jones]
        // >>> 1712758341645-0: [rider: Wood]
        // >>> 1712758342134-0: [rider: Henshaw]

        db.StreamAdd(
            "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Smith") }, null, 2, false
        );

        StreamEntry[] res37 = db.StreamRange("race:italy", "-", "+");

        foreach (StreamEntry entry in res37)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712758746476-1: [rider: Henshaw]
        // >>> 1712758746477-0: [rider: Smith]
        // STEP_END

        // Tests for 'maxlen' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xtrim
        long res38 = db.StreamTrim("race:italy", 10, false);
        Console.WriteLine(res38);   // >>> 0
        // STEP_END

        // Tests for 'xtrim' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xtrim2
        long res39 = db.StreamTrim("race:italy", 10, true);
        Console.WriteLine(res39);   // >>> 0
        // STEP_END

        // Tests for 'xtrim2' step.
        // REMOVE_START

        // REMOVE_END


        // STEP_START xdel
        StreamEntry[] res40 = db.StreamRange("race:italy", "-", "+");

        foreach (StreamEntry entry in res40)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712759694003-0: [rider: Henshaw]
        // >>> 1712759694003-1: [rider: Smith]

        long res41 = db.StreamDelete("race:italy", new RedisValue[] { "1712759694003-1" });
        Console.WriteLine(res41);   // >>> 1

        StreamEntry[] res42 = db.StreamRange("race:italy", "-", "+");

        foreach (StreamEntry entry in res42)
        {
            Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]");
        }
        // >>> 1712759694003-0: [rider: Henshaw]
        // STEP_END

        // Tests for 'xdel' step.
        // REMOVE_START

        // REMOVE_END


        // HIDE_START
    }
}
// HIDE_END