use core::{marker::PhantomData, ops::Deref, future::Future, time::Duration};
use std::{sync::Arc, collections::HashSet};
use zeroize::Zeroizing;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use tokio::sync::broadcast;
use scale::{Encode, Decode};
use serai_client::{
primitives::{SeraiAddress, Signature},
validator_sets::primitives::{ExternalValidatorSet, KeyPair},
Serai,
};
use serai_db::{Get, DbTxn};
use processor_messages::coordinator::{SubstrateSignId, SubstrateSignableId};
use tributary::{
TransactionKind, Transaction as TributaryTransaction, TransactionError, Block, TributaryReader,
tendermint::{
tx::{TendermintTx, Evidence, decode_signed_message},
TendermintNetwork,
},
};
use crate::{Db, processors::Processors, substrate::BatchInstructionsHashDb, tributary::*, P2p};
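/// The type of ID which may be recognized from a Tributary transaction.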
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
pub enum RecognizedIdType {
Batch,
Plan,
}
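/// Callback invoked when a Tributary recognizes an ID (a Batch or Plan) as ready for signing.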
#[async_trait::async_trait]
pub trait RIDTrait {
async fn recognized_id(
&self,
set: ExternalValidatorSet,
genesis: [u8; 32],
kind: RecognizedIdType,
id: Vec<u8>,
);
}
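// Blanket impl so any matching async closure (e.g. `|set, genesis, kind, id| async move { .. }`)
// can be used as the callback.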
#[async_trait::async_trait]
impl<
FRid: Send + Future<Output = ()>,
F: Sync + Fn(ExternalValidatorSet, [u8; 32], RecognizedIdType, Vec<u8>) -> FRid,
> RIDTrait for F
{
async fn recognized_id(
&self,
set: ExternalValidatorSet,
genesis: [u8; 32],
kind: RecognizedIdType,
id: Vec<u8>,
) {
(self)(set, genesis, kind, id).await
}
}
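/// Publisher of the Serai transactions produced by a Tributary.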
#[async_trait::async_trait]
pub trait PublishSeraiTransaction {
async fn publish_set_keys(
&self,
db: &(impl Sync + Get),
set: ExternalValidatorSet,
removed: Vec<SeraiAddress>,
key_pair: KeyPair,
signature: Signature,
);
}
mod impl_pst_for_serai {
use super::*;
use serai_client::SeraiValidatorSets;
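  // Generates a `publish` fn which retries until the TX lands, the set is retired, or `$check`
  // determines the TX is no longer necessary (e.g. another coordinator already published it).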
macro_rules! common_pst {
($Meta: ty, $check: ident) => {
async fn publish(
serai: &Serai,
db: &impl Get,
set: ExternalValidatorSet,
tx: serai_client::Transaction,
meta: $Meta,
) -> bool {
loop {
match serai.publish(&tx).await {
Ok(_) => return true,
Err(e) => {
if crate::RetiredTributaryDb::get(db, set).is_some() {
log::warn!("trying to publish a TX relevant to set {set:?} which isn't the latest");
return false;
}
if let Ok(serai) = serai.as_of_latest_finalized_block().await {
let serai = serai.validator_sets();
if $check(serai, set, meta).await {
return false;
}
}
log::error!("couldn't connect to Serai node to publish TX: {e:?}");
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
}
}
}
}
};
}
#[async_trait::async_trait]
impl PublishSeraiTransaction for Serai {
async fn publish_set_keys(
&self,
db: &(impl Sync + Get),
set: ExternalValidatorSet,
removed: Vec<SeraiAddress>,
key_pair: KeyPair,
signature: Signature,
) {
let tx = SeraiValidatorSets::set_keys(
set.network,
removed.try_into().expect("removing more than allowed"),
key_pair,
signature,
);
async fn check(serai: SeraiValidatorSets<'_>, set: ExternalValidatorSet, (): ()) -> bool {
if matches!(serai.keys(set).await, Ok(Some(_))) {
log::info!("another coordinator set key pair for {:?}", set);
return true;
}
false
}
common_pst!((), check);
if publish(self, db, set, tx, ()).await {
log::info!("published set keys for {set:?}");
}
}
}
}
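/// Publisher of Tributary transactions.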
#[async_trait::async_trait]
pub trait PTTTrait {
async fn publish_tributary_tx(&self, tx: Transaction);
}
#[async_trait::async_trait]
impl<FPtt: Send + Future<Output = ()>, F: Sync + Fn(Transaction) -> FPtt> PTTTrait for F {
async fn publish_tributary_tx(&self, tx: Transaction) {
(self)(tx).await
}
}
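/// Handler for a single Tributary block, dispatching its transactions and any re-attempts.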
pub struct TributaryBlockHandler<
'a,
D: Db,
T: DbTxn,
Pro: Processors,
PST: PublishSeraiTransaction,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
> {
pub db: &'a D,
pub txn: &'a mut T,
pub our_key: &'a Zeroizing<<Ristretto as Ciphersuite>::F>,
pub recognized_id: &'a RID,
pub processors: &'a Pro,
pub publish_serai_tx: &'a PST,
pub publish_tributary_tx: &'a PTT,
pub spec: &'a TributarySpec,
block: Block<Transaction>,
pub block_number: u32,
_p2p: PhantomData<P>,
}
impl<
D: Db,
T: DbTxn,
Pro: Processors,
PST: PublishSeraiTransaction,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
> TributaryBlockHandler<'_, D, T, Pro, PST, PTT, RID, P>
{
pub fn fatal_slash(&mut self, slashing: [u8; 32], reason: &str) {
let genesis = self.spec.genesis();
log::warn!("fatally slashing {}. reason: {}", hex::encode(slashing), reason);
FatallySlashed::set_fatally_slashed(self.txn, genesis, slashing);
}
async fn handle(mut self) {
log::info!("found block for Tributary {:?}", self.spec.set());
let transactions = self.block.transactions.clone();
for tx in transactions {
match tx {
TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => {
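          // Since the evidence made it on-chain, it's already been validated
          // All we have to do is punish the signer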
let data = match ev {
Evidence::ConflictingMessages(first, second) => (first, Some(second)),
Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None),
};
          let msgs = (
            decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.0).unwrap(),
            data.1.map(|second| {
              decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&second).unwrap()
            }),
          );
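          // Evidence is fundamentally faulty behavior, not just a temporal error, so the sender
          // is fatally slashed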
self.fatal_slash(msgs.0.msg.sender, &format!("invalid tendermint messages: {msgs:?}"));
}
TributaryTransaction::Application(tx) => {
self.handle_application_tx(tx).await;
}
}
}
let genesis = self.spec.genesis();
let current_fatal_slashes = FatalSlashes::get_as_keys(self.txn, genesis);
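    // Calculate the shares still present, spinning if not enough remain to meet the threshold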
let still_present_shares = {
let mut present_shares = self.spec.n(&[]);
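      // Start with the full set of shares and subtract everyone fatally slashed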
      for removed in &current_fatal_slashes {
let original_i_for_removed =
self.spec.i(&[], *removed).expect("removed party was never present");
let removed_shares =
u16::from(original_i_for_removed.end) - u16::from(original_i_for_removed.start);
present_shares -= removed_shares;
}
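      // Spin (rather than continue) if the remaining shares don't satisfy the threshold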
if present_shares < self.spec.t() {
loop {
log::error!(
"fatally slashed so many participants for {:?} we no longer meet the threshold",
self.spec.set()
);
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
present_shares
};
for topic in ReattemptDb::take(self.txn, genesis, self.block_number) {
let attempt = AttemptDb::start_next_attempt(self.txn, genesis, topic);
log::info!("re-attempting {topic:?} with attempt {attempt}");
{
let prior_attempt = attempt - 1;
let (removed, expected_participants) = match topic {
Topic::Dkg => {
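            // Every validator who wasn't removed as of this attempt was expected to participate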
let removed =
crate::tributary::removed_as_of_dkg_attempt(self.txn, genesis, prior_attempt)
.expect("prior attempt didn't have its removed saved to disk");
let removed_set = removed.iter().copied().collect::<HashSet<_>>();
(
removed,
self
.spec
.validators()
.into_iter()
                .map(|(validator, _)| validator)
                .filter(|validator| !removed_set.contains(validator))
                .collect(),
)
}
Topic::DkgConfirmation => {
panic!("TODO: re-attempting DkgConfirmation when we should be re-attempting the Dkg")
}
Topic::SubstrateSign(_) | Topic::Sign(_) => {
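            // Signing topics are only valid once keys have been set, so this data must exist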
let removed =
crate::tributary::removed_as_of_set_keys(self.txn, self.spec.set(), genesis)
.expect("SubstrateSign/Sign yet have yet to set keys");
let expected_participants = vec![];
(removed, expected_participants)
}
};
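        // Determine how far the prior attempt progressed, and so whose data was expected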
let (expected_topic, expected_label) = match topic {
Topic::Dkg => {
let n = self.spec.n(&removed);
let share_spec =
DataSpecification { topic: Topic::Dkg, label: Label::Share, attempt: prior_attempt };
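            // If all DKG shares were received, the prior attempt stalled at DkgConfirmation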
if DataReceived::get(self.txn, genesis, &share_spec).unwrap_or(0) == n {
(Topic::DkgConfirmation, Label::Share)
} else {
let preprocess_spec = DataSpecification {
topic: Topic::Dkg,
label: Label::Preprocess,
attempt: prior_attempt,
};
if DataReceived::get(self.txn, genesis, &preprocess_spec).unwrap_or(0) == n {
(Topic::Dkg, Label::Share)
} else {
(Topic::Dkg, Label::Preprocess)
}
}
}
Topic::DkgConfirmation => unreachable!(),
Topic::SubstrateSign(_) | Topic::Sign(_) => (topic, Label::Share),
};
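        // Collect everyone who failed to provide the data expected of them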
let mut did_not_participate = vec![];
for expected_participant in expected_participants {
if DataDb::get(
self.txn,
genesis,
&DataSpecification {
topic: expected_topic,
label: expected_label,
attempt: prior_attempt,
},
&expected_participant.to_bytes(),
)
.is_none()
{
did_not_participate.push(expected_participant);
}
}
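        // Record DKG non-participants so future attempts can exclude them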
if topic == Topic::Dkg {
let mut existing = OfflineDuringDkg::get(self.txn, genesis).unwrap_or(vec![]);
for did_not_participate in did_not_participate {
existing.push(did_not_participate.to_bytes());
}
OfflineDuringDkg::set(self.txn, genesis, &existing);
}
}
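      // Now issue the re-attempt itself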
match topic {
Topic::Dkg => {
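          // Start with everyone fatally slashed, then remove offline validators while the
          // threshold still holds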
let mut removed = current_fatal_slashes.clone();
let t = self.spec.t();
{
let mut present_shares = still_present_shares;
let mut offline = OfflineDuringDkg::get(self.txn, genesis)
.unwrap_or(vec![])
.iter()
.map(|key| <Ristretto as Ciphersuite>::G::from_bytes(key).unwrap())
.collect::<Vec<_>>();
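            // Pop from the end to prioritize removing the most recently offline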
while let Some(offline) = offline.pop() {
if removed.contains(&offline) {
continue;
}
let original_i_for_offline =
self.spec.i(&[], offline).expect("offline was never present?");
let offline_shares =
u16::from(original_i_for_offline.end) - u16::from(original_i_for_offline.start);
if (present_shares - offline_shares) >= t {
present_shares -= offline_shares;
removed.push(offline);
}
if present_shares == t {
break;
}
}
}
RemovedAsOfDkgAttempt::set(
self.txn,
genesis,
attempt,
&removed.iter().map(<Ristretto as Ciphersuite>::G::to_bytes).collect(),
);
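          // Don't start a new attempt if the DKG was already locally completed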
if DkgLocallyCompleted::get(self.txn, genesis).is_none() {
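            // If we're among the removed, we aren't a participant in this attempt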
let Some(our_i) = self.spec.i(&removed, Ristretto::generator() * self.our_key.deref())
else {
continue;
};
let id =
processor_messages::key_gen::KeyGenId { session: self.spec.set().session, attempt };
let params =
frost::ThresholdParams::new(t, self.spec.n(&removed), our_i.start).unwrap();
let shares = u16::from(our_i.end) - u16::from(our_i.start);
self
.processors
.send(
self.spec.set().network,
processor_messages::key_gen::CoordinatorMessage::GenerateKey { id, params, shares },
)
.await;
}
}
Topic::DkgConfirmation => unreachable!(),
Topic::SubstrateSign(inner_id) => {
          let id = SubstrateSignId {
session: self.spec.set().session,
id: inner_id,
attempt,
};
match inner_id {
SubstrateSignableId::CosigningSubstrateBlock(block) => {
let block_number = SeraiBlockNumber::get(self.txn, block)
.expect("couldn't get the block number for prior attempted cosign");
let latest_cosign =
crate::cosign_evaluator::LatestCosign::get(self.txn, self.spec.set().network)
.map_or(0, |cosign| cosign.block_number);
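              // Only re-attempt if a later block hasn't already been cosigned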
if latest_cosign < block_number {
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
id,
block_number,
},
)
.await;
}
}
SubstrateSignableId::Batch(batch) => {
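              // Only re-attempt if the Batch hasn't already been published to Serai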
if BatchInstructionsHashDb::get(self.txn, self.spec.set().network, batch).is_none() {
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::BatchReattempt { id },
)
.await;
}
}
SubstrateSignableId::SlashReport => {
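              // Only re-attempt while this Tributary hasn't been retired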
if crate::RetiredTributaryDb::get(self.txn, self.spec.set()).is_none() {
let report = SlashReport::get(self.txn, self.spec.set())
.expect("re-attempting signing a SlashReport we don't have?");
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::SignSlashReport {
id,
report,
},
)
.await;
}
}
}
}
Topic::Sign(id) => {
self
.processors
.send(
self.spec.set().network,
processor_messages::sign::CoordinatorMessage::Reattempt {
id: processor_messages::sign::SignId {
session: self.spec.set().session,
id,
attempt,
},
},
)
.await;
}
}
}
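    // Once the slash-report cut-off block is reached, aggregate the individual reports into the
    // report to sign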
if Some(u64::from(self.block_number)) == SlashReportCutOff::get(self.txn, genesis) {
let mut all_reports = vec![];
for (i, (validator, _)) in self.spec.validators().into_iter().enumerate() {
let Some(mut report) = SlashReports::get(self.txn, genesis, validator.to_bytes()) else {
continue;
};
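        // Reports don't include the reporting validator, so insert 0 points for themselves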
report.insert(i, 0);
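        // Weight the report by the validator's share count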
let signer_i = self.spec.i(&[], validator).unwrap();
let signer_len = u16::from(signer_i.end) - u16::from(signer_i.start);
for _ in 0 .. signer_len {
all_reports.push(report.clone());
}
}
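      // Take the per-validator median across the weighted reports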
let mut medians = vec![];
for p in 0 .. self.spec.validators().len() {
let mut median_calc = vec![];
for report in &all_reports {
median_calc.push(report[p]);
}
median_calc.sort_unstable();
medians.push(median_calc[median_calc.len() / 2]);
}
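      // Find the median score of the worst-performing validator still within the threshold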
let mut sorted_medians = vec![];
for (i, (_, shares)) in self.spec.validators().into_iter().enumerate() {
for _ in 0 .. shares {
sorted_medians.push(medians[i]);
}
}
sorted_medians.sort_unstable();
let worst_points_by_party_within_threshold = sorted_medians[usize::from(self.spec.t()) - 1];
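      // Subtract that baseline from everyone, so only outliers retain points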
for median in &mut medians {
*median = median.saturating_sub(worst_points_by_party_within_threshold);
}
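      // Assign the maximum score to anyone fatally slashed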
for (i, (validator, _)) in self.spec.validators().into_iter().enumerate() {
if FatallySlashed::get(self.txn, genesis, validator.to_bytes()).is_some() {
medians[i] = u32::MAX;
}
}
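      // Only include validators with a non-zero score in the final report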
let mut report = vec![];
for (i, (validator, _)) in self.spec.validators().into_iter().enumerate() {
if medians[i] != 0 {
report.push((validator.to_bytes(), medians[i]));
}
}
SlashReport::set(self.txn, self.spec.set(), &report);
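      // Hand the aggregated report to the processor to be signed (as attempt 0)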
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::SignSlashReport {
id: SubstrateSignId {
session: self.spec.set().session,
id: SubstrateSignableId::SlashReport,
attempt: 0,
},
report,
},
)
.await;
}
}
}
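/// Handle all new blocks for a Tributary, from the last handled block to the current tip.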
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_new_blocks<
D: Db,
Pro: Processors,
PST: PublishSeraiTransaction,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &RID,
processors: &Pro,
publish_serai_tx: &PST,
publish_tributary_tx: &PTT,
spec: &TributarySpec,
tributary: &TributaryReader<D, Transaction>,
) {
let genesis = tributary.genesis();
let mut last_block = LastHandledBlock::get(db, genesis).unwrap_or(genesis);
let mut block_number = TributaryBlockNumber::get(db, last_block).unwrap_or(0);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
block_number += 1;
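    // Make sure we locally have every provided transaction before handling the block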
for tx in &block.transactions {
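      // Provided TXs sort to the front of a block, so we can break at the first non-provided TX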
let TransactionKind::Provided(order) = tx.kind() else {
break;
};
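      // Return, not break, so this block is retried once we have the provided TX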
if !tributary.locally_provided_txs_in_block(&block.hash(), order) {
return;
}
}
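    // Take the txn on a clone of the DB so `db` can still be used to read pre-txn state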
let mut db_clone = db.clone();
let mut txn = db_clone.txn();
TributaryBlockNumber::set(&mut txn, next, &block_number);
(TributaryBlockHandler {
db,
txn: &mut txn,
spec,
our_key: key,
recognized_id,
processors,
publish_serai_tx,
publish_tributary_tx,
block,
block_number,
_p2p: PhantomData::<P>,
})
.handle()
.await;
last_block = next;
LastHandledBlock::set(&mut txn, genesis, &next);
txn.commit();
}
}
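/// Scan every active Tributary, handling its blocks until the Tributary is retired.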
pub(crate) async fn scan_tributaries_task<
D: Db,
Pro: Processors,
P: P2p,
RID: 'static + Send + Sync + Clone + RIDTrait,
>(
raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: RID,
processors: Pro,
serai: Arc<Serai>,
mut tributary_event: broadcast::Receiver<crate::TributaryEvent<D, P>>,
) {
log::info!("scanning tributaries");
loop {
match tributary_event.recv().await {
Ok(crate::TributaryEvent::NewTributary(crate::ActiveTributary { spec, tributary })) => {
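        // Spawn a dedicated task to scan this Tributary until it's retired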
tokio::spawn({
let raw_db = raw_db.clone();
let key = key.clone();
let recognized_id = recognized_id.clone();
let processors = processors.clone();
let serai = serai.clone();
async move {
let spec = &spec;
let reader = tributary.reader();
let mut tributary_db = raw_db.clone();
loop {
if crate::db::RetiredTributaryDb::get(&raw_db, spec.set()).is_some() {
break;
}
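              // Subscribe to the next-block notification before scanning so a block added
              // mid-scan still wakes us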
let next_block_notification = tributary.next_block_notification().await;
handle_new_blocks::<_, _, _, _, _, P>(
&mut tributary_db,
&key,
&recognized_id,
&processors,
&*serai,
&|tx: Transaction| {
let tributary = tributary.clone();
async move {
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
Err(TransactionError::InvalidNonce) => {
log::warn!(
"publishing TX {tx:?} returned InvalidNonce. was it already added?"
)
}
Err(e) => panic!("created an invalid transaction: {e:?}"),
}
}
},
spec,
&reader,
)
.await;
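              // Wake on the next block, or after the block time passes, whichever comes first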
let _ = tokio::time::timeout(
Duration::from_secs(tributary::Tributary::<D, Transaction, P>::block_time().into()),
next_block_notification,
)
.await;
}
}
});
}
Ok(crate::TributaryEvent::TributaryRetired(_)) => {}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("scan_tributaries lagged to handle tributary_event")
}
Err(broadcast::error::RecvError::Closed) => panic!("tributary_event sender closed"),
}
}
}