use crate::container::{OperationalContainer, PendingContainer};
use crate::docker::{ContainerLogSource, Docker};
use crate::waitfor::{async_trait, WaitFor};
use crate::DockerTestError;
use futures::StreamExt;
use serde::Serialize;
use std::sync::atomic::{self, AtomicBool};
use std::sync::Arc;
use tokio::{time, time::Duration};
use tracing::{event, Level};
#[derive(Clone, Debug)]
pub struct MessageWait {
pub message: String,
pub source: MessageSource,
pub timeout: u16,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessageSource {
Stdout,
Stderr,
}
#[async_trait]
impl WaitFor for MessageWait {
async fn wait_for_ready(
&self,
container: PendingContainer,
) -> Result<OperationalContainer, DockerTestError> {
pending_container_wait_for_message(
container,
self.source,
self.message.clone(),
self.timeout,
)
.await
}
}
async fn pending_container_wait_for_message(
container: PendingContainer,
source: MessageSource,
msg: String,
timeout: u16,
) -> Result<OperationalContainer, DockerTestError> {
let client = &container.client;
match wait_for_message(
client,
&container.id,
&container.handle,
source,
msg,
timeout,
)
.await
{
Ok(_) => Ok(container.into()),
Err(e) => Err(e),
}
}
pub(crate) async fn wait_for_message<T>(
client: &Docker,
container_id: &str,
handle: &str,
source: MessageSource,
msg: T,
timeout: u16,
) -> Result<(), DockerTestError>
where
T: Into<String> + Serialize,
{
let s1 = Arc::new(AtomicBool::new(false));
let s2 = s1.clone();
let msg_clone1: String = msg.into();
let msg_clone2: String = msg_clone1.clone();
let mut source: ContainerLogSource = source.into();
source.follow = true;
let stream = client.container_logs(container_id, source);
let log_stream_future = async {
stream
.take_while(move |message| match message {
Ok(message) => {
if String::from_utf8(message.message.to_vec())
.unwrap()
.contains(&msg_clone1)
{
s1.store(true, atomic::Ordering::SeqCst);
futures::future::ready(false)
} else {
futures::future::ready(true)
}
}
Err(_) => futures::future::ready(false),
})
.collect::<Vec<_>>()
.await
};
match time::timeout(Duration::from_secs(timeout.into()), log_stream_future).await {
Ok(_) => {
if s2.load(atomic::Ordering::SeqCst) {
Ok(())
} else {
Err(DockerTestError::Startup(
format!("container `{}` ended log stream (terminated) before waitfor message triggered: `{}`", handle, msg_clone2),
))
}
}
Err(_) => {
event!(Level::WARN, "awaiting container message timed out");
Err(DockerTestError::Startup(
"awaiting container message timed out".to_string(),
))
}
}
}