use std::{
io::{self, Read},
collections::{VecDeque, HashMap},
};
use ciphersuite::{group::GroupEncoding, Ciphersuite};
use serai_client::primitives::{ExternalNetworkId, ExternalCoin, Amount, ExternalBalance};
use crate::{
DbTxn, Db, Payment, Plan,
networks::{OutputType, Output, Network, UtxoNetwork},
multisigs::scheduler::Scheduler as SchedulerTrait,
};
/// Scheduler state for a UTXO-based network, tracked for a single group key.
///
/// The entire structure is serialized into the database under a key derived from `key` (see
/// `scheduler_key`), and is rewritten by the methods which mutate it.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Scheduler<N: UtxoNetwork> {
// Group key this scheduler's state belongs to; also used to derive its database key
key: <N::Curve as Ciphersuite>::G,
// The single coin every UTXO and payment handled by this scheduler must be denominated in
coin: ExternalCoin,
// Sets of payments waiting on a branch output which has been planned by `execute`, keyed by the
// amount that output is expected to have. `created_output` pops entries from here
queued_plans: HashMap<u64, VecDeque<Vec<Payment<N>>>>,
// Sets of payments waiting on a branch output, keyed by the amount `created_output` was told the
// output actually has. `add_outputs` executes an entry when a branch output of that amount arrives
plans: HashMap<u64, VecDeque<Vec<Payment<N>>>>,
// Outputs currently held, available to fund future plans
utxos: Vec<N::Output>,
// Payments which have been accepted yet not included in any plan, in the order they'll be handled
payments: VecDeque<Payment<N>>,
}
/// Derives the database key under which the scheduler for the group key `key` is stored.
fn scheduler_key<D: Db, G: GroupEncoding>(key: &G) -> Vec<u8> {
  let encoded_key = key.to_bytes();
  D::key(b"SCHEDULER", b"scheduler", encoded_key)
}
impl<N: UtxoNetwork<Scheduler = Self>> Scheduler<N> {
/// Returns `true` if this scheduler holds no queued plans, no plans, no UTXOs, and no pending
/// payments.
pub fn empty(&self) -> bool {
  let no_branches = self.queued_plans.is_empty() && self.plans.is_empty();
  let no_funds_or_work = self.utxos.is_empty() && self.payments.is_empty();
  no_branches && no_funds_or_work
}
fn read<R: Read>(
key: <N::Curve as Ciphersuite>::G,
coin: ExternalCoin,
reader: &mut R,
) -> io::Result<Self> {
let mut read_plans = || -> io::Result<_> {
let mut all_plans = HashMap::new();
let mut all_plans_len = [0; 4];
reader.read_exact(&mut all_plans_len)?;
for _ in 0 .. u32::from_le_bytes(all_plans_len) {
let mut amount = [0; 8];
reader.read_exact(&mut amount)?;
let amount = u64::from_le_bytes(amount);
let mut plans = VecDeque::new();
let mut plans_len = [0; 4];
reader.read_exact(&mut plans_len)?;
for _ in 0 .. u32::from_le_bytes(plans_len) {
let mut payments = vec![];
let mut payments_len = [0; 4];
reader.read_exact(&mut payments_len)?;
for _ in 0 .. u32::from_le_bytes(payments_len) {
payments.push(Payment::read(reader)?);
}
plans.push_back(payments);
}
all_plans.insert(amount, plans);
}
Ok(all_plans)
};
let queued_plans = read_plans()?;
let plans = read_plans()?;
let mut utxos = vec![];
let mut utxos_len = [0; 4];
reader.read_exact(&mut utxos_len)?;
for _ in 0 .. u32::from_le_bytes(utxos_len) {
utxos.push(N::Output::read(reader)?);
}
let mut payments = VecDeque::new();
let mut payments_len = [0; 4];
reader.read_exact(&mut payments_len)?;
for _ in 0 .. u32::from_le_bytes(payments_len) {
payments.push_back(Payment::read(reader)?);
}
Ok(Scheduler { key, coin, queued_plans, plans, utxos, payments })
}
/// Serializes everything except `key` and `coin` into the format `read` expects.
///
/// The layout is: the `queued_plans` map, the `plans` map, the UTXOs, then the pending payments.
/// Every collection is prefixed with its length as a little-endian `u32`.
///
/// Panics if any collection's length doesn't fit in a `u32`, or if writing a payment/output
/// fails. In debug builds, also asserts the result deserializes back into an equal scheduler.
fn serialize(&self) -> Vec<u8> {
  // Writes a collection's length as a little-endian u32
  fn write_len(buf: &mut Vec<u8>, len: usize) {
    buf.extend(u32::try_from(len).unwrap().to_le_bytes());
  }

  let mut buf: Vec<u8> = Vec::with_capacity(4096);

  // Writes one amount -> payment sets map (used for both queued_plans and plans)
  let mut write_plans = |by_amount: &HashMap<u64, VecDeque<Vec<Payment<N>>>>| {
    write_len(&mut buf, by_amount.len());
    for (amount, sets) in by_amount {
      buf.extend(amount.to_le_bytes());
      write_len(&mut buf, sets.len());
      for set in sets {
        write_len(&mut buf, set.len());
        for payment in set {
          payment.write(&mut buf).unwrap();
        }
      }
    }
  };
  write_plans(&self.queued_plans);
  write_plans(&self.plans);

  write_len(&mut buf, self.utxos.len());
  for utxo in &self.utxos {
    utxo.write(&mut buf).unwrap();
  }

  write_len(&mut buf, self.payments.len());
  for payment in &self.payments {
    payment.write(&mut buf).unwrap();
  }

  // Sanity check the serialization round-trips
  debug_assert_eq!(&Self::read(self.key, self.coin, &mut buf.as_slice()).unwrap(), self);
  buf
}
pub fn new<D: Db>(
txn: &mut D::Transaction<'_>,
key: <N::Curve as Ciphersuite>::G,
network: ExternalNetworkId,
) -> Self {
assert!(N::branch_address(key).is_some());
assert!(N::change_address(key).is_some());
assert!(N::forward_address(key).is_some());
let coin = {
let coins = network.coins();
assert_eq!(coins.len(), 1);
coins[0]
};
let res = Scheduler {
key,
coin,
queued_plans: HashMap::new(),
plans: HashMap::new(),
utxos: vec![],
payments: VecDeque::new(),
};
txn.put(scheduler_key::<D, _>(&res.key), res.serialize());
res
}
/// Loads the scheduler previously saved for `key` from the database.
///
/// Returns an error if the stored bytes fail to deserialize.
///
/// Panics if `network` doesn't have exactly one coin, or if no scheduler was saved for `key`.
pub fn from_db<D: Db>(
  db: &D,
  key: <N::Curve as Ciphersuite>::G,
  network: ExternalNetworkId,
) -> io::Result<Self> {
  // This scheduler only handles networks with a single coin
  let supported = network.coins();
  assert_eq!(supported.len(), 1);
  let coin = supported[0];

  let Some(serialized) = db.get(scheduler_key::<D, _>(&key)) else {
    panic!("loading scheduler from DB without scheduler for {}", hex::encode(key.to_bytes()))
  };
  Self::read(key, coin, &mut serialized.as_slice())
}
pub fn can_use_branch(&self, balance: ExternalBalance) -> bool {
assert_eq!(balance.coin, self.coin);
self.plans.contains_key(&balance.amount.0)
}
/// Builds the `Plan` which spends `inputs` in order to make `payments`.
///
/// If the inputs' total doesn't exactly equal the payments' total, the plan is given a change
/// address (derived from `key_for_any_change`) and one output slot is reserved for it.
///
/// If there are more payments than available output slots, trailing payments are moved into
/// `queued_plans` (keyed by their summed amount) and replaced with a single payment of that sum
/// to this scheduler's branch address. This repeats until the payments fit.
///
/// Termination of the branching loop assumes `N::MAX_OUTPUTS >= 2`.
///
/// Panics if the branch address for `self.key`, or the change address for
/// `key_for_any_change`, is unavailable.
fn execute(
  &mut self,
  inputs: Vec<N::Output>,
  mut payments: Vec<Payment<N>>,
  key_for_any_change: <N::Curve as Ciphersuite>::G,
) -> Plan<N> {
  let payment_amounts = |payments: &Vec<Payment<N>>| {
    payments.iter().map(|payment| payment.balance.amount.0).sum::<u64>()
  };

  // If the inputs aren't entirely consumed by the payments, a change output is required, which
  // takes one of the available output slots
  let change = inputs.iter().map(|output| output.balance().amount.0).sum::<u64>() !=
    payment_amounts(&payments);
  let max = if change { N::MAX_OUTPUTS - 1 } else { N::MAX_OUTPUTS };

  // Queues a set of payments to be made once the branch output funding them is created
  let mut add_plan = |payments: Vec<Payment<N>>| {
    let amount = payment_amounts(&payments);
    self.queued_plans.entry(amount).or_default().push_back(payments);
    amount
  };

  let branch_address = N::branch_address(self.key).unwrap();

  // If we have more payments than we can handle in a single TX, create branches for them
  while payments.len() > max {
    // The resulting TX will have the remaining payments and a new branch payment
    // This must be calculated against `max`, not `N::MAX_OUTPUTS`. Otherwise, when a slot is
    // reserved for change, this loop settles on `N::MAX_OUTPUTS` payments, which still exceeds
    // `max`, and proceeds to remove one payment and add one branch forever
    let to_remove = (payments.len() + 1) - max;
    // Don't remove more than can be handled by the TX spending the branch
    let to_remove = to_remove.min(N::MAX_OUTPUTS);

    // Move the trailing payments into a queued plan
    let removed = payments.drain((payments.len() - to_remove) ..).collect::<Vec<_>>();
    assert_eq!(removed.len(), to_remove);
    let amount = add_plan(removed);

    // Create the payment funding the branch
    // It's inserted at the front so it's only moved into a further branch after every payment
    // behind it has been
    payments.insert(
      0,
      Payment {
        address: branch_address.clone(),
        data: None,
        balance: ExternalBalance { coin: self.coin, amount: Amount(amount) },
      },
    );
  }

  Plan {
    key: self.key,
    inputs,
    payments,
    // Only specify a change address if there's change
    change: Some(N::change_address(key_for_any_change).unwrap()).filter(|_| change),
    scheduler_addendum: (),
  }
}
/// Accepts newly received outputs, returning the plans they immediately enable.
///
/// A `Branch` output whose amount has a set of payments waiting in `plans` is spent right away
/// on the oldest such set (via `execute`). Every other output is stored in `utxos`.
fn add_outputs(
  &mut self,
  utxos: Vec<N::Output>,
  key_for_any_change: <N::Curve as Ciphersuite>::G,
) -> Vec<Plan<N>> {
  log::info!("adding {} outputs", utxos.len());

  let mut planned = vec![];
  for output in utxos {
    // Only branch outputs are matched against the waiting plans
    if output.kind() != OutputType::Branch {
      self.utxos.push(output);
      continue;
    }

    let amount = output.balance().amount.0;
    let Some(sets) = self.plans.get_mut(&amount) else {
      // Nothing is waiting on a branch of this amount, so hold it as a regular UTXO
      self.utxos.push(output);
      continue;
    };

    // Take the oldest set of payments waiting on an output of this amount
    let set = sets.pop_front().unwrap();
    // The output must be able to cover the payments it's funding
    assert!(amount >= set.iter().map(|payment| payment.balance.amount.0).sum::<u64>());

    // If that was the last set for this amount, remove the map entry
    if sets.is_empty() {
      self.plans.remove(&amount);
    }

    planned.push(self.execute(vec![output], set, key_for_any_change));
  }

  log::info!("{} planned TXs have had their required inputs confirmed", planned.len());
  planned
}
/// Accepts new UTXOs and payments, returning the plans which should now be signed.
///
/// - Payments addressed to this scheduler's own branch address are dropped.
/// - Branch outputs which have payments waiting on them are executed (see `add_outputs`).
/// - The held UTXOs are sorted by descending amount and split into chunks of `N::MAX_INPUTS`. The
///   first chunk funds as many pending payments as it can afford, in order. The other chunks are
///   aggregated by sending them to the change address, except for a trailing chunk of a single
///   output, which is kept as-is.
/// - If `force_spend` is set, any UTXOs left over are sent to the change address.
///
/// The updated state is written to the database via `txn` on every path.
///
/// Panics if any UTXO or payment isn't denominated in this scheduler's coin, or if a required
/// branch/change address is unavailable.
pub fn schedule<D: Db>(
  &mut self,
  txn: &mut D::Transaction<'_>,
  utxos: Vec<N::Output>,
  mut payments: Vec<Payment<N>>,
  key_for_any_change: <N::Curve as Ciphersuite>::G,
  force_spend: bool,
) -> Vec<Plan<N>> {
  for utxo in &utxos {
    assert_eq!(utxo.balance().coin, self.coin);
  }
  for payment in &payments {
    assert_eq!(payment.balance.coin, self.coin);
  }

  // Drop payments to our own branch address
  // NOTE(review): presumably so the only payments to it are the branches `execute` creates,
  // which have a matching `queued_plans` entry for `created_output` to consume - confirm
  {
    let branch_address = N::branch_address(self.key).unwrap();
    payments.retain(|payment| payment.address != branch_address);
  }

  let mut plans = self.add_outputs(utxos, key_for_any_change);

  log::info!("scheduling {} new payments", payments.len());

  // Add all new payments to the list of pending payments
  self.payments.extend(payments);
  let payments_at_start = self.payments.len();
  log::info!("{} payments are now scheduled", payments_at_start);

  // If we don't have UTXOs available, don't try to continue
  if self.utxos.is_empty() {
    log::info!("no utxos currently available");
    // The pending payments were just extended, and add_outputs may have moved plans, so the
    // state must still be saved. Otherwise, a reload from the database would lose these changes
    txn.put(scheduler_key::<D, _>(&self.key), self.serialize());
    return plans;
  }

  // Sort UTXOs so the highest valued ones are first
  self.utxos.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0).reverse());

  // Split the UTXOs into chunks which each fit into a single TX
  let utxos = self.utxos.drain(..).collect::<Vec<_>>();
  let mut utxo_chunks =
    utxos.chunks(N::MAX_INPUTS).map(<[<N as Network>::Output]>::to_vec).collect::<Vec<_>>();

  // Use the first chunk for any scheduled payments, since it has the most value
  // This won't panic as the UTXOs were checked to be non-empty
  let utxos = utxo_chunks.remove(0);

  // If the last chunk exists and only has one output, don't aggregate it
  // Set it to be restored to the UTXO set instead
  let mut to_restore = None;
  if let Some(mut chunk) = utxo_chunks.pop() {
    if chunk.len() == 1 {
      to_restore = Some(chunk.pop().unwrap());
    } else {
      utxo_chunks.push(chunk);
    }
  }

  for chunk in utxo_chunks.drain(..) {
    log::debug!("aggregating a chunk of {} inputs", chunk.len());
    plans.push(Plan {
      key: self.key,
      inputs: chunk,
      // Send the entire thing to the change address
      payments: vec![],
      change: Some(N::change_address(key_for_any_change).unwrap()),
      scheduler_addendum: (),
    })
  }

  // Take payments, in order, for as long as the first chunk can afford them
  // This stops at the first unaffordable payment, without checking if later ones would fit
  let mut balance = utxos.iter().map(|output| output.balance().amount.0).sum::<u64>();
  let mut executing = vec![];
  while let Some(next) = self.payments.front() {
    let Some(remaining) = balance.checked_sub(next.balance.amount.0) else { break };
    balance = remaining;
    executing.push(self.payments.pop_front().unwrap());
  }

  // Now that we have the list of payments we can handle right now, create the TX for them
  if !executing.is_empty() {
    plans.push(self.execute(utxos, executing, key_for_any_change));
  } else {
    // If we don't have any payments to execute, save these UTXOs for later
    self.utxos.extend(utxos);
  }

  // If we're instructed to force a spend, send everything still held to the change address
  if force_spend && (!self.utxos.is_empty()) {
    assert!(self.utxos.len() <= N::MAX_INPUTS);
    plans.push(Plan {
      key: self.key,
      inputs: self.utxos.drain(..).collect::<Vec<_>>(),
      payments: vec![],
      change: Some(N::change_address(key_for_any_change).unwrap()),
      scheduler_addendum: (),
    });
  }

  // If there's a UTXO to restore, restore it
  // This is done after the forced spend as restoring it earlier may cause self.utxos.len() to
  // exceed N::MAX_INPUTS, which the prior block asserts against
  if let Some(to_restore) = to_restore {
    self.utxos.push(to_restore);
  }

  txn.put(scheduler_key::<D, _>(&self.key), self.serialize());

  log::info!(
    "created {} plans containing {} payments to sign, with {} payments pending scheduling",
    plans.len(),
    payments_at_start - self.payments.len(),
    self.payments.len(),
  );
  plans
}
/// Removes and returns every pending payment, in order.
///
/// The state is only rewritten to the database (via `txn`) if there were payments to remove.
pub fn consume_payments<D: Db>(&mut self, txn: &mut D::Transaction<'_>) -> Vec<Payment<N>> {
  // Nothing to hand over, and accordingly nothing to save
  if self.payments.is_empty() {
    return Vec::new();
  }

  let pending = self.payments.drain(..).collect::<Vec<_>>();
  txn.put(scheduler_key::<D, _>(&self.key), self.serialize());
  pending
}
/// Handles the outcome of creating a branch output which was expected to have `expected`.
///
/// The oldest set of payments queued under `expected` is removed from `queued_plans`. Then:
/// - If `actual` is `None`, the payments are dropped.
/// - Otherwise, the fee (`expected - actual`) is amortized across the payments, payments which
///   end up below `N::DUST` are dropped, and any which remain are stored in `plans` under
///   `actual`, to be executed once `add_outputs` receives a branch output of that amount.
///
/// The updated state is written to the database via `txn` on every path, as `queued_plans` is
/// always modified.
///
/// Panics if no payments are queued under `expected`, or if the queued payments don't sum to
/// `expected`.
pub fn created_output<D: Db>(
  &mut self,
  txn: &mut D::Transaction<'_>,
  expected: u64,
  actual: Option<u64>,
) {
  log::debug!("output expected to have {} had {:?} after fees", expected, actual);

  // Get the payments this output is expected to handle
  let queued = self.queued_plans.get_mut(&expected).unwrap();
  let mut payments = queued.pop_front().unwrap();
  assert_eq!(expected, payments.iter().map(|payment| payment.balance.amount.0).sum::<u64>());
  // If this was the last set of payments at this amount, remove the map entry
  if queued.is_empty() {
    self.queued_plans.remove(&expected);
  }

  // Determine which payments, if any, the created output will fund
  // If the output wasn't actually created (`None`), its payments are dropped
  let funded = actual.and_then(|actual| {
    // The fee is how much less the output has than expected
    // `actual - expected` would underflow any time a fee was paid
    let mut to_amortize = expected.saturating_sub(actual);

    // If the payments are worth less than the fee, drop them
    // This also guarantees the following loop is able to fully amortize the fee
    if payments.iter().map(|payment| payment.balance.amount.0).sum::<u64>() < to_amortize {
      return None;
    }

    while to_amortize != 0 {
      let payments_len = u64::try_from(payments.len()).unwrap();
      let per_payment = to_amortize / payments_len;
      let mut overage = to_amortize % payments_len;

      for payment in &mut payments {
        let available = payment.balance.amount.0;

        // The overage is only charged once, to the first payment which still has value
        // If it was offered to a payment already reduced to zero, an iteration where
        // `per_payment` is zero would make no progress and this loop would never terminate
        let mut to_subtract = per_payment;
        if available != 0 {
          to_subtract += overage;
          overage = 0;
        }

        let subtractable = available.min(to_subtract);
        to_amortize -= subtractable;
        payment.balance.amount.0 -= subtractable;
      }
    }

    // Drop payments now below the dust threshold
    let payments = payments
      .into_iter()
      .filter(|payment| payment.balance.amount.0 >= N::DUST)
      .collect::<Vec<_>>();
    // Sanity check the output is able to fund the remaining payments
    assert!(actual >= payments.iter().map(|payment| payment.balance.amount.0).sum::<u64>());

    // If there's no payments left, there's nothing to plan
    (!payments.is_empty()).then_some((actual, payments))
  });

  if let Some((actual, payments)) = funded {
    self.plans.entry(actual).or_default().push_back(payments);
  }

  // Always save, as the queued plan was removed regardless of the outcome
  txn.put(scheduler_key::<D, _>(&self.key), self.serialize());
}
}
/// `SchedulerTrait` implementation, mostly forwarding to the inherent methods of the same name.
impl<N: UtxoNetwork<Scheduler = Self>> SchedulerTrait<N> for Scheduler<N> {
  // This scheduler doesn't attach any additional data to its plans
  type Addendum = ();

