1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
use std::collections::HashMap;

use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};

use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::Participant;

use serai_client::validator_sets::primitives::{KeyPair, ExternalValidatorSet};

use processor_messages::coordinator::SubstrateSignableId;

pub use serai_db::*;

use tributary::ReadWrite;

use crate::tributary::{Label, Transaction};

// A subject which data is accumulated for and which attempts are counted against (see
// `AttemptDb`, which is keyed by topic).
// NOTE(review): this type derives `Encode` and Borsh (de)serialization, so the order of the
// variants presumably affects the serialized form. Confirm before reordering.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
pub enum Topic {
  // `AttemptDb::attempt` treats this topic as recognized even if it was never explicitly
  // recognized
  Dkg,
  // Also treated as implicitly recognized by `AttemptDb::attempt`
  DkgConfirmation,
  // A topic identified by a `SubstrateSignableId`
  SubstrateSign(SubstrateSignableId),
  // A topic identified by a 32-byte ID (presumably the ID of what's being signed, TODO confirm)
  Sign([u8; 32]),
}

// A struct to refer to a piece of data all validators will presumably provide a value for.
// Used as part of the key for the `DataReceived` and `DataDb` database entries.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode)]
pub struct DataSpecification {
  // The topic this data relates to
  pub topic: Topic,
  // The label for this data (presumably distinguishes the kinds of data within a topic; see
  // `crate::tributary::Label`)
  pub label: Label,
  // The attempt this data is for (presumably the counter tracked by `AttemptDb`, TODO confirm)
  pub attempt: u32,
}

// A collection of data, or a marker that there's no collection to hand over.
pub enum DataSet {
  // The data, as bytes, indexed by the participant it's associated with
  Participating(HashMap<Participant, Vec<u8>>),
  // No data is provided (presumably as we aren't participating, TODO confirm against callers)
  NotParticipating,
}

// The outcome of accumulating data.
pub enum Accumulation {
  // Accumulation finished, yielding a `DataSet`
  Ready(DataSet),
  // Accumulation hasn't finished yet (presumably as more data is still expected)
  NotReady,
}

// Database entries for the Tributary scanner.
//
// Each line declares an entry as `Name: (key arguments) -> value type`. As used in this file,
// the generated `Name` type offers `get`, `set`, and `del` functions taking those key arguments.
//
// TODO: Move from genesis to set for indexing
create_db!(
  Tributary {
    // Indexed by a 32-byte hash and by validator set, respectively
    SeraiBlockNumber: (hash: [u8; 32]) -> u64,
    SeraiDkgCompleted: (spec: ExternalValidatorSet) -> [u8; 32],

    // Block tracking for a Tributary
    TributaryBlockNumber: (block: [u8; 32]) -> u32,
    LastHandledBlock: (genesis: [u8; 32]) -> [u8; 32],

    // The list of fatally slashed accounts, as appended to by
    // `FatallySlashed::set_fatally_slashed`
    // TODO: Revisit the point of this
    FatalSlashes: (genesis: [u8; 32]) -> Vec<[u8; 32]>,
    RemovedAsOfDkgAttempt: (genesis: [u8; 32], attempt: u32) -> Vec<[u8; 32]>,
    OfflineDuringDkg: (genesis: [u8; 32]) -> Vec<[u8; 32]>,
    // TODO: Combine these two
    // A per-account flag, set by `FatallySlashed::set_fatally_slashed`
    FatallySlashed: (genesis: [u8; 32], account: [u8; 32]) -> (),
    SlashPoints: (genesis: [u8; 32], account: [u8; 32]) -> u32,

    // Presumably a flag per (voter, target) pair and a tally per target (TODO confirm)
    VotedToRemove: (genesis: [u8; 32], voter: [u8; 32], to_remove: [u8; 32]) -> (),
    VotesToRemove: (genesis: [u8; 32], to_remove: [u8; 32]) -> u16,

    // The current attempt number for a topic
    AttemptDb: (genesis: [u8; 32], topic: &Topic) -> u32,
    // The topics to re-attempt once the specified block number is reached
    ReattemptDb: (genesis: [u8; 32], block: u32) -> Vec<Topic>,
    // Presumably a count of received data, and the data received from each signer (TODO confirm)
    DataReceived: (genesis: [u8; 32], data_spec: &DataSpecification) -> u16,
    DataDb: (genesis: [u8; 32], data_spec: &DataSpecification, signer_bytes: &[u8; 32]) -> Vec<u8>,

    // State for the DKG
    DkgShare: (genesis: [u8; 32], from: u16, to: u16) -> Vec<u8>,
    ConfirmationNonces: (genesis: [u8; 32], attempt: u32) -> HashMap<Participant, Vec<u8>>,
    DkgKeyPair: (genesis: [u8; 32], attempt: u32) -> KeyPair,
    KeyToDkgAttempt: (key: [u8; 32]) -> u32,
    DkgLocallyCompleted: (genesis: [u8; 32]) -> (),

    // A list of 32-byte IDs per block
    PlanIds: (genesis: &[u8], block: u64) -> Vec<[u8; 32]>,

    // A serialized `Transaction`, as read back by `SignedTransactionDb::take_signed_transaction`
    SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec<u8>,

    // State for slash reports
    SlashReports: (genesis: [u8; 32], signer: [u8; 32]) -> Vec<u32>,
    SlashReported: (genesis: [u8; 32]) -> u16,
    SlashReportCutOff: (genesis: [u8; 32]) -> u64,
    SlashReport: (set: ExternalValidatorSet) -> Vec<([u8; 32], u32)>,
  }
);

