summary refs log tree commit diff
path: root/src/chunker/rw/storage/write/simple.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/chunker/rw/storage/write/simple.rs')
-rw-r--r--  src/chunker/rw/storage/write/simple.rs  171
1 files changed, 61 insertions, 110 deletions
diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs
index 75b9bd7..38aecfc 100644
--- a/src/chunker/rw/storage/write/simple.rs
+++ b/src/chunker/rw/storage/write/simple.rs
@@ -1,15 +1,20 @@
use futures::future::join_all;
use just_progress::progress;
use log::{error, info, trace};
-use std::{collections::HashMap, path::PathBuf};
+use std::{
+ collections::HashMap,
+ path::{Path, PathBuf},
+};
use tokio::{fs::File, io::AsyncReadExt};
use crate::{
chunker::{
context::ButckContext,
- rw::{error::ButckRWErrorKind, storage},
+ rw::{
+ error::ButckRWErrorKind,
+ storage::{self, ChunkInfo, bidx::write_bidx_file, hash::ChunkWriteHash},
+ },
},
- core::hash::ChunkWriteHash,
storage::get_index_file_name,
utils::size_display::size_display,
};
@@ -64,7 +69,7 @@ async fn read_file(
}
async fn write_file_to_storage(
- path: &PathBuf,
+ path: &Path,
ctx: &ButckContext,
chunk_boundaries: Vec<u32>,
raw_data: &[u8],
@@ -110,16 +115,15 @@ async fn write_file_to_storage(
end - 1,
end - start
);
- tasks.push(write_chunk(
- progress_name.as_str(),
+ let params = WriteChunkParams {
+ progress_name: progress_name.clone(),
step,
- chunk_data,
- &ctx.output_dir,
- &ctx.chunk_hash,
+ chunk_data: chunk_data.to_vec(),
+ output_dir: ctx.output_dir.clone(),
+ chunk_hash: ctx.chunk_hash,
chunk_index,
- start,
- end,
- ));
+ };
+ tasks.push(write_chunk(params));
chunk_index += 1;
} else {
trace!(
@@ -141,16 +145,15 @@ async fn write_file_to_storage(
raw_data.len() - 1,
raw_data.len() - start
);
- tasks.push(write_chunk(
- progress_name.as_str(),
+ let params = WriteChunkParams {
+ progress_name: progress_name.clone(),
step,
- chunk_data,
- &ctx.output_dir,
- &ctx.chunk_hash,
+ chunk_data: chunk_data.to_vec(),
+ output_dir: ctx.output_dir.clone(),
+ chunk_hash: ctx.chunk_hash,
chunk_index,
- start,
- raw_data.len(),
- ));
+ };
+ tasks.push(write_chunk(params));
}
trace!("Total chunks prepared for writing: {}", tasks.len());
@@ -192,78 +195,81 @@ async fn write_file_to_storage(
Ok(())
}
-async fn write_chunk(
- progress_name: &str,
+struct WriteChunkParams {
+ progress_name: String,
step: f64,
- chunk_data: &[u8],
- output_dir: &PathBuf,
- chunk_hash: &ChunkWriteHash,
+ chunk_data: Vec<u8>,
+ output_dir: PathBuf,
+ chunk_hash: ChunkWriteHash,
chunk_index: usize,
- start: usize,
- end: usize,
-) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> {
+}
+
+async fn write_chunk(params: WriteChunkParams) -> Result<ChunkInfo, ButckRWErrorKind> {
trace!(
"write_chunk[{}]: Starting, data size: {} bytes",
- chunk_index,
- chunk_data.len()
+ params.chunk_index,
+ params.chunk_data.len()
);
trace!(
"write_chunk[{}]: Computing hash with algorithm: {:?}",
- chunk_index, chunk_hash
+ params.chunk_index, params.chunk_hash
);
- let hash_bytes = chunk_hash.hash(chunk_data);
+ let hash_bytes = params.chunk_hash.hash(&params.chunk_data);
trace!(
"write_chunk[{}]: Hash computed: {:?}",
- chunk_index, hash_bytes
+ params.chunk_index, hash_bytes
);
let hash_hex = hex::encode(hash_bytes);
- trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex);
+ trace!(
+ "write_chunk[{}]: Hash hex: {}",
+ params.chunk_index, hash_hex
+ );
- let file_path = storage::get_chunk_path(output_dir, &hash_hex);
+ let file_path = storage::get_chunk_path(&params.output_dir, &hash_hex);
if let Some(parent_dir) = file_path.parent() {
trace!(
"write_chunk[{}]: Creating directory structure: {}",
- chunk_index,
+ params.chunk_index,
parent_dir.display()
);
tokio::fs::create_dir_all(parent_dir).await?;
- trace!("write_chunk[{}]: Directory created", chunk_index);
+ trace!("write_chunk[{}]: Directory created", params.chunk_index);
}
trace!(
"write_chunk[{}]: File path: {}",
- chunk_index,
+ params.chunk_index,
file_path.display()
);
trace!(
"write_chunk[{}]: Writing {} bytes to file",
- chunk_index,
- chunk_data.len()
+ params.chunk_index,
+ params.chunk_data.len()
);
if !file_path.exists() {
- tokio::fs::write(&file_path, chunk_data).await?;
+ tokio::fs::write(&file_path, &params.chunk_data).await?;
} else {
trace!(
"write_chunk[{}]: File already exists, skipping",
- chunk_index
+ params.chunk_index
);
}
- trace!("write_chunk[{}]: File written successfully", chunk_index);
- progress::increase(progress_name, step as f32);
- Ok(crate::chunker::rw::storage::ChunkInfo {
- index: chunk_index,
+ trace!(
+ "write_chunk[{}]: File written successfully",
+ params.chunk_index
+ );
+ progress::increase(&params.progress_name, params.step as f32);
+ Ok(ChunkInfo {
+ index: params.chunk_index,
hash: hash_hex,
- size: chunk_data.len(),
- start,
- end,
})
}
-async fn get_boundaries<'a>(
+async fn get_boundaries(
raw_data: &[u8],
ctx: &ButckContext,
params: &HashMap<&str, &str>,
@@ -276,69 +282,14 @@ async fn get_boundaries<'a>(
}
async fn write_index_file(
- index_path: &PathBuf,
- chunk_infos: &[crate::chunker::rw::storage::ChunkInfo],
- original_file_path: &PathBuf,
+ index_path: &Path,
+ chunk_infos: &[ChunkInfo],
+ original_file_path: &Path,
) -> Result<(), std::io::Error> {
- use std::io::Write;
-
- let file = std::fs::File::create(index_path)?;
- let mut writer = std::io::BufWriter::new(file);
-
- // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes
- use crate::chunker::constants::BUTCK_INDEX_MAGIC;
-
- // Write magic bytes
- writer.write_all(&BUTCK_INDEX_MAGIC)?;
-
- // Get original filename as bytes
- let filename = original_file_path
- .file_name()
- .and_then(|n| n.to_str())
- .unwrap_or("unknown");
- let filename_bytes = filename.as_bytes();
-
- // Write filename length as u16 (little-endian)
- if filename_bytes.len() > u16::MAX as usize {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- format!("Filename too long: {} bytes", filename_bytes.len()),
- ));
- }
- let filename_len = filename_bytes.len() as u16;
- writer.write_all(&filename_len.to_le_bytes())?;
-
- // Write filename bytes
- writer.write_all(filename_bytes)?;
-
- // Write chunk hashes: [u8; 32][u8; 32][u8; 32]...
- for chunk_info in chunk_infos {
- // Convert hex hash to bytes
- match hex::decode(&chunk_info.hash) {
- Ok(hash_bytes) => {
- if hash_bytes.len() == 32 {
- writer.write_all(&hash_bytes)?;
- } else {
- // Pad or truncate to 32 bytes if needed
- let mut fixed_hash = [0u8; 32];
- let len = hash_bytes.len().min(32);
- fixed_hash[..len].copy_from_slice(&hash_bytes[..len]);
- writer.write_all(&fixed_hash)?;
- }
- }
- Err(e) => {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!("Failed to decode hash hex: {}", e),
- ));
- }
- }
- }
-
- Ok(())
+ write_bidx_file(index_path, chunk_infos, original_file_path)
}
-async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) {
+pub async fn display_boundaries(chunk_boundaries: &[u32], total_bytes: usize) {
let total_chunks = chunk_boundaries.len() + 1;
let (total_value, total_unit) = size_display(total_bytes);
info!(