Mirror of https://github.com/MindWorkAI/AI-Studio.git, synced 2025-04-28 08:19:47 +00:00
Changed data extraction from sync to async and added a GET endpoint that handles all files
This commit is contained in:
parent ef8d1051ae
commit 0e4c49d3c0
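The core of the change is the same bridging pattern applied throughout the diff below: blocking extraction work moves onto a dedicated thread, and its results are exposed to Rocket as an async stream. A minimal sketch of that pattern, with u32 items standing in for chunks (this helper is illustrative, not part of the commit):

use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

fn blocking_work_as_stream() -> impl Stream<Item = u32> {
    // Bounded channel: the worker blocks once 10 items are buffered,
    // giving the async consumer natural backpressure.
    let (tx, rx) = mpsc::channel(10);

    tokio::task::spawn_blocking(move || {
        for i in 0..3 {
            // blocking_send is the sync-side counterpart of send().await;
            // it fails once the receiver has been dropped.
            if tx.blocking_send(i).is_err() {
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}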
@@ -15,7 +15,7 @@ serde = { version = "1.0.217", features = ["derive"] }
 serde_json = "1.0.134"
 keyring = { version = "3.6.1", features = ["apple-native", "windows-native", "sync-secret-service"] }
 arboard = "3.4.1"
-tokio = { version = "1.42", features = ["rt", "rt-multi-thread", "macros"] }
+tokio = { version = "1.42", features = ["rt", "rt-multi-thread", "macros", "process"] }
 flexi_logger = "0.29.8"
 log = { version = "0.4", features = ["kv"] }
 once_cell = "1.20"
@@ -31,12 +31,14 @@ hmac = "0.12.1"
 sha2 = "0.10.8"
 rcgen = { version = "0.13.2", features = ["pem"] }
 file-format = "0.26.0"
-calamine = "0.22"
+calamine = { version = "0.26.1", features = ["dates"] }
+chrono = "0.4.39"
 pdfium-render = "0.8.27"

 # Fixes security vulnerability downstream, where the upstream is not fixed yet:
 url = "2.5"
+async-stream = "0.3"
+futures = "0.3"
+tokio-stream = "0.1"

 [target.'cfg(target_os = "linux")'.dependencies]
 # See issue https://github.com/tauri-apps/tauri/issues/4470
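The new "process" feature is what makes tokio::process::Command available for the pandoc conversion further down. A minimal sketch of an async subprocess call under that feature set (the --version invocation is just an illustration):

use tokio::process::Command;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Runs pandoc without blocking the async runtime; requires the
    // tokio "process" (plus "rt"/"macros") features enabled above.
    let output = Command::new("pandoc").arg("--version").output().await?;
    println!("{}", String::from_utf8_lossy(&output.stdout));
    Ok(())
}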
@@ -1,30 +1,28 @@
 use std::path::Path;
-use std::io::{BufRead, BufReader};
+use std::pin::Pin;
+use async_stream::stream;
 use base64::{engine::general_purpose, Engine as _};
 use calamine::{open_workbook_auto, Reader};
 use file_format::{FileFormat, Kind};
+use futures::{Stream, StreamExt};
 use pdfium_render::prelude::Pdfium;
-use std::error::Error;
-
-use std::fs::File;
-use std::io::Read;
-use std::process::Command;
-use rocket::post;
-use rocket::response::stream::{Event, EventStream};
+use tokio::io::AsyncBufReadExt;
+use tokio::process::Command;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
 use rocket::{State, Shutdown};
-use rocket::fs::{relative, FileServer};
-use rocket::form::Form;
-use rocket::serde::{Serialize, Deserialize};
-use rocket::tokio::sync::broadcast::{channel, Sender, error::RecvError};
+use rocket::response::stream::{EventStream, Event};
 use rocket::tokio::select;
+use rocket::serde::Serialize;
+use rocket::get;

-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub struct Chunk {
     pub content: String,
     pub metadata: Metadata,
 }

-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub enum Metadata {
     Text { line_number: usize },
     Pdf { page_number: usize },
@@ -37,289 +35,233 @@ const TO_MARKDOWN: &str = "markdown";
 const DOCX: &str = "docx";
 const ODT: &str = "odt";

-#[post("/system/file-data/extract", data = "<file_path>")]
-pub async fn extract_file_data(
-    file_path: String,
-    queue: &State<Sender<ExtractEvent>>,
-    mut shutdown: Shutdown
-) -> EventStream![] {
-    let mut rx = queue.subscribe();
-    let path = file_path.clone();
-
-    // Start extraction in a separate task
-    let extractor = rocket::tokio::task::spawn_blocking(move || {
-        stream_data(&path).map(|iter| {
-            iter.map(|chunk| {
-                chunk.map_err(|e| format!("Chunk error: {}", e))
-            })
-        })
-    });
-
-    EventStream! {
-        let mut extraction_stream = match extractor.await {
-            Ok(Ok(stream)) => stream,
-            Ok(Err(e)) => {
-                yield Event::json(&ExtractEvent::Error(e.to_string()));
-                return;
-            }
-            Err(e) => {
-                yield Event::json(&ExtractEvent::Error(format!("Task failed: {}", e)));
-                return;
-            }
-        };
-
-        loop {
-            let chunk = select! {
-                chunk = extraction_stream.next() => chunk,
-                _ = &mut shutdown => break,
-            };
-
-            match chunk {
-                Some(Ok(chunk)) => {
-                    let event = ExtractEvent::Chunk {
-                        content: chunk.content,
-                        metadata: match chunk.metadata {
-                            Metadata::Text { line_number } =>
-                                MetadataRepr::Text { line_number },
-                            Metadata::Pdf { page_number } =>
-                                MetadataRepr::Pdf { page_number },
-                        }
-                    };
-                    yield Event::json(&event);
-                }
-                Some(Err(e)) => {
-                    yield Event::json(&ExtractEvent::Error(e));
-                }
-                None => break,
-            }
-        }
-
-        yield Event::json(&ExtractEvent::Completed);
-    }
-}
-
-// Serializable data types
-#[derive(serde::Serialize)]
-#[serde(tag = "type", content = "data")]
-enum ExtractEvent {
-    Chunk {
-        content: String,
-        metadata: MetadataRepr,
-    },
-    Error(String),
-    Completed,
-}
-
-#[derive(serde::Serialize)]
-#[serde(tag = "type")]
-enum MetadataRepr {
-    Text { line_number: usize },
-    Pdf { page_number: usize },
-    Spreadsheet { sheet_name: String, row_number: usize },
-    Document,
-    Image,
-}
+type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
+type ChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk>> + Send>>;
+
+#[get("/system/file-data/extract?<path>")]
+pub async fn extract_data(path: String, mut end: Shutdown) -> EventStream![] {
+    EventStream! {
+        let stream_result = stream_data(&path).await;
+        match stream_result {
+            Ok(mut stream) => {
+                loop {
+                    let chunk = select! {
+                        chunk = stream.next() => match chunk {
+                            Some(Ok(chunk)) => chunk,
+                            Some(Err(e)) => {
+                                yield Event::json(&format!("Error: {}", e));
+                                break;
+                            },
+                            None => break,
+                        },
+                        _ = &mut end => break,
+                    };
+
+                    yield Event::json(&chunk);
+                }
+            },
+            Err(e) => {
+                yield Event::json(&format!("Error starting stream: {}", e));
+            }
+        }
+    }
+}
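Because the dedicated ExtractEvent/MetadataRepr wire types are gone, the GET endpoint now serializes Chunk directly, using serde's default externally tagged enum representation for Metadata. A sketch of the resulting JSON (standalone, using plain serde/serde_json instead of the rocket::serde re-exports, and only two of the Metadata variants):

use serde::Serialize;

#[derive(Serialize)]
enum Metadata {
    Text { line_number: usize },
    Pdf { page_number: usize },
}

#[derive(Serialize)]
struct Chunk {
    content: String,
    metadata: Metadata,
}

fn main() {
    let chunk = Chunk {
        content: "first line".into(),
        metadata: Metadata::Text { line_number: 1 },
    };
    // Prints: {"content":"first line","metadata":{"Text":{"line_number":1}}}
    println!("{}", serde_json::to_string(&chunk).unwrap());
}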

 /// Streams the content of a file in chunks with format-specific metadata.
 ///
 /// Takes a file path as input and returns a stream of chunks containing
 /// content segments with associated metadata. Supports various file types
 /// including documents, spreadsheets, presentations, images, and PDFs.
 ///
 /// The streaming process works as follows:
 /// - Verifies the file exists
 /// - Detects the file format using content and extension
 /// - Processes content incrementally based on format:
 ///   - Text files: Streams line by line with line numbers
 ///   - PDFs: Extracts text page by page with page numbers
 ///   - Spreadsheets: Outputs rows with sheet names and row numbers
 ///   - Office documents: Converts to Markdown as single chunk
 ///   - Images: Returns Base64 encoding as single chunk
 ///   - HTML files: Converts to Markdown as single chunk
 ///
 /// # Parameters
 /// - `file_path`: Path to the file to process (platform independent)
 ///
 /// # Returns
 /// Returns a `Result` containing:
 /// - `Ok`: Boxed iterator yielding `Result<Chunk>` items
 /// - `Err`: Initial processing error (e.g., file not found)
 ///
 /// Each iterator item represents either:
 /// - `Ok(Chunk)`: Content segment with metadata
 /// - `Err`: Error during chunk processing
 ///
 /// # Chunk Structure
 /// - `content`: Text segment or Base64 image data
 /// - `metadata`: Context information including:
 ///   - Line numbers for text files
 ///   - Page numbers for PDFs
 ///   - Sheet/row numbers for spreadsheets
 ///   - Document type markers for office formats
 ///   - Image type marker for images
 ///
 /// # Errors
 /// - Initial errors: File not found, format detection failures
 /// - Chunk-level errors: Format-specific parsing errors
 /// - Pandoc conversion failures for office documents
 ///
 /// # Examples
 /// ```
 /// let chunk_stream = stream_data("data.txt")?;
 /// for chunk_result in chunk_stream {
 ///     match chunk_result {
 ///         Ok(chunk) => {
 ///             println!("Metadata: {:?}", chunk.metadata);
 ///             println!("Content: {}", chunk.content);
 ///         }
 ///         Err(e) => eprintln!("Error: {}", e),
 ///     }
 /// }
 /// ```
-fn stream_data(
-    file_path: &str,
-) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
+async fn stream_data(file_path: &str) -> Result<ChunkStream> {
     if !Path::new(file_path).exists() {
-        return Err(Box::from("File does not exist."));
+        return Err("File does not exist.".into());
     }

-    let fmt = FileFormat::from_file(file_path)?;
+    let file_path_clone = file_path.to_owned();
+    let fmt = tokio::task::spawn_blocking(move || {
+        FileFormat::from_file(&file_path_clone)
+    }).await??;
+
     let ext = file_path.split('.').last().unwrap_or("");

-    match ext {
+    let stream = match ext {
         DOCX | ODT => {
             let from = if ext == DOCX { "docx" } else { "odt" };
-            convert_with_pandoc(file_path, from, TO_MARKDOWN)
+            convert_with_pandoc(file_path, from, TO_MARKDOWN).await?
         }
         "xlsx" | "ods" | "xls" | "xlsm" | "xlsb" | "xla" | "xlam" => {
-            stream_spreadsheet_as_csv(file_path)
+            stream_spreadsheet_as_csv(file_path).await?
         }
         _ => match fmt.kind() {
             Kind::Document => match fmt {
-                FileFormat::PortableDocumentFormat => read_pdf(file_path),
+                FileFormat::PortableDocumentFormat => read_pdf(file_path).await?,
                 FileFormat::MicrosoftWordDocument => {
-                    convert_with_pandoc(file_path, "docx", TO_MARKDOWN)
+                    convert_with_pandoc(file_path, "docx", TO_MARKDOWN).await?
                 }
                 FileFormat::OfficeOpenXmlDocument => {
-                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN)
+                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
                 }
-                _ => stream_text_file(file_path),
+                _ => stream_text_file(file_path).await?,
             },
-            Kind::Ebook => Err(Box::from("Ebooks not yet supported")),
-            Kind::Image => chunk_image(file_path),
+            Kind::Ebook => return Err("Ebooks not yet supported".into()),
+            Kind::Image => chunk_image(file_path).await?,
             Kind::Other => match fmt {
                 FileFormat::HypertextMarkupLanguage => {
-                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN)
+                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
                 }
-                _ => stream_text_file(file_path),
+                _ => stream_text_file(file_path).await?,
             },
             Kind::Presentation => match fmt {
                 FileFormat::OfficeOpenXmlPresentation => {
-                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN)
+                    convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
                 }
-                _ => stream_text_file(file_path),
+                _ => stream_text_file(file_path).await?,
             },
-            Kind::Spreadsheet => stream_spreadsheet_as_csv(file_path),
-            _ => stream_text_file(file_path),
+            Kind::Spreadsheet => stream_spreadsheet_as_csv(file_path).await?,
+            _ => stream_text_file(file_path).await?,
         },
-    }
+    };
+
+    Ok(Box::pin(stream))
 }
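Since stream_data is now async and returns a ChunkStream rather than a boxed iterator (the doc comment's example above still shows the old synchronous usage), consuming it looks like this. A hypothetical helper, assuming it lives in this module next to the types it uses:

use futures::StreamExt;

// Illustrative only: drains the stream and prints each chunk as it arrives.
async fn print_all_chunks(path: &str) -> Result<()> {
    let mut stream = stream_data(path).await?;
    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(chunk) => println!("{:?}: {}", chunk.metadata, chunk.content),
            Err(e) => eprintln!("chunk error: {}", e),
        }
    }
    Ok(())
}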
-fn stream_text_file(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let file = File::open(file_path)?;
-    let reader = BufReader::new(file);
-    let iter = reader.lines()
-        .enumerate()
-        .map(|(i, line)| {
-            Ok(Chunk {
-                content: line?,
-                metadata: Metadata::Text { line_number: i + 1 },
-            })
-        });
-
-    Ok(Box::new(iter))
-}
+async fn stream_text_file(file_path: &str) -> Result<ChunkStream> {
+    let file = tokio::fs::File::open(file_path).await?;
+    let reader = tokio::io::BufReader::new(file);
+    let mut lines = reader.lines();
+    let mut line_number = 0;
+
+    let stream = stream! {
+        // Stops at EOF or on the first read error.
+        while let Ok(Some(line)) = lines.next_line().await {
+            line_number += 1;
+            yield Ok(Chunk {
+                content: line,
+                metadata: Metadata::Text { line_number },
+            });
+        }
+    };
+
+    Ok(Box::pin(stream))
+}
-fn read_pdf(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let pdfium = Pdfium::default();
-    let doc = pdfium.load_pdf_from_file(file_path, None)?;
-    let pages = doc.pages();
-    let chunks: Vec<_> = pages.iter()
-        .enumerate()
-        .map(|(i, page)| {
-            let content = page.text()?.all();
-            Ok(Chunk {
-                content,
-                metadata: Metadata::Pdf { page_number: i + 1 },
-            })
-        })
-        .collect();
-    Ok(Box::new(chunks.into_iter()))
-}
+async fn read_pdf(file_path: &str) -> Result<ChunkStream> {
+    let path = file_path.to_owned();
+    let (tx, rx) = mpsc::channel(10);
+
+    tokio::task::spawn_blocking(move || {
+        let pdfium = Pdfium::default();
+        let doc = match pdfium.load_pdf_from_file(&path, None) {
+            Ok(d) => d,
+            Err(e) => {
+                let _ = tx.blocking_send(Err(e.into()));
+                return;
+            }
+        };
+
+        for (i, page) in doc.pages().iter().enumerate() {
+            let content = match page.text().map(|t| t.all()) {
+                Ok(c) => c,
+                Err(e) => {
+                    let _ = tx.blocking_send(Err(e.into()));
+                    continue;
+                }
+            };
+
+            // Stop producing once the receiver side has been dropped.
+            if tx.blocking_send(Ok(Chunk {
+                content,
+                metadata: Metadata::Pdf { page_number: i + 1 },
+            })).is_err() {
+                break;
+            }
+        }
+    });
+
+    Ok(Box::pin(ReceiverStream::new(rx)))
+}
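Both blocking producers (read_pdf above and stream_spreadsheet_as_csv below) share one design point worth noting: the mpsc channel is bounded at 10 items, so a slow SSE consumer exerts backpressure. blocking_send parks the worker thread once the buffer is full, and a dropped receiver ends extraction early, whereas the old code always materialized every chunk in a Vec first.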
-fn stream_spreadsheet_as_csv(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let mut workbook = open_workbook_auto(file_path)?;
-    let mut chunks = Vec::new();
-
-    for sheet_name in workbook.sheet_names() {
-        let range = workbook.worksheet_range(&sheet_name)?;
-        for (row_idx, row) in range.rows().enumerate() {
-            let content = row.iter()
-                .map(|cell| cell.to_string())
-                .collect::<Vec<_>>()
-                .join(",");
-            chunks.push(Ok(Chunk {
-                content,
-                metadata: Metadata::Spreadsheet {
-                    sheet_name: sheet_name.clone(),
-                    row_number: row_idx + 1,
-                },
-            }));
-        }
-    }
-    Ok(Box::new(chunks.into_iter()))
-}
+async fn stream_spreadsheet_as_csv(file_path: &str) -> Result<ChunkStream> {
+    let path = file_path.to_owned();
+    let (tx, rx) = mpsc::channel(10);
+
+    tokio::task::spawn_blocking(move || {
+        let mut workbook = match open_workbook_auto(&path) {
+            Ok(w) => w,
+            Err(e) => {
+                let _ = tx.blocking_send(Err(e.into()));
+                return;
+            }
+        };
+
+        for sheet_name in workbook.sheet_names() {
+            // calamine 0.26 returns a Result here; a missing sheet is an error.
+            let range = match workbook.worksheet_range(&sheet_name) {
+                Ok(r) => r,
+                Err(e) => {
+                    let _ = tx.blocking_send(Err(e.into()));
+                    continue;
+                }
+            };
+
+            for (row_idx, row) in range.rows().enumerate() {
+                let content = row.iter()
+                    .map(|cell| cell.to_string())
+                    .collect::<Vec<_>>()
+                    .join(",");
+
+                if tx.blocking_send(Ok(Chunk {
+                    content,
+                    metadata: Metadata::Spreadsheet {
+                        sheet_name: sheet_name.clone(),
+                        row_number: row_idx + 1,
+                    },
+                })).is_err() {
+                    return;
+                }
+            }
+        }
+    });
+
+    Ok(Box::pin(ReceiverStream::new(rx)))
+}
-fn convert_with_pandoc(
+async fn convert_with_pandoc(
     file_path: &str,
     from: &str,
     to: &str,
-) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
+) -> Result<ChunkStream> {
     let output = Command::new("pandoc")
         .arg(file_path)
         .args(&["-f", from, "-t", to])
-        .output()?;
+        .output()
+        .await?;

-    if output.status.success() {
-        let content = String::from_utf8(output.stdout)?;
-        Ok(Box::new(std::iter::once(Ok(Chunk {
-            content,
-            metadata: Metadata::Document,
-        }))))
-    } else {
-        Err(Box::from(String::from_utf8_lossy(&output.stderr).into_owned()))
-    }
+    let stream = stream! {
+        if output.status.success() {
+            match String::from_utf8(output.stdout.clone()) {
+                Ok(content) => yield Ok(Chunk {
+                    content,
+                    metadata: Metadata::Document,
+                }),
+                Err(e) => yield Err(e.into()),
+            }
+        } else {
+            yield Err(format!(
+                "Pandoc error: {}",
+                String::from_utf8_lossy(&output.stderr)
+            ).into());
+        }
+    };
+
+    Ok(Box::pin(stream))
+}
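As before, convert_with_pandoc shells out to an external pandoc binary, which must be installed and on the PATH of the runtime process. A missing binary still fails fast through the .output().await? call, while a non-zero exit status is now reported as an error item inside the stream instead of a synchronous Err.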
-fn read_img_as_base64(file_path: &str) -> Result<String, Box<dyn Error>> {
-    let img_result = File::open(file_path);
-
-    match img_result {
-        Ok(mut img) => {
-            let mut buff = Vec::new();
-            img.read_to_end(&mut buff)?;
-
-            let base64 = general_purpose::STANDARD.encode(&buff);
-            Ok(base64)
-        }
-        Err(e) => Err(Box::from(format!("{}", e))),
-    }
-}
-
-fn chunk_image(file_path: &str) -> Result<Box<dyn Iterator<Item = Result<Chunk, Box<dyn Error>>>>, Box<dyn Error>> {
-    let base64 = read_img_as_base64(file_path)?;
-    Ok(Box::new(std::iter::once(Ok(Chunk {
-        content: base64,
-        metadata: Metadata::Image,
-    }))))
-}
+async fn chunk_image(file_path: &str) -> Result<ChunkStream> {
+    let data = tokio::fs::read(file_path).await?;
+    let base64 = general_purpose::STANDARD.encode(&data);
+
+    let stream = stream! {
+        yield Ok(Chunk {
+            content: base64,
+            metadata: Metadata::Image,
+        });
+    };
+
+    Ok(Box::pin(stream))
+}
@@ -90,7 +90,7 @@ pub fn start_runtime_api() {
             crate::secret::delete_secret,
             crate::environment::get_data_directory,
             crate::environment::get_config_directory,
-            crate::file_data::extract_file_data,
+            crate::file_data::extract_data,
         ])
         .ignite().await.unwrap()
         .launch().await.unwrap();
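With the POST route replaced, a client now starts an extraction with a plain GET, e.g. /system/file-data/extract?path=/tmp/notes.txt (the path shown here is only an example), and reads the resulting chunks as server-sent events.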