From 7f6a6600bbb6ec4a76e7cbd6bdbaec2875a80f5d Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Sun, 28 Sep 2025 17:39:31 +0800 Subject: Refactor TCP connection authentication into separate module - Extract challenge-response authentication code from instance.rs to new instance_challenge.rs - Add instance_challenge module declaration to lib.rs - Maintain all cryptographic functionality while improving code organization --- crates/utils/tcp_connection/src/instance.rs | 285 +------------------- .../utils/tcp_connection/src/instance_challenge.rs | 297 +++++++++++++++++++++ crates/utils/tcp_connection/src/lib.rs | 2 + 3 files changed, 300 insertions(+), 284 deletions(-) create mode 100644 crates/utils/tcp_connection/src/instance_challenge.rs (limited to 'crates/utils/tcp_connection/src') diff --git a/crates/utils/tcp_connection/src/instance.rs b/crates/utils/tcp_connection/src/instance.rs index fd620e2..dc49591 100644 --- a/crates/utils/tcp_connection/src/instance.rs +++ b/crates/utils/tcp_connection/src/instance.rs @@ -1,11 +1,5 @@ use std::{path::Path, time::Duration}; -use rand::TryRngCore; -use rsa::{ - RsaPrivateKey, RsaPublicKey, - pkcs1::{DecodeRsaPrivateKey, DecodeRsaPublicKey}, - sha2, -}; use serde::Serialize; use tokio::{ fs::{File, OpenOptions}, @@ -13,12 +7,7 @@ use tokio::{ net::TcpStream, }; -use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey}; -use ring::rand::SystemRandom; -use ring::signature::{ - self, ECDSA_P256_SHA256_ASN1, ECDSA_P384_SHA384_ASN1, EcdsaKeyPair, RSA_PKCS1_2048_8192_SHA256, - UnparsedPublicKey, -}; +use ring::signature::{self}; use crate::error::TcpTargetError; @@ -525,276 +514,4 @@ impl ConnectionInstance { Ok(()) } - - /// Initiates a challenge to the target machine to verify connection security - /// - /// This method performs a cryptographic challenge-response authentication: - /// 1. Generates a random 32-byte challenge - /// 2. Sends the challenge to the target machine - /// 3. Receives a digital signature of the challenge - /// 4. Verifies the signature using the appropriate public key - /// - /// # Arguments - /// * `public_key_dir` - Directory containing public key files for verification - /// - /// # Returns - /// * `Ok(true)` - Challenge verification successful - /// * `Ok(false)` - Challenge verification failed - /// * `Err(TcpTargetError)` - Error during challenge process - pub async fn challenge( - &mut self, - public_key_dir: impl AsRef, - ) -> Result { - // Generate random challenge - let mut challenge = [0u8; 32]; - rand::rngs::OsRng - .try_fill_bytes(&mut challenge) - .map_err(|e| { - TcpTargetError::Crypto(format!("Failed to generate random challenge: {}", e)) - })?; - - // Send challenge to target - self.stream.write_all(&challenge).await?; - self.stream.flush().await?; - - // Read signature from target - let mut signature = Vec::new(); - let mut signature_len_buf = [0u8; 4]; - self.stream.read_exact(&mut signature_len_buf).await?; - - let signature_len = u32::from_be_bytes(signature_len_buf) as usize; - signature.resize(signature_len, 0); - self.stream.read_exact(&mut signature).await?; - - // Read key identifier from target to identify which public key to use - let mut key_id_len_buf = [0u8; 4]; - self.stream.read_exact(&mut key_id_len_buf).await?; - let key_id_len = u32::from_be_bytes(key_id_len_buf) as usize; - - let mut key_id_buf = vec![0u8; key_id_len]; - self.stream.read_exact(&mut key_id_buf).await?; - let key_id = String::from_utf8(key_id_buf) - .map_err(|e| TcpTargetError::Crypto(format!("Invalid key identifier: {}", e)))?; - - // Load appropriate public key - let public_key_path = public_key_dir.as_ref().join(format!("{}.pem", key_id)); - if !public_key_path.exists() { - return Ok(false); - } - - let public_key_pem = tokio::fs::read_to_string(&public_key_path).await?; - - // Try to verify with different key types - let verified = if let Ok(rsa_key) = RsaPublicKey::from_pkcs1_pem(&public_key_pem) { - let padding = rsa::pkcs1v15::Pkcs1v15Sign::new::(); - rsa_key.verify(padding, &challenge, &signature).is_ok() - } else if let Ok(ed25519_key) = - VerifyingKey::from_bytes(&parse_ed25519_public_key(&public_key_pem)) - { - if signature.len() == 64 { - let sig_bytes: [u8; 64] = signature.as_slice().try_into().map_err(|_| { - TcpTargetError::Crypto("Invalid signature length for Ed25519".to_string()) - })?; - let sig = Signature::from_bytes(&sig_bytes); - ed25519_key.verify(&challenge, &sig).is_ok() - } else { - false - } - } else if let Ok(dsa_key_info) = parse_dsa_public_key(&public_key_pem) { - verify_dsa_signature(&dsa_key_info, &challenge, &signature) - } else { - false - }; - - Ok(verified) - } - - /// Accepts a challenge from the target machine to verify connection security - /// - /// This method performs a cryptographic challenge-response authentication: - /// 1. Receives a random 32-byte challenge from the target machine - /// 2. Signs the challenge using the appropriate private key - /// 3. Sends the digital signature back to the target machine - /// 4. Sends the key identifier for public key verification - /// - /// # Arguments - /// * `private_key_file` - Path to the private key file for signing - /// * `verify_public_key` - Key identifier for public key verification - /// - /// # Returns - /// * `Ok(true)` - Challenge response sent successfully - /// * `Ok(false)` - Private key format not supported - /// * `Err(TcpTargetError)` - Error during challenge response process - pub async fn accept_challenge( - &mut self, - private_key_file: impl AsRef, - verify_public_key: &str, - ) -> Result { - // Read challenge from initiator - let mut challenge = [0u8; 32]; - self.stream.read_exact(&mut challenge).await?; - - // Load private key - let private_key_pem = tokio::fs::read_to_string(&private_key_file).await?; - - // Sign the challenge with supported key types - let signature = if let Ok(rsa_key) = RsaPrivateKey::from_pkcs1_pem(&private_key_pem) { - let padding = rsa::pkcs1v15::Pkcs1v15Sign::new::(); - rsa_key.sign(padding, &challenge)? - } else if let Ok(ed25519_key) = parse_ed25519_private_key(&private_key_pem) { - ed25519_key.sign(&challenge).to_bytes().to_vec() - } else if let Ok(dsa_key_info) = parse_dsa_private_key(&private_key_pem) { - sign_with_dsa(&dsa_key_info, &challenge)? - } else { - return Ok(false); - }; - - // Send signature length and signature - let signature_len = signature.len() as u32; - self.stream.write_all(&signature_len.to_be_bytes()).await?; - self.stream.flush().await?; - self.stream.write_all(&signature).await?; - self.stream.flush().await?; - - // Send key identifier for public key identification - let key_id_bytes = verify_public_key.as_bytes(); - let key_id_len = key_id_bytes.len() as u32; - self.stream.write_all(&key_id_len.to_be_bytes()).await?; - self.stream.flush().await?; - self.stream.write_all(key_id_bytes).await?; - self.stream.flush().await?; - - Ok(true) - } -} - -/// Parse Ed25519 public key from PEM format -fn parse_ed25519_public_key(pem: &str) -> [u8; 32] { - // Robust parsing for Ed25519 public key using pem crate - let mut key_bytes = [0u8; 32]; - - if let Ok(pem_data) = pem::parse(pem) - && pem_data.tag() == "PUBLIC KEY" - && pem_data.contents().len() >= 32 - { - let contents = pem_data.contents(); - key_bytes.copy_from_slice(&contents[contents.len() - 32..]); - } - key_bytes -} - -/// Parse Ed25519 private key from PEM format -fn parse_ed25519_private_key(pem: &str) -> Result { - if let Ok(pem_data) = pem::parse(pem) - && pem_data.tag() == "PRIVATE KEY" - && pem_data.contents().len() >= 32 - { - let contents = pem_data.contents(); - let mut seed = [0u8; 32]; - seed.copy_from_slice(&contents[contents.len() - 32..]); - return Ok(SigningKey::from_bytes(&seed)); - } - Err(TcpTargetError::Crypto( - "Invalid Ed25519 private key format".to_string(), - )) -} - -/// Parse DSA public key information from PEM -fn parse_dsa_public_key( - pem: &str, -) -> Result<(&'static dyn signature::VerificationAlgorithm, Vec), TcpTargetError> { - if let Ok(pem_data) = pem::parse(pem) { - let contents = pem_data.contents().to_vec(); - - // Try different DSA algorithms based on PEM tag - match pem_data.tag() { - "EC PUBLIC KEY" | "PUBLIC KEY" if pem.contains("ECDSA") || pem.contains("ecdsa") => { - if pem.contains("P-256") { - return Ok((&ECDSA_P256_SHA256_ASN1, contents)); - } else if pem.contains("P-384") { - return Ok((&ECDSA_P384_SHA384_ASN1, contents)); - } - } - "RSA PUBLIC KEY" | "PUBLIC KEY" => { - return Ok((&RSA_PKCS1_2048_8192_SHA256, contents)); - } - _ => {} - } - - // Default to RSA for unknown types - return Ok((&RSA_PKCS1_2048_8192_SHA256, contents)); - } - Err(TcpTargetError::Crypto( - "Invalid DSA public key format".to_string(), - )) -} - -/// Parse DSA private key information from PEM -fn parse_dsa_private_key( - pem: &str, -) -> Result<(&'static dyn signature::VerificationAlgorithm, Vec), TcpTargetError> { - // For DSA, private key verification uses the same algorithm as public key - parse_dsa_public_key(pem) -} - -/// Verify DSA signature -fn verify_dsa_signature( - algorithm_and_key: &(&'static dyn signature::VerificationAlgorithm, Vec), - message: &[u8], - signature: &[u8], -) -> bool { - let (algorithm, key_bytes) = algorithm_and_key; - let public_key = UnparsedPublicKey::new(*algorithm, key_bytes); - public_key.verify(message, signature).is_ok() -} - -/// Sign with DSA -fn sign_with_dsa( - algorithm_and_key: &(&'static dyn signature::VerificationAlgorithm, Vec), - message: &[u8], -) -> Result, TcpTargetError> { - let (algorithm, key_bytes) = algorithm_and_key; - - // Handle different DSA/ECDSA algorithms by comparing algorithm identifiers - // Since we can't directly compare trait objects, we use pointer comparison - let algorithm_ptr = algorithm as *const _ as *const (); - let ecdsa_p256_ptr = &ECDSA_P256_SHA256_ASN1 as *const _ as *const (); - let ecdsa_p384_ptr = &ECDSA_P384_SHA384_ASN1 as *const _ as *const (); - - if algorithm_ptr == ecdsa_p256_ptr { - let key_pair = EcdsaKeyPair::from_pkcs8( - ECDSA_P256_SHA256_ASN1_SIGNING, - key_bytes, - &SystemRandom::new(), - ) - .map_err(|e| { - TcpTargetError::Crypto(format!("Failed to create ECDSA P-256 key pair: {}", e)) - })?; - - let signature = key_pair - .sign(&SystemRandom::new(), message) - .map_err(|e| TcpTargetError::Crypto(format!("ECDSA P-256 signing failed: {}", e)))?; - - Ok(signature.as_ref().to_vec()) - } else if algorithm_ptr == ecdsa_p384_ptr { - let key_pair = EcdsaKeyPair::from_pkcs8( - ECDSA_P384_SHA384_ASN1_SIGNING, - key_bytes, - &SystemRandom::new(), - ) - .map_err(|e| { - TcpTargetError::Crypto(format!("Failed to create ECDSA P-384 key pair: {}", e)) - })?; - - let signature = key_pair - .sign(&SystemRandom::new(), message) - .map_err(|e| TcpTargetError::Crypto(format!("ECDSA P-384 signing failed: {}", e)))?; - - Ok(signature.as_ref().to_vec()) - } else { - // RSA or unsupported algorithm - Err(TcpTargetError::Unsupported( - "DSA/ECDSA signing not supported for this algorithm type".to_string(), - )) - } } diff --git a/crates/utils/tcp_connection/src/instance_challenge.rs b/crates/utils/tcp_connection/src/instance_challenge.rs new file mode 100644 index 0000000..c1cf46f --- /dev/null +++ b/crates/utils/tcp_connection/src/instance_challenge.rs @@ -0,0 +1,297 @@ +use std::path::Path; + +use rand::TryRngCore; +use rsa::{ + RsaPrivateKey, RsaPublicKey, + pkcs1::{DecodeRsaPrivateKey, DecodeRsaPublicKey}, + sha2, +}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey}; +use ring::rand::SystemRandom; +use ring::signature::{ + self, ECDSA_P256_SHA256_ASN1, ECDSA_P384_SHA384_ASN1, EcdsaKeyPair, RSA_PKCS1_2048_8192_SHA256, + UnparsedPublicKey, +}; + +use crate::{error::TcpTargetError, instance::ConnectionInstance}; + +const ECDSA_P256_SHA256_ASN1_SIGNING: &signature::EcdsaSigningAlgorithm = + &signature::ECDSA_P256_SHA256_ASN1_SIGNING; +const ECDSA_P384_SHA384_ASN1_SIGNING: &signature::EcdsaSigningAlgorithm = + &signature::ECDSA_P384_SHA384_ASN1_SIGNING; + +impl ConnectionInstance { + /// Initiates a challenge to the target machine to verify connection security + /// + /// This method performs a cryptographic challenge-response authentication: + /// 1. Generates a random 32-byte challenge + /// 2. Sends the challenge to the target machine + /// 3. Receives a digital signature of the challenge + /// 4. Verifies the signature using the appropriate public key + /// + /// # Arguments + /// * `public_key_dir` - Directory containing public key files for verification + /// + /// # Returns + /// * `Ok(true)` - Challenge verification successful + /// * `Ok(false)` - Challenge verification failed + /// * `Err(TcpTargetError)` - Error during challenge process + pub async fn challenge( + &mut self, + public_key_dir: impl AsRef, + ) -> Result { + // Generate random challenge + let mut challenge = [0u8; 32]; + rand::rngs::OsRng + .try_fill_bytes(&mut challenge) + .map_err(|e| { + TcpTargetError::Crypto(format!("Failed to generate random challenge: {}", e)) + })?; + + // Send challenge to target + self.stream.write_all(&challenge).await?; + self.stream.flush().await?; + + // Read signature from target + let mut signature = Vec::new(); + let mut signature_len_buf = [0u8; 4]; + self.stream.read_exact(&mut signature_len_buf).await?; + + let signature_len = u32::from_be_bytes(signature_len_buf) as usize; + signature.resize(signature_len, 0); + self.stream.read_exact(&mut signature).await?; + + // Read key identifier from target to identify which public key to use + let mut key_id_len_buf = [0u8; 4]; + self.stream.read_exact(&mut key_id_len_buf).await?; + let key_id_len = u32::from_be_bytes(key_id_len_buf) as usize; + + let mut key_id_buf = vec![0u8; key_id_len]; + self.stream.read_exact(&mut key_id_buf).await?; + let key_id = String::from_utf8(key_id_buf) + .map_err(|e| TcpTargetError::Crypto(format!("Invalid key identifier: {}", e)))?; + + // Load appropriate public key + let public_key_path = public_key_dir.as_ref().join(format!("{}.pem", key_id)); + if !public_key_path.exists() { + return Ok(false); + } + + let public_key_pem = tokio::fs::read_to_string(&public_key_path).await?; + + // Try to verify with different key types + let verified = if let Ok(rsa_key) = RsaPublicKey::from_pkcs1_pem(&public_key_pem) { + let padding = rsa::pkcs1v15::Pkcs1v15Sign::new::(); + rsa_key.verify(padding, &challenge, &signature).is_ok() + } else if let Ok(ed25519_key) = + VerifyingKey::from_bytes(&parse_ed25519_public_key(&public_key_pem)) + { + if signature.len() == 64 { + let sig_bytes: [u8; 64] = signature.as_slice().try_into().map_err(|_| { + TcpTargetError::Crypto("Invalid signature length for Ed25519".to_string()) + })?; + let sig = Signature::from_bytes(&sig_bytes); + ed25519_key.verify(&challenge, &sig).is_ok() + } else { + false + } + } else if let Ok(dsa_key_info) = parse_dsa_public_key(&public_key_pem) { + verify_dsa_signature(&dsa_key_info, &challenge, &signature) + } else { + false + }; + + Ok(verified) + } + + /// Accepts a challenge from the target machine to verify connection security + /// + /// This method performs a cryptographic challenge-response authentication: + /// 1. Receives a random 32-byte challenge from the target machine + /// 2. Signs the challenge using the appropriate private key + /// 3. Sends the digital signature back to the target machine + /// 4. Sends the key identifier for public key verification + /// + /// # Arguments + /// * `private_key_file` - Path to the private key file for signing + /// * `verify_public_key` - Key identifier for public key verification + /// + /// # Returns + /// * `Ok(true)` - Challenge response sent successfully + /// * `Ok(false)` - Private key format not supported + /// * `Err(TcpTargetError)` - Error during challenge response process + pub async fn accept_challenge( + &mut self, + private_key_file: impl AsRef, + verify_public_key: &str, + ) -> Result { + // Read challenge from initiator + let mut challenge = [0u8; 32]; + self.stream.read_exact(&mut challenge).await?; + + // Load private key + let private_key_pem = tokio::fs::read_to_string(&private_key_file).await?; + + // Sign the challenge with supported key types + let signature = if let Ok(rsa_key) = RsaPrivateKey::from_pkcs1_pem(&private_key_pem) { + let padding = rsa::pkcs1v15::Pkcs1v15Sign::new::(); + rsa_key.sign(padding, &challenge)? + } else if let Ok(ed25519_key) = parse_ed25519_private_key(&private_key_pem) { + ed25519_key.sign(&challenge).to_bytes().to_vec() + } else if let Ok(dsa_key_info) = parse_dsa_private_key(&private_key_pem) { + sign_with_dsa(&dsa_key_info, &challenge)? + } else { + return Ok(false); + }; + + // Send signature length and signature + let signature_len = signature.len() as u32; + self.stream.write_all(&signature_len.to_be_bytes()).await?; + self.stream.flush().await?; + self.stream.write_all(&signature).await?; + self.stream.flush().await?; + + // Send key identifier for public key identification + let key_id_bytes = verify_public_key.as_bytes(); + let key_id_len = key_id_bytes.len() as u32; + self.stream.write_all(&key_id_len.to_be_bytes()).await?; + self.stream.flush().await?; + self.stream.write_all(key_id_bytes).await?; + self.stream.flush().await?; + + Ok(true) + } +} + +/// Parse Ed25519 public key from PEM format +fn parse_ed25519_public_key(pem: &str) -> [u8; 32] { + // Robust parsing for Ed25519 public key using pem crate + let mut key_bytes = [0u8; 32]; + + if let Ok(pem_data) = pem::parse(pem) + && pem_data.tag() == "PUBLIC KEY" + && pem_data.contents().len() >= 32 + { + let contents = pem_data.contents(); + key_bytes.copy_from_slice(&contents[contents.len() - 32..]); + } + key_bytes +} + +/// Parse Ed25519 private key from PEM format +fn parse_ed25519_private_key(pem: &str) -> Result { + if let Ok(pem_data) = pem::parse(pem) + && pem_data.tag() == "PRIVATE KEY" + && pem_data.contents().len() >= 32 + { + let contents = pem_data.contents(); + let mut seed = [0u8; 32]; + seed.copy_from_slice(&contents[contents.len() - 32..]); + return Ok(SigningKey::from_bytes(&seed)); + } + Err(TcpTargetError::Crypto( + "Invalid Ed25519 private key format".to_string(), + )) +} + +/// Parse DSA public key information from PEM +fn parse_dsa_public_key( + pem: &str, +) -> Result<(&'static dyn signature::VerificationAlgorithm, Vec), TcpTargetError> { + if let Ok(pem_data) = pem::parse(pem) { + let contents = pem_data.contents().to_vec(); + + // Try different DSA algorithms based on PEM tag + match pem_data.tag() { + "EC PUBLIC KEY" | "PUBLIC KEY" if pem.contains("ECDSA") || pem.contains("ecdsa") => { + if pem.contains("P-256") { + return Ok((&ECDSA_P256_SHA256_ASN1, contents)); + } else if pem.contains("P-384") { + return Ok((&ECDSA_P384_SHA384_ASN1, contents)); + } + } + "RSA PUBLIC KEY" | "PUBLIC KEY" => { + return Ok((&RSA_PKCS1_2048_8192_SHA256, contents)); + } + _ => {} + } + + // Default to RSA for unknown types + return Ok((&RSA_PKCS1_2048_8192_SHA256, contents)); + } + Err(TcpTargetError::Crypto( + "Invalid DSA public key format".to_string(), + )) +} + +/// Parse DSA private key information from PEM +fn parse_dsa_private_key( + pem: &str, +) -> Result<(&'static dyn signature::VerificationAlgorithm, Vec), TcpTargetError> { + // For DSA, private key verification uses the same algorithm as public key + parse_dsa_public_key(pem) +} + +/// Verify DSA signature +fn verify_dsa_signature( + algorithm_and_key: &(&'static dyn signature::VerificationAlgorithm, Vec), + message: &[u8], + signature: &[u8], +) -> bool { + let (algorithm, key_bytes) = algorithm_and_key; + let public_key = UnparsedPublicKey::new(*algorithm, key_bytes); + public_key.verify(message, signature).is_ok() +} + +/// Sign with DSA +fn sign_with_dsa( + algorithm_and_key: &(&'static dyn signature::VerificationAlgorithm, Vec), + message: &[u8], +) -> Result, TcpTargetError> { + let (algorithm, key_bytes) = algorithm_and_key; + + // Handle different DSA/ECDSA algorithms by comparing algorithm identifiers + // Since we can't directly compare trait objects, we use pointer comparison + let algorithm_ptr = algorithm as *const _ as *const (); + let ecdsa_p256_ptr = &ECDSA_P256_SHA256_ASN1 as *const _ as *const (); + let ecdsa_p384_ptr = &ECDSA_P384_SHA384_ASN1 as *const _ as *const (); + + if algorithm_ptr == ecdsa_p256_ptr { + let key_pair = EcdsaKeyPair::from_pkcs8( + ECDSA_P256_SHA256_ASN1_SIGNING, + key_bytes, + &SystemRandom::new(), + ) + .map_err(|e| { + TcpTargetError::Crypto(format!("Failed to create ECDSA P-256 key pair: {}", e)) + })?; + + let signature = key_pair + .sign(&SystemRandom::new(), message) + .map_err(|e| TcpTargetError::Crypto(format!("ECDSA P-256 signing failed: {}", e)))?; + + Ok(signature.as_ref().to_vec()) + } else if algorithm_ptr == ecdsa_p384_ptr { + let key_pair = EcdsaKeyPair::from_pkcs8( + ECDSA_P384_SHA384_ASN1_SIGNING, + key_bytes, + &SystemRandom::new(), + ) + .map_err(|e| { + TcpTargetError::Crypto(format!("Failed to create ECDSA P-384 key pair: {}", e)) + })?; + + let signature = key_pair + .sign(&SystemRandom::new(), message) + .map_err(|e| TcpTargetError::Crypto(format!("ECDSA P-384 signing failed: {}", e)))?; + + Ok(signature.as_ref().to_vec()) + } else { + // RSA or unsupported algorithm + Err(TcpTargetError::Unsupported( + "DSA/ECDSA signing not supported for this algorithm type".to_string(), + )) + } +} diff --git a/crates/utils/tcp_connection/src/lib.rs b/crates/utils/tcp_connection/src/lib.rs index a5b5c20..6a2e599 100644 --- a/crates/utils/tcp_connection/src/lib.rs +++ b/crates/utils/tcp_connection/src/lib.rs @@ -1,4 +1,6 @@ #[allow(dead_code)] pub mod instance; +pub mod instance_challenge; + pub mod error; -- cgit From 2753a38ab627369c8bffce610b3106869f26dd61 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 29 Sep 2025 13:54:28 +0800 Subject: Add incremental transfer functionality and update TCP connection utilities - Add instance_incremental_transfer module for handling incremental data transfers - Add test_incremental_transfer module for testing incremental transfer functionality - Update TCP connection library to support new incremental transfer features - Update Cargo.toml dependencies for TCP connection utilities - Update main library to integrate new TCP connection functionality --- .../src/instance_incremental_transfer.rs | 1209 ++++++++++++++++++++ crates/utils/tcp_connection/src/lib.rs | 2 + 2 files changed, 1211 insertions(+) create mode 100644 crates/utils/tcp_connection/src/instance_incremental_transfer.rs (limited to 'crates/utils/tcp_connection/src') diff --git a/crates/utils/tcp_connection/src/instance_incremental_transfer.rs b/crates/utils/tcp_connection/src/instance_incremental_transfer.rs new file mode 100644 index 0000000..aeb2cb8 --- /dev/null +++ b/crates/utils/tcp_connection/src/instance_incremental_transfer.rs @@ -0,0 +1,1209 @@ +use std::path::{Path, PathBuf}; + +use tokio::fs::{File, OpenOptions, copy, create_dir_all, read, read_to_string, remove_file}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; + +use crate::{error::TcpTargetError, instance::ConnectionInstance}; + +// 增量传输协议版本 +const INCREMENTAL_TRANSFER_VERSION: u64 = 1; +// 块大小(字节) +const DEFAULT_CHUNK_SIZE: usize = 8192; +// 哈希大小(字节) +const HASH_SIZE: usize = 32; // blake3 produces 32-byte hashes +// 版本文件扩展名 +const VERSION_FILE_EXTENSION: &str = "ver"; +// 版本历史目录名 +const VERSION_HISTORY_DIR: &str = "diff"; +// 差异文件扩展名 +const DELTA_FILE_EXTENSION: &str = "delta"; + +// 协议模式常量 +const SERVER_DELTA_MODE: u8 = 1; +const SERVER_FULL_MODE: u8 = 2; +const CLIENT_UPDATE_MODE: u8 = 1; +const CLIENT_UPLOAD_MODE: u8 = 2; +const NO_CHANGE_MODE: u8 = 3; + +// 协议错误消息 +const ERR_INVALID_SERVER_RESPONSE: &str = "Invalid server response format"; +const ERR_VERSION_MISMATCH: &str = "Version mismatch detected"; +const ERR_CHUNK_INDEX_OUT_OF_BOUNDS: &str = "Chunk index out of bounds"; +const ERR_DELTA_FILE_CORRUPTED: &str = "Delta file corrupted or incomplete"; + +impl ConnectionInstance { + // ==================== 客户端功能 ==================== + + /// 客户端:增量更新到指定版本 + pub async fn client_update_to_version( + &mut self, + file_path: impl AsRef, + target_version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + let current_version = self.get_current_version(path).await?; + println!( + "Client update: current_version={}, target_version={}", + current_version, target_version + ); + + if current_version >= target_version { + println!("Client update: Already up to date, skipping"); + return Ok(()); // 已经是最新版本 + } + + // 发送更新请求 + println!( + "Client update: Sending protocol version: {}", + INCREMENTAL_TRANSFER_VERSION + ); + self.stream + .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) + .await?; + println!("Client update: Sending mode: {}", CLIENT_UPDATE_MODE); + self.stream.write_all(&[CLIENT_UPDATE_MODE]).await?; // 客户端更新模式 + println!("Client update: Sending target version: {}", target_version); + self.stream.write_all(&target_version.to_be_bytes()).await?; + self.stream.flush().await?; + println!("Client update: Request header sent"); + + // 执行增量更新 + println!("Client update: Starting incremental update..."); + let result = self + .client_perform_incremental_update(path, current_version, target_version) + .await; + println!( + "Client update: Incremental update completed, result: {:?}", + result + ); + result + } + + /// 客户端:增量上传变化到服务器 + pub async fn client_upload( + &mut self, + file_path: impl AsRef, + ) -> Result { + let path = file_path.as_ref(); + let current_version = self.get_current_version(path).await?; + + // 发送上传请求 + self.stream + .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) + .await?; + self.stream.write_all(&[2u8]).await?; // 客户端上传模式 + self.stream + .write_all(¤t_version.to_be_bytes()) + .await?; + self.stream.flush().await?; + + // 执行增量上传 + let new_version = self + .client_perform_incremental_upload(path, current_version) + .await?; + + // 更新本地版本 + self.save_version(path, new_version).await?; + + Ok(new_version) + } + + // ==================== 服务端功能 ==================== + + /// 服务端:处理客户端更新请求 + pub async fn server_handle_client_update( + &mut self, + file_path: impl AsRef, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + println!("Server: Reading protocol version..."); + // 读取协议版本 + let mut version_buf = [0u8; 8]; + self.stream.read_exact(&mut version_buf).await?; + let version = u64::from_be_bytes(version_buf); + println!("Server: Received protocol version: {}", version); + + if version != INCREMENTAL_TRANSFER_VERSION { + return Err(TcpTargetError::Protocol( + "Unsupported incremental transfer version".to_string(), + )); + } + + // 读取请求模式 + let mut mode_buf = [0u8; 1]; + self.stream.read_exact(&mut mode_buf).await?; + println!("Server: Received mode: {}", mode_buf[0]); + + match mode_buf[0] { + CLIENT_UPDATE_MODE => { + // 客户端更新模式 + println!("Server: Client update mode detected"); + let mut version_buf = [0u8; 4]; + self.stream.read_exact(&mut version_buf).await?; + let target_version = i32::from_be_bytes(version_buf); + println!("Server: Target version: {}", target_version); + + println!("Server: Sending version delta..."); + let result = self.server_send_version_delta(path, target_version).await; + println!("Server: Version delta sent, result: {:?}", result); + result + } + CLIENT_UPLOAD_MODE => { + // 客户端上传模式 + let mut version_buf = [0u8; 4]; + self.stream.read_exact(&mut version_buf).await?; + let client_version = i32::from_be_bytes(version_buf); + + self.server_receive_client_changes(path, client_version) + .await + } + _ => { + return Err(TcpTargetError::Protocol(format!( + "{}: unknown mode {}", + ERR_INVALID_SERVER_RESPONSE, mode_buf[0] + ))); + } + } + } + + /// 服务端:发送指定版本的增量数据 + pub async fn server_send_delta_to_version( + &mut self, + file_path: impl AsRef, + from_version: i32, + to_version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + // 发送响应头 + self.stream + .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) + .await?; + self.stream.write_all(&[1u8]).await?; // 服务端增量模式 + + self.server_send_version_delta_internal(path, from_version, to_version) + .await + } + + // ==================== 内部实现 ==================== + + /// 客户端:执行增量更新 + async fn client_perform_incremental_update( + &mut self, + file_path: impl AsRef, + current_version: i32, + target_version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + // 发送客户端当前版本给服务器 + self.stream + .write_all(¤t_version.to_be_bytes()) + .await?; + self.stream.flush().await?; + + // 读取服务器响应 + let mut version_buf = [0u8; 8]; + self.stream.read_exact(&mut version_buf).await?; + let version = u64::from_be_bytes(version_buf); + + if version != INCREMENTAL_TRANSFER_VERSION { + return Err(TcpTargetError::Protocol( + "Unsupported incremental transfer version".to_string(), + )); + } + + let mut mode_buf = [0u8; 1]; + self.stream.read_exact(&mut mode_buf).await?; + + match mode_buf[0] { + SERVER_DELTA_MODE => { + // 服务端增量模式 + self.client_apply_delta(path, target_version).await + } + NO_CHANGE_MODE => { + // 无变化模式,不需要更新 + Ok(()) + } + _ => Err(TcpTargetError::Protocol( + "Invalid server response".to_string(), + )), + } + } + + /// 客户端:应用增量数据 + async fn client_apply_delta( + &mut self, + file_path: impl AsRef, + target_version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + // 读取增量类型 + let mut delta_type_buf = [0u8; 1]; + self.stream.read_exact(&mut delta_type_buf).await?; + + match delta_type_buf[0] { + SERVER_FULL_MODE => { + // 完整文件传输 + self.read_file(path).await?; + } + SERVER_DELTA_MODE => { + // 增量块传输 + self.client_receive_and_apply_chunks(path).await?; + } + NO_CHANGE_MODE => { + // 无变化模式 + // 不需要做任何操作 + } + _ => { + return Err(TcpTargetError::Protocol(format!( + "{}: unknown mode {}", + ERR_INVALID_SERVER_RESPONSE, delta_type_buf[0] + ))); + } + } + + // 更新本地版本 + self.save_version(path, target_version).await?; + + // 发送确认 + self.stream.write_all(&[1u8]).await?; + self.stream.flush().await?; + + Ok(()) + } + + /// 客户端:接收并应用增量块 + async fn client_receive_and_apply_chunks( + &mut self, + file_path: impl AsRef, + ) -> Result<(), TcpTargetError> { + self.receive_and_apply_chunks_internal(file_path, None) + .await + } + + /// 客户端:执行增量上传 + async fn client_perform_incremental_upload( + &mut self, + file_path: impl AsRef, + current_version: i32, + ) -> Result { + let path = file_path.as_ref(); + + // 读取服务器响应 + println!("Client upload: Reading server response version..."); + let mut version_buf = [0u8; 8]; + self.stream.read_exact(&mut version_buf).await?; + let version = u64::from_be_bytes(version_buf); + println!("Client upload: Received protocol version: {}", version); + + if version != INCREMENTAL_TRANSFER_VERSION { + return Err(TcpTargetError::Protocol( + "Unsupported incremental transfer version".to_string(), + )); + } + + println!("Client upload: Reading server response mode..."); + let mut mode_buf = [0u8; 1]; + self.stream.read_exact(&mut mode_buf).await?; + println!("Client upload: Received mode: {}", mode_buf[0]); + + match mode_buf[0] { + SERVER_FULL_MODE => { + // 服务端接收模式 + let new_version = self.client_send_changes(path, current_version).await?; + Ok(new_version) + } + _ => Err(TcpTargetError::Protocol( + "Invalid server response".to_string(), + )), + } + } + + /// 客户端:发送变化到服务器 + async fn client_send_changes( + &mut self, + file_path: impl AsRef, + client_version: i32, + ) -> Result { + let path = file_path.as_ref(); + println!( + "Client send changes: Starting to send changes, client_version={}", + client_version + ); + + // 发送确认给服务器 + println!("Client: Sending acknowledgment to server..."); + self.stream.write_all(&[1u8]).await?; + self.stream.flush().await?; + println!("Client: Acknowledgment sent"); + + // 发送客户端版本给服务器进行验证 + self.stream.write_all(&client_version.to_be_bytes()).await?; + self.stream.flush().await?; + + // 计算当前文件块哈希 + let current_hashes = self.calculate_file_chunk_hashes(path).await?; + println!( + "Client: Calculated {} current chunk hashes", + current_hashes.len() + ); + + // 发送块哈希 + println!("Client: Sending chunk hashes to server..."); + match self.send_chunk_hashes(¤t_hashes).await { + Ok(_) => println!("Client: Successfully sent chunk hashes"), + Err(e) => { + println!("Client: ERROR sending chunk hashes: {:?}", e); + return Err(e); + } + } + // Extra flush to ensure data is sent before server reads + match self.stream.flush().await { + Ok(_) => println!("Client: Successfully flushed stream"), + Err(e) => { + println!("Client: ERROR flushing stream: {:?}", e); + return Err(e.into()); + } + } + + // 读取服务器需要的块列表 + println!("Client: Reading chunks needed from server..."); + let chunks_to_send = self.receive_chunks_to_send().await?; + println!("Client: Received chunks to send: {:?}", chunks_to_send); + + // 发送需要的块 + println!("Client send changes: Sending file chunks to server..."); + self.send_file_chunks(path, &chunks_to_send).await?; + println!("Client send changes: File chunks sent"); + + // 读取新版本号 + println!("Client send changes: Reading new version from server..."); + let mut version_buf = [0u8; 4]; + self.stream.read_exact(&mut version_buf).await?; + let new_version = i32::from_be_bytes(version_buf); + println!("Client send changes: Received new version: {}", new_version); + + Ok(new_version) + } + + /// 服务端:发送版本增量数据 + async fn server_send_version_delta( + &mut self, + file_path: impl AsRef, + target_version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + println!("Server send version delta: Reading client version..."); + // 获取客户端当前版本(需要从协议中读取) + let mut client_version_buf = [0u8; 4]; + self.stream.read_exact(&mut client_version_buf).await?; + let client_version = i32::from_be_bytes(client_version_buf); + println!( + "Server send version delta: Client version: {}", + client_version + ); + + // 发送响应头 + println!( + "Server send version delta: Sending protocol version: {}", + INCREMENTAL_TRANSFER_VERSION + ); + self.stream + .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) + .await?; + println!( + "Server send version delta: Sending mode: {}", + SERVER_DELTA_MODE + ); + self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 服务端增量模式 + self.stream.flush().await?; + println!("Server send version delta: Response header sent"); + + println!("Server send version delta: Calling internal delta function..."); + let result = self + .server_send_version_delta_internal(path, client_version, target_version) + .await; + println!( + "Server send version delta: Internal function completed, result: {:?}", + result + ); + result + } + + /// 服务端:内部发送版本增量 + async fn server_send_version_delta_internal( + &mut self, + file_path: impl AsRef, + from_version: i32, + to_version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + println!( + "Server send version delta internal: from_version={}, to_version={}", + from_version, to_version + ); + + if from_version == to_version { + // 版本相同,不需要传输 + println!( + "Server send version delta internal: Versions are the same, sending NO_CHANGE_MODE" + ); + self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 + return Ok(()); + } + + // 计算版本间的差异 + println!("Server send version delta internal: Calculating version delta..."); + let delta_chunks = self + .calculate_version_delta(path, from_version, to_version) + .await?; + println!( + "Server send version delta internal: Delta chunks count: {}", + delta_chunks.len() + ); + + if delta_chunks.is_empty() { + // 没有变化或目标版本不存在 + println!("Server send version delta internal: No changes, sending NO_CHANGE_MODE"); + self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 + } else { + // 发送增量块 + println!( + "Server send version delta internal: Sending delta chunks with SERVER_DELTA_MODE" + ); + self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 增量块模式 + self.send_file_chunks(path, &delta_chunks).await?; + } + println!("Server send version delta internal: Completed successfully"); + Ok(()) + } + + /// 服务端:接收客户端变化 + async fn server_receive_client_changes( + &mut self, + file_path: impl AsRef, + client_version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + println!("Server receive changes: Client version: {}", client_version); + + // 验证客户端版本是否匹配服务器当前版本 + let server_version = self.get_current_version(path).await?; + if client_version != server_version { + return Err(TcpTargetError::Protocol(format!( + "{}: client {}, server {}", + ERR_VERSION_MISMATCH, client_version, server_version + ))); + } + + // 发送响应头 + println!( + "Server receive changes: Sending protocol version: {}", + INCREMENTAL_TRANSFER_VERSION + ); + self.stream + .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) + .await?; + println!("Server receive changes: Sending mode: {}", SERVER_FULL_MODE); + self.stream.write_all(&[SERVER_FULL_MODE]).await?; // 服务端完整文件模式 + self.stream.flush().await?; + println!("Server receive changes: Response header sent"); + + // 接收客户端块哈希 + println!("Server receive changes: Receiving client chunk hashes..."); + // 等待客户端确认 + let mut ack_buf = [0u8; 1]; + self.stream.read_exact(&mut ack_buf).await?; + if ack_buf[0] != 1 { + return Err(TcpTargetError::Protocol( + "Client acknowledgment failed".to_string(), + )); + } + println!("Server receive changes: Client acknowledged, reading client version..."); + + // 读取客户端版本进行验证 + let mut client_version_buf = [0u8; 4]; + self.stream.read_exact(&mut client_version_buf).await?; + let received_client_version = i32::from_be_bytes(client_version_buf); + println!( + "Server receive changes: Received client version: {}", + received_client_version + ); + + // 验证客户端版本是否匹配 + if received_client_version != client_version { + return Err(TcpTargetError::Protocol(format!( + "{}: expected {}, got {}", + ERR_VERSION_MISMATCH, client_version, received_client_version + ))); + } + + println!("Server receive changes: Reading chunk hashes..."); + let client_chunk_hashes = match self.receive_chunk_hashes().await { + Ok(hashes) => { + println!( + "Server receive changes: Successfully received {} client chunk hashes", + hashes.len() + ); + hashes + } + Err(e) => { + println!( + "Server receive changes: ERROR receiving chunk hashes: {:?}", + e + ); + return Err(e); + } + }; + + // 计算服务器当前块哈希 + let server_hashes = self.calculate_file_chunk_hashes(path).await?; + println!( + "Server: Calculated {} server chunk hashes", + server_hashes.len() + ); + + // 比较差异,确定需要更新的块 + let chunks_needed = self.compare_chunk_hashes(&client_chunk_hashes, &server_hashes); + println!( + "Server: Chunks needed after comparison: {:?}", + chunks_needed + ); + + // 发送需要的块列表 + println!("Server: Sending chunks needed list: {:?}", chunks_needed); + self.send_chunks_needed(&chunks_needed).await?; + + // 接收并应用客户端块 + self.receive_and_apply_chunks(path, &chunks_needed).await?; + + // 生成新版本号(这里简化:版本号+1) + let current_version = self.get_current_version(path).await?; + let new_version = current_version + 1; + + // 保存差异而不是完整文件 + self.save_version_delta(path, current_version, new_version, &chunks_needed) + .await?; + self.save_version(path, new_version).await?; + + // 发送新版本号给客户端 + println!( + "Server receive changes: Sending new version to client: {}", + new_version + ); + self.stream.write_all(&new_version.to_be_bytes()).await?; + self.stream.flush().await?; + println!("Server receive changes: New version sent to client"); + + Ok(()) + } + + // ==================== 工具函数 ==================== + + /// 计算文件的块哈希 + async fn calculate_file_chunk_hashes( + &self, + file_path: impl AsRef, + ) -> Result, TcpTargetError> { + let path = file_path.as_ref(); + let mut file = File::open(path).await?; + let file_size = file.metadata().await?.len(); + + let mut hashes = Vec::new(); + let mut buffer = vec![0u8; DEFAULT_CHUNK_SIZE]; + let mut bytes_read = 0; + + while bytes_read < file_size { + let bytes_to_read = (file_size - bytes_read).min(DEFAULT_CHUNK_SIZE as u64) as usize; + let chunk = &mut buffer[..bytes_to_read]; + + file.read_exact(chunk).await?; + + let hash = self.simple_chunk_hash(chunk); + hashes.push(hash); + + bytes_read += bytes_to_read as u64; + } + + Ok(hashes) + } + + /// 简单的块哈希函数 + fn simple_chunk_hash(&self, data: &[u8]) -> [u8; HASH_SIZE] { + // 使用稳定的blake3哈希算法,确保跨平台一致性 + let hash = blake3::hash(data); + let mut hash_bytes = [0u8; HASH_SIZE]; + hash_bytes[..32].copy_from_slice(hash.as_bytes()); + hash_bytes + } + + /// 发送块哈希列表 + async fn send_chunk_hashes( + &mut self, + hashes: &[[u8; HASH_SIZE]], + ) -> Result<(), TcpTargetError> { + let hash_count = hashes.len() as u32; + println!("Client: Sending {} chunk hashes", hash_count); + + match self.stream.write_all(&hash_count.to_be_bytes()).await { + Ok(_) => println!("Client: Successfully sent hash count"), + Err(e) => { + println!("Client: ERROR sending hash count: {:?}", e); + return Err(e.into()); + } + } + // Extra flush to ensure hash count is sent before server reads + match self.stream.flush().await { + Ok(_) => println!("Client: Successfully flushed hash count"), + Err(e) => { + println!("Client: ERROR flushing hash count: {:?}", e); + return Err(e.into()); + } + } + + for (i, hash) in hashes.iter().enumerate() { + println!("Client: Sending chunk hash {}: {:?}", i, &hash[..8]); // Show first 8 bytes for debugging + match self.stream.write_all(hash).await { + Ok(_) => println!("Client: Successfully sent chunk hash {}", i), + Err(e) => { + println!("Client: ERROR sending chunk hash {}: {:?}", i, e); + return Err(e.into()); + } + } + } + + match self.stream.flush().await { + Ok(_) => println!("Client: Successfully flushed after sending hashes"), + Err(e) => { + println!("Client: ERROR flushing after sending hashes: {:?}", e); + return Err(e.into()); + } + } + println!("Client: All chunk hashes sent"); + // Extra flush to ensure data is sent before server reads + match self.stream.flush().await { + Ok(_) => println!("Client: Successfully flushed final"), + Err(e) => { + println!("Client: ERROR flushing final: {:?}", e); + return Err(e.into()); + } + } + Ok(()) + } + + /// 接收块哈希列表 + async fn receive_chunk_hashes(&mut self) -> Result, TcpTargetError> { + println!("Server: Starting to receive chunk hashes..."); + + let mut count_buf = [0u8; 4]; + println!("Server: Reading chunk hash count..."); + self.stream.read_exact(&mut count_buf).await?; + let hash_count = u32::from_be_bytes(count_buf) as usize; + println!( + "Server: Reading {} chunk hashes, raw bytes: {:?}", + hash_count, count_buf + ); + + let mut hashes = Vec::with_capacity(hash_count); + let mut hash_buf = [0u8; HASH_SIZE]; + + for i in 0..hash_count { + println!("Server: Reading chunk hash {}...", i); + self.stream.read_exact(&mut hash_buf).await?; + println!("Server: Received chunk hash {}: {:?}", i, &hash_buf[..8]); // Show first 8 bytes for debugging + hashes.push(hash_buf); + } + + println!("Server: All {} chunk hashes received", hashes.len()); + Ok(hashes) + } + + /// 比较块哈希并返回需要更新的块索引 + fn compare_chunk_hashes( + &self, + new_hashes: &[[u8; HASH_SIZE]], + old_hashes: &[[u8; HASH_SIZE]], + ) -> Vec { + let mut chunks_needed = Vec::new(); + + for (i, (new_hash, old_hash)) in new_hashes.iter().zip(old_hashes.iter()).enumerate() { + if new_hash != old_hash { + chunks_needed.push(i); + } + } + + // 如果新文件有更多块,也需要更新 + for i in old_hashes.len()..new_hashes.len() { + chunks_needed.push(i); + } + + chunks_needed + } + + /// 接收需要传输的块列表 + async fn receive_chunks_to_send(&mut self) -> Result, TcpTargetError> { + let mut count_buf = [0u8; 4]; + self.stream.read_exact(&mut count_buf).await?; + let chunk_count = u32::from_be_bytes(count_buf) as usize; + + let mut chunks = Vec::with_capacity(chunk_count); + let mut index_buf = [0u8; 4]; + + for _ in 0..chunk_count { + self.stream.read_exact(&mut index_buf).await?; + let index = u32::from_be_bytes(index_buf) as usize; + chunks.push(index); + } + + Ok(chunks) + } + + /// 发送需要接收的块列表 + async fn send_chunks_needed(&mut self, chunks: &[usize]) -> Result<(), TcpTargetError> { + let chunk_count = chunks.len() as u32; + self.stream.write_all(&chunk_count.to_be_bytes()).await?; + + for &chunk_index in chunks { + self.stream + .write_all(&(chunk_index as u32).to_be_bytes()) + .await?; + } + + self.stream.flush().await?; + Ok(()) + } + + /// 发送指定的文件块 + async fn send_file_chunks( + &mut self, + file_path: impl AsRef, + chunk_indices: &[usize], + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + let mut file = File::open(path).await?; + let file_size = file.metadata().await?.len(); + + // 发送块数量 + self.stream + .write_all(&(chunk_indices.len() as u32).to_be_bytes()) + .await?; + + if chunk_indices.is_empty() { + self.stream.flush().await?; + return Ok(()); + } + + for &chunk_index in chunk_indices { + let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; + if chunk_offset >= file_size { + continue; // 跳过超出文件范围的块 + } + + // 定位到块位置 + tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) + .await?; + + // 计算块大小 + let remaining_bytes = file_size - chunk_offset; + let chunk_size = remaining_bytes.min(DEFAULT_CHUNK_SIZE as u64) as usize; + + // 发送块索引和大小 + self.stream + .write_all(&(chunk_index as u32).to_be_bytes()) + .await?; + self.stream + .write_all(&(chunk_size as u32).to_be_bytes()) + .await?; + + // 发送块数据 + let mut buffer = vec![0u8; chunk_size]; + file.read_exact(&mut buffer).await?; + self.stream.write_all(&buffer).await?; + } + + self.stream.flush().await?; + Ok(()) + } + + /// 接收并应用增量块 + async fn receive_and_apply_chunks( + &mut self, + file_path: impl AsRef, + chunk_indices: &[usize], + ) -> Result<(), TcpTargetError> { + self.receive_and_apply_chunks_internal(file_path, Some(chunk_indices)) + .await + } + + /// 内部工具函数:接收并应用块数据 + async fn receive_and_apply_chunks_internal( + &mut self, + file_path: impl AsRef, + expected_indices: Option<&[usize]>, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + // 读取块数量 + let mut count_buf = [0u8; 4]; + self.stream.read_exact(&mut count_buf).await?; + let chunk_count = u32::from_be_bytes(count_buf) as usize; + + // 创建临时文件来接收完整内容 + let temp_path = path.with_extension("tmp"); + let temp_file = File::create(&temp_path).await?; + let mut temp_writer = BufWriter::new(temp_file); + + // 记录接收到的块数据 + let mut received_chunks = Vec::new(); + + for i in 0..chunk_count { + // 读取块索引和大小 + let mut index_buf = [0u8; 4]; + self.stream.read_exact(&mut index_buf).await?; + let chunk_index = u32::from_be_bytes(index_buf) as usize; + + // 验证块索引(如果提供了预期索引) + if let Some(expected) = expected_indices { + if i >= expected.len() || chunk_index != expected[i] { + return Err(TcpTargetError::Protocol(format!( + "{}: expected {:?}, got {}", + ERR_CHUNK_INDEX_OUT_OF_BOUNDS, + expected.get(i), + chunk_index + ))); + } + } + + let mut size_buf = [0u8; 4]; + self.stream.read_exact(&mut size_buf).await?; + let chunk_size = u32::from_be_bytes(size_buf) as usize; + + // 读取块数据 + let mut chunk_data = vec![0u8; chunk_size]; + self.stream.read_exact(&mut chunk_data).await?; + + // 记录接收到的块 + received_chunks.push((chunk_index, chunk_data)); + } + + // 按块索引排序并写入临时文件 + received_chunks.sort_by_key(|(index, _)| *index); + + for (chunk_index, chunk_data) in received_chunks { + let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; + tokio::io::AsyncSeekExt::seek(&mut temp_writer, std::io::SeekFrom::Start(chunk_offset)) + .await?; + temp_writer.write_all(&chunk_data).await?; + } + + temp_writer.flush().await?; + + // 替换原文件 + tokio::fs::rename(&temp_path, path).await?; + + Ok(()) + } + + /// 获取当前文件版本 + async fn get_current_version( + &self, + file_path: impl AsRef, + ) -> Result { + let path = file_path.as_ref(); + let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); + + if !version_file_path.exists() { + return Ok(0); // 默认版本为0 + } + + let version_content = read_to_string(&version_file_path) + .await + .map_err(|e| TcpTargetError::File(format!("Failed to read version file: {}", e)))?; + + version_content + .trim() + .parse::() + .map_err(|e| TcpTargetError::File(format!("Invalid version format: {}", e))) + } + + /// 保存文件版本 + async fn save_version( + &self, + file_path: impl AsRef, + version: i32, + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); + + tokio::fs::write(&version_file_path, version.to_string()) + .await + .map_err(|e| TcpTargetError::File(format!("Failed to write version file: {}", e)))?; + + Ok(()) + } + + /// 计算版本间的差异 + async fn calculate_version_delta( + &self, + file_path: impl AsRef, + from_version: i32, + to_version: i32, + ) -> Result, TcpTargetError> { + let path = file_path.as_ref(); + + // 如果目标版本不存在,返回空差异 + let current_version = self.get_current_version(path).await?; + if to_version > current_version { + return Ok(Vec::new()); + } + + // 获取源版本和目标版本的块哈希 + let from_hashes = self.get_version_chunk_hashes(path, from_version).await?; + let to_hashes = self.get_version_chunk_hashes(path, to_version).await?; + + // 比较差异 + let delta_chunks = self.compare_chunk_hashes(&to_hashes, &from_hashes); + + Ok(delta_chunks) + } + + /// 获取指定版本的块哈希 + async fn get_version_chunk_hashes( + &self, + file_path: impl AsRef, + version: i32, + ) -> Result, TcpTargetError> { + let path = file_path.as_ref(); + + if version == 0 { + // 版本0表示空文件 + return Ok(Vec::new()); + } + + // 从版本历史中重建文件并计算哈希 + let reconstructed_path = self.reconstruct_version(path, version).await?; + let hashes = self + .calculate_file_chunk_hashes(&reconstructed_path) + .await?; + + // 清理临时文件 + let _ = remove_file(&reconstructed_path).await; + + Ok(hashes) + } + + /// 从差异重建指定版本的文件 + async fn reconstruct_version( + &self, + file_path: impl AsRef, + target_version: i32, + ) -> Result { + let path = file_path.as_ref(); + let temp_path = path.with_extension(format!("temp_{}", target_version)); + + if target_version == 0 { + // 创建空文件 + tokio::fs::write(&temp_path, b"").await?; + return Ok(temp_path); + } + + // 从版本0开始,逐步应用差异 + let mut current_version = 0; + let mut current_path = path.with_extension("temp_base"); + tokio::fs::write(¤t_path, b"").await?; // 创建基础空文件 + + while current_version < target_version { + let next_version = current_version + 1; + let (delta_chunks, chunk_data_list) = self + .load_version_delta(path, current_version, next_version) + .await?; + + // 应用差异到新文件 + let next_path = path.with_extension(format!("temp_{}", next_version)); + self.apply_delta_to_file(¤t_path, &next_path, &delta_chunks, &chunk_data_list) + .await?; + + // 清理旧临时文件 + if current_version > 0 { + let _ = tokio::fs::remove_file(¤t_path).await; + } + current_path = next_path; + current_version = next_version; + } + + Ok(current_path) + } + + /// 保存版本差异 + async fn save_version_delta( + &self, + file_path: impl AsRef, + from_version: i32, + to_version: i32, + changed_chunks: &[usize], + ) -> Result<(), TcpTargetError> { + let path = file_path.as_ref(); + + // 创建版本历史目录 + let history_dir = path + .parent() + .ok_or_else(|| TcpTargetError::File("Invalid file path".to_string()))? + .join(VERSION_HISTORY_DIR); + + if !history_dir.exists() { + create_dir_all(&history_dir).await?; + } + + // 保存差异信息(记录变化的块索引和实际数据) + let delta_path = history_dir.join(format!( + "{}_{}_{}.{}", + path.file_name() + .ok_or_else(|| TcpTargetError::File("Invalid file name".to_string()))? + .to_string_lossy(), + from_version, + to_version, + DELTA_FILE_EXTENSION + )); + + // 读取当前版本文件以获取变化块的实际数据 + let current_file_data = read(path).await?; + + let mut delta_data = Vec::new(); + + for &chunk_index in changed_chunks { + // 写入块索引 + delta_data.extend_from_slice(&(chunk_index as u32).to_be_bytes()); + + // 计算块的起始和结束位置 + let chunk_start = chunk_index * DEFAULT_CHUNK_SIZE; + let chunk_end = + std::cmp::min(chunk_start + DEFAULT_CHUNK_SIZE, current_file_data.len()); + + // 写入块数据大小 + let chunk_size = (chunk_end - chunk_start) as u32; + delta_data.extend_from_slice(&chunk_size.to_be_bytes()); + + // 写入块数据 + delta_data.extend_from_slice(¤t_file_data[chunk_start..chunk_end]); + } + + tokio::fs::write(&delta_path, &delta_data).await?; + + Ok(()) + } + + /// 加载版本差异 + async fn load_version_delta( + &self, + file_path: impl AsRef, + from_version: i32, + to_version: i32, + ) -> Result<(Vec, Vec>), TcpTargetError> { + let path = file_path.as_ref(); + let history_dir = path.parent().unwrap().join(VERSION_HISTORY_DIR); + + let delta_path = history_dir.join(format!( + "{}_{}_{}.{}", + path.file_name().unwrap().to_string_lossy(), + from_version, + to_version, + DELTA_FILE_EXTENSION + )); + + if !delta_path.exists() { + return Ok((Vec::new(), Vec::new())); + } + + let delta_data = read(&delta_path).await?; + let mut chunks = Vec::new(); + let mut chunk_data_list = Vec::new(); + + let mut offset = 0; + while offset < delta_data.len() { + // 读取块索引 (4 bytes) + if offset + 4 > delta_data.len() { + return Err(TcpTargetError::File(format!( + "{}: incomplete index data", + ERR_DELTA_FILE_CORRUPTED + ))); + } + let index = u32::from_be_bytes([ + delta_data[offset], + delta_data[offset + 1], + delta_data[offset + 2], + delta_data[offset + 3], + ]) as usize; + offset += 4; + + // 读取块大小 (4 bytes) + if offset + 4 > delta_data.len() { + return Err(TcpTargetError::File(format!( + "{}: incomplete size data", + ERR_DELTA_FILE_CORRUPTED + ))); + } + let chunk_size = u32::from_be_bytes([ + delta_data[offset], + delta_data[offset + 1], + delta_data[offset + 2], + delta_data[offset + 3], + ]) as usize; + offset += 4; + + // 读取块数据 + if offset + chunk_size > delta_data.len() { + return Err(TcpTargetError::File(format!( + "{}: incomplete chunk data", + ERR_DELTA_FILE_CORRUPTED + ))); + } + let chunk_data = delta_data[offset..offset + chunk_size].to_vec(); + offset += chunk_size; + + chunks.push(index); + chunk_data_list.push(chunk_data); + } + + Ok((chunks, chunk_data_list)) + } + + /// 应用差异到文件 + async fn apply_delta_to_file( + &self, + source_path: impl AsRef, + target_path: impl AsRef, + delta_chunks: &[usize], + chunk_data_list: &[Vec], + ) -> Result<(), TcpTargetError> { + let source_path = source_path.as_ref(); + let target_path = target_path.as_ref(); + + // 复制源文件到目标文件 + copy(source_path, target_path).await?; + + if delta_chunks.is_empty() { + return Ok(()); + } + + // 打开目标文件进行修改 + let mut file = OpenOptions::new().write(true).open(target_path).await?; + + for (i, &chunk_index) in delta_chunks.iter().enumerate() { + let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; + tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) + .await?; + + // 使用从delta文件中读取的实际块数据进行写入 + if i < chunk_data_list.len() { + file.write_all(&chunk_data_list[i]).await?; + } + } + + Ok(()) + } +} diff --git a/crates/utils/tcp_connection/src/lib.rs b/crates/utils/tcp_connection/src/lib.rs index 6a2e599..a2615d1 100644 --- a/crates/utils/tcp_connection/src/lib.rs +++ b/crates/utils/tcp_connection/src/lib.rs @@ -3,4 +3,6 @@ pub mod instance; pub mod instance_challenge; +pub mod instance_incremental_transfer; + pub mod error; -- cgit From e98b298d583626ab505debe778d0beba303256c3 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 29 Sep 2025 15:50:12 +0800 Subject: Add incremental transfer functionality and update core components - Implement instance_incremental_transfer module for efficient file synchronization - Add support for chunk-based file transfer with hash comparison - Update TCP connection utilities to support incremental transfer protocol - Enhance error handling and version management for file synchronization - Update dependencies and integrate new functionality into main library --- crates/utils/tcp_connection/src/behaviour.rs | 1 + .../src/instance_incremental_transfer.rs | 1209 -------------------- crates/utils/tcp_connection/src/lib.rs | 2 - 3 files changed, 1 insertion(+), 1211 deletions(-) create mode 100644 crates/utils/tcp_connection/src/behaviour.rs delete mode 100644 crates/utils/tcp_connection/src/instance_incremental_transfer.rs (limited to 'crates/utils/tcp_connection/src') diff --git a/crates/utils/tcp_connection/src/behaviour.rs b/crates/utils/tcp_connection/src/behaviour.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/utils/tcp_connection/src/behaviour.rs @@ -0,0 +1 @@ + diff --git a/crates/utils/tcp_connection/src/instance_incremental_transfer.rs b/crates/utils/tcp_connection/src/instance_incremental_transfer.rs deleted file mode 100644 index aeb2cb8..0000000 --- a/crates/utils/tcp_connection/src/instance_incremental_transfer.rs +++ /dev/null @@ -1,1209 +0,0 @@ -use std::path::{Path, PathBuf}; - -use tokio::fs::{File, OpenOptions, copy, create_dir_all, read, read_to_string, remove_file}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; - -use crate::{error::TcpTargetError, instance::ConnectionInstance}; - -// 增量传输协议版本 -const INCREMENTAL_TRANSFER_VERSION: u64 = 1; -// 块大小(字节) -const DEFAULT_CHUNK_SIZE: usize = 8192; -// 哈希大小(字节) -const HASH_SIZE: usize = 32; // blake3 produces 32-byte hashes -// 版本文件扩展名 -const VERSION_FILE_EXTENSION: &str = "ver"; -// 版本历史目录名 -const VERSION_HISTORY_DIR: &str = "diff"; -// 差异文件扩展名 -const DELTA_FILE_EXTENSION: &str = "delta"; - -// 协议模式常量 -const SERVER_DELTA_MODE: u8 = 1; -const SERVER_FULL_MODE: u8 = 2; -const CLIENT_UPDATE_MODE: u8 = 1; -const CLIENT_UPLOAD_MODE: u8 = 2; -const NO_CHANGE_MODE: u8 = 3; - -// 协议错误消息 -const ERR_INVALID_SERVER_RESPONSE: &str = "Invalid server response format"; -const ERR_VERSION_MISMATCH: &str = "Version mismatch detected"; -const ERR_CHUNK_INDEX_OUT_OF_BOUNDS: &str = "Chunk index out of bounds"; -const ERR_DELTA_FILE_CORRUPTED: &str = "Delta file corrupted or incomplete"; - -impl ConnectionInstance { - // ==================== 客户端功能 ==================== - - /// 客户端:增量更新到指定版本 - pub async fn client_update_to_version( - &mut self, - file_path: impl AsRef, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let current_version = self.get_current_version(path).await?; - println!( - "Client update: current_version={}, target_version={}", - current_version, target_version - ); - - if current_version >= target_version { - println!("Client update: Already up to date, skipping"); - return Ok(()); // 已经是最新版本 - } - - // 发送更新请求 - println!( - "Client update: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!("Client update: Sending mode: {}", CLIENT_UPDATE_MODE); - self.stream.write_all(&[CLIENT_UPDATE_MODE]).await?; // 客户端更新模式 - println!("Client update: Sending target version: {}", target_version); - self.stream.write_all(&target_version.to_be_bytes()).await?; - self.stream.flush().await?; - println!("Client update: Request header sent"); - - // 执行增量更新 - println!("Client update: Starting incremental update..."); - let result = self - .client_perform_incremental_update(path, current_version, target_version) - .await; - println!( - "Client update: Incremental update completed, result: {:?}", - result - ); - result - } - - /// 客户端:增量上传变化到服务器 - pub async fn client_upload( - &mut self, - file_path: impl AsRef, - ) -> Result { - let path = file_path.as_ref(); - let current_version = self.get_current_version(path).await?; - - // 发送上传请求 - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - self.stream.write_all(&[2u8]).await?; // 客户端上传模式 - self.stream - .write_all(¤t_version.to_be_bytes()) - .await?; - self.stream.flush().await?; - - // 执行增量上传 - let new_version = self - .client_perform_incremental_upload(path, current_version) - .await?; - - // 更新本地版本 - self.save_version(path, new_version).await?; - - Ok(new_version) - } - - // ==================== 服务端功能 ==================== - - /// 服务端:处理客户端更新请求 - pub async fn server_handle_client_update( - &mut self, - file_path: impl AsRef, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!("Server: Reading protocol version..."); - // 读取协议版本 - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - println!("Server: Received protocol version: {}", version); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - // 读取请求模式 - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - println!("Server: Received mode: {}", mode_buf[0]); - - match mode_buf[0] { - CLIENT_UPDATE_MODE => { - // 客户端更新模式 - println!("Server: Client update mode detected"); - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let target_version = i32::from_be_bytes(version_buf); - println!("Server: Target version: {}", target_version); - - println!("Server: Sending version delta..."); - let result = self.server_send_version_delta(path, target_version).await; - println!("Server: Version delta sent, result: {:?}", result); - result - } - CLIENT_UPLOAD_MODE => { - // 客户端上传模式 - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let client_version = i32::from_be_bytes(version_buf); - - self.server_receive_client_changes(path, client_version) - .await - } - _ => { - return Err(TcpTargetError::Protocol(format!( - "{}: unknown mode {}", - ERR_INVALID_SERVER_RESPONSE, mode_buf[0] - ))); - } - } - } - - /// 服务端:发送指定版本的增量数据 - pub async fn server_send_delta_to_version( - &mut self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 发送响应头 - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - self.stream.write_all(&[1u8]).await?; // 服务端增量模式 - - self.server_send_version_delta_internal(path, from_version, to_version) - .await - } - - // ==================== 内部实现 ==================== - - /// 客户端:执行增量更新 - async fn client_perform_incremental_update( - &mut self, - file_path: impl AsRef, - current_version: i32, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 发送客户端当前版本给服务器 - self.stream - .write_all(¤t_version.to_be_bytes()) - .await?; - self.stream.flush().await?; - - // 读取服务器响应 - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - - match mode_buf[0] { - SERVER_DELTA_MODE => { - // 服务端增量模式 - self.client_apply_delta(path, target_version).await - } - NO_CHANGE_MODE => { - // 无变化模式,不需要更新 - Ok(()) - } - _ => Err(TcpTargetError::Protocol( - "Invalid server response".to_string(), - )), - } - } - - /// 客户端:应用增量数据 - async fn client_apply_delta( - &mut self, - file_path: impl AsRef, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 读取增量类型 - let mut delta_type_buf = [0u8; 1]; - self.stream.read_exact(&mut delta_type_buf).await?; - - match delta_type_buf[0] { - SERVER_FULL_MODE => { - // 完整文件传输 - self.read_file(path).await?; - } - SERVER_DELTA_MODE => { - // 增量块传输 - self.client_receive_and_apply_chunks(path).await?; - } - NO_CHANGE_MODE => { - // 无变化模式 - // 不需要做任何操作 - } - _ => { - return Err(TcpTargetError::Protocol(format!( - "{}: unknown mode {}", - ERR_INVALID_SERVER_RESPONSE, delta_type_buf[0] - ))); - } - } - - // 更新本地版本 - self.save_version(path, target_version).await?; - - // 发送确认 - self.stream.write_all(&[1u8]).await?; - self.stream.flush().await?; - - Ok(()) - } - - /// 客户端:接收并应用增量块 - async fn client_receive_and_apply_chunks( - &mut self, - file_path: impl AsRef, - ) -> Result<(), TcpTargetError> { - self.receive_and_apply_chunks_internal(file_path, None) - .await - } - - /// 客户端:执行增量上传 - async fn client_perform_incremental_upload( - &mut self, - file_path: impl AsRef, - current_version: i32, - ) -> Result { - let path = file_path.as_ref(); - - // 读取服务器响应 - println!("Client upload: Reading server response version..."); - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - println!("Client upload: Received protocol version: {}", version); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - println!("Client upload: Reading server response mode..."); - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - println!("Client upload: Received mode: {}", mode_buf[0]); - - match mode_buf[0] { - SERVER_FULL_MODE => { - // 服务端接收模式 - let new_version = self.client_send_changes(path, current_version).await?; - Ok(new_version) - } - _ => Err(TcpTargetError::Protocol( - "Invalid server response".to_string(), - )), - } - } - - /// 客户端:发送变化到服务器 - async fn client_send_changes( - &mut self, - file_path: impl AsRef, - client_version: i32, - ) -> Result { - let path = file_path.as_ref(); - println!( - "Client send changes: Starting to send changes, client_version={}", - client_version - ); - - // 发送确认给服务器 - println!("Client: Sending acknowledgment to server..."); - self.stream.write_all(&[1u8]).await?; - self.stream.flush().await?; - println!("Client: Acknowledgment sent"); - - // 发送客户端版本给服务器进行验证 - self.stream.write_all(&client_version.to_be_bytes()).await?; - self.stream.flush().await?; - - // 计算当前文件块哈希 - let current_hashes = self.calculate_file_chunk_hashes(path).await?; - println!( - "Client: Calculated {} current chunk hashes", - current_hashes.len() - ); - - // 发送块哈希 - println!("Client: Sending chunk hashes to server..."); - match self.send_chunk_hashes(¤t_hashes).await { - Ok(_) => println!("Client: Successfully sent chunk hashes"), - Err(e) => { - println!("Client: ERROR sending chunk hashes: {:?}", e); - return Err(e); - } - } - // Extra flush to ensure data is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed stream"), - Err(e) => { - println!("Client: ERROR flushing stream: {:?}", e); - return Err(e.into()); - } - } - - // 读取服务器需要的块列表 - println!("Client: Reading chunks needed from server..."); - let chunks_to_send = self.receive_chunks_to_send().await?; - println!("Client: Received chunks to send: {:?}", chunks_to_send); - - // 发送需要的块 - println!("Client send changes: Sending file chunks to server..."); - self.send_file_chunks(path, &chunks_to_send).await?; - println!("Client send changes: File chunks sent"); - - // 读取新版本号 - println!("Client send changes: Reading new version from server..."); - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let new_version = i32::from_be_bytes(version_buf); - println!("Client send changes: Received new version: {}", new_version); - - Ok(new_version) - } - - /// 服务端:发送版本增量数据 - async fn server_send_version_delta( - &mut self, - file_path: impl AsRef, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!("Server send version delta: Reading client version..."); - // 获取客户端当前版本(需要从协议中读取) - let mut client_version_buf = [0u8; 4]; - self.stream.read_exact(&mut client_version_buf).await?; - let client_version = i32::from_be_bytes(client_version_buf); - println!( - "Server send version delta: Client version: {}", - client_version - ); - - // 发送响应头 - println!( - "Server send version delta: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!( - "Server send version delta: Sending mode: {}", - SERVER_DELTA_MODE - ); - self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 服务端增量模式 - self.stream.flush().await?; - println!("Server send version delta: Response header sent"); - - println!("Server send version delta: Calling internal delta function..."); - let result = self - .server_send_version_delta_internal(path, client_version, target_version) - .await; - println!( - "Server send version delta: Internal function completed, result: {:?}", - result - ); - result - } - - /// 服务端:内部发送版本增量 - async fn server_send_version_delta_internal( - &mut self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!( - "Server send version delta internal: from_version={}, to_version={}", - from_version, to_version - ); - - if from_version == to_version { - // 版本相同,不需要传输 - println!( - "Server send version delta internal: Versions are the same, sending NO_CHANGE_MODE" - ); - self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 - return Ok(()); - } - - // 计算版本间的差异 - println!("Server send version delta internal: Calculating version delta..."); - let delta_chunks = self - .calculate_version_delta(path, from_version, to_version) - .await?; - println!( - "Server send version delta internal: Delta chunks count: {}", - delta_chunks.len() - ); - - if delta_chunks.is_empty() { - // 没有变化或目标版本不存在 - println!("Server send version delta internal: No changes, sending NO_CHANGE_MODE"); - self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 - } else { - // 发送增量块 - println!( - "Server send version delta internal: Sending delta chunks with SERVER_DELTA_MODE" - ); - self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 增量块模式 - self.send_file_chunks(path, &delta_chunks).await?; - } - println!("Server send version delta internal: Completed successfully"); - Ok(()) - } - - /// 服务端:接收客户端变化 - async fn server_receive_client_changes( - &mut self, - file_path: impl AsRef, - client_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - println!("Server receive changes: Client version: {}", client_version); - - // 验证客户端版本是否匹配服务器当前版本 - let server_version = self.get_current_version(path).await?; - if client_version != server_version { - return Err(TcpTargetError::Protocol(format!( - "{}: client {}, server {}", - ERR_VERSION_MISMATCH, client_version, server_version - ))); - } - - // 发送响应头 - println!( - "Server receive changes: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!("Server receive changes: Sending mode: {}", SERVER_FULL_MODE); - self.stream.write_all(&[SERVER_FULL_MODE]).await?; // 服务端完整文件模式 - self.stream.flush().await?; - println!("Server receive changes: Response header sent"); - - // 接收客户端块哈希 - println!("Server receive changes: Receiving client chunk hashes..."); - // 等待客户端确认 - let mut ack_buf = [0u8; 1]; - self.stream.read_exact(&mut ack_buf).await?; - if ack_buf[0] != 1 { - return Err(TcpTargetError::Protocol( - "Client acknowledgment failed".to_string(), - )); - } - println!("Server receive changes: Client acknowledged, reading client version..."); - - // 读取客户端版本进行验证 - let mut client_version_buf = [0u8; 4]; - self.stream.read_exact(&mut client_version_buf).await?; - let received_client_version = i32::from_be_bytes(client_version_buf); - println!( - "Server receive changes: Received client version: {}", - received_client_version - ); - - // 验证客户端版本是否匹配 - if received_client_version != client_version { - return Err(TcpTargetError::Protocol(format!( - "{}: expected {}, got {}", - ERR_VERSION_MISMATCH, client_version, received_client_version - ))); - } - - println!("Server receive changes: Reading chunk hashes..."); - let client_chunk_hashes = match self.receive_chunk_hashes().await { - Ok(hashes) => { - println!( - "Server receive changes: Successfully received {} client chunk hashes", - hashes.len() - ); - hashes - } - Err(e) => { - println!( - "Server receive changes: ERROR receiving chunk hashes: {:?}", - e - ); - return Err(e); - } - }; - - // 计算服务器当前块哈希 - let server_hashes = self.calculate_file_chunk_hashes(path).await?; - println!( - "Server: Calculated {} server chunk hashes", - server_hashes.len() - ); - - // 比较差异,确定需要更新的块 - let chunks_needed = self.compare_chunk_hashes(&client_chunk_hashes, &server_hashes); - println!( - "Server: Chunks needed after comparison: {:?}", - chunks_needed - ); - - // 发送需要的块列表 - println!("Server: Sending chunks needed list: {:?}", chunks_needed); - self.send_chunks_needed(&chunks_needed).await?; - - // 接收并应用客户端块 - self.receive_and_apply_chunks(path, &chunks_needed).await?; - - // 生成新版本号(这里简化:版本号+1) - let current_version = self.get_current_version(path).await?; - let new_version = current_version + 1; - - // 保存差异而不是完整文件 - self.save_version_delta(path, current_version, new_version, &chunks_needed) - .await?; - self.save_version(path, new_version).await?; - - // 发送新版本号给客户端 - println!( - "Server receive changes: Sending new version to client: {}", - new_version - ); - self.stream.write_all(&new_version.to_be_bytes()).await?; - self.stream.flush().await?; - println!("Server receive changes: New version sent to client"); - - Ok(()) - } - - // ==================== 工具函数 ==================== - - /// 计算文件的块哈希 - async fn calculate_file_chunk_hashes( - &self, - file_path: impl AsRef, - ) -> Result, TcpTargetError> { - let path = file_path.as_ref(); - let mut file = File::open(path).await?; - let file_size = file.metadata().await?.len(); - - let mut hashes = Vec::new(); - let mut buffer = vec![0u8; DEFAULT_CHUNK_SIZE]; - let mut bytes_read = 0; - - while bytes_read < file_size { - let bytes_to_read = (file_size - bytes_read).min(DEFAULT_CHUNK_SIZE as u64) as usize; - let chunk = &mut buffer[..bytes_to_read]; - - file.read_exact(chunk).await?; - - let hash = self.simple_chunk_hash(chunk); - hashes.push(hash); - - bytes_read += bytes_to_read as u64; - } - - Ok(hashes) - } - - /// 简单的块哈希函数 - fn simple_chunk_hash(&self, data: &[u8]) -> [u8; HASH_SIZE] { - // 使用稳定的blake3哈希算法,确保跨平台一致性 - let hash = blake3::hash(data); - let mut hash_bytes = [0u8; HASH_SIZE]; - hash_bytes[..32].copy_from_slice(hash.as_bytes()); - hash_bytes - } - - /// 发送块哈希列表 - async fn send_chunk_hashes( - &mut self, - hashes: &[[u8; HASH_SIZE]], - ) -> Result<(), TcpTargetError> { - let hash_count = hashes.len() as u32; - println!("Client: Sending {} chunk hashes", hash_count); - - match self.stream.write_all(&hash_count.to_be_bytes()).await { - Ok(_) => println!("Client: Successfully sent hash count"), - Err(e) => { - println!("Client: ERROR sending hash count: {:?}", e); - return Err(e.into()); - } - } - // Extra flush to ensure hash count is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed hash count"), - Err(e) => { - println!("Client: ERROR flushing hash count: {:?}", e); - return Err(e.into()); - } - } - - for (i, hash) in hashes.iter().enumerate() { - println!("Client: Sending chunk hash {}: {:?}", i, &hash[..8]); // Show first 8 bytes for debugging - match self.stream.write_all(hash).await { - Ok(_) => println!("Client: Successfully sent chunk hash {}", i), - Err(e) => { - println!("Client: ERROR sending chunk hash {}: {:?}", i, e); - return Err(e.into()); - } - } - } - - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed after sending hashes"), - Err(e) => { - println!("Client: ERROR flushing after sending hashes: {:?}", e); - return Err(e.into()); - } - } - println!("Client: All chunk hashes sent"); - // Extra flush to ensure data is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed final"), - Err(e) => { - println!("Client: ERROR flushing final: {:?}", e); - return Err(e.into()); - } - } - Ok(()) - } - - /// 接收块哈希列表 - async fn receive_chunk_hashes(&mut self) -> Result, TcpTargetError> { - println!("Server: Starting to receive chunk hashes..."); - - let mut count_buf = [0u8; 4]; - println!("Server: Reading chunk hash count..."); - self.stream.read_exact(&mut count_buf).await?; - let hash_count = u32::from_be_bytes(count_buf) as usize; - println!( - "Server: Reading {} chunk hashes, raw bytes: {:?}", - hash_count, count_buf - ); - - let mut hashes = Vec::with_capacity(hash_count); - let mut hash_buf = [0u8; HASH_SIZE]; - - for i in 0..hash_count { - println!("Server: Reading chunk hash {}...", i); - self.stream.read_exact(&mut hash_buf).await?; - println!("Server: Received chunk hash {}: {:?}", i, &hash_buf[..8]); // Show first 8 bytes for debugging - hashes.push(hash_buf); - } - - println!("Server: All {} chunk hashes received", hashes.len()); - Ok(hashes) - } - - /// 比较块哈希并返回需要更新的块索引 - fn compare_chunk_hashes( - &self, - new_hashes: &[[u8; HASH_SIZE]], - old_hashes: &[[u8; HASH_SIZE]], - ) -> Vec { - let mut chunks_needed = Vec::new(); - - for (i, (new_hash, old_hash)) in new_hashes.iter().zip(old_hashes.iter()).enumerate() { - if new_hash != old_hash { - chunks_needed.push(i); - } - } - - // 如果新文件有更多块,也需要更新 - for i in old_hashes.len()..new_hashes.len() { - chunks_needed.push(i); - } - - chunks_needed - } - - /// 接收需要传输的块列表 - async fn receive_chunks_to_send(&mut self) -> Result, TcpTargetError> { - let mut count_buf = [0u8; 4]; - self.stream.read_exact(&mut count_buf).await?; - let chunk_count = u32::from_be_bytes(count_buf) as usize; - - let mut chunks = Vec::with_capacity(chunk_count); - let mut index_buf = [0u8; 4]; - - for _ in 0..chunk_count { - self.stream.read_exact(&mut index_buf).await?; - let index = u32::from_be_bytes(index_buf) as usize; - chunks.push(index); - } - - Ok(chunks) - } - - /// 发送需要接收的块列表 - async fn send_chunks_needed(&mut self, chunks: &[usize]) -> Result<(), TcpTargetError> { - let chunk_count = chunks.len() as u32; - self.stream.write_all(&chunk_count.to_be_bytes()).await?; - - for &chunk_index in chunks { - self.stream - .write_all(&(chunk_index as u32).to_be_bytes()) - .await?; - } - - self.stream.flush().await?; - Ok(()) - } - - /// 发送指定的文件块 - async fn send_file_chunks( - &mut self, - file_path: impl AsRef, - chunk_indices: &[usize], - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let mut file = File::open(path).await?; - let file_size = file.metadata().await?.len(); - - // 发送块数量 - self.stream - .write_all(&(chunk_indices.len() as u32).to_be_bytes()) - .await?; - - if chunk_indices.is_empty() { - self.stream.flush().await?; - return Ok(()); - } - - for &chunk_index in chunk_indices { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - if chunk_offset >= file_size { - continue; // 跳过超出文件范围的块 - } - - // 定位到块位置 - tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) - .await?; - - // 计算块大小 - let remaining_bytes = file_size - chunk_offset; - let chunk_size = remaining_bytes.min(DEFAULT_CHUNK_SIZE as u64) as usize; - - // 发送块索引和大小 - self.stream - .write_all(&(chunk_index as u32).to_be_bytes()) - .await?; - self.stream - .write_all(&(chunk_size as u32).to_be_bytes()) - .await?; - - // 发送块数据 - let mut buffer = vec![0u8; chunk_size]; - file.read_exact(&mut buffer).await?; - self.stream.write_all(&buffer).await?; - } - - self.stream.flush().await?; - Ok(()) - } - - /// 接收并应用增量块 - async fn receive_and_apply_chunks( - &mut self, - file_path: impl AsRef, - chunk_indices: &[usize], - ) -> Result<(), TcpTargetError> { - self.receive_and_apply_chunks_internal(file_path, Some(chunk_indices)) - .await - } - - /// 内部工具函数:接收并应用块数据 - async fn receive_and_apply_chunks_internal( - &mut self, - file_path: impl AsRef, - expected_indices: Option<&[usize]>, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 读取块数量 - let mut count_buf = [0u8; 4]; - self.stream.read_exact(&mut count_buf).await?; - let chunk_count = u32::from_be_bytes(count_buf) as usize; - - // 创建临时文件来接收完整内容 - let temp_path = path.with_extension("tmp"); - let temp_file = File::create(&temp_path).await?; - let mut temp_writer = BufWriter::new(temp_file); - - // 记录接收到的块数据 - let mut received_chunks = Vec::new(); - - for i in 0..chunk_count { - // 读取块索引和大小 - let mut index_buf = [0u8; 4]; - self.stream.read_exact(&mut index_buf).await?; - let chunk_index = u32::from_be_bytes(index_buf) as usize; - - // 验证块索引(如果提供了预期索引) - if let Some(expected) = expected_indices { - if i >= expected.len() || chunk_index != expected[i] { - return Err(TcpTargetError::Protocol(format!( - "{}: expected {:?}, got {}", - ERR_CHUNK_INDEX_OUT_OF_BOUNDS, - expected.get(i), - chunk_index - ))); - } - } - - let mut size_buf = [0u8; 4]; - self.stream.read_exact(&mut size_buf).await?; - let chunk_size = u32::from_be_bytes(size_buf) as usize; - - // 读取块数据 - let mut chunk_data = vec![0u8; chunk_size]; - self.stream.read_exact(&mut chunk_data).await?; - - // 记录接收到的块 - received_chunks.push((chunk_index, chunk_data)); - } - - // 按块索引排序并写入临时文件 - received_chunks.sort_by_key(|(index, _)| *index); - - for (chunk_index, chunk_data) in received_chunks { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - tokio::io::AsyncSeekExt::seek(&mut temp_writer, std::io::SeekFrom::Start(chunk_offset)) - .await?; - temp_writer.write_all(&chunk_data).await?; - } - - temp_writer.flush().await?; - - // 替换原文件 - tokio::fs::rename(&temp_path, path).await?; - - Ok(()) - } - - /// 获取当前文件版本 - async fn get_current_version( - &self, - file_path: impl AsRef, - ) -> Result { - let path = file_path.as_ref(); - let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); - - if !version_file_path.exists() { - return Ok(0); // 默认版本为0 - } - - let version_content = read_to_string(&version_file_path) - .await - .map_err(|e| TcpTargetError::File(format!("Failed to read version file: {}", e)))?; - - version_content - .trim() - .parse::() - .map_err(|e| TcpTargetError::File(format!("Invalid version format: {}", e))) - } - - /// 保存文件版本 - async fn save_version( - &self, - file_path: impl AsRef, - version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); - - tokio::fs::write(&version_file_path, version.to_string()) - .await - .map_err(|e| TcpTargetError::File(format!("Failed to write version file: {}", e)))?; - - Ok(()) - } - - /// 计算版本间的差异 - async fn calculate_version_delta( - &self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result, TcpTargetError> { - let path = file_path.as_ref(); - - // 如果目标版本不存在,返回空差异 - let current_version = self.get_current_version(path).await?; - if to_version > current_version { - return Ok(Vec::new()); - } - - // 获取源版本和目标版本的块哈希 - let from_hashes = self.get_version_chunk_hashes(path, from_version).await?; - let to_hashes = self.get_version_chunk_hashes(path, to_version).await?; - - // 比较差异 - let delta_chunks = self.compare_chunk_hashes(&to_hashes, &from_hashes); - - Ok(delta_chunks) - } - - /// 获取指定版本的块哈希 - async fn get_version_chunk_hashes( - &self, - file_path: impl AsRef, - version: i32, - ) -> Result, TcpTargetError> { - let path = file_path.as_ref(); - - if version == 0 { - // 版本0表示空文件 - return Ok(Vec::new()); - } - - // 从版本历史中重建文件并计算哈希 - let reconstructed_path = self.reconstruct_version(path, version).await?; - let hashes = self - .calculate_file_chunk_hashes(&reconstructed_path) - .await?; - - // 清理临时文件 - let _ = remove_file(&reconstructed_path).await; - - Ok(hashes) - } - - /// 从差异重建指定版本的文件 - async fn reconstruct_version( - &self, - file_path: impl AsRef, - target_version: i32, - ) -> Result { - let path = file_path.as_ref(); - let temp_path = path.with_extension(format!("temp_{}", target_version)); - - if target_version == 0 { - // 创建空文件 - tokio::fs::write(&temp_path, b"").await?; - return Ok(temp_path); - } - - // 从版本0开始,逐步应用差异 - let mut current_version = 0; - let mut current_path = path.with_extension("temp_base"); - tokio::fs::write(¤t_path, b"").await?; // 创建基础空文件 - - while current_version < target_version { - let next_version = current_version + 1; - let (delta_chunks, chunk_data_list) = self - .load_version_delta(path, current_version, next_version) - .await?; - - // 应用差异到新文件 - let next_path = path.with_extension(format!("temp_{}", next_version)); - self.apply_delta_to_file(¤t_path, &next_path, &delta_chunks, &chunk_data_list) - .await?; - - // 清理旧临时文件 - if current_version > 0 { - let _ = tokio::fs::remove_file(¤t_path).await; - } - current_path = next_path; - current_version = next_version; - } - - Ok(current_path) - } - - /// 保存版本差异 - async fn save_version_delta( - &self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - changed_chunks: &[usize], - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 创建版本历史目录 - let history_dir = path - .parent() - .ok_or_else(|| TcpTargetError::File("Invalid file path".to_string()))? - .join(VERSION_HISTORY_DIR); - - if !history_dir.exists() { - create_dir_all(&history_dir).await?; - } - - // 保存差异信息(记录变化的块索引和实际数据) - let delta_path = history_dir.join(format!( - "{}_{}_{}.{}", - path.file_name() - .ok_or_else(|| TcpTargetError::File("Invalid file name".to_string()))? - .to_string_lossy(), - from_version, - to_version, - DELTA_FILE_EXTENSION - )); - - // 读取当前版本文件以获取变化块的实际数据 - let current_file_data = read(path).await?; - - let mut delta_data = Vec::new(); - - for &chunk_index in changed_chunks { - // 写入块索引 - delta_data.extend_from_slice(&(chunk_index as u32).to_be_bytes()); - - // 计算块的起始和结束位置 - let chunk_start = chunk_index * DEFAULT_CHUNK_SIZE; - let chunk_end = - std::cmp::min(chunk_start + DEFAULT_CHUNK_SIZE, current_file_data.len()); - - // 写入块数据大小 - let chunk_size = (chunk_end - chunk_start) as u32; - delta_data.extend_from_slice(&chunk_size.to_be_bytes()); - - // 写入块数据 - delta_data.extend_from_slice(¤t_file_data[chunk_start..chunk_end]); - } - - tokio::fs::write(&delta_path, &delta_data).await?; - - Ok(()) - } - - /// 加载版本差异 - async fn load_version_delta( - &self, - file_path: impl AsRef, - from_version: i32, - to_version: i32, - ) -> Result<(Vec, Vec>), TcpTargetError> { - let path = file_path.as_ref(); - let history_dir = path.parent().unwrap().join(VERSION_HISTORY_DIR); - - let delta_path = history_dir.join(format!( - "{}_{}_{}.{}", - path.file_name().unwrap().to_string_lossy(), - from_version, - to_version, - DELTA_FILE_EXTENSION - )); - - if !delta_path.exists() { - return Ok((Vec::new(), Vec::new())); - } - - let delta_data = read(&delta_path).await?; - let mut chunks = Vec::new(); - let mut chunk_data_list = Vec::new(); - - let mut offset = 0; - while offset < delta_data.len() { - // 读取块索引 (4 bytes) - if offset + 4 > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete index data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let index = u32::from_be_bytes([ - delta_data[offset], - delta_data[offset + 1], - delta_data[offset + 2], - delta_data[offset + 3], - ]) as usize; - offset += 4; - - // 读取块大小 (4 bytes) - if offset + 4 > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete size data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let chunk_size = u32::from_be_bytes([ - delta_data[offset], - delta_data[offset + 1], - delta_data[offset + 2], - delta_data[offset + 3], - ]) as usize; - offset += 4; - - // 读取块数据 - if offset + chunk_size > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete chunk data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let chunk_data = delta_data[offset..offset + chunk_size].to_vec(); - offset += chunk_size; - - chunks.push(index); - chunk_data_list.push(chunk_data); - } - - Ok((chunks, chunk_data_list)) - } - - /// 应用差异到文件 - async fn apply_delta_to_file( - &self, - source_path: impl AsRef, - target_path: impl AsRef, - delta_chunks: &[usize], - chunk_data_list: &[Vec], - ) -> Result<(), TcpTargetError> { - let source_path = source_path.as_ref(); - let target_path = target_path.as_ref(); - - // 复制源文件到目标文件 - copy(source_path, target_path).await?; - - if delta_chunks.is_empty() { - return Ok(()); - } - - // 打开目标文件进行修改 - let mut file = OpenOptions::new().write(true).open(target_path).await?; - - for (i, &chunk_index) in delta_chunks.iter().enumerate() { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) - .await?; - - // 使用从delta文件中读取的实际块数据进行写入 - if i < chunk_data_list.len() { - file.write_all(&chunk_data_list[i]).await?; - } - } - - Ok(()) - } -} diff --git a/crates/utils/tcp_connection/src/lib.rs b/crates/utils/tcp_connection/src/lib.rs index a2615d1..6a2e599 100644 --- a/crates/utils/tcp_connection/src/lib.rs +++ b/crates/utils/tcp_connection/src/lib.rs @@ -3,6 +3,4 @@ pub mod instance; pub mod instance_challenge; -pub mod instance_incremental_transfer; - pub mod error; -- cgit