1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
use super::{
cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerPropagationCommand,
messages::EndorsementMessageSerializer, EndorsementMessage,
};
use crate::{messages::MessagesSerializer, wrap_network::ActiveConnectionsTrait};
use massa_channel::receiver::MassaReceiver;
use massa_protocol_exports::ProtocolConfig;
use massa_storage::Storage;
use std::thread::JoinHandle;
use tracing::{info, log::warn};
// protocol-endorsement-handler-propagation
const THREAD_NAME: &str = "peh-propagation";
static_assertions::const_assert!(THREAD_NAME.len() < 16);
/// Endorsements need to propagate fast, so no buffering
struct PropagationThread {
receiver: MassaReceiver<EndorsementHandlerPropagationCommand>,
config: ProtocolConfig,
cache: SharedEndorsementCache,
active_connections: Box<dyn ActiveConnectionsTrait>,
endorsement_serializer: MessagesSerializer,
}
impl PropagationThread {
fn run(&mut self) {
let mut next_message = None;
loop {
// get the next message to process
let msg = match next_message.take() {
Some(msg) => msg,
None => match self.receiver.recv() {
Ok(msg) => msg,
Err(_) => {
info!("Stop endorsement propagation thread");
return;
}
},
};
match msg {
// endorsements to propagate
EndorsementHandlerPropagationCommand::PropagateEndorsements(mut endorsements) => {
// also drain any remaining propagation messages that might have accumulated
while let Ok(msg) = self.receiver.try_recv() {
match msg {
// we got more endorsements to propagate: extend the buffer
EndorsementHandlerPropagationCommand::PropagateEndorsements(
new_endorsements,
) => {
endorsements.extend(new_endorsements);
}
// we grabbed a message that is not a propagation message, mark it for processing
other_msg => {
next_message = Some(other_msg);
break;
}
}
}
// propagate the endorsements
self.propagate_endorsements(endorsements);
}
// stop the handler
EndorsementHandlerPropagationCommand::Stop => {
info!("Stop endorsement propagation thread");
return;
}
}
}
}
/// Perform propagation of endorsements to the connected peers
fn propagate_endorsements(&mut self, endorsements: Storage) {
// get all the endorsements to send
let endorsements: Vec<_> = {
let storage_lock = endorsements.read_endorsements();
endorsements
.get_endorsement_refs()
.iter()
.filter_map(|id| storage_lock.get(id).cloned())
.collect()
};
// get connected peers
let peers_connected = self.active_connections.get_peer_ids_connected();
// get a write lock on the cache
let mut cache_write = self.cache.write();
// mark that we have checked those endorsements
for endorsement in &endorsements {
cache_write.checked_endorsements.insert(endorsement.id, ());
}
// Add peers that potentially don't exist in cache and remove the ones that disconnected
cache_write.update_cache(&peers_connected);
// Propagate to peers
'peer_loop: for peer_id in peers_connected {
// write access to the cache of which endorsements are known by the peer
let peer_knowledge = cache_write
.endorsements_known_by_peer
.get_mut(&peer_id)
.expect("update_cache should have added connected peer to cache");
// get endorsements that are not known by the peer
let to_send: Vec<_> = endorsements
.iter()
.filter(|endorsement| peer_knowledge.peek(&endorsement.id).is_none())
.collect();
if to_send.is_empty() {
// nothing to send to that peer, try the next one
continue 'peer_loop;
}
// send by chunks
for chunk in to_send.chunks(self.config.max_endorsements_per_message as usize) {
if let Err(err) = self.active_connections.send_to_peer(
&peer_id,
&self.endorsement_serializer,
EndorsementMessage::Endorsements(chunk.iter().map(|&e| e.clone()).collect())
.into(),
false,
) {
warn!(
"could not send endorsements batch to node {}: {}",
peer_id, err
);
// try with next peer, this one is probably congested
continue 'peer_loop;
}
// sent successfully: mark peer as knowing the endorsements that were sent to it
for endorsement in chunk {
peer_knowledge.insert(endorsement.id, ());
}
}
}
}
}
pub fn start_propagation_thread(
receiver: MassaReceiver<EndorsementHandlerPropagationCommand>,
cache: SharedEndorsementCache,
config: ProtocolConfig,
active_connections: Box<dyn ActiveConnectionsTrait>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name(THREAD_NAME.to_string())
.spawn(move || {
let endorsement_serializer = MessagesSerializer::new()
.with_endorsement_message_serializer(EndorsementMessageSerializer::new());
let mut propagation_thread = PropagationThread {
receiver,
config,
active_connections,
cache,
endorsement_serializer,
};
propagation_thread.run();
})
.expect("OS failed to start endorsement propagation thread")
}