summaryrefslogtreecommitdiff
path: root/policy
diff options
context:
space:
mode:
Diffstat (limited to 'policy')
-rw-r--r--policy/README.md96
-rw-r--r--policy/_policies/Cargo.lock7
-rw-r--r--policy/_policies/Cargo.toml15
-rw-r--r--policy/_policies/Cargo.toml.t19
-rw-r--r--policy/_policies/src/error.rs14
-rw-r--r--policy/_policies/src/lib.rs75
-rw-r--r--policy/_policies/src/lib.rs.t117
-rw-r--r--policy/_policies/src/stream_read.rs46
-rw-r--r--policy/butck/butck_fixed_size/Cargo.lock7
-rw-r--r--policy/butck/butck_fixed_size/Cargo.toml7
-rw-r--r--policy/butck/butck_fixed_size/src/lib.rs48
11 files changed, 451 insertions, 0 deletions
diff --git a/policy/README.md b/policy/README.md
new file mode 100644
index 0000000..c995506
--- /dev/null
+++ b/policy/README.md
@@ -0,0 +1,96 @@
+# Write your Policy!
+
+Welcome to the Butchunker Policy Development Guide. This guide explains how to create a custom chunking policy for Butchunker. A chunking policy defines how to split data streams or files into chunks. This is a core task for data deduplication, storage, and transfer.
+
+Before starting, you should know basic Rust and understand the Butchunker framework. Your policy will decide where to split the data based on its content and your settings.
+
+## Creating a Policy Crate
+
+First, create a new `Rust Crate` to host your chunking policy.
+
+### Writing `Cargo.toml`
+
+```toml
+[package]
+name = "butck_fixed_size" # Policy name
+authors = ["Butchunker"] # Author info
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+```
+
+## Implementing Policy Logic
+
+### Writing `src/lib.rs`
+
+In `src/lib.rs`, implement one or both of the following schemes:
+
+#### Scheme 1: Streaming Processing Scheme
+
+Suitable for processing large files: the policy does not need to know upcoming content in advance, and the entire file is never loaded into memory at once.
+
+```rust
+use std::collections::HashMap;
+
+// Streaming policy struct (must implement the Default trait)
+#[derive(Default)]
+pub struct YourPolicyStream {
+ // Define your state fields here
+}
+
+// Streaming processing function
+pub async fn your_policy_stream(
+ current_data: &[u8], // Current data chunk
+ len: u32, // Data length
+    stream: &mut YourPolicyStream, // Streaming processing context (your policy's state struct)
+ params: &HashMap<&str, &str>, // Configuration parameters
+) -> Option<u32> {
+ // Implement your chunking logic
+ // Return the split position (offset from the start of current_data), or None if no split
+ None
+}
+```
+
+#### Scheme 2: Simple Processing Scheme
+
+Suitable for processing small to medium-sized files that can be loaded entirely at once, allowing knowledge of subsequent data during chunking for better results.
+
+```rust
+use std::collections::HashMap;
+
+// Simple processing function
+pub async fn your_policy(
+ raw_data: &[u8], // Raw data
+ params: &HashMap<&str, &str>, // Configuration parameters
+) -> Vec<u32> {
+ // Implement your chunking logic
+ // Return a vector of all split positions (offsets from the start of raw_data)
+ vec![]
+}
+```
+
+## Registration and Usage
+
+### Deploying the Policy
+
+1. Place the completed policy `Crate` into the `./policy/` directory of the Butchunker repository.
+2. Use the `butckrepo-refresh` program to refresh the registry:
+ - If the program is not yet installed, you can execute the following in the root directory of the Butchunker repository:
+
+ ```bash
+ cargo install --path ./
+ ```
+3. After each policy library update, you must:
+ - Execute `butckrepo-refresh` to refresh the registry.
+ - Reinstall the `butck` binary: `cargo install --path ./`.
+
+### Calling the Policy
+
+- The policy will be automatically registered in Butchunker's registry.
+
+ Use the following command to call the policy:
+
+   ```bash
+   butck write <file> --policy <policy_name> --storage ./
+   ```
diff --git a/policy/_policies/Cargo.lock b/policy/_policies/Cargo.lock
new file mode 100644
index 0000000..8f7bc05
--- /dev/null
+++ b/policy/_policies/Cargo.lock
@@ -0,0 +1,7 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 4
+
+[[package]]
+name = "butck_policies"
+version = "0.1.0"
diff --git a/policy/_policies/Cargo.toml b/policy/_policies/Cargo.toml
new file mode 100644
index 0000000..d939dd2
--- /dev/null
+++ b/policy/_policies/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "butck_policies"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+thiserror = "2"
+tokio = { version = "1", features = ["fs"] }
+
+# Auto generated dependencies
+# If you find issues with the dependencies, please
+# 1. Delete all code after this comment
+# 2. Clear the file `policy/_policies/src/lib.rs`
+# 3. Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory
+butck_fixed_size = { path = "../../policy/butck/butck_fixed_size" } \ No newline at end of file
diff --git a/policy/_policies/Cargo.toml.t b/policy/_policies/Cargo.toml.t
new file mode 100644
index 0000000..aab90b9
--- /dev/null
+++ b/policy/_policies/Cargo.toml.t
@@ -0,0 +1,19 @@
+[package]
+name = "butck_policies"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+thiserror = "2"
+tokio = { version = "1", features = ["fs"] }
+
+# Auto generated dependencies
+# If you find issues with the dependencies, please
+# 1. Delete all code after this comment
+# 2. Clear the file `policy/_policies/src/lib.rs`
+# 3. Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory
+>>>>>>>>>> deps
+
+@@@ >>> deps
+<<<crate_name>>> = { path = "../../<<<path>>>" }
+@@@ <<<
diff --git a/policy/_policies/src/error.rs b/policy/_policies/src/error.rs
new file mode 100644
index 0000000..975749d
--- /dev/null
+++ b/policy/_policies/src/error.rs
@@ -0,0 +1,14 @@
+/// Errors produced while chunking data or files.
+#[derive(Debug, thiserror::Error)]
+pub enum ChunkFailed {
+    /// Wrapped standard I/O error (converted automatically via `From`).
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+
+    /// The requested policy name is not registered in this crate.
+    #[error("Target policy not found")]
+    PolicyNotFound,
+
+    /// Reading from the file at the given path failed.
+    #[error("File read failed: {0}")]
+    FileReadFailed(std::path::PathBuf),
+
+    /// Opening the file at the given path failed.
+    #[error("File open failed: {0}")]
+    FileOpenFailed(std::path::PathBuf),
+}
diff --git a/policy/_policies/src/lib.rs b/policy/_policies/src/lib.rs
new file mode 100644
index 0000000..397579a
--- /dev/null
+++ b/policy/_policies/src/lib.rs
@@ -0,0 +1,75 @@
+// Auto generated dependencies
+// If you find issues with the dependencies, please
+// 1. Delete all code after this comment
+// 2. Clear the auto generated part in `policy/_policies/Cargo.toml`
+// 3. Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory
+pub mod error;
+pub mod stream_read;
+
+use error::ChunkFailed;
+use std::{collections::HashMap, path::Path};
+
+use crate::stream_read::chunk_stream_process;
+
+/// Chunks the specified raw data using the specified chunking policy.
+///
+/// # Parameters
+/// - `policy_name`: Chunking policy name, currently supports 1 policy
+/// - `raw_data`: Raw data byte slice
+/// - `params`: Hashmap of parameters required by the chunking policy
+///
+/// # Errors
+/// Returns `ChunkFailed::PolicyNotFound` when `policy_name` does not match
+/// any registered policy.
+pub async fn chunk_with(
+    policy_name: &str,
+    raw_data: &[u8],
+    params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ChunkFailed> {
+    match policy_name {
+        "butck_fixed_size" => Ok(butck_fixed_size::chunk(raw_data, params).await),
+        _ => Err(ChunkFailed::PolicyNotFound),
+    }
+}
+
+/// Chunks the file at `path` using the named streaming chunking policy,
+/// reading the file in buffers of `size` bytes.
+///
+/// # Parameters
+/// - `policy_name`: Chunking policy name
+/// - `size`: Read-buffer size in bytes
+/// - `path`: Path of the file to chunk
+/// - `params`: Hashmap of parameters required by the chunking policy
+///
+/// # Errors
+/// - `ChunkFailed::PolicyNotFound` if `policy_name` is not registered
+/// - I/O failures are surfaced by `chunk_stream_process`
+pub async fn chunk_stream_with(
+    policy_name: &str,
+    size: u32,
+    path: &Path,
+    params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ChunkFailed> {
+    match policy_name {
+        "butck_fixed_size" => {
+            // Fresh per-call policy state; the closure adapts the policy's
+            // streaming entry point to the generic driver signature.
+            let mut stream = butck_fixed_size::FixedSizeStream::default();
+            chunk_stream_process(
+                path, &mut stream, size, params,
+                async |current_data, len, stream, params| {
+                    butck_fixed_size::chunk_stream(current_data, len, stream, params).await
+                },
+            )
+            .await
+        }
+        _ => Err(ChunkFailed::PolicyNotFound),
+    }
+}
+
/// Lists the names of every chunking policy registered in this crate.
///
/// NOTE: this file is auto-generated; the list is refreshed by
/// `butckrepo-refresh`.
pub fn policies() -> Vec<&'static str> {
    // butck_fixed_size
    vec!["butck_fixed_size"]
}
+
+/// Auto-generated adapter module for the `butck_fixed_size` policy crate.
+pub mod butck_fixed_size {
+    pub use butck_fixed_size::FixedSizeStream;
+    use std::collections::HashMap;
+
+    /// Whole-buffer chunking entry point; delegates to
+    /// `butck_fixed_size::chunk_fixed_size`.
+    pub async fn chunk(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> {
+        butck_fixed_size::chunk_fixed_size(raw_data, params).await
+    }
+
+    /// Streaming chunking entry point; delegates to
+    /// `butck_fixed_size::chunk_fixed_size_stream`.
+    pub async fn chunk_stream(
+        current_data: &[u8],
+        len: u32,
+        stream: &mut butck_fixed_size::FixedSizeStream,
+        params: &HashMap<&str, &str>,
+    ) -> Option<u32> {
+        butck_fixed_size::chunk_fixed_size_stream(current_data, len, stream, params).await
+    }
+} \ No newline at end of file
diff --git a/policy/_policies/src/lib.rs.t b/policy/_policies/src/lib.rs.t
new file mode 100644
index 0000000..873a4cd
--- /dev/null
+++ b/policy/_policies/src/lib.rs.t
@@ -0,0 +1,117 @@
+// Auto generated dependencies
+// If you find issues with the dependencies, please
+// 1. Delete all code after this comment
+// 2. Clear the auto generated part in `policy/_policies/Cargo.toml`
+// 3. Run `cargo run --bin butckrepo-refresh` in the Butchunker root directory
+pub mod error;
+pub mod stream_read;
+
+use error::ChunkFailed;
+use std::{collections::HashMap, path::Path};
+
+use crate::stream_read::chunk_stream_process;
+
+/// Chunks the specified raw data using the specified chunking policy
+///
+/// # Parameters
+/// - `policy_name`: Chunking policy name (registered policies: <<<policy_count>>>)
+/// - `raw_data`: Raw data byte slice
+/// - `params`: Hashmap of parameters required by the chunking policy
+pub async fn chunk_with(
+ policy_name: &str,
+ raw_data: &[u8],
+ params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ChunkFailed> {
+ match policy_name {
+>>>>>>>>>> match_arms
+ _ => Err(ChunkFailed::PolicyNotFound),
+ }
+}
+
+pub async fn chunk_stream_with(
+ policy_name: &str,
+ size: u32,
+ path: &Path,
+ params: &HashMap<&str, &str>,
+) -> Result<Vec<u32>, ChunkFailed> {
+ match policy_name {
+>>>>>>>>>> match_arms_stream
+ _ => Err(ChunkFailed::PolicyNotFound),
+ }
+}
+
+pub fn policies() -> Vec<&'static str> {
+ vec![
+>>>>>>>>>> policy_names
+ ]
+}
+
+>>>>>>>>>> exports_simple
+>>>>>>>>>> exports_stream
+>>>>>>>>>> exports_both
+
+@@@ >>> match_arms
+ "<<<crate_name>>>" => Ok(<<<crate_name>>>::chunk(raw_data, params).await),
+@@@ <<<
+
+@@@ >>> match_arms_stream
+ "<<<crate_name>>>" => {
+ let mut stream = <<<stream_struct_id>>>::default();
+ chunk_stream_process(
+ path, &mut stream, size, params,
+ async |current_data, len, stream, params| {
+ <<<crate_name>>>::chunk_stream(current_data, len, stream, params).await
+ },
+ )
+ .await
+ }
+@@@ <<<
+
+@@@ >>> policy_names
+ // <<<name>>>
+ "<<<name>>>",
+@@@ <<<
+
+@@@ >>> exports_simple
+pub mod <<<crate_name>>> {
+ use std::collections::HashMap;
+ pub async fn chunk(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> {
+ <<<crate_name>>>::<<<matched_func>>>(raw_data, params)<<<has_await>>>
+ }
+}
+@@@ <<<
+
+@@@ >>> exports_stream
+pub mod <<<crate_name>>> {
+ pub use <<<stream_struct_id>>>;
+
+ pub async fn chunk_stream(
+ current_data: &[u8],
+ len: u32,
+ stream: &mut <<<stream_struct_id>>>,
+ params: &std::collections::HashMap<&str, &str>,
+ ) -> Option<u32> {
+ <<<crate_name>>>::<<<matched_func_stream>>>(current_data, len, stream, params)<<<has_await_stream>>>
+ }
+}
+@@@ <<<
+
+@@@ >>> exports_both
+pub mod <<<crate_name>>> {
+ pub use <<<stream_struct_id>>>;
+ use std::collections::HashMap;
+
+ pub async fn chunk(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> {
+ <<<crate_name>>>::<<<matched_func>>>(raw_data, params)<<<has_await>>>
+ }
+
+ pub async fn chunk_stream(
+ current_data: &[u8],
+ len: u32,
+ stream: &mut <<<stream_struct_id>>>,
+ params: &HashMap<&str, &str>,
+ ) -> Option<u32> {
+ <<<crate_name>>>::<<<matched_func_stream>>>(current_data, len, stream, params)<<<has_await_stream>>>
+ }
+}
+@@@ <<<
diff --git a/policy/_policies/src/stream_read.rs b/policy/_policies/src/stream_read.rs
new file mode 100644
index 0000000..5cf7791
--- /dev/null
+++ b/policy/_policies/src/stream_read.rs
@@ -0,0 +1,46 @@
+use crate::error::ChunkFailed;
+use std::{collections::HashMap, path::Path};
+
+pub async fn chunk_stream_process<T, F>(
+ path: &Path,
+ stream_data: &mut T,
+ size: u32,
+ params: &HashMap<&str, &str>,
+ chunk_func: F,
+) -> Result<Vec<u32>, ChunkFailed>
+where
+ T: Default,
+ F: AsyncFn(&[u8], u32, &mut T, &HashMap<&str, &str>) -> Option<u32>,
+{
+ let mut file = tokio::fs::File::open(path)
+ .await
+ .map_err(|_| ChunkFailed::FileOpenFailed(path.to_path_buf()))?;
+ let mut buffer = vec![0u8; size as usize];
+ let mut splits = Vec::new();
+ let mut total_read = 0;
+
+ loop {
+ let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
+ .await
+ .map_err(|_| ChunkFailed::FileReadFailed(path.to_path_buf()))?;
+
+ if bytes_read == 0 {
+ break Ok(splits);
+ }
+
+ // Process chunking on the buffer slice
+ let chunk_result = chunk_func(
+ &buffer[..bytes_read],
+ bytes_read as u32,
+ stream_data,
+ params,
+ )
+ .await;
+
+ if let Some(offset) = chunk_result {
+ splits.push(total_read + offset);
+ }
+
+ total_read += bytes_read as u32;
+ }
+}
diff --git a/policy/butck/butck_fixed_size/Cargo.lock b/policy/butck/butck_fixed_size/Cargo.lock
new file mode 100644
index 0000000..c1e1873
--- /dev/null
+++ b/policy/butck/butck_fixed_size/Cargo.lock
@@ -0,0 +1,7 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 4
+
+[[package]]
+name = "butck_fixed_size"
+version = "0.1.0"
diff --git a/policy/butck/butck_fixed_size/Cargo.toml b/policy/butck/butck_fixed_size/Cargo.toml
new file mode 100644
index 0000000..1550cb9
--- /dev/null
+++ b/policy/butck/butck_fixed_size/Cargo.toml
@@ -0,0 +1,7 @@
+[package]
+name = "butck_fixed_size"
+authors = ["Butchunker"]
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
diff --git a/policy/butck/butck_fixed_size/src/lib.rs b/policy/butck/butck_fixed_size/src/lib.rs
new file mode 100644
index 0000000..28cabff
--- /dev/null
+++ b/policy/butck/butck_fixed_size/src/lib.rs
@@ -0,0 +1,48 @@
+use std::collections::HashMap;
+
/// Default chunk size used when no valid `size` parameter is supplied: 1 MiB.
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;

/// Reads the chunk size from `params["size"]`.
///
/// Falls back to `DEFAULT_CHUNK_SIZE` when the key is absent, not a valid
/// integer, or zero — a zero chunk size would make `step_by(0)` panic in
/// `chunk_fixed_size` and divide by zero in `chunk_fixed_size_stream`.
fn get_chunk_size(params: &HashMap<&str, &str>) -> usize {
    params
        .get("size")
        .and_then(|s| s.parse::<usize>().ok())
        .filter(|&size| size > 0)
        .unwrap_or(DEFAULT_CHUNK_SIZE)
}
+
+pub async fn chunk_fixed_size(raw_data: &[u8], params: &HashMap<&str, &str>) -> Vec<u32> {
+ let chunk_size = get_chunk_size(params);
+ (chunk_size..raw_data.len())
+ .step_by(chunk_size)
+ .map(|pos| pos as u32)
+ .collect()
+}
+
+/// Streaming state for the fixed-size policy.
+#[derive(Default)]
+pub struct FixedSizeStream {
+    // Bytes accounted for so far. When a boundary is reported, this advances
+    // only up to that boundary — not past the whole slice; see
+    // `chunk_fixed_size_stream`.
+    processed_bytes: usize,
+}
+
+/// Streaming fixed-size chunking step for one slice of data.
+///
+/// # Parameters
+/// - `_current_data`: Current data slice (contents unused; only length matters)
+/// - `len`: Number of valid bytes in the slice
+/// - `stream`: Byte accounting carried across calls
+/// - `params`: Policy parameters (`size` = chunk size in bytes)
+///
+/// Returns the offset, relative to the start of the slice, of the first
+/// chunk boundary inside it, or `None` when no boundary falls within it.
+pub async fn chunk_fixed_size_stream(
+    _current_data: &[u8],
+    len: u32,
+    stream: &mut FixedSizeStream,
+    params: &HashMap<&str, &str>,
+) -> Option<u32> {
+    let chunk_size = get_chunk_size(params);
+    let valid_len = len as usize;
+
+    // Chunk indices before and after consuming this slice; if they differ,
+    // at least one boundary lies inside the slice.
+    let prev_chunk = stream.processed_bytes / chunk_size;
+    let new_processed = stream.processed_bytes + valid_len;
+    let new_chunk = new_processed / chunk_size;
+
+    if prev_chunk != new_chunk {
+        // Find chunk boundary in current data, update processed bytes, return position
+        // Only the bytes up to the boundary are accounted for here: the
+        // accounting stays consistent only if the caller re-invokes this
+        // policy on the data remaining after the split.
+        // NOTE(review): verify that contract against `chunk_stream_process`.
+        let boundary_in_chunk = chunk_size - (stream.processed_bytes % chunk_size);
+        stream.processed_bytes += boundary_in_chunk;
+        Some(boundary_in_chunk.min(valid_len) as u32)
+    } else {
+        // Update bytes processed
+        stream.processed_bytes = new_processed;
+        None
+    }
+}