1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
//! Represents a container scheduled for cleanup.

use crate::{
    composition::{LogAction, LogOptions},
    container::{OperationalContainer, PendingContainer},
    docker::{ContainerLogSource, Docker, LogEntry},
    waitfor::MessageSource,
    DockerTestError, LogSource,
};
use futures::StreamExt;
use std::{
    io::{self, Write},
    time::Duration,
};
use tracing::info;

/// A container representation of a pending or running container, that requires us to
/// perform cleanup on it.
///
/// Values of this type are produced by the `From` conversions from `PendingContainer`
/// and `OperationalContainer` (owned or borrowed), which copy over exactly the fields
/// listed below.
///
/// This structure is an implementation detail of dockertest and shall NOT be publicly
/// exposed.
#[derive(Clone, Debug)]
pub(crate) struct CleanupContainer {
    /// Id of the container, carried over from the container this value was converted from.
    pub(crate) id: String,
    /// Static flag carried over from the source container; exposed read-only through
    /// the `is_static` method.
    is_static: bool,
    /// The generated docker name for this container.
    pub(crate) name: String,
    /// Client obtained from `PendingContainer` or `OperationalContainer`, we need it because
    /// we want to call `client.container_logs` to get container logs.
    pub(crate) client: Docker,
    /// Container log options.
    ///
    /// NOTE(review): `None` presumably means no log handling was configured for this
    /// container - confirm against the code that consumes this field.
    pub(crate) log_options: Option<LogOptions>,
}

impl CleanupContainer {
    pub(crate) fn is_static(&self) -> bool {
        self.is_static
    }

    /// Handle one log entry.
    async fn handle_log_line(
        &self,
        action: &LogAction,
        entry: LogEntry,
        file: &mut Option<tokio::fs::File>,
    ) -> Result<(), DockerTestError> {
        let write_to_stdout = |message| {
            io::stdout()
                .write(message)
                .map_err(|error| DockerTestError::LogWriteError(format!("stdout: {}", error)))?;
            Ok(())
        };

        let write_to_stderr = |message| {
            io::stderr()
                .write(message)
                .map_err(|error| DockerTestError::LogWriteError(format!("stderr: {}", error)))?;
            Ok(())
        };

        match action {
            // forward-only, print stdout/stderr output to current process stdout/stderr
            LogAction::Forward => match entry.source {
                MessageSource::Stdout => write_to_stdout(&entry.message[..]),
                MessageSource::Stderr => write_to_stderr(&entry.message[..]),
            },
            // forward everything to stderr
            LogAction::ForwardToStdErr => write_to_stderr(&entry.message[..]),
            // forward everything to stdout
            LogAction::ForwardToStdOut => write_to_stdout(&entry.message[..]),
            // forward everything to a file, file should be already opened
            LogAction::ForwardToFile { .. } => {
                use tokio::io::AsyncWriteExt;

                if let Some(ref mut file) = file {
                    file.write(&entry.message[..])
                        .await
                        .map_err(|error| {
                            DockerTestError::LogWriteError(format!(
                                "unable to write to log file: {}",
                                error
                            ))
                        })
                        .map(|_| ())
                } else {
                    Err(DockerTestError::LogWriteError(
                        "log file should not be None".to_string(),
                    ))
                }
            }
        }
    }

    /// Handle container logs.
    pub(crate) async fn handle_log(
        &self,
        action: &LogAction,
        source: &LogSource,
    ) -> Result<(), DockerTestError> {
        // check if we need to capture stderr and/or stdout
        let should_log_stderr = match source {
            LogSource::StdErr => true,
            LogSource::StdOut => false,
            LogSource::Both => true,
        };

        let should_log_stdout = match source {
            LogSource::StdErr => false,
            LogSource::StdOut => true,
            LogSource::Both => true,
        };

        let source = ContainerLogSource {
            log_stderr: should_log_stderr,
            log_stdout: should_log_stdout,
            ..Default::default()
        };

        info!("Trying to get logs from container: id={}", self.id);
        let mut stream = self.client.container_logs(&self.name, source);

        // let's open file if need it, we are doing this because we dont want to open
        // file in every log reading iteration
        let mut file = match action {
            LogAction::ForwardToFile { path } => {
                let filepath = format!("{}/{}", path, self.name);
                // try to create file, bail if we cannot create file
                tokio::fs::File::create(filepath)
                    .await
                    .map(Some)
                    .map_err(|error| {
                        DockerTestError::LogWriteError(format!(
                            "unable to create log file: {}",
                            error
                        ))
                    })
            }
            _ => Ok(None),
        }?;

        // If the container has not exited the log stream will hang forever, we therefore exit
        // if we have not gotten a message within 5 seconds of the last received log entry.
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        interval.tick().await;
        tokio::select! {
            data = stream.next() => {
                interval.reset();
                if let Some(data) = data {
                    match data {
                        Ok(line) => self.handle_log_line(action, line, &mut file).await?,
                        Err(error) => {
                            return Err(DockerTestError::LogWriteError(format!(
                                "unable to read docker log: {}",
                                error
                            )))
                        }
                    }
                } else {
                    return Ok(());
                }
            }
            _ = interval.tick() => {
                return Ok(());
            }
        }

        Ok(())
    }
}

impl From<PendingContainer> for CleanupContainer {
    fn from(container: PendingContainer) -> CleanupContainer {
        CleanupContainer {
            id: container.id,
            is_static: container.is_static,
            client: container.client,
            log_options: container.log_options,
            name: container.name,
        }
    }
}

impl From<&PendingContainer> for CleanupContainer {
    fn from(container: &PendingContainer) -> CleanupContainer {
        CleanupContainer {
            id: container.id.clone(),
            is_static: container.is_static,
            client: container.client.clone(),
            log_options: container.log_options.clone(),
            name: container.name.clone(),
        }
    }
}

impl From<OperationalContainer> for CleanupContainer {
    fn from(container: OperationalContainer) -> CleanupContainer {
        CleanupContainer {
            id: container.id,
            is_static: container.is_static,
            client: container.client,
            log_options: container.log_options,
            name: container.name,
        }
    }
}

impl From<&OperationalContainer> for CleanupContainer {
    fn from(container: &OperationalContainer) -> CleanupContainer {
        CleanupContainer {
            id: container.id.clone(),
            is_static: container.is_static,
            client: container.client.clone(),
            log_options: container.log_options.clone(),
            name: container.name.clone(),
        }
    }
}