Added a GUID as a unique stream ID to the file-retrieval API

This commit is contained in:
krut_ni 2025-06-30 14:52:14 +02:00
parent 429f9a566e
commit 8240a83662
4 changed files with 52 additions and 35 deletions

View File

@ -38,7 +38,8 @@ public partial class ReadFileContent : MSGComponentBase
return;
}
var fileContent = await this.RustService.ReadArbitraryFileData(selectedFile.SelectedFilePath, int.MaxValue);
var streamId = Guid.NewGuid().ToString();
var fileContent = await this.RustService.ReadArbitraryFileData(selectedFile.SelectedFilePath, streamId, int.MaxValue);
await this.FileContentChanged.InvokeAsync(fileContent);
}
}

View File

@ -6,6 +6,9 @@ public class ContentStreamSseEvent
{
[JsonPropertyName("content")]
public string? Content { get; init; }
[JsonPropertyName("stream_id")]
public string? StreamId { get; init; }
[JsonPropertyName("metadata")]
public ContentStreamSseMetadata? Metadata { get; init; }

View File

@ -5,9 +5,9 @@ namespace AIStudio.Tools.Services;
public sealed partial class RustService
{
public async Task<string> ReadArbitraryFileData(string path, int maxChunks)
public async Task<string> ReadArbitraryFileData(string path, string streamId, int maxChunks)
{
var requestUri = $"/retrieval/fs/extract?path={Uri.EscapeDataString(path)}";
var requestUri = $"/retrieval/fs/extract?path={Uri.EscapeDataString(path)}&stream_id={streamId}";
var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
var response = await this.http.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);

View File

@ -23,8 +23,16 @@ use tokio_stream::wrappers::ReceiverStream;
/// One unit of extracted file content, serialized and pushed to the client
/// as a server-sent event.
#[derive(Debug, Serialize)]
pub struct Chunk {
/// Extracted payload for this chunk (a text line, CSV row, base64 image,
/// pandoc output, ...) depending on the source file type.
pub content: String,
/// Identifier of the SSE stream this chunk belongs to; initialized empty by
/// `Chunk::new` and attached later via `Chunk::set_stream_id`.
pub stream_id: String,
/// Source-specific location info for this chunk (line number, page number,
/// sheet/row, slide, ...) — see the `Metadata` enum.
pub metadata: Metadata,
}
impl Chunk {
pub fn new(content: String, metadata: Metadata) -> Self {
Chunk { content, stream_id: String::new(), metadata }
}
pub fn set_stream_id(&mut self, stream_id: &str) { self.stream_id = stream_id.to_string(); }
}
#[derive(Debug, Serialize)]
pub enum Metadata {
@ -72,16 +80,21 @@ const IMAGE_SEGMENT_SIZE_IN_CHARS: usize = 8_192; // equivalent to ~ 5500 token
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type ChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk>> + Send>>;
#[get("/retrieval/fs/extract?<path>")]
pub async fn extract_data(_token: APIToken, path: String, mut end: Shutdown) -> EventStream![] {
#[get("/retrieval/fs/extract?<path>&<stream_id>")]
pub async fn extract_data(_token: APIToken, path: String, stream_id: String, mut end: Shutdown) -> EventStream![] {
EventStream! {
let stream_result = stream_data(&path).await;
let id_ref = &stream_id;
match stream_result {
Ok(mut stream) => {
loop {
let chunk = select! {
chunk = stream.next() => match chunk {
Some(Ok(chunk)) => chunk,
Some(Ok(mut chunk)) => {
chunk.set_stream_id(id_ref);
chunk
},
Some(Err(e)) => {
yield Event::json(&format!("Error: {e}"));
break;
@ -171,10 +184,10 @@ async fn stream_text_file(file_path: &str) -> Result<ChunkStream> {
let stream = stream! {
while let Ok(Some(line)) = lines.next_line().await {
line_number += 1;
yield Ok(Chunk {
content: line,
metadata: Metadata::Text { line_number },
});
yield Ok(Chunk::new(
line,
Metadata::Text { line_number }
));
}
};
@ -204,10 +217,10 @@ async fn stream_pdf(file_path: &str) -> Result<ChunkStream> {
}
};
if tx.blocking_send(Ok(Chunk {
content,
metadata: Metadata::Pdf { page_number: num_page + 1 },
})).is_err() {
if tx.blocking_send(Ok(Chunk::new(
content,
Metadata::Pdf { page_number: num_page + 1 }
))).is_err() {
break;
}
}
@ -244,13 +257,13 @@ async fn stream_spreadsheet_as_csv(file_path: &str) -> Result<ChunkStream> {
.collect::<Vec<_>>()
.join(",");
if tx.blocking_send(Ok(Chunk {
if tx.blocking_send(Ok(Chunk::new(
content,
metadata: Metadata::Spreadsheet {
Metadata::Spreadsheet {
sheet_name: sheet_name.clone(),
row_number: row_idx + 1,
},
})).is_err() {
}
))).is_err() {
return;
}
}
@ -275,10 +288,10 @@ async fn convert_with_pandoc(
let stream = stream! {
if output.status.success() {
match String::from_utf8(output.stdout.clone()) {
Ok(content) => yield Ok(Chunk {
Ok(content) => yield Ok(Chunk::new(
content,
metadata: Metadata::Document {},
}),
Metadata::Document {}
)),
Err(e) => yield Err(e.into()),
}
} else {
@ -297,10 +310,10 @@ async fn chunk_image(file_path: &str) -> Result<ChunkStream> {
let base64 = general_purpose::STANDARD.encode(&data);
let stream = stream! {
yield Ok(Chunk {
content: base64,
metadata: Metadata::Image {},
});
yield Ok(Chunk::new(
base64,
Metadata::Image {},
));
};
Ok(Box::pin(stream))
@ -327,13 +340,13 @@ async fn stream_pptx(file_path: &str) -> Result<ChunkStream> {
match slide_result {
Ok(slide) => {
if let Some(md_content) = slide.convert_to_md() {
let chunk = Chunk {
content: md_content,
metadata: Metadata::Presentation {
let chunk = Chunk::new(
md_content,
Metadata::Presentation {
slide_number: slide.slide_number,
image: None,
},
};
}
);
if tx.send(Ok(chunk)).await.is_err() {
break;
@ -360,13 +373,13 @@ async fn stream_pptx(file_path: &str) -> Result<ChunkStream> {
is_end
);
let chunk = Chunk {
content: String::new(),
metadata: Metadata::Presentation {
let chunk = Chunk::new(
String::new(),
Metadata::Presentation {
slide_number: slide.slide_number,
image: Some(base64_image),
},
};
}
);
if tx.send(Ok(chunk)).await.is_err() {
break;