  /// Forwards to the inherent `empty`.
  fn empty(&self) -> bool {
    Self::empty(self)
  }

  /// Forwards to the inherent `new`.
  fn new<D: Db>(
    txn: &mut D::Transaction<'_>,
    key: <N::Curve as Ciphersuite>::G,
    network: ExternalNetworkId,
  ) -> Self {
    Self::new::<D>(txn, key, network)
  }

  /// Forwards to the inherent `from_db`.
  fn from_db<D: Db>(
    db: &D,
    key: <N::Curve as Ciphersuite>::G,
    network: ExternalNetworkId,
  ) -> io::Result<Self> {
    Self::from_db::<D>(db, key, network)
  }

  /// Forwards to the inherent `can_use_branch`.
  fn can_use_branch(&self, balance: ExternalBalance) -> bool {
    Self::can_use_branch(self, balance)
  }

  /// Forwards to the inherent `schedule`.
  fn schedule<D: Db>(
    &mut self,
    txn: &mut D::Transaction<'_>,
    utxos: Vec<N::Output>,
    payments: Vec<Payment<N>>,
    key_for_any_change: <N::Curve as Ciphersuite>::G,
    force_spend: bool,
  ) -> Vec<Plan<N>> {
    Self::schedule::<D>(self, txn, utxos, payments, key_for_any_change, force_spend)
  }

