summaryrefslogtreecommitdiff
path: root/src/chunker/rw/storage/write/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/chunker/rw/storage/write/stream.rs')
-rw-r--r--src/chunker/rw/storage/write/stream.rs151
1 files changed, 147 insertions, 4 deletions
diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs
index 020cfcd..092cee7 100644
--- a/src/chunker/rw/storage/write/stream.rs
+++ b/src/chunker/rw/storage/write/stream.rs
@@ -1,12 +1,155 @@
-use std::{collections::HashMap, path::PathBuf};
+use std::{collections::HashMap, path::Path, sync::Arc};
-use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind};
+use crate::{
+ chunker::{
+ context::ButckContext,
+ rw::{
+ error::ButckRWErrorKind,
+ storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path},
+ },
+ },
+ storage::{get_index_file_name, simple::display_boundaries},
+};
+use butck_policies::chunk_stream_with;
+use just_progress::progress;
+use log::{error, info, trace};
+use tokio::sync::Mutex;
pub async fn write_file_stream(
- path: &PathBuf,
+ path: &Path,
stream_read_size: u32,
ctx: &ButckContext,
params: &HashMap<&str, &str>,
) -> Result<(), ButckRWErrorKind> {
- todo!()
+ trace!(
+ "write_file_stream: Starting stream write for {}",
+ path.display()
+ );
+
+ // Check if policy is specified
+ let policy_name = ctx.policy_name.as_ref().ok_or_else(|| {
+ error!("No chunking policy specified for stream write");
+ ButckRWErrorKind::ChunkingPolicyNotSpecified
+ })?;
+
+ // Create progress bar
+ let progress_name = format!(
+ "Write `{}`",
+ path.file_name().unwrap_or_default().to_string_lossy()
+ );
+ progress::update_progress(&progress_name, 0.0);
+
+ // Collect chunk information
+ let chunk_infos = Arc::new(Mutex::new(Vec::new()));
+ let chunk_counter = Arc::new(Mutex::new(0usize));
+ let output_dir = ctx.output_dir.clone();
+ let chunk_hash = ctx.chunk_hash;
+
+ // If only displaying boundaries, use chunk_stream_display_boundaries
+ if ctx.display_boundaries {
+ let boundaries = butck_policies::chunk_stream_display_boundaries(
+ policy_name,
+ stream_read_size,
+ path,
+ params,
+ )
+ .await
+ .map_err(|e| {
+ error!("Stream chunking failed: {}", e);
+ ButckRWErrorKind::ChunkingFailed(e.to_string())
+ })?;
+
+ // Calculate total file size by reading the file
+ let total_bytes = tokio::fs::metadata(path)
+ .await
+ .map_err(|e| {
+ error!("Failed to get file metadata: {}", e);
+ ButckRWErrorKind::IOError(e)
+ })?
+ .len() as usize;
+
+ // Display boundaries information
+ display_boundaries(&boundaries, total_bytes).await;
+
+ return Ok(());
+ }
+
+ // Call chunk_stream_with with callback to write chunks
+ chunk_stream_with(
+ policy_name,
+ stream_read_size,
+ path,
+ |chunk_data: Vec<u8>| {
+ let output_dir = output_dir.clone();
+ let chunk_hash = chunk_hash;
+ let progress_name = progress_name.clone();
+ let chunk_infos = Arc::clone(&chunk_infos);
+ let chunk_counter = Arc::clone(&chunk_counter);
+
+ Box::pin(async move {
+ // Increment chunk counter
+ let mut counter = chunk_counter.lock().await;
+ let chunk_index = *counter;
+ *counter += 1;
+
+ // Compute hash
+ let hash_bytes = chunk_hash.hash(&chunk_data);
+ let hash_hex = hex::encode(hash_bytes);
+
+ // Build file path
+ let file_path = get_chunk_path(&output_dir, &hash_hex);
+
+ // Create directory if needed
+ if let Some(parent_dir) = file_path.parent() {
+ tokio::fs::create_dir_all(parent_dir).await?;
+ }
+
+ // Write chunk if file doesn't exist
+ if !file_path.exists() {
+ tokio::fs::write(&file_path, &chunk_data).await?;
+ }
+
+ // Store chunk info
+ let mut infos = chunk_infos.lock().await;
+ infos.push(ChunkInfo {
+ index: chunk_index,
+ hash: hash_hex,
+ });
+
+ // Update progress
+ progress::increase(&progress_name, 0.01); // Small increment per chunk
+
+ Ok(())
+ })
+ },
+ params,
+ )
+ .await
+ .map_err(|e| {
+ error!("Stream chunking failed: {}", e);
+ ButckRWErrorKind::ChunkingFailed(e.to_string())
+ })?;
+
+ // Complete progress
+ progress::complete(&progress_name);
+
+ // Get chunk infos
+ let chunk_infos = chunk_infos.lock().await;
+
+ // Write index file
+ let index_file_name = get_index_file_name(path, ctx);
+
+ // Use the unified bidx file writer
+ write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| {
+ error!("Failed to write index file: {}", e);
+ ButckRWErrorKind::IndexFileWriteFailed(e.to_string())
+ })?;
+
+ info!(
+ "Stream write completed for {}: {} chunks written",
+ path.display(),
+ chunk_infos.len()
+ );
+
+ Ok(())
}