//! Massa Channel is a crossbeam channel wrapper with Prometheus metrics.
//! It exposes the following metrics for each channel:
//! - the current length of the channel (incremented on send, decremented on receive)
//! - the total number of received messages (incremented on receive)
//!
//! # Example
//! ```
//! use massa_channel::MassaChannel;
//! let (sender, receiver) = MassaChannel::new::<String>("test".to_string(), None);
//! ```
//!
//! # Warning
//! Take care when using `MassaReceiver` with the `select!` macro:
//! `select!` does not call `recv()`, so the metrics will not be updated.
//! Call `your_receiver.update_metrics()` manually in that case.
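//!
//! The sketch below illustrates why the manual update is needed. It uses only the
//! standard library, and `CountingReceiver`, its fields, and `demo` are hypothetical
//! names chosen for illustration: they are not this crate's types, and the real
//! `update_metrics()` may compute its values differently.
//!
//! ```rust
//! use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
//! use std::sync::mpsc::{channel, Receiver, RecvError};
//!
//! // Hypothetical stand-in for a metered receiver (not this crate's type).
//! struct CountingReceiver<T> {
//!     inner: Receiver<T>,
//!     len: AtomicI64,      // plays the role of the length gauge
//!     received: AtomicU64, // plays the role of the received counter
//! }
//!
//! impl<T> CountingReceiver<T> {
//!     // Receiving through the wrapper keeps the metrics in sync.
//!     fn recv(&self) -> Result<T, RecvError> {
//!         let msg = self.inner.recv()?;
//!         self.update_metrics();
//!         Ok(msg)
//!     }
//!
//!     // Must be called by hand whenever `inner` is read directly.
//!     fn update_metrics(&self) {
//!         self.len.fetch_sub(1, Ordering::Relaxed);
//!         self.received.fetch_add(1, Ordering::Relaxed);
//!     }
//! }
//!
//! // Returns the received counter after a wrapper receive, after a direct
//! // receive, and after the manual update that follows it.
//! fn demo() -> (u64, u64, u64) {
//!     let (tx, inner) = channel::<u32>();
//!     let rx = CountingReceiver {
//!         inner,
//!         len: AtomicI64::new(0),
//!         received: AtomicU64::new(0),
//!     };
//!     for n in [1, 2] {
//!         tx.send(n).unwrap();
//!         rx.len.fetch_add(1, Ordering::Relaxed); // what a metered sender would do
//!     }
//!     rx.recv().unwrap(); // wrapper path: metrics follow the receive
//!     let after_wrapper = rx.received.load(Ordering::Relaxed);
//!     rx.inner.recv().unwrap(); // direct path, as a select!-style macro would take
//!     let after_direct = rx.received.load(Ordering::Relaxed);
//!     rx.update_metrics(); // manual fix-up
//!     (after_wrapper, after_direct, rx.received.load(Ordering::Relaxed))
//! }
//!
//! fn main() {
//!     // The direct receive leaves the counter stale until the manual update.
//!     assert_eq!(demo(), (1, 1, 2));
//! }
//! ```
//!
//! With a real `MassaReceiver`, the equivalent step is presumably a call to
//! `update_metrics()` inside the `select!` arm that handled the message.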
use std::sync::Arc;

use receiver::MassaReceiver;
use sender::MassaSender;

pub mod receiver;
pub mod sender;

/// Constructor namespace for metered channels.
#[derive(Clone)]
pub struct MassaChannel {}

impl MassaChannel {
    /// Creates a channel named `name` and returns its sender and receiver.
    ///
    /// The channel is bounded when `capacity` is `Some(n)` and unbounded when it is `None`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new<T>(name: String, capacity: Option<usize>) -> (MassaSender<T>, MassaReceiver<T>) {
        use prometheus::{Counter, Gauge};

        let (s, r) = if let Some(capacity) = capacity {
            crossbeam::channel::bounded::<T>(capacity)
        } else {
            crossbeam::channel::unbounded::<T>()
        };

        // Create the gauge for the current length of the channel.
        // It is incremented when a message is sent and decremented when one is received.
        let actual_len = Gauge::new(
            format!("{}_channel_actual_size", name),
            "Actual length of channel",
        )
        .expect("Failed to create gauge");

        // Create the counter for the total number of received messages.
        let received = Counter::new(
            format!("{}_channel_total_receive", name),
            "Total received messages",
        )
        .expect("Failed to create counter");

        // Register the metrics in Prometheus.
        // Registration fails if the metrics are already registered (ex : ProtocolController>::get_stats ),
        // in which case the error is only logged at debug level.
        #[cfg(not(feature = "test-exports"))]
        {
            use tracing::debug;

            if let Err(e) = prometheus::register(Box::new(actual_len.clone())) {
                debug!("Failed to register actual_len gauge for {} : {}", name, e);
            }

            if let Err(e) = prometheus::register(Box::new(received.clone())) {
                debug!("Failed to register received counter for {} : {}", name, e);
            }
        }

        let sender = MassaSender {
            sender: s,
            name: name.clone(),
            actual_len: actual_len.clone(),
        };

        let receiver = MassaReceiver {
            receiver: r,
            name,
            actual_len,
            received,
            ref_counter: Arc::new(()),
        };

        (sender, receiver)
    }
}