summaryrefslogtreecommitdiff
path: root/policy/_policies/src/stream_read.rs
blob: 0642ebfce8ebb8b6efdfefb17045d4fc03f1f9fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use crate::error::ChunkFailed;
use std::{collections::HashMap, path::Path};

/// Scans the file at `path` in fixed-size reads and collects absolute
/// chunk-boundary offsets, without emitting any chunk data.
///
/// For every read of up to `size` bytes, `chunk_func` is invoked once on the
/// freshly read slice together with its length, the caller-owned
/// `stream_data` state, and `params`. If it reports a boundary (an offset
/// relative to that slice), the file-absolute offset is recorded.
///
/// NOTE(review): at most one boundary is recorded per read — unlike
/// `chunk_stream_process`, the window is not re-scanned past a reported
/// boundary. Confirm this is intentional for display purposes.
///
/// NOTE(review): the running offset is `u32`, so inputs of 4 GiB or more
/// would wrap (release) or panic (debug) — confirm inputs are bounded.
///
/// # Errors
/// Returns `ChunkFailed::FileOpenFailed` when the file cannot be opened and
/// `ChunkFailed::FileReadFailed` when a read fails.
pub async fn chunk_stream_display_boundaries<T, F>(
    path: &Path,
    stream_data: &mut T,
    size: u32,
    params: &HashMap<&str, &str>,
    chunk_func: F,
) -> Result<Vec<u32>, ChunkFailed>
where
    T: Default,
    F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>,
{
    use tokio::io::AsyncReadExt;

    let mut reader = tokio::fs::File::open(path)
        .await
        .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;

    let mut read_buf = vec![0u8; size as usize];
    let mut offsets = Vec::new();
    // Absolute file offset of the start of the current read window.
    let mut consumed: u32 = 0;

    loop {
        let n = reader
            .read(&mut read_buf)
            .await
            .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;

        // EOF: hand back everything collected so far.
        if n == 0 {
            return Ok(offsets);
        }

        let window = &read_buf[..n];
        if let Some(rel) = chunk_func(window, n as u32, stream_data, params).await {
            // Translate the slice-relative boundary to a file-absolute one.
            offsets.push(consumed + rel);
        }

        consumed += n as u32;
    }
}

/// Streams the file at `path` through `chunk_func` and delivers each
/// completed chunk to `callback` as an owned `Vec<u8>`.
///
/// Data is read in `size`-byte windows and appended to an accumulator.
/// `chunk_func` is repeatedly offered the not-yet-inspected tail of the
/// accumulator; when it reports a boundary (an offset relative to that
/// tail), everything up to the boundary is emitted via `callback` and
/// drained. When it reports no boundary, the whole accumulator is marked
/// inspected and the next read appends more data.
///
/// NOTE(review): after a `None`, only the newly read bytes are offered to
/// `chunk_func` — presumably boundary detection that spans the seam relies
/// on state carried in `stream_data`; confirm with the chunker
/// implementations.
///
/// NOTE(review): a `chunk_func` that returns `Some(0)` while
/// `processed_in_accumulator == 0` would emit empty chunks forever without
/// draining — relies on the chunker never doing that; not guarded here.
///
/// # Errors
/// `ChunkFailed::FileOpenFailed` / `FileReadFailed` on I/O failure, and
/// `ChunkFailed::CallbackFailed` when `callback` returns an error.
pub async fn chunk_stream_process<T, F, C>(
    path: &Path,
    stream_data: &mut T,
    size: u32,
    mut callback: C,
    params: &HashMap<&str, &str>,
    chunk_func: F,
) -> Result<(), ChunkFailed>
where
    T: Default,
    F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>,
    C: FnMut(
        Vec<u8>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<(), std::io::Error>> + Send>,
    >,
{
    let mut file = tokio::fs::File::open(path)
        .await
        .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;
    let mut buffer = vec![0u8; size as usize];

    // Bytes read so far that have not yet been emitted as a chunk.
    let mut accumulated_chunk = Vec::new();
    // Length of the accumulator prefix already shown to `chunk_func` with no
    // boundary found; avoids re-feeding those bytes on the next read.
    let mut processed_in_accumulator = 0;

    loop {
        let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
            .await
            .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;

        if bytes_read == 0 {
            // EOF: flush whatever is left as a final (possibly short) chunk.
            if !accumulated_chunk.is_empty() {
                callback(accumulated_chunk)
                    .await
                    .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?;
            }
            break Ok(());
        }

        accumulated_chunk.extend_from_slice(&buffer[..bytes_read]);

        // Drain as many complete chunks as `chunk_func` can find in the
        // accumulator before reading more data.
        loop {
            // Only the tail `chunk_func` has not yet inspected.
            let unprocessed_data = &accumulated_chunk[processed_in_accumulator..];

            let chunk_result = chunk_func(
                unprocessed_data,
                unprocessed_data.len() as u32,
                stream_data,
                params,
            )
            .await;

            match chunk_result {
                Some(offset) => {
                    // `offset` is relative to `unprocessed_data`; convert it
                    // to an index into the full accumulator.
                    let chunk_end = processed_in_accumulator + offset as usize;

                    let chunk_data = accumulated_chunk[..chunk_end].to_vec();

                    callback(chunk_data)
                        .await
                        .map_err(|e| ChunkFailed::CallbackFailed(e.to_string()))?;

                    // Remove the emitted chunk; the remainder has NOT been
                    // inspected past the boundary, so restart from 0.
                    accumulated_chunk.drain(..chunk_end);
                    processed_in_accumulator = 0;
                }
                None => {
                    // No boundary in the tail: mark everything inspected and
                    // go read more data.
                    processed_in_accumulator = accumulated_chunk.len();
                    break;
                }
            }
        }
    }
}