use futures::future::join_all; use just_progress::progress; use log::{debug, error, trace}; use std::{ collections::HashMap, path::{Path, PathBuf}, }; use tokio::{fs::File, io::AsyncReadExt}; use crate::{ chunker::{ context::ButckContext, rw::{ error::ButckRWErrorKind, storage::{self, ChunkInfo, bidx::write_bidx_file, hash::ChunkWriteHash}, }, }, storage::{display_boundaries, get_index_file_name}, }; pub async fn write_file_simple( path: &PathBuf, ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result<(), ButckRWErrorKind> { read_file(path, ctx, params).await?; Ok(()) } async fn read_file( path: &PathBuf, ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result<(), ButckRWErrorKind> { let mut file = File::open(path).await?; // Use butck_policies::chunk_with to locate chunk boundaries in the file if ctx.memmap_read { let mmap = unsafe { memmap2::Mmap::map(&file)? }; let raw_data = &mmap[..]; let (chunk_boundaries, total_bytes) = (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); // If output boundaries, do not execute actual write logic if ctx.display_boundaries { display_boundaries(&chunk_boundaries, total_bytes).await; return Ok(()); } else { write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; } } else { let mut contents = Vec::new(); file.read_to_end(&mut contents).await?; let raw_data = &contents[..]; let (chunk_boundaries, total_bytes) = (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); // If output boundaries, do not execute actual write logic if ctx.display_boundaries { display_boundaries(&chunk_boundaries, total_bytes).await; return Ok(()); } else { write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; } }; progress::clear_all(); Ok(()) } async fn write_file_to_storage( path: &Path, ctx: &ButckContext, chunk_boundaries: Vec, raw_data: &[u8], ) -> Result<(), ButckRWErrorKind> { let output_index_file = get_index_file_name(path, ctx); let chunk_count = chunk_boundaries.len() + 1; let progress_name = format!("Write `{}`", path.display()); progress::update_progress(progress_name.as_str(), 0.0); let step = 1.0 / chunk_count as f64; trace!("chunks_count={}", chunk_count); trace!("chunk_hash={:?}", ctx.chunk_hash); trace!("file_size={}", raw_data.len()); trace!("output_index_file={}", output_index_file.display()); trace!("policy_name={:?}", ctx.policy_name); trace!( "storage_dir={}", ctx.storage_path.as_ref().unwrap().display() ); debug!( "{} chunks will be written to {}", chunk_count, ctx.storage_path.as_ref().unwrap().display() ); let storage_dir = ctx.storage_path.as_ref().unwrap().clone(); tokio::fs::create_dir_all(&storage_dir).await?; trace!("Storage directory created or already exists"); let mut tasks = Vec::new(); let mut start = 0; let mut chunk_index = 0; trace!("Processing chunk boundaries:"); for &boundary in &chunk_boundaries { let end = boundary as usize; if start < end && end <= raw_data.len() { let chunk_data = &raw_data[start..end]; trace!( "Chunk {}: bytes {}..{} (size: {} bytes)", chunk_index, start, end - 1, end - start ); let params = WriteChunkParams { progress_name: progress_name.clone(), step, chunk_data: chunk_data.to_vec(), output_dir: ctx.storage_path.as_ref().unwrap().clone(), chunk_hash: ctx.chunk_hash, chunk_index, }; tasks.push(write_chunk(params)); chunk_index += 1; } else { trace!( "Skipping invalid chunk boundary: start={}, end={}, data_len={}", start, end, raw_data.len() ); } start = end; } if start < raw_data.len() { let chunk_data = &raw_data[start..]; trace!( "Chunk {}: bytes {}..{} (size: {} bytes) - final chunk", chunk_index, start, raw_data.len() - 1, raw_data.len() - start ); let params = WriteChunkParams { progress_name: progress_name.clone(), step, chunk_data: chunk_data.to_vec(), output_dir: ctx.storage_path.as_ref().unwrap().clone(), chunk_hash: ctx.chunk_hash, chunk_index, }; tasks.push(write_chunk(params)); } trace!("Total chunks prepared for writing: {}", tasks.len()); trace!("Starting parallel write of {} chunks", tasks.len()); let results = join_all(tasks).await; trace!("All write tasks completed"); let mut success_count = 0; let mut chunk_infos = Vec::new(); for result in results { match result { Ok(chunk_info) => { success_count += 1; chunk_infos.push(chunk_info); } Err(e) => { trace!("Chunk write failed: {:?}", e); return Err(e); } } } debug!("All {} chunks written successfully", success_count); // Write index file trace!("Writing index file to: {}", output_index_file.display()); if let Err(e) = write_index_file(&output_index_file, &chunk_infos, path).await { error!("Failed to write index file: {}", e); return Err(ButckRWErrorKind::IOError(e)); } debug!("Index file written to: {}", output_index_file.display()); trace!("write_file_to_storage completed successfully"); progress::complete(progress_name.as_str()); Ok(()) } struct WriteChunkParams { progress_name: String, step: f64, chunk_data: Vec, output_dir: PathBuf, chunk_hash: ChunkWriteHash, chunk_index: usize, } async fn write_chunk(params: WriteChunkParams) -> Result { trace!( "write_chunk[{}]: Starting, data size: {} bytes", params.chunk_index, params.chunk_data.len() ); trace!( "write_chunk[{}]: Computing hash with algorithm: {:?}", params.chunk_index, params.chunk_hash ); let hash_bytes = params.chunk_hash.hash(¶ms.chunk_data); trace!( "write_chunk[{}]: Hash computed: {:?}", params.chunk_index, hash_bytes ); let hash_hex = hex::encode(hash_bytes); trace!( "write_chunk[{}]: Hash hex: {}", params.chunk_index, hash_hex ); let file_path = storage::get_chunk_path(¶ms.output_dir, &hash_hex); if let Some(parent_dir) = file_path.parent() { trace!( "write_chunk[{}]: Creating directory structure: {}", params.chunk_index, parent_dir.display() ); tokio::fs::create_dir_all(parent_dir).await?; trace!("write_chunk[{}]: Directory created", params.chunk_index); } trace!( "write_chunk[{}]: File path: {}", params.chunk_index, file_path.display() ); trace!( "write_chunk[{}]: Writing {} bytes to file", params.chunk_index, params.chunk_data.len() ); progress::increase(¶ms.progress_name, params.step as f32); if !file_path.exists() { tokio::fs::write(&file_path, ¶ms.chunk_data).await?; } else { trace!( "write_chunk[{}]: File already exists, skipping", params.chunk_index ); } trace!( "write_chunk[{}]: File written successfully", params.chunk_index ); Ok(ChunkInfo { index: params.chunk_index, hash: hash_hex, }) } async fn get_boundaries( raw_data: &[u8], ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result, ButckRWErrorKind> { let policy_name = ctx.policy_name.as_ref().unwrap().as_str(); match butck_policies::chunk_with(policy_name, raw_data, params).await { Ok(s) => Ok(s), Err(e) => Err(ButckRWErrorKind::ChunkFailed(e)), } } async fn write_index_file( index_path: &Path, chunk_infos: &[ChunkInfo], original_file_path: &Path, ) -> Result<(), std::io::Error> { write_bidx_file(index_path, chunk_infos, original_file_path) }