summaryrefslogtreecommitdiff
path: root/src/chunker/rw/storage/write/stream.rs
blob: 092cee7ba713af862d0f27dd6a14ba5db86f2ef1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use std::{
    collections::HashMap,
    path::Path,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use butck_policies::chunk_stream_with;
use just_progress::progress;
use log::{error, info, trace};
use tokio::sync::Mutex;

use crate::{
    chunker::{
        context::ButckContext,
        rw::{
            error::ButckRWErrorKind,
            storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path},
        },
    },
    storage::{get_index_file_name, simple::display_boundaries},
};

pub async fn write_file_stream(
    path: &Path,
    stream_read_size: u32,
    ctx: &ButckContext,
    params: &HashMap<&str, &str>,
) -> Result<(), ButckRWErrorKind> {
    trace!(
        "write_file_stream: Starting stream write for {}",
        path.display()
    );

    // Check if policy is specified
    let policy_name = ctx.policy_name.as_ref().ok_or_else(|| {
        error!("No chunking policy specified for stream write");
        ButckRWErrorKind::ChunkingPolicyNotSpecified
    })?;

    // Create progress bar
    let progress_name = format!(
        "Write `{}`",
        path.file_name().unwrap_or_default().to_string_lossy()
    );
    progress::update_progress(&progress_name, 0.0);

    // Collect chunk information
    let chunk_infos = Arc::new(Mutex::new(Vec::new()));
    let chunk_counter = Arc::new(Mutex::new(0usize));
    let output_dir = ctx.output_dir.clone();
    let chunk_hash = ctx.chunk_hash;

    // If only displaying boundaries, use chunk_stream_display_boundaries
    if ctx.display_boundaries {
        let boundaries = butck_policies::chunk_stream_display_boundaries(
            policy_name,
            stream_read_size,
            path,
            params,
        )
        .await
        .map_err(|e| {
            error!("Stream chunking failed: {}", e);
            ButckRWErrorKind::ChunkingFailed(e.to_string())
        })?;

        // Calculate total file size by reading the file
        let total_bytes = tokio::fs::metadata(path)
            .await
            .map_err(|e| {
                error!("Failed to get file metadata: {}", e);
                ButckRWErrorKind::IOError(e)
            })?
            .len() as usize;

        // Display boundaries information
        display_boundaries(&boundaries, total_bytes).await;

        return Ok(());
    }

    // Call chunk_stream_with with callback to write chunks
    chunk_stream_with(
        policy_name,
        stream_read_size,
        path,
        |chunk_data: Vec<u8>| {
            let output_dir = output_dir.clone();
            let chunk_hash = chunk_hash;
            let progress_name = progress_name.clone();
            let chunk_infos = Arc::clone(&chunk_infos);
            let chunk_counter = Arc::clone(&chunk_counter);

            Box::pin(async move {
                // Increment chunk counter
                let mut counter = chunk_counter.lock().await;
                let chunk_index = *counter;
                *counter += 1;

                // Compute hash
                let hash_bytes = chunk_hash.hash(&chunk_data);
                let hash_hex = hex::encode(hash_bytes);

                // Build file path
                let file_path = get_chunk_path(&output_dir, &hash_hex);

                // Create directory if needed
                if let Some(parent_dir) = file_path.parent() {
                    tokio::fs::create_dir_all(parent_dir).await?;
                }

                // Write chunk if file doesn't exist
                if !file_path.exists() {
                    tokio::fs::write(&file_path, &chunk_data).await?;
                }

                // Store chunk info
                let mut infos = chunk_infos.lock().await;
                infos.push(ChunkInfo {
                    index: chunk_index,
                    hash: hash_hex,
                });

                // Update progress
                progress::increase(&progress_name, 0.01); // Small increment per chunk

                Ok(())
            })
        },
        params,
    )
    .await
    .map_err(|e| {
        error!("Stream chunking failed: {}", e);
        ButckRWErrorKind::ChunkingFailed(e.to_string())
    })?;

    // Complete progress
    progress::complete(&progress_name);

    // Get chunk infos
    let chunk_infos = chunk_infos.lock().await;

    // Write index file
    let index_file_name = get_index_file_name(path, ctx);

    // Use the unified bidx file writer
    write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| {
        error!("Failed to write index file: {}", e);
        ButckRWErrorKind::IndexFileWriteFailed(e.to_string())
    })?;

    info!(
        "Stream write completed for {}: {} chunks written",
        path.display(),
        chunk_infos.len()
    );

    Ok(())
}