diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-03-07 19:37:52 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-03-07 19:37:52 +0800 |
| commit | 9e7c0fd45e169929156bdb317b10d7bb3db65f8b (patch) | |
| tree | 94c1e0e6cafe996b7b7da8dfd6e1ff1a04539cda /policy/_policies | |
| parent | 22926ce29e3f8e040ec349401aeb6a77f32eae72 (diff) | |
Add callback support to chunk_stream_with and implement stream writing
Diffstat (limited to 'policy/_policies')
| -rw-r--r-- | policy/_policies/src/error.rs | 3 | ||||
| -rw-r--r-- | policy/_policies/src/lib.rs | 36 | ||||
| -rw-r--r-- | policy/_policies/src/lib.rs.t | 44 | ||||
| -rw-r--r-- | policy/_policies/src/stream_read.rs | 86 |
4 files changed, 159 insertions, 10 deletions
diff --git a/policy/_policies/src/error.rs b/policy/_policies/src/error.rs index 975749d..2b9ce1b 100644 --- a/policy/_policies/src/error.rs +++ b/policy/_policies/src/error.rs @@ -11,4 +11,7 @@ pub enum ChunkFailed { #[error("File open failed: {0}")] FileOpenFailed(std::path::PathBuf), + + #[error("Callback failed: {0}")] + CallbackFailed(String), } diff --git a/policy/_policies/src/lib.rs b/policy/_policies/src/lib.rs index 397579a..f49134d 100644 --- a/policy/_policies/src/lib.rs +++ b/policy/_policies/src/lib.rs @@ -28,16 +28,41 @@ pub async fn chunk_with( } } -pub async fn chunk_stream_with( +pub async fn chunk_stream_with<F>( policy_name: &str, size: u32, path: &Path, + callback: F, params: &HashMap<&str, &str>, -) -> Result<Vec<u32>, ChunkFailed> { +) -> Result<(), ChunkFailed> +where + F: FnMut(Vec<u8>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>>, +{ match policy_name { "butck_fixed_size" => { let mut stream = butck_fixed_size::FixedSizeStream::default(); chunk_stream_process( + path, &mut stream, size, callback, params, + async |current_data, len, stream, params| { + butck_fixed_size::chunk_stream(current_data, len, stream, params).await + }, + ) + .await + } + _ => Err(ChunkFailed::PolicyNotFound), + } +} + +pub async fn chunk_stream_display_boundaries( + policy_name: &str, + size: u32, + path: &Path, + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ChunkFailed> { + match policy_name { + "butck_fixed_size" => { + let mut stream = butck_fixed_size::FixedSizeStream::default(); + crate::stream_read::chunk_stream_display_boundaries( path, &mut stream, size, params, async |current_data, len, stream, params| { butck_fixed_size::chunk_stream(current_data, len, stream, params).await @@ -56,6 +81,13 @@ pub fn policies() -> Vec<&'static str> { ] } +pub fn stream_policies() -> Vec<&'static str> { + vec![ + // butck_fixed_size + "butck_fixed_size", + ] +} + pub mod butck_fixed_size { pub use butck_fixed_size::FixedSizeStream; use std::collections::HashMap; diff --git a/policy/_policies/src/lib.rs.t b/policy/_policies/src/lib.rs.t index 873a4cd..db0dc0f 100644 --- a/policy/_policies/src/lib.rs.t +++ b/policy/_policies/src/lib.rs.t @@ -28,24 +28,46 @@ pub async fn chunk_with( } } -pub async fn chunk_stream_with( +pub async fn chunk_stream_with<F>( policy_name: &str, size: u32, path: &Path, + callback: F, params: &HashMap<&str, &str>, -) -> Result<Vec<u32>, ChunkFailed> { +) -> Result<(), ChunkFailed> +where + F: FnMut(Vec<u8>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>>, +{ match policy_name { >>>>>>>>>> match_arms_stream _ => Err(ChunkFailed::PolicyNotFound), } } +pub async fn chunk_stream_display_boundaries( + policy_name: &str, + size: u32, + path: &Path, + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ChunkFailed> { + match policy_name { +>>>>>>>>>> match_arms_stream_display + _ => Err(ChunkFailed::PolicyNotFound), + } +} + pub fn policies() -> Vec<&'static str> { vec![ >>>>>>>>>> policy_names ] } +pub fn stream_policies() -> Vec<&'static str> { + vec![ +>>>>>>>>>> stream_policy_names + ] +} + >>>>>>>>>> exports_simple >>>>>>>>>> exports_stream >>>>>>>>>> exports_both @@ -58,6 +80,19 @@ pub fn policies() -> Vec<&'static str> { "<<<crate_name>>>" => { let mut stream = <<<stream_struct_id>>>::default(); chunk_stream_process( + path, &mut stream, size, callback, params, + async |current_data, len, stream, params| { + <<<crate_name>>>::chunk_stream(current_data, len, stream, params).await + }, + ) + .await + } +@@@ <<< + +@@@ >>> match_arms_stream_display + "<<<crate_name>>>" => { + let mut stream = <<<stream_struct_id>>>::default(); + crate::stream_read::chunk_stream_display_boundaries( path, &mut stream, size, params, async |current_data, len, stream, params| { <<<crate_name>>>::chunk_stream(current_data, len, stream, params).await @@ -72,6 +107,11 @@ pub fn policies() -> Vec<&'static str> { "<<<name>>>", @@@ <<< +@@@ >>> stream_policy_names + // <<<name>>> + "<<<name>>>", +@@@ <<< + @@@ >>> exports_simple pub mod <<<crate_name>>> { use std::collections::HashMap; diff --git a/policy/_policies/src/stream_read.rs b/policy/_policies/src/stream_read.rs index 5cf7791..0642ebf 100644 --- a/policy/_policies/src/stream_read.rs +++ b/policy/_policies/src/stream_read.rs @@ -1,7 +1,7 @@ use crate::error::ChunkFailed; use std::{collections::HashMap, path::Path}; -pub async fn chunk_stream_process<T, F>( +pub async fn chunk_stream_display_boundaries<T, F>( path: &Path, stream_data: &mut T, size: u32, @@ -16,8 +16,8 @@ where .await .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?; let mut buffer = vec![0u8; size as usize]; - let mut splits = Vec::new(); - let mut total_read = 0; + let mut boundaries = Vec::new(); + let mut total_offset = 0; loop { let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer) @@ -25,7 +25,7 @@ where .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?; if bytes_read == 0 { - break Ok(splits); + break Ok(boundaries); } // Process chunking on the buffer slice @@ -38,9 +38,83 @@ where .await; if let Some(offset) = chunk_result { - splits.push(total_read + offset); + boundaries.push(total_offset + offset); } - total_read += bytes_read as u32; + total_offset += bytes_read as u32; + } +} + +pub async fn chunk_stream_process<T, F, C>( + path: &Path, + stream_data: &mut T, + size: u32, + mut callback: C, + params: &HashMap<&str, &str>, + chunk_func: F, +) -> Result<(), ChunkFailed> +where + T: Default, + F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>, + C: FnMut( + Vec<u8>, + ) -> std::pin::Pin< + Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>, + >, +{ + let mut file = tokio::fs::File::open(path) + .await + .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?; + let mut buffer = vec![0u8; size as usize]; + + let mut accumulated_chunk = Vec::new(); + let mut processed_in_accumulator = 0; + + loop { + let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer) + .await + .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?; + + if bytes_read == 0 { + if !accumulated_chunk.is_empty() { + callback(accumulated_chunk) + .await + .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?; + } + break Ok(()); + } + + accumulated_chunk.extend_from_slice(&buffer[..bytes_read]); + + loop { + let unprocessed_data = &accumulated_chunk[processed_in_accumulator..]; + + let chunk_result = chunk_func( + unprocessed_data, + unprocessed_data.len() as u32, + stream_data, + params, + ) + .await; + + match chunk_result { + Some(offset) => { + let chunk_end = processed_in_accumulator + offset as usize; + + let chunk_data = accumulated_chunk[..chunk_end].to_vec(); + + callback(chunk_data) + .await + .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?; + + accumulated_chunk.drain(..chunk_end); + processed_in_accumulator = 0; + } + None => { + processed_in_accumulator = accumulated_chunk.len(); + break; + } + } + } } } |
