summaryrefslogtreecommitdiff
path: root/policy/_policies/src/stream_read.rs
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-03-04 21:26:04 +0800
committer魏曹先生 <1992414357@qq.com>2026-03-04 21:35:09 +0800
commit22926ce29e3f8e040ec349401aeb6a77f32eae72 (patch)
tree678753ec49a61fb9d3e2d8e869393dec90ea7ef4 /policy/_policies/src/stream_read.rs
Initialize Butchunker project structure and policy system
Diffstat (limited to 'policy/_policies/src/stream_read.rs')
-rw-r--r--policy/_policies/src/stream_read.rs46
1 files changed, 46 insertions, 0 deletions
diff --git a/policy/_policies/src/stream_read.rs b/policy/_policies/src/stream_read.rs
new file mode 100644
index 0000000..5cf7791
--- /dev/null
+++ b/policy/_policies/src/stream_read.rs
@@ -0,0 +1,46 @@
+use crate::error::ChunkFailed;
+use std::{collections::HashMap, path::Path};
+
+pub async fn chunk_stream_process<T, F>(
+ path: &Path,
+ stream_data: &mut T,
+ size: u32,
+ params: &HashMap<&str, &str>,
+ chunk_func: F,
+) -> Result<Vec<u32>, ChunkFailed>
+where
+ T: Default,
+ F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>,
+{
+ let mut file = tokio::fs::File::open(path)
+ .await
+ .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;
+ let mut buffer = vec![0u8; size as usize];
+ let mut splits = Vec::new();
+ let mut total_read = 0;
+
+ loop {
+ let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
+ .await
+ .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;
+
+ if bytes_read == 0 {
+ break Ok(splits);
+ }
+
+ // Process chunking on the buffer slice
+ let chunk_result = chunk_func(
+ &buffer[..bytes_read],
+ bytes_read as u32,
+ stream_data,
+ params,
+ )
+ .await;
+
+ if let Some(offset) = chunk_result {
+ splits.push(total_read + offset);
+ }
+
+ total_read += bytes_read as u32;
+ }
+}