1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
//! Copyright (c) 2022 MASSA LABS <info@massa.net>
//! Json RPC API for a massa-node
use std::net::SocketAddr;

use crate::api_trait::MassaApiServer;
use crate::{ApiServer, ApiV2, StopHandle, API};
use async_trait::async_trait;
use futures::future::{self, Either};
use futures::StreamExt;
use jsonrpsee::core::{client::Error as JsonRpseeError, RpcResult, SubscriptionResult};
use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage};
use massa_api_exports::config::APIConfig;
use massa_api_exports::error::ApiError;
use massa_api_exports::page::{PageRequest, PagedVec, PagedVecV2};
use massa_api_exports::ApiRequest;
use massa_consensus_exports::{ConsensusBroadcasts, ConsensusController};
use massa_execution_exports::ExecutionController;
use massa_models::address::Address;
use massa_models::block_id::BlockId;
use massa_models::slot::Slot;
use massa_models::timeslots::get_latest_block_slot_at_timestamp;
use massa_models::version::Version;
use massa_pool_exports::PoolBroadcasts;
use massa_time::MassaTime;
use serde::Serialize;
use tokio_stream::wrappers::BroadcastStream;

impl API<ApiV2> {
    /// Builds the V2 API wrapper around the node components it exposes.
    ///
    /// Every argument is moved, unchanged, into the inner [`ApiV2`] value;
    /// this constructor performs no other work.
    pub fn new(
        consensus_controller: Box<dyn ConsensusController>,
        consensus_broadcasts: ConsensusBroadcasts,
        execution_controller: Box<dyn ExecutionController>,
        pool_broadcasts: PoolBroadcasts,
        api_settings: APIConfig,
        version: Version,
    ) -> Self {
        let inner = ApiV2 {
            version,
            api_settings,
            pool_broadcasts,
            execution_controller,
            consensus_broadcasts,
            consensus_controller,
        };
        Self(inner)
    }
}

#[async_trait]
impl ApiServer for API<ApiV2> {
    /// Turns this API into its RPC module and passes it to `crate::serve`
    /// along with the socket address and the API configuration.
    ///
    /// The result of `crate::serve` is returned as-is.
    async fn serve(
        self,
        url: &SocketAddr,
        api_config: &APIConfig,
    ) -> Result<StopHandle, JsonRpseeError> {
        let rpc_module = self.into_rpc();
        crate::serve(rpc_module, url, api_config).await
    }
}

#[doc(hidden)]
#[async_trait]
impl MassaApiServer for API<ApiV2> {
    /// Returns the active-roll holders of the current cycle, ordered by
    /// decreasing roll count, as a paged list.
    ///
    /// The cycle is derived from the latest block slot at the current time:
    /// * if that slot's period is not past `last_start_period`, the cycle of
    ///   slot `(last_start_period, 0)` is used instead;
    /// * if there is no such slot yet, cycle `0` is used.
    ///
    /// Paging: when `api_request` is given, its `page_request` is forwarded to
    /// `PagedVec::new` unchanged; otherwise the first 50 entries are requested
    /// (`offset: 0`, `limit: 50`).
    ///
    /// # Errors
    ///
    /// Returns `ApiError::ModelsError` (converted into the RPC error type) when
    /// the latest slot cannot be computed.
    async fn get_largest_stakers(
        &self,
        api_request: Option<ApiRequest>,
    ) -> RpcResult<PagedVecV2<(Address, u64)>> {
        // Borrow the settings instead of cloning the whole configuration on
        // every request: only a few scalar fields are read below.
        let cfg = &self.0.api_settings;

        let now = MassaTime::now();

        let latest_block_slot_at_timestamp_result = get_latest_block_slot_at_timestamp(
            cfg.thread_count,
            cfg.t0,
            cfg.genesis_timestamp,
            now,
        );

        let curr_cycle = match latest_block_slot_at_timestamp_result {
            // The clock has not moved past the last start period yet: use the
            // cycle that contains the first slot of that period.
            Ok(Some(cur_slot)) if cur_slot.period <= cfg.last_start_period => {
                Slot::new(cfg.last_start_period, 0).get_cycle(cfg.periods_per_cycle)
            }
            Ok(Some(cur_slot)) => cur_slot.get_cycle(cfg.periods_per_cycle),
            Ok(None) => 0,
            Err(e) => return Err(ApiError::ModelsError(e).into()),
        };

        let mut staker_vec = self
            .0
            .execution_controller
            .get_cycle_active_rolls(curr_cycle)
            .into_iter()
            .collect::<Vec<(Address, u64)>>();

        // Largest roll count first. The sort is stable, so entries with equal
        // roll counts keep the order in which the controller returned them.
        staker_vec.sort_by(|(_, roll_counts_a), (_, roll_counts_b)| roll_counts_b.cmp(roll_counts_a));

        // Without an explicit request, default to the first 50 entries.
        let page_request = match api_request {
            Some(api_request) => api_request.page_request,
            None => Some(PageRequest {
                offset: 0,
                limit: 50,
            }),
        };

        Ok(PagedVec::new(staker_vec, page_request).into())
    }

