use std::{collections::HashMap, net::SocketAddr, time::Duration};
use massa_channel::{sender::MassaSender, MassaChannel};
use massa_models::{
block_header::SecuredHeader,
block_id::BlockId,
prehash::{PreHashMap, PreHashSet},
stats::NetworkStats,
};
use massa_protocol_exports::{BootstrapPeers, PeerId, ProtocolController, ProtocolError};
use massa_storage::Storage;
use peernet::peer::PeerConnectionType;
use crate::{
connectivity::ConnectivityCommand,
handlers::{
block_handler::{
commands_propagation::BlockHandlerPropagationCommand,
commands_retrieval::BlockHandlerRetrievalCommand,
},
endorsement_handler::commands_propagation::EndorsementHandlerPropagationCommand,
operation_handler::commands_propagation::OperationHandlerPropagationCommand,
peer_handler::models::PeerManagementCmd,
},
};
#[derive(Clone)]
pub struct ProtocolControllerImpl {
pub sender_block_retrieval_handler: Option<MassaSender<BlockHandlerRetrievalCommand>>,
pub sender_block_handler: Option<MassaSender<BlockHandlerPropagationCommand>>,
pub sender_operation_handler: Option<MassaSender<OperationHandlerPropagationCommand>>,
pub sender_endorsement_handler: Option<MassaSender<EndorsementHandlerPropagationCommand>>,
pub sender_connectivity_thread: Option<MassaSender<ConnectivityCommand>>,
pub sender_peer_management_thread: Option<MassaSender<PeerManagementCmd>>,
}
impl ProtocolControllerImpl {
pub fn new(
sender_block_retrieval_handler: MassaSender<BlockHandlerRetrievalCommand>,
sender_block_handler: MassaSender<BlockHandlerPropagationCommand>,
sender_operation_handler: MassaSender<OperationHandlerPropagationCommand>,
sender_endorsement_handler: MassaSender<EndorsementHandlerPropagationCommand>,
sender_connectivity_thread: MassaSender<ConnectivityCommand>,
sender_peer_management_thread: MassaSender<PeerManagementCmd>,
) -> Self {
ProtocolControllerImpl {
sender_block_retrieval_handler: Some(sender_block_retrieval_handler),
sender_block_handler: Some(sender_block_handler),
sender_operation_handler: Some(sender_operation_handler),
sender_endorsement_handler: Some(sender_endorsement_handler),
sender_connectivity_thread: Some(sender_connectivity_thread),
sender_peer_management_thread: Some(sender_peer_management_thread),
}
}
}
impl ProtocolController for ProtocolControllerImpl {
fn stop(&mut self) {
drop(self.sender_block_handler.take());
drop(self.sender_operation_handler.take());
drop(self.sender_endorsement_handler.take());
drop(self.sender_block_retrieval_handler.take());
}
fn integrated_block(&self, block_id: BlockId, storage: Storage) -> Result<(), ProtocolError> {
self.sender_block_handler
.as_ref()
.unwrap()
.try_send(BlockHandlerPropagationCommand::IntegratedBlock { block_id, storage })
.map_err(|_| ProtocolError::ChannelError("integrated_block command send error".into()))
}
fn notify_block_attack(&self, block_id: BlockId) -> Result<(), ProtocolError> {
self.sender_block_handler
.as_ref()
.unwrap()
.try_send(BlockHandlerPropagationCommand::AttackBlockDetected(
block_id,
))
.map_err(|_| {
ProtocolError::ChannelError("notify_block_attack command send error".into())
})
}
fn send_wishlist_delta(
&self,
new: PreHashMap<BlockId, Option<SecuredHeader>>,
remove: PreHashSet<BlockId>,
) -> Result<(), ProtocolError> {
self.sender_block_retrieval_handler
.as_ref()
.unwrap()
.send(BlockHandlerRetrievalCommand::WishlistDelta { new, remove })
.map_err(|_| {
ProtocolError::ChannelError("send_wishlist_delta command send error".into())
})
}
fn propagate_operations(&self, operations: Storage) -> Result<(), ProtocolError> {
self.sender_operation_handler
.as_ref()
.unwrap()
.try_send(OperationHandlerPropagationCommand::PropagateOperations(
operations,
))
.map_err(|_| {
ProtocolError::ChannelError("propagate_operations command send error".into())
})
}
fn propagate_endorsements(&self, endorsements: Storage) -> Result<(), ProtocolError> {
self.sender_endorsement_handler
.as_ref()
.unwrap()
.try_send(EndorsementHandlerPropagationCommand::PropagateEndorsements(
endorsements,
))
.map_err(|_| {
ProtocolError::ChannelError("propagate_endorsements command send error".into())
})
}
fn get_stats(
&self,
) -> Result<
(
NetworkStats,
HashMap<PeerId, (SocketAddr, PeerConnectionType)>,
),
ProtocolError,
> {
let (sender, receiver) = MassaChannel::new("get_stats".to_string(), Some(1));
self.sender_connectivity_thread
.as_ref()
.unwrap()
.try_send(ConnectivityCommand::GetStats { responder: sender })
.map_err(|_| ProtocolError::ChannelError("get_stats command send error".into()))?;
receiver
.recv_timeout(Duration::from_secs(10))
.map_err(|_| ProtocolError::ChannelError("get_stats command receive error".into()))
}
fn ban_peers(&self, peer_ids: Vec<PeerId>) -> Result<(), ProtocolError> {
self.sender_peer_management_thread
.as_ref()
.unwrap()
.try_send(PeerManagementCmd::Ban(peer_ids))
.map_err(|_| ProtocolError::ChannelError("ban_peers command send error".into()))
}
fn unban_peers(&self, peer_ids: Vec<PeerId>) -> Result<(), ProtocolError> {
self.sender_peer_management_thread
.as_ref()
.unwrap()
.try_send(PeerManagementCmd::Unban(peer_ids))
.map_err(|_| ProtocolError::ChannelError("unban_peers command send error".into()))
}
fn get_bootstrap_peers(&self) -> Result<BootstrapPeers, ProtocolError> {
let (sender, receiver) = MassaChannel::new("get_bootstrap_peers".to_string(), Some(1));
self.sender_peer_management_thread
.as_ref()
.unwrap()
.try_send(PeerManagementCmd::GetBootstrapPeers { responder: sender })
.map_err(|_| {
ProtocolError::ChannelError("get_bootstrap_peers command send error".into())
})?;
receiver.recv_timeout(Duration::from_secs(10)).map_err(|_| {
ProtocolError::ChannelError("get_bootstrap_peers command receive error".into())
})
}
fn clone_box(&self) -> Box<dyn ProtocolController> {
Box::new(self.clone())
}
}