From 33a2728644e565fc4bcc6d37c53e536eb44540d1 Mon Sep 17 00:00:00 2001 From: Thorsten Sommer Date: Sat, 4 Jan 2025 14:11:32 +0100 Subject: [PATCH] Refactored network code (#253) --- .../Provider/Anthropic/ProviderAnthropic.cs | 122 ++---------------- .../Provider/Anthropic/ResponseStreamLine.cs | 9 +- .../Provider/BaseProvider.cs | 108 ++++++++++++++++ .../Provider/Fireworks/ProviderFireworks.cs | 114 ++-------------- .../Provider/Fireworks/ResponseStreamLine.cs | 9 +- .../Provider/Google/ProviderGoogle.cs | 114 ++-------------- .../Provider/Groq/ProviderGroq.cs | 114 ++-------------- .../Provider/IResponseStreamLine.cs | 16 +++ .../Provider/Mistral/ProviderMistral.cs | 114 ++-------------- .../Provider/OpenAI/ProviderOpenAI.cs | 114 ++-------------- .../Provider/OpenAI/ResponseStreamLine.cs | 9 +- .../Provider/SelfHosted/ProviderSelfHosted.cs | 118 ++--------------- .../wwwroot/changelog/v0.9.25.md | 3 + 13 files changed, 224 insertions(+), 740 deletions(-) create mode 100644 app/MindWork AI Studio/Provider/IResponseStreamLine.cs create mode 100644 app/MindWork AI Studio/wwwroot/changelog/v0.9.25.md diff --git a/app/MindWork AI Studio/Provider/Anthropic/ProviderAnthropic.cs b/app/MindWork AI Studio/Provider/Anthropic/ProviderAnthropic.cs index f4846e9..d0e20d3 100644 --- a/app/MindWork AI Studio/Provider/Anthropic/ProviderAnthropic.cs +++ b/app/MindWork AI Studio/Provider/Anthropic/ProviderAnthropic.cs @@ -10,11 +10,6 @@ namespace AIStudio.Provider.Anthropic; public sealed class ProviderAnthropic(ILogger logger) : BaseProvider("https://api.anthropic.com/v1/", logger) { - private static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new() - { - PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, - }; - #region Implementation of IProvider public override string Id => LLMProviders.ANTHROPIC.ToName(); @@ -60,117 +55,24 @@ public sealed class ProviderAnthropic(ILogger logger) : BaseProvider("https://ap Stream = true, }, JSON_SERIALIZER_OPTIONS); - StreamReader? 
streamReader = null; - try + async Task RequestBuilder() { - async Task RequestBuilder() - { - // Build the HTTP post request: - var request = new HttpRequestMessage(HttpMethod.Post, "messages"); + // Build the HTTP post request: + var request = new HttpRequestMessage(HttpMethod.Post, "messages"); - // Set the authorization header: - request.Headers.Add("x-api-key", await requestedSecret.Secret.Decrypt(ENCRYPTION)); + // Set the authorization header: + request.Headers.Add("x-api-key", await requestedSecret.Secret.Decrypt(ENCRYPTION)); - // Set the Anthropic version: - request.Headers.Add("anthropic-version", "2023-06-01"); + // Set the Anthropic version: + request.Headers.Add("anthropic-version", "2023-06-01"); - // Set the content: - request.Content = new StringContent(chatRequest, Encoding.UTF8, "application/json"); - return request; - } - - // Send the request using exponential backoff: - var responseData = await this.SendRequest(RequestBuilder, token); - if(responseData.IsFailedAfterAllRetries) - { - this.logger.LogError($"Anthropic chat completion failed: {responseData.ErrorMessage}"); - yield break; - } - - // Open the response stream: - var stream = await responseData.Response!.Content.ReadAsStreamAsync(token); - - // Add a stream reader to read the stream, line by line: - streamReader = new StreamReader(stream); - } - catch (Exception e) - { - this.logger.LogError($"Failed to stream chat completion from Anthropic '{this.InstanceName}': {e.Message}"); + // Set the content: + request.Content = new StringContent(chatRequest, Encoding.UTF8, "application/json"); + return request; } - if (streamReader is null) - yield break; - - // Read the stream, line by line: - while(true) - { - try - { - if(streamReader.EndOfStream) - break; - } - catch (Exception e) - { - this.logger.LogWarning($"Failed to read the end-of-stream state from Anthropic '{this.InstanceName}': {e.Message}"); - break; - } - - // Check if the token is canceled: - if(token.IsCancellationRequested) - yield break; - - // Read the next line: - string? line; - try - { - line = await streamReader.ReadLineAsync(token); - } - catch (Exception e) - { - this.logger.LogError($"Failed to read the stream from Anthropic '{this.InstanceName}': {e.Message}"); - break; - } - - // Skip empty lines: - if(string.IsNullOrWhiteSpace(line)) - continue; - - // Check for the end of the stream: - if(line.StartsWith("event: message_stop", StringComparison.InvariantCulture)) - yield break; - - // Skip lines that do not start with "data: ". Regard - // to the specification, we only want to read the data lines: - if(!line.StartsWith("data: ", StringComparison.InvariantCulture)) - continue; - - // Ignore any type except "content_block_delta": - if(!line.Contains("\"content_block_delta\"", StringComparison.InvariantCulture)) - continue; - - ResponseStreamLine anthropicResponse; - try - { - // We know that the line starts with "data: ". Hence, we can - // skip the first 6 characters to get the JSON data after that. 
-                var jsonData = line[6..];
-                
-                // Deserialize the JSON data:
-                anthropicResponse = JsonSerializer.Deserialize<ResponseStreamLine>(jsonData, JSON_SERIALIZER_OPTIONS);
-            }
-            catch
-            {
-                // Skip invalid JSON data:
-                continue;
-            }
-            
-            // Skip empty responses:
-            if(anthropicResponse == default || string.IsNullOrWhiteSpace(anthropicResponse.Delta.Text))
-                continue;
-            
-            // Yield the response:
-            yield return anthropicResponse.Delta.Text;
-        }
+        await foreach (var content in this.StreamChatCompletionInternal<ResponseStreamLine>("Anthropic", RequestBuilder, token))
+            yield return content;
     }
 
 #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
diff --git a/app/MindWork AI Studio/Provider/Anthropic/ResponseStreamLine.cs b/app/MindWork AI Studio/Provider/Anthropic/ResponseStreamLine.cs
index 041d53a..c42e131 100644
--- a/app/MindWork AI Studio/Provider/Anthropic/ResponseStreamLine.cs
+++ b/app/MindWork AI Studio/Provider/Anthropic/ResponseStreamLine.cs
@@ -7,7 +7,14 @@ namespace AIStudio.Provider.Anthropic;
 /// <param name="Type">The type of the response line.</param>
 /// <param name="Index">The index of the response line.</param>
 /// <param name="Delta">The delta of the response line.</param>
-public readonly record struct ResponseStreamLine(string Type, int Index, Delta Delta);
+public readonly record struct ResponseStreamLine(string Type, int Index, Delta Delta) : IResponseStreamLine
+{
+    /// <inheritdoc />
+    public bool ContainsContent() => this != default && !string.IsNullOrWhiteSpace(this.Delta.Text);
+
+    /// <inheritdoc />
+    public string GetContent() => this.Delta.Text;
+}
 
 /// <summary>
 /// The delta object of a response line.
diff --git a/app/MindWork AI Studio/Provider/BaseProvider.cs b/app/MindWork AI Studio/Provider/BaseProvider.cs
index 58810b7..2514383 100644
--- a/app/MindWork AI Studio/Provider/BaseProvider.cs
+++ b/app/MindWork AI Studio/Provider/BaseProvider.cs
@@ -1,4 +1,6 @@
 using System.Net;
+using System.Runtime.CompilerServices;
+using System.Text.Json;
 
 using AIStudio.Chat;
 using AIStudio.Settings;
@@ -31,6 +33,11 @@ public abstract class BaseProvider : IProvider, ISecretId
     protected static readonly RustService RUST_SERVICE;
 
     protected static readonly Encryption ENCRYPTION;
+    
+    protected static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new()
+    {
+        PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
+    };
 
     /// <summary>
     /// Constructor for the base provider.
@@ -127,4 +134,105 @@ public abstract class BaseProvider : IProvider, ISecretId
 
         return new HttpRateLimitedStreamResult(true, false, string.Empty, response);
     }
+    
+    protected async IAsyncEnumerable<string> StreamChatCompletionInternal<T>(string providerName, Func<Task<HttpRequestMessage>> requestBuilder, [EnumeratorCancellation] CancellationToken token = default) where T : struct, IResponseStreamLine
+    {
+        StreamReader? streamReader = null;
+        try
+        {
+            // Send the request using exponential backoff:
+            var responseData = await this.SendRequest(requestBuilder, token);
+            if(responseData.IsFailedAfterAllRetries)
+            {
+                this.logger.LogError($"The {providerName} chat completion failed: {responseData.ErrorMessage}");
+                yield break;
+            }
+
+            // Open the response stream:
+            var providerStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
+
+            // Add a stream reader to read the stream, line by line:
+            streamReader = new StreamReader(providerStream);
+        }
+        catch(Exception e)
+        {
+            this.logger.LogError($"Failed to stream chat completion from {providerName} '{this.InstanceName}': {e.Message}");
+        }
+
+        if (streamReader is null)
+            yield break;
+
+        // Read the stream, line by line:
+        while (true)
+        {
+            try
+            {
+                if(streamReader.EndOfStream)
+                    break;
+            }
+            catch (Exception e)
+            {
+                this.logger.LogWarning($"Failed to read the end-of-stream state from {providerName} '{this.InstanceName}': {e.Message}");
+                break;
+            }
+
+            // Check if the token is canceled:
+            if (token.IsCancellationRequested)
+            {
+                this.logger.LogWarning($"The user canceled the chat completion for {providerName} '{this.InstanceName}'.");
+                streamReader.Close();
+                yield break;
+            }
+
+            // Read the next line:
+            string? line;
+            try
+            {
+                line = await streamReader.ReadLineAsync(token);
+            }
+            catch (Exception e)
+            {
+                this.logger.LogError($"Failed to read the stream from {providerName} '{this.InstanceName}': {e.Message}");
+                break;
+            }
+
+            // Skip empty lines:
+            if (string.IsNullOrWhiteSpace(line))
+                continue;
+
+            // Skip lines that do not start with "data: ". According
+            // to the specification, we only want to read the data lines:
+            if (!line.StartsWith("data: ", StringComparison.InvariantCulture))
+                continue;
+
+            // Check if the line is the end of the stream:
+            if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture))
+                yield break;
+
+            T providerResponse;
+            try
+            {
+                // We know that the line starts with "data: ". Hence, we can
+                // skip the first 6 characters to get the JSON data after that.
+                var jsonData = line[6..];
+
+                // Deserialize the JSON data:
+                providerResponse = JsonSerializer.Deserialize<T>(jsonData, JSON_SERIALIZER_OPTIONS);
+            }
+            catch
+            {
+                // Skip invalid JSON data:
+                continue;
+            }
+
+            // Skip empty responses:
+            if (!providerResponse.ContainsContent())
+                continue;
+
+            // Yield the response:
+            yield return providerResponse.GetContent();
+        }
+
+        streamReader.Dispose();
+    }
 }
\ No newline at end of file
diff --git a/app/MindWork AI Studio/Provider/Fireworks/ProviderFireworks.cs b/app/MindWork AI Studio/Provider/Fireworks/ProviderFireworks.cs
index 1dd49d3..66817fc 100644
--- a/app/MindWork AI Studio/Provider/Fireworks/ProviderFireworks.cs
+++ b/app/MindWork AI Studio/Provider/Fireworks/ProviderFireworks.cs
@@ -10,11 +10,6 @@ namespace AIStudio.Provider.Fireworks;
 
 public class ProviderFireworks(ILogger logger) : BaseProvider("https://api.fireworks.ai/inference/v1/", logger)
 {
-    private static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new()
-    {
-        PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
-    };
-
     #region Implementation of IProvider
     
     /// <inheritdoc />
@@ -69,110 +64,21 @@ public class ProviderFireworks(ILogger logger) : BaseProvider("https://api.firew
             Stream = true,
         }, JSON_SERIALIZER_OPTIONS);
 
-        StreamReader? 
streamReader = null; - try + async Task RequestBuilder() { - async Task RequestBuilder() - { - // Build the HTTP post request: - var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); + // Build the HTTP post request: + var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); - // Set the authorization header: - request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); + // Set the authorization header: + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); - // Set the content: - request.Content = new StringContent(fireworksChatRequest, Encoding.UTF8, "application/json"); - return request; - } - - // Send the request using exponential backoff: - var responseData = await this.SendRequest(RequestBuilder, token); - if(responseData.IsFailedAfterAllRetries) - { - this.logger.LogError($"Fireworks chat completion failed: {responseData.ErrorMessage}"); - yield break; - } - - // Open the response stream: - var fireworksStream = await responseData.Response!.Content.ReadAsStreamAsync(token); - - // Add a stream reader to read the stream, line by line: - streamReader = new StreamReader(fireworksStream); - } - catch (Exception e) - { - this.logger.LogError($"Failed to stream chat completion from Fireworks '{this.InstanceName}': {e.Message}"); + // Set the content: + request.Content = new StringContent(fireworksChatRequest, Encoding.UTF8, "application/json"); + return request; } - if (streamReader is null) - yield break; - - // Read the stream, line by line: - while(true) - { - try - { - if(streamReader.EndOfStream) - break; - } - catch (Exception e) - { - this.logger.LogWarning($"Failed to read the end-of-stream state from Fireworks '{this.InstanceName}': {e.Message}"); - break; - } - - // Check if the token is canceled: - if(token.IsCancellationRequested) - yield break; - - // Read the next line: - string? line; - try - { - line = await streamReader.ReadLineAsync(token); - } - catch (Exception e) - { - this.logger.LogError($"Failed to read the stream from Fireworks '{this.InstanceName}': {e.Message}"); - break; - } - - // Skip empty lines: - if(string.IsNullOrWhiteSpace(line)) - continue; - - // Skip lines that do not start with "data: ". Regard - // to the specification, we only want to read the data lines: - if(!line.StartsWith("data: ", StringComparison.InvariantCulture)) - continue; - - // Check if the line is the end of the stream: - if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture)) - yield break; - - ResponseStreamLine fireworksResponse; - try - { - // We know that the line starts with "data: ". Hence, we can - // skip the first 6 characters to get the JSON data after that. 
- var jsonData = line[6..]; - - // Deserialize the JSON data: - fireworksResponse = JsonSerializer.Deserialize(jsonData, JSON_SERIALIZER_OPTIONS); - } - catch - { - // Skip invalid JSON data: - continue; - } - - // Skip empty responses: - if(fireworksResponse == default || fireworksResponse.Choices.Count == 0) - continue; - - // Yield the response: - yield return fireworksResponse.Choices[0].Delta.Content; - } + await foreach (var content in this.StreamChatCompletionInternal("Fireworks", RequestBuilder, token)) + yield return content; } #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously diff --git a/app/MindWork AI Studio/Provider/Fireworks/ResponseStreamLine.cs b/app/MindWork AI Studio/Provider/Fireworks/ResponseStreamLine.cs index c4d54e0..b3832f5 100644 --- a/app/MindWork AI Studio/Provider/Fireworks/ResponseStreamLine.cs +++ b/app/MindWork AI Studio/Provider/Fireworks/ResponseStreamLine.cs @@ -8,7 +8,14 @@ namespace AIStudio.Provider.Fireworks; /// The timestamp of the response. /// The model used for the response. /// The choices made by the AI. -public readonly record struct ResponseStreamLine(string Id, string Object, uint Created, string Model, IList Choices); +public readonly record struct ResponseStreamLine(string Id, string Object, uint Created, string Model, IList Choices) : IResponseStreamLine +{ + /// + public bool ContainsContent() => this != default && this.Choices.Count > 0; + + /// + public string GetContent() => this.Choices[0].Delta.Content; +} /// /// Data model for a choice made by the AI. diff --git a/app/MindWork AI Studio/Provider/Google/ProviderGoogle.cs b/app/MindWork AI Studio/Provider/Google/ProviderGoogle.cs index 6293b08..e0d5da7 100644 --- a/app/MindWork AI Studio/Provider/Google/ProviderGoogle.cs +++ b/app/MindWork AI Studio/Provider/Google/ProviderGoogle.cs @@ -11,11 +11,6 @@ namespace AIStudio.Provider.Google; public class ProviderGoogle(ILogger logger) : BaseProvider("https://generativelanguage.googleapis.com/v1beta/", logger) { - private static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new() - { - PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, - }; - #region Implementation of IProvider /// @@ -70,110 +65,21 @@ public class ProviderGoogle(ILogger logger) : BaseProvider("https://generativela Stream = true, }, JSON_SERIALIZER_OPTIONS); - StreamReader? 
streamReader = null; - try + async Task RequestBuilder() { - async Task RequestBuilder() - { - // Build the HTTP post request: - var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); + // Build the HTTP post request: + var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); - // Set the authorization header: - request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); + // Set the authorization header: + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); - // Set the content: - request.Content = new StringContent(geminiChatRequest, Encoding.UTF8, "application/json"); - return request; - } - - // Send the request using exponential backoff: - var responseData = await this.SendRequest(RequestBuilder, token); - if(responseData.IsFailedAfterAllRetries) - { - this.logger.LogError($"Google chat completion failed: {responseData.ErrorMessage}"); - yield break; - } - - // Open the response stream: - var geminiStream = await responseData.Response!.Content.ReadAsStreamAsync(token); - - // Add a stream reader to read the stream, line by line: - streamReader = new StreamReader(geminiStream); - } - catch (Exception e) - { - this.logger.LogError($"Failed to stream chat completion from Google '{this.InstanceName}': {e.Message}"); + // Set the content: + request.Content = new StringContent(geminiChatRequest, Encoding.UTF8, "application/json"); + return request; } - if (streamReader is null) - yield break; - - // Read the stream, line by line: - while(true) - { - try - { - if(streamReader.EndOfStream) - break; - } - catch (Exception e) - { - this.logger.LogWarning($"Failed to read the end-of-stream state from Google '{this.InstanceName}': {e.Message}"); - break; - } - - // Check if the token is canceled: - if(token.IsCancellationRequested) - yield break; - - // Read the next line: - string? line; - try - { - line = await streamReader.ReadLineAsync(token); - } - catch (Exception e) - { - this.logger.LogError($"Failed to read the stream from Google '{this.InstanceName}': {e.Message}"); - break; - } - - // Skip empty lines: - if(string.IsNullOrWhiteSpace(line)) - continue; - - // Skip lines that do not start with "data: ". Regard - // to the specification, we only want to read the data lines: - if(!line.StartsWith("data: ", StringComparison.InvariantCulture)) - continue; - - // Check if the line is the end of the stream: - if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture)) - yield break; - - ResponseStreamLine geminiResponse; - try - { - // We know that the line starts with "data: ". Hence, we can - // skip the first 6 characters to get the JSON data after that. 
- var jsonData = line[6..]; - - // Deserialize the JSON data: - geminiResponse = JsonSerializer.Deserialize(jsonData, JSON_SERIALIZER_OPTIONS); - } - catch - { - // Skip invalid JSON data: - continue; - } - - // Skip empty responses: - if(geminiResponse == default || geminiResponse.Choices.Count == 0) - continue; - - // Yield the response: - yield return geminiResponse.Choices[0].Delta.Content; - } + await foreach (var content in this.StreamChatCompletionInternal("Google", RequestBuilder, token)) + yield return content; } #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously diff --git a/app/MindWork AI Studio/Provider/Groq/ProviderGroq.cs b/app/MindWork AI Studio/Provider/Groq/ProviderGroq.cs index 7fd2931..2fc7e88 100644 --- a/app/MindWork AI Studio/Provider/Groq/ProviderGroq.cs +++ b/app/MindWork AI Studio/Provider/Groq/ProviderGroq.cs @@ -11,11 +11,6 @@ namespace AIStudio.Provider.Groq; public class ProviderGroq(ILogger logger) : BaseProvider("https://api.groq.com/openai/v1/", logger) { - private static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new() - { - PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, - }; - #region Implementation of IProvider /// @@ -72,110 +67,21 @@ public class ProviderGroq(ILogger logger) : BaseProvider("https://api.groq.com/o Stream = true, }, JSON_SERIALIZER_OPTIONS); - StreamReader? streamReader = null; - try + async Task RequestBuilder() { - async Task RequestBuilder() - { - // Build the HTTP post request: - var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); + // Build the HTTP post request: + var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); - // Set the authorization header: - request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); + // Set the authorization header: + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); - // Set the content: - request.Content = new StringContent(groqChatRequest, Encoding.UTF8, "application/json"); - return request; - } - - // Send the request using exponential backoff: - var responseData = await this.SendRequest(RequestBuilder, token); - if(responseData.IsFailedAfterAllRetries) - { - this.logger.LogError($"Groq chat completion failed: {responseData.ErrorMessage}"); - yield break; - } - - // Open the response stream: - var groqStream = await responseData.Response!.Content.ReadAsStreamAsync(token); - - // Add a stream reader to read the stream, line by line: - streamReader = new StreamReader(groqStream); - } - catch (Exception e) - { - this.logger.LogError($"Failed to stream chat completion from Groq '{this.InstanceName}': {e.Message}"); + // Set the content: + request.Content = new StringContent(groqChatRequest, Encoding.UTF8, "application/json"); + return request; } - if (streamReader is null) - yield break; - - // Read the stream, line by line: - while(true) - { - try - { - if(streamReader.EndOfStream) - break; - } - catch (Exception e) - { - this.logger.LogWarning($"Failed to read the end-of-stream state from Groq '{this.InstanceName}': {e.Message}"); - break; - } - - // Check if the token is canceled: - if(token.IsCancellationRequested) - yield break; - - // Read the next line: - string? 
line; - try - { - line = await streamReader.ReadLineAsync(token); - } - catch (Exception e) - { - this.logger.LogError($"Failed to read the stream from Groq '{this.InstanceName}': {e.Message}"); - break; - } - - // Skip empty lines: - if(string.IsNullOrWhiteSpace(line)) - continue; - - // Skip lines that do not start with "data: ". Regard - // to the specification, we only want to read the data lines: - if(!line.StartsWith("data: ", StringComparison.InvariantCulture)) - continue; - - // Check if the line is the end of the stream: - if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture)) - yield break; - - ResponseStreamLine groqResponse; - try - { - // We know that the line starts with "data: ". Hence, we can - // skip the first 6 characters to get the JSON data after that. - var jsonData = line[6..]; - - // Deserialize the JSON data: - groqResponse = JsonSerializer.Deserialize(jsonData, JSON_SERIALIZER_OPTIONS); - } - catch - { - // Skip invalid JSON data: - continue; - } - - // Skip empty responses: - if(groqResponse == default || groqResponse.Choices.Count == 0) - continue; - - // Yield the response: - yield return groqResponse.Choices[0].Delta.Content; - } + await foreach (var content in this.StreamChatCompletionInternal("Groq", RequestBuilder, token)) + yield return content; } #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously diff --git a/app/MindWork AI Studio/Provider/IResponseStreamLine.cs b/app/MindWork AI Studio/Provider/IResponseStreamLine.cs new file mode 100644 index 0000000..b3e7c28 --- /dev/null +++ b/app/MindWork AI Studio/Provider/IResponseStreamLine.cs @@ -0,0 +1,16 @@ +namespace AIStudio.Provider; + +public interface IResponseStreamLine +{ + /// + /// Checks if the response line contains any content. + /// + /// True when the response line contains content, false otherwise. + public bool ContainsContent(); + + /// + /// Gets the content of the response line. + /// + /// The content of the response line. + public string GetContent(); +} \ No newline at end of file diff --git a/app/MindWork AI Studio/Provider/Mistral/ProviderMistral.cs b/app/MindWork AI Studio/Provider/Mistral/ProviderMistral.cs index b83dfa4..ebde4c7 100644 --- a/app/MindWork AI Studio/Provider/Mistral/ProviderMistral.cs +++ b/app/MindWork AI Studio/Provider/Mistral/ProviderMistral.cs @@ -11,11 +11,6 @@ namespace AIStudio.Provider.Mistral; public sealed class ProviderMistral(ILogger logger) : BaseProvider("https://api.mistral.ai/v1/", logger) { - private static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new() - { - PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, - }; - #region Implementation of IProvider public override string Id => LLMProviders.MISTRAL.ToName(); @@ -71,110 +66,21 @@ public sealed class ProviderMistral(ILogger logger) : BaseProvider("https://api. SafePrompt = false, }, JSON_SERIALIZER_OPTIONS); - StreamReader? 
streamReader = null; - try + async Task RequestBuilder() { - async Task RequestBuilder() - { - // Build the HTTP post request: - var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); + // Build the HTTP post request: + var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); - // Set the authorization header: - request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); + // Set the authorization header: + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); - // Set the content: - request.Content = new StringContent(mistralChatRequest, Encoding.UTF8, "application/json"); - return request; - } - - // Send the request using exponential backoff: - var responseData = await this.SendRequest(RequestBuilder, token); - if(responseData.IsFailedAfterAllRetries) - { - this.logger.LogError($"Mistral chat completion failed: {responseData.ErrorMessage}"); - yield break; - } - - // Open the response stream: - var mistralStream = await responseData.Response!.Content.ReadAsStreamAsync(token); - - // Add a stream reader to read the stream, line by line: - streamReader = new StreamReader(mistralStream); - } - catch (Exception e) - { - this.logger.LogError($"Failed to stream chat completion from Mistral '{this.InstanceName}': {e.Message}"); + // Set the content: + request.Content = new StringContent(mistralChatRequest, Encoding.UTF8, "application/json"); + return request; } - if (streamReader is null) - yield break; - - // Read the stream, line by line: - while(true) - { - try - { - if(streamReader.EndOfStream) - break; - } - catch (Exception e) - { - this.logger.LogWarning($"Failed to read the end-of-stream state from Mistral '{this.InstanceName}': {e.Message}"); - break; - } - - // Check if the token is canceled: - if(token.IsCancellationRequested) - yield break; - - // Read the next line: - string? line; - try - { - line = await streamReader.ReadLineAsync(token); - } - catch (Exception e) - { - this.logger.LogError($"Failed to read the stream from Mistral '{this.InstanceName}': {e.Message}"); - break; - } - - // Skip empty lines: - if(string.IsNullOrWhiteSpace(line)) - continue; - - // Skip lines that do not start with "data: ". Regard - // to the specification, we only want to read the data lines: - if(!line.StartsWith("data: ", StringComparison.InvariantCulture)) - continue; - - // Check if the line is the end of the stream: - if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture)) - yield break; - - ResponseStreamLine mistralResponse; - try - { - // We know that the line starts with "data: ". Hence, we can - // skip the first 6 characters to get the JSON data after that. 
- var jsonData = line[6..]; - - // Deserialize the JSON data: - mistralResponse = JsonSerializer.Deserialize(jsonData, JSON_SERIALIZER_OPTIONS); - } - catch - { - // Skip invalid JSON data: - continue; - } - - // Skip empty responses: - if(mistralResponse == default || mistralResponse.Choices.Count == 0) - continue; - - // Yield the response: - yield return mistralResponse.Choices[0].Delta.Content; - } + await foreach (var content in this.StreamChatCompletionInternal("Mistral", RequestBuilder, token)) + yield return content; } #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously diff --git a/app/MindWork AI Studio/Provider/OpenAI/ProviderOpenAI.cs b/app/MindWork AI Studio/Provider/OpenAI/ProviderOpenAI.cs index 7635366..6ddeeab 100644 --- a/app/MindWork AI Studio/Provider/OpenAI/ProviderOpenAI.cs +++ b/app/MindWork AI Studio/Provider/OpenAI/ProviderOpenAI.cs @@ -13,11 +13,6 @@ namespace AIStudio.Provider.OpenAI; /// public sealed class ProviderOpenAI(ILogger logger) : BaseProvider("https://api.openai.com/v1/", logger) { - private static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new() - { - PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, - }; - #region Implementation of IProvider /// @@ -99,110 +94,21 @@ public sealed class ProviderOpenAI(ILogger logger) : BaseProvider("https://api.o Stream = true, }, JSON_SERIALIZER_OPTIONS); - StreamReader? streamReader = null; - try + async Task RequestBuilder() { - async Task RequestBuilder() - { - // Build the HTTP post request: - var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); + // Build the HTTP post request: + var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); - // Set the authorization header: - request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); + // Set the authorization header: + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); - // Set the content: - request.Content = new StringContent(openAIChatRequest, Encoding.UTF8, "application/json"); - return request; - } - - // Send the request using exponential backoff: - var responseData = await this.SendRequest(RequestBuilder, token); - if(responseData.IsFailedAfterAllRetries) - { - this.logger.LogError($"OpenAI chat completion failed: {responseData.ErrorMessage}"); - yield break; - } - - // Open the response stream: - var openAIStream = await responseData.Response!.Content.ReadAsStreamAsync(token); - - // Add a stream reader to read the stream, line by line: - streamReader = new StreamReader(openAIStream); - } - catch (Exception e) - { - this.logger.LogError($"Failed to stream chat completion from OpenAI '{this.InstanceName}': {e.Message}"); + // Set the content: + request.Content = new StringContent(openAIChatRequest, Encoding.UTF8, "application/json"); + return request; } - if (streamReader is null) - yield break; - - // Read the stream, line by line: - while(true) - { - try - { - if(streamReader.EndOfStream) - break; - } - catch (Exception e) - { - this.logger.LogWarning($"Failed to read the end-of-stream state from OpenAI '{this.InstanceName}': {e.Message}"); - break; - } - - // Check if the token is canceled: - if(token.IsCancellationRequested) - yield break; - - // Read the next line: - string? 
line; - try - { - line = await streamReader.ReadLineAsync(token); - } - catch (Exception e) - { - this.logger.LogError($"Failed to read the stream from OpenAI '{this.InstanceName}': {e.Message}"); - break; - } - - // Skip empty lines: - if(string.IsNullOrWhiteSpace(line)) - continue; - - // Skip lines that do not start with "data: ". Regard - // to the specification, we only want to read the data lines: - if(!line.StartsWith("data: ", StringComparison.InvariantCulture)) - continue; - - // Check if the line is the end of the stream: - if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture)) - yield break; - - ResponseStreamLine openAIResponse; - try - { - // We know that the line starts with "data: ". Hence, we can - // skip the first 6 characters to get the JSON data after that. - var jsonData = line[6..]; - - // Deserialize the JSON data: - openAIResponse = JsonSerializer.Deserialize(jsonData, JSON_SERIALIZER_OPTIONS); - } - catch - { - // Skip invalid JSON data: - continue; - } - - // Skip empty responses: - if(openAIResponse == default || openAIResponse.Choices.Count == 0) - continue; - - // Yield the response: - yield return openAIResponse.Choices[0].Delta.Content; - } + await foreach (var content in this.StreamChatCompletionInternal("OpenAI", RequestBuilder, token)) + yield return content; } #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously diff --git a/app/MindWork AI Studio/Provider/OpenAI/ResponseStreamLine.cs b/app/MindWork AI Studio/Provider/OpenAI/ResponseStreamLine.cs index 0e938fd..98b2b2d 100644 --- a/app/MindWork AI Studio/Provider/OpenAI/ResponseStreamLine.cs +++ b/app/MindWork AI Studio/Provider/OpenAI/ResponseStreamLine.cs @@ -9,7 +9,14 @@ namespace AIStudio.Provider.OpenAI; /// The model used for the response. /// The system fingerprint; together with the seed, this allows you to reproduce the response. /// The choices made by the AI. -public readonly record struct ResponseStreamLine(string Id, string Object, uint Created, string Model, string SystemFingerprint, IList Choices); +public readonly record struct ResponseStreamLine(string Id, string Object, uint Created, string Model, string SystemFingerprint, IList Choices) : IResponseStreamLine +{ + /// + public bool ContainsContent() => this != default && this.Choices.Count > 0; + + /// + public string GetContent() => this.Choices[0].Delta.Content; +} /// /// Data model for a choice made by the AI. diff --git a/app/MindWork AI Studio/Provider/SelfHosted/ProviderSelfHosted.cs b/app/MindWork AI Studio/Provider/SelfHosted/ProviderSelfHosted.cs index eb3c98b..3abda28 100644 --- a/app/MindWork AI Studio/Provider/SelfHosted/ProviderSelfHosted.cs +++ b/app/MindWork AI Studio/Provider/SelfHosted/ProviderSelfHosted.cs @@ -11,11 +11,6 @@ namespace AIStudio.Provider.SelfHosted; public sealed class ProviderSelfHosted(ILogger logger, Host host, string hostname) : BaseProvider($"{hostname}{host.BaseURL()}", logger) { - private static readonly JsonSerializerOptions JSON_SERIALIZER_OPTIONS = new() - { - PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, - }; - #region Implementation of IProvider public override string Id => LLMProviders.SELF_HOSTED.ToName(); @@ -67,113 +62,22 @@ public sealed class ProviderSelfHosted(ILogger logger, Host host, string hostnam MaxTokens = -1, }, JSON_SERIALIZER_OPTIONS); - StreamReader? 
streamReader = null; - try + async Task RequestBuilder() { - async Task RequestBuilder() - { - // Build the HTTP post request: - var request = new HttpRequestMessage(HttpMethod.Post, host.ChatURL()); + // Build the HTTP post request: + var request = new HttpRequestMessage(HttpMethod.Post, host.ChatURL()); - // Set the authorization header: - if (requestedSecret.Success) - request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); + // Set the authorization header: + if (requestedSecret.Success) + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); - // Set the content: - request.Content = new StringContent(providerChatRequest, Encoding.UTF8, "application/json"); - return request; - } - - // Send the request using exponential backoff: - var responseData = await this.SendRequest(RequestBuilder, token); - if(responseData.IsFailedAfterAllRetries) - { - this.logger.LogError($"Self-hosted provider's chat completion failed: {responseData.ErrorMessage}"); - yield break; - } - - // Open the response stream: - var providerStream = await responseData.Response!.Content.ReadAsStreamAsync(token); - - // Add a stream reader to read the stream, line by line: - streamReader = new StreamReader(providerStream); - } - catch(Exception e) - { - this.logger.LogError($"Failed to stream chat completion from self-hosted provider '{this.InstanceName}': {e.Message}"); - } - - if (streamReader is null) - yield break; - - // Read the stream, line by line: - while (true) - { - try - { - if(streamReader.EndOfStream) - break; - } - catch (Exception e) - { - this.logger.LogWarning($"Failed to read the end-of-stream state from self-hosted provider '{this.InstanceName}': {e.Message}"); - break; - } - - // Check if the token is canceled: - if (token.IsCancellationRequested) - yield break; - - // Read the next line: - string? line; - try - { - line = await streamReader.ReadLineAsync(token); - } - catch (Exception e) - { - this.logger.LogError($"Failed to read the stream from self-hosted provider '{this.InstanceName}': {e.Message}"); - break; - } - - // Skip empty lines: - if (string.IsNullOrWhiteSpace(line)) - continue; - - // Skip lines that do not start with "data: ". Regard - // to the specification, we only want to read the data lines: - if (!line.StartsWith("data: ", StringComparison.InvariantCulture)) - continue; - - // Check if the line is the end of the stream: - if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture)) - yield break; - - ResponseStreamLine providerResponse; - try - { - // We know that the line starts with "data: ". Hence, we can - // skip the first 6 characters to get the JSON data after that. 
-                var jsonData = line[6..];
-                
-                // Deserialize the JSON data:
-                providerResponse = JsonSerializer.Deserialize<ResponseStreamLine>(jsonData, JSON_SERIALIZER_OPTIONS);
-            }
-            catch
-            {
-                // Skip invalid JSON data:
-                continue;
-            }
-            
-            // Skip empty responses:
-            if (providerResponse == default || providerResponse.Choices.Count == 0)
-                continue;
-            
-            // Yield the response:
-            yield return providerResponse.Choices[0].Delta.Content;
+            // Set the content:
+            request.Content = new StringContent(providerChatRequest, Encoding.UTF8, "application/json");
+            return request;
         }
 
-        streamReader.Dispose();
+        await foreach (var content in this.StreamChatCompletionInternal<ResponseStreamLine>("self-hosted provider", RequestBuilder, token))
+            yield return content;
     }
 
 #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
diff --git a/app/MindWork AI Studio/wwwroot/changelog/v0.9.25.md b/app/MindWork AI Studio/wwwroot/changelog/v0.9.25.md
new file mode 100644
index 0000000..7156d9b
--- /dev/null
+++ b/app/MindWork AI Studio/wwwroot/changelog/v0.9.25.md
@@ -0,0 +1,3 @@
+# v0.9.25, build 200 (2025-01-xx xx:xx UTC)
+- Improved the stop generation button behavior to ensure that the AI stops generating content immediately (which will save compute time, energy and financial resources).
+- Restructured the streaming network code so that the shared streaming logic is centralized in the base provider instead of being duplicated in each individual provider. This will allow for easier maintenance and updates in the future.
\ No newline at end of file
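
For reference, here is a rough sketch of what a provider's streaming method boils down to on top of the refactored base class: everything provider-specific stays in a small `RequestBuilder` local function, while `BaseProvider.StreamChatCompletionInternal<T>` owns the retry, SSE parsing, cancellation, and cleanup logic. The names `ProviderExample`, its base URL, and the `chatRequestJson`/`apiKey` parameters are illustrative assumptions, not part of this patch, and the remaining `IProvider` members that `BaseProvider` requires are omitted for brevity.

```csharp
// Hypothetical sketch: ProviderExample, its base URL, and the simplified method
// signature are assumptions for illustration; only the RequestBuilder +
// StreamChatCompletionInternal<T> pattern itself comes from this patch.
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Text;

using Microsoft.Extensions.Logging;

namespace AIStudio.Provider.Example;

public sealed class ProviderExample(ILogger logger) : BaseProvider("https://api.example.com/v1/", logger)
{
    public async IAsyncEnumerable<string> StreamChatCompletion(
        string chatRequestJson,   // pre-serialized request body (assumed parameter)
        string apiKey,            // the real providers decrypt a stored secret here instead
        [EnumeratorCancellation] CancellationToken token = default)
    {
        // Only the per-request specifics live in the provider: endpoint, auth header, payload.
        Task<HttpRequestMessage> RequestBuilder()
        {
            var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
            request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", apiKey);
            request.Content = new StringContent(chatRequestJson, Encoding.UTF8, "application/json");
            return Task.FromResult(request);
        }

        // Exponential backoff, "data: " line filtering, "data: [DONE]" handling,
        // cancellation, and stream cleanup are inherited from the base class.
        // OpenAI.ResponseStreamLine already implements IResponseStreamLine in this patch.
        await foreach (var content in this.StreamChatCompletionInternal<OpenAI.ResponseStreamLine>("Example", RequestBuilder, token))
            yield return content;
    }
}
```

In the actual providers, as the per-file diffs above show, the request body comes from serializing the chat thread with `JSON_SERIALIZER_OPTIONS` and the API key is decrypted from the secret store inside `RequestBuilder`.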