diff --git a/podman_compose.py b/podman_compose.py index 973ddb2..3295bc3 100755 --- a/podman_compose.py +++ b/podman_compose.py @@ -21,6 +21,7 @@ import asyncio.subprocess import signal import shlex +from asyncio import Task try: from shlex import quote as cmd_quote @@ -82,7 +83,13 @@ def try_float(i, fallback=None): def log(*msgs, sep=" ", end="\n"): + try: + current_task = asyncio.current_task() + except RuntimeError: + current_task = None line = (sep.join([str(msg) for msg in msgs])) + end + if current_task and not current_task.get_name().startswith("Task"): + line = f"[{current_task.get_name()}] " + line sys.stderr.write(line) sys.stderr.flush() @@ -1151,27 +1158,29 @@ def flat_deps(services, with_extends=False): class Podman: - def __init__(self, compose, podman_path="podman", dry_run=False): + def __init__(self, compose, podman_path="podman", dry_run=False, semaphore: asyncio.Semaphore = asyncio.Semaphore(sys.maxsize)): self.compose = compose self.podman_path = podman_path self.dry_run = dry_run + self.semaphore = semaphore async def output(self, podman_args, cmd="", cmd_args=None): - cmd_args = cmd_args or [] - xargs = self.compose.get_podman_args(cmd) if cmd else [] - cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args - log(cmd_ls) - p = await asyncio.subprocess.create_subprocess_exec( - *cmd_ls, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) + async with self.semaphore: + cmd_args = cmd_args or [] + xargs = self.compose.get_podman_args(cmd) if cmd else [] + cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args + log(cmd_ls) + p = await asyncio.subprocess.create_subprocess_exec( + *cmd_ls, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) - stdout_data, stderr_data = await p.communicate() - if p.returncode == 0: - return stdout_data - else: - raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data) + stdout_data, stderr_data = await p.communicate() + if p.returncode == 0: + return stdout_data + else: + raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data) def exec( self, @@ -1190,62 +1199,62 @@ class Podman: podman_args, cmd="", cmd_args=None, - wait=True, - sleep=1, log_formatter=None, *, # Intentionally mutable default argument to hold references to tasks task_reference=set() - ): - cmd_args = list(map(str, cmd_args or [])) - xargs = self.compose.get_podman_args(cmd) if cmd else [] - cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args - log(" ".join([str(i) for i in cmd_ls])) - if self.dry_run: - return None - if log_formatter is not None: + ) -> int: + async with self.semaphore: + cmd_args = list(map(str, cmd_args or [])) + xargs = self.compose.get_podman_args(cmd) if cmd else [] + cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args + log(" ".join([str(i) for i in cmd_ls])) + if self.dry_run: + return None - async def format_out(stdout): - while True: - l = await stdout.readline() - if l: - print(log_formatter, l.decode('utf-8'), end='') - if stdout.at_eof(): - break + if log_formatter is not None: - # read, write = os.pipe() - # Pipe podman process output through log_formatter (which can add colored prefix) - p = await asyncio.subprocess.create_subprocess_exec( - *cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) # pylint: disable=consider-using-with + async def format_out(stdout): + while True: + l = await stdout.readline() + if l: + print(log_formatter, l.decode('utf-8'), end='') + if stdout.at_eof(): + break - # This is hacky to make the tasks not get garbage collected - # https://github.com/python/cpython/issues/91887 - out_t = asyncio.create_task(format_out(p.stdout)) - task_reference.add(out_t) - out_t.add_done_callback(task_reference.discard) + p = await asyncio.subprocess.create_subprocess_exec( + *cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) # pylint: disable=consider-using-with - err_t = asyncio.create_task(format_out(p.stderr)) - task_reference.add(err_t) - err_t.add_done_callback(task_reference.discard) - else: - p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with + # This is hacky to make the tasks not get garbage collected + # https://github.com/python/cpython/issues/91887 + out_t = asyncio.create_task(format_out(p.stdout)) + task_reference.add(out_t) + out_t.add_done_callback(task_reference.discard) + + err_t = asyncio.create_task(format_out(p.stderr)) + task_reference.add(err_t) + err_t.add_done_callback(task_reference.discard) + + else: + p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with - if wait: try: exit_code = await p.wait() - except asyncio.CancelledError: + except asyncio.CancelledError as e: + log(f"Sending termination signal") p.terminate() - exit_code = await p.wait() + try: + async with asyncio.timeout(10): + exit_code = await p.wait() + except TimeoutError: + log(f"container did not shut down after 10 seconds, killing") + p.kill() + exit_code = await p.wait() - log("exit code:", exit_code) + log(f"exit code: {exit_code}") return exit_code - if sleep: - log(f"Sleep {sleep}") - await asyncio.sleep(sleep) - return p - async def volume_ls(self, proj=None): if not proj: proj = self.compose.project_name @@ -1520,7 +1529,8 @@ class PodmanCompose: if args.dry_run is False: log(f"Binary {podman_path} has not been found.") sys.exit(1) - self.podman = Podman(self, podman_path, args.dry_run) + self.podman = Podman(self, podman_path, args.dry_run, asyncio.Semaphore(args.parallel)) + if not args.dry_run: # just to make sure podman is running try: @@ -1921,6 +1931,11 @@ class PodmanCompose: help="No action; perform a simulation of commands", action="store_true", ) + parser.add_argument( + "--parallel", + type=int, + default=os.environ.get("COMPOSE_PARALLEL_LIMIT", sys.maxsize) + ) podman_compose = PodmanCompose() @@ -1979,7 +1994,7 @@ async def compose_version(compose, args): print(json.dumps(res)) return print("podman-compose version", __version__) - await compose.podman.run(["--version"], "", [], sleep=0) + await compose.podman.run(["--version"], "", []) def is_local(container: dict) -> bool: @@ -2023,8 +2038,8 @@ async def compose_systemd(compose, args): f.write(f"{k}={v}\n") print(f"writing [{fn}]: done.") print("\n\ncreating the pod without starting it: ...\n\n") - process = subprocess.run([script, "up", "--no-start"], check=False) - print("\nfinal exit code is ", process.returncode) + process = await asyncio.subprocess.create_subprocess_exec(script, ["up", "--no-start"]) + print("\nfinal exit code is ", process) username = getpass.getuser() print( f""" @@ -2096,13 +2111,7 @@ async def compose_pull(compose, args): local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)} images -= local_images - sem = asyncio.Semaphore(args.parallel) - - async def _pull(image: str): - async with sem: - return await compose.podman.run([], "pull", [image], sleep=0) - - await asyncio.gather(*[_pull(image) for image in images]) + await asyncio.gather(*[compose.podman.run([], "pull", [image]) for image in images]) @cmd_run(podman_compose, "push", "push stack images") @@ -2113,7 +2122,7 @@ async def compose_push(compose, args): continue if services and cnt["_service"] not in services: continue - await compose.podman.run([], "push", [cnt["image"]], sleep=0) + await compose.podman.run([], "push", [cnt["image"]]) async def build_one(compose, args, cnt): @@ -2177,29 +2186,24 @@ async def build_one(compose, args, cnt): ) ) build_args.append(ctx) - status = await compose.podman.run([], "build", build_args, sleep=0) + status = await compose.podman.run([], "build", build_args) return status @cmd_run(podman_compose, "build", "build stack images") async def compose_build(compose, args): tasks = [] - sem = asyncio.Semaphore(args.parallel) - - async def safe_build_task(cnt): - async with sem: - return await build_one(compose, args, cnt) if args.services: container_names_by_service = compose.container_names_by_service compose.assert_services(args.services) for service in args.services: cnt = compose.container_by_name[container_names_by_service[service][0]] - tasks.append(asyncio.create_task(safe_build_task(cnt))) + tasks.append(asyncio.create_task(build_one(compose, args, cnt))) else: for cnt in compose.containers: - tasks.append(asyncio.create_task(safe_build_task(cnt))) + tasks.append(asyncio.create_task(build_one(compose, args, cnt))) status = 0 for t in asyncio.as_completed(tasks): @@ -2283,7 +2287,7 @@ async def compose_up(compose: PodmanCompose, args): continue podman_args = await container_to_args(compose, cnt, detached=args.detach) subproc = await compose.podman.run([], podman_command, podman_args) - if podman_command == "run" and subproc: + if podman_command == "run" and subproc is not None: await compose.podman.run([], "start", [cnt["name"]]) if args.no_start or args.detach or args.dry_run: return @@ -2320,20 +2324,28 @@ async def compose_up(compose: PodmanCompose, args): tasks.add( asyncio.create_task( - compose.podman.run([], "start", ["-a", cnt["name"]], sleep=None, log_formatter=log_formatter), + compose.podman.run([], "start", ["-a", cnt["name"]], log_formatter=log_formatter), name=cnt["_service"] ) ) exit_code = 0 - for task in asyncio.as_completed(tasks): - t = await task + exiting = False + while tasks: + done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) if args.abort_on_container_exit: - [_.cancel() for _ in tasks if not _.cancelling() and not _.cancelled()] - if task.get_name() == exit_code_from: - exit_code = t + if not exiting: + # If 2 containers exit at the exact same time, the cancellation of the other ones cause the status + # to overwrite. Sleeping for 1 seems to fix this and make it match docker-compose + await asyncio.sleep(1) + [_.cancel() for _ in tasks if not _.cancelling() and not _.cancelled()] + t: Task + exiting = True + for t in done: + if t.get_name() == exit_code_from: + exit_code = t.result() - sys.exit(exit_code) + return exit_code def get_volume_names(compose, cnt): @@ -2360,6 +2372,8 @@ async def compose_down(compose, args): timeout_global = getattr(args, "timeout", None) containers = list(reversed(compose.containers)) + down_tasks = [] + for cnt in containers: if cnt["_service"] in excluded: continue @@ -2370,11 +2384,12 @@ async def compose_down(compose, args): timeout = str_to_seconds(timeout_str) if timeout is not None: podman_stop_args.extend(["-t", str(timeout)]) - await compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]], sleep=0) + down_tasks.append(asyncio.create_task(compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]]), name=cnt["name"])) + await asyncio.gather(*down_tasks) for cnt in containers: if cnt["_service"] in excluded: continue - await compose.podman.run([], "rm", [cnt["name"]], sleep=0) + await compose.podman.run([], "rm", [cnt["name"]]) if args.remove_orphans: names = ( (await compose.podman.output( @@ -2392,9 +2407,9 @@ async def compose_down(compose, args): .splitlines() ) for name in names: - await compose.podman.run([], "stop", [*podman_args, name], sleep=0) + await compose.podman.run([], "stop", [*podman_args, name]) for name in names: - await compose.podman.run([], "rm", [name], sleep=0) + await compose.podman.run([], "rm", [name]) if args.volumes: vol_names_to_keep = set() for cnt in containers: @@ -2402,7 +2417,7 @@ async def compose_down(compose, args): continue vol_names_to_keep.update(get_volume_names(compose, cnt)) log("keep", vol_names_to_keep) - async for volume_name in compose.podman.volume_ls(): + for volume_name in await compose.podman.volume_ls(): if volume_name in vol_names_to_keep: continue await compose.podman.run([], "volume", ["rm", volume_name]) @@ -2410,28 +2425,23 @@ async def compose_down(compose, args): if excluded: return for pod in compose.pods: - await compose.podman.run([], "pod", ["rm", pod["name"]], sleep=0) + await compose.podman.run([], "pod", ["rm", pod["name"]]) @cmd_run(podman_compose, "ps", "show status of containers") async def compose_ps(compose, args): proj_name = compose.project_name + ps_args = ["-a", "--filter", f"label=io.podman.compose.project={proj_name}"] if args.quiet is True: - await compose.podman.run( - [], - "ps", - [ - "-a", - "--format", - "{{.ID}}", - "--filter", - f"label=io.podman.compose.project={proj_name}", - ], - ) - else: - await compose.podman.run( - [], "ps", ["-a", "--filter", f"label=io.podman.compose.project={proj_name}"] - ) + ps_args.extend(["--format", "{{.ID}}"]) + elif args.format: + ps_args.extend(["--format", args.format]) + + await compose.podman.run( + [], + "ps", + ps_args, + ) @cmd_run( @@ -2459,17 +2469,19 @@ async def compose_run(compose, args): no_start=False, no_cache=False, build_arg=[], + parallel=1, + remove_orphans=True ) ) - compose.commands["up"](compose, up_args) + await compose.commands["up"](compose, up_args) build_args = argparse.Namespace( services=[args.service], if_not_exists=(not args.build), build_arg=[], - **args.__dict__, + **args.__dict__ ) - compose.commands["build"](compose, build_args) + await compose.commands["build"](compose, build_args) # adjust one-off container options name0 = "{}_{}_tmp{}".format( @@ -2510,7 +2522,7 @@ async def compose_run(compose, args): podman_args.insert(1, "-i") if args.rm: podman_args.insert(1, "--rm") - p = await compose.podman.run([], "run", podman_args, sleep=0) + p = await compose.podman.run([], "run", podman_args) sys.exit(p) @@ -2540,7 +2552,7 @@ async def compose_exec(compose, args): podman_args += [container_name] if args.cnt_command is not None and len(args.cnt_command) > 0: podman_args += args.cnt_command - p = await compose.podman.run([], "exec", podman_args, sleep=0) + p = await compose.podman.run([], "exec", podman_args) sys.exit(p) @@ -2571,23 +2583,23 @@ async def transfer_service_status(compose, args, action): timeout = str_to_seconds(timeout_str) if timeout is not None: podman_args.extend(["-t", str(timeout)]) - tasks.append(asyncio.create_task(compose.podman.run([], action, podman_args + [target], sleep=0))) + tasks.append(asyncio.create_task(compose.podman.run([], action, podman_args + [target]))) await asyncio.gather(*tasks) @cmd_run(podman_compose, "start", "start specific services") async def compose_start(compose, args): - transfer_service_status(compose, args, "start") + await transfer_service_status(compose, args, "start") @cmd_run(podman_compose, "stop", "stop specific services") async def compose_stop(compose, args): - transfer_service_status(compose, args, "stop") + await transfer_service_status(compose, args, "stop") @cmd_run(podman_compose, "restart", "restart specific services") async def compose_restart(compose, args): - transfer_service_status(compose, args, "restart") + await transfer_service_status(compose, args, "restart") @cmd_run(podman_compose, "logs", "show logs from services") @@ -3098,15 +3110,6 @@ def compose_ps_parse(parser): ) -@cmd_parse(podman_compose, ["build", "pull", "up"]) -def compose_build_pull_parse(parser): - parser.add_argument( - "--parallel", - type=int, - default=os.environ.get("PODMAN_PARALLEL", sys.maxsize) - ) - - @cmd_parse(podman_compose, ["build", "up"]) def compose_build_up_parse(parser): parser.add_argument( @@ -3215,12 +3218,6 @@ def compose_stats_parse(parser): type=int, help="Time in seconds between stats reports (default 5)", ) - parser.add_argument( - "-f", - "--format", - type=str, - help="Pretty-print container statistics to JSON or using a Go template", - ) parser.add_argument( "--no-reset", help="Disable resetting the screen between intervals", @@ -3233,6 +3230,15 @@ def compose_stats_parse(parser): ) +@cmd_parse(podman_compose, ["ps", "stats"]) +def compose_format_parse(parser): + parser.add_argument( + "-f", + "--format", + type=str, + help="Pretty-print container statistics to JSON or using a Go template", + ) + async def main(): await podman_compose.run() diff --git a/tests/deps/README.md b/tests/deps/README.md index bde213a..f689ed5 100644 --- a/tests/deps/README.md +++ b/tests/deps/README.md @@ -1,4 +1,4 @@ ``` -podman-compose run --rm sleep /bin/sh -c 'wget -O - http://localhost:8000/hosts' +podman-compose run --rm sleep /bin/sh -c 'wget -O - http://web:8000/hosts' ``` diff --git a/tests/deps/docker-compose.yaml b/tests/deps/docker-compose.yaml index 0f06bbd..7799323 100644 --- a/tests/deps/docker-compose.yaml +++ b/tests/deps/docker-compose.yaml @@ -9,7 +9,8 @@ services: sleep: image: busybox command: ["/bin/busybox", "sh", "-c", "sleep 3600"] - depends_on: "web" + depends_on: + - "web" tmpfs: - /run - /tmp diff --git a/tests/test_podman_compose.py b/tests/test_podman_compose.py index 5be6629..453e79a 100644 --- a/tests/test_podman_compose.py +++ b/tests/test_podman_compose.py @@ -31,14 +31,14 @@ def test_podman_compose_extends_w_file_subdir(): ] command_check_container = [ - "podman", - "container", + "coverage", + "run", + str(main_path.joinpath("podman_compose.py")), + "-f", + str(main_path.joinpath("tests", "extends_w_file_subdir", "docker-compose.yml")), "ps", - "--sort", - "status", - "--all", "--format", - '"{{.Image}}"', + '{{.Image}}', ] command_down = [ @@ -52,18 +52,17 @@ def test_podman_compose_extends_w_file_subdir(): out, _, returncode = capture(command_up) assert 0 == returncode # check container was created and exists - out, _, returncode = capture(command_check_container) + out, err, returncode = capture(command_check_container) assert 0 == returncode - assert b'"localhost/subdir_test:me"\n' in out + assert b'localhost/subdir_test:me\n' == out out, _, returncode = capture(command_down) # cleanup test image(tags) assert 0 == returncode print('ok') # check container did not exists anymore out, _, returncode = capture(command_check_container) - print(out) assert 0 == returncode - assert b'"localhost/subdir_test:me"\n' not in out + assert b'' == out def test_podman_compose_extends_w_empty_service(): diff --git a/tests/test_podman_compose_tests.py b/tests/test_podman_compose_tests.py new file mode 100644 index 0000000..378e5bd --- /dev/null +++ b/tests/test_podman_compose_tests.py @@ -0,0 +1,180 @@ +""" +test_podman_compose_up_down.py + +Tests the podman compose up and down commands used to create and remove services. +""" +# pylint: disable=redefined-outer-name +import os +import time + +from test_podman_compose import capture + + +def test_exit_from(podman_compose_path, test_path): + up_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "exit-from", "docker-compose.yaml"), + "up" + ] + + out, _, return_code = capture(up_cmd + ["--exit-code-from", "sh1"]) + assert return_code == 1 + + out, _, return_code = capture(up_cmd + ["--exit-code-from", "sh2"]) + assert return_code == 2 + + +def test_run(podman_compose_path, test_path): + """ + This will test depends_on as well + """ + run_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "deps", "docker-compose.yaml"), + "run", + "--rm", + "sleep", + "/bin/sh", + "-c", + "wget -q -O - http://web:8000/hosts" + ] + + out, _, return_code = capture(run_cmd) + assert b'127.0.0.1\tlocalhost' in out + + # Run it again to make sure we can run it twice. I saw an issue where a second run, with the container left up, + # would fail + run_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "deps", "docker-compose.yaml"), + "run", + "--rm", + "sleep", + "/bin/sh", + "-c", + "wget -q -O - http://web:8000/hosts" + ] + + out, _, return_code = capture(run_cmd) + assert b'127.0.0.1\tlocalhost' in out + assert return_code == 0 + + # This leaves a container running. Not sure it's intended, but it matches docker-compose + down_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "deps", "docker-compose.yaml"), + "down", + ] + + out, _, return_code = capture(run_cmd) + assert return_code == 0 + + +def test_up_with_ports(podman_compose_path, test_path): + + + up_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "ports", "docker-compose.yml"), + "up", + "-d", + "--force-recreate" + ] + + down_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "ports", "docker-compose.yml"), + "down", + "--volumes" + ] + + try: + out, _, return_code = capture(up_cmd) + assert return_code == 0 + + + finally: + out, _, return_code = capture(down_cmd) + assert return_code == 0 + + +def test_down_with_vols(podman_compose_path, test_path): + + up_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "vol", "docker-compose.yaml"), + "up", + "-d" + ] + + down_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "vol", "docker-compose.yaml"), + "down", + "--volumes" + ] + + try: + out, _, return_code = capture(["podman", "volume", "create", "my-app-data"]) + assert return_code == 0 + out, _, return_code = capture(["podman", "volume", "create", "actual-name-of-volume"]) + assert return_code == 0 + + out, _, return_code = capture(up_cmd) + assert return_code == 0 + + capture(["podman", "inspect", "volume", ""]) + + finally: + out, _, return_code = capture(down_cmd) + capture(["podman", "volume", "rm", "my-app-data"]) + capture(["podman", "volume", "rm", "actual-name-of-volume"]) + assert return_code == 0 + + +def test_down_with_orphans(podman_compose_path, test_path): + + container_id, _ , return_code = capture(["podman", "run", "--rm", "-d", "busybox", "/bin/busybox", "httpd", "-f", "-h", "/etc/", "-p", "8000"]) + + down_cmd = [ + "coverage", + "run", + podman_compose_path, + "-f", + os.path.join(test_path, "ports", "docker-compose.yml"), + "down", + "--volumes", + "--remove-orphans" + ] + + out, _, return_code = capture(down_cmd) + assert return_code == 0 + + _, _, exists = capture(["podman", "container", "exists", container_id.decode("utf-8")]) + + assert exists == 1 +