From 548d1eef26d91f7f76299672c7cb46393b09ea99 Mon Sep 17 00:00:00 2001 From: Thorsten Sommer Date: Sun, 30 Jun 2024 13:55:42 +0200 Subject: [PATCH] Added a message bus system --- app/MindWork AI Studio/Program.cs | 1 + app/MindWork AI Studio/Tools/Event.cs | 13 ++++ .../Tools/IMessageBusReceiver.cs | 8 +++ .../Tools/MSGComponentBase.cs | 44 +++++++++++++ app/MindWork AI Studio/Tools/MessageBus.cs | 66 +++++++++++++++++++ .../Tools/MessageBusExtensions.cs | 16 +++++ 6 files changed, 148 insertions(+) create mode 100644 app/MindWork AI Studio/Tools/Event.cs create mode 100644 app/MindWork AI Studio/Tools/IMessageBusReceiver.cs create mode 100644 app/MindWork AI Studio/Tools/MSGComponentBase.cs create mode 100644 app/MindWork AI Studio/Tools/MessageBus.cs create mode 100644 app/MindWork AI Studio/Tools/MessageBusExtensions.cs diff --git a/app/MindWork AI Studio/Program.cs b/app/MindWork AI Studio/Program.cs index 5d54b793..0edb1520 100644 --- a/app/MindWork AI Studio/Program.cs +++ b/app/MindWork AI Studio/Program.cs @@ -25,6 +25,7 @@ builder.Services.AddMudServices(config => }); builder.Services.AddMudMarkdownServices(); +builder.Services.AddSingleton(MessageBus.INSTANCE); builder.Services.AddSingleton(); builder.Services.AddMudMarkdownClipboardService(); builder.Services.AddSingleton(); diff --git a/app/MindWork AI Studio/Tools/Event.cs b/app/MindWork AI Studio/Tools/Event.cs new file mode 100644 index 00000000..2b259f4a --- /dev/null +++ b/app/MindWork AI Studio/Tools/Event.cs @@ -0,0 +1,13 @@ +namespace AIStudio.Tools; + +public enum Event +{ + NONE, + + // Common events: + STATE_HAS_CHANGED, + + // Update events: + USER_SEARCH_FOR_UPDATE, + UPDATE_AVAILABLE, +} \ No newline at end of file diff --git a/app/MindWork AI Studio/Tools/IMessageBusReceiver.cs b/app/MindWork AI Studio/Tools/IMessageBusReceiver.cs new file mode 100644 index 00000000..401a2118 --- /dev/null +++ b/app/MindWork AI Studio/Tools/IMessageBusReceiver.cs @@ -0,0 +1,8 @@ +using Microsoft.AspNetCore.Components; + +namespace AIStudio.Tools; + +public interface IMessageBusReceiver +{ + public Task ProcessMessage(ComponentBase? sendingComponent, Event triggeredEvent, T? data); +} \ No newline at end of file diff --git a/app/MindWork AI Studio/Tools/MSGComponentBase.cs b/app/MindWork AI Studio/Tools/MSGComponentBase.cs new file mode 100644 index 00000000..b8ccaf82 --- /dev/null +++ b/app/MindWork AI Studio/Tools/MSGComponentBase.cs @@ -0,0 +1,44 @@ +using Microsoft.AspNetCore.Components; + +namespace AIStudio.Tools; + +public abstract class MSGComponentBase : ComponentBase, IDisposable, IMessageBusReceiver +{ + [Inject] + protected MessageBus MessageBus { get; init; } = null!; + + #region Overrides of ComponentBase + + protected override void OnInitialized() + { + this.MessageBus.RegisterComponent(this); + base.OnInitialized(); + } + + #endregion + + #region Implementation of IMessageBusReceiver + + public abstract Task ProcessMessage(ComponentBase? sendingComponent, Event triggeredEvent, T? data); + + #endregion + + #region Implementation of IDisposable + + public void Dispose() + { + this.MessageBus.Unregister(this); + } + + #endregion + + protected async Task SendMessage(Event triggeredEvent, T? data = default) + { + await this.MessageBus.SendMessage(this, triggeredEvent, data); + } + + protected void ApplyFilters(ComponentBase[] components, Event[] events) + { + this.MessageBus.ApplyFilters(this, components, events); + } +} \ No newline at end of file diff --git a/app/MindWork AI Studio/Tools/MessageBus.cs b/app/MindWork AI Studio/Tools/MessageBus.cs new file mode 100644 index 00000000..6b584c42 --- /dev/null +++ b/app/MindWork AI Studio/Tools/MessageBus.cs @@ -0,0 +1,66 @@ +using System.Collections.Concurrent; + +using Microsoft.AspNetCore.Components; + +namespace AIStudio.Tools; + +public sealed class MessageBus +{ + public static readonly MessageBus INSTANCE = new(); + + private readonly ConcurrentDictionary componentFilters = new(); + private readonly ConcurrentDictionary componentEvents = new(); + private readonly ConcurrentQueue messageQueue = new(); + private readonly SemaphoreSlim sendingSemaphore = new(1, 1); + + private MessageBus() + { + } + + public void ApplyFilters(IMessageBusReceiver receiver, ComponentBase[] components, Event[] events) + { + this.componentFilters[receiver] = components; + this.componentEvents[receiver] = events; + } + + public void RegisterComponent(IMessageBusReceiver receiver) + { + this.componentFilters.TryAdd(receiver, []); + this.componentEvents.TryAdd(receiver, []); + } + + public void Unregister(IMessageBusReceiver receiver) + { + this.componentFilters.TryRemove(receiver, out _); + this.componentEvents.TryRemove(receiver, out _); + } + + private record class Message(ComponentBase? SendingComponent, Event TriggeredEvent, object? Data); + + public async Task SendMessage(ComponentBase? sendingComponent, Event triggeredEvent, T? data = default) + { + this.messageQueue.Enqueue(new Message(sendingComponent, triggeredEvent, data)); + + try + { + await this.sendingSemaphore.WaitAsync(); + while (this.messageQueue.TryDequeue(out var message)) + { + foreach (var (receiver, componentFilter) in this.componentFilters) + { + if (componentFilter.Length > 0 && sendingComponent is not null && !componentFilter.Contains(sendingComponent)) + continue; + + var eventFilter = this.componentEvents[receiver]; + if (eventFilter.Length == 0 || eventFilter.Contains(triggeredEvent)) + // We don't await the task here because we don't want to block the message bus: + _ = receiver.ProcessMessage(message.SendingComponent, message.TriggeredEvent, message.Data); + } + } + } + finally + { + this.sendingSemaphore.Release(); + } + } +} \ No newline at end of file diff --git a/app/MindWork AI Studio/Tools/MessageBusExtensions.cs b/app/MindWork AI Studio/Tools/MessageBusExtensions.cs new file mode 100644 index 00000000..7956c27e --- /dev/null +++ b/app/MindWork AI Studio/Tools/MessageBusExtensions.cs @@ -0,0 +1,16 @@ +using Microsoft.AspNetCore.Components; + +namespace AIStudio.Tools; + +public static class MessageBusExtensions +{ + public static async Task SendMessage(this ComponentBase component, Event triggeredEvent, T? data = default) + { + await MessageBus.INSTANCE.SendMessage(component, triggeredEvent, data); + } + + public static void ApplyFilters(this IMessageBusReceiver component, ComponentBase[] components, Event[] events) + { + MessageBus.INSTANCE.ApplyFilters(component, components, events); + } +} \ No newline at end of file