mirror of
https://github.com/MindWorkAI/AI-Studio.git
synced 2025-08-20 22:32:56 +00:00
added function to parse pptx presentations and chunk slides and chunk base64 images to send them with sse
This commit is contained in:
parent
7c4ddf1164
commit
0beb6b476b
@ -38,6 +38,7 @@ calamine = "0.27.0"
|
||||
pdfium-render = "0.8.31"
|
||||
sys-locale = "0.3.2"
|
||||
cfg-if = "1.0.0"
|
||||
pptx-to-md = "0.3.0"
|
||||
|
||||
# Fixes security vulnerability downstream, where the upstream is not fixed yet:
|
||||
url = "2.5"
|
||||
|
@ -1,22 +1,24 @@
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::cmp::min;
|
||||
use crate::api_token::APIToken;
|
||||
use crate::pandoc::PandocProcessBuilder;
|
||||
use crate::pdfium::PdfiumInit;
|
||||
use async_stream::stream;
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
use calamine::{open_workbook_auto, Reader};
|
||||
use file_format::{FileFormat, Kind};
|
||||
use futures::{Stream, StreamExt};
|
||||
use pdfium_render::prelude::Pdfium;
|
||||
use pptx_to_md::{ImageHandlingMode, ParserConfig, PptxContainer};
|
||||
use rocket::get;
|
||||
use rocket::response::stream::{Event, EventStream};
|
||||
use rocket::serde::Serialize;
|
||||
use rocket::tokio::select;
|
||||
use rocket::Shutdown;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use rocket::Shutdown;
|
||||
use rocket::response::stream::{EventStream, Event};
|
||||
use rocket::tokio::select;
|
||||
use rocket::serde::Serialize;
|
||||
use rocket::get;
|
||||
use crate::api_token::APIToken;
|
||||
use crate::pandoc::PandocProcessBuilder;
|
||||
use crate::pdfium::PdfiumInit;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct Chunk {
|
||||
@ -31,17 +33,36 @@ pub enum Metadata {
|
||||
Spreadsheet { sheet_name: String, row_number: usize },
|
||||
Document,
|
||||
Image,
|
||||
Presentation {
|
||||
slide_number: u32,
|
||||
image: Option<Base64Image>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct Base64Image {
|
||||
pub id: String,
|
||||
pub content: String,
|
||||
pub segment: usize,
|
||||
pub is_end: bool
|
||||
}
|
||||
|
||||
impl Base64Image {
|
||||
fn new(id: String, content: String, segment: usize, is_end: bool) -> Self {
|
||||
Self { id, content, segment, is_end }
|
||||
}
|
||||
}
|
||||
|
||||
const TO_MARKDOWN: &str = "markdown";
|
||||
const DOCX: &str = "docx";
|
||||
const ODT: &str = "odt";
|
||||
const IMAGE_SEGMENT_SIZE_IN_CHARS: usize = 8_192; // equivalent to ~ 5500 token
|
||||
|
||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
type ChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk>> + Send>>;
|
||||
|
||||
#[get("/retrieval/fs/extract?<path>")]
|
||||
pub async fn extract_data(_token: APIToken, path: String, mut end: Shutdown) -> EventStream![] {
|
||||
pub async fn extract_data(path: String, mut end: Shutdown) -> EventStream![] {
|
||||
EventStream! {
|
||||
let stream_result = stream_data(&path).await;
|
||||
match stream_result {
|
||||
@ -87,6 +108,8 @@ async fn stream_data(file_path: &str) -> Result<ChunkStream> {
|
||||
convert_with_pandoc(file_path, from, TO_MARKDOWN).await?
|
||||
}
|
||||
|
||||
"pptx" => stream_pptx(file_path).await?,
|
||||
|
||||
"xlsx" | "ods" | "xls" | "xlsm" | "xlsb" | "xla" | "xlam" => {
|
||||
stream_spreadsheet_as_csv(file_path).await?
|
||||
}
|
||||
@ -115,7 +138,7 @@ async fn stream_data(file_path: &str) -> Result<ChunkStream> {
|
||||
|
||||
Kind::Presentation => match fmt {
|
||||
FileFormat::OfficeOpenXmlPresentation => {
|
||||
convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
|
||||
stream_pptx(file_path).await?
|
||||
}
|
||||
_ => stream_text_file(file_path).await?,
|
||||
},
|
||||
@ -295,3 +318,86 @@ async fn chunk_image(file_path: &str) -> Result<ChunkStream> {
|
||||
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
|
||||
async fn stream_pptx(file_path: &str) -> Result<ChunkStream> {
|
||||
let path = Path::new(file_path).to_owned();
|
||||
|
||||
let parser_config = ParserConfig::builder()
|
||||
.extract_images(true)
|
||||
.compress_images(true)
|
||||
.quality(75)
|
||||
.image_handling_mode(ImageHandlingMode::Manually)
|
||||
.build();
|
||||
|
||||
let mut streamer = tokio::task::spawn_blocking(move || {
|
||||
PptxContainer::open(&path, parser_config).map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
|
||||
}).await??;
|
||||
|
||||
let (tx, rx) = mpsc::channel(32);
|
||||
|
||||
tokio::spawn(async move {
|
||||
for slide_result in streamer.iter_slides() {
|
||||
match slide_result {
|
||||
Ok(slide) => {
|
||||
if let Some(md_content) = slide.convert_to_md() {
|
||||
let chunk = Chunk {
|
||||
content: md_content,
|
||||
metadata: Metadata::Presentation {
|
||||
slide_number: slide.slide_number,
|
||||
image: None,
|
||||
},
|
||||
};
|
||||
|
||||
if tx.send(Ok(chunk)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(images) = slide.load_images_manually() {
|
||||
for image in images.iter() {
|
||||
let base64_data = &image.base64_content;
|
||||
|
||||
let total_length = base64_data.len();
|
||||
let mut offset = 0;
|
||||
let mut segment_index = 0;
|
||||
|
||||
while offset < total_length {
|
||||
let end = min(offset + IMAGE_SEGMENT_SIZE_IN_CHARS, total_length);
|
||||
let segment_content = &base64_data[offset..end];
|
||||
let is_end = end == total_length;
|
||||
|
||||
let base64_image = Base64Image::new(
|
||||
image.img_ref.id.clone(),
|
||||
segment_content.to_string(),
|
||||
segment_index,
|
||||
is_end
|
||||
);
|
||||
|
||||
let chunk = Chunk {
|
||||
content: String::new(),
|
||||
metadata: Metadata::Presentation {
|
||||
slide_number: slide.slide_number,
|
||||
image: Some(base64_image),
|
||||
},
|
||||
};
|
||||
|
||||
if tx.send(Ok(chunk)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
offset = end;
|
||||
segment_index += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Box::pin(ReceiverStream::new(rx)))
|
||||
}
|
Loading…
Reference in New Issue
Block a user