Implemented rate limit handling for providers

Thorsten Sommer 2024-12-31 19:29:38 +01:00
parent e932c21709
commit 45ca0cdb23
Signed by: tsommer
GPG Key ID: 371BBA77A02C0108
10 changed files with 217 additions and 101 deletions


@@ -59,6 +59,8 @@ public sealed class ProviderAnthropic(ILogger logger) : BaseProvider("https://ap
     Stream = true,
 }, JSON_SERIALIZER_OPTIONS);
+async Task<HttpRequestMessage> RequestBuilder()
+{
     // Build the HTTP post request:
     var request = new HttpRequestMessage(HttpMethod.Post, "messages");
@@ -70,14 +72,19 @@ public sealed class ProviderAnthropic(ILogger logger) : BaseProvider("https://ap
     // Set the content:
     request.Content = new StringContent(chatRequest, Encoding.UTF8, "application/json");
+    return request;
+}
-// Send the request with the ResponseHeadersRead option.
-// This allows us to read the stream as soon as the headers are received.
-// This is important because we want to stream the responses.
-var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+// Send the request using exponential backoff:
+using var responseData = await this.SendRequest(RequestBuilder, token);
+if(responseData.IsFailedAfterAllRetries)
+{
+    this.logger.LogError($"Anthropic chat completion failed: {responseData.ErrorMessage}");
+    yield break;
+}
 // Open the response stream:
-var stream = await response.Content.ReadAsStreamAsync(token);
+var stream = await responseData.Response!.Content.ReadAsStreamAsync(token);
 // Add a stream reader to read the stream, line by line:
 var streamReader = new StreamReader(stream);
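
Note the new RequestBuilder local function: an HttpRequestMessage instance can only be sent once, so every retry needs a freshly built request. That is why BaseProvider.SendRequest (below) accepts a Func<Task<HttpRequestMessage>> factory instead of a ready-made request. A minimal sketch of the failure mode the factory avoids (illustrative only, not from this commit):

// Sending the same HttpRequestMessage twice is not allowed:
using var httpClient = new HttpClient { BaseAddress = new Uri("https://example.com/") };

var request = new HttpRequestMessage(HttpMethod.Post, "messages");
await httpClient.SendAsync(request);  // first attempt: fine
await httpClient.SendAsync(request);  // throws InvalidOperationException: the same
                                      // request message cannot be sent multiple times

The same RequestBuilder pattern is applied to every provider below.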


@@ -74,4 +74,47 @@ public abstract class BaseProvider : IProvider, ISecretId
 public string SecretName => this.InstanceName;
 #endregion
+/// <summary>
+/// Sends a request and handles rate limiting by exponential backoff.
+/// </summary>
+/// <param name="requestBuilder">A function that builds the request.</param>
+/// <param name="token">The cancellation token.</param>
+/// <returns>The status object of the request.</returns>
+protected async Task<HttpRateLimitedStreamResult> SendRequest(Func<Task<HttpRequestMessage>> requestBuilder, CancellationToken token = default)
+{
+    const int MAX_RETRIES = 6;
+    const double RETRY_DELAY_SECONDS = 4;
+    var retry = 0;
+    var response = default(HttpResponseMessage);
+    var errorMessage = string.Empty;
+    while (retry++ < MAX_RETRIES)
+    {
+        using var request = await requestBuilder();
+        // Send the request with the ResponseHeadersRead option.
+        // This allows us to read the stream as soon as the headers are received.
+        // This is important because we want to stream the responses.
+        var nextResponse = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+        if (nextResponse.IsSuccessStatusCode)
+        {
+            response = nextResponse;
+            break;
+        }
+        errorMessage = nextResponse.ReasonPhrase;
+        var timeSeconds = Math.Pow(RETRY_DELAY_SECONDS, retry + 1);
+        if(timeSeconds > 90)
+            timeSeconds = 90;
+        this.logger.LogDebug($"Failed request with status code {nextResponse.StatusCode} (message = '{errorMessage}'). Retrying in {timeSeconds:0.00} seconds.");
+        await Task.Delay(TimeSpan.FromSeconds(timeSeconds), token);
+    }
+    // The loop only assigns `response` on success; checking for null avoids
+    // misreporting a success that happened on the final attempt:
+    if(response is null)
+        return new HttpRateLimitedStreamResult(false, true, errorMessage ?? $"Failed after {MAX_RETRIES} retries; no provider message available", response);
+    return new HttpRateLimitedStreamResult(true, false, string.Empty, response);
+}
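
With RETRY_DELAY_SECONDS = 4, the wait after the n-th failed attempt is 4^(n+1) seconds, capped at 90. A minimal sketch that reproduces the schedule:

// Mirrors the backoff computed in SendRequest above:
for (var retry = 1; retry <= 6; retry++)
{
    var timeSeconds = Math.Min(Math.Pow(4, retry + 1), 90d);
    Console.WriteLine($"after attempt {retry}: wait {timeSeconds:0} s");
}
// Prints 16, 64, 90, 90, 90, 90 — roughly 7.3 minutes of waiting in the worst case.

Note that, as written, the loop also sleeps after the final failed attempt before giving up.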


@@ -68,6 +68,8 @@ public class ProviderFireworks(ILogger logger) : BaseProvider("https://api.firew
     Stream = true,
 }, JSON_SERIALIZER_OPTIONS);
+async Task<HttpRequestMessage> RequestBuilder()
+{
     // Build the HTTP post request:
     var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
@@ -76,14 +78,19 @@ public class ProviderFireworks(ILogger logger) : BaseProvider("https://api.firew
     // Set the content:
     request.Content = new StringContent(fireworksChatRequest, Encoding.UTF8, "application/json");
+    return request;
+}
-// Send the request with the ResponseHeadersRead option.
-// This allows us to read the stream as soon as the headers are received.
-// This is important because we want to stream the responses.
-var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+// Send the request using exponential backoff:
+using var responseData = await this.SendRequest(RequestBuilder, token);
+if(responseData.IsFailedAfterAllRetries)
+{
+    this.logger.LogError($"Fireworks chat completion failed: {responseData.ErrorMessage}");
+    yield break;
+}
 // Open the response stream:
-var fireworksStream = await response.Content.ReadAsStreamAsync(token);
+var fireworksStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
 // Add a stream reader to read the stream, line by line:
 var streamReader = new StreamReader(fireworksStream);


@@ -69,6 +69,8 @@ public class ProviderGoogle(ILogger logger) : BaseProvider("https://generativela
     Stream = true,
 }, JSON_SERIALIZER_OPTIONS);
+async Task<HttpRequestMessage> RequestBuilder()
+{
     // Build the HTTP post request:
     var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
@@ -77,14 +79,19 @@ public class ProviderGoogle(ILogger logger) : BaseProvider("https://generativela
     // Set the content:
     request.Content = new StringContent(geminiChatRequest, Encoding.UTF8, "application/json");
+    return request;
+}
-// Send the request with the ResponseHeadersRead option.
-// This allows us to read the stream as soon as the headers are received.
-// This is important because we want to stream the responses.
-var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+// Send the request using exponential backoff:
+using var responseData = await this.SendRequest(RequestBuilder, token);
+if(responseData.IsFailedAfterAllRetries)
+{
+    this.logger.LogError($"Google chat completion failed: {responseData.ErrorMessage}");
+    yield break;
+}
 // Open the response stream:
-var geminiStream = await response.Content.ReadAsStreamAsync(token);
+var geminiStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
 // Add a stream reader to read the stream, line by line:
 var streamReader = new StreamReader(geminiStream);


@@ -71,6 +71,8 @@ public class ProviderGroq(ILogger logger) : BaseProvider("https://api.groq.com/o
     Stream = true,
 }, JSON_SERIALIZER_OPTIONS);
+async Task<HttpRequestMessage> RequestBuilder()
+{
     // Build the HTTP post request:
     var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
@@ -79,14 +81,19 @@ public class ProviderGroq(ILogger logger) : BaseProvider("https://api.groq.com/o
     // Set the content:
     request.Content = new StringContent(groqChatRequest, Encoding.UTF8, "application/json");
+    return request;
+}
-// Send the request with the ResponseHeadersRead option.
-// This allows us to read the stream as soon as the headers are received.
-// This is important because we want to stream the responses.
-var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+// Send the request using exponential backoff:
+using var responseData = await this.SendRequest(RequestBuilder, token);
+if(responseData.IsFailedAfterAllRetries)
+{
+    this.logger.LogError($"Groq chat completion failed: {responseData.ErrorMessage}");
+    yield break;
+}
 // Open the response stream:
-var groqStream = await response.Content.ReadAsStreamAsync(token);
+var groqStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
 // Add a stream reader to read the stream, line by line:
 var streamReader = new StreamReader(groqStream);


@@ -70,6 +70,8 @@ public sealed class ProviderMistral(ILogger logger) : BaseProvider("https://api.
     SafePrompt = false,
 }, JSON_SERIALIZER_OPTIONS);
+async Task<HttpRequestMessage> RequestBuilder()
+{
     // Build the HTTP post request:
     var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
@@ -78,14 +80,19 @@ public sealed class ProviderMistral(ILogger logger) : BaseProvider("https://api.
     // Set the content:
     request.Content = new StringContent(mistralChatRequest, Encoding.UTF8, "application/json");
+    return request;
+}
-// Send the request with the ResponseHeadersRead option.
-// This allows us to read the stream as soon as the headers are received.
-// This is important because we want to stream the responses.
-var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+// Send the request using exponential backoff:
+using var responseData = await this.SendRequest(RequestBuilder, token);
+if(responseData.IsFailedAfterAllRetries)
+{
+    this.logger.LogError($"Mistral chat completion failed: {responseData.ErrorMessage}");
+    yield break;
+}
 // Open the response stream:
-var mistralStream = await response.Content.ReadAsStreamAsync(token);
+var mistralStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
 // Add a stream reader to read the stream, line by line:
 var streamReader = new StreamReader(mistralStream);


@@ -74,6 +74,8 @@ public sealed class ProviderOpenAI(ILogger logger) : BaseProvider("https://api.o
     FrequencyPenalty = 0f,
 }, JSON_SERIALIZER_OPTIONS);
+async Task<HttpRequestMessage> RequestBuilder()
+{
     // Build the HTTP post request:
     var request = new HttpRequestMessage(HttpMethod.Post, "chat/completions");
@@ -82,14 +84,19 @@ public sealed class ProviderOpenAI(ILogger logger) : BaseProvider("https://api.o
     // Set the content:
     request.Content = new StringContent(openAIChatRequest, Encoding.UTF8, "application/json");
+    return request;
+}
-// Send the request with the ResponseHeadersRead option.
-// This allows us to read the stream as soon as the headers are received.
-// This is important because we want to stream the responses.
-var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+// Send the request using exponential backoff:
+using var responseData = await this.SendRequest(RequestBuilder, token);
+if(responseData.IsFailedAfterAllRetries)
+{
+    this.logger.LogError($"OpenAI chat completion failed: {responseData.ErrorMessage}");
+    yield break;
+}
 // Open the response stream:
-var openAIStream = await response.Content.ReadAsStreamAsync(token);
+var openAIStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
 // Add a stream reader to read the stream, line by line:
 var streamReader = new StreamReader(openAIStream);


@@ -68,6 +68,8 @@ public sealed class ProviderSelfHosted(ILogger logger, Host host, string hostnam
 StreamReader? streamReader = default;
 try
 {
+    async Task<HttpRequestMessage> RequestBuilder()
+    {
         // Build the HTTP post request:
         var request = new HttpRequestMessage(HttpMethod.Post, host.ChatURL());
@@ -78,14 +80,19 @@ public sealed class ProviderSelfHosted(ILogger logger, Host host, string hostnam
         // Set the content:
         request.Content = new StringContent(providerChatRequest, Encoding.UTF8, "application/json");
+        return request;
+    }
-    // Send the request with the ResponseHeadersRead option.
-    // This allows us to read the stream as soon as the headers are received.
-    // This is important because we want to stream the responses.
-    var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
+    // Send the request using exponential backoff:
+    using var responseData = await this.SendRequest(RequestBuilder, token);
+    if(responseData.IsFailedAfterAllRetries)
+    {
+        this.logger.LogError($"Self-hosted provider's chat completion failed: {responseData.ErrorMessage}");
+        yield break;
+    }
     // Open the response stream:
-    var providerStream = await response.Content.ReadAsStreamAsync(token);
+    var providerStream = await responseData.Response!.Content.ReadAsStreamAsync(token);
     // Add a stream reader to read the stream, line by line:
     streamReader = new StreamReader(providerStream);


@@ -0,0 +1,23 @@
+namespace AIStudio.Tools;
+/// <summary>
+/// The result of a rate-limited HTTP stream.
+/// </summary>
+/// <param name="IsSuccessful">True, when the request was successful.</param>
+/// <param name="IsFailedAfterAllRetries">True, when the stream failed after all retries.</param>
+/// <param name="ErrorMessage">The error message which we might show to the user.</param>
+/// <param name="Response">The response from the server.</param>
+public readonly record struct HttpRateLimitedStreamResult(
+    bool IsSuccessful,
+    bool IsFailedAfterAllRetries,
+    string ErrorMessage,
+    HttpResponseMessage? Response) : IDisposable
+{
+    #region IDisposable
+    public void Dispose()
+    {
+        this.Response?.Dispose();
+    }
+    #endregion
+}
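
Because the record wraps the HttpResponseMessage and implements IDisposable, call sites bind the result with a using declaration; disposing the result disposes the response and, with it, the underlying network stream once streaming is done. A minimal consumption sketch inside a BaseProvider subclass (hypothetical caller; requestBuilder and token are assumed):

// Requires: using System.Runtime.CompilerServices; and .NET 7+ for ReadLineAsync(token).
async IAsyncEnumerable<string> StreamLines(Func<Task<HttpRequestMessage>> requestBuilder, [EnumeratorCancellation] CancellationToken token = default)
{
    using var result = await this.SendRequest(requestBuilder, token);
    if (result.IsFailedAfterAllRetries)
        yield break;

    // `result` stays alive until the end of this scope, so the response and its
    // connection are disposed only after the stream has been fully read:
    var stream = await result.Response!.Content.ReadAsStreamAsync(token);
    using var reader = new StreamReader(stream);
    while (!reader.EndOfStream && !token.IsCancellationRequested)
        yield return await reader.ReadLineAsync(token) ?? string.Empty;
}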


@@ -1,3 +1,4 @@
 # v0.9.23, build 198 (2024-12-xx xx:xx UTC)
 - Added an ERI server coding assistant as a preview feature behind the RAG feature flag. This helps you implement an ERI server to gain access to, e.g., your enterprise data from within AI Studio.
+- Improved provider requests by handling rate limits with automatic retries and exponential backoff.
 - Fixed layout issues when selecting `other` items (e.g., programming languages)