use futures::future::join_all; use just_progress::progress; use log::{error, info, trace}; use memmap2::Mmap; use std::path::PathBuf; use tokio::{fs::File, io::AsyncWriteExt}; use crate::{ chunker::{ constants::{BUTCK_INDEX_FILE_SUFFIX, BUTCK_INDEX_MAGIC}, context::ButckContext, rw::error::{ButckRWError, ButckRWErrorKind}, rw::storage, }, utils::size_display::size_display, }; pub async fn build(ctx: ButckContext) -> Result<(), ButckRWError> { if ctx.storage_path.is_none() { return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx)); } if ctx.file_paths.is_empty() { return Err( ButckRWErrorKind::RebuildFailed("No bidx files specified".to_string()).pack(ctx), ); } let tasks: Vec<_> = ctx .file_paths .iter() .map(|bidx_path| async { trace!( "Preparing to rebuild from bidx file `{}`", bidx_path.display() ); rebuild_from_bidx(bidx_path, &ctx).await }) .collect(); let results = join_all(tasks).await; for result in results { if let Err(e) = result { return Err(e.pack(ctx)); } } Ok(()) } async fn rebuild_from_bidx( bidx_path: &PathBuf, ctx: &ButckContext, ) -> Result<(), ButckRWErrorKind> { // Validate file extension if let Some(ext) = bidx_path.extension() && ext != BUTCK_INDEX_FILE_SUFFIX { return Err(ButckRWErrorKind::InvalidBidxFormat); } info!("Rebuilding from bidx file: {}", bidx_path.display()); // Read bidx file content let bidx_content = if ctx.memmap_read { let file = File::open(bidx_path).await?; let mmap = unsafe { Mmap::map(&file)? }; mmap.to_vec() } else { tokio::fs::read(bidx_path).await? }; // Verify file size includes at least the header if bidx_content.len() < 6 { return Err(ButckRWErrorKind::InvalidBidxFormat); } // Validate MAGIC bytes if bidx_content[0..4] != BUTCK_INDEX_MAGIC { return Err(ButckRWErrorKind::InvalidBidxFormat); } // Read filename let filename_len = u16::from_le_bytes([bidx_content[4], bidx_content[5]]) as usize; if bidx_content.len() < 6 + filename_len { return Err(ButckRWErrorKind::InvalidBidxFormat); } let filename_bytes = &bidx_content[6..6 + filename_len]; let original_filename = String::from_utf8(filename_bytes.to_vec()) .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; trace!("Original filename from bidx: {}", original_filename); let hash_data_start = 6 + filename_len; let hash_data = &bidx_content[hash_data_start..]; // Verify that hash data size is a multiple of 32 bytes if hash_data.len() % 32 != 0 { return Err(ButckRWErrorKind::InvalidBidxFormat); } let chunk_count = hash_data.len() / 32; info!("Found {} chunks in bidx file", chunk_count); let mut chunk_hashes = Vec::with_capacity(chunk_count); for i in 0..chunk_count { let start = i * 32; let end = start + 32; let hash_bytes: [u8; 32] = hash_data[start..end] .try_into() .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; chunk_hashes.push(hash_bytes); } trace!("Parsed {} chunk hashes", chunk_hashes.len()); // Determine output file path let output_path = if let Some(output_file) = &ctx.output_file { output_file.clone() } else { // Use the original filename read from the bidx file storage::generate_unique_path(&ctx.output_dir, &original_filename) }; info!("Rebuilding file to: {}", output_path.display()); let progress_name = format!("Rebuild `{}`", output_path.display()); progress::update_progress(progress_name.as_str(), 0.0); let step = 1.0 / chunk_count as f64; let mut tasks = Vec::with_capacity(chunk_count); for (index, hash_bytes) in chunk_hashes.iter().enumerate() { let hash_hex = hex::encode(hash_bytes); tasks.push(read_chunk( progress_name.as_str(), step, hash_hex, &ctx.output_dir, index, )); } trace!("Starting parallel read of {} chunks", tasks.len()); let results = join_all(tasks).await; trace!("All read tasks completed"); // Collect chunk data and verify order let mut chunk_data_list = Vec::with_capacity(chunk_count); let mut success_count = 0; for (index, result) in results.into_iter().enumerate() { match result { Ok(chunk_data) => { let chunk_size = chunk_data.len(); success_count += 1; chunk_data_list.push((index, chunk_data)); trace!( "Chunk {} read successfully, size: {} bytes", index, chunk_size ); } Err(e) => { error!("Failed to read chunk {}: {:?}", index, e); return Err(e); } } } if success_count != chunk_count { return Err(ButckRWErrorKind::ChunkNotFound(format!( "Only {}/{} chunks found in storage", success_count, chunk_count ))); } info!("All {} chunks read successfully", success_count); // Sort by index and concatenate files chunk_data_list.sort_by_key(|(index, _)| *index); // Calculate total size let total_size: usize = chunk_data_list.iter().map(|(_, data)| data.len()).sum(); let (total_value, total_unit) = size_display(total_size); info!( "Rebuilding file: {} chunks, total size: {:.2} {} ({} bytes)", chunk_count, total_value, total_unit, total_size ); // Write to output file trace!("Writing to output file: {}", output_path.display()); let mut output_file = File::create(&output_path).await?; for (index, chunk_data) in chunk_data_list { trace!("Writing chunk {} ({} bytes)", index, chunk_data.len()); output_file.write_all(&chunk_data).await?; progress::increase(progress_name.as_str(), step as f32); } output_file.flush().await?; info!("File successfully rebuilt: {}", output_path.display()); progress::complete(progress_name.as_str()); Ok(()) } /// Read a single chunk from storage async fn read_chunk( progress_name: &str, step: f64, hash_hex: String, storage_dir: &PathBuf, chunk_index: usize, ) -> Result, ButckRWErrorKind> { trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex); // Build chunk file path let file_path = storage::get_chunk_path(storage_dir, &hash_hex); trace!( "read_chunk[{}]: Looking for file at: {}", chunk_index, file_path.display() ); // Read chunk file match tokio::fs::read(&file_path).await { Ok(data) => { trace!( "read_chunk[{}]: Read {} bytes successfully", chunk_index, data.len() ); progress::increase(progress_name, step as f32); Ok(data) } Err(e) if e.kind() == std::io::ErrorKind::NotFound => { trace!("read_chunk[{}]: File not found", chunk_index); Err(ButckRWErrorKind::ChunkNotFound(format!( "Chunk {} (hash: {}) not found in storage", chunk_index, hash_hex ))) } Err(e) => { trace!("read_chunk[{}]: Read failed: {:?}", chunk_index, e); Err(ButckRWErrorKind::IOError(e)) } } }