use std::{collections::HashMap, path::Path, sync::Arc}; use crate::{ chunker::{ context::ButckContext, rw::{ error::ButckRWErrorKind, storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path}, }, }, storage::{get_index_file_name, simple::display_boundaries}, }; use butck_policies::chunk_stream_with; use just_progress::progress; use log::{error, info, trace}; use tokio::sync::Mutex; pub async fn write_file_stream( path: &Path, stream_read_size: u32, ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result<(), ButckRWErrorKind> { trace!( "write_file_stream: Starting stream write for {}", path.display() ); // Check if policy is specified let policy_name = ctx.policy_name.as_ref().ok_or_else(|| { error!("No chunking policy specified for stream write"); ButckRWErrorKind::ChunkingPolicyNotSpecified })?; // Create progress bar let progress_name = format!( "Write `{}`", path.file_name().unwrap_or_default().to_string_lossy() ); progress::update_progress(&progress_name, 0.0); // Collect chunk information let chunk_infos = Arc::new(Mutex::new(Vec::new())); let chunk_counter = Arc::new(Mutex::new(0usize)); let storage_dir = ctx.storage_path.as_ref().unwrap().clone(); let chunk_hash = ctx.chunk_hash; // If only displaying boundaries, use chunk_stream_display_boundaries if ctx.display_boundaries { let boundaries = butck_policies::chunk_stream_display_boundaries( policy_name, stream_read_size, path, params, ) .await .map_err(|e| { error!("Stream chunking failed: {}", e); ButckRWErrorKind::ChunkingFailed(e.to_string()) })?; // Calculate total file size by reading the file let total_bytes = tokio::fs::metadata(path) .await .map_err(|e| { error!("Failed to get file metadata: {}", e); ButckRWErrorKind::IOError(e) })? .len() as usize; // Display boundaries information display_boundaries(&boundaries, total_bytes).await; return Ok(()); } // Call chunk_stream_with with callback to write chunks chunk_stream_with( policy_name, stream_read_size, path, |chunk_data: Vec| { let storage_dir = storage_dir.clone(); let chunk_hash = chunk_hash; let progress_name = progress_name.clone(); let chunk_infos = Arc::clone(&chunk_infos); let chunk_counter = Arc::clone(&chunk_counter); Box::pin(async move { // Increment chunk counter let mut counter = chunk_counter.lock().await; let chunk_index = *counter; *counter += 1; // Compute hash let hash_bytes = chunk_hash.hash(&chunk_data); let hash_hex = hex::encode(hash_bytes); // Build file path let file_path = get_chunk_path(&storage_dir, &hash_hex); // Create directory if needed if let Some(parent_dir) = file_path.parent() { tokio::fs::create_dir_all(parent_dir).await?; } // Write chunk if file doesn't exist if !file_path.exists() { tokio::fs::write(&file_path, &chunk_data).await?; } // Store chunk info let mut infos = chunk_infos.lock().await; infos.push(ChunkInfo { index: chunk_index, hash: hash_hex, }); // Update progress progress::increase(&progress_name, 0.01); // Small increment per chunk Ok(()) }) }, params, ) .await .map_err(|e| { error!("Stream chunking failed: {}", e); ButckRWErrorKind::ChunkingFailed(e.to_string()) })?; // Complete progress progress::complete(&progress_name); // Get chunk infos let chunk_infos = chunk_infos.lock().await; // Write index file let index_file_name = get_index_file_name(path, ctx); // Use the unified bidx file writer write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| { error!("Failed to write index file: {}", e); ButckRWErrorKind::IndexFileWriteFailed(e.to_string()) })?; info!( "Stream write completed for {}: {} chunks written", path.display(), chunk_infos.len() ); Ok(()) }