1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
use std::path::PathBuf;
use crate::{error::StorageIOError, store::ChunkingResult};
/// Rabin fingerprint parameters for CDC
const WINDOW_SIZE: usize = 48;
const MIN_CHUNK_RATIO: f64 = 0.75;
const MAX_CHUNK_RATIO: f64 = 2.0;
const BASE_TARGET_MASK: u64 = 0xFFFF;
/// Rabin fingerprint polynomial (irreducible polynomial)
const POLYNOMIAL: u64 = 0x3DA3358B4DC173;
/// Precomputed table for Rabin fingerprint sliding window
static FINGERPRINT_TABLE: [u64; 256] = {
let mut table = [0u64; 256];
let mut i = 0;
while i < 256 {
let mut fingerprint = i as u64;
let mut j = 0;
while j < 8 {
if fingerprint & 1 != 0 {
fingerprint = (fingerprint >> 1) ^ POLYNOMIAL;
} else {
fingerprint >>= 1;
}
j += 1;
}
table[i] = fingerprint;
i += 1;
}
table
};
/// Calculate Rabin fingerprint for a byte
#[inline]
fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 {
let old_byte_index = old_byte as usize;
let fingerprint =
(old_fingerprint << 8) ^ (new_byte as u64) ^ FINGERPRINT_TABLE[old_byte_index];
fingerprint
}
/// Split data using Content-Defined Chunking with Rabin fingerprinting
pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> {
let avg_size = avg_size as usize;
if avg_size == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Average chunk size must be greater than 0",
)
.into());
}
// Calculate min and max chunk sizes based on average size
let min_chunk_size = (avg_size as f64 * MIN_CHUNK_RATIO) as usize;
let max_chunk_size = (avg_size as f64 * MAX_CHUNK_RATIO) as usize;
// Ensure reasonable bounds
let min_chunk_size = min_chunk_size.max(512); // At least 512 bytes
let max_chunk_size = max_chunk_size.max(avg_size * 2);
let target_mask = {
let scale_factor = avg_size as f64 / (64.0 * 1024.0);
let mask_value = (BASE_TARGET_MASK as f64 * scale_factor) as u64;
mask_value.max(0xC000)
};
let mut chunks = Vec::new();
let mut start = 0;
let data_len = data.len();
while start < data_len {
let chunk_start = start;
let search_end = (start + max_chunk_size).min(data_len);
let mut boundary_found = false;
let mut boundary_pos = search_end;
let mut window = Vec::with_capacity(WINDOW_SIZE);
let mut fingerprint: u64 = 0;
for i in start..search_end {
let byte = data[i];
// Update sliding window
if window.len() >= WINDOW_SIZE {
let old_byte = window.remove(0);
fingerprint = rabin_fingerprint_byte(fingerprint, old_byte, byte);
} else {
fingerprint = (fingerprint << 8) ^ (byte as u64);
}
window.push(byte);
if i - chunk_start >= min_chunk_size {
if (fingerprint & target_mask) == 0 {
boundary_found = true;
boundary_pos = i + 1;
break;
}
}
}
let chunk_end = if boundary_found {
boundary_pos
} else {
search_end
};
let chunk_data = data[chunk_start..chunk_end].to_vec();
let chunk = crate::store::create_chunk(chunk_data);
chunks.push(chunk);
start = chunk_end;
}
if chunks.len() > 1 {
let mut merged_chunks = Vec::new();
let mut i = 0;
while i < chunks.len() {
let current_chunk = &chunks[i];
if i < chunks.len() - 1 {
let next_chunk = &chunks[i + 1];
if current_chunk.data.len() < min_chunk_size
&& current_chunk.data.len() + next_chunk.data.len() <= max_chunk_size
{
// Merge chunks
let mut merged_data = current_chunk.data.clone();
merged_data.extend_from_slice(&next_chunk.data);
let merged_chunk = crate::store::create_chunk(merged_data);
merged_chunks.push(merged_chunk);
i += 2;
} else {
merged_chunks.push(current_chunk.clone());
i += 1;
}
} else {
merged_chunks.push(current_chunk.clone());
i += 1;
}
}
chunks = merged_chunks;
}
Ok(ChunkingResult {
chunks,
total_size: data_len as u64,
})
}
/// Split file using CDC with specified average chunk size
pub async fn write_file_cdc<I: Into<PathBuf>>(
file_to_write: I,
storage_dir: I,
output_index_file: I,
avg_size: u32,
) -> Result<(), StorageIOError> {
use crate::store::{StorageConfig, write_file};
let config = StorageConfig::cdc(avg_size);
write_file(file_to_write, storage_dir, output_index_file, &config).await
}
/// Test function for CDC chunking
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cdc_basic() {
let pattern: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect();
let mut data = Vec::new();
for _ in 0..10 {
data.extend_from_slice(&pattern);
}
let result = split_cdc_impl(&data, 4096).unwrap();
// Should have at least one chunk
assert!(!result.chunks.is_empty());
assert!(
result.chunks.len() >= 1 && result.chunks.len() <= 10,
"Expected 1-10 chunks for 10KB data with 4KB average, got {}",
result.chunks.len()
);
// Verify total size
let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
assert_eq!(total_chunk_size, data.len());
// Verify chunk size bounds (more relaxed for CDC)
let max_size = (4096.0 * MAX_CHUNK_RATIO) as usize;
for chunk in &result.chunks {
let chunk_size = chunk.data.len();
// CDC can produce chunks smaller than min_size due to content patterns
// But they should not exceed max_size
assert!(
chunk_size <= max_size,
"Chunk size {} exceeds maximum {}",
chunk_size,
max_size
);
}
}
#[test]
fn test_cdc_small_file() {
// Small file should result in single chunk
let data = vec![1, 2, 3, 4, 5];
let result = split_cdc_impl(&data, 4096).unwrap();
// Should have exactly one chunk
assert_eq!(result.chunks.len(), 1);
assert_eq!(result.chunks[0].data.len(), data.len());
}
#[test]
fn test_cdc_large_avg_size() {
// Test with larger average size (256KB)
let mut data = Vec::new();
for i in 0..(256 * 1024 * 3) {
// 768KB of data
data.push((i % 256) as u8);
}
let result = split_cdc_impl(&data, 256 * 1024).unwrap();
// With 768KB data and 256KB average, should have reasonable number of chunks
// CDC is content-defined, so exact count varies
assert!(
result.chunks.len() >= 1 && result.chunks.len() <= 10,
"Expected 1-10 chunks for 768KB data with 256KB average, got {}",
result.chunks.len()
);
// Verify total size
let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
assert_eq!(total_chunk_size, data.len());
// Verify chunk size bounds
let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize;
for chunk in &result.chunks {
// CDC can produce chunks smaller than min_size
// But they should not exceed max_size
assert!(
chunk.data.len() <= max_size,
"Chunk size {} exceeds maximum {}",
chunk.data.len(),
max_size
);
}
}
#[test]
fn test_cdc_zero_avg_size() {
let data = vec![1, 2, 3];
let result = split_cdc_impl(&data, 0);
assert!(result.is_err());
match result {
Err(StorageIOError::IOErr(e)) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
}
_ => panic!("Expected IOErr with InvalidInput"),
}
}
#[test]
fn test_rabin_fingerprint() {
// Test that rabin_fingerprint_byte function is deterministic
let test_bytes = [0x01, 0x02, 0x03, 0x04];
// Calculate fingerprint with specific sequence
let mut fingerprint1 = 0;
for &byte in &test_bytes {
fingerprint1 = rabin_fingerprint_byte(fingerprint1, 0xAA, byte);
}
// Calculate again with same inputs
let mut fingerprint2 = 0;
for &byte in &test_bytes {
fingerprint2 = rabin_fingerprint_byte(fingerprint2, 0xAA, byte);
}
// Same input should produce same output
assert_eq!(fingerprint1, fingerprint2);
// Test with different old_byte produces different result
let mut fingerprint3 = 0;
for &byte in &test_bytes {
fingerprint3 = rabin_fingerprint_byte(fingerprint3, 0xBB, byte);
}
// Different old_byte should produce different fingerprint
assert_ne!(fingerprint1, fingerprint3);
}
}
|