use core::{ops::Deref, time::Duration};
use std::{
  sync::Arc,
  collections::{HashSet, HashMap},
};

use zeroize::Zeroizing;

use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};

use serai_client::{
  coins::CoinsEvent,
  in_instructions::InInstructionsEvent,
  primitives::{BlockHash, ExternalNetworkId},
  validator_sets::{
    primitives::{ExternalValidatorSet, ValidatorSet},
    ValidatorSetsEvent,
  },
  Block, Serai, SeraiError, TemporalSerai,
};

use serai_db::DbTxn;

use processor_messages::SubstrateContext;

use tokio::{sync::mpsc, time::sleep};

use crate::{
  Db,
  processors::Processors,
  tributary::{TributarySpec, SeraiDkgCompleted},
};

mod db;
pub use db::*;

mod cosign;
pub use cosign::*;

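// Check if our key is present in `set`, returning None if the set's participants aren't
// registered as of this block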
async fn in_set(
  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
  serai: &TemporalSerai<'_>,
  set: ValidatorSet,
) -> Result<Option<bool>, SeraiError> {
  let Some(participants) = serai.validator_sets().participants(set.network).await? else {
    return Ok(None);
  };
  let key = (Ristretto::generator() * key.deref()).to_bytes();
  Ok(Some(participants.iter().any(|(participant, _)| participant.0 == key)))
}

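// Handle a NewSet event: if we're present in the set, build its TributarySpec, persist it, and
// hand it off for a Tributary to be spawned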
async fn handle_new_set<D: Db>(
  txn: &mut D::Transaction<'_>,
  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
  new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
  serai: &Serai,
  block: &Block,
  set: ExternalValidatorSet,
) -> Result<(), SeraiError> {
  if in_set(key, &serai.as_of(block.hash()), set.into())
    .await?
    .expect("NewSet for set which doesn't exist")
  {
    log::info!("present in set {:?}", set);

    let set_data = {
      let serai = serai.as_of(block.hash());
      let serai = serai.validator_sets();
      let set_participants =
        serai.participants(set.network.into()).await?.expect("NewSet for set which doesn't exist");

      set_participants.into_iter().map(|(k, w)| (k, u16::try_from(w).unwrap())).collect::<Vec<_>>()
    };

    let time = if let Ok(time) = block.time() {
      time
    } else {
      assert_eq!(block.number(), 0);
      // Use the next block's time
      loop {
        let Ok(Some(res)) = serai.finalized_block_by_number(1).await else {
          sleep(Duration::from_secs(5)).await;
          continue;
        };
        break res.time().unwrap();
      }
    };
    // The block time is in milliseconds, yet the Tributary expects seconds
    let time = time / 1000;
    // Since this block is in the past, and Tendermint doesn't play nice with starting chains
    // after their start time (though it does eventually work), delay the start time by 120
    // seconds. This is meant to tolerate ~20 blocks of delayed finalization for this first block
    const SUBSTRATE_TO_TRIBUTARY_TIME_DELAY: u64 = 120;
    let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY;

    let spec = TributarySpec::new(block.hash(), time, set, set_data);

    log::info!("creating new tributary for {:?}", spec.set());

    // Save it to the database now, not on the channel receiver's side, so this is safe against
    // reboots
    // If this txn finishes, and we reboot, then this'll be reloaded from active Tributaries
    // If this txn doesn't finish, this will be re-fired
    // If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the
    // prior fired event may have not been received yet
    crate::ActiveTributaryDb::add_participating_in_tributary(txn, &spec);

    new_tributary_spec.send(spec).unwrap();
  } else {
    log::info!("not present in new set {:?}", set);
  }

  Ok(())
}

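// Collect the Batches and Burns in this block, forwarding them to the relevant processors as a
// single SubstrateBlock message per network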
async fn handle_batch_and_burns<Pro: Processors>(
  txn: &mut impl DbTxn,
  processors: &Pro,
  serai: &Serai,
  block: &Block,
) -> Result<(), SeraiError> {
  // Track which networks had events with a Vec in order to preserve the insertion order
  // While that shouldn't be needed, ensuring order never hurts, and may enable design choices
  // with regards to Processor <-> Coordinator message passing
  let mut networks_with_event = vec![];
  let mut network_had_event = |burns: &mut HashMap<_, _>, batches: &mut HashMap<_, _>, network| {
    // Don't insert this network multiple times
    // A Vec is still used in order to maintain the insertion order
    if !networks_with_event.contains(&network) {
      networks_with_event.push(network);
      burns.insert(network, vec![]);
      batches.insert(network, vec![]);
    }
  };

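  // Per network: the external block the Batch attested to, the IDs of its Batches, and its Burns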
  let mut batch_block = HashMap::new();
  let mut batches = HashMap::<ExternalNetworkId, Vec<u32>>::new();
  let mut burns = HashMap::new();

  let serai = serai.as_of(block.hash());
  for batch in serai.in_instructions().batch_events().await? {
    if let InInstructionsEvent::Batch { network, id, block: network_block, instructions_hash } =
      batch
    {
      network_had_event(&mut burns, &mut batches, network);

      BatchInstructionsHashDb::set(txn, network, id, &instructions_hash);

      // Make sure this is the only Batch event for this network in this Block
      assert!(batch_block.insert(network, network_block).is_none());

      // Add the batch included by this block
      batches.get_mut(&network).unwrap().push(id);
    } else {
      panic!("Batch event wasn't Batch: {batch:?}");
    }
  }

  for burn in serai.coins().burn_with_instruction_events().await? {
    if let CoinsEvent::BurnWithInstruction { from: _, instruction } = burn {
      let network = instruction.balance.coin.network();
      network_had_event(&mut burns, &mut batches, network);

      // network_had_event should register an entry in burns
      burns.get_mut(&network).unwrap().push(instruction);
    } else {
      panic!("Burn event wasn't Burn: {burn:?}");
    }
  }

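  // Sanity check the list of networks contains no duplicates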
  assert_eq!(HashSet::<&_>::from_iter(networks_with_event.iter()).len(), networks_with_event.len());

  for network in networks_with_event {
    let network_latest_finalized_block = if let Some(block) = batch_block.remove(&network) {
      block
    } else {
      // If it's had a batch or a burn, it must have had a block acknowledged
      serai
        .in_instructions()
        .latest_block_for_network(network)
        .await?
        .expect("network had a batch/burn yet never set a latest block")
    };

    processors
      .send(
        network,
        processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
          context: SubstrateContext {
            serai_time: block.time().unwrap() / 1000,
            network_latest_finalized_block,
          },
          block: block.number(),
          burns: burns.remove(&network).unwrap(),
          batches: batches.remove(&network).unwrap(),
        },
      )
      .await;
  }

  Ok(())
}

// Handle a specific Substrate block, returning an error when it fails to get data
// (instead of blocking/retrying internally)
#[allow(clippy::too_many_arguments)]
async fn handle_block<D: Db, Pro: Processors>(
  db: &mut D,
  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
  new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
  perform_slash_report: &mpsc::UnboundedSender<ExternalValidatorSet>,
  tributary_retired: &mpsc::UnboundedSender<ExternalValidatorSet>,
  processors: &Pro,
  serai: &Serai,
  block: Block,
) -> Result<(), SeraiError> {
  let hash = block.hash();

  // Define an incremental ID to index each event within this block
  let mut event_id = 0;

  // If a new validator set was activated, create tributary/inform processor to do a DKG
  for new_set in serai.as_of(hash).validator_sets().new_set_events().await? {
    // Individually mark each event as handled so that on reboot, we minimize duplicates
    // Additionally, even if the Serai connection fails 1 in 100 times, a block with 1000 events
    // will still be handled incrementally
    // (though the Serai connection should be stable, making this unnecessary)
    let ValidatorSetsEvent::NewSet { set } = new_set else {
      panic!("NewSet event wasn't NewSet: {new_set:?}");
    };

    // We only coordinate/process external networks
    let Ok(set) = ExternalValidatorSet::try_from(set) else { continue };
    if HandledEvent::is_unhandled(db, hash, event_id) {
      log::info!("found fresh new set event {:?}", new_set);
      let mut txn = db.txn();
      handle_new_set::<D>(&mut txn, key, new_tributary_spec, serai, &block, set).await?;
      HandledEvent::handle_event(&mut txn, hash, event_id);
      txn.commit();
    }
    event_id += 1;
  }

  // If a key pair was confirmed, inform the processor
  for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
    if HandledEvent::is_unhandled(db, hash, event_id) {
      log::info!("found fresh key gen event {:?}", key_gen);
      let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen else {
        panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
      };
      let substrate_key = key_pair.0 .0;
      processors
        .send(
          set.network,
          processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
            context: SubstrateContext {
              serai_time: block.time().unwrap() / 1000,
              network_latest_finalized_block: serai
                .as_of(block.hash())
                .in_instructions()
                .latest_block_for_network(set.network)
                .await?
                // The processor treats this as a magic value which will cause it to find a network
                // block which has a time greater than or equal to the Serai time
                .unwrap_or(BlockHash([0; 32])),
            },
            session: set.session,
            key_pair,
          },
        )
        .await;

      // TODO: If we were in the set, yet were removed, drop the tributary

      let mut txn = db.txn();
      SeraiDkgCompleted::set(&mut txn, set, &substrate_key);
      HandledEvent::handle_event(&mut txn, hash, event_id);
      txn.commit();
    }
    event_id += 1;
  }

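  // If a handover was accepted, notify the task responsible for the slash report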
  for accepted_handover in serai.as_of(hash).validator_sets().accepted_handover_events().await? {
    let ValidatorSetsEvent::AcceptedHandover { set } = accepted_handover else {
      panic!("AcceptedHandover event wasn't AcceptedHandover: {accepted_handover:?}");
    };

    let Ok(set) = ExternalValidatorSet::try_from(set) else { continue };
    if HandledEvent::is_unhandled(db, hash, event_id) {
      log::info!("found fresh accepted handover event {:?}", accepted_handover);
      // TODO: This isn't atomic with the event handling
      // Send a oneshot receiver so we can await the response?
      perform_slash_report.send(set).unwrap();
      let mut txn = db.txn();
      HandledEvent::handle_event(&mut txn, hash, event_id);
      txn.commit();
    }
    event_id += 1;
  }

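  // If a set was retired, retire its Tributary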
  for retired_set in serai.as_of(hash).validator_sets().set_retired_events().await? {
    let ValidatorSetsEvent::SetRetired { set } = retired_set else {
      panic!("SetRetired event wasn't SetRetired: {retired_set:?}");
    };

    let Ok(set) = ExternalValidatorSet::try_from(set) else { continue };
    if HandledEvent::is_unhandled(db, hash, event_id) {
      log::info!("found fresh set retired event {:?}", retired_set);
      let mut txn = db.txn();
      crate::ActiveTributaryDb::retire_tributary(&mut txn, set);
      tributary_retired.send(set).unwrap();
      HandledEvent::handle_event(&mut txn, hash, event_id);
      txn.commit();
    }
    event_id += 1;
  }

  // Finally, tell the processor of acknowledged blocks/burns
  // This uses a single event ID as, unlike the prior events which each executed their own code,
  // all of the following events share their data collection
  if HandledEvent::is_unhandled(db, hash, event_id) {
    let mut txn = db.txn();
    handle_batch_and_burns(&mut txn, processors, serai, &block).await?;
    HandledEvent::handle_event(&mut txn, hash, event_id);
    txn.commit();
  }

  Ok(())
}

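// Advance the cosigning protocol as necessary, then handle all blocks from *next_block through
// the latest block which is both finalized and cosigned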
#[allow(clippy::too_many_arguments)]
async fn handle_new_blocks<D: Db, Pro: Processors>(
  db: &mut D,
  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
  new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
  perform_slash_report: &mpsc::UnboundedSender<ExternalValidatorSet>,
  tributary_retired: &mpsc::UnboundedSender<ExternalValidatorSet>,
  processors: &Pro,
  serai: &Serai,
  next_block: &mut u64,
) -> Result<(), SeraiError> {
  // Check if there's been a new Substrate block
  let latest_number = serai.latest_finalized_block().await?.number();

  // Advance the cosigning protocol
  advance_cosign_protocol(db, key, serai, latest_number).await?;

  // Reduce to the latest cosigned block
  let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db));

  if latest_number < *next_block {
    return Ok(());
  }

  for b in *next_block ..= latest_number {
    let block = serai
      .finalized_block_by_number(b)
      .await?
      .expect("couldn't get block before the latest finalized block");

    log::info!("handling substrate block {b}");
    handle_block(
      db,
      key,
      new_tributary_spec,
      perform_slash_report,
      tributary_retired,
      processors,
      serai,
      block,
    )
    .await?;
    *next_block += 1;

    let mut txn = db.txn();
    NextBlock::set(&mut txn, next_block);
    txn.commit();

    log::info!("handled substrate block {b}");
  }

  Ok(())
}

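/// Scan the Substrate chain, sequentially handling each finalized (and cosigned) block.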
pub async fn scan_task<D: Db, Pro: Processors>(
  mut db: D,
  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  processors: Pro,
  serai: Arc<Serai>,
  new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
  perform_slash_report: mpsc::UnboundedSender<ExternalValidatorSet>,
  tributary_retired: mpsc::UnboundedSender<ExternalValidatorSet>,
) {
  log::info!("scanning substrate");
  let mut next_substrate_block = NextBlock::get(&db).unwrap_or_default();

  /*
  let new_substrate_block_notifier = {
    let serai = &serai;
    move || async move {
      loop {
        match serai.newly_finalized_block().await {
          Ok(sub) => return sub,
          Err(e) => {
            log::error!("couldn't communicate with serai node: {e}");
            sleep(Duration::from_secs(5)).await;
          }
        }
      }
    }
  };
  */
  // TODO: Restore the above subscription-based system
  // That would require moving serai-client from HTTP to websockets
  let new_substrate_block_notifier = {
    let serai = &serai;
    move |next_substrate_block| async move {
      loop {
        match serai.latest_finalized_block().await {
          Ok(latest) => {
            if latest.header.number >= next_substrate_block {
              return latest;
            }
            sleep(Duration::from_secs(3)).await;
          }
          Err(e) => {
            log::error!("couldn't communicate with serai node: {e}");
            sleep(Duration::from_secs(5)).await;
          }
        }
      }
    }
  };

  loop {
    // await the next block, timing out if one doesn't appear within a minute
    {
      let Ok(_) = tokio::time::timeout(
        Duration::from_secs(60),
        new_substrate_block_notifier(next_substrate_block),
      )
      .await
      else {
        // Timed out, which may be because Serai isn't finalizing or may be some issue with the
        // notifier
        if serai.latest_finalized_block().await.map(|block| block.number()).ok() ==
          Some(next_substrate_block.saturating_sub(1))
        {
          log::info!("serai hasn't finalized a block in the last 60s...");
        }
        continue;
      };

      /*
      // next_block is a Option<Result>
      if next_block.and_then(Result::ok).is_none() {
        substrate_block_notifier = new_substrate_block_notifier(next_substrate_block);
        continue;
      }
      */
    }

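    // Handle any new blocks, sleeping and retrying if we fail to communicate with the node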
    match handle_new_blocks(
      &mut db,
      &key,
      &new_tributary_spec,
      &perform_slash_report,
      &tributary_retired,
      &processors,
      &serai,
      &mut next_substrate_block,
    )
    .await
    {
      Ok(()) => {}
      Err(e) => {
        log::error!("couldn't communicate with serai node: {e}");
        sleep(Duration::from_secs(5)).await;
      }
    }
  }
}

/// Gets the expected ID for the next Batch.
///
/// On error, this logs the error and sleeps briefly, letting the caller immediately retry.
pub(crate) async fn expected_next_batch(
  serai: &Serai,
  network: ExternalNetworkId,
) -> Result<u32, SeraiError> {
  async fn expected_next_batch_inner(
    serai: &Serai,
    network: ExternalNetworkId,
  ) -> Result<u32, SeraiError> {
    let serai = serai.as_of_latest_finalized_block().await?;
    let last = serai.in_instructions().last_batch_for_network(network).await?;
    Ok(if let Some(last) = last { last + 1 } else { 0 })
  }
  match expected_next_batch_inner(serai, network).await {
    Ok(next) => Ok(next),
    Err(e) => {
      log::error!("couldn't get the expected next batch from substrate: {e:?}");
      sleep(Duration::from_millis(100)).await;
      Err(e)
    }
  }
}

/// Verifies `Batch`s which have already been indexed from Substrate.
///
/// Spins if a distinct `Batch` is detected on-chain.
///
/// This has a slight malleability in that it doesn't verify the publisher of a `Batch` is as
/// expected. This is deemed fine.
pub(crate) async fn verify_published_batches<D: Db>(
  txn: &mut D::Transaction<'_>,
  network: ExternalNetworkId,
  optimistic_up_to: u32,
) -> Option<u32> {
  // TODO: Localize from MainDb to SubstrateDb
  let last = crate::LastVerifiedBatchDb::get(txn, network);
  for id in last.map_or(0, |last| last + 1) ..= optimistic_up_to {
    let Some(on_chain) = BatchInstructionsHashDb::get(txn, network, id) else {
      break;
    };
    let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap();
    if on_chain != off_chain {
      // Halt operations on this network and spin, as this is a critical fault
      loop {
        log::error!(
          "{}! network: {:?} id: {} off-chain: {} on-chain: {}",
          "on-chain batch doesn't match off-chain",
          network,
          id,
          hex::encode(off_chain),
          hex::encode(on_chain),
        );
        sleep(Duration::from_secs(60)).await;
      }
    }
    crate::LastVerifiedBatchDb::set(txn, network, &id);
  }

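  // Return the ID of the last verified Batch, which may be less than optimistic_up_to if a Batch
  // hasn't yet been published on-chain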
  crate::LastVerifiedBatchDb::get(txn, network)
}