From e98b298d583626ab505debe778d0beba303256c3 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 29 Sep 2025 15:50:12 +0800 Subject: Add incremental transfer functionality and update core components - Implement instance_incremental_transfer module for efficient file synchronization - Add support for chunk-based file transfer with hash comparison - Update TCP connection utilities to support incremental transfer protocol - Enhance error handling and version management for file synchronization - Update dependencies and integrate new functionality into main library --- .../src/instance_incremental_transfer.rs | 1209 -------------------- 1 file changed, 1209 deletions(-) delete mode 100644 crates/utils/tcp_connection/src/instance_incremental_transfer.rs (limited to 'crates/utils/tcp_connection/src/instance_incremental_transfer.rs') diff --git a/crates/utils/tcp_connection/src/instance_incremental_transfer.rs b/crates/utils/tcp_connection/src/instance_incremental_transfer.rs deleted file mode 100644 index aeb2cb8..0000000 --- a/crates/utils/tcp_connection/src/instance_incremental_transfer.rs +++ /dev/null @@ -1,1209 +0,0 @@ -use std::path::{Path, PathBuf}; - -use tokio::fs::{File, OpenOptions, copy, create_dir_all, read, read_to_string, remove_file}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; - -use crate::{error::TcpTargetError, instance::ConnectionInstance}; - -// 增量传输协议版本 -const INCREMENTAL_TRANSFER_VERSION: u64 = 1; -// 块大小(字节) -const DEFAULT_CHUNK_SIZE: usize = 8192; -// 哈希大小(字节) -const HASH_SIZE: usize = 32; // blake3 produces 32-byte hashes -// 版本文件扩展名 -const VERSION_FILE_EXTENSION: &str = "ver"; -// 版本历史目录名 -const VERSION_HISTORY_DIR: &str = "diff"; -// 差异文件扩展名 -const DELTA_FILE_EXTENSION: &str = "delta"; - -// 协议模式常量 -const SERVER_DELTA_MODE: u8 = 1; -const SERVER_FULL_MODE: u8 = 2; -const CLIENT_UPDATE_MODE: u8 = 1; -const CLIENT_UPLOAD_MODE: u8 = 2; -const NO_CHANGE_MODE: u8 = 3; - -// 协议错误消息 -const ERR_INVALID_SERVER_RESPONSE: &str = "Invalid server response format"; -const ERR_VERSION_MISMATCH: &str = "Version mismatch detected"; -const ERR_CHUNK_INDEX_OUT_OF_BOUNDS: &str = "Chunk index out of bounds"; -const ERR_DELTA_FILE_CORRUPTED: &str = "Delta file corrupted or incomplete"; - -impl ConnectionInstance { - // ==================== 客户端功能 ==================== - - /// 客户端:增量更新到指定版本 - pub async fn client_update_to_version( - &mut self, - file_path: impl AsRef, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let current_version = self.get_current_version(path).await?; - println!( - "Client update: current_version={}, target_version={}", - current_version, target_version - ); - - if current_version >= target_version { - println!("Client update: Already up to date, skipping"); - return Ok(()); // 已经是最新版本 - } - - // 发送更新请求 - println!( - "Client update: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!("Client update: Sending mode: {}", CLIENT_UPDATE_MODE); - self.stream.write_all(&[CLIENT_UPDATE_MODE]).await?; // 客户端更新模式 - println!("Client update: Sending target version: {}", target_version); - self.stream.write_all(&target_version.to_be_bytes()).await?; - self.stream.flush().await?; - println!("Client update: Request header sent"); - - // 执行增量更新 - println!("Client update: Starting incremental update..."); - let result = self - .client_perform_incremental_update(path, current_version, target_version) - .await; - println!( - "Client update: Incremental update completed, result: {:?}", - result - ); - result - } - - /// 客户端:增量上传变化到服务器 - pub async fn client_upload( - &mut self, - file_path: impl AsRef, - ) -> Result { - let path = file_path.as_ref(); - let current_version = self.get_current_version(path).await?; - - // 发送上传请求 - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - self.stream.write_all(&[2u8]).await?; // 客户端上传模式 - self.stream - .write_all(¤t_version.to_be_bytes()) - .await?; - self.stream.flush().await?; - - // 执行增量上传 - let new_version = self - .client_perform_incremental_upload(path, current_version) - .await?; - - // 更新本地版本 - self.save_version(path, new_version).await?; - - Ok(new_version) - } - - // ==================== 服务端功能 ==================== - - /// 服务端:处理客户端更新请求 - pub async fn server_handle_client_update( - &mut self, - file_path: impl AsRef, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!("Server: Reading protocol version..."); - // 读取协议版本 - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - println!("Server: Received protocol version: {}", version); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - // 读取请求模式 - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - println!("Server: Received mode: {}", mode_buf[0]); - - match mode_buf[0] { - CLIENT_UPDATE_MODE => { - // 客户端更新模式 - println!("Server: Client update mode detected"); - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let target_version = i32::from_be_bytes(version_buf); - println!("Server: Target version: {}", target_version); - - println!("Server: Sending version delta..."); - let result = self.server_send_version_delta(path, target_version).await; - println!("Server: Version delta sent, result: {:?}", result); - result - } - CLIENT_UPLOAD_MODE => { - // 客户端上传模式 - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let client_version = i32::from_be_bytes(version_buf); - - self.server_receive_client_changes(path, client_version) - .await - } - _ => { - return Err(TcpTargetError::Protocol(format!( - "{}: unknown mode {}", - ERR_INVALID_SERVER_RESPONSE, mode_buf[0] - ))); - } - } - } - - /// 服务端:发送指定版本的增量数据 - pub async fn server_send_delta_to_version( - &mut self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 发送响应头 - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - self.stream.write_all(&[1u8]).await?; // 服务端增量模式 - - self.server_send_version_delta_internal(path, from_version, to_version) - .await - } - - // ==================== 内部实现 ==================== - - /// 客户端:执行增量更新 - async fn client_perform_incremental_update( - &mut self, - file_path: impl AsRef, - current_version: i32, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 发送客户端当前版本给服务器 - self.stream - .write_all(¤t_version.to_be_bytes()) - .await?; - self.stream.flush().await?; - - // 读取服务器响应 - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - - match mode_buf[0] { - SERVER_DELTA_MODE => { - // 服务端增量模式 - self.client_apply_delta(path, target_version).await - } - NO_CHANGE_MODE => { - // 无变化模式,不需要更新 - Ok(()) - } - _ => Err(TcpTargetError::Protocol( - "Invalid server response".to_string(), - )), - } - } - - /// 客户端:应用增量数据 - async fn client_apply_delta( - &mut self, - file_path: impl AsRef, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 读取增量类型 - let mut delta_type_buf = [0u8; 1]; - self.stream.read_exact(&mut delta_type_buf).await?; - - match delta_type_buf[0] { - SERVER_FULL_MODE => { - // 完整文件传输 - self.read_file(path).await?; - } - SERVER_DELTA_MODE => { - // 增量块传输 - self.client_receive_and_apply_chunks(path).await?; - } - NO_CHANGE_MODE => { - // 无变化模式 - // 不需要做任何操作 - } - _ => { - return Err(TcpTargetError::Protocol(format!( - "{}: unknown mode {}", - ERR_INVALID_SERVER_RESPONSE, delta_type_buf[0] - ))); - } - } - - // 更新本地版本 - self.save_version(path, target_version).await?; - - // 发送确认 - self.stream.write_all(&[1u8]).await?; - self.stream.flush().await?; - - Ok(()) - } - - /// 客户端:接收并应用增量块 - async fn client_receive_and_apply_chunks( - &mut self, - file_path: impl AsRef, - ) -> Result<(), TcpTargetError> { - self.receive_and_apply_chunks_internal(file_path, None) - .await - } - - /// 客户端:执行增量上传 - async fn client_perform_incremental_upload( - &mut self, - file_path: impl AsRef, - current_version: i32, - ) -> Result { - let path = file_path.as_ref(); - - // 读取服务器响应 - println!("Client upload: Reading server response version..."); - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - println!("Client upload: Received protocol version: {}", version); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - println!("Client upload: Reading server response mode..."); - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - println!("Client upload: Received mode: {}", mode_buf[0]); - - match mode_buf[0] { - SERVER_FULL_MODE => { - // 服务端接收模式 - let new_version = self.client_send_changes(path, current_version).await?; - Ok(new_version) - } - _ => Err(TcpTargetError::Protocol( - "Invalid server response".to_string(), - )), - } - } - - /// 客户端:发送变化到服务器 - async fn client_send_changes( - &mut self, - file_path: impl AsRef, - client_version: i32, - ) -> Result { - let path = file_path.as_ref(); - println!( - "Client send changes: Starting to send changes, client_version={}", - client_version - ); - - // 发送确认给服务器 - println!("Client: Sending acknowledgment to server..."); - self.stream.write_all(&[1u8]).await?; - self.stream.flush().await?; - println!("Client: Acknowledgment sent"); - - // 发送客户端版本给服务器进行验证 - self.stream.write_all(&client_version.to_be_bytes()).await?; - self.stream.flush().await?; - - // 计算当前文件块哈希 - let current_hashes = self.calculate_file_chunk_hashes(path).await?; - println!( - "Client: Calculated {} current chunk hashes", - current_hashes.len() - ); - - // 发送块哈希 - println!("Client: Sending chunk hashes to server..."); - match self.send_chunk_hashes(¤t_hashes).await { - Ok(_) => println!("Client: Successfully sent chunk hashes"), - Err(e) => { - println!("Client: ERROR sending chunk hashes: {:?}", e); - return Err(e); - } - } - // Extra flush to ensure data is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed stream"), - Err(e) => { - println!("Client: ERROR flushing stream: {:?}", e); - return Err(e.into()); - } - } - - // 读取服务器需要的块列表 - println!("Client: Reading chunks needed from server..."); - let chunks_to_send = self.receive_chunks_to_send().await?; - println!("Client: Received chunks to send: {:?}", chunks_to_send); - - // 发送需要的块 - println!("Client send changes: Sending file chunks to server..."); - self.send_file_chunks(path, &chunks_to_send).await?; - println!("Client send changes: File chunks sent"); - - // 读取新版本号 - println!("Client send changes: Reading new version from server..."); - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let new_version = i32::from_be_bytes(version_buf); - println!("Client send changes: Received new version: {}", new_version); - - Ok(new_version) - } - - /// 服务端:发送版本增量数据 - async fn server_send_version_delta( - &mut self, - file_path: impl AsRef, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!("Server send version delta: Reading client version..."); - // 获取客户端当前版本(需要从协议中读取) - let mut client_version_buf = [0u8; 4]; - self.stream.read_exact(&mut client_version_buf).await?; - let client_version = i32::from_be_bytes(client_version_buf); - println!( - "Server send version delta: Client version: {}", - client_version - ); - - // 发送响应头 - println!( - "Server send version delta: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!( - "Server send version delta: Sending mode: {}", - SERVER_DELTA_MODE - ); - self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 服务端增量模式 - self.stream.flush().await?; - println!("Server send version delta: Response header sent"); - - println!("Server send version delta: Calling internal delta function..."); - let result = self - .server_send_version_delta_internal(path, client_version, target_version) - .await; - println!( - "Server send version delta: Internal function completed, result: {:?}", - result - ); - result - } - - /// 服务端:内部发送版本增量 - async fn server_send_version_delta_internal( - &mut self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!( - "Server send version delta internal: from_version={}, to_version={}", - from_version, to_version - ); - - if from_version == to_version { - // 版本相同,不需要传输 - println!( - "Server send version delta internal: Versions are the same, sending NO_CHANGE_MODE" - ); - self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 - return Ok(()); - } - - // 计算版本间的差异 - println!("Server send version delta internal: Calculating version delta..."); - let delta_chunks = self - .calculate_version_delta(path, from_version, to_version) - .await?; - println!( - "Server send version delta internal: Delta chunks count: {}", - delta_chunks.len() - ); - - if delta_chunks.is_empty() { - // 没有变化或目标版本不存在 - println!("Server send version delta internal: No changes, sending NO_CHANGE_MODE"); - self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 - } else { - // 发送增量块 - println!( - "Server send version delta internal: Sending delta chunks with SERVER_DELTA_MODE" - ); - self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 增量块模式 - self.send_file_chunks(path, &delta_chunks).await?; - } - println!("Server send version delta internal: Completed successfully"); - Ok(()) - } - - /// 服务端:接收客户端变化 - async fn server_receive_client_changes( - &mut self, - file_path: impl AsRef, - client_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - println!("Server receive changes: Client version: {}", client_version); - - // 验证客户端版本是否匹配服务器当前版本 - let server_version = self.get_current_version(path).await?; - if client_version != server_version { - return Err(TcpTargetError::Protocol(format!( - "{}: client {}, server {}", - ERR_VERSION_MISMATCH, client_version, server_version - ))); - } - - // 发送响应头 - println!( - "Server receive changes: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!("Server receive changes: Sending mode: {}", SERVER_FULL_MODE); - self.stream.write_all(&[SERVER_FULL_MODE]).await?; // 服务端完整文件模式 - self.stream.flush().await?; - println!("Server receive changes: Response header sent"); - - // 接收客户端块哈希 - println!("Server receive changes: Receiving client chunk hashes..."); - // 等待客户端确认 - let mut ack_buf = [0u8; 1]; - self.stream.read_exact(&mut ack_buf).await?; - if ack_buf[0] != 1 { - return Err(TcpTargetError::Protocol( - "Client acknowledgment failed".to_string(), - )); - } - println!("Server receive changes: Client acknowledged, reading client version..."); - - // 读取客户端版本进行验证 - let mut client_version_buf = [0u8; 4]; - self.stream.read_exact(&mut client_version_buf).await?; - let received_client_version = i32::from_be_bytes(client_version_buf); - println!( - "Server receive changes: Received client version: {}", - received_client_version - ); - - // 验证客户端版本是否匹配 - if received_client_version != client_version { - return Err(TcpTargetError::Protocol(format!( - "{}: expected {}, got {}", - ERR_VERSION_MISMATCH, client_version, received_client_version - ))); - } - - println!("Server receive changes: Reading chunk hashes..."); - let client_chunk_hashes = match self.receive_chunk_hashes().await { - Ok(hashes) => { - println!( - "Server receive changes: Successfully received {} client chunk hashes", - hashes.len() - ); - hashes - } - Err(e) => { - println!( - "Server receive changes: ERROR receiving chunk hashes: {:?}", - e - ); - return Err(e); - } - }; - - // 计算服务器当前块哈希 - let server_hashes = self.calculate_file_chunk_hashes(path).await?; - println!( - "Server: Calculated {} server chunk hashes", - server_hashes.len() - ); - - // 比较差异,确定需要更新的块 - let chunks_needed = self.compare_chunk_hashes(&client_chunk_hashes, &server_hashes); - println!( - "Server: Chunks needed after comparison: {:?}", - chunks_needed - ); - - // 发送需要的块列表 - println!("Server: Sending chunks needed list: {:?}", chunks_needed); - self.send_chunks_needed(&chunks_needed).await?; - - // 接收并应用客户端块 - self.receive_and_apply_chunks(path, &chunks_needed).await?; - - // 生成新版本号(这里简化:版本号+1) - let current_version = self.get_current_version(path).await?; - let new_version = current_version + 1; - - // 保存差异而不是完整文件 - self.save_version_delta(path, current_version, new_version, &chunks_needed) - .await?; - self.save_version(path, new_version).await?; - - // 发送新版本号给客户端 - println!( - "Server receive changes: Sending new version to client: {}", - new_version - ); - self.stream.write_all(&new_version.to_be_bytes()).await?; - self.stream.flush().await?; - println!("Server receive changes: New version sent to client"); - - Ok(()) - } - - // ==================== 工具函数 ==================== - - /// 计算文件的块哈希 - async fn calculate_file_chunk_hashes( - &self, - file_path: impl AsRef, - ) -> Result, TcpTargetError> { - let path = file_path.as_ref(); - let mut file = File::open(path).await?; - let file_size = file.metadata().await?.len(); - - let mut hashes = Vec::new(); - let mut buffer = vec![0u8; DEFAULT_CHUNK_SIZE]; - let mut bytes_read = 0; - - while bytes_read < file_size { - let bytes_to_read = (file_size - bytes_read).min(DEFAULT_CHUNK_SIZE as u64) as usize; - let chunk = &mut buffer[..bytes_to_read]; - - file.read_exact(chunk).await?; - - let hash = self.simple_chunk_hash(chunk); - hashes.push(hash); - - bytes_read += bytes_to_read as u64; - } - - Ok(hashes) - } - - /// 简单的块哈希函数 - fn simple_chunk_hash(&self, data: &[u8]) -> [u8; HASH_SIZE] { - // 使用稳定的blake3哈希算法,确保跨平台一致性 - let hash = blake3::hash(data); - let mut hash_bytes = [0u8; HASH_SIZE]; - hash_bytes[..32].copy_from_slice(hash.as_bytes()); - hash_bytes - } - - /// 发送块哈希列表 - async fn send_chunk_hashes( - &mut self, - hashes: &[[u8; HASH_SIZE]], - ) -> Result<(), TcpTargetError> { - let hash_count = hashes.len() as u32; - println!("Client: Sending {} chunk hashes", hash_count); - - match self.stream.write_all(&hash_count.to_be_bytes()).await { - Ok(_) => println!("Client: Successfully sent hash count"), - Err(e) => { - println!("Client: ERROR sending hash count: {:?}", e); - return Err(e.into()); - } - } - // Extra flush to ensure hash count is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed hash count"), - Err(e) => { - println!("Client: ERROR flushing hash count: {:?}", e); - return Err(e.into()); - } - } - - for (i, hash) in hashes.iter().enumerate() { - println!("Client: Sending chunk hash {}: {:?}", i, &hash[..8]); // Show first 8 bytes for debugging - match self.stream.write_all(hash).await { - Ok(_) => println!("Client: Successfully sent chunk hash {}", i), - Err(e) => { - println!("Client: ERROR sending chunk hash {}: {:?}", i, e); - return Err(e.into()); - } - } - } - - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed after sending hashes"), - Err(e) => { - println!("Client: ERROR flushing after sending hashes: {:?}", e); - return Err(e.into()); - } - } - println!("Client: All chunk hashes sent"); - // Extra flush to ensure data is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed final"), - Err(e) => { - println!("Client: ERROR flushing final: {:?}", e); - return Err(e.into()); - } - } - Ok(()) - } - - /// 接收块哈希列表 - async fn receive_chunk_hashes(&mut self) -> Result, TcpTargetError> { - println!("Server: Starting to receive chunk hashes..."); - - let mut count_buf = [0u8; 4]; - println!("Server: Reading chunk hash count..."); - self.stream.read_exact(&mut count_buf).await?; - let hash_count = u32::from_be_bytes(count_buf) as usize; - println!( - "Server: Reading {} chunk hashes, raw bytes: {:?}", - hash_count, count_buf - ); - - let mut hashes = Vec::with_capacity(hash_count); - let mut hash_buf = [0u8; HASH_SIZE]; - - for i in 0..hash_count { - println!("Server: Reading chunk hash {}...", i); - self.stream.read_exact(&mut hash_buf).await?; - println!("Server: Received chunk hash {}: {:?}", i, &hash_buf[..8]); // Show first 8 bytes for debugging - hashes.push(hash_buf); - } - - println!("Server: All {} chunk hashes received", hashes.len()); - Ok(hashes) - } - - /// 比较块哈希并返回需要更新的块索引 - fn compare_chunk_hashes( - &self, - new_hashes: &[[u8; HASH_SIZE]], - old_hashes: &[[u8; HASH_SIZE]], - ) -> Vec { - let mut chunks_needed = Vec::new(); - - for (i, (new_hash, old_hash)) in new_hashes.iter().zip(old_hashes.iter()).enumerate() { - if new_hash != old_hash { - chunks_needed.push(i); - } - } - - // 如果新文件有更多块,也需要更新 - for i in old_hashes.len()..new_hashes.len() { - chunks_needed.push(i); - } - - chunks_needed - } - - /// 接收需要传输的块列表 - async fn receive_chunks_to_send(&mut self) -> Result, TcpTargetError> { - let mut count_buf = [0u8; 4]; - self.stream.read_exact(&mut count_buf).await?; - let chunk_count = u32::from_be_bytes(count_buf) as usize; - - let mut chunks = Vec::with_capacity(chunk_count); - let mut index_buf = [0u8; 4]; - - for _ in 0..chunk_count { - self.stream.read_exact(&mut index_buf).await?; - let index = u32::from_be_bytes(index_buf) as usize; - chunks.push(index); - } - - Ok(chunks) - } - - /// 发送需要接收的块列表 - async fn send_chunks_needed(&mut self, chunks: &[usize]) -> Result<(), TcpTargetError> { - let chunk_count = chunks.len() as u32; - self.stream.write_all(&chunk_count.to_be_bytes()).await?; - - for &chunk_index in chunks { - self.stream - .write_all(&(chunk_index as u32).to_be_bytes()) - .await?; - } - - self.stream.flush().await?; - Ok(()) - } - - /// 发送指定的文件块 - async fn send_file_chunks( - &mut self, - file_path: impl AsRef, - chunk_indices: &[usize], - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let mut file = File::open(path).await?; - let file_size = file.metadata().await?.len(); - - // 发送块数量 - self.stream - .write_all(&(chunk_indices.len() as u32).to_be_bytes()) - .await?; - - if chunk_indices.is_empty() { - self.stream.flush().await?; - return Ok(()); - } - - for &chunk_index in chunk_indices { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - if chunk_offset >= file_size { - continue; // 跳过超出文件范围的块 - } - - // 定位到块位置 - tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) - .await?; - - // 计算块大小 - let remaining_bytes = file_size - chunk_offset; - let chunk_size = remaining_bytes.min(DEFAULT_CHUNK_SIZE as u64) as usize; - - // 发送块索引和大小 - self.stream - .write_all(&(chunk_index as u32).to_be_bytes()) - .await?; - self.stream - .write_all(&(chunk_size as u32).to_be_bytes()) - .await?; - - // 发送块数据 - let mut buffer = vec![0u8; chunk_size]; - file.read_exact(&mut buffer).await?; - self.stream.write_all(&buffer).await?; - } - - self.stream.flush().await?; - Ok(()) - } - - /// 接收并应用增量块 - async fn receive_and_apply_chunks( - &mut self, - file_path: impl AsRef, - chunk_indices: &[usize], - ) -> Result<(), TcpTargetError> { - self.receive_and_apply_chunks_internal(file_path, Some(chunk_indices)) - .await - } - - /// 内部工具函数:接收并应用块数据 - async fn receive_and_apply_chunks_internal( - &mut self, - file_path: impl AsRef, - expected_indices: Option<&[usize]>, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 读取块数量 - let mut count_buf = [0u8; 4]; - self.stream.read_exact(&mut count_buf).await?; - let chunk_count = u32::from_be_bytes(count_buf) as usize; - - // 创建临时文件来接收完整内容 - let temp_path = path.with_extension("tmp"); - let temp_file = File::create(&temp_path).await?; - let mut temp_writer = BufWriter::new(temp_file); - - // 记录接收到的块数据 - let mut received_chunks = Vec::new(); - - for i in 0..chunk_count { - // 读取块索引和大小 - let mut index_buf = [0u8; 4]; - self.stream.read_exact(&mut index_buf).await?; - let chunk_index = u32::from_be_bytes(index_buf) as usize; - - // 验证块索引(如果提供了预期索引) - if let Some(expected) = expected_indices { - if i >= expected.len() || chunk_index != expected[i] { - return Err(TcpTargetError::Protocol(format!( - "{}: expected {:?}, got {}", - ERR_CHUNK_INDEX_OUT_OF_BOUNDS, - expected.get(i), - chunk_index - ))); - } - } - - let mut size_buf = [0u8; 4]; - self.stream.read_exact(&mut size_buf).await?; - let chunk_size = u32::from_be_bytes(size_buf) as usize; - - // 读取块数据 - let mut chunk_data = vec![0u8; chunk_size]; - self.stream.read_exact(&mut chunk_data).await?; - - // 记录接收到的块 - received_chunks.push((chunk_index, chunk_data)); - } - - // 按块索引排序并写入临时文件 - received_chunks.sort_by_key(|(index, _)| *index); - - for (chunk_index, chunk_data) in received_chunks { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - tokio::io::AsyncSeekExt::seek(&mut temp_writer, std::io::SeekFrom::Start(chunk_offset)) - .await?; - temp_writer.write_all(&chunk_data).await?; - } - - temp_writer.flush().await?; - - // 替换原文件 - tokio::fs::rename(&temp_path, path).await?; - - Ok(()) - } - - /// 获取当前文件版本 - async fn get_current_version( - &self, - file_path: impl AsRef, - ) -> Result { - let path = file_path.as_ref(); - let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); - - if !version_file_path.exists() { - return Ok(0); // 默认版本为0 - } - - let version_content = read_to_string(&version_file_path) - .await - .map_err(|e| TcpTargetError::File(format!("Failed to read version file: {}", e)))?; - - version_content - .trim() - .parse::() - .map_err(|e| TcpTargetError::File(format!("Invalid version format: {}", e))) - } - - /// 保存文件版本 - async fn save_version( - &self, - file_path: impl AsRef, - version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); - - tokio::fs::write(&version_file_path, version.to_string()) - .await - .map_err(|e| TcpTargetError::File(format!("Failed to write version file: {}", e)))?; - - Ok(()) - } - - /// 计算版本间的差异 - async fn calculate_version_delta( - &self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result, TcpTargetError> { - let path = file_path.as_ref(); - - // 如果目标版本不存在,返回空差异 - let current_version = self.get_current_version(path).await?; - if to_version > current_version { - return Ok(Vec::new()); - } - - // 获取源版本和目标版本的块哈希 - let from_hashes = self.get_version_chunk_hashes(path, from_version).await?; - let to_hashes = self.get_version_chunk_hashes(path, to_version).await?; - - // 比较差异 - let delta_chunks = self.compare_chunk_hashes(&to_hashes, &from_hashes); - - Ok(delta_chunks) - } - - /// 获取指定版本的块哈希 - async fn get_version_chunk_hashes( - &self, - file_path: impl AsRef, - version: i32, - ) -> Result, TcpTargetError> { - let path = file_path.as_ref(); - - if version == 0 { - // 版本0表示空文件 - return Ok(Vec::new()); - } - - // 从版本历史中重建文件并计算哈希 - let reconstructed_path = self.reconstruct_version(path, version).await?; - let hashes = self - .calculate_file_chunk_hashes(&reconstructed_path) - .await?; - - // 清理临时文件 - let _ = remove_file(&reconstructed_path).await; - - Ok(hashes) - } - - /// 从差异重建指定版本的文件 - async fn reconstruct_version( - &self, - file_path: impl AsRef, - target_version: i32, - ) -> Result { - let path = file_path.as_ref(); - let temp_path = path.with_extension(format!("temp_{}", target_version)); - - if target_version == 0 { - // 创建空文件 - tokio::fs::write(&temp_path, b"").await?; - return Ok(temp_path); - } - - // 从版本0开始,逐步应用差异 - let mut current_version = 0; - let mut current_path = path.with_extension("temp_base"); - tokio::fs::write(¤t_path, b"").await?; // 创建基础空文件 - - while current_version < target_version { - let next_version = current_version + 1; - let (delta_chunks, chunk_data_list) = self - .load_version_delta(path, current_version, next_version) - .await?; - - // 应用差异到新文件 - let next_path = path.with_extension(format!("temp_{}", next_version)); - self.apply_delta_to_file(¤t_path, &next_path, &delta_chunks, &chunk_data_list) - .await?; - - // 清理旧临时文件 - if current_version > 0 { - let _ = tokio::fs::remove_file(¤t_path).await; - } - current_path = next_path; - current_version = next_version; - } - - Ok(current_path) - } - - /// 保存版本差异 - async fn save_version_delta( - &self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - changed_chunks: &[usize], - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 创建版本历史目录 - let history_dir = path - .parent() - .ok_or_else(|| TcpTargetError::File("Invalid file path".to_string()))? - .join(VERSION_HISTORY_DIR); - - if !history_dir.exists() { - create_dir_all(&history_dir).await?; - } - - // 保存差异信息(记录变化的块索引和实际数据) - let delta_path = history_dir.join(format!( - "{}_{}_{}.{}", - path.file_name() - .ok_or_else(|| TcpTargetError::File("Invalid file name".to_string()))? - .to_string_lossy(), - from_version, - to_version, - DELTA_FILE_EXTENSION - )); - - // 读取当前版本文件以获取变化块的实际数据 - let current_file_data = read(path).await?; - - let mut delta_data = Vec::new(); - - for &chunk_index in changed_chunks { - // 写入块索引 - delta_data.extend_from_slice(&(chunk_index as u32).to_be_bytes()); - - // 计算块的起始和结束位置 - let chunk_start = chunk_index * DEFAULT_CHUNK_SIZE; - let chunk_end = - std::cmp::min(chunk_start + DEFAULT_CHUNK_SIZE, current_file_data.len()); - - // 写入块数据大小 - let chunk_size = (chunk_end - chunk_start) as u32; - delta_data.extend_from_slice(&chunk_size.to_be_bytes()); - - // 写入块数据 - delta_data.extend_from_slice(¤t_file_data[chunk_start..chunk_end]); - } - - tokio::fs::write(&delta_path, &delta_data).await?; - - Ok(()) - } - - /// 加载版本差异 - async fn load_version_delta( - &self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result<(Vec, Vec>), TcpTargetError> { - let path = file_path.as_ref(); - let history_dir = path.parent().unwrap().join(VERSION_HISTORY_DIR); - - let delta_path = history_dir.join(format!( - "{}_{}_{}.{}", - path.file_name().unwrap().to_string_lossy(), - from_version, - to_version, - DELTA_FILE_EXTENSION - )); - - if !delta_path.exists() { - return Ok((Vec::new(), Vec::new())); - } - - let delta_data = read(&delta_path).await?; - let mut chunks = Vec::new(); - let mut chunk_data_list = Vec::new(); - - let mut offset = 0; - while offset < delta_data.len() { - // 读取块索引 (4 bytes) - if offset + 4 > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete index data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let index = u32::from_be_bytes([ - delta_data[offset], - delta_data[offset + 1], - delta_data[offset + 2], - delta_data[offset + 3], - ]) as usize; - offset += 4; - - // 读取块大小 (4 bytes) - if offset + 4 > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete size data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let chunk_size = u32::from_be_bytes([ - delta_data[offset], - delta_data[offset + 1], - delta_data[offset + 2], - delta_data[offset + 3], - ]) as usize; - offset += 4; - - // 读取块数据 - if offset + chunk_size > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete chunk data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let chunk_data = delta_data[offset..offset + chunk_size].to_vec(); - offset += chunk_size; - - chunks.push(index); - chunk_data_list.push(chunk_data); - } - - Ok((chunks, chunk_data_list)) - } - - /// 应用差异到文件 - async fn apply_delta_to_file( - &self, - source_path: impl AsRef, - target_path: impl AsRef, - delta_chunks: &[usize], - chunk_data_list: &[Vec], - ) -> Result<(), TcpTargetError> { - let source_path = source_path.as_ref(); - let target_path = target_path.as_ref(); - - // 复制源文件到目标文件 - copy(source_path, target_path).await?; - - if delta_chunks.is_empty() { - return Ok(()); - } - - // 打开目标文件进行修改 - let mut file = OpenOptions::new().write(true).open(target_path).await?; - - for (i, &chunk_index) in delta_chunks.iter().enumerate() { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) - .await?; - - // 使用从delta文件中读取的实际块数据进行写入 - if i < chunk_data_list.len() { - file.write_all(&chunk_data_list[i]).await?; - } - } - - Ok(()) - } -} -- cgit