    /// Returns the consensus controller's current best parents, as
    /// `(block id, u64)` pairs.
    // NOTE(review): the `u64` is presumably the parent's period — confirm
    // against `ConsensusController::get_best_parents`.
    async fn get_next_block_best_parents(&self) -> RpcResult<Vec<(BlockId, u64)>> {
        Ok(self.0.consensus_controller.get_best_parents())
    }

    /// Returns the version this API was constructed with.
    async fn get_version(&self) -> RpcResult<Version> {
        Ok(self.0.version)
    }

    /// Subscription: forwards items from the consensus `block_sender`
    /// broadcast channel to the subscriber.
    async fn subscribe_new_blocks(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        broadcast_via_ws(self.0.consensus_broadcasts.block_sender.clone(), pending).await
    }

    /// Subscription: forwards items from the consensus `block_header_sender`
    /// broadcast channel to the subscriber.
    async fn subscribe_new_blocks_headers(
        &self,
        pending: PendingSubscriptionSink,
    ) -> SubscriptionResult {
        broadcast_via_ws(
            self.0.consensus_broadcasts.block_header_sender.clone(),
            pending,
        )
        .await
    }

    /// Subscription: forwards items from the consensus `filled_block_sender`
    /// broadcast channel to the subscriber.
    async fn subscribe_new_filled_blocks(
        &self,
        pending: PendingSubscriptionSink,
    ) -> SubscriptionResult {
        broadcast_via_ws(
            self.0.consensus_broadcasts.filled_block_sender.clone(),
            pending,
        )
        .await
    }

    /// Subscription: forwards items from the pool `operation_sender`
    /// broadcast channel to the subscriber.
    async fn subscribe_new_operations(
        &self,
        pending: PendingSubscriptionSink,
    ) -> SubscriptionResult {
        broadcast_via_ws(self.0.pool_broadcasts.operation_sender.clone(), pending).await
    }
}

/// Forwards every item published on `sender` to a WebSocket subscriber.
///
/// The pending subscription is accepted first, then a new receiver is
/// subscribed to `sender`. The function returns `Ok(())` when the subscription
/// is closed, when sending to the sink fails, or when the stream ends. An
/// error yielded by the stream, or a failure to serialize an item, is
/// returned as `Err`.
async fn broadcast_via_ws<T: Serialize + Send + Clone + 'static>(
    sender: tokio::sync::broadcast::Sender<T>,
    pending: PendingSubscriptionSink,
) -> SubscriptionResult {
    let sink = pending.accept().await?;
    let closed = sink.closed();
    let stream = BroadcastStream::new(sender.subscribe());
    futures::pin_mut!(closed, stream);

    loop {
        // Race "the subscription was closed" against "the next broadcast item".
        let item = match future::select(closed, stream.next()).await {
            // Subscription closed.
            Either::Left(_) => return Ok(()),

            // Stream is closed.
            Either::Right((None, _)) => return Ok(()),

            // Send back the error.
            Either::Right((Some(Err(err)), _)) => return Err(err.into()),

            // Received a new item: keep the still-pending `closed` future for
            // the next round and hand the item to the code below.
            Either::Right((Some(Ok(item)), still_open)) => {
                closed = still_open;
                item
            }
        };

        let notif = SubscriptionMessage::from_json(&item)?;
        if sink.send(notif).await.is_err() {
            return Ok(());
        }
    }
}