Use asyncio for subprocess calls

Removes the threads from compose_up and manages it using async. Also
uses async processing to format the log messages instead of piping
through sed. This should work on windows without having sed installed

Adds --parallel to support pull and build in parallel, same as docker
compose

Signed-off-by: Falmarri <463948+Falmarri@users.noreply.github.com>
This commit is contained in:
Falmarri 2023-12-09 22:51:54 -08:00 committed by Muayyad Alsadi
parent bce40c2db3
commit 38b13a34ea
2 changed files with 195 additions and 165 deletions

View File

@ -1,29 +1,24 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# https://docs.docker.com/compose/compose-file/#service-configuration-reference # https://docs.docker.com/compose/compose-file/#service-configuration-reference
# https://docs.docker.com/samples/ # https://docs.docker.com/samples/
# https://docs.docker.com/compose/gettingstarted/ # https://docs.docker.com/compose/gettingstarted/
# https://docs.docker.com/compose/django/ # https://docs.docker.com/compose/django/
# https://docs.docker.com/compose/wordpress/ # https://docs.docker.com/compose/wordpress/
# TODO: podman pod logs --color -n -f pod_testlogs # TODO: podman pod logs --color -n -f pod_testlogs
import sys import sys
import os import os
import getpass import getpass
import argparse import argparse
import itertools import itertools
import subprocess import subprocess
import time
import re import re
import hashlib import hashlib
import random import random
import json import json
import glob import glob
import asyncio.subprocess
from threading import Thread import signal
import shlex import shlex
@ -371,7 +366,7 @@ def transform(args, project_name, given_containers):
return pods, containers return pods, containers
def assert_volume(compose, mount_dict): async def assert_volume(compose, mount_dict):
""" """
inspect volume to get directory inspect volume to get directory
create volume if needed create volume if needed
@ -398,7 +393,7 @@ def assert_volume(compose, mount_dict):
# TODO: might move to using "volume list" # TODO: might move to using "volume list"
# podman volume list --format '{{.Name}}\t{{.MountPoint}}' -f 'label=io.podman.compose.project=HERE' # podman volume list --format '{{.Name}}\t{{.MountPoint}}' -f 'label=io.podman.compose.project=HERE'
try: try:
_ = compose.podman.output([], "volume", ["inspect", vol_name]).decode("utf-8") _ = (await compose.podman.output([], "volume", ["inspect", vol_name])).decode("utf-8")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
if is_ext: if is_ext:
raise RuntimeError(f"External volume [{vol_name}] does not exists") from e raise RuntimeError(f"External volume [{vol_name}] does not exists") from e
@ -419,8 +414,8 @@ def assert_volume(compose, mount_dict):
for opt, value in driver_opts.items(): for opt, value in driver_opts.items():
args.extend(["--opt", f"{opt}={value}"]) args.extend(["--opt", f"{opt}={value}"])
args.append(vol_name) args.append(vol_name)
compose.podman.output([], "volume", args) await compose.podman.output([], "volume", args)
_ = compose.podman.output([], "volume", ["inspect", vol_name]).decode("utf-8") _ = (await compose.podman.output([], "volume", ["inspect", vol_name])).decode("utf-8")
def mount_desc_to_mount_args( def mount_desc_to_mount_args(
@ -522,12 +517,12 @@ def get_mnt_dict(compose, cnt, volume):
return fix_mount_dict(compose, volume, proj_name, srv_name) return fix_mount_dict(compose, volume, proj_name, srv_name)
def get_mount_args(compose, cnt, volume): async def get_mount_args(compose, cnt, volume):
volume = get_mnt_dict(compose, cnt, volume) volume = get_mnt_dict(compose, cnt, volume)
# proj_name = compose.project_name # proj_name = compose.project_name
srv_name = cnt["_service"] srv_name = cnt["_service"]
mount_type = volume["type"] mount_type = volume["type"]
assert_volume(compose, volume) await assert_volume(compose, volume)
if compose.prefer_volume_over_mount: if compose.prefer_volume_over_mount:
if mount_type == "tmpfs": if mount_type == "tmpfs":
# TODO: --tmpfs /tmp:rw,size=787448k,mode=1777 # TODO: --tmpfs /tmp:rw,size=787448k,mode=1777
@ -710,7 +705,7 @@ def norm_ports(ports_in):
return ports_out return ports_out
def assert_cnt_nets(compose, cnt): async def assert_cnt_nets(compose, cnt):
""" """
create missing networks create missing networks
""" """
@ -733,7 +728,7 @@ def assert_cnt_nets(compose, cnt):
ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name
) )
try: try:
compose.podman.output([], "network", ["exists", net_name]) await compose.podman.output([], "network", ["exists", net_name])
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
if is_ext: if is_ext:
raise RuntimeError( raise RuntimeError(
@ -776,8 +771,8 @@ def assert_cnt_nets(compose, cnt):
if gateway: if gateway:
args.extend(("--gateway", gateway)) args.extend(("--gateway", gateway))
args.append(net_name) args.append(net_name)
compose.podman.output([], "network", args) await compose.podman.output([], "network", args)
compose.podman.output([], "network", ["exists", net_name]) await compose.podman.output([], "network", ["exists", net_name])
def get_net_args(compose, cnt): def get_net_args(compose, cnt):
@ -898,7 +893,7 @@ def get_net_args(compose, cnt):
return net_args return net_args
def container_to_args(compose, cnt, detached=True): async def container_to_args(compose, cnt, detached=True):
# TODO: double check -e , --add-host, -v, --read-only # TODO: double check -e , --add-host, -v, --read-only
dirname = compose.dirname dirname = compose.dirname
pod = cnt.get("pod", None) or "" pod = cnt.get("pod", None) or ""
@ -955,9 +950,9 @@ def container_to_args(compose, cnt, detached=True):
for i in tmpfs_ls: for i in tmpfs_ls:
podman_args.extend(["--tmpfs", i]) podman_args.extend(["--tmpfs", i])
for volume in cnt.get("volumes", []): for volume in cnt.get("volumes", []):
podman_args.extend(get_mount_args(compose, cnt, volume)) podman_args.extend(await get_mount_args(compose, cnt, volume))
assert_cnt_nets(compose, cnt) await assert_cnt_nets(compose, cnt)
podman_args.extend(get_net_args(compose, cnt)) podman_args.extend(get_net_args(compose, cnt))
logging = cnt.get("logging", None) logging = cnt.get("logging", None)
@ -1161,12 +1156,22 @@ class Podman:
self.podman_path = podman_path self.podman_path = podman_path
self.dry_run = dry_run self.dry_run = dry_run
def output(self, podman_args, cmd="", cmd_args=None): async def output(self, podman_args, cmd="", cmd_args=None):
cmd_args = cmd_args or [] cmd_args = cmd_args or []
xargs = self.compose.get_podman_args(cmd) if cmd else [] xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(cmd_ls) log(cmd_ls)
return subprocess.check_output(cmd_ls) p = await asyncio.subprocess.create_subprocess_exec(
*cmd_ls,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout_data, stderr_data = await p.communicate()
if p.returncode == 0:
return stdout_data
else:
raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
def exec( def exec(
self, self,
@ -1180,55 +1185,71 @@ class Podman:
log(" ".join([str(i) for i in cmd_ls])) log(" ".join([str(i) for i in cmd_ls]))
os.execlp(self.podman_path, *cmd_ls) os.execlp(self.podman_path, *cmd_ls)
def run( async def run(
self, self,
podman_args, podman_args,
cmd="", cmd="",
cmd_args=None, cmd_args=None,
wait=True, wait=True,
sleep=1, sleep=1,
obj=None,
log_formatter=None, log_formatter=None,
*,
# Intentionally mutable default argument to hold references to tasks
task_reference=set()
): ):
if obj is not None:
obj.exit_code = None
cmd_args = list(map(str, cmd_args or [])) cmd_args = list(map(str, cmd_args or []))
xargs = self.compose.get_podman_args(cmd) if cmd else [] xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(" ".join([str(i) for i in cmd_ls])) log(" ".join([str(i) for i in cmd_ls]))
if self.dry_run: if self.dry_run:
return None return None
# subprocess.Popen(
# args, bufsize = 0, executable = None, stdin = None, stdout = None, stderr = None, preexec_fn = None,
# close_fds = False, shell = False, cwd = None, env = None, universal_newlines = False, startupinfo = None,
# creationflags = 0
# )
if log_formatter is not None: if log_formatter is not None:
async def format_out(stdout):
while True:
l = await stdout.readline()
if l:
print(log_formatter, l.decode('utf-8'), end='')
if stdout.at_eof():
break
# read, write = os.pipe()
# Pipe podman process output through log_formatter (which can add colored prefix) # Pipe podman process output through log_formatter (which can add colored prefix)
p = subprocess.Popen( p = await asyncio.subprocess.create_subprocess_exec(
cmd_ls, stdout=subprocess.PIPE *cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
) # pylint: disable=consider-using-with ) # pylint: disable=consider-using-with
_ = subprocess.Popen(
log_formatter, stdin=p.stdout # This is hacky to make the tasks not get garbage collected
) # pylint: disable=consider-using-with # https://github.com/python/cpython/issues/91887
p.stdout.close() # Allow p_process to receive a SIGPIPE if logging process exits. out_t = asyncio.create_task(format_out(p.stdout))
task_reference.add(out_t)
out_t.add_done_callback(task_reference.discard)
err_t = asyncio.create_task(format_out(p.stderr))
task_reference.add(err_t)
err_t.add_done_callback(task_reference.discard)
else: else:
p = subprocess.Popen(cmd_ls) # pylint: disable=consider-using-with p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with
if wait: if wait:
exit_code = p.wait() try:
exit_code = await p.wait()
except asyncio.CancelledError:
p.terminate()
exit_code = await p.wait()
log("exit code:", exit_code) log("exit code:", exit_code)
if obj is not None: return exit_code
obj.exit_code = exit_code
if sleep: if sleep:
time.sleep(sleep) log(f"Sleep {sleep}")
await asyncio.sleep(sleep)
return p return p
def volume_ls(self, proj=None): async def volume_ls(self, proj=None):
if not proj: if not proj:
proj = self.compose.project_name proj = self.compose.project_name
output = self.output( output = (await self.output(
[], [],
"volume", "volume",
[ [
@ -1239,7 +1260,7 @@ class Podman:
"--format", "--format",
"{{.Name}}", "{{.Name}}",
], ],
).decode("utf-8") )).decode("utf-8")
volumes = output.splitlines() volumes = output.splitlines()
return volumes return volumes
@ -1487,7 +1508,7 @@ class PodmanCompose:
xargs.extend(shlex.split(args)) xargs.extend(shlex.split(args))
return xargs return xargs
def run(self): async def run(self):
log("podman-compose version: " + __version__) log("podman-compose version: " + __version__)
args = self._parse_args() args = self._parse_args()
podman_path = args.podman_path podman_path = args.podman_path
@ -1504,8 +1525,8 @@ class PodmanCompose:
# just to make sure podman is running # just to make sure podman is running
try: try:
self.podman_version = ( self.podman_version = (
self.podman.output(["--version"], "", []).decode("utf-8").strip() (await self.podman.output(["--version"], "", [])).decode("utf-8").strip()
or "" or ""
) )
self.podman_version = (self.podman_version.split() or [""])[-1] self.podman_version = (self.podman_version.split() or [""])[-1]
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
@ -1521,7 +1542,7 @@ class PodmanCompose:
if compose_required: if compose_required:
self._parse_compose_file() self._parse_compose_file()
cmd = self.commands[cmd_name] cmd = self.commands[cmd_name]
retcode = cmd(self, args) retcode = await cmd(self, args)
if isinstance(retcode, int): if isinstance(retcode, int):
sys.exit(retcode) sys.exit(retcode)
@ -1919,6 +1940,8 @@ class cmd_run: # pylint: disable=invalid-name,too-few-public-methods
def wrapped(*args, **kw): def wrapped(*args, **kw):
return func(*args, **kw) return func(*args, **kw)
if not asyncio.iscoroutinefunction(func):
raise Exception("Command must be async")
wrapped._compose = self.compose wrapped._compose = self.compose
# Trim extra indentation at start of multiline docstrings. # Trim extra indentation at start of multiline docstrings.
wrapped.desc = self.cmd_desc or re.sub(r"^\s+", "", func.__doc__) wrapped.desc = self.cmd_desc or re.sub(r"^\s+", "", func.__doc__)
@ -1947,7 +1970,7 @@ class cmd_parse: # pylint: disable=invalid-name,too-few-public-methods
@cmd_run(podman_compose, "version", "show version") @cmd_run(podman_compose, "version", "show version")
def compose_version(compose, args): async def compose_version(compose, args):
if getattr(args, "short", False): if getattr(args, "short", False):
print(__version__) print(__version__)
return return
@ -1956,7 +1979,7 @@ def compose_version(compose, args):
print(json.dumps(res)) print(json.dumps(res))
return return
print("podman-compose version", __version__) print("podman-compose version", __version__)
compose.podman.run(["--version"], "", [], sleep=0) await compose.podman.run(["--version"], "", [], sleep=0)
def is_local(container: dict) -> bool: def is_local(container: dict) -> bool:
@ -1972,15 +1995,15 @@ def is_local(container: dict) -> bool:
@cmd_run(podman_compose, "wait", "wait running containers to stop") @cmd_run(podman_compose, "wait", "wait running containers to stop")
def compose_wait(compose, args): # pylint: disable=unused-argument async def compose_wait(compose, args): # pylint: disable=unused-argument
containers = [cnt["name"] for cnt in compose.containers] containers = [cnt["name"] for cnt in compose.containers]
cmd_args = ["--"] cmd_args = ["--"]
cmd_args.extend(containers) cmd_args.extend(containers)
compose.podman.exec([], "wait", cmd_args) await compose.podman.exec([], "wait", cmd_args)
@cmd_run(podman_compose, "systemd") @cmd_run(podman_compose, "systemd")
def compose_systemd(compose, args): async def compose_systemd(compose, args):
""" """
create systemd unit file and register its compose stacks create systemd unit file and register its compose stacks
@ -2063,7 +2086,7 @@ while in your project type `podman-compose systemd -a register`
@cmd_run(podman_compose, "pull", "pull stack images") @cmd_run(podman_compose, "pull", "pull stack images")
def compose_pull(compose, args): async def compose_pull(compose, args):
img_containers = [cnt for cnt in compose.containers if "image" in cnt] img_containers = [cnt for cnt in compose.containers if "image" in cnt]
if args.services: if args.services:
services = set(args.services) services = set(args.services)
@ -2072,27 +2095,33 @@ def compose_pull(compose, args):
if not args.force_local: if not args.force_local:
local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)} local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)}
images -= local_images images -= local_images
for image in images:
compose.podman.run([], "pull", [image], sleep=0) sem = asyncio.Semaphore(args.parallel)
async def _pull(image: str):
async with sem:
return await compose.podman.run([], "pull", [image], sleep=0)
await asyncio.gather(*[_pull(image) for image in images])
@cmd_run(podman_compose, "push", "push stack images") @cmd_run(podman_compose, "push", "push stack images")
def compose_push(compose, args): async def compose_push(compose, args):
services = set(args.services) services = set(args.services)
for cnt in compose.containers: for cnt in compose.containers:
if "build" not in cnt: if "build" not in cnt:
continue continue
if services and cnt["_service"] not in services: if services and cnt["_service"] not in services:
continue continue
compose.podman.run([], "push", [cnt["image"]], sleep=0) await compose.podman.run([], "push", [cnt["image"]], sleep=0)
def build_one(compose, args, cnt): async def build_one(compose, args, cnt):
if "build" not in cnt: if "build" not in cnt:
return None return None
if getattr(args, "if_not_exists", None): if getattr(args, "if_not_exists", None):
try: try:
img_id = compose.podman.output( img_id = await compose.podman.output(
[], "inspect", ["-t", "image", "-f", "{{.Id}}", cnt["image"]] [], "inspect", ["-t", "image", "-f", "{{.Id}}", cnt["image"]]
) )
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
@ -2148,40 +2177,40 @@ def build_one(compose, args, cnt):
) )
) )
build_args.append(ctx) build_args.append(ctx)
status = compose.podman.run([], "build", build_args, sleep=0) status = await compose.podman.run([], "build", build_args, sleep=0)
return status return status
@cmd_run(podman_compose, "build", "build stack images") @cmd_run(podman_compose, "build", "build stack images")
def compose_build(compose, args): async def compose_build(compose, args):
# keeps the status of the last service/container built tasks = []
status = 0 sem = asyncio.Semaphore(args.parallel)
def parse_return_code(obj, current_status): async def safe_build_task(cnt):
if obj and obj.returncode != 0: async with sem:
return obj.returncode return await build_one(compose, args, cnt)
return current_status
if args.services: if args.services:
container_names_by_service = compose.container_names_by_service container_names_by_service = compose.container_names_by_service
compose.assert_services(args.services) compose.assert_services(args.services)
for service in args.services: for service in args.services:
cnt = compose.container_by_name[container_names_by_service[service][0]] cnt = compose.container_by_name[container_names_by_service[service][0]]
p = build_one(compose, args, cnt) tasks.append(asyncio.create_task(safe_build_task(cnt)))
status = parse_return_code(p, status)
if status != 0:
return status
else: else:
for cnt in compose.containers: for cnt in compose.containers:
p = build_one(compose, args, cnt) tasks.append(asyncio.create_task(safe_build_task(cnt)))
status = parse_return_code(p, status)
if status != 0: status = 0
return status for t in asyncio.as_completed(tasks):
s = await t
if s is not None:
status = s
return status return status
def create_pods(compose, args): # pylint: disable=unused-argument async def create_pods(compose, args): # pylint: disable=unused-argument
for pod in compose.pods: for pod in compose.pods:
podman_args = [ podman_args = [
"create", "create",
@ -2196,7 +2225,7 @@ def create_pods(compose, args): # pylint: disable=unused-argument
ports = [ports] ports = [ports]
for i in ports: for i in ports:
podman_args.extend(["-p", str(i)]) podman_args.extend(["-p", str(i)])
compose.podman.run([], "pod", podman_args) await compose.podman.run([], "pod", podman_args)
def get_excluded(compose, args): def get_excluded(compose, args):
@ -2213,16 +2242,16 @@ def get_excluded(compose, args):
@cmd_run( @cmd_run(
podman_compose, "up", "Create and start the entire stack or some of its services" podman_compose, "up", "Create and start the entire stack or some of its services"
) )
def compose_up(compose, args): async def compose_up(compose: PodmanCompose, args):
proj_name = compose.project_name proj_name = compose.project_name
excluded = get_excluded(compose, args) excluded = get_excluded(compose, args)
if not args.no_build: if not args.no_build:
# `podman build` does not cache, so don't always build # `podman build` does not cache, so don't always build
build_args = argparse.Namespace(if_not_exists=(not args.build), **args.__dict__) build_args = argparse.Namespace(if_not_exists=(not args.build), **args.__dict__)
compose.commands["build"](compose, build_args) ret = await compose.commands["build"](compose, build_args)
hashes = ( hashes = (
compose.podman.output( (await compose.podman.output(
[], [],
"ps", "ps",
[ [
@ -2232,7 +2261,7 @@ def compose_up(compose, args):
"--format", "--format",
'{{ index .Labels "io.podman.compose.config-hash"}}', '{{ index .Labels "io.podman.compose.config-hash"}}',
], ],
) ))
.decode("utf-8") .decode("utf-8")
.splitlines() .splitlines()
) )
@ -2240,21 +2269,21 @@ def compose_up(compose, args):
if args.force_recreate or len(diff_hashes): if args.force_recreate or len(diff_hashes):
log("recreating: ...") log("recreating: ...")
down_args = argparse.Namespace(**dict(args.__dict__, volumes=False)) down_args = argparse.Namespace(**dict(args.__dict__, volumes=False))
compose.commands["down"](compose, down_args) await compose.commands["down"](compose, down_args)
log("recreating: done\n\n") log("recreating: done\n\n")
# args.no_recreate disables check for changes (which is not implemented) # args.no_recreate disables check for changes (which is not implemented)
podman_command = "run" if args.detach and not args.no_start else "create" podman_command = "run" if args.detach and not args.no_start else "create"
create_pods(compose, args) await create_pods(compose, args)
for cnt in compose.containers: for cnt in compose.containers:
if cnt["_service"] in excluded: if cnt["_service"] in excluded:
log("** skipping: ", cnt["name"]) log("** skipping: ", cnt["name"])
continue continue
podman_args = container_to_args(compose, cnt, detached=args.detach) podman_args = await container_to_args(compose, cnt, detached=args.detach)
subproc = compose.podman.run([], podman_command, podman_args) subproc = await compose.podman.run([], podman_command, podman_args)
if podman_command == "run" and subproc and subproc.returncode: if podman_command == "run" and subproc:
compose.podman.run([], "start", [cnt["name"]]) await compose.podman.run([], "start", [cnt["name"]])
if args.no_start or args.detach or args.dry_run: if args.no_start or args.detach or args.dry_run:
return return
# TODO: handle already existing # TODO: handle already existing
@ -2264,54 +2293,41 @@ def compose_up(compose, args):
if exit_code_from: if exit_code_from:
args.abort_on_container_exit = True args.abort_on_container_exit = True
threads = []
max_service_length = 0 max_service_length = 0
for cnt in compose.containers: for cnt in compose.containers:
curr_length = len(cnt["_service"]) curr_length = len(cnt["_service"])
max_service_length = ( max_service_length = (
curr_length if curr_length > max_service_length else max_service_length curr_length if curr_length > max_service_length else max_service_length
) )
has_sed = os.path.isfile("/bin/sed")
tasks = set()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: [t.cancel("User exit") for t in tasks])
for i, cnt in enumerate(compose.containers): for i, cnt in enumerate(compose.containers):
# Add colored service prefix to output by piping output through sed # Add colored service prefix to output by piping output through sed
color_idx = i % len(compose.console_colors) color_idx = i % len(compose.console_colors)
color = compose.console_colors[color_idx] color = compose.console_colors[color_idx]
space_suffix = " " * (max_service_length - len(cnt["_service"]) + 1) space_suffix = " " * (max_service_length - len(cnt["_service"]) + 1)
log_formatter = "s/^/{}[{}]{}|\x1B[0m\\ /;".format( log_formatter = "{}[{}]{}|\x1B[0m".format(
color, cnt["_service"], space_suffix color, cnt["_service"], space_suffix
) )
log_formatter = ["sed", "-e", log_formatter] if has_sed else None
if cnt["_service"] in excluded: if cnt["_service"] in excluded:
log("** skipping: ", cnt["name"]) log("** skipping: ", cnt["name"])
continue continue
# TODO: remove sleep from podman.run
obj = compose if exit_code_from == cnt["_service"] else None
thread = Thread(
target=compose.podman.run,
args=[[], "start", ["-a", cnt["name"]]],
kwargs={"obj": obj, "log_formatter": log_formatter},
daemon=True,
name=cnt["name"],
)
thread.start()
threads.append(thread)
time.sleep(1)
while threads: tasks.add(asyncio.create_task(compose.podman.run([], "start", ["-a", cnt["name"]], wait=True, sleep=None, log_formatter=log_formatter), name=cnt["_service"]))
to_remove = []
for thread in threads: exit_code = 0
thread.join(timeout=1.0) for task in asyncio.as_completed(tasks):
if not thread.is_alive(): t = await task
to_remove.append(thread) if args.abort_on_container_exit:
if args.abort_on_container_exit: [_.cancel() for _ in tasks if not _.cancelling() and not _.cancelled()]
time.sleep(1) if task.get_name() == exit_code_from:
exit_code = ( exit_code = t
compose.exit_code if compose.exit_code is not None else -1
) sys.exit(exit_code)
sys.exit(exit_code)
for thread in to_remove:
threads.remove(thread)
def get_volume_names(compose, cnt): def get_volume_names(compose, cnt):
@ -2332,7 +2348,7 @@ def get_volume_names(compose, cnt):
@cmd_run(podman_compose, "down", "tear down entire stack") @cmd_run(podman_compose, "down", "tear down entire stack")
def compose_down(compose, args): async def compose_down(compose, args):
excluded = get_excluded(compose, args) excluded = get_excluded(compose, args)
podman_args = [] podman_args = []
timeout_global = getattr(args, "timeout", None) timeout_global = getattr(args, "timeout", None)
@ -2348,14 +2364,14 @@ def compose_down(compose, args):
timeout = str_to_seconds(timeout_str) timeout = str_to_seconds(timeout_str)
if timeout is not None: if timeout is not None:
podman_stop_args.extend(["-t", str(timeout)]) podman_stop_args.extend(["-t", str(timeout)])
compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]], sleep=0) await compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]], sleep=0)
for cnt in containers: for cnt in containers:
if cnt["_service"] in excluded: if cnt["_service"] in excluded:
continue continue
compose.podman.run([], "rm", [cnt["name"]], sleep=0) await compose.podman.run([], "rm", [cnt["name"]], sleep=0)
if args.remove_orphans: if args.remove_orphans:
names = ( names = (
compose.podman.output( (await compose.podman.output(
[], [],
"ps", "ps",
[ [
@ -2365,14 +2381,14 @@ def compose_down(compose, args):
"--format", "--format",
"{{ .Names }}", "{{ .Names }}",
], ],
) ))
.decode("utf-8") .decode("utf-8")
.splitlines() .splitlines()
) )
for name in names: for name in names:
compose.podman.run([], "stop", [*podman_args, name], sleep=0) await compose.podman.run([], "stop", [*podman_args, name], sleep=0)
for name in names: for name in names:
compose.podman.run([], "rm", [name], sleep=0) await compose.podman.run([], "rm", [name], sleep=0)
if args.volumes: if args.volumes:
vol_names_to_keep = set() vol_names_to_keep = set()
for cnt in containers: for cnt in containers:
@ -2380,22 +2396,22 @@ def compose_down(compose, args):
continue continue
vol_names_to_keep.update(get_volume_names(compose, cnt)) vol_names_to_keep.update(get_volume_names(compose, cnt))
log("keep", vol_names_to_keep) log("keep", vol_names_to_keep)
for volume_name in compose.podman.volume_ls(): async for volume_name in compose.podman.volume_ls():
if volume_name in vol_names_to_keep: if volume_name in vol_names_to_keep:
continue continue
compose.podman.run([], "volume", ["rm", volume_name]) await compose.podman.run([], "volume", ["rm", volume_name])
if excluded: if excluded:
return return
for pod in compose.pods: for pod in compose.pods:
compose.podman.run([], "pod", ["rm", pod["name"]], sleep=0) await compose.podman.run([], "pod", ["rm", pod["name"]], sleep=0)
@cmd_run(podman_compose, "ps", "show status of containers") @cmd_run(podman_compose, "ps", "show status of containers")
def compose_ps(compose, args): async def compose_ps(compose, args):
proj_name = compose.project_name proj_name = compose.project_name
if args.quiet is True: if args.quiet is True:
compose.podman.run( await compose.podman.run(
[], [],
"ps", "ps",
[ [
@ -2407,7 +2423,7 @@ def compose_ps(compose, args):
], ],
) )
else: else:
compose.podman.run( await compose.podman.run(
[], "ps", ["-a", "--filter", f"label=io.podman.compose.project={proj_name}"] [], "ps", ["-a", "--filter", f"label=io.podman.compose.project={proj_name}"]
) )
@ -2417,7 +2433,7 @@ def compose_ps(compose, args):
"run", "run",
"create a container similar to a service to run a one-off command", "create a container similar to a service to run a one-off command",
) )
def compose_run(compose, args): async def compose_run(compose, args):
create_pods(compose, args) create_pods(compose, args)
compose.assert_services(args.service) compose.assert_services(args.service)
container_names = compose.container_names_by_service[args.service] container_names = compose.container_names_by_service[args.service]
@ -2483,17 +2499,17 @@ def compose_run(compose, args):
if args.rm and "restart" in cnt: if args.rm and "restart" in cnt:
del cnt["restart"] del cnt["restart"]
# run podman # run podman
podman_args = container_to_args(compose, cnt, args.detach) podman_args = await container_to_args(compose, cnt, args.detach)
if not args.detach: if not args.detach:
podman_args.insert(1, "-i") podman_args.insert(1, "-i")
if args.rm: if args.rm:
podman_args.insert(1, "--rm") podman_args.insert(1, "--rm")
p = compose.podman.run([], "run", podman_args, sleep=0) p = await compose.podman.run([], "run", podman_args, sleep=0)
sys.exit(p.returncode) sys.exit(p)
@cmd_run(podman_compose, "exec", "execute a command in a running container") @cmd_run(podman_compose, "exec", "execute a command in a running container")
def compose_exec(compose, args): async def compose_exec(compose, args):
compose.assert_services(args.service) compose.assert_services(args.service)
container_names = compose.container_names_by_service[args.service] container_names = compose.container_names_by_service[args.service]
container_name = container_names[args.index - 1] container_name = container_names[args.index - 1]
@ -2518,11 +2534,11 @@ def compose_exec(compose, args):
podman_args += [container_name] podman_args += [container_name]
if args.cnt_command is not None and len(args.cnt_command) > 0: if args.cnt_command is not None and len(args.cnt_command) > 0:
podman_args += args.cnt_command podman_args += args.cnt_command
p = compose.podman.run([], "exec", podman_args, sleep=0) p = await compose.podman.run([], "exec", podman_args, sleep=0)
sys.exit(p.returncode) sys.exit(p)
def transfer_service_status(compose, args, action): async def transfer_service_status(compose, args, action):
# TODO: handle dependencies, handle creations # TODO: handle dependencies, handle creations
container_names_by_service = compose.container_names_by_service container_names_by_service = compose.container_names_by_service
if not args.services: if not args.services:
@ -2537,6 +2553,7 @@ def transfer_service_status(compose, args, action):
targets = list(reversed(targets)) targets = list(reversed(targets))
podman_args = [] podman_args = []
timeout_global = getattr(args, "timeout", None) timeout_global = getattr(args, "timeout", None)
tasks = []
for target in targets: for target in targets:
if action != "start": if action != "start":
timeout = timeout_global timeout = timeout_global
@ -2548,26 +2565,27 @@ def transfer_service_status(compose, args, action):
timeout = str_to_seconds(timeout_str) timeout = str_to_seconds(timeout_str)
if timeout is not None: if timeout is not None:
podman_args.extend(["-t", str(timeout)]) podman_args.extend(["-t", str(timeout)])
compose.podman.run([], action, podman_args + [target], sleep=0) tasks.append(asyncio.create_task(compose.podman.run([], action, podman_args + [target], sleep=0)))
await asyncio.gather(*tasks)
@cmd_run(podman_compose, "start", "start specific services") @cmd_run(podman_compose, "start", "start specific services")
def compose_start(compose, args): async def compose_start(compose, args):
transfer_service_status(compose, args, "start") transfer_service_status(compose, args, "start")
@cmd_run(podman_compose, "stop", "stop specific services") @cmd_run(podman_compose, "stop", "stop specific services")
def compose_stop(compose, args): async def compose_stop(compose, args):
transfer_service_status(compose, args, "stop") transfer_service_status(compose, args, "stop")
@cmd_run(podman_compose, "restart", "restart specific services") @cmd_run(podman_compose, "restart", "restart specific services")
def compose_restart(compose, args): async def compose_restart(compose, args):
transfer_service_status(compose, args, "restart") transfer_service_status(compose, args, "restart")
@cmd_run(podman_compose, "logs", "show logs from services") @cmd_run(podman_compose, "logs", "show logs from services")
def compose_logs(compose, args): async def compose_logs(compose, args):
container_names_by_service = compose.container_names_by_service container_names_by_service = compose.container_names_by_service
if not args.services and not args.latest: if not args.services and not args.latest:
args.services = container_names_by_service.keys() args.services = container_names_by_service.keys()
@ -2594,11 +2612,11 @@ def compose_logs(compose, args):
podman_args.extend(["--until", args.until]) podman_args.extend(["--until", args.until])
for target in targets: for target in targets:
podman_args.append(target) podman_args.append(target)
compose.podman.run([], "logs", podman_args) await compose.podman.run([], "logs", podman_args)
@cmd_run(podman_compose, "config", "displays the compose file") @cmd_run(podman_compose, "config", "displays the compose file")
def compose_config(compose, args): async def compose_config(compose, args):
if args.services: if args.services:
for service in compose.services: for service in compose.services:
print(service) print(service)
@ -2607,7 +2625,7 @@ def compose_config(compose, args):
@cmd_run(podman_compose, "port", "Prints the public port for a port binding.") @cmd_run(podman_compose, "port", "Prints the public port for a port binding.")
def compose_port(compose, args): async def compose_port(compose, args):
# TODO - deal with pod index # TODO - deal with pod index
compose.assert_services(args.service) compose.assert_services(args.service)
containers = compose.container_names_by_service[args.service] containers = compose.container_names_by_service[args.service]
@ -2635,31 +2653,31 @@ def compose_port(compose, args):
@cmd_run(podman_compose, "pause", "Pause all running containers") @cmd_run(podman_compose, "pause", "Pause all running containers")
def compose_pause(compose, args): async def compose_pause(compose, args):
container_names_by_service = compose.container_names_by_service container_names_by_service = compose.container_names_by_service
if not args.services: if not args.services:
args.services = container_names_by_service.keys() args.services = container_names_by_service.keys()
targets = [] targets = []
for service in args.services: for service in args.services:
targets.extend(container_names_by_service[service]) targets.extend(container_names_by_service[service])
compose.podman.run([], "pause", targets) await compose.podman.run([], "pause", targets)
@cmd_run(podman_compose, "unpause", "Unpause all running containers") @cmd_run(podman_compose, "unpause", "Unpause all running containers")
def compose_unpause(compose, args): async def compose_unpause(compose, args):
container_names_by_service = compose.container_names_by_service container_names_by_service = compose.container_names_by_service
if not args.services: if not args.services:
args.services = container_names_by_service.keys() args.services = container_names_by_service.keys()
targets = [] targets = []
for service in args.services: for service in args.services:
targets.extend(container_names_by_service[service]) targets.extend(container_names_by_service[service])
compose.podman.run([], "unpause", targets) await compose.podman.run([], "unpause", targets)
@cmd_run( @cmd_run(
podman_compose, "kill", "Kill one or more running containers with a specific signal" podman_compose, "kill", "Kill one or more running containers with a specific signal"
) )
def compose_kill(compose, args): async def compose_kill(compose, args):
# to ensure that the user did not execute the command by mistake # to ensure that the user did not execute the command by mistake
if not args.services and not args.all: if not args.services and not args.all:
print( print(
@ -2680,15 +2698,14 @@ def compose_kill(compose, args):
targets.extend(container_names_by_service[service]) targets.extend(container_names_by_service[service])
for target in targets: for target in targets:
podman_args.append(target) podman_args.append(target)
compose.podman.run([], "kill", podman_args) await compose.podman.run([], "kill", podman_args)
elif args.services:
if args.services:
targets = [] targets = []
for service in args.services: for service in args.services:
targets.extend(container_names_by_service[service]) targets.extend(container_names_by_service[service])
for target in targets: for target in targets:
podman_args.append(target) podman_args.append(target)
compose.podman.run([], "kill", podman_args) await compose.podman.run([], "kill", podman_args)
@cmd_run( @cmd_run(
@ -2696,7 +2713,7 @@ def compose_kill(compose, args):
"stats", "stats",
"Display percentage of CPU, memory, network I/O, block I/O and PIDs for services.", "Display percentage of CPU, memory, network I/O, block I/O and PIDs for services.",
) )
def compose_stats(compose, args): async def compose_stats(compose, args):
container_names_by_service = compose.container_names_by_service container_names_by_service = compose.container_names_by_service
if not args.services: if not args.services:
args.services = container_names_by_service.keys() args.services = container_names_by_service.keys()
@ -2717,7 +2734,7 @@ def compose_stats(compose, args):
podman_args.append(target) podman_args.append(target)
try: try:
compose.podman.run([], "stats", podman_args) await compose.podman.run([], "stats", podman_args)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
@ -3075,6 +3092,15 @@ def compose_ps_parse(parser):
) )
@cmd_parse(podman_compose, ["build", "pull", "up"])
def compose_build_pull_parse(parser):
parser.add_argument(
"--parallel",
type=int,
default=os.environ.get("PODMAN_PARALLEL", sys.maxsize)
)
@cmd_parse(podman_compose, ["build", "up"]) @cmd_parse(podman_compose, ["build", "up"])
def compose_build_up_parse(parser): def compose_build_up_parse(parser):
parser.add_argument( parser.add_argument(
@ -3201,9 +3227,9 @@ def compose_stats_parse(parser):
) )
def main(): async def main():
podman_compose.run() await podman_compose.run()
if __name__ == "__main__": if __name__ == "__main__":
main() asyncio.run(main())

View File

@ -33,6 +33,8 @@ def test_podman_compose_extends_w_file_subdir():
"podman", "podman",
"container", "container",
"ps", "ps",
"--sort",
"status",
"--all", "--all",
"--format", "--format",
'"{{.Image}}"', '"{{.Image}}"',
@ -51,14 +53,16 @@ def test_podman_compose_extends_w_file_subdir():
# check container was created and exists # check container was created and exists
out, _, returncode = capture(command_check_container) out, _, returncode = capture(command_check_container)
assert 0 == returncode assert 0 == returncode
assert out == b'"localhost/subdir_test:me"\n' assert b'"localhost/subdir_test:me"\n' in out
out, _, returncode = capture(command_down) out, _, returncode = capture(command_down)
# cleanup test image(tags) # cleanup test image(tags)
assert 0 == returncode assert 0 == returncode
print('ok')
# check container did not exists anymore # check container did not exists anymore
out, _, returncode = capture(command_check_container) out, _, returncode = capture(command_check_container)
print(out)
assert 0 == returncode assert 0 == returncode
assert out == b"" assert b'"localhost/subdir_test:me"\n' not in out
def test_podman_compose_extends_w_empty_service(): def test_podman_compose_extends_w_empty_service():