You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster_docker.ops.docker_container_op

from typing import Any, Mapping, Optional, Sequence

import docker
from dagster import Field, In, Nothing, OpExecutionContext, StringSource, op
from dagster._annotations import experimental
from dagster._core.utils import parse_env_var
from dagster._serdes.utils import hash_str

from ..container_context import DockerContainerContext
from ..docker_run_launcher import DockerRunLauncher
from ..utils import DOCKER_CONFIG_SCHEMA, validate_docker_image

# Config schema for `docker_container_op`: the shared Docker config schema
# extended with the op-specific "image", "entrypoint" and "command" fields.
DOCKER_CONTAINER_OP_CONFIG = dict(
    DOCKER_CONFIG_SCHEMA,
    # Required: the image the container is created from.
    image=Field(
        StringSource,
        description="The image in which to run the Docker container.",
        is_required=True,
    ),
    # Optional: list of strings passed as the container's entrypoint.
    entrypoint=Field(
        [str],
        description="The ENTRYPOINT for the Docker container",
        is_required=False,
    ),
    # Optional: list of strings passed as the container's command.
    command=Field(
        [str],
        description="The command to run in the container within the launched Docker container.",
        is_required=False,
    ),
)


def _get_client(docker_container_context: DockerContainerContext):
    """Create a Docker client from the environment, logging in to a registry if configured.

    Args:
        docker_container_context: Container context whose ``registry`` entry, when
            truthy, is read for its "url", "username" and "password" keys.

    Returns:
        The client produced by ``docker.client.from_env()``.
    """
    docker_client = docker.client.from_env()

    registry = docker_container_context.registry
    if not registry:
        # No registry configured: use the client without logging in.
        return docker_client

    docker_client.login(
        registry=registry["url"],
        username=registry["username"],
        password=registry["password"],
    )
    return docker_client


def _get_container_name(run_id: str, op_name: str, retry_number: int) -> str:
    """Derive the container name for an op execution.

    The base name is ``hash_str`` applied to the concatenation of the run id and
    the op name. For retries (``retry_number > 0``) the retry number is appended
    as a ``-<n>`` suffix, so each retry gets a name distinct from the first attempt.

    Args:
        run_id: Id of the run the op belongs to.
        op_name: Name of the op being executed.
        retry_number: Retry counter for this execution; values of 0 or below add no suffix.

    Returns:
        The container name.
    """
    container_name = hash_str(run_id + op_name)

    if retry_number > 0:
        return f"{container_name}-{retry_number}"

    return container_name


def _create_container(
    op_context: OpExecutionContext,
    client,
    container_context: DockerContainerContext,
    image: str,
    entrypoint: Optional[Sequence[str]],
    command: Optional[Sequence[str]],
):
    """Create (but do not start) a container for the current op execution.

    Args:
        op_context: Execution context; its run id, op name and retry number
            determine the container name.
        client: Docker client whose ``containers.create`` is called.
        container_context: Supplies the env vars, networks and extra keyword
            arguments for the container.
        image: Image to create the container from.
        entrypoint: Entrypoint passed through to ``containers.create``.
        command: Command passed through to ``containers.create``.

    Returns:
        Whatever ``client.containers.create`` returns.
    """
    # Turn each env var entry into a key/value pair via parse_env_var.
    environment = {}
    for raw_env_var in container_context.env_vars:
        key, value = parse_env_var(raw_env_var)
        environment[key] = value

    container_name = _get_container_name(
        op_context.run_id, op_context.op.name, op_context.retry_number
    )

    # Only the first configured network (if any) is passed at creation time.
    networks = container_context.networks
    if len(networks):
        primary_network = networks[0]
    else:
        primary_network = None

    return client.containers.create(
        image,
        name=container_name,
        detach=True,
        network=primary_network,
        entrypoint=entrypoint,
        command=command,
        environment=environment,
        **container_context.container_kwargs,
    )


