#![warn(missing_docs)]
mod block_indexes;
mod endorsement_indexes;
mod operation_indexes;
#[cfg(test)]
mod tests;
use block_indexes::BlockIndexes;
use endorsement_indexes::EndorsementIndexes;
use massa_models::prehash::{CapacityAllocator, PreHashMap, PreHashSet, PreHashed};
use massa_models::secure_share::Id;
use massa_models::{
block::SecureShareBlock,
block_id::BlockId,
endorsement::{EndorsementId, SecureShareEndorsement},
operation::{OperationId, SecureShareOperation},
};
use operation_indexes::OperationIndexes;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::fmt::Debug;
use std::hash::Hash;
use std::{collections::hash_map, sync::Arc};
pub struct Storage {
blocks: Arc<RwLock<BlockIndexes>>,
operations: Arc<RwLock<OperationIndexes>>,
endorsements: Arc<RwLock<EndorsementIndexes>>,
block_owners: Arc<RwLock<PreHashMap<BlockId, usize>>>,
operation_owners: Arc<RwLock<PreHashMap<OperationId, usize>>>,
endorsement_owners: Arc<RwLock<PreHashMap<EndorsementId, usize>>>,
local_used_blocks: PreHashSet<BlockId>,
local_used_ops: PreHashSet<OperationId>,
local_used_endorsements: PreHashSet<EndorsementId>,
}
impl Debug for Storage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("")
}
}
impl Clone for Storage {
fn clone(&self) -> Self {
let mut res = Self::clone_without_refs(self);
Storage::internal_claim_refs(
&self.local_used_ops.clone(),
&mut res.operation_owners.write(),
&mut res.local_used_ops,
);
Storage::internal_claim_refs(
&self.local_used_blocks.clone(),
&mut res.block_owners.write(),
&mut res.local_used_blocks,
);
Storage::internal_claim_refs(
&self.local_used_endorsements.clone(),
&mut res.endorsement_owners.write(),
&mut res.local_used_endorsements,
);
res
}
}
impl Storage {
pub fn create_root() -> Storage {
Storage {
blocks: Default::default(),
operations: Default::default(),
endorsements: Default::default(),
block_owners: Default::default(),
operation_owners: Default::default(),
endorsement_owners: Default::default(),
local_used_blocks: Default::default(),
local_used_ops: Default::default(),
local_used_endorsements: Default::default(),
}
}
pub fn clone_without_refs(&self) -> Self {
Self {
blocks: self.blocks.clone(),
operations: self.operations.clone(),
endorsements: self.endorsements.clone(),
operation_owners: self.operation_owners.clone(),
block_owners: self.block_owners.clone(),
endorsement_owners: self.endorsement_owners.clone(),
local_used_ops: Default::default(),
local_used_blocks: Default::default(),
local_used_endorsements: Default::default(),
}
}
pub fn replace_with(&mut self, other: &Storage) {
{
let added_ops = other.get_op_refs() - self.get_op_refs();
let removed_ops = self.get_op_refs() - other.get_op_refs();
{
let owners = &mut self.operation_owners.write();
let orphaned_ids =
Self::internal_drop_refs(&removed_ops, owners, &mut self.local_used_ops);
if !orphaned_ids.is_empty() {
let mut ops = self.operations.write();
for op_id in orphaned_ids {
ops.remove(&op_id);
}
}
Self::internal_claim_refs(&added_ops, owners, &mut self.local_used_ops);
}
}
{
let added_blocks = other.get_block_refs() - self.get_block_refs();
let removed_blocks = self.get_block_refs() - other.get_block_refs();
{
let owners = &mut self.block_owners.write();
let orphaned_ids =
Self::internal_drop_refs(&removed_blocks, owners, &mut self.local_used_blocks);
if !orphaned_ids.is_empty() {
let mut blocks = self.blocks.write();
for block_id in orphaned_ids {
blocks.remove(&block_id);
}
}
Self::internal_claim_refs(&added_blocks, owners, &mut self.local_used_blocks);
}
}
{
let added_endorsements = other.get_endorsement_refs() - self.get_endorsement_refs();
let removed_endorsements = self.get_endorsement_refs() - other.get_endorsement_refs();
{
let owners = &mut self.endorsement_owners.write();
let orphaned_ids = Self::internal_drop_refs(
&removed_endorsements,
owners,
&mut self.local_used_endorsements,
);
if !orphaned_ids.is_empty() {
let mut endorsements = self.endorsements.write();
for endorsement_id in orphaned_ids {
endorsements.remove(&endorsement_id);
}
}
Self::internal_claim_refs(
&added_endorsements,
owners,
&mut self.local_used_endorsements,
);
}
}
}
pub fn extend(&mut self, mut other: Storage) {
other
.local_used_ops
.retain(|id| !self.local_used_ops.insert(*id));
other
.local_used_blocks
.retain(|id| !self.local_used_blocks.insert(*id));
other
.local_used_endorsements
.retain(|id| !self.local_used_endorsements.insert(*id));
}
pub fn split_off(
&mut self,
blocks: &PreHashSet<BlockId>,
operations: &PreHashSet<OperationId>,
endorsements: &PreHashSet<EndorsementId>,
) -> Storage {
let mut res = self.clone_without_refs();
res.local_used_blocks = blocks
.iter()
.map(|id| {
self.local_used_blocks
.take(id)
.expect("split block ref not owned by source")
})
.collect();
res.local_used_ops = operations
.iter()
.map(|id| {
self.local_used_ops
.take(id)
.expect("split op ref not owned by source")
})
.collect();
res.local_used_endorsements = endorsements
.iter()
.map(|id| {
self.local_used_endorsements
.take(id)
.expect("split endorsement ref not owned by source")
})
.collect();
res
}
fn internal_claim_refs<IdT: Id + PartialEq + Eq + Hash + PreHashed + Copy>(
ids: &PreHashSet<IdT>,
owners: &mut RwLockWriteGuard<PreHashMap<IdT, usize>>,
local_used_ids: &mut PreHashSet<IdT>,
) {
for &id in ids {
if local_used_ids.insert(id) {
owners.entry(id).and_modify(|v| *v += 1).or_insert(1);
}
}
}
pub fn get_block_refs(&self) -> &PreHashSet<BlockId> {
&self.local_used_blocks
}
pub fn claim_block_refs(&mut self, ids: &PreHashSet<BlockId>) -> PreHashSet<BlockId> {
let mut claimed = PreHashSet::with_capacity(ids.len());
if ids.is_empty() {
return claimed;
}
let owners = &mut self.block_owners.write();
claimed.extend(ids.iter().filter(|id| owners.contains_key(id)));
Storage::internal_claim_refs(&claimed, owners, &mut self.local_used_blocks);
claimed
}
fn internal_drop_refs<IdT: Id + PartialEq + Eq + Hash + PreHashed + Copy>(
ids: &PreHashSet<IdT>,
owners: &mut RwLockWriteGuard<PreHashMap<IdT, usize>>,
local_used_ids: &mut PreHashSet<IdT>,
) -> Vec<IdT> {
let mut orphaned_ids = Vec::new();
for id in ids {
if !local_used_ids.remove(id) {
continue;
}
match owners.entry(*id) {
hash_map::Entry::Occupied(mut occ) => {
let res_count = {
let cnt = occ.get_mut();
*cnt = cnt
.checked_sub(1)
.expect("less than 1 owner on storage object reference drop");
*cnt
};
if res_count == 0 {
orphaned_ids.push(*id);
occ.remove();
}
}
hash_map::Entry::Vacant(_vac) => {
panic!("missing object in storage on storage object reference drop");
}
}
}
orphaned_ids
}
pub fn drop_block_refs(&mut self, ids: &PreHashSet<BlockId>) {
if ids.is_empty() {
return;
}
let mut owners = self.block_owners.write();
let orphaned_ids = Self::internal_drop_refs(ids, &mut owners, &mut self.local_used_blocks);
if !orphaned_ids.is_empty() {
let mut blocks = self.blocks.write();
for b_id in orphaned_ids {
blocks.remove(&b_id);
}
}
}
pub fn store_block(&mut self, block: SecureShareBlock) {
let id = block.id;
let mut owners = self.block_owners.write();
let mut blocks = self.blocks.write();
blocks.insert(block);
Storage::internal_claim_refs(
&vec![id].into_iter().collect(),
&mut owners,
&mut self.local_used_blocks,
);
}
pub fn claim_operation_refs(
&mut self,
ids: &PreHashSet<OperationId>,
) -> PreHashSet<OperationId> {
let mut claimed = PreHashSet::with_capacity(ids.len());
if ids.is_empty() {
return claimed;
}
let owners = &mut self.operation_owners.write();
claimed.extend(ids.iter().filter(|id| owners.contains_key(id)));
Storage::internal_claim_refs(&claimed, owners, &mut self.local_used_ops);
claimed
}
pub fn get_op_refs(&self) -> &PreHashSet<OperationId> {
&self.local_used_ops
}
pub fn drop_operation_refs(&mut self, ids: &PreHashSet<OperationId>) {
if ids.is_empty() {
return;
}
let mut owners = self.operation_owners.write();
let orphaned_ids = Self::internal_drop_refs(ids, &mut owners, &mut self.local_used_ops);
if !orphaned_ids.is_empty() {
let mut ops = self.operations.write();
for op_id in orphaned_ids {
ops.remove(&op_id);
}
}
}
pub fn store_operations(&mut self, operations: Vec<SecureShareOperation>) {
if operations.is_empty() {
return;
}
let mut owners = self.operation_owners.write();
let mut op_store = self.operations.write();
let ids: PreHashSet<OperationId> = operations.iter().map(|op| op.id).collect();
for op in operations {
op_store.insert(op);
}
Storage::internal_claim_refs(&ids, &mut owners, &mut self.local_used_ops);
}
pub fn read_operations(&self) -> RwLockReadGuard<OperationIndexes> {
self.operations.read()
}
pub fn read_endorsements(&self) -> RwLockReadGuard<EndorsementIndexes> {
self.endorsements.read()
}
pub fn read_blocks(&self) -> RwLockReadGuard<BlockIndexes> {
self.blocks.read()
}
pub fn claim_endorsement_refs(
&mut self,
ids: &PreHashSet<EndorsementId>,
) -> PreHashSet<EndorsementId> {
let mut claimed = PreHashSet::with_capacity(ids.len());
if ids.is_empty() {
return claimed;
}
let owners = &mut self.endorsement_owners.write();
claimed.extend(ids.iter().filter(|id| owners.contains_key(id)));
Storage::internal_claim_refs(&claimed, owners, &mut self.local_used_endorsements);
claimed
}
pub fn get_endorsement_refs(&self) -> &PreHashSet<EndorsementId> {
&self.local_used_endorsements
}
pub fn drop_endorsement_refs(&mut self, ids: &PreHashSet<EndorsementId>) {
if ids.is_empty() {
return;
}
let mut owners = self.endorsement_owners.write();
let orphaned_ids =
Self::internal_drop_refs(ids, &mut owners, &mut self.local_used_endorsements);
if !orphaned_ids.is_empty() {
let mut endorsements = self.endorsements.write();
for e_id in orphaned_ids {
endorsements.remove(&e_id);
}
}
}
pub fn store_endorsements(&mut self, endorsements: Vec<SecureShareEndorsement>) {
if endorsements.is_empty() {
return;
}
let mut owners = self.endorsement_owners.write();
let mut endo_store = self.endorsements.write();
let ids: PreHashSet<EndorsementId> = endorsements.iter().map(|op| op.id).collect();
for endorsement in endorsements {
endo_store.insert(endorsement);
}
Storage::internal_claim_refs(&ids, &mut owners, &mut self.local_used_endorsements);
}
}
impl Drop for Storage {
fn drop(&mut self) {
self.drop_block_refs(&self.local_used_blocks.clone());
self.drop_operation_refs(&self.local_used_ops.clone());
self.drop_endorsement_refs(&self.local_used_endorsements.clone());
}
}