From 8240a83662d942af58a465bcec958e9ad7e24f51 Mon Sep 17 00:00:00 2001 From: krut_ni Date: Mon, 30 Jun 2025 14:52:14 +0200 Subject: [PATCH] added a guid as a unique stream id to the file retrieval API --- .../Components/ReadFileContent.razor.cs | 3 +- .../Tools/ContentStreamSseEvent.cs | 3 + .../Tools/Services/RustService.Retrieval.cs | 4 +- runtime/src/file_data.rs | 77 +++++++++++-------- 4 files changed, 52 insertions(+), 35 deletions(-) diff --git a/app/MindWork AI Studio/Components/ReadFileContent.razor.cs b/app/MindWork AI Studio/Components/ReadFileContent.razor.cs index 86bafebe..f01b8bc8 100644 --- a/app/MindWork AI Studio/Components/ReadFileContent.razor.cs +++ b/app/MindWork AI Studio/Components/ReadFileContent.razor.cs @@ -38,7 +38,8 @@ public partial class ReadFileContent : MSGComponentBase return; } - var fileContent = await this.RustService.ReadArbitraryFileData(selectedFile.SelectedFilePath, int.MaxValue); + var streamId = Guid.NewGuid().ToString(); + var fileContent = await this.RustService.ReadArbitraryFileData(selectedFile.SelectedFilePath, streamId, int.MaxValue); await this.FileContentChanged.InvokeAsync(fileContent); } } \ No newline at end of file diff --git a/app/MindWork AI Studio/Tools/ContentStreamSseEvent.cs b/app/MindWork AI Studio/Tools/ContentStreamSseEvent.cs index 63f05fc0..cb804bef 100644 --- a/app/MindWork AI Studio/Tools/ContentStreamSseEvent.cs +++ b/app/MindWork AI Studio/Tools/ContentStreamSseEvent.cs @@ -6,6 +6,9 @@ public class ContentStreamSseEvent { [JsonPropertyName("content")] public string? Content { get; init; } + + [JsonPropertyName("stream_id")] + public string? StreamId { get; init; } [JsonPropertyName("metadata")] public ContentStreamSseMetadata? Metadata { get; init; } diff --git a/app/MindWork AI Studio/Tools/Services/RustService.Retrieval.cs b/app/MindWork AI Studio/Tools/Services/RustService.Retrieval.cs index 73223b58..3d43eb72 100644 --- a/app/MindWork AI Studio/Tools/Services/RustService.Retrieval.cs +++ b/app/MindWork AI Studio/Tools/Services/RustService.Retrieval.cs @@ -5,9 +5,9 @@ namespace AIStudio.Tools.Services; public sealed partial class RustService { - public async Task ReadArbitraryFileData(string path, int maxChunks) + public async Task ReadArbitraryFileData(string path, string streamId, int maxChunks) { - var requestUri = $"/retrieval/fs/extract?path={Uri.EscapeDataString(path)}"; + var requestUri = $"/retrieval/fs/extract?path={Uri.EscapeDataString(path)}&stream_id={streamId}"; var request = new HttpRequestMessage(HttpMethod.Get, requestUri); var response = await this.http.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); diff --git a/runtime/src/file_data.rs b/runtime/src/file_data.rs index b52efed2..7333f963 100644 --- a/runtime/src/file_data.rs +++ b/runtime/src/file_data.rs @@ -23,8 +23,16 @@ use tokio_stream::wrappers::ReceiverStream; #[derive(Debug, Serialize)] pub struct Chunk { pub content: String, + pub stream_id: String, pub metadata: Metadata, } +impl Chunk { + pub fn new(content: String, metadata: Metadata) -> Self { + Chunk { content, stream_id: String::new(), metadata } + } + + pub fn set_stream_id(&mut self, stream_id: &str) { self.stream_id = stream_id.to_string(); } +} #[derive(Debug, Serialize)] pub enum Metadata { @@ -72,16 +80,21 @@ const IMAGE_SEGMENT_SIZE_IN_CHARS: usize = 8_192; // equivalent to ~ 5500 token type Result = std::result::Result>; type ChunkStream = Pin> + Send>>; -#[get("/retrieval/fs/extract?")] -pub async fn extract_data(_token: APIToken, path: String, mut end: Shutdown) -> EventStream![] { +#[get("/retrieval/fs/extract?&")] +pub async fn extract_data(_token: APIToken, path: String, stream_id: String, mut end: Shutdown) -> EventStream![] { EventStream! { let stream_result = stream_data(&path).await; + let id_ref = &stream_id; + match stream_result { Ok(mut stream) => { loop { let chunk = select! { chunk = stream.next() => match chunk { - Some(Ok(chunk)) => chunk, + Some(Ok(mut chunk)) => { + chunk.set_stream_id(id_ref); + chunk + }, Some(Err(e)) => { yield Event::json(&format!("Error: {e}")); break; @@ -171,10 +184,10 @@ async fn stream_text_file(file_path: &str) -> Result { let stream = stream! { while let Ok(Some(line)) = lines.next_line().await { line_number += 1; - yield Ok(Chunk { - content: line, - metadata: Metadata::Text { line_number }, - }); + yield Ok(Chunk::new( + line, + Metadata::Text { line_number } + )); } }; @@ -204,10 +217,10 @@ async fn stream_pdf(file_path: &str) -> Result { } }; - if tx.blocking_send(Ok(Chunk { - content, - metadata: Metadata::Pdf { page_number: num_page + 1 }, - })).is_err() { + if tx.blocking_send(Ok(Chunk::new( + content, + Metadata::Pdf { page_number: num_page + 1 } + ))).is_err() { break; } } @@ -244,13 +257,13 @@ async fn stream_spreadsheet_as_csv(file_path: &str) -> Result { .collect::>() .join(","); - if tx.blocking_send(Ok(Chunk { + if tx.blocking_send(Ok(Chunk::new( content, - metadata: Metadata::Spreadsheet { + Metadata::Spreadsheet { sheet_name: sheet_name.clone(), row_number: row_idx + 1, - }, - })).is_err() { + } + ))).is_err() { return; } } @@ -275,10 +288,10 @@ async fn convert_with_pandoc( let stream = stream! { if output.status.success() { match String::from_utf8(output.stdout.clone()) { - Ok(content) => yield Ok(Chunk { + Ok(content) => yield Ok(Chunk::new( content, - metadata: Metadata::Document {}, - }), + Metadata::Document {} + )), Err(e) => yield Err(e.into()), } } else { @@ -297,10 +310,10 @@ async fn chunk_image(file_path: &str) -> Result { let base64 = general_purpose::STANDARD.encode(&data); let stream = stream! { - yield Ok(Chunk { - content: base64, - metadata: Metadata::Image {}, - }); + yield Ok(Chunk::new( + base64, + Metadata::Image {}, + )); }; Ok(Box::pin(stream)) @@ -327,13 +340,13 @@ async fn stream_pptx(file_path: &str) -> Result { match slide_result { Ok(slide) => { if let Some(md_content) = slide.convert_to_md() { - let chunk = Chunk { - content: md_content, - metadata: Metadata::Presentation { + let chunk = Chunk::new( + md_content, + Metadata::Presentation { slide_number: slide.slide_number, image: None, - }, - }; + } + ); if tx.send(Ok(chunk)).await.is_err() { break; @@ -360,13 +373,13 @@ async fn stream_pptx(file_path: &str) -> Result { is_end ); - let chunk = Chunk { - content: String::new(), - metadata: Metadata::Presentation { + let chunk = Chunk::new( + String::new(), + Metadata::Presentation { slide_number: slide.slide_number, image: Some(base64_image), - }, - }; + } + ); if tx.send(Ok(chunk)).await.is_err() { break;