  /// Forwards to the inherent `consume_payments`.
  fn consume_payments<D: Db>(&mut self, txn: &mut D::Transaction<'_>) -> Vec<Payment<N>> {
    Self::consume_payments::<D>(self, txn)
  }

  /// Forwards to the inherent `created_output`.
  fn created_output<D: Db>(
    &mut self,
    txn: &mut D::Transaction<'_>,
    expected: u64,
    actual: Option<u64>,
  ) {
    Self::created_output::<D>(self, txn, expected, actual)
  }

  /// Creates a plan sending the full balance of `output` to `refund_to`.
  ///
  /// The plan is keyed to the output's own key, has no change address, and doesn't touch the
  /// scheduler's state nor the database.
  fn refund_plan<D: Db>(
    &mut self,
    _: &mut D::Transaction<'_>,
    output: N::Output,
    refund_to: N::Address,
  ) -> Plan<N> {
    // Copy the ID for logging before the output is moved into the plan
    let output_id = output.id().as_ref().to_vec();

    let key = output.key();
    let refund = Payment { address: refund_to, data: None, balance: output.balance() };
    let plan = Plan {
      key,
      payments: vec![refund],
      inputs: vec![output],
      change: None,
      scheduler_addendum: (),
    };

    log::info!("refund plan for {} has ID {}", hex::encode(output_id), hex::encode(plan.id()));
    plan
  }

  /// Creates a plan sending the full balance of `output` to the forward address of `to`.
  ///
  /// The plan is keyed to the output's own key and has no change address. This always returns
  /// `Some`.
  ///
  /// Panics if the network can't produce a forward address for `to`.
  fn shim_forward_plan(output: N::Output, to: <N::Curve as Ciphersuite>::G) -> Option<Plan<N>> {
    let key = output.key();
    let forwarded =
      Payment { address: N::forward_address(to).unwrap(), data: None, balance: output.balance() };
    Some(Plan {
      key,
      payments: vec![forwarded],
      inputs: vec![output],
      change: None,
      scheduler_addendum: (),
    })
  }

  /// Creates the plan forwarding `output` to `to`, via `shim_forward_plan`.
  ///
  /// Panics if `output` doesn't belong to this scheduler's key.
  fn forward_plan<D: Db>(
    &mut self,
    _: &mut D::Transaction<'_>,
    output: N::Output,
    to: <N::Curve as Ciphersuite>::G,
  ) -> Option<Plan<N>> {
    assert_eq!(self.key, output.key());
    // The shim builds the actual plan, as no scheduler state is needed for it
    Self::shim_forward_plan(output, to)
  }
}