summaryrefslogtreecommitdiff
path: root/policy/_policies/src
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-03-07 19:37:52 +0800
committer魏曹先生 <1992414357@qq.com>2026-03-07 19:37:52 +0800
commit9e7c0fd45e169929156bdb317b10d7bb3db65f8b (patch)
tree94c1e0e6cafe996b7b7da8dfd6e1ff1a04539cda /policy/_policies/src
parent22926ce29e3f8e040ec349401aeb6a77f32eae72 (diff)
Add callback support to chunk_stream_with and implement stream writing
Diffstat (limited to 'policy/_policies/src')
-rw-r--r--policy/_policies/src/error.rs3
-rw-r--r--policy/_policies/src/lib.rs36
-rw-r--r--policy/_policies/src/lib.rs.t44
-rw-r--r--policy/_policies/src/stream_read.rs86
4 files changed, 159 insertions, 10 deletions
diff --git a/policy/_policies/src/error.rs b/policy/_policies/src/error.rs
index 975749d..2b9ce1b 100644
--- a/policy/_policies/src/error.rs
+++ b/policy/_policies/src/error.rs
@@ -11,4 +11,7 @@ pub enum ChunkFailed {
#[error("File open failed: {0}")]
FileOpenFailed(std::path::PathBuf),
+
+ #[error("Callback failed: {0}")]
+ CallbackFailed(String),
}
diff --git a/policy/_policies/src/lib.rs b/policy/_policies/src/lib.rs
index 397579a..f49134d 100644
--- a/policy/_policies/src/lib.rs
+++ b/policy/_policies/src/lib.rs
@@ -28,16 +28,41 @@ pub async fn chunk_with(
}
}
-pub async fn chunk_stream_with(
+pub async fn chunk_stream_with<F>(
policy_name: &str,
size: u32,
path: &Path,
+ callback: F,
params: &HashMap<&str, &str>,
-) -> Result<Vec<u32>, ChunkFailed> {
+) -> Result<(), ChunkFailed>
+where
+ F: FnMut(Vec<u8>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>>,
+{
match policy_name {
"butck_fixed_size" => {
let mut stream = butck_fixed_size::FixedSizeStream::default();
chunk_stream_process(
+ path, &mut stream, size, callback, params,
+ async |current_data, len, stream, params| {
+ butck_fixed_size::chunk_stream(current_data, len, stream, params).await
+ },
+ )
+ .await
+ }
+ _ => Err(ChunkFailed::PolicyNotFound),
+ }
+}
+
+pub async fn chunk_stream_display_boundaries(
+ policy_name: &str,
+ size: u32,
+ path: &Path,
+ params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ChunkFailed> {
+ match policy_name {
+ "butck_fixed_size" => {
+ let mut stream = butck_fixed_size::FixedSizeStream::default();
+ crate::stream_read::chunk_stream_display_boundaries(
path, &mut stream, size, params,
async |current_data, len, stream, params| {
butck_fixed_size::chunk_stream(current_data, len, stream, params).await
@@ -56,6 +81,13 @@ pub fn policies() -> Vec<&'static str> {
]
}
+pub fn stream_policies() -> Vec<&'static str> {
+ vec![
+ // butck_fixed_size
+ "butck_fixed_size",
+ ]
+}
+
pub mod butck_fixed_size {
pub use butck_fixed_size::FixedSizeStream;
use std::collections::HashMap;
diff --git a/policy/_policies/src/lib.rs.t b/policy/_policies/src/lib.rs.t
index 873a4cd..db0dc0f 100644
--- a/policy/_policies/src/lib.rs.t
+++ b/policy/_policies/src/lib.rs.t
@@ -28,24 +28,46 @@ pub async fn chunk_with(
}
}
-pub async fn chunk_stream_with(
+pub async fn chunk_stream_with<F>(
policy_name: &str,
size: u32,
path: &Path,
+ callback: F,
params: &HashMap<&str, &str>,
-) -> Result<Vec<u32>, ChunkFailed> {
+) -> Result<(), ChunkFailed>
+where
+ F: FnMut(Vec<u8>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>>,
+{
match policy_name {
>>>>>>>>>> match_arms_stream
_ => Err(ChunkFailed::PolicyNotFound),
}
}
+pub async fn chunk_stream_display_boundaries(
+ policy_name: &str,
+ size: u32,
+ path: &Path,
+ params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ChunkFailed> {
+ match policy_name {
+>>>>>>>>>> match_arms_stream_display
+ _ => Err(ChunkFailed::PolicyNotFound),
+ }
+}
+
pub fn policies() -> Vec<&'static str> {
vec![
>>>>>>>>>> policy_names
]
}
+pub fn stream_policies() -> Vec<&'static str> {
+ vec![
+>>>>>>>>>> stream_policy_names
+ ]
+}
+
>>>>>>>>>> exports_simple
>>>>>>>>>> exports_stream
>>>>>>>>>> exports_both
@@ -58,6 +80,19 @@ pub fn policies() -> Vec<&'static str> {
"<<<crate_name>>>" => {
let mut stream = <<<stream_struct_id>>>::default();
chunk_stream_process(
+ path, &mut stream, size, callback, params,
+ async |current_data, len, stream, params| {
+ <<<crate_name>>>::chunk_stream(current_data, len, stream, params).await
+ },
+ )
+ .await
+ }
+@@@ <<<
+
+@@@ >>> match_arms_stream_display
+ "<<<crate_name>>>" => {
+ let mut stream = <<<stream_struct_id>>>::default();
+ crate::stream_read::chunk_stream_display_boundaries(
path, &mut stream, size, params,
async |current_data, len, stream, params| {
<<<crate_name>>>::chunk_stream(current_data, len, stream, params).await
@@ -72,6 +107,11 @@ pub fn policies() -> Vec<&'static str> {
"<<<name>>>",
@@@ <<<
+@@@ >>> stream_policy_names
+ // <<<name>>>
+ "<<<name>>>",
+@@@ <<<
+
@@@ >>> exports_simple
pub mod <<<crate_name>>> {
use std::collections::HashMap;
diff --git a/policy/_policies/src/stream_read.rs b/policy/_policies/src/stream_read.rs
index 5cf7791..0642ebf 100644
--- a/policy/_policies/src/stream_read.rs
+++ b/policy/_policies/src/stream_read.rs
@@ -1,7 +1,7 @@
use crate::error::ChunkFailed;
use std::{collections::HashMap, path::Path};
-pub async fn chunk_stream_process<T, F>(
+pub async fn chunk_stream_display_boundaries<T, F>(
path: &Path,
stream_data: &mut T,
size: u32,
@@ -16,8 +16,8 @@ where
.await
.map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;
let mut buffer = vec![0u8; size as usize];
- let mut splits = Vec::new();
- let mut total_read = 0;
+ let mut boundaries = Vec::new();
+ let mut total_offset = 0;
loop {
let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
@@ -25,7 +25,7 @@ where
.map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;
if bytes_read == 0 {
- break Ok(splits);
+ break Ok(boundaries);
}
// Process chunking on the buffer slice
@@ -38,9 +38,83 @@ where
.await;
if let Some(offset) = chunk_result {
- splits.push(total_read + offset);
+ boundaries.push(total_offset + offset);
}
- total_read += bytes_read as u32;
+ total_offset += bytes_read as u32;
+ }
+}
+
+pub async fn chunk_stream_process<T, F, C>(
+ path: &Path,
+ stream_data: &mut T,
+ size: u32,
+ mut callback: C,
+ params: &HashMap<&str, &str>,
+ chunk_func: F,
+) -> Result<(), ChunkFailed>
+where
+ T: Default,
+ F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>,
+ C: FnMut(
+ Vec<u8>,
+ ) -> std::pin::Pin<
+ Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>,
+ >,
+{
+ let mut file = tokio::fs::File::open(path)
+ .await
+ .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;
+ let mut buffer = vec![0u8; size as usize];
+
+ let mut accumulated_chunk = Vec::new();
+ let mut processed_in_accumulator = 0;
+
+ loop {
+ let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
+ .await
+ .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;
+
+ if bytes_read == 0 {
+ if !accumulated_chunk.is_empty() {
+ callback(accumulated_chunk)
+ .await
+ .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?;
+ }
+ break Ok(());
+ }
+
+ accumulated_chunk.extend_from_slice(&buffer[..bytes_read]);
+
+ loop {
+ let unprocessed_data = &accumulated_chunk[processed_in_accumulator..];
+
+ let chunk_result = chunk_func(
+ unprocessed_data,
+ unprocessed_data.len() as u32,
+ stream_data,
+ params,
+ )
+ .await;
+
+ match chunk_result {
+ Some(offset) => {
+ let chunk_end = processed_in_accumulator + offset as usize;
+
+ let chunk_data = accumulated_chunk[..chunk_end].to_vec();
+
+ callback(chunk_data)
+ .await
+ .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?;
+
+ accumulated_chunk.drain(..chunk_end);
+ processed_in_accumulator = 0;
+ }
+ None => {
+ processed_in_accumulator = accumulated_chunk.len();
+ break;
+ }
+ }
+ }
}
}