impl FatalSlashes {
  /// Load the accounts listed as fatally slashed for `genesis`, decoded as Ristretto points.
  ///
  /// Returns an empty `Vec` if no list has been saved for `genesis`.
  ///
  /// # Panics
  /// Panics if any saved entry isn't a valid encoding of a point.
  pub fn get_as_keys(getter: &impl Get, genesis: [u8; 32]) -> Vec<<Ristretto as Ciphersuite>::G> {
    let slashed = Self::get(getter, genesis).unwrap_or_default();

    let mut keys = Vec::with_capacity(slashed.len());
    for bytes in &slashed {
      keys.push(<Ristretto as Ciphersuite>::G::from_bytes(bytes).unwrap());
    }
    keys
  }
}

impl FatallySlashed {
  /// Flag `account` as fatally slashed for `genesis`, and add it to the `FatalSlashes` list.
  ///
  /// The flag is always (re-)written. The list is only written to if `account` isn't already
  /// present within it.
  pub fn set_fatally_slashed(txn: &mut impl DbTxn, genesis: [u8; 32], account: [u8; 32]) {
    Self::set(txn, genesis, account, &());

    let mut slashed = FatalSlashes::get(txn, genesis).unwrap_or_default();
    // An account may fault multiple times, yet it should only be listed once
    if !slashed.contains(&account) {
      slashed.push(account);
      FatalSlashes::set(txn, genesis, &slashed);
    }
  }
}

impl AttemptDb {
  /// Explicitly recognize `topic`, setting its attempt number to 0.
  pub fn recognize_topic(txn: &mut impl DbTxn, genesis: [u8; 32], topic: Topic) {
    let initial_attempt: u32 = 0;
    Self::set(txn, genesis, &topic, &initial_attempt);
  }

  /// Increment the attempt number for `topic`, save it, and return the new attempt number.
  ///
  /// # Panics
  /// Panics if `topic` isn't recognized (see [`Self::attempt`]).
  pub fn start_next_attempt(txn: &mut impl DbTxn, genesis: [u8; 32], topic: Topic) -> u32 {
    let current =
      Self::attempt(txn, genesis, topic).expect("starting next attempt for unknown topic");
    let next = current + 1;
    Self::set(txn, genesis, &topic, &next);
    next
  }

