Implemented the Tauri events stream for the Rust server and the .NET client

This commit is contained in:
Thorsten Sommer 2025-10-27 14:37:15 +01:00
parent dca43b6c1e
commit 15d7359c59
Signed by: tsommer
GPG Key ID: 371BBA77A02C0108
7 changed files with 323 additions and 59 deletions

View File

@ -1,3 +1,4 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
@ -9,6 +10,9 @@ namespace AIStudio.Settings;
/// <remarks>
/// When the target enum value does not exist, the value will be the default value.
/// This converter handles enum values as property names and values.
/// <br/><br/>
/// We assume that enum names are in UPPER_SNAKE_CASE, and the JSON strings may be
/// in any case style (e.g., camelCase, PascalCase, snake_case, UPPER_SNAKE_CASE, etc.)
/// </remarks>
public sealed class TolerantEnumConverter : JsonConverter<object>
{
@ -16,30 +20,46 @@ public sealed class TolerantEnumConverter : JsonConverter<object>
public override bool CanConvert(Type typeToConvert) => typeToConvert.IsEnum;
public override object? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
public override object? Read(ref Utf8JsonReader reader, Type enumType, JsonSerializerOptions options)
{
// Is this token a string?
if (reader.TokenType == JsonTokenType.String)
{
// Try to use that string as the name of the enum value:
if (Enum.TryParse(typeToConvert, reader.GetString(), out var result))
var text = reader.GetString();
// Convert the text to UPPER_SNAKE_CASE:
text = ConvertToUpperSnakeCase(text);
// Try to parse the enum value:
if (Enum.TryParse(enumType, text, out var result))
return result;
}
// In any other case, we will return the default enum value:
LOG.LogWarning($"Cannot read '{reader.GetString()}' as '{typeToConvert.Name}' enum; token type: {reader.TokenType}");
return Activator.CreateInstance(typeToConvert);
LOG.LogWarning($"Cannot read '{reader.GetString()}' as '{enumType.Name}' enum; token type: {reader.TokenType}");
return Activator.CreateInstance(enumType);
}
public override object ReadAsPropertyName(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
public override object ReadAsPropertyName(ref Utf8JsonReader reader, Type enumType, JsonSerializerOptions options)
{
// Is this token a property name?
if (reader.TokenType == JsonTokenType.PropertyName)
{
// Try to use that property name as the name of the enum value:
if (Enum.TryParse(typeToConvert, reader.GetString(), out var result))
var text = reader.GetString();
// Convert the text to UPPER_SNAKE_CASE:
text = ConvertToUpperSnakeCase(text);
// Try to parse the enum value:
if (Enum.TryParse(enumType, text, out var result))
return result;
}
// In any other case, we will return the default enum value:
LOG.LogWarning($"Cannot read '{reader.GetString()}' as '{typeToConvert.Name}' enum; token type: {reader.TokenType}");
return Activator.CreateInstance(typeToConvert)!;
LOG.LogWarning($"Cannot read '{reader.GetString()}' as '{enumType.Name}' enum; token type: {reader.TokenType}");
return Activator.CreateInstance(enumType)!;
}
public override void Write(Utf8JsonWriter writer, object value, JsonSerializerOptions options)
@ -51,4 +71,44 @@ public sealed class TolerantEnumConverter : JsonConverter<object>
{
writer.WritePropertyName(value.ToString()!);
}
/// <summary>
/// Converts a string to UPPER_SNAKE_CASE.
/// </summary>
/// <param name="text">The text to convert.</param>
/// <returns>The converted text as UPPER_SNAKE_CASE.</returns>
private static string ConvertToUpperSnakeCase(string? text)
{
// Handle null or empty strings:
if (string.IsNullOrWhiteSpace(text))
return string.Empty;
// Create a string builder with the same length as the
// input text. We will add underscores as needed, which
// may increase the length -- we cannot predict how many
// underscores will be added, so we just start with the
// original length:
var sb = new StringBuilder(text.Length);
// State to track if the last character was lowercase.
// This helps to determine when to add underscores:
var lastCharWasLowerCase = false;
// Iterate through each character in the input text:
foreach(var c in text)
{
// If the current character is uppercase and the last
// character was lowercase, we need to add an underscore:
if (char.IsUpper(c) && lastCharWasLowerCase)
sb.Append('_');
// Append the uppercase version of the current character:
sb.Append(char.ToUpperInvariant(c));
// Keep track of whether the current character is lowercase:
lastCharWasLowerCase = char.IsLower(c);
}
return sb.ToString();
}
}

View File

@ -1,3 +1,8 @@
namespace AIStudio.Tools.Rust;
public readonly record struct TauriEvent(TauriEventType Type, List<string> Payload);
/// <summary>
/// The data structure for a Tauri event sent from the Rust backend to the C# frontend.
/// </summary>
/// <param name="EventType">The type of the Tauri event.</param>
/// <param name="Payload">The payload of the Tauri event.</param>
public readonly record struct TauriEvent(TauriEventType EventType, List<string> Payload);

View File

@ -1,7 +1,14 @@
namespace AIStudio.Tools.Rust;
/// <summary>
/// The type of Tauri events we can receive.
/// </summary>
public enum TauriEventType
{
NONE,
PING,
UNKNOWN,
WINDOW_FOCUSED,
WINDOW_NOT_FOCUSED,

View File

@ -6,24 +6,46 @@ namespace AIStudio.Tools.Services;
public partial class RustService
{
/// <summary>
/// Consume the Tauri event stream and forward relevant events to the message bus.
/// </summary>
/// <param name="stopToken">Cancellation token to stop the stream.</param>
private async Task StartStreamTauriEvents(CancellationToken stopToken)
{
// Outer try-catch to handle cancellation:
try
{
while (!stopToken.IsCancellationRequested)
{
// Inner try-catch to handle streaming issues:
try
{
// Open the event stream:
await using var stream = await this.http.GetStreamAsync("/events", stopToken);
// Read events line by line:
using var reader = new StreamReader(stream);
while(!reader.EndOfStream)
// Read until the end of the stream or cancellation:
while(!reader.EndOfStream && !stopToken.IsCancellationRequested)
{
// Read the next line of JSON from the stream:
var line = await reader.ReadLineAsync(stopToken);
// Skip empty lines:
if (string.IsNullOrWhiteSpace(line))
continue;
// Deserialize the Tauri event:
var tauriEvent = JsonSerializer.Deserialize<TauriEvent>(line, this.jsonRustSerializerOptions);
if (tauriEvent != default)
// Log the received event for debugging:
this.logger!.LogDebug("Received Tauri event: {Event}", tauriEvent);
// Forward relevant events to the message bus:
if (tauriEvent != default && tauriEvent.EventType is not TauriEventType.NONE
and not TauriEventType.UNKNOWN and not TauriEventType.PING)
await MessageBus.INSTANCE.SendMessage(null, Event.TAURI_EVENT_RECEIVED, tauriEvent);
}
}

View File

@ -1,6 +1,10 @@
using System.Security.Cryptography;
using System.Text.Json;
using AIStudio.Settings;
using Version = System.Version;
// ReSharper disable NotAccessedPositionalProperty.Local
namespace AIStudio.Tools.Services;
@ -15,6 +19,7 @@ public sealed partial class RustService : BackgroundService
private readonly JsonSerializerOptions jsonRustSerializerOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
Converters = { new TolerantEnumConverter() },
};
private ILogger<RustService>? logger;
@ -61,9 +66,15 @@ public sealed partial class RustService : BackgroundService
#region Overrides of BackgroundService
/// <summary>
/// The main execution loop of the Rust service as a background thread.
/// </summary>
/// <param name="stopToken">The cancellation token to stop the service.</param>
protected override async Task ExecuteAsync(CancellationToken stopToken)
{
this.logger?.LogInformation("The Rust service was initialized.");
// Start consuming Tauri events:
await this.StartStreamTauriEvents(stopToken);
}

View File

@ -3,12 +3,14 @@ use std::time::Duration;
use log::{debug, error, info, trace, warn};
use once_cell::sync::Lazy;
use rocket::{get, post};
use rocket::response::stream::TextStream;
use rocket::serde::json::Json;
use rocket::serde::Serialize;
use serde::Deserialize;
use tauri::updater::UpdateResponse;
use tauri::{FileDropEvent, Manager, PathResolver, Window};
use tauri::{FileDropEvent, UpdaterEvent, RunEvent, Manager, PathResolver, Window, WindowEvent};
use tauri::api::dialog::blocking::FileDialogBuilder;
use tokio::sync::broadcast;
use tokio::time;
use crate::api_token::APIToken;
use crate::dotnet::stop_dotnet_server;
@ -22,41 +24,61 @@ static MAIN_WINDOW: Lazy<Mutex<Option<Window>>> = Lazy::new(|| Mutex::new(None))
/// The update response coming from the Tauri updater.
static CHECK_UPDATE_RESPONSE: Lazy<Mutex<Option<UpdateResponse<tauri::Wry>>>> = Lazy::new(|| Mutex::new(None));
/// The event broadcast sender for Tauri events.
static EVENT_BROADCAST: Lazy<Mutex<Option<broadcast::Sender<Event>>>> = Lazy::new(|| Mutex::new(None));
/// Starts the Tauri app.
pub fn start_tauri() {
info!("Starting Tauri app...");
// Create the event broadcast channel:
let (event_sender, root_event_receiver) = broadcast::channel(100);
// Save a copy of the event broadcast sender for later use:
*EVENT_BROADCAST.lock().unwrap() = Some(event_sender.clone());
// When the last receiver is dropped, we lose the ability to send events.
// Therefore, we spawn a task that keeps the root receiver alive:
tauri::async_runtime::spawn(async move {
let mut root_receiver = root_event_receiver;
loop {
match root_receiver.recv().await {
Ok(event) => {
debug!(Source = "Tauri"; "Tauri event received: location=root receiver , event={event:?}");
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!(Source = "Tauri"; "Root event receiver lagged, skipped {skipped} messages.");
},
Err(broadcast::error::RecvError::Closed) => {
warn!(Source = "Tauri"; "Root event receiver channel closed.");
return;
},
}
}
});
let app = tauri::Builder::default()
.setup(move |app| {
// Get the main window:
let window = app.get_window("main").expect("Failed to get main window.");
// Register a callback for file drop events:
window.on_window_event(|event|
match event {
tauri::WindowEvent::FileDrop(drop_event) => {
match drop_event {
FileDropEvent::Hovered(files) => {
info!(Source = "Tauri"; "Files hovered over the window: {files:?}");
},
FileDropEvent::Dropped(files) => {
info!(Source = "Tauri"; "Files dropped on the window: {files:?}");
},
FileDropEvent::Cancelled => {
info!(Source = "Tauri"; "File drop was cancelled.");
},
_ => {}
}
},
tauri::WindowEvent::Focused(state) => {
info!(Source = "Tauri"; "Window focus changed: focused={state}");
},
_ => {}
}
);
// Register a callback for window events, such as file drops. We have to use
// this handler in addition to the app event handler, because file drop events
// are only available in the window event handler (is a bug, cf. https://github.com/tauri-apps/tauri/issues/14338):
window.on_window_event(move |event| {
debug!(Source = "Tauri"; "Tauri event received: location=window event handler, event={event:?}");
let event_to_send = Event::from_window_event(event);
let sender = event_sender.clone();
tauri::async_runtime::spawn(async move {
match sender.send(event_to_send) {
Ok(_) => {},
Err(error) => error!(Source = "Tauri"; "Failed to channel window event: {error}"),
}
});
});
// Save the main window for later access:
*MAIN_WINDOW.lock().unwrap() = Some(window);
@ -65,6 +87,7 @@ pub fn start_tauri() {
let data_path = app.path_resolver().app_local_data_dir().unwrap();
let data_path = data_path.join("data");
// Get and store the data and config directories:
DATA_DIRECTORY.set(data_path.to_str().unwrap().to_string()).map_err(|_| error!("Was not abe to set the data directory.")).unwrap();
CONFIG_DIRECTORY.set(app.path_resolver().app_config_dir().unwrap().to_str().unwrap().to_string()).map_err(|_| error!("Was not able to set the config directory.")).unwrap();
@ -77,46 +100,43 @@ pub fn start_tauri() {
.build(tauri::generate_context!())
.expect("Error while running Tauri application");
// The app event handler:
app.run(|app_handle, event| {
if !matches!(event, tauri::RunEvent::MainEventsCleared) {
debug!(Source = "Tauri"; "Event received: {event:?}");
if !matches!(event, RunEvent::MainEventsCleared) {
debug!(Source = "Tauri"; "Tauri event received: location=app event handler , event={event:?}");
}
match event {
tauri::RunEvent::WindowEvent { event, label, .. } => {
RunEvent::WindowEvent { event, label, .. } => {
match event {
tauri::WindowEvent::CloseRequested { .. } => {
WindowEvent::CloseRequested { .. } => {
warn!(Source = "Tauri"; "Window '{label}': close was requested.");
}
tauri::WindowEvent::Destroyed => {
WindowEvent::Destroyed => {
warn!(Source = "Tauri"; "Window '{label}': was destroyed.");
}
tauri::WindowEvent::FileDrop(files) => {
info!(Source = "Tauri"; "Window '{label}': files were dropped: {files:?}");
}
_ => (),
}
}
tauri::RunEvent::Updater(updater_event) => {
RunEvent::Updater(updater_event) => {
match updater_event {
tauri::UpdaterEvent::UpdateAvailable { body, date, version } => {
UpdaterEvent::UpdateAvailable { body, date, version } => {
let body_len = body.len();
info!(Source = "Tauri"; "Updater: update available: body size={body_len} time={date:?} version={version}");
}
tauri::UpdaterEvent::Pending => {
UpdaterEvent::Pending => {
info!(Source = "Tauri"; "Updater: update is pending!");
}
tauri::UpdaterEvent::DownloadProgress { chunk_length, content_length: _ } => {
UpdaterEvent::DownloadProgress { chunk_length, content_length: _ } => {
trace!(Source = "Tauri"; "Updater: downloading chunk of {chunk_length} bytes");
}
tauri::UpdaterEvent::Downloaded => {
UpdaterEvent::Downloaded => {
info!(Source = "Tauri"; "Updater: update has been downloaded!");
warn!(Source = "Tauri"; "Try to stop the .NET server now...");
@ -127,7 +147,7 @@ pub fn start_tauri() {
}
}
tauri::UpdaterEvent::Updated => {
UpdaterEvent::Updated => {
info!(Source = "Tauri"; "Updater: app has been updated");
warn!(Source = "Tauri"; "Try to restart the app now...");
@ -138,21 +158,21 @@ pub fn start_tauri() {
}
}
tauri::UpdaterEvent::AlreadyUpToDate => {
UpdaterEvent::AlreadyUpToDate => {
info!(Source = "Tauri"; "Updater: app is already up to date");
}
tauri::UpdaterEvent::Error(error) => {
UpdaterEvent::Error(error) => {
warn!(Source = "Tauri"; "Updater: failed to update: {error}");
}
}
}
tauri::RunEvent::ExitRequested { .. } => {
RunEvent::ExitRequested { .. } => {
warn!(Source = "Tauri"; "Run event: exit was requested.");
}
tauri::RunEvent::Ready => {
RunEvent::Ready => {
info!(Source = "Tauri"; "Run event: Tauri app is ready.");
}
@ -167,6 +187,144 @@ pub fn start_tauri() {
}
}
/// Our event API endpoint for Tauri events. We try to send an endless stream of events to the client.
/// If no events are available for a certain time, we send a ping event to keep the connection alive.
/// When the client disconnects, the stream is closed. But we try to not lose events in between.
/// The client is expected to reconnect automatically when the connection is closed and continue
/// listening for events.
#[get("/events")]
pub async fn get_event_stream(_token: APIToken) -> TextStream![String] {
// Get the lock to the event broadcast sender:
let event_broadcast_lock = EVENT_BROADCAST.lock().unwrap();
// Get and subscribe to the event receiver:
let mut event_receiver = event_broadcast_lock.as_ref()
.expect("Event sender not initialized.")
.subscribe();
// Drop the lock to allow other access to the sender:
drop(event_broadcast_lock);
// Create the event stream:
TextStream! {
loop {
// Wait at most 3 seconds for an event:
match time::timeout(Duration::from_secs(3), event_receiver.recv()).await {
// Case: we received an event
Ok(Ok(event)) => {
// Serialize the event to JSON. Important is that the entire event
// is serialized as a single line so that the client can parse it
// correctly:
let event_json = serde_json::to_string(&event).unwrap();
yield event_json;
// The client expects a newline after each event because we are using
// a method to read the stream line-by-line:
yield "\n".to_string();
},
// Case: we lagged behind and missed some events
Ok(Err(broadcast::error::RecvError::Lagged(skipped))) => {
warn!(Source = "Tauri"; "Event receiver lagged, skipped {skipped} messages.");
},
// Case: the event channel was closed
Ok(Err(broadcast::error::RecvError::Closed)) => {
warn!(Source = "Tauri"; "Event receiver channel closed.");
return;
},
// Case: timeout. We will send a ping event to keep the connection alive.
Err(_) => {
let ping_event = Event::new(TauriEventType::Ping, Vec::new());
// Again, we have to serialize the event as a single line:
let event_json = serde_json::to_string(&ping_event).unwrap();
yield event_json;
// The client expects a newline after each event because we are using
// a method to read the stream line-by-line:
yield "\n".to_string();
},
}
}
}
}
/// Data structure representing a Tauri event for our event API.
#[derive(Debug, Clone, Serialize)]
pub struct Event {
pub event_type: TauriEventType,
pub payload: Vec<String>,
}
/// Implementation of the Event struct.
impl Event {
/// Creates a new Event instance.
pub fn new(event_type: TauriEventType, payload: Vec<String>) -> Self {
Event {
payload,
event_type,
}
}
/// Creates an Event instance from a Tauri WindowEvent.
pub fn from_window_event(window_event: &WindowEvent) -> Self {
match window_event {
WindowEvent::FileDrop(drop_event) => {
match drop_event {
FileDropEvent::Hovered(files) => Event::new(TauriEventType::FileDropHovered,
files.iter().map(|f| f.to_string_lossy().to_string()).collect(),
),
FileDropEvent::Dropped(files) => Event::new(TauriEventType::FileDropDropped,
files.iter().map(|f| f.to_string_lossy().to_string()).collect(),
),
FileDropEvent::Cancelled => Event::new(TauriEventType::FileDropCanceled,
Vec::new(),
),
_ => Event::new(TauriEventType::Unknown,
Vec::new(),
),
}
},
WindowEvent::Focused(state) => if *state {
Event::new(TauriEventType::WindowFocused,
Vec::new(),
)
} else {
Event::new(TauriEventType::WindowNotFocused,
Vec::new(),
)
},
_ => Event::new(TauriEventType::Unknown,
Vec::new(),
),
}
}
}
/// The types of Tauri events we can send through our event API.
#[derive(Debug, Serialize, Clone)]
pub enum TauriEventType {
None,
Ping,
Unknown,
WindowFocused,
WindowNotFocused,
FileDropHovered,
FileDropDropped,
FileDropCanceled,
}
/// Changes the location of the main window to the given URL.
pub async fn change_location_to(url: &str) {
// Try to get the main window. If it is not available yet, wait for it:

View File

@ -68,6 +68,7 @@ pub fn start_runtime_api() {
crate::dotnet::dotnet_port,
crate::dotnet::dotnet_ready,
crate::clipboard::set_clipboard,
crate::app_window::get_event_stream,
crate::app_window::check_for_update,
crate::app_window::install_update,
crate::app_window::select_directory,