mirror of
https://github.com/MindWorkAI/AI-Studio.git
synced 2026-02-14 22:01:36 +00:00
Add semaphore timeout handling to prevent deadlocks
This commit is contained in:
parent
aabbb5c2c7
commit
b7105ac91a
@ -8,10 +8,14 @@ using AIStudio.Dialogs;
|
|||||||
using AIStudio.Settings;
|
using AIStudio.Settings;
|
||||||
using AIStudio.Tools.PluginSystem;
|
using AIStudio.Tools.PluginSystem;
|
||||||
|
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace AIStudio.Tools;
|
namespace AIStudio.Tools;
|
||||||
|
|
||||||
public static class WorkspaceBehaviour
|
public static class WorkspaceBehaviour
|
||||||
{
|
{
|
||||||
|
private static readonly ILogger LOG = Program.LOGGER_FACTORY.CreateLogger(nameof(WorkspaceBehaviour));
|
||||||
|
|
||||||
private static string TB(string fallbackEN) => I18N.I.T(fallbackEN, typeof(WorkspaceBehaviour).Namespace, nameof(WorkspaceBehaviour));
|
private static string TB(string fallbackEN) => I18N.I.T(fallbackEN, typeof(WorkspaceBehaviour).Namespace, nameof(WorkspaceBehaviour));
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -21,12 +25,39 @@ public static class WorkspaceBehaviour
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private static readonly ConcurrentDictionary<string, SemaphoreSlim> CHAT_STORAGE_SEMAPHORES = new();
|
private static readonly ConcurrentDictionary<string, SemaphoreSlim> CHAT_STORAGE_SEMAPHORES = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Timeout for acquiring the chat storage semaphore.
|
||||||
|
/// </summary>
|
||||||
|
private static readonly TimeSpan SEMAPHORE_TIMEOUT = TimeSpan.FromSeconds(6);
|
||||||
|
|
||||||
private static SemaphoreSlim GetChatSemaphore(Guid workspaceId, Guid chatId)
|
private static SemaphoreSlim GetChatSemaphore(Guid workspaceId, Guid chatId)
|
||||||
{
|
{
|
||||||
var key = $"{workspaceId}_{chatId}";
|
var key = $"{workspaceId}_{chatId}";
|
||||||
return CHAT_STORAGE_SEMAPHORES.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
|
return CHAT_STORAGE_SEMAPHORES.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Tries to acquire the chat storage semaphore within the configured timeout.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="workspaceId">The workspace ID.</param>
|
||||||
|
/// <param name="chatId">The chat ID.</param>
|
||||||
|
/// <param name="callerName">The name of the calling method for logging purposes.</param>
|
||||||
|
/// <returns>A tuple containing whether the semaphore was acquired and the semaphore instance.</returns>
|
||||||
|
private static async Task<(bool Acquired, SemaphoreSlim Semaphore)> TryAcquireChatSemaphoreAsync(Guid workspaceId, Guid chatId, string callerName)
|
||||||
|
{
|
||||||
|
var semaphore = GetChatSemaphore(workspaceId, chatId);
|
||||||
|
var acquired = await semaphore.WaitAsync(SEMAPHORE_TIMEOUT);
|
||||||
|
|
||||||
|
if (!acquired)
|
||||||
|
LOG.LogWarning("Failed to acquire chat storage semaphore within {Timeout} seconds for workspace '{WorkspaceId}', chat '{ChatId}' in method '{CallerName}'. Skipping operation to prevent potential race conditions or deadlocks.",
|
||||||
|
SEMAPHORE_TIMEOUT.TotalSeconds,
|
||||||
|
workspaceId,
|
||||||
|
chatId,
|
||||||
|
callerName);
|
||||||
|
|
||||||
|
return (acquired, semaphore);
|
||||||
|
}
|
||||||
|
|
||||||
public static readonly JsonSerializerOptions JSON_OPTIONS = new()
|
public static readonly JsonSerializerOptions JSON_OPTIONS = new()
|
||||||
{
|
{
|
||||||
WriteIndented = true,
|
WriteIndented = true,
|
||||||
@ -51,10 +82,11 @@ public static class WorkspaceBehaviour
|
|||||||
|
|
||||||
public static async Task StoreChat(ChatThread chat)
|
public static async Task StoreChat(ChatThread chat)
|
||||||
{
|
{
|
||||||
// Acquire the semaphore for this specific chat to prevent concurrent writes to the same file:
|
// Try to acquire the semaphore for this specific chat to prevent concurrent writes to the same file:
|
||||||
var semaphore = GetChatSemaphore(chat.WorkspaceId, chat.ChatId);
|
var (acquired, semaphore) = await TryAcquireChatSemaphoreAsync(chat.WorkspaceId, chat.ChatId, nameof(StoreChat));
|
||||||
await semaphore.WaitAsync();
|
if (!acquired)
|
||||||
|
return;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
string chatDirectory;
|
string chatDirectory;
|
||||||
@ -82,10 +114,11 @@ public static class WorkspaceBehaviour
|
|||||||
|
|
||||||
public static async Task<ChatThread?> LoadChat(LoadChat loadChat)
|
public static async Task<ChatThread?> LoadChat(LoadChat loadChat)
|
||||||
{
|
{
|
||||||
// Acquire the semaphore for this specific chat to prevent concurrent read/writes to the same file:
|
// Try to acquire the semaphore for this specific chat to prevent concurrent read/writes to the same file:
|
||||||
var semaphore = GetChatSemaphore(loadChat.WorkspaceId, loadChat.ChatId);
|
var (acquired, semaphore) = await TryAcquireChatSemaphoreAsync(loadChat.WorkspaceId, loadChat.ChatId, nameof(LoadChat));
|
||||||
await semaphore.WaitAsync();
|
if (!acquired)
|
||||||
|
return null;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var chatPath = loadChat.WorkspaceId == Guid.Empty
|
var chatPath = loadChat.WorkspaceId == Guid.Empty
|
||||||
@ -94,7 +127,7 @@ public static class WorkspaceBehaviour
|
|||||||
|
|
||||||
if(!Directory.Exists(chatPath))
|
if(!Directory.Exists(chatPath))
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
var chatData = await File.ReadAllTextAsync(Path.Join(chatPath, "thread.json"), Encoding.UTF8);
|
var chatData = await File.ReadAllTextAsync(Path.Join(chatPath, "thread.json"), Encoding.UTF8);
|
||||||
var chat = JsonSerializer.Deserialize<ChatThread>(chatData, JSON_OPTIONS);
|
var chat = JsonSerializer.Deserialize<ChatThread>(chatData, JSON_OPTIONS);
|
||||||
return chat;
|
return chat;
|
||||||
@ -177,10 +210,11 @@ public static class WorkspaceBehaviour
|
|||||||
else
|
else
|
||||||
chatDirectory = Path.Join(SettingsManager.DataDirectory, "workspaces", chat.WorkspaceId.ToString(), chat.ChatId.ToString());
|
chatDirectory = Path.Join(SettingsManager.DataDirectory, "workspaces", chat.WorkspaceId.ToString(), chat.ChatId.ToString());
|
||||||
|
|
||||||
// Acquire the semaphore to prevent deleting while another thread is writing:
|
// Try to acquire the semaphore to prevent deleting while another thread is writing:
|
||||||
var semaphore = GetChatSemaphore(workspaceId, chatId);
|
var (acquired, semaphore) = await TryAcquireChatSemaphoreAsync(workspaceId, chatId, nameof(DeleteChat));
|
||||||
await semaphore.WaitAsync();
|
if (!acquired)
|
||||||
|
return;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Directory.Delete(chatDirectory, true);
|
Directory.Delete(chatDirectory, true);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user