summaryrefslogtreecommitdiff
path: root/policy/_policies/src/stream_read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'policy/_policies/src/stream_read.rs')
-rw-r--r--policy/_policies/src/stream_read.rs86
1 files changed, 80 insertions, 6 deletions
diff --git a/policy/_policies/src/stream_read.rs b/policy/_policies/src/stream_read.rs
index 5cf7791..0642ebf 100644
--- a/policy/_policies/src/stream_read.rs
+++ b/policy/_policies/src/stream_read.rs
@@ -1,7 +1,7 @@
use crate::error::ChunkFailed;
use std::{collections::HashMap, path::Path};
-pub async fn chunk_stream_process<T, F>(
+pub async fn chunk_stream_display_boundaries<T, F>(
path: &Path,
stream_data: &mut T,
size: u32,
@@ -16,8 +16,8 @@ where
.await
.map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;
let mut buffer = vec![0u8; size as usize];
- let mut splits = Vec::new();
- let mut total_read = 0;
+ let mut boundaries = Vec::new();
+ let mut total_offset = 0;
loop {
let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
@@ -25,7 +25,7 @@ where
.map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;
if bytes_read == 0 {
- break Ok(splits);
+ break Ok(boundaries);
}
// Process chunking on the buffer slice
@@ -38,9 +38,83 @@ where
.await;
if let Some(offset) = chunk_result {
- splits.push(total_read + offset);
+ boundaries.push(total_offset + offset);
}
- total_read += bytes_read as u32;
+ total_offset += bytes_read as u32;
+ }
+}
+
+pub async fn chunk_stream_process<T, F, C>(
+ path: &Path,
+ stream_data: &mut T,
+ size: u32,
+ mut callback: C,
+ params: &HashMap<&str, &str>,
+ chunk_func: F,
+) -> Result<(), ChunkFailed>
+where
+ T: Default,
+ F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>,
+ C: FnMut(
+ Vec<u8>,
+ ) -> std::pin::Pin<
+ Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>,
+ >,
+{
+ let mut file = tokio::fs::File::open(path)
+ .await
+ .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;
+ let mut buffer = vec![0u8; size as usize];
+
+ let mut accumulated_chunk = Vec::new();
+ let mut processed_in_accumulator = 0;
+
+ loop {
+ let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
+ .await
+ .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;
+
+ if bytes_read == 0 {
+ if !accumulated_chunk.is_empty() {
+ callback(accumulated_chunk)
+ .await
+ .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?;
+ }
+ break Ok(());
+ }
+
+ accumulated_chunk.extend_from_slice(&buffer[..bytes_read]);
+
+ loop {
+ let unprocessed_data = &accumulated_chunk[processed_in_accumulator..];
+
+ let chunk_result = chunk_func(
+ unprocessed_data,
+ unprocessed_data.len() as u32,
+ stream_data,
+ params,
+ )
+ .await;
+
+ match chunk_result {
+ Some(offset) => {
+ let chunk_end = processed_in_accumulator + offset as usize;
+
+ let chunk_data = accumulated_chunk[..chunk_end].to_vec();
+
+ callback(chunk_data)
+ .await
+ .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?;
+
+ accumulated_chunk.drain(..chunk_end);
+ processed_in_accumulator = 0;
+ }
+ None => {
+ processed_in_accumulator = accumulated_chunk.len();
+ break;
+ }
+ }
+ }
}
}