diff options
Diffstat (limited to 'src/chunker/rw/storage/write')
| -rw-r--r-- | src/chunker/rw/storage/write/simple.rs | 44 | ||||
| -rw-r--r-- | src/chunker/rw/storage/write/stream.rs | 36 |
2 files changed, 37 insertions, 43 deletions
diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs index 461afff..c7a20ea 100644 --- a/src/chunker/rw/storage/write/simple.rs +++ b/src/chunker/rw/storage/write/simple.rs @@ -1,6 +1,6 @@ use futures::future::join_all; use just_progress::progress; -use log::{error, info, trace}; +use log::{debug, error, trace}; use std::{ collections::HashMap, path::{Path, PathBuf}, @@ -15,8 +15,7 @@ use crate::{ storage::{self, ChunkInfo, bidx::write_bidx_file, hash::ChunkWriteHash}, }, }, - storage::get_index_file_name, - utils::size_display::size_display, + storage::{display_boundaries, get_index_file_name}, }; pub async fn write_file_simple( @@ -92,7 +91,7 @@ async fn write_file_to_storage( ctx.storage_path.as_ref().unwrap().display() ); - info!( + debug!( "{} chunks will be written to {}", chunk_count, ctx.storage_path.as_ref().unwrap().display() @@ -182,7 +181,7 @@ async fn write_file_to_storage( } } - info!("All {} chunks written successfully", success_count); + debug!("All {} chunks written successfully", success_count); // Write index file trace!("Writing index file to: {}", output_index_file.display()); @@ -190,7 +189,7 @@ async fn write_file_to_storage( error!("Failed to write index file: {}", e); return Err(ButckRWErrorKind::IOError(e)); } - info!("Index file written to: {}", output_index_file.display()); + debug!("Index file written to: {}", output_index_file.display()); trace!("write_file_to_storage completed successfully"); @@ -254,6 +253,9 @@ async fn write_chunk(params: WriteChunkParams) -> Result<ChunkInfo, ButckRWError params.chunk_index, params.chunk_data.len() ); + + progress::increase(¶ms.progress_name, params.step as f32); + if !file_path.exists() { tokio::fs::write(&file_path, ¶ms.chunk_data).await?; } else { @@ -266,7 +268,6 @@ async fn write_chunk(params: WriteChunkParams) -> Result<ChunkInfo, ButckRWError "write_chunk[{}]: File written successfully", params.chunk_index ); - progress::increase(¶ms.progress_name, params.step as f32); Ok(ChunkInfo { index: params.chunk_index, hash: hash_hex, @@ -292,32 +293,3 @@ async fn write_index_file( ) -> Result<(), std::io::Error> { write_bidx_file(index_path, chunk_infos, original_file_path) } - -pub async fn display_boundaries(chunk_boundaries: &[u32], total_bytes: usize) { - let total_chunks = chunk_boundaries.len() + 1; - let (total_value, total_unit) = size_display(total_bytes); - info!( - "{} chunks, ({:.2} {}, {})", - total_chunks, total_value, total_unit, total_bytes - ); - let mut start = 0; - chunk_boundaries.iter().for_each(|p| { - let next = *p as usize; - let (size_value, size_unit) = size_display(next - start); - info!( - "{} - {} (size: {:.2} {})", - start, - next - 1, - size_value, - size_unit - ); - start = next; - }); - let last = start; - let r#final = total_bytes; - let (size_value, size_unit) = size_display(total_bytes - start); - info!( - "{} - {} (size: {:.2} {})", - last, r#final, size_value, size_unit - ); -} diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs index 74a391b..c4e786f 100644 --- a/src/chunker/rw/storage/write/stream.rs +++ b/src/chunker/rw/storage/write/stream.rs @@ -1,4 +1,11 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{ + collections::HashMap, + path::Path, + sync::{ + Arc, + atomic::{AtomicU32, Ordering}, + }, +}; use crate::{ chunker::{ @@ -8,11 +15,11 @@ use crate::{ storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path}, }, }, - storage::{get_index_file_name, simple::display_boundaries}, + storage::{display_boundaries, get_index_file_name}, }; use butck_policies::chunk_stream_with; use just_progress::progress; -use log::{error, info, trace}; +use log::{debug, error, trace}; use tokio::sync::Mutex; pub async fn write_file_stream( @@ -26,6 +33,15 @@ pub async fn write_file_stream( path.display() ); + let file_size = tokio::fs::metadata(path) + .await + .map_err(|e| { + error!("Failed to get file metadata: {}", e); + ButckRWErrorKind::IOError(e) + })? + .len() as f32; + let readed: Arc<AtomicU32> = Arc::new(AtomicU32::new(0)); + // Check if policy is specified let policy_name = ctx.policy_name.as_ref().ok_or_else(|| { error!("No chunking policy specified for stream write"); @@ -80,6 +96,7 @@ pub async fn write_file_stream( stream_read_size, path, |chunk_data: Vec<u8>| { + let readed = readed.clone(); let storage_dir = storage_dir.clone(); let chunk_hash = chunk_hash; let progress_name = progress_name.clone(); @@ -87,6 +104,14 @@ pub async fn write_file_stream( let chunk_counter = Arc::clone(&chunk_counter); Box::pin(async move { + // Update readed + readed.fetch_add(chunk_data.len() as u32, Ordering::SeqCst); + let readed_f32 = readed.load(Ordering::SeqCst) as f32; + + // Update progress + let progress = readed_f32 / file_size; + progress::update_progress(&progress_name, progress); + // Increment chunk counter let mut counter = chunk_counter.lock().await; let chunk_index = *counter; @@ -116,9 +141,6 @@ pub async fn write_file_stream( hash: hash_hex, }); - // Update progress - progress::increase(&progress_name, 0.01); // Small increment per chunk - Ok(()) }) }, @@ -145,7 +167,7 @@ pub async fn write_file_stream( ButckRWErrorKind::IndexFileWriteFailed(e.to_string()) })?; - info!( + debug!( "Stream write completed for {}: {} chunks written", path.display(), chunk_infos.len() |
