1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::thread::JoinHandle;

use massa_channel::{receiver::MassaReceiver, sender::MassaSender};
use massa_consensus_exports::ConsensusController;
use massa_metrics::MassaMetrics;
use massa_pool_exports::PoolController;
use massa_pos_exports::SelectorController;
use massa_protocol_exports::ProtocolConfig;
use massa_storage::Storage;
use massa_versioning::versioning::MipStore;

use crate::wrap_network::ActiveConnectionsTrait;

use self::{
    cache::SharedBlockCache, commands_propagation::BlockHandlerPropagationCommand,
    commands_retrieval::BlockHandlerRetrievalCommand, propagation::start_propagation_thread,
    retrieval::start_retrieval_thread,
};

pub mod cache;
pub mod commands_propagation;
pub mod commands_retrieval;
pub mod messages;
mod propagation;
mod retrieval;

pub(crate) use messages::{BlockMessage, BlockMessageSerializer};

#[cfg(test)]
pub use messages::{AskForBlockInfo, BlockInfoReply};

use super::{
    endorsement_handler::{
        cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerPropagationCommand,
    },
    operation_handler::{
        cache::SharedOperationCache, commands_propagation::OperationHandlerPropagationCommand,
    },
    peer_handler::models::{PeerManagementCmd, PeerMessageTuple},
};

pub struct BlockHandler {
    pub block_retrieval_thread: Option<(MassaSender<BlockHandlerRetrievalCommand>, JoinHandle<()>)>,
    pub block_propagation_thread:
        Option<(MassaSender<BlockHandlerPropagationCommand>, JoinHandle<()>)>,
}

impl BlockHandler {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        active_connections: Box<dyn ActiveConnectionsTrait>,
        selector_controller: Box<dyn SelectorController>,
        consensus_controller: Box<dyn ConsensusController>,
        pool_controller: Box<dyn PoolController>,
        receiver_network: MassaReceiver<PeerMessageTuple>,
        sender_ext: MassaSender<BlockHandlerRetrievalCommand>,
        receiver_ext: MassaReceiver<BlockHandlerRetrievalCommand>,
        internal_receiver: MassaReceiver<BlockHandlerPropagationCommand>,
        internal_sender: MassaSender<BlockHandlerPropagationCommand>,
        sender_propagations_ops: MassaSender<OperationHandlerPropagationCommand>,
        sender_propagations_endorsements: MassaSender<EndorsementHandlerPropagationCommand>,
        peer_cmd_sender: MassaSender<PeerManagementCmd>,
        config: ProtocolConfig,
        endorsement_cache: SharedEndorsementCache,
        operation_cache: SharedOperationCache,
        cache: SharedBlockCache,
        storage: Storage,
        mip_store: MipStore,
        massa_metrics: MassaMetrics,
    ) -> Self {
        let block_retrieval_thread = start_retrieval_thread(
            active_connections.clone(),
            selector_controller,
            consensus_controller,
            pool_controller,
            receiver_network,
            receiver_ext,
            internal_sender.clone(),
            sender_propagations_ops,
            sender_propagations_endorsements,
            peer_cmd_sender.clone(),
            config.clone(),
            endorsement_cache,
            operation_cache,
            cache.clone(),
            storage.clone_without_refs(),
            mip_store,
            massa_metrics,
        );
        let block_propagation_thread = start_propagation_thread(
            active_connections,
            internal_receiver,
            peer_cmd_sender,
            config,
            cache,
        );
        Self {
            block_retrieval_thread: Some((sender_ext, block_retrieval_thread)),
            block_propagation_thread: Some((internal_sender, block_propagation_thread)),
        }
    }

    pub fn stop(&mut self) {
        if let Some((tx, thread)) = self.block_retrieval_thread.take() {
            let _ = tx.send(BlockHandlerRetrievalCommand::Stop);
            thread.join().unwrap();
        }
        if let Some((tx, thread)) = self.block_propagation_thread.take() {
            let _ = tx.send(BlockHandlerPropagationCommand::Stop);
            thread.join().unwrap();
        }
    }
}