I am using DockerOperator to run a container, but I do not see any option to publish the required port. I need to publish a webserver port when the task is triggered. Any help or guidance would be appreciated. Thank you!
I don't know anything about this field, but shouldn't there be some code here? Ignore me, if no code is needed. – Gaurav Mall Apr 04 '21 at 12:58
2 Answers
First, don't forget that docker_operator is deprecated, replaced (now) with providers.docker.operators.docker.
Second, I don't know of a command to expose a port in a live (running) Docker container.
As described in this article from Sidhartha Mani:
Specifically, I needed access to the filled MySQL database.
I could think of a few ways to do this:
- Stop the container and start a new one with the added port exposure:
    docker run -p 3306:3306 -p 8080:8080 -d java/server
- Start another container that links to this one and knows how to port-forward.
- Set up iptables rules to forward a host port into the container.
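The first option (recreating the container with the ports published) is easy to script, because ports cannot be added to a running container and the only fix is a new docker run. A minimal sketch, assuming you only need to build the argument list; docker_run_argv is a hypothetical helper name, not part of Docker or Airflow:

```python
import shlex
from typing import List, Tuple


def docker_run_argv(image: str, publish: List[Tuple[int, int]], detach: bool = True) -> List[str]:
    """Build a `docker run` argument list that publishes ports.

    Each (host_port, container_port) pair becomes one "-p HOST:CONTAINER" flag.
    """
    argv = ["docker", "run"]
    for host_port, container_port in publish:
        argv += ["-p", f"{host_port}:{container_port}"]
    if detach:
        argv.append("-d")  # run the container in the background
    argv.append(image)
    return argv


if __name__ == "__main__":
    argv = docker_run_argv("java/server", [(3306, 3306), (8080, 8080)])
    print(shlex.join(argv))  # docker run -p 3306:3306 -p 8080:8080 -d java/server
```

Passing the list to subprocess.run(argv, check=True) would start the new container; the old one still has to be stopped separately.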
So:
Following the existing rules, I created my own rule to forward to the container:
iptables -t nat -A DOCKER ! -i docker0 -p tcp --dport 3306 -j DNAT \
    --to-destination 172.17.0.2:3306
This just says that whenever a packet is destined for port 3306 on the host, forward it to port 3306 of the container with IP 172.17.0.2.
Once I did this, I could connect to the container using host port 3306.
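If the rule has to be installed from a provisioning script, its arguments can be assembled programmatically. A minimal sketch; dnat_rule_argv is a hypothetical helper that only builds the argument list (actually running it needs root on the Docker host). It uses -A to append the rule; -D would instead delete a matching rule.

```python
from typing import List


def dnat_rule_argv(host_port: int, container_ip: str, container_port: int,
                   chain: str = "DOCKER", bridge: str = "docker0") -> List[str]:
    """Build an iptables NAT rule forwarding host_port to container_ip:container_port."""
    return [
        "iptables", "-t", "nat",
        "-A", chain,                 # append a new rule to the chain
        "!", "-i", bridge,           # skip packets arriving on the Docker bridge itself
        "-p", "tcp", "--dport", str(host_port),
        "-j", "DNAT",                # rewrite the packet's destination
        "--to-destination", f"{container_ip}:{container_port}",
    ]
```

subprocess.run(dnat_rule_argv(3306, "172.17.0.2", 3306), check=True) would then install the same rule as the command above.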
I wanted to make it easier for others to expose ports on live containers.
So, I created a small repository and a corresponding Docker image (named wlan0/redirect). The same effect as exposing host port 3306 to container 172.17.0.2:3306 can be achieved using this command:
docker run --privileged -v /proc:/host/proc \
    -e HOST_PORT=3306 -e DEST_IP=172.17.0.2 -e DEST_PORT=3306 \
    wlan0/redirect:latest
This command saves the trouble of learning how to use iptables.
In other words, this kind of solution would not be implemented from a command run inside the container through an Airflow operator.
As per my understanding, DockerOperator will create a new container, so why is there no way of exposing ports when creating a new container?
First, the EXPOSE part is, as I mentioned here, just metadata added to the image. It is not mandatory.
The runtime (docker run) -p option is about publishing, not exposing: publishing a port and mapping it to a host port (see above) or another container port.
That might not be needed in an Airflow environment, where there is a default network, and even the possibility to set up a custom network or subnetwork.
This means other (Airflow) containers attached to the same network should be able to access the ports of any container on that network, without needing any -p (publication) or EXPOSE directive.
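To confirm from inside one container that a service in another container on the same network is reachable without any -p, a plain TCP connect is enough. A minimal standard-library sketch; is_port_reachable is a hypothetical helper, and resolving containers by name assumes a user-defined Docker network (on the default bridge network, containers can only reach each other by IP address).

```python
import socket


def is_port_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves `host` (e.g. via Docker's embedded DNS on a
        # user-defined network) and opens a TCP connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names.
        return False
```

In a task whose container is attached (through DockerOperator's network_mode) to the same user-defined network as a container named webserver (a hypothetical name), is_port_reachable("webserver", 8080) should return True even though nothing was published.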

No, it's not about exposing ports on an existing container. As per my understanding, DockerOperator will create a new container, so why is there no way of exposing ports when creating a new container? Thank you for pointing out that it is deprecated, but unfortunately I am only on 1.10. – Nabin Apr 04 '21 at 15:13
Yup, EXPOSE is just metadata. But you know what I mean, right? I need to publish those ports. – Nabin Apr 05 '21 at 02:49
@Nabin I understand. If it is about publishing the port for another external service to access, I don't see a way, except (as described in the first part of my answer) playing with iptables. – VonC Apr 05 '21 at 12:04
To accomplish this, you will need to subclass DockerOperator and override the initializer and the _run_image_with_mounts method, which uses the API client to create a container with the specified host configuration.
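The subclassing pattern can be tried in isolation, without Airflow or Docker installed. This sketch uses a hypothetical stand-in base class (FakeDockerOperator is not an Airflow class) to show the two moves: pop the extra port_bindings keyword in the initializer, then pass ports and port_bindings through when building the container arguments.

```python
from typing import Any, Dict, List, Optional


class FakeDockerOperator:
    """Hypothetical stand-in for DockerOperator (not an Airflow class)."""

    def __init__(self, image: str = "", network_mode: Optional[str] = None) -> None:
        self.image = image
        self.network_mode = network_mode

    def create_container_kwargs(self) -> Dict[str, Any]:
        # Like the real operator, the base class has no notion of ports.
        return {"network_mode": self.network_mode}


class FakeOperatorWithPorts(FakeDockerOperator):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Pop the extra keyword before calling the base initializer,
        # which would otherwise reject it as unexpected.
        self.port_bindings: Dict[int, int] = kwargs.pop("port_bindings", {})
        if self.port_bindings and kwargs.get("network_mode") == "host":
            # Host networking uses the host's ports directly; bindings don't apply.
            self.port_bindings = {}
        super().__init__(*args, **kwargs)

    def create_container_kwargs(self) -> Dict[str, Any]:
        kwargs = super().create_container_kwargs()
        ports: List[int] = list(self.port_bindings)  # container ports to expose
        kwargs["ports"] = ports or None
        kwargs["port_bindings"] = self.port_bindings or None  # container -> host
        return kwargs
```

The stub can simply extend the result of super(); the real DockerOperator builds its host config inline inside _run_image_with_mounts, which is why the answer's override copies that whole method.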
from typing import List, Optional, Union

from airflow.exceptions import AirflowException
from airflow.providers.docker.operators.docker import DockerOperator, stringify


class DockerOperatorWithExposedPorts(DockerOperator):
    def __init__(self, *args, **kwargs):
        # Pop our extra argument before DockerOperator sees it.
        # port_bindings maps container port -> host port (docker-py convention).
        self.port_bindings = kwargs.pop("port_bindings", {})
        if self.port_bindings and kwargs.get("network_mode") == "host":
            self.log.warning("`port_bindings` is not supported in `host` network mode.")
            self.port_bindings = {}
        super().__init__(*args, **kwargs)

    def _run_image_with_mounts(
        self, target_mounts, add_tmp_variable: bool
    ) -> Optional[Union[List[str], str]]:
        """
        NOTE: This method was copied entirely from the base class `DockerOperator`, for the capability
        of performing port publishing.
        """
        # This is a private method whose body changes between docker provider
        # releases; re-copy it from the version you have installed.
        if add_tmp_variable:
            self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
        else:
            self.environment.pop('AIRFLOW_TMP_DIR', None)
        if not self.cli:
            raise Exception("The 'cli' should be initialized before!")
        self.container = self.cli.create_container(
            command=self.format_command(self.command),
            name=self.container_name,
            environment={**self.environment, **self._private_environment},
            # Added: declare the container ports that will be published.
            ports=list(self.port_bindings.keys()) if self.port_bindings else None,
            host_config=self.cli.create_host_config(
                auto_remove=False,
                mounts=target_mounts,
                network_mode=self.network_mode,
                shm_size=self.shm_size,
                dns=self.dns,
                dns_search=self.dns_search,
                cpu_shares=int(round(self.cpus * 1024)),
                # Added: map each container port to its host port.
                port_bindings=self.port_bindings if self.port_bindings else None,
                mem_limit=self.mem_limit,
                cap_add=self.cap_add,
                extra_hosts=self.extra_hosts,
                privileged=self.privileged,
                device_requests=self.device_requests,
            ),
            image=self.image,
            user=self.user,
            entrypoint=self.format_command(self.entrypoint),
            working_dir=self.working_dir,
            tty=self.tty,
        )
        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
        try:
            self.cli.start(self.container['Id'])
            log_lines = []
            for log_chunk in logstream:
                log_chunk = stringify(log_chunk).strip()
                log_lines.append(log_chunk)
                self.log.info("%s", log_chunk)
            result = self.cli.wait(self.container['Id'])
            if result['StatusCode'] != 0:
                joined_log_lines = "\n".join(log_lines)
                raise AirflowException(f'Docker container failed: {repr(result)} lines {joined_log_lines}')
            if self.retrieve_output:
                return self._attempt_to_retrieve_result()
            elif self.do_xcom_push:
                if len(log_lines) == 0:
                    return None
                try:
                    if self.xcom_all:
                        return log_lines
                    else:
                        return log_lines[-1]
                except StopIteration:
                    # handle the case when there is not a single line to iterate on
                    return None
            return None
        finally:
            if self.auto_remove == "success":
                self.cli.remove_container(self.container['Id'])
            elif self.auto_remove == "force":
                self.cli.remove_container(self.container['Id'], force=True)
Explanation: The create_host_config method of the APIClient has an optional port_bindings keyword argument, and the create_container method has an optional ports argument. DockerOperator doesn't expose these arguments, so you have to override _run_image_with_mounts with a copy of the original method that supplies them from the port_bindings field set in the initializer. You can then pass the ports to publish as a keyword argument. Note that in this implementation, the argument is expected to be a dictionary mapping container ports to host ports:
t1 = DockerOperatorWithExposedPorts(image=..., task_id=..., port_bindings={5000: 5000, 8080:8080, ...})
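Because the dictionary is keyed by container port, with the host port as the value (the docker-py port_bindings convention), it is easy to get the direction backwards when translating docker run -p HOST:CONTAINER flags. A minimal sketch of a hypothetical helper (parse_publish_specs is not part of Airflow or docker-py) that handles only plain HOST:CONTAINER TCP pairs, matching the integer keys the subclass passes to create_container:

```python
from typing import Dict, List


def parse_publish_specs(specs: List[str]) -> Dict[int, int]:
    """Turn docker-run style "HOST:CONTAINER" strings into {container_port: host_port}."""
    bindings: Dict[int, int] = {}
    for spec in specs:
        host, sep, container = spec.partition(":")
        # Only "HOST:CONTAINER" with numeric ports is supported; IP-bound or
        # protocol-suffixed forms such as "127.0.0.1:80:80" or "80:80/udp" are rejected.
        if not sep or not host.isdigit() or not container.isdigit():
            raise ValueError(f"unsupported publish spec: {spec!r}")
        if int(container) in bindings:
            raise ValueError(f"container port {container} published twice")
        # docker-py's port_bindings is keyed by the *container* port.
        bindings[int(container)] = int(host)
    return bindings
```

For example, port_bindings=parse_publish_specs(["8081:80"]) yields {80: 8081}, which publishes container port 80 on host port 8081.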
