diff options
Diffstat (limited to 'src/chunker/rw/storage/write')
-rw-r--r--  src/chunker/rw/storage/write/simple.rs | 368
-rw-r--r--  src/chunker/rw/storage/write/stream.rs |  12
2 files changed, 380 insertions, 0 deletions
diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs new file mode 100644 index 0000000..75b9bd7 --- /dev/null +++ b/src/chunker/rw/storage/write/simple.rs @@ -0,0 +1,368 @@ +use futures::future::join_all; +use just_progress::progress; +use log::{error, info, trace}; +use std::{collections::HashMap, path::PathBuf}; +use tokio::{fs::File, io::AsyncReadExt}; + +use crate::{ + chunker::{ + context::ButckContext, + rw::{error::ButckRWErrorKind, storage}, + }, + core::hash::ChunkWriteHash, + storage::get_index_file_name, + utils::size_display::size_display, +}; + +pub async fn write_file_simple( + path: &PathBuf, + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<(), ButckRWErrorKind> { + read_file(path, ctx, params).await?; + Ok(()) +} + +async fn read_file( + path: &PathBuf, + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<(), ButckRWErrorKind> { + let mut file = File::open(path).await?; + + // Use butck_policies::chunk_with to locate chunk boundaries in the file + if ctx.memmap_read { + let mmap = unsafe { memmap2::Mmap::map(&file)? 
}; + let raw_data = &mmap[..]; + let (chunk_boundaries, total_bytes) = + (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); + + // If output boundaries, do not execute actual write logic + if ctx.display_boundaries { + display_boundaries(&chunk_boundaries, total_bytes).await; + return Ok(()); + } else { + write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; + } + } else { + let mut contents = Vec::new(); + file.read_to_end(&mut contents).await?; + let raw_data = &contents[..]; + let (chunk_boundaries, total_bytes) = + (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); + + // If output boundaries, do not execute actual write logic + if ctx.display_boundaries { + display_boundaries(&chunk_boundaries, total_bytes).await; + return Ok(()); + } else { + write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; + } + }; + progress::clear_all(); + Ok(()) +} + +async fn write_file_to_storage( + path: &PathBuf, + ctx: &ButckContext, + chunk_boundaries: Vec<u32>, + raw_data: &[u8], +) -> Result<(), ButckRWErrorKind> { + let output_index_file = get_index_file_name(path, ctx); + + let chunk_count = chunk_boundaries.len() + 1; + let progress_name = format!("Write `{}`", path.display()); + + progress::update_progress(progress_name.as_str(), 0.0); + let step = 1.0 / chunk_count as f64; + + trace!("chunks_count={}", chunk_count); + trace!("chunk_hash={:?}", ctx.chunk_hash); + trace!("file_size={}", raw_data.len()); + trace!("output_index_file={}", output_index_file.display()); + trace!("policy_name={:?}", ctx.policy_name); + trace!("storage_dir={}", ctx.output_dir.display()); + + info!( + "{} chunks will be written to {}", + chunk_count, + ctx.output_dir.display() + ); + + tokio::fs::create_dir_all(&ctx.output_dir).await?; + trace!("Output directory created or already exists"); + + let mut tasks = Vec::new(); + let mut start = 0; + let mut chunk_index = 0; + + trace!("Processing chunk boundaries:"); + + for &boundary in 
&chunk_boundaries { + let end = boundary as usize; + if start < end && end <= raw_data.len() { + let chunk_data = &raw_data[start..end]; + trace!( + "Chunk {}: bytes {}..{} (size: {} bytes)", + chunk_index, + start, + end - 1, + end - start + ); + tasks.push(write_chunk( + progress_name.as_str(), + step, + chunk_data, + &ctx.output_dir, + &ctx.chunk_hash, + chunk_index, + start, + end, + )); + chunk_index += 1; + } else { + trace!( + "Skipping invalid chunk boundary: start={}, end={}, data_len={}", + start, + end, + raw_data.len() + ); + } + start = end; + } + + if start < raw_data.len() { + let chunk_data = &raw_data[start..]; + trace!( + "Chunk {}: bytes {}..{} (size: {} bytes) - final chunk", + chunk_index, + start, + raw_data.len() - 1, + raw_data.len() - start + ); + tasks.push(write_chunk( + progress_name.as_str(), + step, + chunk_data, + &ctx.output_dir, + &ctx.chunk_hash, + chunk_index, + start, + raw_data.len(), + )); + } + + trace!("Total chunks prepared for writing: {}", tasks.len()); + + trace!("Starting parallel write of {} chunks", tasks.len()); + let results = join_all(tasks).await; + trace!("All write tasks completed"); + + let mut success_count = 0; + let mut chunk_infos = Vec::new(); + + for result in results { + match result { + Ok(chunk_info) => { + success_count += 1; + chunk_infos.push(chunk_info); + } + Err(e) => { + trace!("Chunk write failed: {:?}", e); + return Err(e); + } + } + } + + info!("All {} chunks written successfully", success_count); + + // Write index file + trace!("Writing index file to: {}", output_index_file.display()); + if let Err(e) = write_index_file(&output_index_file, &chunk_infos, path).await { + error!("Failed to write index file: {}", e); + return Err(ButckRWErrorKind::IOError(e)); + } + info!("Index file written to: {}", output_index_file.display()); + + trace!("write_file_to_storage completed successfully"); + + progress::complete(progress_name.as_str()); + + Ok(()) +} + +async fn write_chunk( + progress_name: 
&str, + step: f64, + chunk_data: &[u8], + output_dir: &PathBuf, + chunk_hash: &ChunkWriteHash, + chunk_index: usize, + start: usize, + end: usize, +) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> { + trace!( + "write_chunk[{}]: Starting, data size: {} bytes", + chunk_index, + chunk_data.len() + ); + + trace!( + "write_chunk[{}]: Computing hash with algorithm: {:?}", + chunk_index, chunk_hash + ); + let hash_bytes = chunk_hash.hash(chunk_data); + trace!( + "write_chunk[{}]: Hash computed: {:?}", + chunk_index, hash_bytes + ); + + let hash_hex = hex::encode(hash_bytes); + trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex); + + let file_path = storage::get_chunk_path(output_dir, &hash_hex); + + if let Some(parent_dir) = file_path.parent() { + trace!( + "write_chunk[{}]: Creating directory structure: {}", + chunk_index, + parent_dir.display() + ); + tokio::fs::create_dir_all(parent_dir).await?; + trace!("write_chunk[{}]: Directory created", chunk_index); + } + + trace!( + "write_chunk[{}]: File path: {}", + chunk_index, + file_path.display() + ); + + trace!( + "write_chunk[{}]: Writing {} bytes to file", + chunk_index, + chunk_data.len() + ); + if !file_path.exists() { + tokio::fs::write(&file_path, chunk_data).await?; + } else { + trace!( + "write_chunk[{}]: File already exists, skipping", + chunk_index + ); + } + trace!("write_chunk[{}]: File written successfully", chunk_index); + progress::increase(progress_name, step as f32); + Ok(crate::chunker::rw::storage::ChunkInfo { + index: chunk_index, + hash: hash_hex, + size: chunk_data.len(), + start, + end, + }) +} + +async fn get_boundaries<'a>( + raw_data: &[u8], + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ButckRWErrorKind> { + let policy_name = ctx.policy_name.as_ref().unwrap().as_str(); + match butck_policies::chunk_with(policy_name, raw_data, params).await { + Ok(s) => Ok(s), + Err(e) => Err(ButckRWErrorKind::ChunkFailed(e)), + } +} + +async fn 
write_index_file( + index_path: &PathBuf, + chunk_infos: &[crate::chunker::rw::storage::ChunkInfo], + original_file_path: &PathBuf, +) -> Result<(), std::io::Error> { + use std::io::Write; + + let file = std::fs::File::create(index_path)?; + let mut writer = std::io::BufWriter::new(file); + + // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes + use crate::chunker::constants::BUTCK_INDEX_MAGIC; + + // Write magic bytes + writer.write_all(&BUTCK_INDEX_MAGIC)?; + + // Get original filename as bytes + let filename = original_file_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown"); + let filename_bytes = filename.as_bytes(); + + // Write filename length as u16 (little-endian) + if filename_bytes.len() > u16::MAX as usize { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Filename too long: {} bytes", filename_bytes.len()), + )); + } + let filename_len = filename_bytes.len() as u16; + writer.write_all(&filename_len.to_le_bytes())?; + + // Write filename bytes + writer.write_all(filename_bytes)?; + + // Write chunk hashes: [u8; 32][u8; 32][u8; 32]... 
+ for chunk_info in chunk_infos { + // Convert hex hash to bytes + match hex::decode(&chunk_info.hash) { + Ok(hash_bytes) => { + if hash_bytes.len() == 32 { + writer.write_all(&hash_bytes)?; + } else { + // Pad or truncate to 32 bytes if needed + let mut fixed_hash = [0u8; 32]; + let len = hash_bytes.len().min(32); + fixed_hash[..len].copy_from_slice(&hash_bytes[..len]); + writer.write_all(&fixed_hash)?; + } + } + Err(e) => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to decode hash hex: {}", e), + )); + } + } + } + + Ok(()) +} + +async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) { + let total_chunks = chunk_boundaries.len() + 1; + let (total_value, total_unit) = size_display(total_bytes); + info!( + "{} chunks, ({:.2} {}, {})", + total_chunks, total_value, total_unit, total_bytes + ); + let mut start = 0; + chunk_boundaries.iter().for_each(|p| { + let next = *p as usize; + let (size_value, size_unit) = size_display(next - start); + info!( + "{} - {} (size: {:.2} {})", + start, + next - 1, + size_value, + size_unit + ); + start = next; + }); + let last = start; + let r#final = total_bytes; + let (size_value, size_unit) = size_display(total_bytes - start); + info!( + "{} - {} (size: {:.2} {})", + last, r#final, size_value, size_unit + ); +} diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs new file mode 100644 index 0000000..020cfcd --- /dev/null +++ b/src/chunker/rw/storage/write/stream.rs @@ -0,0 +1,12 @@ +use std::{collections::HashMap, path::PathBuf}; + +use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind}; + +pub async fn write_file_stream( + path: &PathBuf, + stream_read_size: u32, + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<(), ButckRWErrorKind> { + todo!() +} |
