//! Endorsement handler: starts the endorsement retrieval and propagation threads
//! and keeps the command senders and join handles needed to stop them.

use std::thread::JoinHandle;

use massa_channel::{receiver::MassaReceiver, sender::MassaSender};
use massa_metrics::MassaMetrics;
use massa_pool_exports::PoolController;
use massa_pos_exports::SelectorController;
use massa_protocol_exports::ProtocolConfig;
use massa_storage::Storage;

use crate::wrap_network::ActiveConnectionsTrait;

use self::{
    cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerPropagationCommand,
    commands_retrieval::EndorsementHandlerRetrievalCommand, propagation::start_propagation_thread,
    retrieval::start_retrieval_thread,
};

// Provides `SharedEndorsementCache`, the cache handle passed to both worker threads.
pub mod cache;
// Provides `EndorsementHandlerPropagationCommand` (commands for the propagation thread).
pub mod commands_propagation;
// Provides `EndorsementHandlerRetrievalCommand` (commands for the retrieval thread).
pub mod commands_retrieval;
// Provides `EndorsementMessage` and `EndorsementMessageSerializer`, re-exported crate-wide below.
mod messages;
// Provides `start_propagation_thread`.
mod propagation;
// Provides `start_retrieval_thread` and `note_endorsements_from_peer`.
mod retrieval;

pub(crate) use messages::{EndorsementMessage, EndorsementMessageSerializer};
pub(crate) use retrieval::note_endorsements_from_peer;

use super::peer_handler::models::{PeerManagementCmd, PeerMessageTuple};

/// Owner of the two endorsement worker threads.
///
/// Each field pairs the command sender of a thread with the thread's
/// [`JoinHandle`]. Both fields are `Some` right after [`EndorsementHandler::new`]
/// and are taken (left as `None`) by [`EndorsementHandler::stop`].
pub struct EndorsementHandler {
    /// Command sender and join handle of the endorsement retrieval thread.
    pub endorsement_retrieval_thread: Option<(
        MassaSender<EndorsementHandlerRetrievalCommand>,
        JoinHandle<()>,
    )>,
    /// Command sender and join handle of the endorsement propagation thread.
    pub endorsement_propagation_thread: Option<(
        MassaSender<EndorsementHandlerPropagationCommand>,
        JoinHandle<()>,
    )>,
}

impl EndorsementHandler {
    /// Starts the endorsement retrieval and propagation threads and returns the
    /// handler that owns them.
    ///
    /// * `receiver`, `receiver_retrieval_ext`, `sender_peer_cmd`, `selector_controller`,
    ///   `pool_controller`, `massa_metrics` are handed to the retrieval thread.
    /// * `local_receiver` and `active_connections` are handed to the propagation thread.
    /// * `cache` and `config` are shared: the retrieval thread gets a clone, the
    ///   propagation thread gets the original value.
    /// * `storage` is not passed on directly; the retrieval thread receives
    ///   `storage.clone_without_refs()`.
    /// * `sender_retrieval_ext` and `local_sender` are kept in the returned handler
    ///   so that [`EndorsementHandler::stop`] can send `Stop` commands; the retrieval
    ///   thread additionally receives a clone of `local_sender`.
    // The constructor wires every dependency of both worker threads, hence the
    // long argument list.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        pool_controller: Box<dyn PoolController>,
        selector_controller: Box<dyn SelectorController>,
        cache: SharedEndorsementCache,
        storage: Storage,
        config: ProtocolConfig,
        active_connections: Box<dyn ActiveConnectionsTrait>,
        receiver: MassaReceiver<PeerMessageTuple>,
        sender_retrieval_ext: MassaSender<EndorsementHandlerRetrievalCommand>,
        receiver_retrieval_ext: MassaReceiver<EndorsementHandlerRetrievalCommand>,
        local_sender: MassaSender<EndorsementHandlerPropagationCommand>,
        local_receiver: MassaReceiver<EndorsementHandlerPropagationCommand>,
        sender_peer_cmd: MassaSender<PeerManagementCmd>,
        massa_metrics: MassaMetrics,
    ) -> Self {
        // Retrieval is started first; it takes clones of everything the
        // propagation thread (or the handler itself) still needs afterwards.
        let retrieval_handle = start_retrieval_thread(
            receiver,
            receiver_retrieval_ext,
            local_sender.clone(),
            sender_peer_cmd,
            cache.clone(),
            selector_controller,
            pool_controller,
            config.clone(),
            storage.clone_without_refs(),
            massa_metrics,
        );

        let propagation_handle =
            start_propagation_thread(local_receiver, cache, config, active_connections);

        Self {
            endorsement_retrieval_thread: Some((sender_retrieval_ext, retrieval_handle)),
            endorsement_propagation_thread: Some((local_sender, propagation_handle)),
        }
    }

    /// Stops both worker threads and waits for them to finish.
    ///
    /// The retrieval thread is signalled and joined first, then the propagation
    /// thread. Calling `stop` again is a no-op because both fields are taken.
    ///
    /// # Panics
    ///
    /// If a worker thread panicked, that panic is re-raised here, but only after
    /// *both* threads have been signalled and joined, so a crash in one worker
    /// no longer leaves the other one running. If both panicked, the retrieval
    /// thread's panic is the one re-raised.
    pub fn stop(&mut self) {
        let retrieval_outcome = self
            .endorsement_retrieval_thread
            .take()
            .map(|(tx, thread)| {
                // Best effort: a send error means the thread already dropped its
                // receiver, in which case the join below returns promptly.
                let _ = tx.send(EndorsementHandlerRetrievalCommand::Stop);
                thread.join()
            });

        let propagation_outcome = self
            .endorsement_propagation_thread
            .take()
            .map(|(tx, thread)| {
                let _ = tx.send(EndorsementHandlerPropagationCommand::Stop);
                thread.join()
            });

        // Surface worker panics only once shutdown of both threads is complete.
        if let Some(Err(payload)) = retrieval_outcome {
            std::panic::resume_unwind(payload);
        }
        if let Some(Err(payload)) = propagation_outcome {
            std::panic::resume_unwind(payload);
        }
    }
}