1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
use std::{
collections::HashMap,
path::Path,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
};
use crate::{
chunker::{
context::ButckContext,
rw::{
error::ButckRWErrorKind,
storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path},
},
},
storage::{display_boundaries, get_index_file_name},
};
use butck_policies::chunk_stream_with;
use just_progress::progress;
use log::{debug, error, trace};
use tokio::sync::Mutex;
/// Streams the file at `path` through the chunking policy named in `ctx`,
/// writing each chunk to content-addressed storage and finally emitting a
/// `.bidx` index file listing the chunks in order.
///
/// When `ctx.display_boundaries` is set, only the chunk boundaries are
/// computed and displayed; nothing is written to storage.
///
/// # Errors
///
/// * [`ButckRWErrorKind::IOError`] — the file metadata could not be read.
/// * [`ButckRWErrorKind::ChunkingPolicyNotSpecified`] — `ctx.policy_name` is `None`.
/// * [`ButckRWErrorKind::ChunkingFailed`] — the chunking policy reported an error.
/// * [`ButckRWErrorKind::IndexFileWriteFailed`] — the index file could not be written.
///
/// # Panics
///
/// Panics if `ctx.storage_path` is `None`; callers must configure a storage
/// path before invoking a stream write.
pub async fn write_file_stream(
    path: &Path,
    stream_read_size: u32,
    ctx: &ButckContext,
    params: &HashMap<&str, &str>,
) -> Result<(), ButckRWErrorKind> {
    // A u32 cumulative counter would wrap for files >= 4 GiB and corrupt the
    // progress report, so the running byte count is kept in an AtomicU64.
    use std::sync::atomic::AtomicU64;

    trace!(
        "write_file_stream: Starting stream write for {}",
        path.display()
    );
    // Fetch the file length once; it is reused both as the progress
    // denominator and for the boundary display below.
    let file_len = tokio::fs::metadata(path)
        .await
        .map_err(|e| {
            error!("Failed to get file metadata: {}", e);
            ButckRWErrorKind::IOError(e)
        })?
        .len();
    // `.max(1)` guards against 0/0 = NaN progress values for empty files.
    let file_size = file_len.max(1) as f32;
    let bytes_read: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
    // A chunking policy is mandatory for stream writes.
    let policy_name = ctx.policy_name.as_ref().ok_or_else(|| {
        error!("No chunking policy specified for stream write");
        ButckRWErrorKind::ChunkingPolicyNotSpecified
    })?;
    // Progress bar keyed by the file name.
    let progress_name = format!(
        "Write `{}`",
        path.file_name().unwrap_or_default().to_string_lossy()
    );
    progress::update_progress(&progress_name, 0.0);
    // Chunk metadata collected by the async callback; the counter assigns a
    // stable ordinal to each chunk in production order.
    let chunk_infos = Arc::new(Mutex::new(Vec::new()));
    let chunk_counter = Arc::new(Mutex::new(0usize));
    // Invariant: a storage path must be configured before stream writes.
    let storage_dir = ctx
        .storage_path
        .as_ref()
        .expect("storage_path must be set before calling write_file_stream")
        .clone();
    let chunk_hash = ctx.chunk_hash;
    // Dry-run mode: compute and display chunk boundaries, write nothing.
    if ctx.display_boundaries {
        let boundaries = butck_policies::chunk_stream_display_boundaries(
            policy_name,
            stream_read_size,
            path,
            params,
        )
        .await
        .map_err(|e| {
            error!("Stream chunking failed: {}", e);
            ButckRWErrorKind::ChunkingFailed(e.to_string())
        })?;
        // Reuse the length fetched above instead of a second metadata call.
        display_boundaries(&boundaries, file_len as usize).await;
        return Ok(());
    }
    // Stream the file through the policy; the callback persists each chunk.
    chunk_stream_with(
        policy_name,
        stream_read_size,
        path,
        |chunk_data: Vec<u8>| {
            let bytes_read = bytes_read.clone();
            let storage_dir = storage_dir.clone();
            let chunk_hash = chunk_hash;
            let progress_name = progress_name.clone();
            let chunk_infos = Arc::clone(&chunk_infos);
            let chunk_counter = Arc::clone(&chunk_counter);
            Box::pin(async move {
                // Advance the progress bar by this chunk's size. fetch_add
                // returns the previous value, so add the chunk length back to
                // get the running total atomically.
                let total = bytes_read.fetch_add(chunk_data.len() as u64, Ordering::SeqCst)
                    + chunk_data.len() as u64;
                progress::update_progress(&progress_name, total as f32 / file_size);
                // Assign this chunk its ordinal position in the file; the
                // lock is scoped so it is released before any I/O below.
                let chunk_index = {
                    let mut counter = chunk_counter.lock().await;
                    let idx = *counter;
                    *counter += 1;
                    idx
                };
                // Content-address the chunk by its hash.
                let hash_bytes = chunk_hash.hash(&chunk_data);
                let hash_hex = hex::encode(hash_bytes);
                let file_path = get_chunk_path(&storage_dir, &hash_hex);
                if let Some(parent_dir) = file_path.parent() {
                    tokio::fs::create_dir_all(parent_dir).await?;
                }
                // Identical content hashes to the same path, so an existing
                // file never needs rewriting (dedup).
                // NOTE(review): `Path::exists` is a blocking stat on the
                // async runtime — cheap, but consider `tokio::fs::try_exists`
                // if the runtime's minimum tokio version allows it.
                if !file_path.exists() {
                    tokio::fs::write(&file_path, &chunk_data).await?;
                }
                // Record the chunk for the index file written at the end.
                chunk_infos.lock().await.push(ChunkInfo {
                    index: chunk_index,
                    hash: hash_hex,
                });
                Ok(())
            })
        },
        params,
    )
    .await
    .map_err(|e| {
        error!("Stream chunking failed: {}", e);
        ButckRWErrorKind::ChunkingFailed(e.to_string())
    })?;
    // Mark the progress bar as finished.
    progress::complete(&progress_name);
    let chunk_infos = chunk_infos.lock().await;
    // Persist the ordered chunk list via the unified bidx writer.
    let index_file_name = get_index_file_name(path, ctx);
    write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| {
        error!("Failed to write index file: {}", e);
        ButckRWErrorKind::IndexFileWriteFailed(e.to_string())
    })?;
    debug!(
        "Stream write completed for {}: {} chunks written",
        path.display(),
        chunk_infos.len()
    );
    Ok(())
}
|