Add tqdm progress bars and implement Pool.join

This commit is contained in:
Muayyad Alsadi 2024-02-05 01:00:26 +03:00
parent 831caa6276
commit d475260951
2 changed files with 84 additions and 60 deletions

View File

@ -20,6 +20,8 @@ import glob
import asyncio.subprocess
import signal
from multiprocessing import cpu_count
import shlex
from asyncio import Task
@ -31,6 +33,12 @@ except ImportError:
# import fnmatch
# fnmatch.fnmatchcase(env, "*_HOST")
from tqdm.asyncio import tqdm
# TODO: we need a do-nothing fallback for tqdm
#except ImportError:
# tqdm = asyncio
import yaml
from dotenv import dotenv_values
@ -1158,29 +1166,28 @@ def flat_deps(services, with_extends=False):
class Podman:
def __init__(self, compose, podman_path="podman", dry_run=False, semaphore: asyncio.Semaphore = asyncio.Semaphore(sys.maxsize)):
def __init__(self, compose, podman_path="podman", dry_run=False):
self.compose = compose
self.podman_path = podman_path
self.dry_run = dry_run
self.semaphore = semaphore
async def output(self, podman_args, cmd="", cmd_args=None):
async with self.semaphore:
cmd_args = cmd_args or []
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(cmd_ls)
p = await asyncio.subprocess.create_subprocess_exec(
*cmd_ls,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# NOTE: do we need pool.output? like pool.run
cmd_args = cmd_args or []
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(cmd_ls)
p = await asyncio.subprocess.create_subprocess_exec(
*cmd_ls,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout_data, stderr_data = await p.communicate()
if p.returncode == 0:
return stdout_data
else:
raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
stdout_data, stderr_data = await p.communicate()
if p.returncode == 0:
return stdout_data
else:
raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
def exec(
self,
@ -1199,12 +1206,7 @@ class Podman:
podman_args,
cmd="",
cmd_args=None,
log_formatter=None,
*,
# Intentionally mutable default argument to hold references to tasks
task_reference=set()
) -> int:
async with self.semaphore:
log_formatter=None) -> int:
cmd_args = list(map(str, cmd_args or []))
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
@ -1225,16 +1227,9 @@ class Podman:
p = await asyncio.subprocess.create_subprocess_exec(
*cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
) # pylint: disable=consider-using-with
# This is hacky to make the tasks not get garbage collected
# https://github.com/python/cpython/issues/91887
out_t = asyncio.create_task(format_out(p.stdout))
task_reference.add(out_t)
out_t.add_done_callback(task_reference.discard)
err_t = asyncio.create_task(format_out(p.stderr))
task_reference.add(err_t)
err_t.add_done_callback(task_reference.discard)
else:
p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with
@ -1273,6 +1268,34 @@ class Podman:
volumes = output.splitlines()
return volumes
class Pool:
    """Bounded pool of asyncio tasks that run podman commands.

    Wraps a Podman instance and caps the number of concurrently running
    commands with a semaphore.  Tasks are enqueued via create_task()/run()
    and awaited together (with a tqdm progress bar) in join().
    """

    def __init__(self, podman: "Podman", parallel):
        self.podman: "Podman" = podman
        # parallel is either an int (max concurrent commands) or an
        # already-constructed semaphore-like object
        self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel
        self.tasks = []

    def create_task(self, coro, *, name=None, context=None):
        """Schedule *coro*, remember the task for join(), and return the Task.

        Fix: the original returned list.append()'s result (always None);
        returning the Task lets callers await or cancel it directly.
        """
        if context is None:
            # omit the kwarg entirely so this also works on Python < 3.11,
            # where asyncio.create_task() has no context parameter
            task = asyncio.create_task(coro, name=name)
        else:
            task = asyncio.create_task(coro, name=name, context=context)
        self.tasks.append(task)
        return task

    def run(self, *args, name=None, **kwargs):
        """Enqueue a semaphore-limited podman.run(*args, **kwargs); return the Task."""
        return self.create_task(self._run_one(*args, **kwargs), name=name)

    async def _run_one(self, *args, **kwargs) -> int:
        # serialize actual podman invocations through the shared semaphore
        async with self.semaphore:
            return await self.podman.run(*args, **kwargs)

    async def join(self, desc="joining enqueued tasks") -> int:
        """Await all enqueued tasks; return the first non-zero return code (else 0).

        Fix: *desc* is now positional-or-keyword instead of keyword-only,
        because some call sites pass it positionally (e.g. join("waiting build")).
        Keyword callers are unaffected.
        """
        if not self.tasks:
            return 0
        ls = await tqdm.gather(*self.tasks, desc=desc)
        del self.tasks[:]
        failed = [rc for rc in ls if rc != 0]
        count = len(failed)
        if count > 1:
            log(f"** EE ** got multiple failures: [{count}] failures")
        if failed:
            log(f"** EE ** retcode: [{failed[0]}]")
            return failed[0]
        return 0
def normalize_service(service, sub_dir=""):
if "build" in service:
@ -1469,6 +1492,7 @@ COMPOSE_DEFAULT_LS = [
class PodmanCompose:
def __init__(self):
self.podman = None
self.pool = None
self.podman_version = None
self.environ = {}
self.exit_code = None
@ -1529,7 +1553,8 @@ class PodmanCompose:
if args.dry_run is False:
log(f"Binary {podman_path} has not been found.")
sys.exit(1)
self.podman = Podman(self, podman_path, args.dry_run, asyncio.Semaphore(args.parallel))
self.podman = Podman(self, podman_path, args.dry_run)
self.pool = Pool(self.podman, args.parallel)
if not args.dry_run:
# just to make sure podman is running
@ -1553,6 +1578,7 @@ class PodmanCompose:
self._parse_compose_file()
cmd = self.commands[cmd_name]
retcode = await cmd(self, args)
print("retcode", retcode)
if isinstance(retcode, int):
sys.exit(retcode)
@ -1934,7 +1960,7 @@ class PodmanCompose:
parser.add_argument(
"--parallel",
type=int,
default=os.environ.get("COMPOSE_PARALLEL_LIMIT", sys.maxsize)
default=os.environ.get("COMPOSE_PARALLEL_LIMIT", 2*cpu_count())
)
@ -2101,7 +2127,7 @@ while in your project type `podman-compose systemd -a register`
@cmd_run(podman_compose, "pull", "pull stack images")
async def compose_pull(compose, args):
async def compose_pull(compose: PodmanCompose, args):
img_containers = [cnt for cnt in compose.containers if "image" in cnt]
if args.services:
services = set(args.services)
@ -2111,7 +2137,13 @@ async def compose_pull(compose, args):
local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)}
images -= local_images
await asyncio.gather(*[compose.podman.run([], "pull", [image]) for image in images])
images_process = tqdm(images, desc="pulling images")
for image in images_process:
# images_process.set_postfix_str(image)
compose.pool.run([], "pull", [image])
# uncomment to see how progress work
# await asyncio.sleep(1)
return await compose.pool.join(desc="waiting pull")
@cmd_run(podman_compose, "push", "push stack images")
@ -2191,27 +2223,20 @@ async def build_one(compose, args, cnt):
@cmd_run(podman_compose, "build", "build stack images")
async def compose_build(compose, args):
tasks = []
async def compose_build(compose: PodmanCompose, args):
if args.services:
container_names_by_service = compose.container_names_by_service
compose.assert_services(args.services)
for service in args.services:
for service in tqdm(args.services, desc="building"):
cnt = compose.container_by_name[container_names_by_service[service][0]]
tasks.append(asyncio.create_task(build_one(compose, args, cnt)))
compose.pool.create_task(build_one(compose, args, cnt))
else:
for cnt in compose.containers:
tasks.append(asyncio.create_task(build_one(compose, args, cnt)))
for cnt in tqdm(compose.containers, desc="building"):
compose.pool.create_task(build_one(compose, args, cnt))
status = 0
for t in asyncio.as_completed(tasks):
s = await t
if s is not None:
status = s
return status
return await compose.pool.join("waiting build")
async def create_pods(compose, args): # pylint: disable=unused-argument
@ -2366,15 +2391,13 @@ def get_volume_names(compose, cnt):
@cmd_run(podman_compose, "down", "tear down entire stack")
async def compose_down(compose, args):
async def compose_down(compose: PodmanCompose, args):
excluded = get_excluded(compose, args)
podman_args = []
timeout_global = getattr(args, "timeout", None)
containers = list(reversed(compose.containers))
down_tasks = []
for cnt in containers:
for cnt in tqdm(containers, "stopping ..."):
if cnt["_service"] in excluded:
continue
podman_stop_args = [*podman_args]
@ -2384,8 +2407,9 @@ async def compose_down(compose, args):
timeout = str_to_seconds(timeout_str)
if timeout is not None:
podman_stop_args.extend(["-t", str(timeout)])
down_tasks.append(asyncio.create_task(compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]]), name=cnt["name"]))
await asyncio.gather(*down_tasks)
compose.pool.run([], "stop", [*podman_stop_args, cnt["name"]], name=cnt["name"])
await compose.pool.join(desc="waiting to be stopped")
for cnt in containers:
if cnt["_service"] in excluded:
continue
@ -2407,7 +2431,7 @@ async def compose_down(compose, args):
.splitlines()
)
for name in names:
await compose.podman.run([], "stop", [*podman_args, name])
compose.podman.run([], "stop", [*podman_args, name])
for name in names:
await compose.podman.run([], "rm", [name])
if args.volumes:

View File

@ -1,7 +1,7 @@
version: "3"
services:
redis:
image: redis:alpine
image: docker.io/library/redis:alpine
command: ["redis-server", "--appendonly yes", "--notify-keyspace-events", "Ex"]
volumes:
- ./data/redis:/data:z
@ -12,7 +12,7 @@ services:
- SECRET_KEY=aabbcc
- ENV_IS_SET
web:
image: busybox
image: docker.io/library/busybox
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8000"]
working_dir: /var/www/html
volumes:
@ -21,19 +21,19 @@ services:
- /run
- /tmp
web1:
image: busybox
image: docker.io/library/busybox
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8001"]
working_dir: /var/www/html
volumes:
- ./data/web:/var/www/html:ro,z
web2:
image: busybox
image: docker.io/library/busybox
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8002"]
working_dir: /var/www/html
volumes:
- ~/Downloads/www:/var/www/html:ro,z
web3:
image: busybox
image: docker.io/library/busybox
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8003"]
working_dir: /var/www/html
volumes: