1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
//! Represents a container that has been started, completing its WaitFor condition.

use crate::{
    composition::LogOptions,
    container::PendingContainer,
    docker::{ContainerState, Docker},
    waitfor::{wait_for_message, MessageSource},
    DockerTestError,
};

use bollard::models::{PortBinding, PortMap};
use serde::Serialize;

use std::{
    collections::HashMap,
    convert::TryFrom,
    net::{IpAddr, Ipv4Addr},
    str::FromStr,
    sync::{Arc, Mutex},
};

/// Represent a docker container that has passed its `WaitFor` implementation and available is to the test body.
/// The state of the container will depend on what `WaitFor` implementation was used, e.g.
/// using the `ExitedWait` will result in a container in an exited state.
// NOTE: Fields within this structure are pub(crate) only for testability.
// None of these fields should be externally public.
#[derive(Clone, Debug)]
pub struct OperationalContainer {
    pub(crate) client: Docker,
    pub(crate) handle: String,
    /// The unique docker container identifier assigned at creation.
    pub(crate) id: String,
    /// The generated docker name for this running container.
    pub(crate) name: String,
    /// IP address of the container
    pub(crate) ip: std::net::Ipv4Addr,
    /// Published container ports
    pub(crate) ports: HostPortMappings,
    pub(crate) is_static: bool,
    pub(crate) log_options: Option<LogOptions>,
    pub(crate) assumed_state: Arc<Mutex<ContainerState>>,
}

#[derive(Clone, Debug, Default)]
pub(crate) struct HostPortMappings {
    mappings: HashMap<u32, (Ipv4Addr, u32)>,
}

#[derive(thiserror::Error, Debug, PartialEq, Clone)]
pub(crate) enum HostPortMappingError {
    #[error("failed to extract host port from docker details, malformed ip/protocol key: {0}")]
    HostPortKey(String),
    #[error("port mapping did not contain a host port or host ip")]
    NoMapping,
    #[error("failed to convert host ip/port to appropriate types: {0}")]
    Conversion(String),
}

impl TryFrom<PortMap> for HostPortMappings {
    type Error = HostPortMappingError;
    fn try_from(p: PortMap) -> Result<HostPortMappings, Self::Error> {
        let mut map: HashMap<u32, (Ipv4Addr, u32)> = HashMap::new();
        for (host_port_string, ports) in p.into_iter() {
            if let Some(port_bindings) = ports {
                let split: Vec<&str> = host_port_string.split('/').collect();
                // We expect the split to contain "port/protocol" e.g. "8080/tcp"
                // If it does not contain this, we cannot extract the host_port and do not
                // have any fallback and return an early error.
                if split.len() < 2 {
                    return Err(HostPortMappingError::HostPortKey(host_port_string));
                }

                let host_port = u32::from_str(split[0])
                    .map_err(|e| HostPortMappingError::Conversion(e.to_string()))?;

                for binding in port_bindings {
                    if let Some((ip, port)) = from_port_binding(binding)? {
                        map.entry(host_port).or_insert((ip, port));
                    }
                }
            }
        }

        Ok(HostPortMappings { mappings: map })
    }
}

fn from_port_binding(ports: PortBinding) -> Result<Option<(Ipv4Addr, u32)>, HostPortMappingError> {
    match (ports.host_ip, ports.host_port) {
        (Some(ip), Some(port)) => {
            // When 'publish_all_ports' is used 2 port bindings are made for each port, one for
            // ipv4 and one for ipv6.
            // We only want to return a single address/port pair in our API so we skip all ipv6
            // pairs.
            let ip = IpAddr::from_str(&ip)
                .map_err(|e| HostPortMappingError::Conversion(e.to_string()))?;
            match ip {
                IpAddr::V4(ipv4) => {
                    let parsed_port = u32::from_str(&port)
                        .map_err(|e| HostPortMappingError::Conversion(e.to_string()))?;
                    Ok(Some((ipv4, parsed_port)))
                }
                IpAddr::V6(_) => Ok(None),
            }
        }
        _ => Err(HostPortMappingError::NoMapping),
    }
}

