
I am using DockerOperator to run a container. But I do not see any related option to publish required port. I need to publish a webserver port when the task is triggered. Any help or guide will be helpful. Thank you!

Nabin
  • I don't know anything about this field, but shouldn't there be some code here? Ignore me, if no code is needed. – Gaurav Mall Apr 04 '21 at 12:58

2 Answers


First, note that airflow.operators.docker_operator is deprecated; it has been replaced by airflow.providers.docker.operators.docker.

Second, I don't know of a command to publish a port on a live (running) Docker container.
As described in this article from Sidhartha Mani:

Specifically, I needed access to the filled mysql database.
I could think of a few ways to do this:

  • Stop the container and start a new one with the added port exposure. docker run -p 3306:3306 -p 8080:8080 -d java/server.
  • The second option is to start another container that links to this, and knows how to port forward.
  • Set up iptables rules to forward a host port into the container.

So:

Following existing rules, I created my own rule to forward to the container

iptables -t nat -A DOCKER ! -i docker0 -p tcp --dport 3306 -j DNAT \
         --to-destination 172.17.0.2:3306

This just says that whenever a packet is destined to port 3306 on the host, forward it to the container with ip 172.17.0.2, and its port 3306.
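
As a rough illustration, the same rule can be assembled programmatically. The sketch below only builds the argument list and never runs iptables. The function name build_dnat_rule and its parameters are hypothetical, and it assumes the append (-A) form of the rule on the default docker0 bridge.

```python
from typing import List


def build_dnat_rule(host_port: int, container_ip: str, container_port: int,
                    bridge: str = "docker0") -> List[str]:
    """Return the iptables arguments for a DNAT rule (hypothetical helper).

    Nothing is executed here; the list could be handed to subprocess.run()
    by a privileged process if the rule should really be applied.
    """
    for port in (host_port, container_port):
        if not 0 < port < 65536:
            raise ValueError(f"invalid TCP port: {port}")
    return [
        "iptables", "-t", "nat", "-A", "DOCKER",
        "!", "-i", bridge,  # match traffic that does not arrive via the bridge
        "-p", "tcp", "--dport", str(host_port),
        "-j", "DNAT",
        "--to-destination", f"{container_ip}:{container_port}",
    ]


print(" ".join(build_dnat_rule(3306, "172.17.0.2", 3306)))
```

The container IP (172.17.0.2 here) is taken from the example above; in practice it would have to be looked up for the running container.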

Once I did this, I could connect to the container using host port 3306.
I wanted to make it easier for others to expose ports on live containers.
So, I created a small repository and a corresponding docker image (named wlan0/redirect).

The same effect as exposing host port 3306 to container 172.17.0.2:3306 can be achieved using this command.
This command saves the trouble of learning how to use iptables.

docker run --privileged -v /proc:/host/proc \
           -e HOST_PORT=3306 -e DEST_IP=172.17.0.2 -e DEST_PORT=3306 \
           wlan0/redirect:latest

In other words, this kind of solution would not be implemented from a command run in the container, through an Airflow Operator.


As per my understanding, DockerOperator will create a new container, so why is there no way of exposing ports while creating a new container?

First, the EXPOSE part is, as I mentioned here, just metadata added to the image. It is not mandatory.
The runtime (docker run) -p option is about publishing, not exposing: it publishes a container port and maps it to a host port (see above) or to another container port.
That might not be needed in an Airflow environment, where there is a default network, and even the possibility to set up a custom network or subnetwork.

This means other (Airflow) containers attached to the same network should be able to access the ports of any container in that network, without needing any -p (publication) or EXPOSE directive.
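
One way to check this from another container on the same network is a plain TCP connection test. This is a minimal sketch using only the Python standard library. The helper name port_is_reachable is hypothetical, and the host and port in the usage comment are placeholders for the webserver container's name and port.

```python
import socket


def port_is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


# Usage from a container attached to the same Docker network
# ("my-webserver" and 8080 are placeholders):
#     port_is_reachable("my-webserver", 8080)
```

If this returns True from a sibling container, no published port is needed for container-to-container access. Access from outside the Docker network still requires publishing.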

VonC
  • No, it's not about exposing ports on an existing container. As per my understanding, DockerOperator will create a new container, so why is there no way of exposing ports while creating a new container? Thank you for pointing out that it is deprecated, but unfortunately I am on 1.10 only. – Nabin Apr 04 '21 at 15:13
  • @Nabin OK. I have edited my answer accordingly. – VonC Apr 04 '21 at 22:48
  • Yup, EXPOSE is just metadata. But you know what I mean, right? I need to publish those ports. – Nabin Apr 05 '21 at 02:49
  • @Nabin I understand. If it is about publishing the port for an external service to access, I don't see a way, except (as described in the first part of my answer) playing with iptables. – VonC Apr 05 '21 at 12:04

In order to accomplish this, you will need to subclass the DockerOperator and override the initializer and _run_image_with_mounts method, which uses the API client to create a container with the specified host configuration.

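# Import paths assume the apache-airflow-providers-docker release
# from which _run_image_with_mounts was copied.
from typing import List, Optional, Union

from airflow.exceptions import AirflowException
from airflow.providers.docker.operators.docker import DockerOperator, stringify


class DockerOperatorWithExposedPorts(DockerOperator):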

    def __init__(self, *args, **kwargs):
        self.port_bindings = kwargs.pop("port_bindings", {})
        if self.port_bindings and kwargs.get("network_mode") == "host":
            self.log.warning("`port_bindings` is not supported in `host` network mode.")
            self.port_bindings = {}
        super().__init__(*args, **kwargs)

    def _run_image_with_mounts(
        self, target_mounts, add_tmp_variable: bool
    ) -> Optional[Union[List[str], str]]:
        """
        NOTE: This method was copied entirely from the base class `DockerOperator`,
        with `ports` and `port_bindings` added to enable port publishing.
        """
        if add_tmp_variable:
            self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
        else:
            self.environment.pop('AIRFLOW_TMP_DIR', None)
        if not self.cli:
            raise Exception("The 'cli' should be initialized before!")
        self.container = self.cli.create_container(
            command=self.format_command(self.command),
            name=self.container_name,
            environment={**self.environment, **self._private_environment},
            ports=list(self.port_bindings.keys()) if self.port_bindings else None,
            host_config=self.cli.create_host_config(
                auto_remove=False,
                mounts=target_mounts,
                network_mode=self.network_mode,
                shm_size=self.shm_size,
                dns=self.dns,
                dns_search=self.dns_search,
                cpu_shares=int(round(self.cpus * 1024)),
                port_bindings=self.port_bindings if self.port_bindings else None,
                mem_limit=self.mem_limit,
                cap_add=self.cap_add,
                extra_hosts=self.extra_hosts,
                privileged=self.privileged,
                device_requests=self.device_requests,
            ),
            image=self.image,
            user=self.user,
            entrypoint=self.format_command(self.entrypoint),
            working_dir=self.working_dir,
            tty=self.tty,
        )
        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
        try:
            self.cli.start(self.container['Id'])

            log_lines = []
            for log_chunk in logstream:
                log_chunk = stringify(log_chunk).strip()
                log_lines.append(log_chunk)
                self.log.info("%s", log_chunk)

            result = self.cli.wait(self.container['Id'])
            if result['StatusCode'] != 0:
                joined_log_lines = "\n".join(log_lines)
                raise AirflowException(f'Docker container failed: {repr(result)} lines {joined_log_lines}')

            if self.retrieve_output:
                return self._attempt_to_retrieve_result()
            elif self.do_xcom_push:
                if len(log_lines) == 0:
                    return None
                try:
                    if self.xcom_all:
                        return log_lines
                    else:
                        return log_lines[-1]
                except StopIteration:
                    # handle the case when there is not a single line to iterate on
                    return None
            return None
        finally:
            if self.auto_remove == "success":
                self.cli.remove_container(self.container['Id'])
            elif self.auto_remove == "force":
                self.cli.remove_container(self.container['Id'], force=True)

Explanation: The create_host_config method of the APIClient has an optional port_bindings keyword argument, and the create_container method has an optional ports argument. DockerOperator does not expose these arguments, so you have to override _run_image_with_mounts with a copy that passes them, using the port_bindings field set in the initializer. You can then supply the ports to publish as a keyword argument. Note that this implementation expects the argument to be a dictionary:

t1 = DockerOperatorWithExposedPorts(image=..., task_id=..., port_bindings={5000: 5000, 8080:8080, ...})
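
One detail that is easy to get backwards: in docker-py's port_bindings, the keys are container ports and the values are host ports, which is the reverse of the host:container order used by docker run -p. Below is a small sketch of a converter, assuming plain host:container TCP mappings only. The helper name publish_specs_to_port_bindings is hypothetical and not part of Airflow or docker-py.

```python
from typing import Dict, Iterable


def publish_specs_to_port_bindings(specs: Iterable[str]) -> Dict[int, int]:
    """Convert "host:container" strings into {container_port: host_port}."""
    bindings: Dict[int, int] = {}
    for spec in specs:
        host_part, sep, container_part = spec.partition(":")
        if not sep:
            raise ValueError(f"expected 'host:container', got {spec!r}")
        # docker-py keys the mapping by the port inside the container.
        bindings[int(container_part)] = int(host_part)
    return bindings


print(publish_specs_to_port_bindings(["8080:80", "5000:5000"]))
# {80: 8080, 5000: 5000}
```

The resulting dictionary could then be passed as port_bindings to the subclass above.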
danielcahall