summaryrefslogtreecommitdiff
path: root/src/chunker
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-03-04 21:26:04 +0800
committer魏曹先生 <1992414357@qq.com>2026-03-04 21:35:09 +0800
commit22926ce29e3f8e040ec349401aeb6a77f32eae72 (patch)
tree678753ec49a61fb9d3e2d8e869393dec90ea7ef4 /src/chunker
Initialize Butchunker project structure and policy system
Diffstat (limited to 'src/chunker')
-rw-r--r--src/chunker/constants.rs3
-rw-r--r--src/chunker/context.rs226
-rw-r--r--src/chunker/entry.rs39
-rw-r--r--src/chunker/rw.rs2
-rw-r--r--src/chunker/rw/error.rs61
-rw-r--r--src/chunker/rw/storage.rs88
-rw-r--r--src/chunker/rw/storage/build.rs250
-rw-r--r--src/chunker/rw/storage/write.rs118
-rw-r--r--src/chunker/rw/storage/write/simple.rs368
-rw-r--r--src/chunker/rw/storage/write/stream.rs12
10 files changed, 1167 insertions, 0 deletions
diff --git a/src/chunker/constants.rs b/src/chunker/constants.rs
new file mode 100644
index 0000000..5e4870e
--- /dev/null
+++ b/src/chunker/constants.rs
@@ -0,0 +1,3 @@
/// Name of the hidden directory that marks (and anchors) a Butck storage root.
pub const BUTCK_STORAGE_DIR_NAME: &str = ".butck";
/// File-name suffix used for Butck index files ("bidx" = Butck index).
pub const BUTCK_INDEX_FILE_SUFFIX: &str = "bidx";
/// Magic bytes (0xFEE1F00D) at the start of every bidx index file.
pub const BUTCK_INDEX_MAGIC: [u8; 4] = [0xfe, 0xe1, 0xf0, 0x0d];
diff --git a/src/chunker/context.rs b/src/chunker/context.rs
new file mode 100644
index 0000000..79254f5
--- /dev/null
+++ b/src/chunker/context.rs
@@ -0,0 +1,226 @@
+use std::{collections::HashMap, env::current_dir, path::PathBuf, process::exit, str::FromStr};
+
+use log::{error, warn};
+
+use crate::{
+ chunker::constants::BUTCK_STORAGE_DIR_NAME, core::hash::ChunkWriteHash, special_argument,
+ special_flag, utils::file_input_solve::parse_path_input,
+};
+
/// Parsed command-line context shared by all Butck chunker subcommands.
#[derive(Debug, Default)]
pub struct ButckContext {
    /// All input files (whatever is left after flag parsing)
    pub file_paths: Vec<PathBuf>,

    /// Path of Butck Storage root, if one was found by walking up the
    /// directory tree or given explicitly via `-s/--storage`
    pub storage_path: Option<PathBuf>,

    /// Display chunk boundaries instead of writing chunks
    /// (`-D/--display-boundaries`)
    pub display_boundaries: bool,

    /// Stream-read size passed to the stream writer (`-S/--stream-read`);
    /// `None` means stream mode is disabled
    pub stream_read: Option<u32>,

    /// Whether to read files using memory mapping (`-m/--memmap-read`)
    pub memmap_read: bool,

    /// Register name (`-R/--register`)
    pub register_name: Option<String>,

    /// Chunking policy name (`-p/--policy`)
    pub policy_name: Option<String>,

    /// Hash algorithm used for chunking (`-H/--chunk-hash`: "blake3" or
    /// "sha256"; anything else falls back to the default)
    pub chunk_hash: ChunkWriteHash,

    /// Output directory (`-o/--output-dir`; falls back to the storage path,
    /// then the current working directory)
    pub output_dir: PathBuf,

    /// Output file (`-O/--output-file`; not available for some commands)
    pub output_file: Option<PathBuf>,

