diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-03-07 19:37:52 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-03-07 19:37:52 +0800 |
| commit | 9e7c0fd45e169929156bdb317b10d7bb3db65f8b (patch) | |
| tree | 94c1e0e6cafe996b7b7da8dfd6e1ff1a04539cda /policy/_policies/src/stream_read.rs | |
| parent | 22926ce29e3f8e040ec349401aeb6a77f32eae72 (diff) | |
Add callback support to chunk_stream_with and implement stream writing
Diffstat (limited to 'policy/_policies/src/stream_read.rs')
| -rw-r--r-- | policy/_policies/src/stream_read.rs | 86 |
1 files changed, 80 insertions, 6 deletions
diff --git a/policy/_policies/src/stream_read.rs b/policy/_policies/src/stream_read.rs index 5cf7791..0642ebf 100644 --- a/policy/_policies/src/stream_read.rs +++ b/policy/_policies/src/stream_read.rs @@ -1,7 +1,7 @@ use crate::error::ChunkFailed; use std::{collections::HashMap, path::Path}; -pub async fn chunk_stream_process<T, F>( +pub async fn chunk_stream_display_boundaries<T, F>( path: &Path, stream_data: &mut T, size: u32, @@ -16,8 +16,8 @@ where .await .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?; let mut buffer = vec![0u8; size as usize]; - let mut splits = Vec::new(); - let mut total_read = 0; + let mut boundaries = Vec::new(); + let mut total_offset = 0; loop { let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer) @@ -25,7 +25,7 @@ where .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?; if bytes_read == 0 { - break Ok(splits); + break Ok(boundaries); } // Process chunking on the buffer slice @@ -38,9 +38,83 @@ where .await; if let Some(offset) = chunk_result { - splits.push(total_read + offset); + boundaries.push(total_offset + offset); } - total_read += bytes_read as u32; + total_offset += bytes_read as u32; + } +} + +pub async fn chunk_stream_process<T, F, C>( + path: &Path, + stream_data: &mut T, + size: u32, + mut callback: C, + params: &HashMap<&str, &str>, + chunk_func: F, +) -> Result<(), ChunkFailed> +where + T: Default, + F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>, + C: FnMut( + Vec<u8>, + ) -> std::pin::Pin< + Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>, + >, +{ + let mut file = tokio::fs::File::open(path) + .await + .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?; + let mut buffer = vec![0u8; size as usize]; + + let mut accumulated_chunk = Vec::new(); + let mut processed_in_accumulator = 0; + + loop { + let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer) + .await + .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?; + + if bytes_read == 0 { + if !accumulated_chunk.is_empty() { + callback(accumulated_chunk) + .await + .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?; + } + break Ok(()); + } + + accumulated_chunk.extend_from_slice(&buffer[..bytes_read]); + + loop { + let unprocessed_data = &accumulated_chunk[processed_in_accumulator..]; + + let chunk_result = chunk_func( + unprocessed_data, + unprocessed_data.len() as u32, + stream_data, + params, + ) + .await; + + match chunk_result { + Some(offset) => { + let chunk_end = processed_in_accumulator + offset as usize; + + let chunk_data = accumulated_chunk[..chunk_end].to_vec(); + + callback(chunk_data) + .await + .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?; + + accumulated_chunk.drain(..chunk_end); + processed_in_accumulator = 0; + } + None => { + processed_in_accumulator = accumulated_chunk.len(); + break; + } + } + } } } |
