use futures::future::join_all; use just_progress::progress; use log::{error, info, trace}; use std::{collections::HashMap, path::PathBuf}; use tokio::{fs::File, io::AsyncReadExt}; use crate::{ chunker::{ context::ButckContext, rw::{error::ButckRWErrorKind, storage}, }, core::hash::ChunkWriteHash, storage::get_index_file_name, utils::size_display::size_display, }; pub async fn write_file_simple( path: &PathBuf, ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result<(), ButckRWErrorKind> { read_file(path, ctx, params).await?; Ok(()) } async fn read_file( path: &PathBuf, ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result<(), ButckRWErrorKind> { let mut file = File::open(path).await?; // Use butck_policies::chunk_with to locate chunk boundaries in the file if ctx.memmap_read { let mmap = unsafe { memmap2::Mmap::map(&file)? }; let raw_data = &mmap[..]; let (chunk_boundaries, total_bytes) = (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); // If output boundaries, do not execute actual write logic if ctx.display_boundaries { display_boundaries(&chunk_boundaries, total_bytes).await; return Ok(()); } else { write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; } } else { let mut contents = Vec::new(); file.read_to_end(&mut contents).await?; let raw_data = &contents[..]; let (chunk_boundaries, total_bytes) = (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); // If output boundaries, do not execute actual write logic if ctx.display_boundaries { display_boundaries(&chunk_boundaries, total_bytes).await; return Ok(()); } else { write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; } }; progress::clear_all(); Ok(()) } async fn write_file_to_storage( path: &PathBuf, ctx: &ButckContext, chunk_boundaries: Vec, raw_data: &[u8], ) -> Result<(), ButckRWErrorKind> { let output_index_file = get_index_file_name(path, ctx); let chunk_count = chunk_boundaries.len() + 1; let progress_name = format!("Write `{}`", path.display()); progress::update_progress(progress_name.as_str(), 0.0); let step = 1.0 / chunk_count as f64; trace!("chunks_count={}", chunk_count); trace!("chunk_hash={:?}", ctx.chunk_hash); trace!("file_size={}", raw_data.len()); trace!("output_index_file={}", output_index_file.display()); trace!("policy_name={:?}", ctx.policy_name); trace!("storage_dir={}", ctx.output_dir.display()); info!( "{} chunks will be written to {}", chunk_count, ctx.output_dir.display() ); tokio::fs::create_dir_all(&ctx.output_dir).await?; trace!("Output directory created or already exists"); let mut tasks = Vec::new(); let mut start = 0; let mut chunk_index = 0; trace!("Processing chunk boundaries:"); for &boundary in &chunk_boundaries { let end = boundary as usize; if start < end && end <= raw_data.len() { let chunk_data = &raw_data[start..end]; trace!( "Chunk {}: bytes {}..{} (size: {} bytes)", chunk_index, start, end - 1, end - start ); tasks.push(write_chunk( progress_name.as_str(), step, chunk_data, &ctx.output_dir, &ctx.chunk_hash, chunk_index, start, end, )); chunk_index += 1; } else { trace!( "Skipping invalid chunk boundary: start={}, end={}, data_len={}", start, end, raw_data.len() ); } start = end; } if start < raw_data.len() { let chunk_data = &raw_data[start..]; trace!( "Chunk {}: bytes {}..{} (size: {} bytes) - final chunk", chunk_index, start, raw_data.len() - 1, raw_data.len() - start ); tasks.push(write_chunk( progress_name.as_str(), step, chunk_data, &ctx.output_dir, &ctx.chunk_hash, chunk_index, start, raw_data.len(), )); } trace!("Total chunks prepared for writing: {}", tasks.len()); trace!("Starting parallel write of {} chunks", tasks.len()); let results = join_all(tasks).await; trace!("All write tasks completed"); let mut success_count = 0; let mut chunk_infos = Vec::new(); for result in results { match result { Ok(chunk_info) => { success_count += 1; chunk_infos.push(chunk_info); } Err(e) => { trace!("Chunk write failed: {:?}", e); return Err(e); } } } info!("All {} chunks written successfully", success_count); // Write index file trace!("Writing index file to: {}", output_index_file.display()); if let Err(e) = write_index_file(&output_index_file, &chunk_infos, path).await { error!("Failed to write index file: {}", e); return Err(ButckRWErrorKind::IOError(e)); } info!("Index file written to: {}", output_index_file.display()); trace!("write_file_to_storage completed successfully"); progress::complete(progress_name.as_str()); Ok(()) } async fn write_chunk( progress_name: &str, step: f64, chunk_data: &[u8], output_dir: &PathBuf, chunk_hash: &ChunkWriteHash, chunk_index: usize, start: usize, end: usize, ) -> Result { trace!( "write_chunk[{}]: Starting, data size: {} bytes", chunk_index, chunk_data.len() ); trace!( "write_chunk[{}]: Computing hash with algorithm: {:?}", chunk_index, chunk_hash ); let hash_bytes = chunk_hash.hash(chunk_data); trace!( "write_chunk[{}]: Hash computed: {:?}", chunk_index, hash_bytes ); let hash_hex = hex::encode(hash_bytes); trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex); let file_path = storage::get_chunk_path(output_dir, &hash_hex); if let Some(parent_dir) = file_path.parent() { trace!( "write_chunk[{}]: Creating directory structure: {}", chunk_index, parent_dir.display() ); tokio::fs::create_dir_all(parent_dir).await?; trace!("write_chunk[{}]: Directory created", chunk_index); } trace!( "write_chunk[{}]: File path: {}", chunk_index, file_path.display() ); trace!( "write_chunk[{}]: Writing {} bytes to file", chunk_index, chunk_data.len() ); if !file_path.exists() { tokio::fs::write(&file_path, chunk_data).await?; } else { trace!( "write_chunk[{}]: File already exists, skipping", chunk_index ); } trace!("write_chunk[{}]: File written successfully", chunk_index); progress::increase(progress_name, step as f32); Ok(crate::chunker::rw::storage::ChunkInfo { index: chunk_index, hash: hash_hex, size: chunk_data.len(), start, end, }) } async fn get_boundaries<'a>( raw_data: &[u8], ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result, ButckRWErrorKind> { let policy_name = ctx.policy_name.as_ref().unwrap().as_str(); match butck_policies::chunk_with(policy_name, raw_data, params).await { Ok(s) => Ok(s), Err(e) => Err(ButckRWErrorKind::ChunkFailed(e)), } } async fn write_index_file( index_path: &PathBuf, chunk_infos: &[crate::chunker::rw::storage::ChunkInfo], original_file_path: &PathBuf, ) -> Result<(), std::io::Error> { use std::io::Write; let file = std::fs::File::create(index_path)?; let mut writer = std::io::BufWriter::new(file); // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes use crate::chunker::constants::BUTCK_INDEX_MAGIC; // Write magic bytes writer.write_all(&BUTCK_INDEX_MAGIC)?; // Get original filename as bytes let filename = original_file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown"); let filename_bytes = filename.as_bytes(); // Write filename length as u16 (little-endian) if filename_bytes.len() > u16::MAX as usize { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, format!("Filename too long: {} bytes", filename_bytes.len()), )); } let filename_len = filename_bytes.len() as u16; writer.write_all(&filename_len.to_le_bytes())?; // Write filename bytes writer.write_all(filename_bytes)?; // Write chunk hashes: [u8; 32][u8; 32][u8; 32]... for chunk_info in chunk_infos { // Convert hex hash to bytes match hex::decode(&chunk_info.hash) { Ok(hash_bytes) => { if hash_bytes.len() == 32 { writer.write_all(&hash_bytes)?; } else { // Pad or truncate to 32 bytes if needed let mut fixed_hash = [0u8; 32]; let len = hash_bytes.len().min(32); fixed_hash[..len].copy_from_slice(&hash_bytes[..len]); writer.write_all(&fixed_hash)?; } } Err(e) => { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, format!("Failed to decode hash hex: {}", e), )); } } } Ok(()) } async fn display_boundaries(chunk_boundaries: &Vec, total_bytes: usize) { let total_chunks = chunk_boundaries.len() + 1; let (total_value, total_unit) = size_display(total_bytes); info!( "{} chunks, ({:.2} {}, {})", total_chunks, total_value, total_unit, total_bytes ); let mut start = 0; chunk_boundaries.iter().for_each(|p| { let next = *p as usize; let (size_value, size_unit) = size_display(next - start); info!( "{} - {} (size: {:.2} {})", start, next - 1, size_value, size_unit ); start = next; }); let last = start; let r#final = total_bytes; let (size_value, size_unit) = size_display(total_bytes - start); info!( "{} - {} (size: {:.2} {})", last, r#final, size_value, size_unit ); }