From 44ccac8127e44d885f39adbd2c2b843ef040e4d8 Mon Sep 17 00:00:00 2001 From: Thorsten Sommer Date: Sun, 18 Aug 2024 11:17:44 +0200 Subject: [PATCH] Added option to defer messages --- app/MindWork AI Studio/Tools/MessageBus.cs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/app/MindWork AI Studio/Tools/MessageBus.cs b/app/MindWork AI Studio/Tools/MessageBus.cs index 9a6aac88..44e1dc86 100644 --- a/app/MindWork AI Studio/Tools/MessageBus.cs +++ b/app/MindWork AI Studio/Tools/MessageBus.cs @@ -11,6 +11,7 @@ public sealed class MessageBus private readonly ConcurrentDictionary componentFilters = new(); private readonly ConcurrentDictionary componentEvents = new(); + private readonly ConcurrentDictionary> deferredMessages = new(); private readonly ConcurrentQueue messageQueue = new(); private readonly SemaphoreSlim sendingSemaphore = new(1, 1); @@ -65,6 +66,24 @@ public sealed class MessageBus } } + public void DeferMessage(ComponentBase? sendingComponent, Event triggeredEvent, T? data = default) + { + if (this.deferredMessages.TryGetValue(triggeredEvent, out var queue)) + queue.Enqueue(new Message(sendingComponent, triggeredEvent, data)); + else + { + this.deferredMessages[triggeredEvent] = new(); + this.deferredMessages[triggeredEvent].Enqueue(new Message(sendingComponent, triggeredEvent, data)); + } + } + + public IEnumerable CheckDeferredMessages(Event triggeredEvent) + { + if (this.deferredMessages.TryGetValue(triggeredEvent, out var queue)) + while (queue.TryDequeue(out var message)) + yield return message.Data is T data ? data : default; + } + public async Task SendMessageUseFirstResult(ComponentBase? sendingComponent, Event triggeredEvent, TPayload? data = default) { foreach (var (receiver, componentFilter) in this.componentFilters)