use crate::error::ChunkFailed; use std::{collections::HashMap, path::Path}; pub async fn chunk_stream_display_boundaries( path: &Path, stream_data: &mut T, size: u32, params: &HashMap<&str, &str>, chunk_func: F, ) -> Result, ChunkFailed> where T: Default, F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option, { let mut file = tokio::fs::File::open(path) .await .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?; let mut buffer = vec![0u8; size as usize]; let mut boundaries = Vec::new(); let mut total_offset = 0; loop { let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer) .await .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?; if bytes_read == 0 { break Ok(boundaries); } // Process chunking on the buffer slice let chunk_result = chunk_func( &buffer[..bytes_read], bytes_read as u32, stream_data, params, ) .await; if let Some(offset) = chunk_result { boundaries.push(total_offset + offset); } total_offset += bytes_read as u32; } } pub async fn chunk_stream_process( path: &Path, stream_data: &mut T, size: u32, mut callback: C, params: &HashMap<&str, &str>, chunk_func: F, ) -> Result<(), ChunkFailed> where T: Default, F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option, C: FnMut( Vec, ) -> std::pin::Pin< Box> + Send>, >, { let mut file = tokio::fs::File::open(path) .await .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?; let mut buffer = vec![0u8; size as usize]; let mut accumulated_chunk = Vec::new(); let mut processed_in_accumulator = 0; loop { let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer) .await .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?; if bytes_read == 0 { if !accumulated_chunk.is_empty() { callback(accumulated_chunk) .await .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?; } break Ok(()); } accumulated_chunk.extend_from_slice(&buffer[..bytes_read]); loop { let unprocessed_data = &accumulated_chunk[processed_in_accumulator..]; let chunk_result = chunk_func( unprocessed_data, unprocessed_data.len() as u32, stream_data, params, ) .await; match chunk_result { Some(offset) => { let chunk_end = processed_in_accumulator + offset as usize; let chunk_data = accumulated_chunk[..chunk_end].to_vec(); callback(chunk_data) .await .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?; accumulated_chunk.drain(..chunk_end); processed_in_accumulator = 0; } None => { processed_in_accumulator = accumulated_chunk.len(); break; } } } } }