diff options
Diffstat (limited to 'src/chunker/rw/storage/build.rs')
| -rw-r--r-- | src/chunker/rw/storage/build.rs | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/src/chunker/rw/storage/build.rs b/src/chunker/rw/storage/build.rs new file mode 100644 index 0000000..7608b5c --- /dev/null +++ b/src/chunker/rw/storage/build.rs @@ -0,0 +1,250 @@ +use futures::future::join_all; +use just_progress::progress; +use log::{error, info, trace}; +use memmap2::Mmap; +use std::path::PathBuf; +use tokio::{fs::File, io::AsyncWriteExt}; + +use crate::{ + chunker::{ + constants::{BUTCK_INDEX_FILE_SUFFIX, BUTCK_INDEX_MAGIC}, + context::ButckContext, + rw::error::{ButckRWError, ButckRWErrorKind}, + rw::storage, + }, + utils::size_display::size_display, +}; + +pub async fn build(ctx: ButckContext) -> Result<(), ButckRWError> { + if ctx.storage_path.is_none() { + return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx)); + } + if ctx.file_paths.is_empty() { + return Err( + ButckRWErrorKind::RebuildFailed("No bidx files specified".to_string()).pack(ctx), + ); + } + + let tasks: Vec<_> = ctx + .file_paths + .iter() + .map(|bidx_path| async { + trace!( + "Preparing to rebuild from bidx file `{}`", + bidx_path.display() + ); + rebuild_from_bidx(bidx_path, &ctx).await + }) + .collect(); + + let results = join_all(tasks).await; + + for result in results { + if let Err(e) = result { + return Err(e.pack(ctx)); + } + } + + Ok(()) +} + +async fn rebuild_from_bidx( + bidx_path: &PathBuf, + ctx: &ButckContext, +) -> Result<(), ButckRWErrorKind> { + // Validate file extension + if let Some(ext) = bidx_path.extension() + && ext != BUTCK_INDEX_FILE_SUFFIX + { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + info!("Rebuilding from bidx file: {}", bidx_path.display()); + + // Read bidx file content + let bidx_content = if ctx.memmap_read { + let file = File::open(bidx_path).await?; + let mmap = unsafe { Mmap::map(&file)? }; + mmap.to_vec() + } else { + tokio::fs::read(bidx_path).await? + }; + + // Verify file size includes at least the header + if bidx_content.len() < 6 { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + // Validate MAGIC bytes + if bidx_content[0..4] != BUTCK_INDEX_MAGIC { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + // Read filename + let filename_len = u16::from_le_bytes([bidx_content[4], bidx_content[5]]) as usize; + if bidx_content.len() < 6 + filename_len { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + let filename_bytes = &bidx_content[6..6 + filename_len]; + let original_filename = String::from_utf8(filename_bytes.to_vec()) + .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; + + trace!("Original filename from bidx: {}", original_filename); + + let hash_data_start = 6 + filename_len; + let hash_data = &bidx_content[hash_data_start..]; + + // Verify that hash data size is a multiple of 32 bytes + if hash_data.len() % 32 != 0 { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + let chunk_count = hash_data.len() / 32; + info!("Found {} chunks in bidx file", chunk_count); + + let mut chunk_hashes = Vec::with_capacity(chunk_count); + for i in 0..chunk_count { + let start = i * 32; + let end = start + 32; + let hash_bytes: [u8; 32] = hash_data[start..end] + .try_into() + .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; + chunk_hashes.push(hash_bytes); + } + + trace!("Parsed {} chunk hashes", chunk_hashes.len()); + + // Determine output file path + let output_path = if let Some(output_file) = &ctx.output_file { + output_file.clone() + } else { + // Use the original filename read from the bidx file + storage::generate_unique_path(&ctx.output_dir, &original_filename) + }; + + info!("Rebuilding file to: {}", output_path.display()); + + let progress_name = format!("Rebuild `{}`", output_path.display()); + progress::update_progress(progress_name.as_str(), 0.0); + let step = 1.0 / chunk_count as f64; + + let mut tasks = Vec::with_capacity(chunk_count); + + for (index, hash_bytes) in chunk_hashes.iter().enumerate() { + let hash_hex = hex::encode(hash_bytes); + tasks.push(read_chunk( + progress_name.as_str(), + step, + hash_hex, + &ctx.output_dir, + index, + )); + } + + trace!("Starting parallel read of {} chunks", tasks.len()); + let results = join_all(tasks).await; + trace!("All read tasks completed"); + + // Collect chunk data and verify order + let mut chunk_data_list = Vec::with_capacity(chunk_count); + let mut success_count = 0; + + for (index, result) in results.into_iter().enumerate() { + match result { + Ok(chunk_data) => { + let chunk_size = chunk_data.len(); + success_count += 1; + chunk_data_list.push((index, chunk_data)); + trace!( + "Chunk {} read successfully, size: {} bytes", + index, chunk_size + ); + } + Err(e) => { + error!("Failed to read chunk {}: {:?}", index, e); + return Err(e); + } + } + } + + if success_count != chunk_count { + return Err(ButckRWErrorKind::ChunkNotFound(format!( + "Only {}/{} chunks found in storage", + success_count, chunk_count + ))); + } + + info!("All {} chunks read successfully", success_count); + + // Sort by index and concatenate files + chunk_data_list.sort_by_key(|(index, _)| *index); + + // Calculate total size + let total_size: usize = chunk_data_list.iter().map(|(_, data)| data.len()).sum(); + let (total_value, total_unit) = size_display(total_size); + info!( + "Rebuilding file: {} chunks, total size: {:.2} {} ({} bytes)", + chunk_count, total_value, total_unit, total_size + ); + + // Write to output file + trace!("Writing to output file: {}", output_path.display()); + let mut output_file = File::create(&output_path).await?; + + for (index, chunk_data) in chunk_data_list { + trace!("Writing chunk {} ({} bytes)", index, chunk_data.len()); + output_file.write_all(&chunk_data).await?; + progress::increase(progress_name.as_str(), step as f32); + } + + output_file.flush().await?; + + info!("File successfully rebuilt: {}", output_path.display()); + progress::complete(progress_name.as_str()); + + Ok(()) +} + +/// Read a single chunk from storage +async fn read_chunk( + progress_name: &str, + step: f64, + hash_hex: String, + storage_dir: &PathBuf, + chunk_index: usize, +) -> Result<Vec<u8>, ButckRWErrorKind> { + trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex); + + // Build chunk file path + let file_path = storage::get_chunk_path(storage_dir, &hash_hex); + + trace!( + "read_chunk[{}]: Looking for file at: {}", + chunk_index, + file_path.display() + ); + + // Read chunk file + match tokio::fs::read(&file_path).await { + Ok(data) => { + trace!( + "read_chunk[{}]: Read {} bytes successfully", + chunk_index, + data.len() + ); + progress::increase(progress_name, step as f32); + Ok(data) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + trace!("read_chunk[{}]: File not found", chunk_index); + Err(ButckRWErrorKind::ChunkNotFound(format!( + "Chunk {} (hash: {}) not found in storage", + chunk_index, hash_hex + ))) + } + Err(e) => { + trace!("read_chunk[{}]: Read failed: {:?}", chunk_index, e); + Err(ButckRWErrorKind::IOError(e)) + } + } +} |
