diff options
Diffstat (limited to 'policy')
| -rw-r--r-- | policy/README.md | 96 | ||||
| -rw-r--r-- | policy/_policies/Cargo.lock | 7 | ||||
| -rw-r--r-- | policy/_policies/Cargo.toml | 15 | ||||
| -rw-r--r-- | policy/_policies/Cargo.toml.t | 19 | ||||
| -rw-r--r-- | policy/_policies/src/error.rs | 14 | ||||
| -rw-r--r-- | policy/_policies/src/lib.rs | 75 | ||||
| -rw-r--r-- | policy/_policies/src/lib.rs.t | 117 | ||||
| -rw-r--r-- | policy/_policies/src/stream_read.rs | 46 | ||||
| -rw-r--r-- | policy/butck/butck_fixed_size/Cargo.lock | 7 | ||||
| -rw-r--r-- | policy/butck/butck_fixed_size/Cargo.toml | 7 | ||||
| -rw-r--r-- | policy/butck/butck_fixed_size/src/lib.rs | 48 |
11 files changed, 451 insertions, 0 deletions
diff --git a/policy/README.md b/policy/README.md new file mode 100644 index 0000000..c995506 --- /dev/null +++ b/policy/README.md @@ -0,0 +1,96 @@ +# Write your Policy! + +Welcome to the Butchunker Policy Development Guide. This guide explains how to create a custom chunking policy for Butchunker. A chunking policy defines how to split data streams or files into chunks. This is a core task for data deduplication, storage, and transfer. + +Before starting, you should know basic Rust and understand the Butchunker framework. Your policy will decide where to split the data based on its content and your settings. + +## Creating a Policy Crate + +First, create a new `Rust Crate` to host your chunking policy. + +### Writing `Cargo.toml` + +```toml +[package] +name = "butck_fixed_size" # Policy name +authors = ["Butchunker"] # Author info +version = "0.1.0" +edition = "2024" + +[dependencies] +``` + +## Implementing Policy Logic + +### Writing `src/lib.rs` + +In `src/lib.rs`, implement one or both of the following schemes: + +#### Scheme 1: Streaming Processing Scheme + +Suitable for processing large files where subsequent content cannot be predicted, but also does not require loading the entire file into memory. 
+ +```rust +use std::collections::HashMap; + +// Streaming policy struct (must implement the Default trait) +#[derive(Default)] +pub struct YourPolicyStream { + // Define your state fields here +} + +// Streaming processing function +pub async fn your_policy_stream( + current_data: &[u8], // Current data chunk + len: u32, // Data length + stream: &mut YourPolicyStream, // Streaming processing context + params: &HashMap<&str, &str>, // Configuration parameters +) -> Option<u32> { + // Implement your chunking logic + // Return the split position (offset from the start of current_data), or None if no split + None +} +``` + +#### Scheme 2: Simple Processing Scheme + +Suitable for processing small to medium-sized files that can be loaded entirely at once, allowing knowledge of subsequent data during chunking for better results. + +```rust +use std::collections::HashMap; + +// Simple processing function +pub async fn your_policy( + raw_data: &[u8], // Raw data + params: &HashMap<&str, &str>, // Configuration parameters +) -> Vec<u32> { + // Implement your chunking logic + // Return a vector of all split positions (offsets from the start of raw_data) + vec![] +} +``` + +## Registration and Usage + +### Deploying the Policy + +1. Place the completed policy `Crate` into the `./policy/` directory of the Butchunker repository. +2. Use the `butckrepo-refresh` program to refresh the registry: + - If the program is not yet installed, you can execute the following in the root directory of the Butchunker repository: + + ```bash + cargo install --path ./ + ``` +3. After each policy library update, you must: + - Execute `butckrepo-refresh` to refresh the registry. + - Reinstall the `butck` binary: `cargo install --path ./`. + +### Calling the Policy + +- The policy will be automatically registered in Butchunker's registry. 
+ + Use the following command to call the policy: + + ```bash + butck write <file> --policy <policy_name> --storage ./ + ``` diff --git a/policy/_policies/Cargo.lock b/policy/_policies/Cargo.lock new file mode 100644 index 0000000..8f7bc05 --- /dev/null +++ b/policy/_policies/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "butck_policies" +version = "0.1.0" diff --git a/policy/_policies/Cargo.toml b/policy/_policies/Cargo.toml new file mode 100644 index 0000000..d939dd2 --- /dev/null +++ b/policy/_policies/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "butck_policies" +version = "0.1.0" +edition = "2024" + +[dependencies] +thiserror = "2" +tokio = { version = "1", features = ["fs"] } + +# Auto generated dependencies +# If you find issues with the dependencies, please +# 1. Delete all code after this comment +# 2. Clear the file `policy/_policies/src/lib.rs` +# 3. Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory +butck_fixed_size = { path = "../../policy/butck/butck_fixed_size" }
\ No newline at end of file diff --git a/policy/_policies/Cargo.toml.t b/policy/_policies/Cargo.toml.t new file mode 100644 index 0000000..aab90b9 --- /dev/null +++ b/policy/_policies/Cargo.toml.t @@ -0,0 +1,19 @@ +[package] +name = "butck_policies" +version = "0.1.0" +edition = "2024" + +[dependencies] +thiserror = "2" +tokio = { version = "1", features = ["fs"] } + +# Auto generated dependencies +# If you find issues with the dependencies, please +# 1. Delete all code after this comment +# 2. Clear the file `policy/_policies/src/lib.rs` +# 3. Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory +>>>>>>>>>> deps + +@@@ >>> deps +<<<crate_name>>> = { path = "../../<<<path>>>" } +@@@ <<< diff --git a/policy/_policies/src/error.rs b/policy/_policies/src/error.rs new file mode 100644 index 0000000..975749d --- /dev/null +++ b/policy/_policies/src/error.rs @@ -0,0 +1,14 @@ +#[derive(Debug, thiserror::Error)] +pub enum ChunkFailed { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Target policy not found")] + PolicyNotFound, + + #[error("File read failed: {0}")] + FileReadFailed(std::path::PathBuf), + + #[error("File open failed: {0}")] + FileOpenFailed(std::path::PathBuf), +} diff --git a/policy/_policies/src/lib.rs b/policy/_policies/src/lib.rs new file mode 100644 index 0000000..397579a --- /dev/null +++ b/policy/_policies/src/lib.rs @@ -0,0 +1,75 @@ +// Auto generated dependencies +// If you find issues with the dependencies, please +// 1. Delete all code after this comment +// 2. Clear the auto generated part in `policy/_policies/Cargo.toml` +// 3. 
Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory +pub mod error; +pub mod stream_read; + +use error::ChunkFailed; +use std::{collections::HashMap, path::Path}; + +use crate::stream_read::chunk_stream_process; + +/// Chunks the specified raw data using the specified chunking policy +/// +/// # Parameters +/// - `policy_name`: Chunking policy name, currently supports 1 policies +/// - `raw_data`: Raw data byte slice +/// - `params`: Hashmap of parameters required by the chunking policy +pub async fn chunk_with( + policy_name: &str, + raw_data: &[u8], + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ChunkFailed> { + match policy_name { + "butck_fixed_size" => Ok(butck_fixed_size::chunk(raw_data, params).await), + _ => Err(ChunkFailed::PolicyNotFound), + } +} + +pub async fn chunk_stream_with( + policy_name: &str, + size: u32, + path: &Path, + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ChunkFailed> { + match policy_name { + "butck_fixed_size" => { + let mut stream = butck_fixed_size::FixedSizeStream::default(); + chunk_stream_process( + path, &mut stream, size, params, + async |current_data, len, stream, params| { + butck_fixed_size::chunk_stream(current_data, len, stream, params).await + }, + ) + .await + } + _ => Err(ChunkFailed::PolicyNotFound), + } +} + +pub fn policies() -> Vec<&'static str> { + vec![ + // butck_fixed_size + "butck_fixed_size", + ] +} + +pub mod butck_fixed_size { + pub use butck_fixed_size::FixedSizeStream; + use std::collections::HashMap; + + pub async fn chunk(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> { + butck_fixed_size::chunk_fixed_size(raw_data, params).await + } + + pub async fn chunk_stream( + current_data: &[u8], + len: u32, + stream: &mut butck_fixed_size::FixedSizeStream, + params: &HashMap<&str, &str>, + ) -> Option<u32> { + butck_fixed_size::chunk_fixed_size_stream(current_data, len, stream, params).await + } +}
\ No newline at end of file diff --git a/policy/_policies/src/lib.rs.t b/policy/_policies/src/lib.rs.t new file mode 100644 index 0000000..873a4cd --- /dev/null +++ b/policy/_policies/src/lib.rs.t @@ -0,0 +1,117 @@ +// Auto generated dependencies +// If you find issues with the dependencies, please +// 1. Delete all code after this comment +// 2. Clear the auto generated part in `policy/_policies/Cargo.toml` +// 3. Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory +pub mod error; +pub mod stream_read; + +use error::ChunkFailed; +use std::{collections::HashMap, path::Path}; + +use crate::stream_read::chunk_stream_process; + +/// Chunks the specified raw data using the specified chunking policy +/// +/// # Parameters +/// - `policy_name`: Chunking policy name, currently supports <<<policy_count>>> policies +/// - `raw_data`: Raw data byte slice +/// - `params`: Hashmap of parameters required by the chunking policy +pub async fn chunk_with( + policy_name: &str, + raw_data: &[u8], + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ChunkFailed> { + match policy_name { +>>>>>>>>>> match_arms + _ => Err(ChunkFailed::PolicyNotFound), + } +} + +pub async fn chunk_stream_with( + policy_name: &str, + size: u32, + path: &Path, + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ChunkFailed> { + match policy_name { +>>>>>>>>>> match_arms_stream + _ => Err(ChunkFailed::PolicyNotFound), + } +} + +pub fn policies() -> Vec<&'static str> { + vec![ +>>>>>>>>>> policy_names + ] +} + +>>>>>>>>>> exports_simple +>>>>>>>>>> exports_stream +>>>>>>>>>> exports_both + +@@@ >>> match_arms + "<<<crate_name>>>" => Ok(<<<crate_name>>>::chunk(raw_data, params).await), +@@@ <<< + +@@@ >>> match_arms_stream + "<<<crate_name>>>" => { + let mut stream = <<<stream_struct_id>>>::default(); + chunk_stream_process( + path, &mut stream, size, params, + async |current_data, len, stream, params| { + <<<crate_name>>>::chunk_stream(current_data, len, stream, params).await + }, + 
) + .await + } +@@@ <<< + +@@@ >>> policy_names + // <<<name>>> + "<<<name>>>", +@@@ <<< + +@@@ >>> exports_simple +pub mod <<<crate_name>>> { + use std::collections::HashMap; + pub async fn chunk(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> { + <<<crate_name>>>::<<<matched_func>>>(raw_data, params)<<<has_await>>> + } +} +@@@ <<< + +@@@ >>> exports_stream +pub mod <<<crate_name>>> { + pub use <<<stream_struct_id>>>; + + pub async fn chunk_stream( + current_data: &[u8], + len: u32, + stream: &mut <<<stream_struct_id>>>, + params: &std::collections::HashMap<&str, &str>, + ) -> Option<u32> { + <<<crate_name>>>::<<<matched_func_stream>>>(current_data, len, stream, params)<<<has_await_stream>>> + } +} +@@@ <<< + +@@@ >>> exports_both +pub mod <<<crate_name>>> { + pub use <<<stream_struct_id>>>; + use std::collections::HashMap; + + pub async fn chunk(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> { + <<<crate_name>>>::<<<matched_func>>>(raw_data, params)<<<has_await>>> + } + + pub async fn chunk_stream( + current_data: &[u8], + len: u32, + stream: &mut <<<stream_struct_id>>>, + params: &HashMap<&str, &str>, + ) -> Option<u32> { + <<<crate_name>>>::<<<matched_func_stream>>>(current_data, len, stream, params)<<<has_await_stream>>> + } +} +@@@ <<< diff --git a/policy/_policies/src/stream_read.rs b/policy/_policies/src/stream_read.rs new file mode 100644 index 0000000..5cf7791 --- /dev/null +++ b/policy/_policies/src/stream_read.rs @@ -0,0 +1,46 @@ +use crate::error::ChunkFailed; +use std::{collections::HashMap, path::Path}; + +pub async fn chunk_stream_process<T, F>( + path: &Path, + stream_data: &mut T, + size: u32, + params: &HashMap<&str, &str>, + chunk_func: F, +) -> Result<Vec<u32>, ChunkFailed> +where + T: Default, + F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>, +{ + let mut file = tokio::fs::File::open(path) + .await + .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?; + let mut buffer = vec![0u8; size as 
usize]; + let mut splits = Vec::new(); + let mut total_read = 0; + + loop { + let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer) + .await + .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?; + + if bytes_read == 0 { + break Ok(splits); + } + + // Process chunking on the buffer slice + let chunk_result = chunk_func( + &buffer[..bytes_read], + bytes_read as u32, + stream_data, + params, + ) + .await; + + if let Some(offset) = chunk_result { + splits.push(total_read + offset); + } + + total_read += bytes_read as u32; + } +} diff --git a/policy/butck/butck_fixed_size/Cargo.lock b/policy/butck/butck_fixed_size/Cargo.lock new file mode 100644 index 0000000..c1e1873 --- /dev/null +++ b/policy/butck/butck_fixed_size/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "butck_fixed_size" +version = "0.1.0" diff --git a/policy/butck/butck_fixed_size/Cargo.toml b/policy/butck/butck_fixed_size/Cargo.toml new file mode 100644 index 0000000..1550cb9 --- /dev/null +++ b/policy/butck/butck_fixed_size/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "butck_fixed_size" +authors = ["Butchunker"] +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/policy/butck/butck_fixed_size/src/lib.rs b/policy/butck/butck_fixed_size/src/lib.rs new file mode 100644 index 0000000..28cabff --- /dev/null +++ b/policy/butck/butck_fixed_size/src/lib.rs @@ -0,0 +1,48 @@ +use std::collections::HashMap; + +const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024; // 1MB + +fn get_chunk_size(params: &HashMap<&str, &str>) -> usize { + params + .get("size") + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_CHUNK_SIZE) +} + +pub async fn chunk_fixed_size(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> { + let chunk_size = get_chunk_size(params); + (chunk_size..raw_data.len()) + .step_by(chunk_size) + .map(|pos| pos as u32) + .collect() +} + +#[derive(Default)] 
+pub struct FixedSizeStream { + processed_bytes: usize, +} + +pub async fn chunk_fixed_size_stream( + _current_data: &[u8], + len: u32, + stream: &mut FixedSizeStream, + params: &HashMap<&str, &str>, +) -> Option<u32> { + let chunk_size = get_chunk_size(params); + let valid_len = len as usize; + + let prev_chunk = stream.processed_bytes / chunk_size; + let new_processed = stream.processed_bytes + valid_len; + let new_chunk = new_processed / chunk_size; + + if prev_chunk != new_chunk { + // Find chunk boundary in current data, update processed bytes, return position + let boundary_in_chunk = chunk_size - (stream.processed_bytes % chunk_size); + stream.processed_bytes += boundary_in_chunk; + Some(boundary_in_chunk.min(valid_len) as u32) + } else { + // Update bytes processed + stream.processed_bytes = new_processed; + None + } +} |
