Add Markdown fences support in text and CSV streaming

This commit is contained in:
Thorsten Sommer 2025-06-30 21:41:32 +02:00
parent 5c05d3df3c
commit 7533e1fcc3
Signed by: tsommer
GPG Key ID: 371BBA77A02C0108
2 changed files with 52 additions and 9 deletions

View File

@@ -30,7 +30,7 @@ public static class ContentStreamSseHandler
var sheetName = spreadsheetMetadata.Spreadsheet?.SheetName; var sheetName = spreadsheetMetadata.Spreadsheet?.SheetName;
var rowNumber = spreadsheetMetadata.Spreadsheet?.RowNumber; var rowNumber = spreadsheetMetadata.Spreadsheet?.RowNumber;
var spreadSheetResult = new StringBuilder(); var spreadSheetResult = new StringBuilder();
if (rowNumber == 1) if (rowNumber == 0)
{ {
spreadSheetResult.AppendLine(); spreadSheetResult.AppendLine();
spreadSheetResult.AppendLine($"# {sheetName}"); spreadSheetResult.AppendLine($"# {sheetName}");

View File

@@ -140,6 +140,10 @@ async fn stream_data(file_path: &str, extract_images: bool) -> Result<ChunkStrea
convert_with_pandoc(file_path, from, TO_MARKDOWN).await? convert_with_pandoc(file_path, from, TO_MARKDOWN).await?
} }
"csv" | "tsv" => {
stream_text_file(file_path, true, Some("csv".to_string())).await?
},
"pptx" => stream_pptx(file_path, extract_images).await?, "pptx" => stream_pptx(file_path, extract_images).await?,
"xlsx" | "ods" | "xls" | "xlsm" | "xlsb" | "xla" | "xlam" => { "xlsx" | "ods" | "xls" | "xlsm" | "xlsb" | "xla" | "xlam" => {
@@ -158,7 +162,7 @@ async fn stream_data(file_path: &str, extract_images: bool) -> Result<ChunkStrea
convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await? convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
}, },
_ => stream_text_file(file_path).await?, _ => stream_text_file(file_path, false, None).await?,
}, },
Kind::Ebook => return Err("Ebooks not yet supported".into()), Kind::Ebook => return Err("Ebooks not yet supported".into()),
@@ -176,7 +180,7 @@ async fn stream_data(file_path: &str, extract_images: bool) -> Result<ChunkStrea
convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await? convert_with_pandoc(file_path, fmt.extension(), TO_MARKDOWN).await?
}, },
_ => stream_text_file(file_path).await?, _ => stream_text_file(file_path, false, None).await?,
}, },
Kind::Presentation => match fmt { Kind::Presentation => match fmt {
@@ -184,25 +188,42 @@ async fn stream_data(file_path: &str, extract_images: bool) -> Result<ChunkStrea
stream_pptx(file_path, extract_images).await? stream_pptx(file_path, extract_images).await?
}, },
_ => stream_text_file(file_path).await?, _ => stream_text_file(file_path, false, None).await?,
}, },
Kind::Spreadsheet => stream_spreadsheet_as_csv(file_path).await?, Kind::Spreadsheet => stream_spreadsheet_as_csv(file_path).await?,
_ => stream_text_file(file_path).await?, _ => stream_text_file(file_path, false, None).await?,
}, },
}; };
Ok(Box::pin(stream)) Ok(Box::pin(stream))
} }
async fn stream_text_file(file_path: &str) -> Result<ChunkStream> { async fn stream_text_file(file_path: &str, use_md_fences: bool, fence_language: Option<String>) -> Result<ChunkStream> {
let file = tokio::fs::File::open(file_path).await?; let file = tokio::fs::File::open(file_path).await?;
let reader = tokio::io::BufReader::new(file); let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines(); let mut lines = reader.lines();
let mut line_number = 0; let mut line_number = 0;
let stream = stream! { let stream = stream! {
if use_md_fences {
match fence_language {
Some(lang) if lang.trim().is_empty() => {
yield Ok(Chunk::new("```".to_string(), Metadata::Text { line_number }));
},
Some(lang) => {
yield Ok(Chunk::new(format!("```{}", lang.trim()), Metadata::Text { line_number }));
},
None => {
yield Ok(Chunk::new("```".to_string(), Metadata::Text { line_number }));
}
};
}
while let Ok(Some(line)) = lines.next_line().await { while let Ok(Some(line)) = lines.next_line().await {
line_number += 1; line_number += 1;
yield Ok(Chunk::new( yield Ok(Chunk::new(
@@ -210,6 +231,10 @@ async fn stream_text_file(file_path: &str) -> Result<ChunkStream> {
Metadata::Text { line_number } Metadata::Text { line_number }
)); ));
} }
if use_md_fences {
yield Ok(Chunk::new("```\n".to_string(), Metadata::Text { line_number }));
}
}; };
Ok(Box::pin(stream)) Ok(Box::pin(stream))
@@ -272,7 +297,17 @@ async fn stream_spreadsheet_as_csv(file_path: &str) -> Result<ChunkStream> {
} }
}; };
for (row_idx, row) in range.rows().enumerate() { let mut row_idx = 0;
tx.blocking_send(Ok(Chunk::new(
"```csv".to_string(),
Metadata::Spreadsheet {
sheet_name: sheet_name.clone(),
row_number: row_idx,
}
))).ok();
for row in range.rows() {
row_idx += 1;
let content = row.iter() let content = row.iter()
.map(|cell| cell.to_string()) .map(|cell| cell.to_string())
.collect::<Vec<_>>() .collect::<Vec<_>>()
@@ -282,12 +317,20 @@ async fn stream_spreadsheet_as_csv(file_path: &str) -> Result<ChunkStream> {
content, content,
Metadata::Spreadsheet { Metadata::Spreadsheet {
sheet_name: sheet_name.clone(), sheet_name: sheet_name.clone(),
row_number: row_idx + 1, row_number: row_idx,
} }
))).is_err() { ))).is_err() {
return; return;
} }
} }
tx.blocking_send(Ok(Chunk::new(
"```".to_string(),
Metadata::Spreadsheet {
sheet_name: sheet_name.clone(),
row_number: row_idx,
}
))).ok();
} }
}); });