summaryrefslogtreecommitdiff
path: root/src/chunker/rw/storage/build.rs
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-03-04 21:26:04 +0800
committer魏曹先生 <1992414357@qq.com>2026-03-04 21:35:09 +0800
commit22926ce29e3f8e040ec349401aeb6a77f32eae72 (patch)
tree678753ec49a61fb9d3e2d8e869393dec90ea7ef4 /src/chunker/rw/storage/build.rs
Initialize Butchunker project structure and policy system
Diffstat (limited to 'src/chunker/rw/storage/build.rs')
-rw-r--r--src/chunker/rw/storage/build.rs250
1 files changed, 250 insertions, 0 deletions
diff --git a/src/chunker/rw/storage/build.rs b/src/chunker/rw/storage/build.rs
new file mode 100644
index 0000000..7608b5c
--- /dev/null
+++ b/src/chunker/rw/storage/build.rs
@@ -0,0 +1,250 @@
+use futures::future::join_all;
+use just_progress::progress;
+use log::{error, info, trace};
+use memmap2::Mmap;
+use std::path::PathBuf;
+use tokio::{fs::File, io::AsyncWriteExt};
+
+use crate::{
+ chunker::{
+ constants::{BUTCK_INDEX_FILE_SUFFIX, BUTCK_INDEX_MAGIC},
+ context::ButckContext,
+ rw::error::{ButckRWError, ButckRWErrorKind},
+ rw::storage,
+ },
+ utils::size_display::size_display,
+};
+
+/// Rebuilds every file described by the bidx index files in `ctx.file_paths`.
+///
+/// One rebuild job is created per index file and all jobs are awaited together.
+/// Once every job has finished, the first failure (in `ctx.file_paths` order)
+/// is packed together with `ctx` and returned.
+///
+/// # Errors
+///
+/// * `NoButckStorageFound` when `ctx.storage_path` is not set.
+/// * `RebuildFailed` when `ctx.file_paths` is empty.
+/// * The error kind reported by `rebuild_from_bidx` for the first failing index.
+pub async fn build(ctx: ButckContext) -> Result<(), ButckRWError> {
+    // NOTE(review): `storage_path` is only checked for presence here; nothing
+    // in this function reads its value.
+    if ctx.storage_path.is_none() {
+        return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx));
+    }
+    if ctx.file_paths.is_empty() {
+        let reason = "No bidx files specified".to_string();
+        return Err(ButckRWErrorKind::RebuildFailed(reason).pack(ctx));
+    }
+
+    // NOTE(review): when `ctx.output_file` is set and more than one index file
+    // is given, every job appears to target that same output path - confirm
+    // that callers rule this combination out.
+    let shared = &ctx;
+    let jobs = ctx.file_paths.iter().map(|bidx_path| async move {
+        trace!(
+            "Preparing to rebuild from bidx file `{}`",
+            bidx_path.display()
+        );
+        rebuild_from_bidx(bidx_path, shared).await
+    });
+
+    // The borrow of `ctx` ends once the jobs are done, so `ctx` can then be
+    // moved into the packed error.
+    let first_failure = join_all(jobs).await.into_iter().find_map(Result::err);
+    match first_failure {
+        Some(kind) => Err(kind.pack(ctx)),
+        None => Ok(()),
+    }
+}
+
+/// Rebuilds a single file from the bidx index file at `bidx_path`.
+///
+/// The index is loaded (through a memory map when `ctx.memmap_read` is set),
+/// parsed into the original file name and an ordered list of 32-byte chunk
+/// hashes, and every chunk is then read concurrently. The chunk contents are
+/// written, in index order, to `ctx.output_file` when it is set, otherwise to a
+/// path generated from `ctx.output_dir` and the file name stored in the index.
+///
+/// Progress is reported under the name ``Rebuild `<output path>` ``: each chunk
+/// advances it once when read and once when written, adding up to 1.0.
+///
+/// # Errors
+///
+/// * `InvalidBidxFormat` when the extension, magic bytes, header or hash table
+///   is malformed, or when the stored file name contains `..`, a root or a
+///   drive prefix.
+/// * `ChunkNotFound` when a referenced chunk is missing.
+/// * An I/O error kind when reading the index, reading a chunk or writing the
+///   output fails.
+async fn rebuild_from_bidx(
+    bidx_path: &PathBuf,
+    ctx: &ButckContext,
+) -> Result<(), ButckRWErrorKind> {
+    // Validate file extension.
+    // NOTE(review): a path with no extension at all passes this check - confirm
+    // that this is intended.
+    if let Some(ext) = bidx_path.extension()
+        && ext != BUTCK_INDEX_FILE_SUFFIX
+    {
+        return Err(ButckRWErrorKind::InvalidBidxFormat);
+    }
+
+    info!("Rebuilding from bidx file: {}", bidx_path.display());
+
+    // Read bidx file content
+    let bidx_content = if ctx.memmap_read {
+        let file = File::open(bidx_path).await?;
+        // SAFETY: the mapping is read-only, is copied into an owned buffer
+        // right away and is dropped at the end of this block. This is only
+        // sound while no other process truncates or rewrites the file during
+        // the copy, which this code cannot enforce.
+        let mmap = unsafe { Mmap::map(&file)? };
+        mmap.to_vec()
+    } else {
+        tokio::fs::read(bidx_path).await?
+    };
+
+    let (original_filename, chunk_hashes) = parse_bidx(&bidx_content)?;
+    let chunk_count = chunk_hashes.len();
+    info!("Found {} chunks in bidx file", chunk_count);
+    trace!("Parsed {} chunk hashes", chunk_count);
+
+    // Determine output file path
+    let output_path = if let Some(output_file) = &ctx.output_file {
+        output_file.clone()
+    } else {
+        // The stored name comes from the index file, so it is untrusted: refuse
+        // anything that could resolve outside the output directory (`..`, a
+        // root, or a drive prefix).
+        let escapes_output_dir = std::path::Path::new(&original_filename)
+            .components()
+            .any(|part| {
+                !matches!(
+                    part,
+                    std::path::Component::Normal(_) | std::path::Component::CurDir
+                )
+            });
+        if escapes_output_dir {
+            error!(
+                "Refusing unsafe filename `{}` stored in bidx file `{}`",
+                original_filename,
+                bidx_path.display()
+            );
+            return Err(ButckRWErrorKind::InvalidBidxFormat);
+        }
+        storage::generate_unique_path(&ctx.output_dir, &original_filename)
+    };
+
+    info!("Rebuilding file to: {}", output_path.display());
+
+    let progress_name = format!("Rebuild `{}`", output_path.display());
+    progress::update_progress(progress_name.as_str(), 0.0);
+    // Each chunk advances the progress twice (read, then write), so one advance
+    // is worth half of the chunk's share. An empty index gets a zero step
+    // instead of a division by zero.
+    let step = if chunk_count == 0 {
+        0.0
+    } else {
+        0.5 / chunk_count as f64
+    };
+
+    // NOTE(review): chunks are looked up under `ctx.output_dir`, not
+    // `ctx.storage_path` - confirm that this is the intended storage location.
+    let mut tasks = Vec::with_capacity(chunk_count);
+    for (index, hash_bytes) in chunk_hashes.iter().enumerate() {
+        tasks.push(read_chunk(
+            progress_name.as_str(),
+            step,
+            hex::encode(hash_bytes),
+            &ctx.output_dir,
+            index,
+        ));
+    }
+
+    trace!("Starting parallel read of {} chunks", tasks.len());
+    let results = join_all(tasks).await;
+    trace!("All read tasks completed");
+
+    // `join_all` yields results in task order, so the collected chunks are
+    // already in file order and the first failure can abort immediately.
+    let mut chunk_data_list = Vec::with_capacity(chunk_count);
+    for (index, result) in results.into_iter().enumerate() {
+        match result {
+            Ok(chunk_data) => {
+                trace!(
+                    "Chunk {} read successfully, size: {} bytes",
+                    index,
+                    chunk_data.len()
+                );
+                chunk_data_list.push(chunk_data);
+            }
+            Err(e) => {
+                error!("Failed to read chunk {}: {:?}", index, e);
+                return Err(e);
+            }
+        }
+    }
+
+    info!("All {} chunks read successfully", chunk_data_list.len());
+
+    // Calculate total size
+    let total_size: usize = chunk_data_list.iter().map(Vec::len).sum();
+    let (total_value, total_unit) = size_display(total_size);
+    info!(
+        "Rebuilding file: {} chunks, total size: {:.2} {} ({} bytes)",
+        chunk_count, total_value, total_unit, total_size
+    );
+
+    // Write to output file
+    trace!("Writing to output file: {}", output_path.display());
+    let mut output_file = File::create(&output_path).await?;
+
+    for (index, chunk_data) in chunk_data_list.iter().enumerate() {
+        trace!("Writing chunk {} ({} bytes)", index, chunk_data.len());
+        output_file.write_all(chunk_data).await?;
+        progress::increase(progress_name.as_str(), step as f32);
+    }
+
+    output_file.flush().await?;
+
+    info!("File successfully rebuilt: {}", output_path.display());
+    progress::complete(progress_name.as_str());
+
+    Ok(())
+}
+
+/// Splits raw bidx bytes into the stored file name and the ordered chunk hashes.
+///
+/// Layout, as read by this function: magic bytes (compared against the first
+/// 4 bytes), a little-endian `u16` file name length, the UTF-8 file name, then
+/// zero or more 32-byte hashes up to the end of the input.
+fn parse_bidx(content: &[u8]) -> Result<(String, Vec<[u8; 32]>), ButckRWErrorKind> {
+    // 4 magic bytes followed by the 2-byte file name length.
+    const HEADER_LEN: usize = 6;
+    const HASH_LEN: usize = 32;
+
+    // Verify file size includes at least the header
+    if content.len() < HEADER_LEN {
+        return Err(ButckRWErrorKind::InvalidBidxFormat);
+    }
+
+    // Validate MAGIC bytes
+    if content[0..4] != BUTCK_INDEX_MAGIC {
+        return Err(ButckRWErrorKind::InvalidBidxFormat);
+    }
+
+    // Read filename
+    let filename_len = u16::from_le_bytes([content[4], content[5]]) as usize;
+    let hash_data_start = HEADER_LEN + filename_len;
+    if content.len() < hash_data_start {
+        return Err(ButckRWErrorKind::InvalidBidxFormat);
+    }
+    let original_filename = String::from_utf8(content[HEADER_LEN..hash_data_start].to_vec())
+        .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?;
+
+    trace!("Original filename from bidx: {}", original_filename);
+
+    // Verify that hash data size is a multiple of 32 bytes
+    let hash_data = &content[hash_data_start..];
+    if hash_data.len() % HASH_LEN != 0 {
+        return Err(ButckRWErrorKind::InvalidBidxFormat);
+    }
+
+    let chunk_hashes = hash_data
+        .chunks_exact(HASH_LEN)
+        .map(|raw| {
+            raw.try_into()
+                .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)
+        })
+        .collect::<Result<Vec<[u8; 32]>, _>>()?;
+
+    Ok((original_filename, chunk_hashes))
+}
+
+/// Loads the full contents of one stored chunk.
+///
+/// The chunk file is located with `storage::get_chunk_path(storage_dir, &hash_hex)`
+/// and read in one call. After a successful read the progress entry named
+/// `progress_name` is advanced by `step`. `chunk_index` is used only in log and
+/// error messages.
+///
+/// # Errors
+///
+/// * `ChunkNotFound` when the chunk file does not exist.
+/// * `IOError` for any other read failure.
+async fn read_chunk(
+    progress_name: &str,
+    step: f64,
+    hash_hex: String,
+    storage_dir: &PathBuf,
+    chunk_index: usize,
+) -> Result<Vec<u8>, ButckRWErrorKind> {
+    trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex);
+
+    // Resolve where this chunk lives on disk.
+    let chunk_file = storage::get_chunk_path(storage_dir, &hash_hex);
+
+    trace!(
+        "read_chunk[{}]: Looking for file at: {}",
+        chunk_index,
+        chunk_file.display()
+    );
+
+    // Handle the failure cases first so the success path reads straight down.
+    let bytes = match tokio::fs::read(&chunk_file).await {
+        Ok(bytes) => bytes,
+        Err(io_err) => {
+            if io_err.kind() == std::io::ErrorKind::NotFound {
+                trace!("read_chunk[{}]: File not found", chunk_index);
+                return Err(ButckRWErrorKind::ChunkNotFound(format!(
+                    "Chunk {} (hash: {}) not found in storage",
+                    chunk_index, hash_hex
+                )));
+            }
+            trace!("read_chunk[{}]: Read failed: {:?}", chunk_index, io_err);
+            return Err(ButckRWErrorKind::IOError(io_err));
+        }
+    };
+
+    trace!(
+        "read_chunk[{}]: Read {} bytes successfully",
+        chunk_index,
+        bytes.len()
+    );
+    progress::increase(progress_name, step as f32);
+    Ok(bytes)
+}