@experimental
def execute_docker_container(
    context: OpExecutionContext,
    image: str,
    entrypoint: Optional[Sequence[str]] = None,
    command: Optional[Sequence[str]] = None,
    networks: Optional[Sequence[str]] = None,
    registry: Optional[Mapping[str, str]] = None,
    env_vars: Optional[Sequence[str]] = None,
    container_kwargs: Optional[Mapping[str, Any]] = None,
):
    """This function is a utility for executing a Docker container from within a Dagster op.

    The container is created, started, its combined stdout/stderr log stream is printed
    line by line, and the function then waits for the container to exit.

    Args:
        context (OpExecutionContext): The context of the op from which the container is
            launched.
        image (str): The image to use for the launched Docker container.
        entrypoint (Optional[Sequence[str]]): The ENTRYPOINT to run in the launched Docker
            container. Default: None.
        command (Optional[Sequence[str]]): The CMD to run in the launched Docker container.
            Default: None.
        networks (Optional[Sequence[str]]): Names of the Docker networks to which to connect
            the launched container. Default: None.
        registry (Optional[Mapping[str, str]]): Information for using a non local/public
            Docker registry. When provided, its "url", "username", and "password" keys are
            used to log in.
        env_vars (Optional[Sequence[str]]): List of environment variables to include in the
            launched container. Each can be of the form KEY=VALUE or just KEY (in which case
            the value will be pulled from the calling environment).
        container_kwargs (Optional[Mapping[str, Any]]): key-value pairs that can be passed
            into containers.create in the Docker Python API. See
            https://docker-py.readthedocs.io/en/stable/containers.html for the full list
            of available options.

    Raises:
        Exception: If the container exits with a non-zero status code.
    """
    # Only pass the instance's run launcher along when it is a DockerRunLauncher.
    run_container_context = DockerContainerContext.create_for_run(
        context.dagster_run,
        (
            context.instance.run_launcher
            if isinstance(context.instance.run_launcher, DockerRunLauncher)
            else None
        ),
    )

    validate_docker_image(image)

    op_container_context = DockerContainerContext(
        registry=registry, env_vars=env_vars, networks=networks, container_kwargs=container_kwargs
    )

    # Combine the run-level context with the settings passed to this call.
    container_context = run_container_context.merge(op_container_context)

    client = _get_client(container_context)

    try:
        container = _create_container(
            context, client, container_context, image, entrypoint, command
        )
    except docker.errors.ImageNotFound:
        # The image is not available yet: pull it, then retry creation once.
        client.images.pull(image)
        container = _create_container(
            context, client, container_context, image, entrypoint, command
        )

    # The first network is attached when the container is created; connect any others
    # here. Slicing yields nothing when there are fewer than two networks.
    for network_name in container_context.networks[1:]:
        network = client.networks.get(network_name)
        network.connect(container)

    container.start()

    # Follow the container's output until the log stream ends.
    for line in container.logs(stdout=True, stderr=True, stream=True, follow=True):
        print(line)  # noqa: T201

    exit_status = container.wait()["StatusCode"]

    if exit_status != 0:
        raise Exception(f"Docker container returned exit code {exit_status}")


@op(ins={"start_after": In(Nothing)}, config_schema=DOCKER_CONTAINER_OP_CONFIG)
@experimental
def docker_container_op(context):
    """An op that runs a Docker container using the docker Python API.

    Contrast with the `docker_executor`, which runs each Dagster op in a Dagster job in its
    own Docker container.

    This op may be useful when:
      - You need to orchestrate a command that isn't a Dagster op (or isn't written in Python)
      - You want to run the rest of a Dagster job using a specific executor, and only a single
        op in docker.

    For example:

    .. literalinclude:: ../../../../../../python_modules/libraries/dagster-docker/dagster_docker_tests/test_example_docker_container_op.py
      :start-after: start_marker
      :end-before: end_marker
      :language: python

    You can create your own op with the same implementation by calling the
    `execute_docker_container` function inside your own op.
    """
    # The op config entries are forwarded as keyword arguments.
    op_config = context.op_config
    execute_docker_container(context, **op_config)