Improved the stream handling for all providers (#251)

This commit is contained in:
Thorsten Sommer 2025-01-04 12:37:49 +01:00 committed by GitHub
parent 258bc7a338
commit d6521850e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 349 additions and 140 deletions

View File

@ -60,21 +60,24 @@ public sealed class ProviderAnthropic(ILogger logger) : BaseProvider("https://ap
Stream = true, Stream = true,
}, JSON_SERIALIZER_OPTIONS); }, JSON_SERIALIZER_OPTIONS);
async Task<HttpRequestMessage> RequestBuilder() StreamReader? streamReader = null;
try
{ {
// Build the HTTP post request: async Task<HttpRequestMessage> RequestBuilder()
var request = new HttpRequestMessage(HttpMethod.Post, "messages"); {
// Build the HTTP post request:
var request = new HttpRequestMessage(HttpMethod.Post, "messages");
// Set the authorization header: // Set the authorization header:
request.Headers.Add("x-api-key", await requestedSecret.Secret.Decrypt(ENCRYPTION)); request.Headers.Add("x-api-key", await requestedSecret.Secret.Decrypt(ENCRYPTION));
// Set the Anthropic version: // Set the Anthropic version:
request.Headers.Add("anthropic-version", "2023-06-01"); request.Headers.Add("anthropic-version", "2023-06-01");
// Set the content: // Set the content:
request.Content = new StringContent(chatRequest, Encoding.UTF8, "application/json"); request.Content = new StringContent(chatRequest, Encoding.UTF8, "application/json");
return request; return request;
} }
// Send the request using exponential backoff: // Send the request using exponential backoff:
var responseData = await this.SendRequest(RequestBuilder, token); var responseData = await this.SendRequest(RequestBuilder, token);
@ -84,21 +87,49 @@ public sealed class ProviderAnthropic(ILogger logger) : BaseProvider("https://ap
yield break; yield break;
} }
// Open the response stream: // Open the response stream:
var stream = await responseData.Response!.Content.ReadAsStreamAsync(token); var stream = await responseData.Response!.Content.ReadAsStreamAsync(token);
// Add a stream reader to read the stream, line by line: // Add a stream reader to read the stream, line by line:
var streamReader = new StreamReader(stream); streamReader = new StreamReader(stream);
}
catch (Exception e)
{
this.logger.LogError($"Failed to stream chat completion from Anthropic '{this.InstanceName}': {e.Message}");
}
if (streamReader is null)
yield break;
// Read the stream, line by line: // Read the stream, line by line:
while(!streamReader.EndOfStream) while(true)
{ {
try
{
if(streamReader.EndOfStream)
break;
}
catch (Exception e)
{
this.logger.LogWarning($"Failed to read the end-of-stream state from Anthropic '{this.InstanceName}': {e.Message}");
break;
}
// Check if the token is canceled: // Check if the token is canceled:
if(token.IsCancellationRequested) if(token.IsCancellationRequested)
yield break; yield break;
// Read the next line: // Read the next line:
var line = await streamReader.ReadLineAsync(token); string? line;
try
{
line = await streamReader.ReadLineAsync(token);
}
catch (Exception e)
{
this.logger.LogError($"Failed to read the stream from Anthropic '{this.InstanceName}': {e.Message}");
break;
}
// Skip empty lines: // Skip empty lines:
if(string.IsNullOrWhiteSpace(line)) if(string.IsNullOrWhiteSpace(line))

View File

@ -69,18 +69,21 @@ public class ProviderFireworks(ILogger logger) : BaseProvider("https://api.firew
Stream = true, Stream = true,
}, JSON_SERIALIZER_OPTIONS); }, JSON_SERIALIZER_OPTIONS);
async Task<HttpRequestMessage> RequestBuilder() StreamReader? streamReader = null;
try
{ {
// Build the HTTP post request: async Task<HttpRequestMessage> RequestBuilder()
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); {
// Build the HTTP post request:
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
// Set the authorization header: // Set the authorization header:
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION));
// Set the content: // Set the content:
request.Content = new StringContent(fireworksChatRequest, Encoding.UTF8, "application/json"); request.Content = new StringContent(fireworksChatRequest, Encoding.UTF8, "application/json");
return request; return request;
} }
// Send the request using exponential backoff: // Send the request using exponential backoff:
var responseData = await this.SendRequest(RequestBuilder, token); var responseData = await this.SendRequest(RequestBuilder, token);
@ -90,21 +93,49 @@ public class ProviderFireworks(ILogger logger) : BaseProvider("https://api.firew
yield break; yield break;
} }
// Open the response stream: // Open the response stream:
var fireworksStream = await responseData.Response!.Content.ReadAsStreamAsync(token); var fireworksStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
// Add a stream reader to read the stream, line by line: // Add a stream reader to read the stream, line by line:
var streamReader = new StreamReader(fireworksStream); streamReader = new StreamReader(fireworksStream);
}
catch (Exception e)
{
this.logger.LogError($"Failed to stream chat completion from Fireworks '{this.InstanceName}': {e.Message}");
}
if (streamReader is null)
yield break;
// Read the stream, line by line: // Read the stream, line by line:
while(!streamReader.EndOfStream) while(true)
{ {
try
{
if(streamReader.EndOfStream)
break;
}
catch (Exception e)
{
this.logger.LogWarning($"Failed to read the end-of-stream state from Fireworks '{this.InstanceName}': {e.Message}");
break;
}
// Check if the token is canceled: // Check if the token is canceled:
if(token.IsCancellationRequested) if(token.IsCancellationRequested)
yield break; yield break;
// Read the next line: // Read the next line:
var line = await streamReader.ReadLineAsync(token); string? line;
try
{
line = await streamReader.ReadLineAsync(token);
}
catch (Exception e)
{
this.logger.LogError($"Failed to read the stream from Fireworks '{this.InstanceName}': {e.Message}");
break;
}
// Skip empty lines: // Skip empty lines:
if(string.IsNullOrWhiteSpace(line)) if(string.IsNullOrWhiteSpace(line))

View File

@ -70,18 +70,21 @@ public class ProviderGoogle(ILogger logger) : BaseProvider("https://generativela
Stream = true, Stream = true,
}, JSON_SERIALIZER_OPTIONS); }, JSON_SERIALIZER_OPTIONS);
async Task<HttpRequestMessage> RequestBuilder() StreamReader? streamReader = null;
try
{ {
// Build the HTTP post request: async Task<HttpRequestMessage> RequestBuilder()
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); {
// Build the HTTP post request:
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
// Set the authorization header: // Set the authorization header:
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION));
// Set the content: // Set the content:
request.Content = new StringContent(geminiChatRequest, Encoding.UTF8, "application/json"); request.Content = new StringContent(geminiChatRequest, Encoding.UTF8, "application/json");
return request; return request;
} }
// Send the request using exponential backoff: // Send the request using exponential backoff:
var responseData = await this.SendRequest(RequestBuilder, token); var responseData = await this.SendRequest(RequestBuilder, token);
@ -91,21 +94,49 @@ public class ProviderGoogle(ILogger logger) : BaseProvider("https://generativela
yield break; yield break;
} }
// Open the response stream: // Open the response stream:
var geminiStream = await responseData.Response!.Content.ReadAsStreamAsync(token); var geminiStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
// Add a stream reader to read the stream, line by line: // Add a stream reader to read the stream, line by line:
var streamReader = new StreamReader(geminiStream); streamReader = new StreamReader(geminiStream);
}
catch (Exception e)
{
this.logger.LogError($"Failed to stream chat completion from Google '{this.InstanceName}': {e.Message}");
}
if (streamReader is null)
yield break;
// Read the stream, line by line: // Read the stream, line by line:
while(!streamReader.EndOfStream) while(true)
{ {
try
{
if(streamReader.EndOfStream)
break;
}
catch (Exception e)
{
this.logger.LogWarning($"Failed to read the end-of-stream state from Google '{this.InstanceName}': {e.Message}");
break;
}
// Check if the token is canceled: // Check if the token is canceled:
if(token.IsCancellationRequested) if(token.IsCancellationRequested)
yield break; yield break;
// Read the next line: // Read the next line:
var line = await streamReader.ReadLineAsync(token); string? line;
try
{
line = await streamReader.ReadLineAsync(token);
}
catch (Exception e)
{
this.logger.LogError($"Failed to read the stream from Google '{this.InstanceName}': {e.Message}");
break;
}
// Skip empty lines: // Skip empty lines:
if(string.IsNullOrWhiteSpace(line)) if(string.IsNullOrWhiteSpace(line))

View File

@ -72,18 +72,21 @@ public class ProviderGroq(ILogger logger) : BaseProvider("https://api.groq.com/o
Stream = true, Stream = true,
}, JSON_SERIALIZER_OPTIONS); }, JSON_SERIALIZER_OPTIONS);
async Task<HttpRequestMessage> RequestBuilder() StreamReader? streamReader = null;
try
{ {
// Build the HTTP post request: async Task<HttpRequestMessage> RequestBuilder()
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); {
// Build the HTTP post request:
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
// Set the authorization header: // Set the authorization header:
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION));
// Set the content: // Set the content:
request.Content = new StringContent(groqChatRequest, Encoding.UTF8, "application/json"); request.Content = new StringContent(groqChatRequest, Encoding.UTF8, "application/json");
return request; return request;
} }
// Send the request using exponential backoff: // Send the request using exponential backoff:
var responseData = await this.SendRequest(RequestBuilder, token); var responseData = await this.SendRequest(RequestBuilder, token);
@ -93,21 +96,49 @@ public class ProviderGroq(ILogger logger) : BaseProvider("https://api.groq.com/o
yield break; yield break;
} }
// Open the response stream: // Open the response stream:
var groqStream = await responseData.Response!.Content.ReadAsStreamAsync(token); var groqStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
// Add a stream reader to read the stream, line by line: // Add a stream reader to read the stream, line by line:
var streamReader = new StreamReader(groqStream); streamReader = new StreamReader(groqStream);
}
catch (Exception e)
{
this.logger.LogError($"Failed to stream chat completion from Groq '{this.InstanceName}': {e.Message}");
}
if (streamReader is null)
yield break;
// Read the stream, line by line: // Read the stream, line by line:
while(!streamReader.EndOfStream) while(true)
{ {
try
{
if(streamReader.EndOfStream)
break;
}
catch (Exception e)
{
this.logger.LogWarning($"Failed to read the end-of-stream state from Groq '{this.InstanceName}': {e.Message}");
break;
}
// Check if the token is canceled: // Check if the token is canceled:
if(token.IsCancellationRequested) if(token.IsCancellationRequested)
yield break; yield break;
// Read the next line: // Read the next line:
var line = await streamReader.ReadLineAsync(token); string? line;
try
{
line = await streamReader.ReadLineAsync(token);
}
catch (Exception e)
{
this.logger.LogError($"Failed to read the stream from Groq '{this.InstanceName}': {e.Message}");
break;
}
// Skip empty lines: // Skip empty lines:
if(string.IsNullOrWhiteSpace(line)) if(string.IsNullOrWhiteSpace(line))

View File

@ -71,18 +71,21 @@ public sealed class ProviderMistral(ILogger logger) : BaseProvider("https://api.
SafePrompt = false, SafePrompt = false,
}, JSON_SERIALIZER_OPTIONS); }, JSON_SERIALIZER_OPTIONS);
async Task<HttpRequestMessage> RequestBuilder() StreamReader? streamReader = null;
try
{ {
// Build the HTTP post request: async Task<HttpRequestMessage> RequestBuilder()
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); {
// Build the HTTP post request:
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
// Set the authorization header: // Set the authorization header:
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION));
// Set the content: // Set the content:
request.Content = new StringContent(mistralChatRequest, Encoding.UTF8, "application/json"); request.Content = new StringContent(mistralChatRequest, Encoding.UTF8, "application/json");
return request; return request;
} }
// Send the request using exponential backoff: // Send the request using exponential backoff:
var responseData = await this.SendRequest(RequestBuilder, token); var responseData = await this.SendRequest(RequestBuilder, token);
@ -92,21 +95,49 @@ public sealed class ProviderMistral(ILogger logger) : BaseProvider("https://api.
yield break; yield break;
} }
// Open the response stream: // Open the response stream:
var mistralStream = await responseData.Response!.Content.ReadAsStreamAsync(token); var mistralStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
// Add a stream reader to read the stream, line by line: // Add a stream reader to read the stream, line by line:
var streamReader = new StreamReader(mistralStream); streamReader = new StreamReader(mistralStream);
}
catch (Exception e)
{
this.logger.LogError($"Failed to stream chat completion from Mistral '{this.InstanceName}': {e.Message}");
}
if (streamReader is null)
yield break;
// Read the stream, line by line: // Read the stream, line by line:
while(!streamReader.EndOfStream) while(true)
{ {
try
{
if(streamReader.EndOfStream)
break;
}
catch (Exception e)
{
this.logger.LogWarning($"Failed to read the end-of-stream state from Mistral '{this.InstanceName}': {e.Message}");
break;
}
// Check if the token is canceled: // Check if the token is canceled:
if(token.IsCancellationRequested) if(token.IsCancellationRequested)
yield break; yield break;
// Read the next line: // Read the next line:
var line = await streamReader.ReadLineAsync(token); string? line;
try
{
line = await streamReader.ReadLineAsync(token);
}
catch (Exception e)
{
this.logger.LogError($"Failed to read the stream from Mistral '{this.InstanceName}': {e.Message}");
break;
}
// Skip empty lines: // Skip empty lines:
if(string.IsNullOrWhiteSpace(line)) if(string.IsNullOrWhiteSpace(line))

View File

@ -99,18 +99,21 @@ public sealed class ProviderOpenAI(ILogger logger) : BaseProvider("https://api.o
Stream = true, Stream = true,
}, JSON_SERIALIZER_OPTIONS); }, JSON_SERIALIZER_OPTIONS);
async Task<HttpRequestMessage> RequestBuilder() StreamReader? streamReader = null;
try
{ {
// Build the HTTP post request: async Task<HttpRequestMessage> RequestBuilder()
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions"); {
// Build the HTTP post request:
var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
// Set the authorization header: // Set the authorization header:
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION)); request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", await requestedSecret.Secret.Decrypt(ENCRYPTION));
// Set the content: // Set the content:
request.Content = new StringContent(openAIChatRequest, Encoding.UTF8, "application/json"); request.Content = new StringContent(openAIChatRequest, Encoding.UTF8, "application/json");
return request; return request;
} }
// Send the request using exponential backoff: // Send the request using exponential backoff:
var responseData = await this.SendRequest(RequestBuilder, token); var responseData = await this.SendRequest(RequestBuilder, token);
@ -120,21 +123,49 @@ public sealed class ProviderOpenAI(ILogger logger) : BaseProvider("https://api.o
yield break; yield break;
} }
// Open the response stream: // Open the response stream:
var openAIStream = await responseData.Response!.Content.ReadAsStreamAsync(token); var openAIStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
// Add a stream reader to read the stream, line by line: // Add a stream reader to read the stream, line by line:
var streamReader = new StreamReader(openAIStream); streamReader = new StreamReader(openAIStream);
}
catch (Exception e)
{
this.logger.LogError($"Failed to stream chat completion from OpenAI '{this.InstanceName}': {e.Message}");
}
if (streamReader is null)
yield break;
// Read the stream, line by line: // Read the stream, line by line:
while(!streamReader.EndOfStream) while(true)
{ {
try
{
if(streamReader.EndOfStream)
break;
}
catch (Exception e)
{
this.logger.LogWarning($"Failed to read the end-of-stream state from OpenAI '{this.InstanceName}': {e.Message}");
break;
}
// Check if the token is canceled: // Check if the token is canceled:
if(token.IsCancellationRequested) if(token.IsCancellationRequested)
yield break; yield break;
// Read the next line: // Read the next line:
var line = await streamReader.ReadLineAsync(token); string? line;
try
{
line = await streamReader.ReadLineAsync(token);
}
catch (Exception e)
{
this.logger.LogError($"Failed to read the stream from OpenAI '{this.InstanceName}': {e.Message}");
break;
}
// Skip empty lines: // Skip empty lines:
if(string.IsNullOrWhiteSpace(line)) if(string.IsNullOrWhiteSpace(line))

View File

@ -67,7 +67,7 @@ public sealed class ProviderSelfHosted(ILogger logger, Host host, string hostnam
MaxTokens = -1, MaxTokens = -1,
}, JSON_SERIALIZER_OPTIONS); }, JSON_SERIALIZER_OPTIONS);
StreamReader? streamReader = default; StreamReader? streamReader = null;
try try
{ {
async Task<HttpRequestMessage> RequestBuilder() async Task<HttpRequestMessage> RequestBuilder()
@ -103,55 +103,77 @@ public sealed class ProviderSelfHosted(ILogger logger, Host host, string hostnam
this.logger.LogError($"Failed to stream chat completion from self-hosted provider '{this.InstanceName}': {e.Message}"); this.logger.LogError($"Failed to stream chat completion from self-hosted provider '{this.InstanceName}': {e.Message}");
} }
if (streamReader is not null) if (streamReader is null)
yield break;
// Read the stream, line by line:
while (true)
{ {
// Read the stream, line by line: try
while (!streamReader.EndOfStream)
{ {
// Check if the token is canceled: if(streamReader.EndOfStream)
if (token.IsCancellationRequested) break;
yield break;
// Read the next line:
var line = await streamReader.ReadLineAsync(token);
// Skip empty lines:
if (string.IsNullOrWhiteSpace(line))
continue;
// Skip lines that do not start with "data: ". According
// to the specification, we only want to read the data lines:
if (!line.StartsWith("data: ", StringComparison.InvariantCulture))
continue;
// Check if the line is the end of the stream:
if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture))
yield break;
ResponseStreamLine providerResponse;
try
{
// We know that the line starts with "data: ". Hence, we can
// skip the first 6 characters to get the JSON data after that.
var jsonData = line[6..];
// Deserialize the JSON data:
providerResponse = JsonSerializer.Deserialize<ResponseStreamLine>(jsonData, JSON_SERIALIZER_OPTIONS);
}
catch
{
// Skip invalid JSON data:
continue;
}
// Skip empty responses:
if (providerResponse == default || providerResponse.Choices.Count == 0)
continue;
// Yield the response:
yield return providerResponse.Choices[0].Delta.Content;
} }
catch (Exception e)
{
this.logger.LogWarning($"Failed to read the end-of-stream state from self-hosted provider '{this.InstanceName}': {e.Message}");
break;
}
// Check if the token is canceled:
if (token.IsCancellationRequested)
yield break;
// Read the next line:
string? line;
try
{
line = await streamReader.ReadLineAsync(token);
}
catch (Exception e)
{
this.logger.LogError($"Failed to read the stream from self-hosted provider '{this.InstanceName}': {e.Message}");
break;
}
// Skip empty lines:
if (string.IsNullOrWhiteSpace(line))
continue;
// Skip lines that do not start with "data: ". According
// to the specification, we only want to read the data lines:
if (!line.StartsWith("data: ", StringComparison.InvariantCulture))
continue;
// Check if the line is the end of the stream:
if (line.StartsWith("data: [DONE]", StringComparison.InvariantCulture))
yield break;
ResponseStreamLine providerResponse;
try
{
// We know that the line starts with "data: ". Hence, we can
// skip the first 6 characters to get the JSON data after that.
var jsonData = line[6..];
// Deserialize the JSON data:
providerResponse = JsonSerializer.Deserialize<ResponseStreamLine>(jsonData, JSON_SERIALIZER_OPTIONS);
}
catch
{
// Skip invalid JSON data:
continue;
}
// Skip empty responses:
if (providerResponse == default || providerResponse.Choices.Count == 0)
continue;
// Yield the response:
yield return providerResponse.Choices[0].Delta.Content;
} }
streamReader.Dispose();
} }
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously

View File

@ -2,5 +2,6 @@
- Added a button to remove a message from the chat thread. - Added a button to remove a message from the chat thread.
- Added a button to regenerate the last AI response. - Added a button to regenerate the last AI response.
- Added a button to edit the last user message. - Added a button to edit the last user message.
- Added a button to stop the AI from generating a response. - Added a button to stop the AI from generating more tokens.
- Improved the stream handling for all providers. The stream handling is now very resilient and handles all kinds of network issues.
- Fixed a streaming bug that was particularly visible with self-hosted providers. - Fixed a streaming bug that was particularly visible with self-hosted providers.