summaryrefslogtreecommitdiff
path: root/src/chunker/rw/storage/write.rs
blob: 9348901235767ed9ae4264a26919238ffb3b1749 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
};

use log::trace;

use crate::{
    chunker::{
        constants::BUTCK_INDEX_FILE_SUFFIX,
        context::ButckContext,
        rw::{
            error::{ButckRWError, ButckRWErrorKind},
            storage::generate_unique_path,
        },
    },
    storage::{simple::write_file_simple, stream::write_file_stream},
};

pub mod simple;
pub mod stream;

/// Writes every file listed in the context into butck storage.
///
/// Validates the context up front (storage path, chunking policy, output
/// arity, reading method), then runs one write task per input file
/// concurrently and returns the first error, if any, packed with the context.
pub async fn write(ctx: ButckContext) -> Result<(), ButckRWError> {
    // The context must name a storage location and a chunking policy.
    if ctx.storage_path.is_none() {
        return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx));
    }
    if ctx.policy_name.is_none() {
        return Err(ButckRWErrorKind::ChunkingPolicyNotSpecified.pack(ctx));
    }
    // A single explicit output file cannot receive more than one input file.
    if ctx.output_file.is_some() && ctx.file_paths.len() > 1 {
        return Err(ButckRWErrorKind::OutputCountMismatch.pack(ctx));
    }

    // Cannot enable both memory-mapped and stream reading simultaneously.
    // Stream reading uses butck_policies::chunk_stream_with,
    // while memory-mapped or default reading uses butck_policies::chunk_with.
    if ctx.memmap_read && ctx.stream_read.is_some() {
        return Err(ButckRWErrorKind::ReadingMethodAmbiguous.pack(ctx));
    }

    // Borrowed view of the policy parameters, shared by every write task.
    let borrowed_params: HashMap<&str, &str> = ctx
        .params
        .iter()
        .map(|(key, value)| (key.as_str(), value.as_str()))
        .collect();

    // Launch one future per input file and wait for all of them to finish.
    let outcomes = futures::future::join_all(ctx.file_paths.iter().map(|path| async {
        trace!("Preparing to write file `{}`", path.display());
        write_file(path, &ctx, &borrowed_params).await
    }))
    .await;

    // Surface the first failure, attaching the context to the error.
    for outcome in outcomes {
        if let Err(kind) = outcome {
            return Err(kind.pack(ctx));
        }
    }

    Ok(())
}

/// Dispatches a single file write to the streaming or simple backend,
/// depending on whether stream reading was requested in the context.
async fn write_file(
    path: &PathBuf,
    ctx: &ButckContext,
    params: &HashMap<&str, &str>,
) -> Result<(), ButckRWErrorKind> {
    match ctx.stream_read {
        Some(stream_read_size) => write_file_stream(path, stream_read_size, ctx, params).await,
        None => write_file_simple(path, ctx, params).await,
    }
}

/// Computes the path of the `.bidx` index file for `path`.
///
/// If the context specifies an explicit output file, that path is returned
/// verbatim. Otherwise the index name is derived from the input's file name
/// inside `ctx.output_dir`, the `.bidx` suffix is appended, and the result is
/// made unique via `generate_unique_path`.
pub fn get_index_file_name(path: &Path, ctx: &ButckContext) -> PathBuf {
    // An explicit output file wins and is used exactly as given.
    // NOTE(review): this bypasses the suffix-appending and uniqueness logic
    // below — confirm that asymmetry is intended.
    if let Some(output_file) = &ctx.output_file {
        return output_file.clone();
    }

    let output_file = ctx.output_dir.join(path.file_name().unwrap_or_default());

    // Append the .bidx suffix directly to the original file name. A trailing
    // dot (empty extension, e.g. `foo.`) is collapsed so the result is
    // `foo.bidx` rather than `foo..bidx`.
    let desired_filename = match output_file.extension() {
        Some(ext) if !ext.is_empty() => format!(
            "{}.{}.{}",
            output_file
                .file_stem()
                .unwrap_or_default()
                .to_string_lossy(),
            ext.to_string_lossy(),
            BUTCK_INDEX_FILE_SUFFIX
        ),
        Some(_) => format!(
            "{}.{}",
            output_file
                .file_stem()
                .unwrap_or_default()
                .to_string_lossy(),
            BUTCK_INDEX_FILE_SUFFIX
        ),
        None => format!(
            "{}.{}",
            output_file
                .file_name()
                .unwrap_or_default()
                .to_string_lossy(),
            BUTCK_INDEX_FILE_SUFFIX
        ),
    };

    generate_unique_path(&ctx.output_dir, &desired_filename)
}