1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
//! Massa Channel is a crossbeam channel wrapper with prometheus metrics
//! expose for each channel :
//! - actual length of channel (can be inc() when sending msg or dec() when receive)
//! - total received messages (inc() when receive)
//!
//! # Example
//! ```
//! use massa_channel::MassaChannel;
//! let (sender, receiver) = MassaChannel::new::<String>("test".to_string(), None);
//! ```
//!
//! # Warning
//! care about use MassaReceiver with select! macro
//! select! does not call recv() so metrics will not be updated
//! you should call `your_receiver.update_metrics()` manually

use std::sync::Arc;

use receiver::MassaReceiver;
use sender::MassaSender;

pub mod receiver;
pub mod sender;

#[derive(Clone)]
pub struct MassaChannel {}

impl MassaChannel {
    #[allow(clippy::new_ret_no_self)]
    pub fn new<T>(name: String, capacity: Option<usize>) -> (MassaSender<T>, MassaReceiver<T>) {
        use prometheus::{Counter, Gauge};

        let (s, r) = if let Some(capacity) = capacity {
            crossbeam::channel::bounded::<T>(capacity)
        } else {
            crossbeam::channel::unbounded::<T>()
        };

        // Create gauge for actual length of channel
        // this can be inc() when sending msg or dec() when receive
        let actual_len = Gauge::new(
            format!("{}_channel_actual_size", name),
            "Actual length of channel",
        )
        .expect("Failed to create gauge");

        // Create counter for total received messages
        let received = Counter::new(
            format!("{}_channel_total_receive", name),
            "Total received messages",
        )
        .expect("Failed to create counter");

        // Register metrics in prometheus
        // error here if metrics already registered (ex : ProtocolController>::get_stats )

        #[cfg(not(feature = "test-exports"))]
        {
            use tracing::debug;
            if let Err(e) = prometheus::register(Box::new(actual_len.clone())) {
                debug!("Failed to register actual_len gauge for {} : {}", name, e);
            }

            if let Err(e) = prometheus::register(Box::new(received.clone())) {
                debug!("Failed to register received counter for {} : {}", name, e);
            }
        }

        let sender = MassaSender {
            sender: s,
            name: name.clone(),
            actual_len: actual_len.clone(),
        };

        let receiver = MassaReceiver {
            receiver: r,
            name,
            actual_len,
            received,
            ref_counter: Arc::new(()),
        };

        (sender, receiver)
    }
}