summaryrefslogtreecommitdiff
path: root/src/chunker/rw/storage/write/simple.rs
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-03-04 21:26:04 +0800
committer魏曹先生 <1992414357@qq.com>2026-03-04 21:35:09 +0800
commit22926ce29e3f8e040ec349401aeb6a77f32eae72 (patch)
tree678753ec49a61fb9d3e2d8e869393dec90ea7ef4 /src/chunker/rw/storage/write/simple.rs
Initialize Butchunker project structure and policy system
Diffstat (limited to 'src/chunker/rw/storage/write/simple.rs')
-rw-r--r--src/chunker/rw/storage/write/simple.rs368
1 files changed, 368 insertions, 0 deletions
diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs
new file mode 100644
index 0000000..75b9bd7
--- /dev/null
+++ b/src/chunker/rw/storage/write/simple.rs
@@ -0,0 +1,368 @@
+use futures::future::join_all;
+use just_progress::progress;
+use log::{error, info, trace};
+use std::{collections::HashMap, path::PathBuf};
+use tokio::{fs::File, io::AsyncReadExt};
+
+use crate::{
+ chunker::{
+ context::ButckContext,
+ rw::{error::ButckRWErrorKind, storage},
+ },
+ core::hash::ChunkWriteHash,
+ storage::get_index_file_name,
+ utils::size_display::size_display,
+};
+
+pub async fn write_file_simple(
+ path: &PathBuf,
+ ctx: &ButckContext,
+ params: &HashMap<&str, &str>,
+) -> Result<(), ButckRWErrorKind> {
+ read_file(path, ctx, params).await?;
+ Ok(())
+}
+
+async fn read_file(
+ path: &PathBuf,
+ ctx: &ButckContext,
+ params: &HashMap<&str, &str>,
+) -> Result<(), ButckRWErrorKind> {
+ let mut file = File::open(path).await?;
+
+ // Use butck_policies::chunk_with to locate chunk boundaries in the file
+ if ctx.memmap_read {
+ let mmap = unsafe { memmap2::Mmap::map(&file)? };
+ let raw_data = &mmap[..];
+ let (chunk_boundaries, total_bytes) =
+ (get_boundaries(raw_data, ctx, params).await?, raw_data.len());
+
+ // If output boundaries, do not execute actual write logic
+ if ctx.display_boundaries {
+ display_boundaries(&chunk_boundaries, total_bytes).await;
+ return Ok(());
+ } else {
+ write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?;
+ }
+ } else {
+ let mut contents = Vec::new();
+ file.read_to_end(&mut contents).await?;
+ let raw_data = &contents[..];
+ let (chunk_boundaries, total_bytes) =
+ (get_boundaries(raw_data, ctx, params).await?, raw_data.len());
+
+ // If output boundaries, do not execute actual write logic
+ if ctx.display_boundaries {
+ display_boundaries(&chunk_boundaries, total_bytes).await;
+ return Ok(());
+ } else {
+ write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?;
+ }
+ };
+ progress::clear_all();
+ Ok(())
+}
+
/// Split `raw_data` at `chunk_boundaries`, write each chunk to
/// content-addressed storage in parallel, then write the index file that maps
/// the original file back to its chunk hashes.
///
/// `chunk_boundaries` holds *exclusive end offsets* of each chunk except the
/// last; the final chunk runs from the last boundary to the end of the data.
/// Any write failure aborts the whole operation with the first error.
async fn write_file_to_storage(
    path: &PathBuf,
    ctx: &ButckContext,
    chunk_boundaries: Vec<u32>,
    raw_data: &[u8],
) -> Result<(), ButckRWErrorKind> {
    let output_index_file = get_index_file_name(path, ctx);

    // N boundaries imply N+1 chunks (the trailing chunk has no boundary).
    // NOTE(review): if a boundary is invalid or the final chunk is empty,
    // fewer chunks are actually written than this count — the progress step
    // below is then slightly conservative, never overshooting 100%.
    let chunk_count = chunk_boundaries.len() + 1;
    let progress_name = format!("Write `{}`", path.display());

    progress::update_progress(progress_name.as_str(), 0.0);
    // Fractional progress contributed by each chunk.
    let step = 1.0 / chunk_count as f64;

    trace!("chunks_count={}", chunk_count);
    trace!("chunk_hash={:?}", ctx.chunk_hash);
    trace!("file_size={}", raw_data.len());
    trace!("output_index_file={}", output_index_file.display());
    trace!("policy_name={:?}", ctx.policy_name);
    trace!("storage_dir={}", ctx.output_dir.display());

    info!(
        "{} chunks will be written to {}",
        chunk_count,
        ctx.output_dir.display()
    );

    tokio::fs::create_dir_all(&ctx.output_dir).await?;
    trace!("Output directory created or already exists");

    // Futures are lazy: building the Vec does no I/O until join_all polls.
    let mut tasks = Vec::new();
    let mut start = 0;
    // Index of the next *valid* chunk; invalid boundaries don't consume one.
    let mut chunk_index = 0;

    trace!("Processing chunk boundaries:");

    for &boundary in &chunk_boundaries {
        let end = boundary as usize;
        // Guard against empty chunks (start == end) and boundaries past EOF.
        if start < end && end <= raw_data.len() {
            let chunk_data = &raw_data[start..end];
            trace!(
                "Chunk {}: bytes {}..{} (size: {} bytes)",
                chunk_index,
                start,
                end - 1,
                end - start
            );
            tasks.push(write_chunk(
                progress_name.as_str(),
                step,
                chunk_data,
                &ctx.output_dir,
                &ctx.chunk_hash,
                chunk_index,
                start,
                end,
            ));
            chunk_index += 1;
        } else {
            trace!(
                "Skipping invalid chunk boundary: start={}, end={}, data_len={}",
                start,
                end,
                raw_data.len()
            );
        }
        // Advance even past a skipped boundary so later chunks stay aligned.
        start = end;
    }

    // Trailing chunk after the last boundary, if any data remains.
    if start < raw_data.len() {
        let chunk_data = &raw_data[start..];
        trace!(
            "Chunk {}: bytes {}..{} (size: {} bytes) - final chunk",
            chunk_index,
            start,
            raw_data.len() - 1,
            raw_data.len() - start
        );
        tasks.push(write_chunk(
            progress_name.as_str(),
            step,
            chunk_data,
            &ctx.output_dir,
            &ctx.chunk_hash,
            chunk_index,
            start,
            raw_data.len(),
        ));
    }

    trace!("Total chunks prepared for writing: {}", tasks.len());

    trace!("Starting parallel write of {} chunks", tasks.len());
    // join_all preserves task order, so `chunk_infos` comes out in chunk
    // order — the index file below depends on that.
    let results = join_all(tasks).await;
    trace!("All write tasks completed");

    let mut success_count = 0;
    let mut chunk_infos = Vec::new();

    for result in results {
        match result {
            Ok(chunk_info) => {
                success_count += 1;
                chunk_infos.push(chunk_info);
            }
            Err(e) => {
                // Fail fast on the first error; already-written chunks are
                // harmless leftovers in content-addressed storage.
                trace!("Chunk write failed: {:?}", e);
                return Err(e);
            }
        }
    }

    info!("All {} chunks written successfully", success_count);

    // Write index file
    trace!("Writing index file to: {}", output_index_file.display());
    if let Err(e) = write_index_file(&output_index_file, &chunk_infos, path).await {
        error!("Failed to write index file: {}", e);
        return Err(ButckRWErrorKind::IOError(e));
    }
    info!("Index file written to: {}", output_index_file.display());

    trace!("write_file_to_storage completed successfully");

    progress::complete(progress_name.as_str());

    Ok(())
}
+
+async fn write_chunk(
+ progress_name: &str,
+ step: f64,
+ chunk_data: &[u8],
+ output_dir: &PathBuf,
+ chunk_hash: &ChunkWriteHash,
+ chunk_index: usize,
+ start: usize,
+ end: usize,
+) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> {
+ trace!(
+ "write_chunk[{}]: Starting, data size: {} bytes",
+ chunk_index,
+ chunk_data.len()
+ );
+
+ trace!(
+ "write_chunk[{}]: Computing hash with algorithm: {:?}",
+ chunk_index, chunk_hash
+ );
+ let hash_bytes = chunk_hash.hash(chunk_data);
+ trace!(
+ "write_chunk[{}]: Hash computed: {:?}",
+ chunk_index, hash_bytes
+ );
+
+ let hash_hex = hex::encode(hash_bytes);
+ trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex);
+
+ let file_path = storage::get_chunk_path(output_dir, &hash_hex);
+
+ if let Some(parent_dir) = file_path.parent() {
+ trace!(
+ "write_chunk[{}]: Creating directory structure: {}",
+ chunk_index,
+ parent_dir.display()
+ );
+ tokio::fs::create_dir_all(parent_dir).await?;
+ trace!("write_chunk[{}]: Directory created", chunk_index);
+ }
+
+ trace!(
+ "write_chunk[{}]: File path: {}",
+ chunk_index,
+ file_path.display()
+ );
+
+ trace!(
+ "write_chunk[{}]: Writing {} bytes to file",
+ chunk_index,
+ chunk_data.len()
+ );
+ if !file_path.exists() {
+ tokio::fs::write(&file_path, chunk_data).await?;
+ } else {
+ trace!(
+ "write_chunk[{}]: File already exists, skipping",
+ chunk_index
+ );
+ }
+ trace!("write_chunk[{}]: File written successfully", chunk_index);
+ progress::increase(progress_name, step as f32);
+ Ok(crate::chunker::rw::storage::ChunkInfo {
+ index: chunk_index,
+ hash: hash_hex,
+ size: chunk_data.len(),
+ start,
+ end,
+ })
+}
+
+async fn get_boundaries<'a>(
+ raw_data: &[u8],
+ ctx: &ButckContext,
+ params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ButckRWErrorKind> {
+ let policy_name = ctx.policy_name.as_ref().unwrap().as_str();
+ match butck_policies::chunk_with(policy_name, raw_data, params).await {
+ Ok(s) => Ok(s),
+ Err(e) => Err(ButckRWErrorKind::ChunkFailed(e)),
+ }
+}
+
+async fn write_index_file(
+ index_path: &PathBuf,
+ chunk_infos: &[crate::chunker::rw::storage::ChunkInfo],
+ original_file_path: &PathBuf,
+) -> Result<(), std::io::Error> {
+ use std::io::Write;
+
+ let file = std::fs::File::create(index_path)?;
+ let mut writer = std::io::BufWriter::new(file);
+
+ // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes
+ use crate::chunker::constants::BUTCK_INDEX_MAGIC;
+
+ // Write magic bytes
+ writer.write_all(&BUTCK_INDEX_MAGIC)?;
+
+ // Get original filename as bytes
+ let filename = original_file_path
+ .file_name()
+ .and_then(|n| n.to_str())
+ .unwrap_or("unknown");
+ let filename_bytes = filename.as_bytes();
+
+ // Write filename length as u16 (little-endian)
+ if filename_bytes.len() > u16::MAX as usize {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ format!("Filename too long: {} bytes", filename_bytes.len()),
+ ));
+ }
+ let filename_len = filename_bytes.len() as u16;
+ writer.write_all(&filename_len.to_le_bytes())?;
+
+ // Write filename bytes
+ writer.write_all(filename_bytes)?;
+
+ // Write chunk hashes: [u8; 32][u8; 32][u8; 32]...
+ for chunk_info in chunk_infos {
+ // Convert hex hash to bytes
+ match hex::decode(&chunk_info.hash) {
+ Ok(hash_bytes) => {
+ if hash_bytes.len() == 32 {
+ writer.write_all(&hash_bytes)?;
+ } else {
+ // Pad or truncate to 32 bytes if needed
+ let mut fixed_hash = [0u8; 32];
+ let len = hash_bytes.len().min(32);
+ fixed_hash[..len].copy_from_slice(&hash_bytes[..len]);
+ writer.write_all(&fixed_hash)?;
+ }
+ }
+ Err(e) => {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!("Failed to decode hash hex: {}", e),
+ ));
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) {
+ let total_chunks = chunk_boundaries.len() + 1;
+ let (total_value, total_unit) = size_display(total_bytes);
+ info!(
+ "{} chunks, ({:.2} {}, {})",
+ total_chunks, total_value, total_unit, total_bytes
+ );
+ let mut start = 0;
+ chunk_boundaries.iter().for_each(|p| {
+ let next = *p as usize;
+ let (size_value, size_unit) = size_display(next - start);
+ info!(
+ "{} - {} (size: {:.2} {})",
+ start,
+ next - 1,
+ size_value,
+ size_unit
+ );
+ start = next;
+ });
+ let last = start;
+ let r#final = total_bytes;
+ let (size_value, size_unit) = size_display(total_bytes - start);
+ info!(
+ "{} - {} (size: {:.2} {})",
+ last, r#final, size_value, size_unit
+ );
+}