impl OperationalContainer {
    /// Return the generated name on the docker container object for this `OperationalContainer`.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Return the docker assigned identifier for this `OperationalContainer`.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Return the IPv4 address for this container on the local docker network adapter.
    /// Use this address to contact the `OperationalContainer` in the test body.
    ///
    /// This property is retrieved from the docker daemon prior to entering the test body.
    /// It is cached internally and not updated between invocations. This means that
    /// if the docker container enters an exited state, this function will still return
    /// the original ip assigned to the container.
    ///
    /// If the [ExitedWait] for strategy is employed, the `OperationalContainer` will
    /// be in an exited status when the test body is entered.
    /// For this scenarion, this function will return [Ipv4Addr::UNSPECIFIED].
    ///
    /// On Windows this method always returns `127.0.0.1` due to Windows not supporting using
    /// container IPs outside a container-context.
    ///
    /// [Ipv4Addr::UNSPECIFIED]: https://doc.rust-lang.org/std/net/struct.Ipv4Addr.html#associatedconstant.UNSPECIFIED
    /// [ExitedWait]: crate::waitfor::ExitedWait
    pub fn ip(&self) -> &std::net::Ipv4Addr {
        &self.ip
    }

    /// Returns host ip/port binding for the given container port. Useful in MacOS where there is no
    /// network connectivity between Mac system and containers.
    pub fn host_port(&self, exposed_port: u32) -> Option<&(Ipv4Addr, u32)> {
        self.ports.mappings.get(&exposed_port)
    }

    /// Same as `host_port`, but panics if the mapping could not be found.
    pub fn host_port_unchecked(&self, exposed_port: u32) -> &(Ipv4Addr, u32) {
        self.ports.mappings.get(&exposed_port).unwrap()
    }

    /// Pauses the container
    ///
    /// # Panics
    /// Will panic if the container was in any state but `Runnning` prior to calling this
    /// method or if we fail the docker operation.
    pub async fn pause(&self) {
        self.goto_state(ContainerState::Paused, &[ContainerState::Running])
            .unwrap();
        self.client.pause(&self.name).await.unwrap();
    }

    /// Kills the container
    ///
    /// # Panics
    /// Will panic if the container was in any state but `Runnning` or `Paused` prior to calling this
    /// method or if we fail the docker operation.
    pub async fn kill(&self) {
        self.goto_state(
            ContainerState::Exited,
            &[ContainerState::Running, ContainerState::Paused],
        )
        .unwrap();
        self.client.kill(&self.name).await.unwrap();
    }

    /// Unpauses the container
    ///
    /// # Panics
    /// Will panic if the container was in any state but `Paused` prior to calling this
    /// method or if we fail the docker operation.
    pub async fn unpause(&self) {
        self.goto_state(ContainerState::Running, &[ContainerState::Paused])
            .unwrap();
        self.client.unpause(&self.name).await.unwrap();
    }

    /// Inspect the output of this container and await the presence of a log line.
    ///
    /// # Panics
    /// This function panics if the log message is not present on the log output
    /// within the specified timeout.
    pub async fn assert_message<T>(&self, message: T, source: MessageSource, timeout: u16)
    where
        T: Into<String> + Serialize,
    {
        if let Err(e) = wait_for_message(
            &self.client,
            &self.id,
            &self.handle,
            source,
            message,
            timeout,
        )
        .await
        {
            panic!("{}", e)
        }
    }

    fn goto_state(
        &self,
        dest: ContainerState,
        allowed: &[ContainerState],
    ) -> Result<(), DockerTestError> {
        let mut state = self.assumed_state.lock().unwrap();
        println!("{state}");
        if !allowed.contains(&state) {
            Err(DockerTestError::ContainerState {
                current: *state,
                tried_to_enter: dest,
            })
        } else {
            *state = dest;
            Ok(())
        }
    }
}

impl From<PendingContainer> for OperationalContainer {
    fn from(container: PendingContainer) -> OperationalContainer {
        OperationalContainer {
            client: container.client,
            handle: container.handle,
            id: container.id,
            name: container.name,
            ip: std::net::Ipv4Addr::UNSPECIFIED,
            ports: HostPortMappings::default(),
            is_static: container.is_static,
            log_options: container.log_options,
            assumed_state: Arc::new(Mutex::new(container.expected_state)),
        }
    }
}