summaryrefslogtreecommitdiff
path: root/crates/utils/tcp_connection/src/target_connection.rs
blob: b03093cd5ee2bca900cd38649305d430759af718 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use tokio::{
    net::{TcpListener, TcpSocket},
    spawn,
};

use crate::{
    error::TcpTargetError,
    handle::{ClientHandle, ServerHandle},
    instance::ConnectionInstance,
    target::TcpServerTarget,
    target_configure::ServerTargetConfig,
};

impl<Client, Server> TcpServerTarget<Client, Server>
where
    Client: ClientHandle<Server>,
    Server: ServerHandle<Client>,
{
    /// Connects to the target address and runs the client handler on the connection.
    ///
    /// A TCP socket matching the address family of the address returned by
    /// `get_addr` is created and connected. The resulting stream is wrapped in a
    /// [`ConnectionInstance`] and handed to `Client::process`, which is awaited
    /// here, so the returned future does not resolve until the client handler
    /// has finished.
    ///
    /// # Errors
    ///
    /// Returns a [`TcpTargetError`] if the socket cannot be created or if the
    /// connection attempt fails.
    pub async fn connect(&self) -> Result<(), TcpTargetError> {
        let addr = self.get_addr();

        // The socket family has to match the address family: an IPv4 socket
        // cannot be used to connect to an IPv6 address.
        let created = if addr.is_ipv6() {
            TcpSocket::new_v6()
        } else {
            TcpSocket::new_v4()
        };
        let socket = created
            .map_err(|e| TcpTargetError::from(format!("Create tcp socket failed: {}", e)))?;

        let stream = socket
            .connect(addr.clone())
            .await
            .map_err(|e| TcpTargetError::from(format!("Connect to `{}` failed: {}", addr, e)))?;

        let instance = ConnectionInstance::from(stream);
        Client::process(instance).await;
        Ok(())
    }

    /// Binds to the target address and serves incoming connections.
    ///
    /// The server configuration returned by `get_server_cfg` (or the default
    /// configuration when none is set) selects the mode:
    ///
    /// - When `is_once()` is true, a single connection is accepted,
    ///   `Server::process` is awaited on it, and the function returns `Ok(())`.
    /// - Otherwise connections are accepted in a loop and each one is handed to
    ///   `Server::process` on its own spawned task. In this mode the function
    ///   only returns when accepting a connection fails.
    ///
    /// # Errors
    ///
    /// Returns a [`TcpTargetError`] if binding the listener fails or if
    /// accepting a connection fails.
    pub async fn listen(&self) -> Result<(), TcpTargetError> {
        let addr = self.get_addr();
        let listener = TcpListener::bind(addr.clone())
            .await
            .map_err(|e| TcpTargetError::from(format!("Bind to `{}` failed: {}", addr, e)))?;

        let cfg: ServerTargetConfig = match self.get_server_cfg() {
            Some(cfg) => cfg.clone(),
            None => ServerTargetConfig::default(),
        };
        let once = cfg.is_once();

        loop {
            let (stream, _) = listener
                .accept()
                .await
                .map_err(|e| TcpTargetError::from(format!("Accept connection failed: {}", e)))?;
            let instance = ConnectionInstance::from(stream);

            if once {
                // Single-shot mode: handle this connection inline, then stop listening.
                Server::process(instance).await;
                return Ok(());
            }

            // Multi-connection mode: handle each connection on its own task so
            // the accept loop is never blocked by a running handler.
            spawn(async move {
                Server::process(instance).await;
            });
        }
    }
}