    /// Override parameters for the chunking policy (`+p/+param key=value`)
    pub params: HashMap<String, String>,
}
+
+impl ButckContext {
+ /// Apply the args of ChunkerContext to itself
+ pub fn from_args(mut args: Vec<String>) -> Self {
+ let mut ctx = ButckContext::default();
+ let recursive = ctx.read_recursive(&mut args);
+ ctx.apply_stream_read(&mut args);
+ ctx.apply_memmap_read(&mut args);
+ ctx.apply_register_name(&mut args);
+ ctx.apply_policy_name(&mut args);
+ ctx.apply_chunk_hash(&mut args);
+ ctx.apply_storage_dir(&mut args);
+ ctx.apply_output_paths(&mut args);
+ ctx.apply_params(&mut args);
+ ctx.apply_display_boundaries(&mut args);
+
+ // Finally, parse path input
+ ctx.file_paths = parse_path_input(args, recursive, vec![BUTCK_STORAGE_DIR_NAME]);
+ ctx
+ }
+
+ fn read_recursive(&mut self, args: &mut Vec<String>) -> bool {
+ special_flag!(args, "-r", "--recursive")
+ }
+
+ fn apply_stream_read(&mut self, args: &mut Vec<String>) {
+ if let Some(size_str) = special_argument!(args, "-S", "--stream-read")
+ && let Ok(size) = size_str.parse::<u32>() {
+ self.stream_read = Some(size);
+ }
+ }
+
+ fn apply_memmap_read(&mut self, args: &mut Vec<String>) -> bool {
+ special_flag!(args, "-m", "--memmap-read")
+ }
+
+ fn apply_register_name(&mut self, args: &mut Vec<String>) {
+ self.register_name = special_argument!(args, "-R", "--register");
+ }
+
+ fn apply_policy_name(&mut self, args: &mut Vec<String>) {
+ self.policy_name = special_argument!(args, "-p", "--policy");
+ }
+
+ fn apply_chunk_hash(&mut self, args: &mut Vec<String>) {
+ let chunk_hash_str = special_argument!(args, "-H", "--chunk-hash");
+ self.chunk_hash = match chunk_hash_str {
+ Some(ref s) => match s.as_str() {
+ "blake3" => ChunkWriteHash::Blake3,
+ "sha256" => ChunkWriteHash::Sha256,
+ _ => ChunkWriteHash::default(),
+ },
+ None => ChunkWriteHash::default(),
+ };
+ }
+
+ fn apply_output_paths(&mut self, args: &mut Vec<String>) {
+ let output_dir_str = special_argument!(args, "-o", "--output-dir");
+ let output_file_str = special_argument!(args, "-O", "--output-file");
+
+ let current_dir = current_dir().unwrap();
+
+ let output_dir = if let Some(output_dir_str) = output_dir_str {
+ let path = PathBuf::from(output_dir_str);
+ if path.exists() { Some(path) } else { None }
+ } else {
+ None
+ };
+
+ self.output_dir = if let Some(output_dir) = output_dir {
+ output_dir
+ } else if let Some(storage_path) = &self.storage_path {
+ storage_path.clone()
+ } else {
+ current_dir
+ };
+
+ self.output_file = output_file_str.map(PathBuf::from)
+ }
+
+ fn apply_params(&mut self, args: &mut Vec<String>) {
+ while let Some(arg) = special_argument!(args, "+p", "+param") {
+ let split = arg.split('=').collect::<Vec<&str>>();
+ if split.len() == 2 {
+ self.params
+ .insert(split[0].to_string(), split[1].to_string());
+ }
+ }
+ }
+
+ fn apply_storage_dir(&mut self, args: &mut Vec<String>) {
+ self.storage_path = {
+ let storage_override = match special_argument!(args, "-s", "--storage") {
+ Some(o) => {
+ let path = PathBuf::from_str(o.as_str());
+ if let Ok(p) = &path {
+ Self::init_butck_storage(p.clone());
+ }
+ path.ok()
+ }
+ None => None,
+ };
+ Self::find_butck_storage_dir(storage_override)
+ };
+ }
+
+ fn apply_display_boundaries(&mut self, args: &mut Vec<String>) {
+ self.display_boundaries = special_flag!(args, "-D", "--display-boundaries");
+ }
+
+ fn init_butck_storage(path: PathBuf) -> Option<PathBuf> {
+ if !path.exists() {
+ // If the path does not exist, create it and initialize Butck Storage here
+ if let Err(e) = std::fs::create_dir_all(&path) {
+ error!("Failed to create directory '{}': {}", path.display(), e);
+ exit(1);
+ }
+ let butck_dir = path.join(BUTCK_STORAGE_DIR_NAME);
+ if let Err(e) = std::fs::create_dir_all(&butck_dir) {
+ error!(
+ "Failed to create '{}' directory: {}",
+ BUTCK_STORAGE_DIR_NAME, e
+ );
+ exit(1);
+ }
+ Some(path)
+ } else {
+ let butck_dir = path.join(BUTCK_STORAGE_DIR_NAME);
+
+ // Check if Butck Storage already exists
+ if butck_dir.exists() {
+ // Butck Storage already exists, return the path
+ Some(path)
+ } else {
+ // Butck Storage doesn't exist, create it with a warning if directory is not empty
+ let is_empty = path
+ .read_dir()
+ .map(|mut entries| entries.next().is_none())
+ .unwrap_or(false);
+
+ if !is_empty {
+ // Warn about creating storage in non-empty directory
+ warn!(
+ "Creating '{}' storage in non-empty directory: {}",
+ BUTCK_STORAGE_DIR_NAME,
+ path.display()
+ );
+ }
+
+ // Create Butck Storage directory
+ if let Err(e) = std::fs::create_dir_all(&butck_dir) {
+ error!(
+ "Failed to create '{}' directory: {}",
+ BUTCK_STORAGE_DIR_NAME, e
+ );
+ exit(1);
+ }
+ Some(path)
+ }
+ }
+ }
+
+ // Get the ButckStorage directory based on context
+ fn find_butck_storage_dir(from: Option<PathBuf>) -> Option<PathBuf> {
+ let mut current_dir = match from {
+ Some(path) => path,
+ None => std::env::current_dir().ok()?,
+ };
+
+ loop {
+ let butck_dir = current_dir.join(BUTCK_STORAGE_DIR_NAME);
+ if butck_dir.is_dir() {
+ return Some(current_dir);
+ }
+
+ if !current_dir.pop() {
+ break;
+ }
+ }
+ None
+ }
+}
diff --git a/src/chunker/entry.rs b/src/chunker/entry.rs
new file mode 100644
index 0000000..4fdb1f8
--- /dev/null
+++ b/src/chunker/entry.rs
@@ -0,0 +1,39 @@
+use std::process::exit;
+
+use log::info;
+
+use crate::chunker::{
+ context::ButckContext,
+ rw::{self, error::ButckRWError},
+};
+
+pub async fn entry(ctx: ButckContext, args: Vec<String>) -> Result<(), ButckRWError> {
+ if let Some(subcommand) = args.first() {
+ return match subcommand.as_str() {
+ "write" => rw::storage::write(ctx).await,
+ "build" => rw::storage::build(ctx).await,
+ "policies" => {
+ butck_policies::policies()
+ .iter()
+ .for_each(|p| info!("{}", p));
+ return Ok(());
+ }
+ _ => {
+ print_help();
+ exit(1)
+ }
+ };
+ }
+ Ok(())
+}
+
+pub fn print_help() {
+ println!("{}", include_str!("../../resources/helps/butck.txt").trim());
+}
+
+pub fn print_version() {
+ println!(
+ "{}",
+ include_str!("../../resources/version_info.txt").trim()
+ );
+}
diff --git a/src/chunker/rw.rs b/src/chunker/rw.rs
new file mode 100644
index 0000000..85e734e
--- /dev/null
+++ b/src/chunker/rw.rs
@@ -0,0 +1,2 @@
+pub mod error;
+pub mod storage;
diff --git a/src/chunker/rw/error.rs b/src/chunker/rw/error.rs
new file mode 100644
index 0000000..7f263a5
--- /dev/null
+++ b/src/chunker/rw/error.rs
@@ -0,0 +1,61 @@
+use butck_policies::error::ChunkFailed;
+
+use crate::chunker::context::ButckContext;
+
/// An error from the chunker read/write layer, bundled with the
/// [`ButckContext`] that was active when it occurred (useful for reporting).
#[derive(Debug)]
pub struct ButckRWError {
    // What actually went wrong.
    kind: ButckRWErrorKind,
    // The context in effect at the time of the failure.
    ctx: ButckContext,
}
+
/// The specific failure modes of the chunker read/write layer.
///
/// Use [`ButckRWErrorKind::pack`] to attach a [`ButckContext`] and obtain a
/// full `ButckRWError`. Display messages come from the `#[error]` attributes.
#[derive(thiserror::Error, Debug)]
pub enum ButckRWErrorKind {
    #[error("No butck storage found")]
    NoButckStorageFound,

