Added option to defer messages

This commit is contained in:
Thorsten Sommer 2024-08-18 11:17:44 +02:00
parent 4293ae6850
commit 44ccac8127
Signed by: tsommer
GPG Key ID: 371BBA77A02C0108

View File

@ -11,6 +11,7 @@ public sealed class MessageBus
private readonly ConcurrentDictionary<IMessageBusReceiver, ComponentBase[]> componentFilters = new(); private readonly ConcurrentDictionary<IMessageBusReceiver, ComponentBase[]> componentFilters = new();
private readonly ConcurrentDictionary<IMessageBusReceiver, Event[]> componentEvents = new(); private readonly ConcurrentDictionary<IMessageBusReceiver, Event[]> componentEvents = new();
private readonly ConcurrentDictionary<Event, ConcurrentQueue<Message>> deferredMessages = new();
private readonly ConcurrentQueue<Message> messageQueue = new(); private readonly ConcurrentQueue<Message> messageQueue = new();
private readonly SemaphoreSlim sendingSemaphore = new(1, 1); private readonly SemaphoreSlim sendingSemaphore = new(1, 1);
@ -65,6 +66,24 @@ public sealed class MessageBus
} }
} }
public void DeferMessage<T>(ComponentBase? sendingComponent, Event triggeredEvent, T? data = default)
{
if (this.deferredMessages.TryGetValue(triggeredEvent, out var queue))
queue.Enqueue(new Message(sendingComponent, triggeredEvent, data));
else
{
this.deferredMessages[triggeredEvent] = new();
this.deferredMessages[triggeredEvent].Enqueue(new Message(sendingComponent, triggeredEvent, data));
}
}
public IEnumerable<T?> CheckDeferredMessages<T>(Event triggeredEvent)
{
if (this.deferredMessages.TryGetValue(triggeredEvent, out var queue))
while (queue.TryDequeue(out var message))
yield return message.Data is T data ? data : default;
}
public async Task<TResult?> SendMessageUseFirstResult<TPayload, TResult>(ComponentBase? sendingComponent, Event triggeredEvent, TPayload? data = default) public async Task<TResult?> SendMessageUseFirstResult<TPayload, TResult>(ComponentBase? sendingComponent, Event triggeredEvent, TPayload? data = default)
{ {
foreach (var (receiver, componentFilter) in this.componentFilters) foreach (var (receiver, componentFilter) in this.componentFilters)