use std::cmp::min; use crate::api_token::APIToken; use crate::pandoc::PandocProcessBuilder; use crate::pdfium::PdfiumInit; use async_stream::stream; use base64::{engine::general_purpose, Engine as _}; use calamine::{open_workbook_auto, Reader}; use file_format::{FileFormat, Kind}; use futures::{Stream, StreamExt}; use pdfium_render::prelude::Pdfium; use pptx_to_md::{ImageHandlingMode, ParserConfig, PptxContainer}; use rocket::get; use rocket::response::stream::{Event, EventStream}; use rocket::serde::Serialize; use rocket::tokio::select; use rocket::Shutdown; use std::path::Path; use std::pin::Pin; use tokio::io::AsyncBufReadExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; #[derive(Debug, Serialize)] pub struct Chunk { pub content: String, pub metadata: Metadata, } #[derive(Debug, Serialize)] pub enum Metadata { Text { line_number: usize }, Pdf { page_number: usize }, Spreadsheet { sheet_name: String, row_number: usize }, Document, Image, Presentation { slide_number: u32, image: Option, }, } #[derive(Debug, Serialize)] pub struct Base64Image { pub id: String, pub content: String, pub segment: usize, pub is_end: bool } impl Base64Image { fn new(id: String, content: String, segment: usize, is_end: bool) -> Self { Self { id, content, segment, is_end } } Document {}, Image {}, } const TO_MARKDOWN: &str = "markdown"; const DOCX: &str = "docx"; const ODT: &str = "odt"; const IMAGE_SEGMENT_SIZE_IN_CHARS: usize = 8_192; // equivalent to ~ 5500 token type Result = std::result::Result>; type ChunkStream = Pin> + Send>>; #[get("/retrieval/fs/extract?")] pub async fn extract_data(_token: APIToken, path: String, mut end: Shutdown) -> EventStream![] { EventStream! { let stream_result = stream_data(&path).await; match stream_result { Ok(mut stream) => { loop { let chunk = select! { chunk = stream.next() => match chunk { Some(Ok(chunk)) => chunk, Some(Err(e)) => { yield Event::json(&format!("Error: {}", e)); break; }, None => break, }, _ = &mut end => break, }; yield Event::json(&chunk); } }, Err(e) => { yield Event::json(&format!("Error starting stream: {}", e)); } } } } async fn stream_data(file_path: &str) -> Result { if !Path::new(file_path).exists() { return Err("File does not exist.".into()); } let file_path_clone = file_path.to_owned(); let fmt = tokio::task::spawn_blocking(move || { FileFormat::from_file(&file_path_clone) }).await??; let ext = file_path.split('.').next_back().unwrap_or(""); let stream = match ext { DOCX | ODT => { let from = if ext == DOCX { "docx" } else { "odt" }; convert_with_pandoc(file_path, from, TO_MARKDOWN).await? } "pptx" => stream_pptx(file_path).await?, "xlsx" | "ods" | "xls" | "xlsm" | "xlsb" | "xla" | "xlam" => { stream_spreadsheet_as_csv(file_path).await? } _ => match fmt.kind() { Kind::Document => match fmt { FileFormat::PortableDocumentFormat => stream_pdf(file_path).await?, FileFormat::MicrosoftWordDocument => { convert_with_pandoc(file_path, "docx", TO_MARKDOWN).await? } FileFormat::OfficeOpenXmlDocument => { convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await? } _ => stream_text_file(file_path).await?, }, Kind::Ebook => return Err("Ebooks not yet supported".into()), Kind::Image => chunk_image(file_path).await?, Kind::Other => match fmt { FileFormat::HypertextMarkupLanguage => { convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await? } _ => stream_text_file(file_path).await?, }, Kind::Presentation => match fmt { FileFormat::OfficeOpenXmlPresentation => { stream_pptx(file_path).await? } _ => stream_text_file(file_path).await?, }, Kind::Spreadsheet => stream_spreadsheet_as_csv(file_path).await?, _ => stream_text_file(file_path).await?, }, }; Ok(Box::pin(stream)) } async fn stream_text_file(file_path: &str) -> Result { let file = tokio::fs::File::open(file_path).await?; let reader = tokio::io::BufReader::new(file); let mut lines = reader.lines(); let mut line_number = 0; let stream = stream! { while let Ok(Some(line)) = lines.next_line().await { line_number += 1; yield Ok(Chunk { content: line, metadata: Metadata::Text { line_number }, }); } }; Ok(Box::pin(stream)) } #[get("/retrieval/fs/read/pdf?")] pub fn read_pdf(_token: APIToken, file_path: String) -> String { let pdfium = Pdfium::ai_studio_init(); let doc = match pdfium.load_pdf_from_file(&file_path, None) { Ok(document) => document, Err(e) => return e.to_string(), }; let mut pdf_content = String::new(); for page in doc.pages().iter() { let content = match page.text().map(|text_content| text_content.all()) { Ok(content) => content, Err(_) => { continue } }; pdf_content.push_str(&content); pdf_content.push_str("\n\n"); } pdf_content } async fn stream_pdf(file_path: &str) -> Result { let path = file_path.to_owned(); let (tx, rx) = mpsc::channel(10); tokio::task::spawn_blocking(move || { let pdfium = Pdfium::ai_studio_init(); let doc = match pdfium.load_pdf_from_file(&path, None) { Ok(document) => document, Err(e) => { let _ = tx.blocking_send(Err(e.into())); return; } }; for (num_page, page) in doc.pages().iter().enumerate() { let content = match page.text().map(|t| t.all()) { Ok(text_content) => text_content, Err(e) => { let _ = tx.blocking_send(Err(e.into())); continue; } }; if tx.blocking_send(Ok(Chunk { content, metadata: Metadata::Pdf { page_number: num_page + 1 }, })).is_err() { break; } } }); Ok(Box::pin(ReceiverStream::new(rx))) } async fn stream_spreadsheet_as_csv(file_path: &str) -> Result { let path = file_path.to_owned(); let (tx, rx) = mpsc::channel(10); tokio::task::spawn_blocking(move || { let mut workbook = match open_workbook_auto(&path) { Ok(w) => w, Err(e) => { let _ = tx.blocking_send(Err(e.into())); return; } }; for sheet_name in workbook.sheet_names() { let range = match workbook.worksheet_range(&sheet_name) { Ok(r) => r, Err(e) => { let _ = tx.blocking_send(Err(e.into())); continue; } }; for (row_idx, row) in range.rows().enumerate() { let content = row.iter() .map(|cell| cell.to_string()) .collect::>() .join(","); if tx.blocking_send(Ok(Chunk { content, metadata: Metadata::Spreadsheet { sheet_name: sheet_name.clone(), row_number: row_idx + 1, }, })).is_err() { return; } } } }); Ok(Box::pin(ReceiverStream::new(rx))) } async fn convert_with_pandoc( file_path: &str, from: &str, to: &str, ) -> Result { let output = PandocProcessBuilder::new() .with_input_file(file_path) .with_input_format(from) .with_output_format(to) .build() .command.output().await?; let stream = stream! { if output.status.success() { match String::from_utf8(output.stdout.clone()) { Ok(content) => yield Ok(Chunk { content, metadata: Metadata::Document {}, }), Err(e) => yield Err(e.into()), } } else { yield Err(format!( "Pandoc error: {}", String::from_utf8_lossy(&output.stderr) ).into()); } }; Ok(Box::pin(stream)) } async fn chunk_image(file_path: &str) -> Result { let data = tokio::fs::read(file_path).await?; let base64 = general_purpose::STANDARD.encode(&data); let stream = stream! { yield Ok(Chunk { content: base64, metadata: Metadata::Image {}, }); }; Ok(Box::pin(stream)) } async fn stream_pptx(file_path: &str) -> Result { let path = Path::new(file_path).to_owned(); let parser_config = ParserConfig::builder() .extract_images(true) .compress_images(true) .quality(75) .image_handling_mode(ImageHandlingMode::Manually) .build(); let mut streamer = tokio::task::spawn_blocking(move || { PptxContainer::open(&path, parser_config).map_err(|e| Box::new(e) as Box) }).await??; let (tx, rx) = mpsc::channel(32); tokio::spawn(async move { for slide_result in streamer.iter_slides() { match slide_result { Ok(slide) => { if let Some(md_content) = slide.convert_to_md() { let chunk = Chunk { content: md_content, metadata: Metadata::Presentation { slide_number: slide.slide_number, image: None, }, }; if tx.send(Ok(chunk)).await.is_err() { break; } } if let Some(images) = slide.load_images_manually() { for image in images.iter() { let base64_data = &image.base64_content; let total_length = base64_data.len(); let mut offset = 0; let mut segment_index = 0; while offset < total_length { let end = min(offset + IMAGE_SEGMENT_SIZE_IN_CHARS, total_length); let segment_content = &base64_data[offset..end]; let is_end = end == total_length; let base64_image = Base64Image::new( image.img_ref.id.clone(), segment_content.to_string(), segment_index, is_end ); let chunk = Chunk { content: String::new(), metadata: Metadata::Presentation { slide_number: slide.slide_number, image: Some(base64_image), }, }; if tx.send(Ok(chunk)).await.is_err() { break; } offset = end; segment_index += 1; } } } }, Err(e) => { let _ = tx.send(Err(Box::new(e) as Box)).await; break; } } } }); Ok(Box::pin(ReceiverStream::new(rx))) }