1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use massa_serialization::{
    Deserializer, OptionDeserializer, OptionSerializer, SerializeError, Serializer,
    U64VarIntDeserializer, U64VarIntSerializer,
};
use nom::{
    error::{context, ContextError, ParseError},
    IResult, Parser,
};
use std::{marker::PhantomData, ops::Bound::Included};

/// Cursor describing how far a streaming exchange has progressed.
///
/// `T` is the type of the position marker carried while streaming.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingStep<T> {
    /// Initial step: the streaming is being launched and no cursor exists yet.
    Started,
    /// Streaming is in progress; carries the current cursor value.
    Ongoing(T),
    /// Everything has been streamed.
    ///
    /// May optionally keep an indicator of the last content that was streamed.
    Finished(Option<T>),
}

impl<T> StreamingStep<T> {
    /// Returns `true` when the step is `Finished`, whatever value it carries
    /// (including `Finished(None)`), and `false` for `Started` and `Ongoing`.
    pub fn finished(&self) -> bool {
        matches!(self, Self::Finished(..))
    }
}

/// Serializer for [`StreamingStep<T>`], parameterized by the serializer `ST`
/// used for the inner `T` values.
pub struct StreamingStepSerializer<T, ST: Serializer<T>> {
    /// Writes the numeric identifier of the variant.
    u64_serializer: U64VarIntSerializer,
    /// Writes the payload of `StreamingStep::Ongoing`.
    data_serializer: ST,
    /// Writes the optional payload of `StreamingStep::Finished`.
    option_serializer: OptionSerializer<T, ST>,
    /// Marker tying the struct to `T` without storing a value of that type.
    phantom_t: PhantomData<T>,
}

impl<T, ST: Serializer<T> + Clone> StreamingStepSerializer<T, ST> {
    /// Builds a `StreamingStep` serializer from the serializer of its inner data.
    ///
    /// `data_serializer` is cloned once: the clone backs the option serializer
    /// used for `Finished`, while the original is kept for `Ongoing`.
    pub fn new(data_serializer: ST) -> Self {
        let option_serializer = OptionSerializer::new(data_serializer.clone());
        Self {
            u64_serializer: U64VarIntSerializer::new(),
            data_serializer,
            option_serializer,
            phantom_t: PhantomData,
        }
    }
}

impl<T, ST> Serializer<StreamingStep<T>> for StreamingStepSerializer<T, ST>
where
    ST: Serializer<T>,
    T: std::fmt::Debug,
{
    /// Serializes `value` into `buffer` as a variant identifier followed by
    /// the variant payload, if any.
    ///
    /// Identifiers: `0` for `Started` (no payload), `1` for `Ongoing`
    /// (followed by the data), `2` for `Finished` (followed by the optional
    /// data).
    ///
    /// # Errors
    ///
    /// Returns the `SerializeError` reported by the identifier, data or
    /// option serializer, whichever fails first.
    fn serialize(
        &self,
        value: &StreamingStep<T>,
        buffer: &mut Vec<u8>,
    ) -> Result<(), SerializeError> {
        // The identifier always comes first, whatever the variant.
        let ident: u64 = match value {
            StreamingStep::Started => 0,
            StreamingStep::Ongoing(_) => 1,
            StreamingStep::Finished(_) => 2,
        };
        self.u64_serializer.serialize(&ident, buffer)?;

        // Then the payload, when the variant carries one.
        match value {
            StreamingStep::Started => Ok(()),
            StreamingStep::Ongoing(data) => self.data_serializer.serialize(data, buffer),
            StreamingStep::Finished(opt_data) => {
                self.option_serializer.serialize(opt_data, buffer)
            }
        }
    }
}

/// Deserializer for [`StreamingStep<T>`], parameterized by the deserializer
/// `ST` used for the inner `T` values.
pub struct StreamingStepDeserializer<T: Clone, ST: Deserializer<T>> {
    /// Reads the numeric identifier of the variant.
    u64_deser: U64VarIntDeserializer,
    /// Reads the payload of `StreamingStep::Ongoing`.
    data_deser: ST,
    /// Reads the optional payload of `StreamingStep::Finished`.
    opt_deser: OptionDeserializer<T, ST>,
    /// Marker tying the struct to `T` without storing a value of that type.
    phantom_t: PhantomData<T>,
}

impl<T: Clone, ST: Deserializer<T> + Clone> StreamingStepDeserializer<T, ST> {
    /// Builds a `StreamingStep` deserializer from the deserializer of its inner data.
    ///
    /// `data_deser` is cloned once: the clone backs the option deserializer
    /// used for `Finished`, while the original is kept for `Ongoing`.
    /// The identifier deserializer accepts the whole `u64` range; unknown
    /// identifiers are not filtered here.
    pub fn new(data_deser: ST) -> Self {
        let u64_deser = U64VarIntDeserializer::new(Included(u64::MIN), Included(u64::MAX));
        let opt_deser = OptionDeserializer::new(data_deser.clone());
        Self {
            u64_deser,
            data_deser,
            opt_deser,
            phantom_t: PhantomData,
        }
    }
}

impl<T, ST> Deserializer<StreamingStep<T>> for StreamingStepDeserializer<T, ST>
where
    ST: Deserializer<T>,
    T: Clone,
{
    /// Parses a `StreamingStep<T>` from the start of `buffer`.
    ///
    /// A `u64` identifier is read first and selects the variant:
    /// `0` gives `Started` (no payload), `1` gives `Ongoing` (payload read
    /// with the data deserializer), `2` gives `Finished` (payload read with
    /// the option deserializer).
    ///
    /// On success, returns the unconsumed remainder of the input together
    /// with the parsed step.
    ///
    /// # Errors
    ///
    /// Propagates any error from the identifier or payload deserializers.
    /// Any other identifier yields a `nom::Err::Error` of kind
    /// `ErrorKind::Digit` positioned at the start of `buffer`. Errors are
    /// wrapped in the `"StreamingStep"` context.
    fn deserialize<'a, E: ParseError<&'a [u8]> + ContextError<&'a [u8]>>(
        &self,
        buffer: &'a [u8],
    ) -> IResult<&'a [u8], StreamingStep<T>, E> {
        context("StreamingStep", |input| {
            // Variant identifier first.
            let (payload, tag) =
                context("identifier", |bytes| self.u64_deser.deserialize(bytes)).parse(input)?;

            match tag {
                0u64 => Ok((payload, StreamingStep::Started)),
                1u64 => {
                    let (rest, data) =
                        context("ongoing data", |bytes| self.data_deser.deserialize(bytes))
                            .parse(payload)?;
                    Ok((rest, StreamingStep::Ongoing(data)))
                }
                2u64 => {
                    let (rest, opt_data) =
                        context("finished data", |bytes| self.opt_deser.deserialize(bytes))
                            .parse(payload)?;
                    Ok((rest, StreamingStep::Finished(opt_data)))
                }
                // Unknown identifier: report the failure at the start of the input.
                _ => Err(nom::Err::Error(ParseError::from_error_kind(
                    buffer,
                    nom::error::ErrorKind::Digit,
                ))),
            }
        })
        .parse(buffer)
    }
}