  /// Fetch the current attempt number for `topic`.
  ///
  /// Returns the saved attempt number if there is one. Otherwise, this returns `Some(0)` for the
  /// topics which don't need explicit recognition, and `None` for every other topic.
  pub fn attempt(getter: &impl Get, genesis: [u8; 32], topic: Topic) -> Option<u32> {
    if let Some(saved) = Self::get(getter, genesis, &topic) {
      return Some(saved);
    }

    // The Dkg topic doesn't require explicit recognition as it starts when the chain does
    // The SlashReport topic doesn't require explicit recognition as it isn't a DoS risk and it
    // should always happen (eventually)
    let implicitly_recognized = matches!(
      topic,
      Topic::Dkg |
        Topic::DkgConfirmation |
        Topic::SubstrateSign(SubstrateSignableId::SlashReport)
    );
    implicitly_recognized.then_some(0)
  }
}

impl ReattemptDb {
  /// Schedule `topic` to be re-attempted at a future block.
  ///
  /// The block is `current_block_number` plus a delay which grows with the topic's current
  /// attempt number, with the `Dkg` topic receiving four times the usual delay. The topic is
  /// appended to any topics already scheduled for that block.
  ///
  /// # Panics
  /// Panics if `topic` has no attempt number (see `AttemptDb::attempt`).
  pub fn schedule_reattempt(
    txn: &mut impl DbTxn,
    genesis: [u8; 32],
    current_block_number: u32,
    topic: Topic,
  ) {
    // 5 minutes, as a number of blocks
    #[cfg(not(feature = "longer-reattempts"))]
    const BASE_REATTEMPT_DELAY: u32 = (5 * 60 * 1000) / tributary::tendermint::TARGET_BLOCK_TIME;

    // 10 minutes, as a number of blocks, intended for latent environments like the GitHub CI
    #[cfg(feature = "longer-reattempts")]
    const BASE_REATTEMPT_DELAY: u32 = (10 * 60 * 1000) / tributary::tendermint::TARGET_BLOCK_TIME;

    let attempt =
      AttemptDb::attempt(txn, genesis, topic).expect("scheduling re-attempt for unknown topic");

    // 1x the base delay for attempts 0 ..= 2, 2x for attempts 3 ..= 5, 3x for attempts > 5
    // This assumes no event will take longer than the maximum delay, yet grows the time in case
    // there are network bandwidth issues
    let attempt_multiplier = ((attempt / 3) + 1).min(3);

    // DKGs are allowed more time since they have an extra round and much more data
    let topic_multiplier = if matches!(topic, Topic::Dkg) { 4 } else { 1 };

    let upon_block =
      current_block_number + (BASE_REATTEMPT_DELAY * attempt_multiplier * topic_multiplier);

    let mut scheduled = Self::get(txn, genesis, upon_block).unwrap_or_default();
    scheduled.push(topic);
    Self::set(txn, genesis, upon_block, &scheduled);
  }

  /// Remove, and return, the topics scheduled to be re-attempted at `block_number`.
  ///
  /// Returns an empty `Vec` if nothing was scheduled for this block.
  pub fn take(txn: &mut impl DbTxn, genesis: [u8; 32], block_number: u32) -> Vec<Topic> {
    let topics = Self::get(txn, genesis, block_number).unwrap_or_default();
    // Only touch the database if there was something to take
    if topics.is_empty() {
      return topics;
    }
    Self::del(txn, genesis, block_number);
    topics
  }
}

impl SignedTransactionDb {
  /// Remove, and return, the signed transaction saved for (`order`, `nonce`).
  ///
  /// Returns `None`, without modifying the database, if no transaction was saved.
  ///
  /// # Panics
  /// Panics if the saved bytes can't be read as a `Transaction`.
  pub fn take_signed_transaction(
    txn: &mut impl DbTxn,
    order: &[u8],
    nonce: u32,
  ) -> Option<Transaction> {
    let serialized = Self::get(txn, order, nonce)?;
    // Deserialize before deleting, so a malformed entry panics prior to any write
    let transaction = Transaction::read(&mut serialized.as_slice()).unwrap();
    Self::del(txn, order, nonce);
    Some(transaction)
  }
}