1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//! A link in a processing chain that connects one `Node` to another.
//!
//! `Link` serves as a container for chaining two `Node` instances together,
//! where the output of the first node is fed as the input to the next.

use async_trait::async_trait;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::error::AnchorChainError;
use crate::node::{Node, Stateful};
use crate::state_manager::StateManager;

/// A link in a processing chain that connects one `Node` to another.
///
/// `Link` serves as a container for chaining two `Node` instances together,
/// where the output of the first node is fed as the input to the next.
#[derive(Debug)]
pub struct Link<C, N>
where
    C: Debug,
    N: Debug,
{
    /// The first node in the chain.
    pub node: C,
    /// The next node or link in the chain.
    pub next: N,
}

impl<C, N> Link<C, N>
where
    C: Debug,
    N: Debug,
{
    /// Creates a new `Link` connecting the specified nodes.
    ///
    /// The `node` is linked with the `next` node in the chain. Output from the
    /// `node` is passed as input to the `next` node. Either node can also be
    /// a `Link` forming a nested linked list of nodes.
    pub fn new(node: C, next: N) -> Self {
        Link { node, next }
    }
}

#[async_trait]
impl<C, N> Node for Link<C, N>
where
    C: Node + Send + Sync + Debug,
    C::Output: Send + 'static,
    C::Input: Send,
    N: Node<Input = C::Output> + Send + Sync + Debug,
    N::Output: Send,
{
    /// The input type for the current node
    type Input = C::Input;
    /// The output type of the next node
    type Output = <N as Node>::Output;

    /// Processes the given input through the chain of nodes.
    ///
    /// First, the input is processed by the current node. Then, the output of the current
    /// node is passed to the next node or link in the chain for further processing.
    async fn process(&self, input: Self::Input) -> Result<Self::Output, AnchorChainError> {
        let output = self.node.process(input).await?;
        self.next.process(output).await
    }
}

/// A stateful link in a processing chain that connects one `Node` to another.
///
/// `StatefulLink` serves as a container for chaining two `Node` instances together,
/// where the output of the first node is fed as the input to the next. The chain's
/// `StateManager` is passed to each stateful node before processing.
#[derive(Debug)]
pub struct StatefulLink<C, N, K, V>
where
    C: Debug,
    N: Debug,
    K: Debug + Send,
    V: Debug + Send,
{
    pub node: C,
    pub next: Arc<Mutex<N>>,
    pub state: StateManager<K, V>,
}

impl<C, N, K, V> StatefulLink<C, N, K, V>
where
    C: Debug,
    N: Debug,
    K: Debug + Send,
    V: Debug + Send,
{
    pub fn new(node: C, next: N, memory: StateManager<K, V>) -> Self {
        Self {
            node,
            next: Arc::new(Mutex::new(next)),
            state: memory,
        }
    }
}

#[async_trait]
impl<C, N, K, V> Node for StatefulLink<C, N, K, V>
where
    C: Node + Send + Sync + Debug,
    C::Output: Send + 'static,
    C::Input: Send,
    N: Node<Input = C::Output> + Stateful<K, V> + Send + Sync + Debug,
    N::Output: Send,
    K: Eq + Hash + Clone + Send + Sync + Debug,
    V: Clone + Send + Sync + Debug,
{
    type Input = C::Input;
    type Output = <N as Node>::Output;

    async fn process(&self, input: Self::Input) -> Result<Self::Output, AnchorChainError> {
        let output = self.node.process(input).await?;
        let mut next_node = self.next.lock().await;
        next_node.set_state(self.state.clone()).await;
        next_node.process(output).await
    }
}