diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 21d28d93..3b65d881 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -15,7 +15,7 @@ serde = { version = "1.0.217", features = ["derive"] }
 serde_json = "1.0.134"
 keyring = { version = "3.6.1", features = ["apple-native", "windows-native", "sync-secret-service"] }
 arboard = "3.4.1"
-tokio = { version = "1.42", features = ["rt", "rt-multi-thread", "macros"] }
+tokio = { version = "1.42", features = ["rt", "rt-multi-thread", "macros", "process"] }
 flexi_logger = "0.29.8"
 log = { version = "0.4", features = ["kv"] }
 once_cell = "1.20"
@@ -31,12 +31,14 @@ hmac = "0.12.1"
 sha2 = "0.10.8"
 rcgen = { version = "0.13.2", features = ["pem"] }
 file-format = "0.26.0"
-calamine = { version = "0.26.1", features = ["dates"] }
-chrono = "0.4.39"
+calamine = "0.22"
 pdfium-render = "0.8.27"
 
 # Fixes security vulnerability downstream, where the upstream is not fixed yet:
 url = "2.5"
+async-stream = "0.3"
+futures = "0.3"
+tokio-stream = "0.1"
 
 [target.'cfg(target_os = "linux")'.dependencies]
 # See issue https://github.com/tauri-apps/tauri/issues/4470
diff --git a/runtime/src/file_data.rs b/runtime/src/file_data.rs
index 8701eeab..d6a68400 100644
--- a/runtime/src/file_data.rs
+++ b/runtime/src/file_data.rs
@@ -1,30 +1,28 @@
 use std::path::Path;
-use std::io::{BufRead, BufReader};
+use std::pin::Pin;
+use async_stream::stream;
 use base64::{engine::general_purpose, Engine as _};
 use calamine::{open_workbook_auto, Reader};
 use file_format::{FileFormat, Kind};
+use futures::{Stream, StreamExt};
 use pdfium_render::prelude::Pdfium;
-use std::error::Error;
-
-use std::fs::File;
-use std::io::Read;
-use std::process::Command;
-use rocket::post;
-use rocket::response::stream::{Event, EventStream};
+use tokio::io::AsyncBufReadExt;
+use tokio::process::Command;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
 use rocket::{State, Shutdown};
-use rocket::fs::{relative, FileServer};
-use rocket::form::Form;
-use rocket::serde::{Serialize, Deserialize};
-use rocket::tokio::sync::broadcast::{channel, Sender, error::RecvError};
+use rocket::response::stream::{EventStream, Event};
 use rocket::tokio::select;
+use rocket::serde::Serialize;
+use rocket::get;
 
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub struct Chunk {
     pub content: String,
     pub metadata: Metadata,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub enum Metadata {
     Text { line_number: usize },
     Pdf { page_number: usize },
@@ -37,289 +35,233 @@ const TO_MARKDOWN: &str = "markdown";
 const DOCX: &str = "docx";
 const ODT: &str = "odt";
 
-#[post("/system/file-data/extract", data = "<file_path>")]
-pub async fn extract_file_data(
-    file_path: String,
-    queue: &State<Sender<String>>,
-    mut shutdown: Shutdown
-) -> EventStream![] {
-    let mut rx = queue.subscribe();
-    let path = file_path.clone();
-
-    // Start extraction in a separate task
-    let extractor = rocket::tokio::task::spawn_blocking(move || {
-        stream_data(&path).map(|iter| {
-            iter.map(|chunk| {
-                chunk.map_err(|e| format!("Chunk error: {}", e))
-            })
-        })
-    });
+type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
+type ChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk>> + Send>>;
 
+#[get("/system/file-data/extract?<path>")]
+pub async fn extract_data(path: String, mut end: Shutdown) -> EventStream![] {
     EventStream! {
-        let mut extraction_stream = match extractor.await {
-            Ok(Ok(stream)) => stream,
-            Ok(Err(e)) => {
-                yield Event::json(&ExtractEvent::Error(e.to_string()));
-                return;
-            }
+        let stream_result = stream_data(&path).await;
+
+        match stream_result {
+            Ok(mut stream) => {
+                loop {
+                    let chunk = select! {
+                        chunk = stream.next() => match chunk {
+                            Some(Ok(chunk)) => chunk,
+                            Some(Err(e)) => {
+                                yield Event::json(&format!("Error: {}", e));
+                                break;
+                            },
+                            None => break,
+                        },
+                        _ = &mut end => break,
+                    };
+
+                    yield Event::json(&chunk);
+                }
+            },
             Err(e) => {
-                yield Event::json(&ExtractEvent::Error(format!("Task failed: {}", e)));
+                yield Event::json(&format!("Error starting stream: {}", e));
+            }
+        }
+    }
+}
+
+async fn stream_data(file_path: &str) -> Result<ChunkStream> {
+    if !Path::new(file_path).exists() {
+        return Err("File does not exist.".into());
+    }
+
+    let file_path_clone = file_path.to_owned();
+
+    let fmt = tokio::task::spawn_blocking(move || {
+        FileFormat::from_file(&file_path_clone)
+    }).await??;
+
+    let ext = file_path.split('.').last().unwrap_or("");
+
+    let stream = match ext {
+        DOCX | ODT => {
+            let from = if ext == DOCX { "docx" } else { "odt" };
+            convert_with_pandoc(file_path, from, TO_MARKDOWN).await?
+        }
+        "xlsx" | "ods" | "xls" | "xlsm" | "xlsb" | "xla" | "xlam" => {
+            stream_spreadsheet_as_csv(file_path).await?
+        }
+        _ => match fmt.kind() {
+            Kind::Document => match fmt {
+                FileFormat::PortableDocumentFormat => read_pdf(file_path).await?,
+                FileFormat::MicrosoftWordDocument => {
+                    convert_with_pandoc(file_path, "docx", TO_MARKDOWN).await?
+                }
+                FileFormat::OfficeOpenXmlDocument => {
+                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
+                }
+                _ => stream_text_file(file_path).await?,
+            },
+            Kind::Ebook => return Err("Ebooks not yet supported".into()),
+            Kind::Image => chunk_image(file_path).await?,
+            Kind::Other => match fmt {
+                FileFormat::HypertextMarkupLanguage => {
+                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
+                }
+                _ => stream_text_file(file_path).await?,
+            },
+            Kind::Presentation => match fmt {
+                FileFormat::OfficeOpenXmlPresentation => {
+                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
+                }
+                _ => stream_text_file(file_path).await?,
+            },
+            Kind::Spreadsheet => stream_spreadsheet_as_csv(file_path).await?,
+            _ => stream_text_file(file_path).await?,
+        },
+    };
+
+    Ok(Box::pin(stream))
+}
+
+async fn stream_text_file(file_path: &str) -> Result<ChunkStream> {
+    let file = tokio::fs::File::open(file_path).await?;
+    let reader = tokio::io::BufReader::new(file);
+    let mut lines = reader.lines();
+    let mut line_number = 0;
+
+    let stream = stream! {
+        while let Ok(Some(line)) = lines.next_line().await {
+            line_number += 1;
+            yield Ok(Chunk {
+                content: line,
+                metadata: Metadata::Text { line_number },
+            });
+        }
+    };
+
+    Ok(Box::pin(stream))
+}
+
+async fn read_pdf(file_path: &str) -> Result<ChunkStream> {
+    let path = file_path.to_owned();
+    let (tx, rx) = mpsc::channel(10);
+
+    tokio::task::spawn_blocking(move || {
+        let pdfium = Pdfium::default();
+        let doc = match pdfium.load_pdf_from_file(&path, None) {
+            Ok(d) => d,
+            Err(e) => {
+                let _ = tx.blocking_send(Err(e.into()));
                 return;
             }
         };
 
-        loop {
-            let chunk = select! {
-                chunk = extraction_stream.next() => chunk,
-                _ = &mut shutdown => break,
+        for (i, page) in doc.pages().iter().enumerate() {
+            let content = match page.text().map(|t| t.all()) {
+                Ok(c) => c,
+                Err(e) => {
+                    let _ = tx.blocking_send(Err(e.into()));
+                    continue;
+                }
             };
 
-            match chunk {
-                Some(Ok(chunk)) => {
-                    let event = ExtractEvent::Chunk {
-                        content: chunk.content,
-                        metadata: match chunk.metadata {
-                            Metadata::Text { line_number } =>
-                                MetadataRepr::Text { line_number },
-                            Metadata::Pdf { page_number } =>
-                                MetadataRepr::Pdf { page_number },
-                        }
-                    };
-                    yield Event::json(&event);
-                }
-                Some(Err(e)) => {
-                    yield Event::json(&ExtractEvent::Error(e));
-                }
-                None => break,
-            }
-        }
-
-        yield Event::json(&ExtractEvent::Completed);
-    }
-}
-
-
-// Serializable data types
-#[derive(serde::Serialize)]
-#[serde(tag = "type", content = "data")]
-enum ExtractEvent {
-    Chunk {
-        content: String,
-        metadata: MetadataRepr,
-    },
-    Error(String),
-    Completed,
-}
-
-#[derive(serde::Serialize)]
-#[serde(tag = "type")]
-enum MetadataRepr {
-    Text { line_number: usize },
-    Pdf { page_number: usize },
-    Spreadsheet { sheet_name: String, row_number: usize },
-    Document,
-    Image,
-}
-
-
-/// Streams the content of a file in chunks with format-specific metadata.
-///
-/// Takes a file path as input and returns a stream of chunks containing
-/// content segments with associated metadata. Supports various file types
-/// including documents, spreadsheets, presentations, images, and PDFs.
-///
-/// The streaming process works as follows:
-/// - Verifies the file exists
-/// - Detects the file format using content and extension
-/// - Processes content incrementally based on format:
-///   - Text files: Streams line by line with line numbers
-///   - PDFs: Extracts text page by page with page numbers
-///   - Spreadsheets: Outputs rows with sheet names and row numbers
-///   - Office documents: Converts to Markdown as single chunk
-///   - Images: Returns Base64 encoding as single chunk
-///   - HTML files: Converts to Markdown as single chunk
-///
-/// # Parameters
-/// - `file_path`: Path to the file to process (platform independent)
-///
-/// # Returns
-/// Returns a `Result` containing:
-/// - `Ok`: Boxed iterator yielding `Result<Chunk>` items
-/// - `Err`: Initial processing error (e.g., file not found)
-///
-/// Each iterator item represents either:
-/// - `Ok(Chunk)`: Content segment with metadata
-/// - `Err`: Error during chunk processing
-///
-/// # Chunk Structure
-/// - `content`: Text segment or Base64 image data
-/// - `metadata`: Context information including:
-///   - Line numbers for text files
-///   - Page numbers for PDFs
-///   - Sheet/row numbers for spreadsheets
-///   - Document type markers for office formats
-///   - Image type marker for images
-///
-/// # Errors
-/// - Initial errors: File not found, format detection failures
-/// - Chunk-level errors: Format-specific parsing errors
-/// - Pandoc conversion failures for office documents
-///
-/// # Examples
-/// ```
-/// let chunk_stream = stream_data("data.txt")?;
-/// for chunk_result in chunk_stream {
-///     match chunk_result {
-///         Ok(chunk) => {
-///             println!("Metadata: {:?}", chunk.metadata);
-///             println!("Content: {}", chunk.content);
-///         }
-///         Err(e) => eprintln!("Error: {}", e),
-///     }
-/// }
-/// ```
-fn stream_data(
-    file_path: &str,
-) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    if !Path::new(file_path).exists() {
-        return Err(Box::from("File does not exist."));
-    }
-
-    let fmt = FileFormat::from_file(file_path)?;
-    let ext = file_path.split('.').last().unwrap_or("");
-
-    match ext {
-        DOCX | ODT => {
-            let from = if ext == DOCX { "docx" } else { "odt" };
-            convert_with_pandoc(file_path, from, TO_MARKDOWN)
-        }
-        "xlsx" | "ods" | "xls" | "xlsm" | "xlsb" | "xla" | "xlam" => {
-            stream_spreadsheet_as_csv(file_path)
-        }
-        _ => match fmt.kind() {
-            Kind::Document => match fmt {
-                FileFormat::PortableDocumentFormat => read_pdf(file_path),
-                FileFormat::MicrosoftWordDocument => {
-                    convert_with_pandoc(file_path, "docx", TO_MARKDOWN)
-                }
-                FileFormat::OfficeOpenXmlDocument => {
-                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN)
-                }
-                _ => stream_text_file(file_path),
-            },
-            Kind::Ebook => Err(Box::from("Ebooks not yet supported")),
-            Kind::Image => chunk_image(file_path),
-            Kind::Other => match fmt {
-                FileFormat::HypertextMarkupLanguage => {
-                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN)
-                }
-                _ => stream_text_file(file_path),
-            },
-            Kind::Presentation => match fmt {
-                FileFormat::OfficeOpenXmlPresentation => {
-                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN)
-                }
-                _ => stream_text_file(file_path),
-            },
-            Kind::Spreadsheet => stream_spreadsheet_as_csv(file_path),
-            _ => stream_text_file(file_path),
-        },
-    }
-}
-
-fn stream_text_file(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let file = File::open(file_path)?;
-    let reader = BufReader::new(file);
-    let iter = reader.lines()
-        .enumerate()
-        .map(|(i, line)| {
-            Ok(Chunk {
-                content: line?,
-                metadata: Metadata::Text { line_number: i + 1 },
-            })
-        });
-    Ok(Box::new(iter))
-}
-
-fn read_pdf(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let pdfium = Pdfium::default();
-    let doc = pdfium.load_pdf_from_file(file_path, None)?;
-    let pages = doc.pages();
-    let chunks: Vec<_> = pages.iter()
-        .enumerate()
-        .map(|(i, page)| {
-            let content = page.text()?.all();
-            Ok(Chunk {
+            if tx.blocking_send(Ok(Chunk {
                 content,
                 metadata: Metadata::Pdf { page_number: i + 1 },
-            })
-        })
-        .collect();
-    Ok(Box::new(chunks.into_iter()))
-}
-
-fn stream_spreadsheet_as_csv(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let mut workbook = open_workbook_auto(file_path)?;
-    let mut chunks = Vec::new();
-
-    for sheet_name in workbook.sheet_names() {
-        let range = workbook.worksheet_range(&sheet_name)?;
-        for (row_idx, row) in range.rows().enumerate() {
-            let content = row.iter()
-                .map(|cell| cell.to_string())
-                .collect::<Vec<_>>()
-                .join(",");
-            chunks.push(Ok(Chunk {
-                content,
-                metadata: Metadata::Spreadsheet {
-                    sheet_name: sheet_name.clone(),
-                    row_number: row_idx + 1,
-                },
-            }));
+            })).is_err() {
+                break;
+            }
         }
-    }
-    Ok(Box::new(chunks.into_iter()))
+    });
+
+    Ok(Box::pin(ReceiverStream::new(rx)))
 }
 
-fn convert_with_pandoc(
+async fn stream_spreadsheet_as_csv(file_path: &str) -> Result<ChunkStream> {
+    let path = file_path.to_owned();
+    let (tx, rx) = mpsc::channel(10);
+
+    tokio::task::spawn_blocking(move || {
+        let mut workbook = match open_workbook_auto(&path) {
+            Ok(w) => w,
+            Err(e) => {
+                let _ = tx.blocking_send(Err(e.into()));
+                return;
+            }
+        };
+
+        for sheet_name in workbook.sheet_names() {
+            let range = match workbook.worksheet_range(&sheet_name) {
+                Some(Ok(r)) => r,
+                Some(Err(e)) => {
+                    let _ = tx.blocking_send(Err(e.into()));
+                    continue;
+                }
+                None => continue,
+            };
+
+            for (row_idx, row) in range.rows().enumerate() {
+                let content = row.iter()
+                    .map(|cell| cell.to_string())
+                    .collect::<Vec<_>>()
+                    .join(",");
+
+                if tx.blocking_send(Ok(Chunk {
+                    content,
+                    metadata: Metadata::Spreadsheet {
+                        sheet_name: sheet_name.clone(),
+                        row_number: row_idx + 1,
+                    },
+                })).is_err() {
+                    return;
+                }
+            }
+        }
+    });
+
+    Ok(Box::pin(ReceiverStream::new(rx)))
+}
+
+async fn convert_with_pandoc(
     file_path: &str,
     from: &str,
     to: &str,
-) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
+) -> Result<ChunkStream> {
     let output = Command::new("pandoc")
         .arg(file_path)
         .args(&["-f", from, "-t", to])
-        .output()?;
-    if output.status.success() {
-        let content = String::from_utf8(output.stdout)?;
-        Ok(Box::new(std::iter::once(Ok(Chunk {
-            content,
-            metadata: Metadata::Document,
-        }))))
-    } else {
-        Err(Box::from(String::from_utf8_lossy(&output.stderr).into_owned()))
-    }
-}
+        .output()
+        .await?;
 
-fn read_img_as_base64(file_path: &str) -> Result<String, Box<dyn Error>> {
-    let img_result = File::open(file_path);
-
-    match img_result {
-        Ok(mut img) => {
-            let mut buff = Vec::new();
-            img.read_to_end(&mut buff)?;
-
-            let base64 = general_purpose::STANDARD.encode(&buff);
-            Ok(base64)
+    let stream = stream! {
+        if output.status.success() {
+            match String::from_utf8(output.stdout.clone()) {
+                Ok(content) => yield Ok(Chunk {
+                    content,
+                    metadata: Metadata::Document,
+                }),
+                Err(e) => yield Err(e.into()),
+            }
+        } else {
+            yield Err(format!(
+                "Pandoc error: {}",
+                String::from_utf8_lossy(&output.stderr)
+            ).into());
         }
-        Err(e) => Err(Box::from(format!("{}", e))),
-    }
+    };
+
+    Ok(Box::pin(stream))
 }
 
-fn chunk_image(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let base64 = read_img_as_base64(file_path)?;
-    Ok(Box::new(std::iter::once(Ok(Chunk {
-        content: base64,
-        metadata: Metadata::Image,
-    }))))
-}
+async fn chunk_image(file_path: &str) -> Result<ChunkStream> {
+    let data = tokio::fs::read(file_path).await?;
+    let base64 = general_purpose::STANDARD.encode(&data);
+
+    let stream = stream! {
+        yield Ok(Chunk {
+            content: base64,
+            metadata: Metadata::Image,
+        });
+    };
+
+    Ok(Box::pin(stream))
+}
\ No newline at end of file
diff --git a/runtime/src/runtime_api.rs b/runtime/src/runtime_api.rs
index b536c328..b6c18bbb 100644
--- a/runtime/src/runtime_api.rs
+++ b/runtime/src/runtime_api.rs
@@ -90,7 +90,7 @@ pub fn start_runtime_api() {
             crate::secret::delete_secret,
             crate::environment::get_data_directory,
             crate::environment::get_config_directory,
-            crate::file_data::extract_file_data,
+            crate::file_data::extract_data,
         ])
         .ignite().await.unwrap()
        .launch().await.unwrap();
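
For reviewers: a minimal sketch of how the new streaming API is meant to be consumed from inside the crate. The `print_file_chunks` helper is hypothetical and not part of this patch; it assumes a Tokio runtime and uses only the `futures` dependency added above plus the module's own `stream_data`, `Chunk`, and `Result` alias.

```rust
// Hypothetical in-module consumer of the new ChunkStream (not in this diff).
use futures::StreamExt;

async fn print_file_chunks(path: &str) -> Result<()> {
    // stream_data resolves to a pinned, boxed Stream of Result<Chunk>.
    let mut stream = stream_data(path).await?;

    // Chunks arrive incrementally; in this sketch a failed chunk is logged
    // rather than ending the stream.
    while let Some(item) = stream.next().await {
        match item {
            Ok(chunk) => println!("{:?}: {}", chunk.metadata, chunk.content),
            Err(e) => eprintln!("chunk error: {}", e),
        }
    }
    Ok(())
}
```

Over HTTP the same data is exposed as server-sent events on `GET /system/file-data/extract?path=...`, where each event carries one JSON-serialized `Chunk` (or an error string), and the stream ends early when Rocket's `Shutdown` future resolves.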