use futures::future::join_all; use just_progress::progress; use log::{error, info, trace}; use std::path::{Path, PathBuf}; use tokio::{fs::File, io::AsyncWriteExt}; use crate::{ chunker::{ constants::BUTCK_INDEX_FILE_SUFFIX, context::ButckContext, rw::{ error::{ButckRWError, ButckRWErrorKind}, storage, }, }, utils::size_display::size_display, }; pub async fn build(ctx: ButckContext) -> Result<(), ButckRWError> { if ctx.storage_path.is_none() { return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx)); } if ctx.file_paths.is_empty() { return Err( ButckRWErrorKind::RebuildFailed("No bidx files specified".to_string()).pack(ctx), ); } let tasks: Vec<_> = ctx .file_paths .iter() .map(|bidx_path| async { trace!( "Preparing to rebuild from bidx file `{}`", bidx_path.display() ); rebuild_from_bidx(bidx_path, &ctx).await }) .collect(); let results = join_all(tasks).await; for result in results { if let Err(e) = result { return Err(e.pack(ctx)); } } Ok(()) } async fn rebuild_from_bidx( bidx_path: &PathBuf, ctx: &ButckContext, ) -> Result<(), ButckRWErrorKind> { // Validate file suffix if let Some(suffix) = bidx_path.extension() && suffix != BUTCK_INDEX_FILE_SUFFIX { return Err(ButckRWErrorKind::InvalidBidxFormat); } info!("Rebuilding from bidx file: {}", bidx_path.display()); // Use the unified bidx file reader let (original_filename, chunk_infos) = crate::chunker::rw::storage::bidx::read_bidx_file(bidx_path).map_err(|e| { error!("Failed to read bidx file: {}", e); ButckRWErrorKind::InvalidBidxFormat })?; trace!("Original filename from bidx: {}", original_filename); let chunk_count = chunk_infos.len(); info!("Found {} chunks in bidx file", chunk_count); // Extract hash bytes from chunk infos let mut chunk_hashes = Vec::with_capacity(chunk_count); for chunk_info in &chunk_infos { match hex::decode(&chunk_info.hash) { Ok(hash_bytes) => { if hash_bytes.len() == 32 { let hash_array: [u8; 32] = hash_bytes .try_into() .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; chunk_hashes.push(hash_array); } else { error!("Invalid hash length: {} bytes", hash_bytes.len()); return Err(ButckRWErrorKind::InvalidBidxFormat); } } Err(e) => { error!("Failed to decode hash hex: {}", e); return Err(ButckRWErrorKind::InvalidBidxFormat); } } } trace!("Parsed {} chunk hashes", chunk_hashes.len()); // Determine output file path let output_path = if let Some(output_file) = &ctx.output_file { output_file.clone() } else { // Use the original filename read from the bidx file storage::generate_unique_path(&ctx.output_dir, &original_filename) }; info!("Rebuilding file to: {}", output_path.display()); let progress_name = format!("Rebuild `{}`", output_path.display()); progress::update_progress(progress_name.as_str(), 0.0); let step = 1.0 / chunk_count as f64; let storage_dir = ctx.storage_path.as_ref().unwrap(); let mut tasks = Vec::with_capacity(chunk_count); for (index, hash_bytes) in chunk_hashes.iter().enumerate() { let hash_hex = hex::encode(hash_bytes); tasks.push(read_chunk( progress_name.as_str(), step, hash_hex, &storage_dir, index, )); } trace!("Starting parallel read of {} chunks", tasks.len()); let results = join_all(tasks).await; trace!("All read tasks completed"); // Collect chunk data and verify order let mut chunk_data_list = Vec::with_capacity(chunk_count); let mut success_count = 0; for (index, result) in results.into_iter().enumerate() { match result { Ok(chunk_data) => { let chunk_size = chunk_data.len(); success_count += 1; chunk_data_list.push((index, chunk_data)); trace!( "Chunk {} read successfully, size: {} bytes", index, chunk_size ); } Err(e) => { error!("Failed to read chunk {}: {:?}", index, e); return Err(e); } } } if success_count != chunk_count { return Err(ButckRWErrorKind::ChunkNotFound(format!( "Only {}/{} chunks found in storage", success_count, chunk_count ))); } info!("All {} chunks read successfully", success_count); // Sort by index and concatenate files chunk_data_list.sort_by_key(|(index, _)| *index); // Calculate total size let total_size: usize = chunk_data_list.iter().map(|(_, data)| data.len()).sum(); let (total_value, total_unit) = size_display(total_size); info!( "Rebuilding file: {} chunks, total size: {:.2} {} ({} bytes)", chunk_count, total_value, total_unit, total_size ); // Write to output file trace!("Writing to output file: {}", output_path.display()); let mut output_file = File::create(&output_path).await?; for (index, chunk_data) in chunk_data_list { trace!("Writing chunk {} ({} bytes)", index, chunk_data.len()); output_file.write_all(&chunk_data).await?; progress::increase(progress_name.as_str(), step as f32); } output_file.flush().await?; info!("File successfully rebuilt: {}", output_path.display()); progress::complete(progress_name.as_str()); Ok(()) } /// Read a single chunk from storage async fn read_chunk( progress_name: &str, step: f64, hash_hex: String, storage_dir: &Path, chunk_index: usize, ) -> Result, ButckRWErrorKind> { trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex); // Build chunk file path let file_path = storage::get_chunk_path(storage_dir, &hash_hex); trace!( "read_chunk[{}]: Looking for file at: {}", chunk_index, file_path.display() ); // Read chunk file match tokio::fs::read(&file_path).await { Ok(data) => { trace!( "read_chunk[{}]: Read {} bytes successfully", chunk_index, data.len() ); progress::increase(progress_name, step as f32); Ok(data) } Err(e) if e.kind() == std::io::ErrorKind::NotFound => { trace!("read_chunk[{}]: File not found", chunk_index); Err(ButckRWErrorKind::ChunkNotFound(format!( "Chunk {} (hash: {}) not found in storage", chunk_index, hash_hex ))) } Err(e) => { trace!("read_chunk[{}]: Read failed: {:?}", chunk_index, e); Err(ButckRWErrorKind::IOError(e)) } } }