1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::{
    ops::Deref,
    time::{Duration, Instant},
};

use crossbeam::channel::{SendError, SendTimeoutError, Sender, TrySendError};
use prometheus::Gauge;

#[derive(Clone, Debug)]
pub struct MassaSender<T> {
    pub(crate) sender: Sender<T>,
    #[allow(dead_code)]
    pub(crate) name: String,
    /// channel size
    pub(crate) actual_len: Gauge,
}

impl<T> MassaSender<T> {
    /// Send a message to the channel
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        match self.sender.send(msg) {
            Ok(()) => {
                self.actual_len.inc();
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

    pub fn send_timeout(&self, msg: T, duration: Duration) -> Result<(), SendTimeoutError<T>> {
        match self.sender.send_timeout(msg, duration) {
            Ok(()) => {
                self.actual_len.inc();
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
        match self.sender.send_deadline(msg, deadline) {
            Ok(()) => {
                self.actual_len.inc();
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        match self.sender.try_send(msg) {
            Ok(()) => {
                self.actual_len.inc();
                Ok(())
            }
            Err(e) => Err(e),
        }
    }
}

impl<T> Deref for MassaSender<T> {
    type Target = Sender<T>;

    fn deref(&self) -> &Self::Target {
        &self.sender
    }
}