    #[error("Chunking policy not specified")]
    ChunkingPolicyNotSpecified,

    #[error("Cannot enable both MemmapRead and StreamRead")]
    ReadingMethodAmbiguous,

    #[error("Multiple input files specified but only one output file allowed")]
    OutputCountMismatch,

    #[error("Invalid bidx file format")]
    InvalidBidxFormat,

    #[error("Chunk not found in storage: {0}")]
    ChunkNotFound(String),

    #[error("Failed to rebuild file: {0}")]
    RebuildFailed(String),

    #[error("Chunking failed: {0}")]
    ChunkFailed(#[from] ChunkFailed),

    #[error("IO error: {0}")]
    IOError(#[from] std::io::Error),
}
+
+impl std::fmt::Display for ButckRWError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{:?}", self.kind)
+ }
+}
+
+impl ButckRWError {
+ pub fn ctx(&self) -> &ButckContext {
+ &self.ctx
+ }
+
+ pub fn kind(&self) -> &ButckRWErrorKind {
+ &self.kind
+ }
+}
+
+impl ButckRWErrorKind {
+ pub fn pack(self, ctx: ButckContext) -> ButckRWError {
+ ButckRWError { kind: self, ctx }
+ }
+}
diff --git a/src/chunker/rw/storage.rs b/src/chunker/rw/storage.rs
new file mode 100644
index 0000000..13452d0
--- /dev/null
+++ b/src/chunker/rw/storage.rs
@@ -0,0 +1,88 @@
+pub mod build;
+pub mod write;
+
+pub use build::build;
+pub use write::write;
+
+use std::path::{Path, PathBuf};
+
/// Information about a chunk for index file
///
/// Produced by the write path; only the hash is serialized into the bidx
/// index, the remaining fields describe the chunk's place in the source file.
#[derive(Debug, Clone)]
pub struct ChunkInfo {
    /// Index of the chunk in the file (0-based, in original order)
    pub index: usize,
    /// Hash of the chunk (hex string)
    pub hash: String,
    /// Size of the chunk in bytes
    pub size: usize,
    /// Start position in the original file
    pub start: usize,
    /// End position in the original file (exclusive)
    pub end: usize,
}
+
/// Compute the on-disk path of a chunk file from its hex hash.
///
/// Chunks fan out into two directory levels named after the first two and
/// the next two hex characters of the hash:
/// `<storage_dir>/ab/cd/abcdef...`.
///
/// # Arguments
/// - `storage_dir`: storage root directory
/// - `hash_hex`: the chunk's hash as a hex string (expected to be at least
///   4 characters; hashes here are 64-char hex strings)
pub fn get_chunk_path(storage_dir: &Path, hash_hex: &str) -> PathBuf {
    let (level_one, level_two) = (&hash_hex[0..2], &hash_hex[2..4]);
    let mut chunk_path = storage_dir.join(level_one);
    chunk_path.push(level_two);
    chunk_path.push(hash_hex);
    chunk_path
}
+
+/// 根据hash字节数组计算chunk文件的存储路径
+///
+/// # 参数
+/// - `storage_dir`: 存储目录
+/// - `hash_bytes`: chunk的hash值(字节数组)
+///
+/// # 返回
+/// 返回chunk文件的完整路径
+pub fn get_chunk_path_from_bytes(storage_dir: &Path, hash_bytes: &[u8; 32]) -> PathBuf {
+ let hash_hex = hex::encode(hash_bytes);
+ get_chunk_path(storage_dir, &hash_hex)
+}
+
/// Generate a unique path for `desired_filename` inside `output_dir`.
///
/// If the desired path is free it is returned as-is; otherwise a numeric
/// suffix is appended after the full name ("file.txt" -> "file.txt_1",
/// "file.txt_2", ...) until an unused path is found.
///
/// # Arguments
/// - `output_dir`: directory the file will live in
/// - `desired_filename`: the preferred file name
pub fn generate_unique_path(output_dir: &Path, desired_filename: &str) -> PathBuf {
    let name_path = PathBuf::from(desired_filename);
    let mut candidate = output_dir.join(desired_filename);

    for counter in 1.. {
        if !candidate.exists() {
            break;
        }
        let next_name = match (name_path.file_stem(), name_path.extension()) {
            (Some(stem), Some(ext)) => {
                let ext = ext.to_string_lossy();
                if ext.is_empty() {
                    format!("{}_{}", stem.to_string_lossy(), counter)
                } else {
                    // Counter goes after the extension so the full original
                    // name stays visible: "file.txt" -> "file.txt_1".
                    format!("{}.{}_{}", stem.to_string_lossy(), ext, counter)
                }
            }
            (Some(stem), None) => format!("{}_{}", stem.to_string_lossy(), counter),
            // Degenerate names with no stem fall back to the raw input.
            (None, _) => format!("{}_{}", desired_filename, counter),
        };
        candidate = output_dir.join(next_name);
    }

    candidate
}
diff --git a/src/chunker/rw/storage/build.rs b/src/chunker/rw/storage/build.rs
new file mode 100644
index 0000000..7608b5c
--- /dev/null
+++ b/src/chunker/rw/storage/build.rs
@@ -0,0 +1,250 @@
+use futures::future::join_all;
+use just_progress::progress;
+use log::{error, info, trace};
+use memmap2::Mmap;
+use std::path::PathBuf;
+use tokio::{fs::File, io::AsyncWriteExt};
+
+use crate::{
+ chunker::{
+ constants::{BUTCK_INDEX_FILE_SUFFIX, BUTCK_INDEX_MAGIC},
+ context::ButckContext,
+ rw::error::{ButckRWError, ButckRWErrorKind},
+ rw::storage,
+ },
+ utils::size_display::size_display,
+};
+
+pub async fn build(ctx: ButckContext) -> Result<(), ButckRWError> {
+ if ctx.storage_path.is_none() {
+ return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx));
+ }
+ if ctx.file_paths.is_empty() {
+ return Err(
+ ButckRWErrorKind::RebuildFailed("No bidx files specified".to_string()).pack(ctx),
+ );
+ }
+
+ let tasks: Vec<_> = ctx
+ .file_paths
+ .iter()
+ .map(|bidx_path| async {
+ trace!(
+ "Preparing to rebuild from bidx file `{}`",
+ bidx_path.display()
+ );
+ rebuild_from_bidx(bidx_path, &ctx).await
+ })
+ .collect();
+
+ let results = join_all(tasks).await;
+
+ for result in results {
+ if let Err(e) = result {
+ return Err(e.pack(ctx));
+ }
+ }
+
+ Ok(())
+}
+
/// Rebuild one original file from a single bidx index file.
///
/// Bidx layout (mirrors the writer in `write/simple.rs`): 4 magic bytes, a
/// little-endian `u16` filename length, the filename bytes, then a sequence
/// of 32-byte chunk hashes in original chunk order. Chunks are fetched from
/// storage concurrently and concatenated into the output file.
async fn rebuild_from_bidx(
    bidx_path: &PathBuf,
    ctx: &ButckContext,
) -> Result<(), ButckRWErrorKind> {
    // Validate file extension
    // NOTE(review): a path with *no* extension passes this check — confirm
    // whether extension-less bidx files are meant to be accepted.
    if let Some(ext) = bidx_path.extension()
        && ext != BUTCK_INDEX_FILE_SUFFIX
    {
        return Err(ButckRWErrorKind::InvalidBidxFormat);
    }

    info!("Rebuilding from bidx file: {}", bidx_path.display());

    // Read bidx file content (either via mmap or a plain buffered read; both
    // end up as an owned Vec<u8>)
    let bidx_content = if ctx.memmap_read {
        let file = File::open(bidx_path).await?;
        // SAFETY-ish: the mapping is read-only and copied out immediately;
        // the usual mmap caveat (concurrent truncation by another process)
        // applies.
        let mmap = unsafe { Mmap::map(&file)? };
        mmap.to_vec()
    } else {
        tokio::fs::read(bidx_path).await?
    };

    // Verify file size includes at least the header (4 magic + 2 length)
    if bidx_content.len() < 6 {
        return Err(ButckRWErrorKind::InvalidBidxFormat);
    }

    // Validate MAGIC bytes
    if bidx_content[0..4] != BUTCK_INDEX_MAGIC {
        return Err(ButckRWErrorKind::InvalidBidxFormat);
    }

    // Read filename (length-prefixed, little-endian u16)
    let filename_len = u16::from_le_bytes([bidx_content[4], bidx_content[5]]) as usize;
    if bidx_content.len() < 6 + filename_len {
        return Err(ButckRWErrorKind::InvalidBidxFormat);
    }
    let filename_bytes = &bidx_content[6..6 + filename_len];
    let original_filename = String::from_utf8(filename_bytes.to_vec())
        .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?;

    trace!("Original filename from bidx: {}", original_filename);

    let hash_data_start = 6 + filename_len;
    let hash_data = &bidx_content[hash_data_start..];

    // Verify that hash data size is a multiple of 32 bytes
    if hash_data.len() % 32 != 0 {
        return Err(ButckRWErrorKind::InvalidBidxFormat);
    }

    let chunk_count = hash_data.len() / 32;
    info!("Found {} chunks in bidx file", chunk_count);

    // Split the tail into fixed 32-byte hash entries
    let mut chunk_hashes = Vec::with_capacity(chunk_count);
    for i in 0..chunk_count {
        let start = i * 32;
        let end = start + 32;
        let hash_bytes: [u8; 32] = hash_data[start..end]
            .try_into()
            .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?;
        chunk_hashes.push(hash_bytes);
    }

    trace!("Parsed {} chunk hashes", chunk_hashes.len());

    // Determine output file path: explicit -O wins; otherwise derive a
    // unique name in the output dir from the name stored in the bidx file
    let output_path = if let Some(output_file) = &ctx.output_file {
        output_file.clone()
    } else {
        // Use the original filename read from the bidx file
        storage::generate_unique_path(&ctx.output_dir, &original_filename)
    };

    info!("Rebuilding file to: {}", output_path.display());

    let progress_name = format!("Rebuild `{}`", output_path.display());
    progress::update_progress(progress_name.as_str(), 0.0);
    // NOTE(review): `step` is applied once per chunk *read* (inside
    // `read_chunk`) and once per chunk *written* below, so progress advances
    // at twice this rate overall — confirm whether that is intended.
    let step = 1.0 / chunk_count as f64;

    let mut tasks = Vec::with_capacity(chunk_count);

    for (index, hash_bytes) in chunk_hashes.iter().enumerate() {
        let hash_hex = hex::encode(hash_bytes);
        // NOTE(review): chunks are looked up under `ctx.output_dir`, which
        // matches where the write path stores them; `ctx.storage_path` is
        // validated in `build` but never used here — confirm intended.
        tasks.push(read_chunk(
            progress_name.as_str(),
            step,
            hash_hex,
            &ctx.output_dir,
            index,
        ));
    }

    trace!("Starting parallel read of {} chunks", tasks.len());
    let results = join_all(tasks).await;
    trace!("All read tasks completed");

    // Collect chunk data and verify order
    let mut chunk_data_list = Vec::with_capacity(chunk_count);
    let mut success_count = 0;

    for (index, result) in results.into_iter().enumerate() {
        match result {
            Ok(chunk_data) => {
                let chunk_size = chunk_data.len();
                success_count += 1;
                chunk_data_list.push((index, chunk_data));
                trace!(
                    "Chunk {} read successfully, size: {} bytes",
                    index, chunk_size
                );
            }
            Err(e) => {
                error!("Failed to read chunk {}: {:?}", index, e);
                return Err(e);
            }
        }
    }

    // Defensive re-check; with the early return above this is normally
    // always satisfied by the time we get here.
    if success_count != chunk_count {
        return Err(ButckRWErrorKind::ChunkNotFound(format!(
            "Only {}/{} chunks found in storage",
            success_count, chunk_count
        )));
    }

    info!("All {} chunks read successfully", success_count);

    // Sort by index and concatenate files (join_all preserves order, so the
    // sort is a safety net)
    chunk_data_list.sort_by_key(|(index, _)| *index);

    // Calculate total size
    let total_size: usize = chunk_data_list.iter().map(|(_, data)| data.len()).sum();
    let (total_value, total_unit) = size_display(total_size);
    info!(
        "Rebuilding file: {} chunks, total size: {:.2} {} ({} bytes)",
        chunk_count, total_value, total_unit, total_size
    );

    // Write to output file
    trace!("Writing to output file: {}", output_path.display());
    let mut output_file = File::create(&output_path).await?;

    for (index, chunk_data) in chunk_data_list {
        trace!("Writing chunk {} ({} bytes)", index, chunk_data.len());
        output_file.write_all(&chunk_data).await?;
        progress::increase(progress_name.as_str(), step as f32);
    }

    output_file.flush().await?;

    info!("File successfully rebuilt: {}", output_path.display());
    progress::complete(progress_name.as_str());

    Ok(())
}
+
+/// Read a single chunk from storage
+async fn read_chunk(
+ progress_name: &str,
+ step: f64,
+ hash_hex: String,
+ storage_dir: &PathBuf,
+ chunk_index: usize,
+) -> Result<Vec<u8>, ButckRWErrorKind> {
+ trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex);
+
+ // Build chunk file path
+ let file_path = storage::get_chunk_path(storage_dir, &hash_hex);
+
+ trace!(
+ "read_chunk[{}]: Looking for file at: {}",
+ chunk_index,
+ file_path.display()
+ );
+
+ // Read chunk file
+ match tokio::fs::read(&file_path).await {
+ Ok(data) => {
+ trace!(
+ "read_chunk[{}]: Read {} bytes successfully",
+ chunk_index,
+ data.len()
+ );
+ progress::increase(progress_name, step as f32);
+ Ok(data)
+ }
+ Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+ trace!("read_chunk[{}]: File not found", chunk_index);
+ Err(ButckRWErrorKind::ChunkNotFound(format!(
+ "Chunk {} (hash: {}) not found in storage",
+ chunk_index, hash_hex
+ )))
+ }
+ Err(e) => {
+ trace!("read_chunk[{}]: Read failed: {:?}", chunk_index, e);
+ Err(ButckRWErrorKind::IOError(e))
+ }
+ }
+}
diff --git a/src/chunker/rw/storage/write.rs b/src/chunker/rw/storage/write.rs
new file mode 100644
index 0000000..8b3acc7
--- /dev/null
+++ b/src/chunker/rw/storage/write.rs
@@ -0,0 +1,118 @@
+use std::{collections::HashMap, path::PathBuf};
+
+use log::trace;
+
+use crate::{
+ chunker::{
+ constants::BUTCK_INDEX_FILE_SUFFIX,
+ context::ButckContext,
+ rw::{
+ error::{ButckRWError, ButckRWErrorKind},
+ storage::generate_unique_path,
+ },
+ },
+ storage::{simple::write_file_simple, stream::write_file_stream},
+};
+
+pub mod simple;
+pub mod stream;
+
+pub async fn write(ctx: ButckContext) -> Result<(), ButckRWError> {
+ if ctx.storage_path.is_none() {
+ return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx));
+ }
+ if ctx.policy_name.is_none() {
+ return Err(ButckRWErrorKind::ChunkingPolicyNotSpecified.pack(ctx));
+ }
+ if ctx.file_paths.len() > 1 && ctx.output_file.is_some() {
+ return Err(ButckRWErrorKind::OutputCountMismatch.pack(ctx));
+ }
+
+ // Cannot enable both memory-mapped and stream reading simultaneously.
+ // Stream reading uses butck_policies::chunk_stream_with,
+ // while memory-mapped or default reading uses butck_policies::chunk_with.
+ if ctx.memmap_read && ctx.stream_read.is_some() {
+ return Err(ButckRWErrorKind::ReadingMethodAmbiguous.pack(ctx));
+ }
+
+ let param_refs: HashMap<&str, &str> = ctx
+ .params
+ .iter()
+ .map(|(k, v)| (k.as_str(), v.as_str()))
+ .collect();
+
+ let tasks: Vec<_> = ctx
+ .file_paths
+ .iter()
+ .map(|path| async {
+ trace!("Preparing to write file `{}`", path.display());
+ write_file(path, &ctx, &param_refs).await
+ })
+ .collect();
+
+ let results = futures::future::join_all(tasks).await;
+
+ for result in results {
+ if let Err(e) = result {
+ return Err(e.pack(ctx));
+ }
+ }
+
+ Ok(())
+}
+
+async fn write_file(
+ path: &PathBuf,
+ ctx: &ButckContext,
+ params: &HashMap<&str, &str>,
+) -> Result<(), ButckRWErrorKind> {
+ if let Some(stream_read_size) = ctx.stream_read {
+ write_file_stream(path, stream_read_size, ctx, params).await
+ } else {
+ write_file_simple(path, ctx, params).await
+ }
+}
+
+pub fn get_index_file_name(path: &PathBuf, ctx: &ButckContext) -> PathBuf {
+ let output_file = if let Some(output_file) = &ctx.output_file {
+ return output_file.clone();
+ } else {
+ ctx.output_dir.join(path.file_name().unwrap_or_default())
+ };
+
+ // Append .bidx suffix directly to the original file name
+ let desired_filename = if let Some(ext) = output_file.extension() {
+ let ext_str = ext.to_string_lossy();
+ if ext_str.is_empty() {
+ format!(
+ "{}.{}",
+ output_file
+ .file_stem()
+ .unwrap_or_default()
+ .to_string_lossy(),
+ BUTCK_INDEX_FILE_SUFFIX
+ )
+ } else {
+ format!(
+ "{}.{}.{}",
+ output_file
+ .file_stem()
+ .unwrap_or_default()
+ .to_string_lossy(),
+ ext_str,
+ BUTCK_INDEX_FILE_SUFFIX
+ )
+ }
+ } else {
+ format!(
+ "{}.{}",
+ output_file
+ .file_name()
+ .unwrap_or_default()
+ .to_string_lossy(),
+ BUTCK_INDEX_FILE_SUFFIX
+ )
+ };
+
+ generate_unique_path(&ctx.output_dir, &desired_filename)
+}
diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs
new file mode 100644
index 0000000..75b9bd7
--- /dev/null
+++ b/src/chunker/rw/storage/write/simple.rs
@@ -0,0 +1,368 @@
+use futures::future::join_all;
+use just_progress::progress;
+use log::{error, info, trace};
+use std::{collections::HashMap, path::PathBuf};
+use tokio::{fs::File, io::AsyncReadExt};
+
+use crate::{
+ chunker::{
+ context::ButckContext,
+ rw::{error::ButckRWErrorKind, storage},
+ },
+ core::hash::ChunkWriteHash,
+ storage::get_index_file_name,
+ utils::size_display::size_display,
+};
+
+pub async fn write_file_simple(
+ path: &PathBuf,
+ ctx: &ButckContext,
+ params: &HashMap<&str, &str>,
+) -> Result<(), ButckRWErrorKind> {
+ read_file(path, ctx, params).await?;
+ Ok(())
+}
+
+async fn read_file(
+ path: &PathBuf,
+ ctx: &ButckContext,
+ params: &HashMap<&str, &str>,
+) -> Result<(), ButckRWErrorKind> {
+ let mut file = File::open(path).await?;
+
+ // Use butck_policies::chunk_with to locate chunk boundaries in the file
+ if ctx.memmap_read {
+ let mmap = unsafe { memmap2::Mmap::map(&file)? };
+ let raw_data = &mmap[..];
+ let (chunk_boundaries, total_bytes) =
+ (get_boundaries(raw_data, ctx, params).await?, raw_data.len());
+
+ // If output boundaries, do not execute actual write logic
+ if ctx.display_boundaries {
+ display_boundaries(&chunk_boundaries, total_bytes).await;
+ return Ok(());
+ } else {
+ write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?;
+ }
+ } else {
+ let mut contents = Vec::new();
+ file.read_to_end(&mut contents).await?;
+ let raw_data = &contents[..];
+ let (chunk_boundaries, total_bytes) =
+ (get_boundaries(raw_data, ctx, params).await?, raw_data.len());
+
+ // If output boundaries, do not execute actual write logic
+ if ctx.display_boundaries {
+ display_boundaries(&chunk_boundaries, total_bytes).await;
+ return Ok(());
+ } else {
+ write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?;
+ }
+ };
+ progress::clear_all();
+ Ok(())
+}
+
/// Persist the chunks of `path` into content-addressed storage and write the
/// accompanying bidx index file.
///
/// Chunks are written concurrently; each is stored under
/// `<output_dir>/<h0h1>/<h2h3>/<hash>` keyed by its own hash (see
/// `write_chunk`), so identical chunks are written only once.
async fn write_file_to_storage(
    path: &PathBuf,
    ctx: &ButckContext,
    chunk_boundaries: Vec<u32>,
    raw_data: &[u8],
) -> Result<(), ButckRWErrorKind> {
    let output_index_file = get_index_file_name(path, ctx);

    // Boundaries are exclusive end offsets of every chunk but the last,
    // hence one more chunk than boundaries.
    let chunk_count = chunk_boundaries.len() + 1;
    let progress_name = format!("Write `{}`", path.display());

    progress::update_progress(progress_name.as_str(), 0.0);
    let step = 1.0 / chunk_count as f64;

    trace!("chunks_count={}", chunk_count);
    trace!("chunk_hash={:?}", ctx.chunk_hash);
    trace!("file_size={}", raw_data.len());
    trace!("output_index_file={}", output_index_file.display());
    trace!("policy_name={:?}", ctx.policy_name);
    trace!("storage_dir={}", ctx.output_dir.display());

    info!(
        "{} chunks will be written to {}",
        chunk_count,
        ctx.output_dir.display()
    );

    tokio::fs::create_dir_all(&ctx.output_dir).await?;
    trace!("Output directory created or already exists");

    let mut tasks = Vec::new();
    let mut start = 0;
    let mut chunk_index = 0;

    trace!("Processing chunk boundaries:");

    // Turn each boundary into a [start, end) slice; out-of-range or
    // zero-length spans are skipped rather than failing the whole write.
    for &boundary in &chunk_boundaries {
        let end = boundary as usize;
        if start < end && end <= raw_data.len() {
            let chunk_data = &raw_data[start..end];
            trace!(
                "Chunk {}: bytes {}..{} (size: {} bytes)",
                chunk_index,
                start,
                end - 1,
                end - start
            );
            tasks.push(write_chunk(
                progress_name.as_str(),
                step,
                chunk_data,
                &ctx.output_dir,
                &ctx.chunk_hash,
                chunk_index,
                start,
                end,
            ));
            chunk_index += 1;
        } else {
            trace!(
                "Skipping invalid chunk boundary: start={}, end={}, data_len={}",
                start,
                end,
                raw_data.len()
            );
        }
        start = end;
    }

    // Final chunk: everything after the last boundary. Skipped when the last
    // boundary already coincides with the end of the data, so `tasks` may
    // hold fewer than `chunk_count` entries.
    if start < raw_data.len() {
        let chunk_data = &raw_data[start..];
        trace!(
            "Chunk {}: bytes {}..{} (size: {} bytes) - final chunk",
            chunk_index,
            start,
            raw_data.len() - 1,
            raw_data.len() - start
        );
        tasks.push(write_chunk(
            progress_name.as_str(),
            step,
            chunk_data,
            &ctx.output_dir,
            &ctx.chunk_hash,
            chunk_index,
            start,
            raw_data.len(),
        ));
    }

    trace!("Total chunks prepared for writing: {}", tasks.len());

    trace!("Starting parallel write of {} chunks", tasks.len());
    let results = join_all(tasks).await;
    trace!("All write tasks completed");

    let mut success_count = 0;
    let mut chunk_infos = Vec::new();

    // First failed write aborts the whole file (the index would be invalid
    // without every chunk present).
    for result in results {
        match result {
            Ok(chunk_info) => {
                success_count += 1;
                chunk_infos.push(chunk_info);
            }
            Err(e) => {
                trace!("Chunk write failed: {:?}", e);
                return Err(e);
            }
        }
    }

    info!("All {} chunks written successfully", success_count);

    // Write index file
    trace!("Writing index file to: {}", output_index_file.display());
    if let Err(e) = write_index_file(&output_index_file, &chunk_infos, path).await {
        error!("Failed to write index file: {}", e);
        return Err(ButckRWErrorKind::IOError(e));
    }
    info!("Index file written to: {}", output_index_file.display());

    trace!("write_file_to_storage completed successfully");

    progress::complete(progress_name.as_str());

    Ok(())
}
+
+async fn write_chunk(
+ progress_name: &str,
+ step: f64,
+ chunk_data: &[u8],
+ output_dir: &PathBuf,
+ chunk_hash: &ChunkWriteHash,
+ chunk_index: usize,
+ start: usize,
+ end: usize,
+) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> {
+ trace!(
+ "write_chunk[{}]: Starting, data size: {} bytes",
+ chunk_index,
+ chunk_data.len()
+ );
+
+ trace!(
+ "write_chunk[{}]: Computing hash with algorithm: {:?}",
+ chunk_index, chunk_hash
+ );
+ let hash_bytes = chunk_hash.hash(chunk_data);
+ trace!(
+ "write_chunk[{}]: Hash computed: {:?}",
+ chunk_index, hash_bytes
+ );
+
+ let hash_hex = hex::encode(hash_bytes);
+ trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex);
+
+ let file_path = storage::get_chunk_path(output_dir, &hash_hex);
+
+ if let Some(parent_dir) = file_path.parent() {
+ trace!(
+ "write_chunk[{}]: Creating directory structure: {}",
+ chunk_index,
+ parent_dir.display()
+ );
+ tokio::fs::create_dir_all(parent_dir).await?;
+ trace!("write_chunk[{}]: Directory created", chunk_index);
+ }
+
+ trace!(
+ "write_chunk[{}]: File path: {}",
+ chunk_index,
+ file_path.display()
+ );
+
+ trace!(
+ "write_chunk[{}]: Writing {} bytes to file",
+ chunk_index,
+ chunk_data.len()
+ );
+ if !file_path.exists() {
+ tokio::fs::write(&file_path, chunk_data).await?;
+ } else {
+ trace!(
+ "write_chunk[{}]: File already exists, skipping",
+ chunk_index
+ );
+ }
+ trace!("write_chunk[{}]: File written successfully", chunk_index);
+ progress::increase(progress_name, step as f32);
+ Ok(crate::chunker::rw::storage::ChunkInfo {
+ index: chunk_index,
+ hash: hash_hex,
+ size: chunk_data.len(),
+ start,
+ end,
+ })
+}
+
+async fn get_boundaries<'a>(
+ raw_data: &[u8],
+ ctx: &ButckContext,
+ params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ButckRWErrorKind> {
+ let policy_name = ctx.policy_name.as_ref().unwrap().as_str();
+ match butck_policies::chunk_with(policy_name, raw_data, params).await {
+ Ok(s) => Ok(s),
+ Err(e) => Err(ButckRWErrorKind::ChunkFailed(e)),
+ }
+}
+
+async fn write_index_file(
+ index_path: &PathBuf,
+ chunk_infos: &[crate::chunker::rw::storage::ChunkInfo],
+ original_file_path: &PathBuf,
+) -> Result<(), std::io::Error> {
+ use std::io::Write;
+
+ let file = std::fs::File::create(index_path)?;
+ let mut writer = std::io::BufWriter::new(file);
+
+ // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes
+ use crate::chunker::constants::BUTCK_INDEX_MAGIC;
+
+ // Write magic bytes
+ writer.write_all(&BUTCK_INDEX_MAGIC)?;
+
+ // Get original filename as bytes
+ let filename = original_file_path
+ .file_name()
+ .and_then(|n| n.to_str())
+ .unwrap_or("unknown");
+ let filename_bytes = filename.as_bytes();
+
+ // Write filename length as u16 (little-endian)
+ if filename_bytes.len() > u16::MAX as usize {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ format!("Filename too long: {} bytes", filename_bytes.len()),
+ ));
+ }
+ let filename_len = filename_bytes.len() as u16;
+ writer.write_all(&filename_len.to_le_bytes())?;
+
+ // Write filename bytes
+ writer.write_all(filename_bytes)?;
+
+ // Write chunk hashes: [u8; 32][u8; 32][u8; 32]...
+ for chunk_info in chunk_infos {
+ // Convert hex hash to bytes
+ match hex::decode(&chunk_info.hash) {
+ Ok(hash_bytes) => {
+ if hash_bytes.len() == 32 {
+ writer.write_all(&hash_bytes)?;
+ } else {
+ // Pad or truncate to 32 bytes if needed
+ let mut fixed_hash = [0u8; 32];
+ let len = hash_bytes.len().min(32);
+ fixed_hash[..len].copy_from_slice(&hash_bytes[..len]);
+ writer.write_all(&fixed_hash)?;
+ }
+ }
+ Err(e) => {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!("Failed to decode hash hex: {}", e),
+ ));
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) {
+ let total_chunks = chunk_boundaries.len() + 1;
+ let (total_value, total_unit) = size_display(total_bytes);
+ info!(
+ "{} chunks, ({:.2} {}, {})",
+ total_chunks, total_value, total_unit, total_bytes
+ );
+ let mut start = 0;
+ chunk_boundaries.iter().for_each(|p| {
+ let next = *p as usize;
+ let (size_value, size_unit) = size_display(next - start);
+ info!(
+ "{} - {} (size: {:.2} {})",
+ start,
+ next - 1,
+ size_value,
+ size_unit
+ );
+ start = next;
+ });
+ let last = start;
+ let r#final = total_bytes;
+ let (size_value, size_unit) = size_display(total_bytes - start);
+ info!(
+ "{} - {} (size: {:.2} {})",
+ last, r#final, size_value, size_unit
+ );
+}
diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs
new file mode 100644
index 0000000..020cfcd
--- /dev/null
+++ b/src/chunker/rw/storage/write/stream.rs
@@ -0,0 +1,12 @@
+use std::{collections::HashMap, path::PathBuf};
+
+use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind};
+
/// Write a single file into storage using stream reading (`-S`), reading at
/// most `stream_read_size` bytes at a time.
///
/// Not yet implemented: intended to go through
/// `butck_policies::chunk_stream_with` (see the mutual-exclusion note in
/// `write`). Calling this currently panics via `todo!()`.
pub async fn write_file_stream(
    path: &PathBuf,
    stream_read_size: u32,
    ctx: &ButckContext,
    params: &HashMap<&str, &str>,
) -> Result<(), ButckRWErrorKind> {
    todo!()
}