mirror of
https://github.com/containers/podman-compose.git
synced 2025-07-01 13:10:24 +02:00
Compare commits
3 Commits
v1.2.0
...
devel-asyn
Author | SHA1 | Date | |
---|---|---|---|
1b85ebe506 | |||
9ed05a23ef | |||
d475260951 |
@ -20,6 +20,8 @@ import glob
|
|||||||
import asyncio.subprocess
|
import asyncio.subprocess
|
||||||
import signal
|
import signal
|
||||||
|
|
||||||
|
from multiprocessing import cpu_count
|
||||||
|
|
||||||
import shlex
|
import shlex
|
||||||
from asyncio import Task
|
from asyncio import Task
|
||||||
|
|
||||||
@ -31,10 +33,16 @@ except ImportError:
|
|||||||
# import fnmatch
|
# import fnmatch
|
||||||
# fnmatch.fnmatchcase(env, "*_HOST")
|
# fnmatch.fnmatchcase(env, "*_HOST")
|
||||||
|
|
||||||
|
|
||||||
|
from tqdm.asyncio import tqdm
|
||||||
|
# TODO: we need a do-nothing fallback for tqdm
|
||||||
|
#except ImportError:
|
||||||
|
# tqdm = asyncio
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from dotenv import dotenv_values
|
from dotenv import dotenv_values
|
||||||
|
|
||||||
__version__ = "1.0.7"
|
__version__ = "1.0.8"
|
||||||
|
|
||||||
script = os.path.realpath(sys.argv[0])
|
script = os.path.realpath(sys.argv[0])
|
||||||
|
|
||||||
@ -1158,29 +1166,28 @@ def flat_deps(services, with_extends=False):
|
|||||||
|
|
||||||
|
|
||||||
class Podman:
|
class Podman:
|
||||||
def __init__(self, compose, podman_path="podman", dry_run=False, semaphore: asyncio.Semaphore = asyncio.Semaphore(sys.maxsize)):
|
def __init__(self, compose, podman_path="podman", dry_run=False):
|
||||||
self.compose = compose
|
self.compose = compose
|
||||||
self.podman_path = podman_path
|
self.podman_path = podman_path
|
||||||
self.dry_run = dry_run
|
self.dry_run = dry_run
|
||||||
self.semaphore = semaphore
|
|
||||||
|
|
||||||
async def output(self, podman_args, cmd="", cmd_args=None):
|
async def output(self, podman_args, cmd="", cmd_args=None):
|
||||||
async with self.semaphore:
|
# NOTE: do we need pool.output? like pool.run
|
||||||
cmd_args = cmd_args or []
|
cmd_args = cmd_args or []
|
||||||
xargs = self.compose.get_podman_args(cmd) if cmd else []
|
xargs = self.compose.get_podman_args(cmd) if cmd else []
|
||||||
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
|
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
|
||||||
log(cmd_ls)
|
log(cmd_ls)
|
||||||
p = await asyncio.subprocess.create_subprocess_exec(
|
p = await asyncio.subprocess.create_subprocess_exec(
|
||||||
*cmd_ls,
|
*cmd_ls,
|
||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
stderr=asyncio.subprocess.PIPE
|
stderr=asyncio.subprocess.PIPE
|
||||||
)
|
)
|
||||||
|
|
||||||
stdout_data, stderr_data = await p.communicate()
|
stdout_data, stderr_data = await p.communicate()
|
||||||
if p.returncode == 0:
|
if p.returncode == 0:
|
||||||
return stdout_data
|
return stdout_data
|
||||||
else:
|
else:
|
||||||
raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
|
raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
|
||||||
|
|
||||||
def exec(
|
def exec(
|
||||||
self,
|
self,
|
||||||
@ -1199,12 +1206,7 @@ class Podman:
|
|||||||
podman_args,
|
podman_args,
|
||||||
cmd="",
|
cmd="",
|
||||||
cmd_args=None,
|
cmd_args=None,
|
||||||
log_formatter=None,
|
log_formatter=None) -> int:
|
||||||
*,
|
|
||||||
# Intentionally mutable default argument to hold references to tasks
|
|
||||||
task_reference=set()
|
|
||||||
) -> int:
|
|
||||||
async with self.semaphore:
|
|
||||||
cmd_args = list(map(str, cmd_args or []))
|
cmd_args = list(map(str, cmd_args or []))
|
||||||
xargs = self.compose.get_podman_args(cmd) if cmd else []
|
xargs = self.compose.get_podman_args(cmd) if cmd else []
|
||||||
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
|
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
|
||||||
@ -1225,16 +1227,9 @@ class Podman:
|
|||||||
p = await asyncio.subprocess.create_subprocess_exec(
|
p = await asyncio.subprocess.create_subprocess_exec(
|
||||||
*cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
*cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||||||
) # pylint: disable=consider-using-with
|
) # pylint: disable=consider-using-with
|
||||||
|
|
||||||
# This is hacky to make the tasks not get garbage collected
|
|
||||||
# https://github.com/python/cpython/issues/91887
|
|
||||||
out_t = asyncio.create_task(format_out(p.stdout))
|
out_t = asyncio.create_task(format_out(p.stdout))
|
||||||
task_reference.add(out_t)
|
|
||||||
out_t.add_done_callback(task_reference.discard)
|
|
||||||
|
|
||||||
err_t = asyncio.create_task(format_out(p.stderr))
|
err_t = asyncio.create_task(format_out(p.stderr))
|
||||||
task_reference.add(err_t)
|
|
||||||
err_t.add_done_callback(task_reference.discard)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with
|
p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with
|
||||||
@ -1273,6 +1268,34 @@ class Podman:
|
|||||||
volumes = output.splitlines()
|
volumes = output.splitlines()
|
||||||
return volumes
|
return volumes
|
||||||
|
|
||||||
|
class Pool:
|
||||||
|
def __init__(self, podman: Podman, parallel):
|
||||||
|
self.podman: Podman = podman
|
||||||
|
self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel
|
||||||
|
self.tasks = []
|
||||||
|
|
||||||
|
def create_task(self, coro, *, name=None, context=None):
|
||||||
|
return self.tasks.append(asyncio.create_task(coro, name=name, context=context))
|
||||||
|
|
||||||
|
def run(self, *args, name=None, **kwargs):
|
||||||
|
return self.create_task(self._run_one(*args, **kwargs), name=name)
|
||||||
|
|
||||||
|
async def _run_one(self, *args, **kwargs) -> int:
|
||||||
|
async with self.semaphore:
|
||||||
|
return await self.podman.run(*args, **kwargs)
|
||||||
|
|
||||||
|
async def join(self, *, desc="joining enqueued tasks") -> int:
|
||||||
|
if not self.tasks: return 0
|
||||||
|
ls = await tqdm.gather(*self.tasks, desc=desc)
|
||||||
|
failed = [ i for i in ls if i != 0]
|
||||||
|
del self.tasks[:]
|
||||||
|
count = len(failed)
|
||||||
|
if count>1:
|
||||||
|
log(f"** EE ** got multiple failures: [{count}] failures")
|
||||||
|
if failed:
|
||||||
|
log(f"** EE ** retcode: [{failed[0]}]")
|
||||||
|
return failed[0]
|
||||||
|
return 0
|
||||||
|
|
||||||
def normalize_service(service, sub_dir=""):
|
def normalize_service(service, sub_dir=""):
|
||||||
if "build" in service:
|
if "build" in service:
|
||||||
@ -1469,6 +1492,7 @@ COMPOSE_DEFAULT_LS = [
|
|||||||
class PodmanCompose:
|
class PodmanCompose:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.podman = None
|
self.podman = None
|
||||||
|
self.pool = None
|
||||||
self.podman_version = None
|
self.podman_version = None
|
||||||
self.environ = {}
|
self.environ = {}
|
||||||
self.exit_code = None
|
self.exit_code = None
|
||||||
@ -1529,7 +1553,8 @@ class PodmanCompose:
|
|||||||
if args.dry_run is False:
|
if args.dry_run is False:
|
||||||
log(f"Binary {podman_path} has not been found.")
|
log(f"Binary {podman_path} has not been found.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
self.podman = Podman(self, podman_path, args.dry_run, asyncio.Semaphore(args.parallel))
|
self.podman = Podman(self, podman_path, args.dry_run)
|
||||||
|
self.pool = Pool(self.podman, args.parallel)
|
||||||
|
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
# just to make sure podman is running
|
# just to make sure podman is running
|
||||||
@ -1553,6 +1578,7 @@ class PodmanCompose:
|
|||||||
self._parse_compose_file()
|
self._parse_compose_file()
|
||||||
cmd = self.commands[cmd_name]
|
cmd = self.commands[cmd_name]
|
||||||
retcode = await cmd(self, args)
|
retcode = await cmd(self, args)
|
||||||
|
print("retcode", retcode)
|
||||||
if isinstance(retcode, int):
|
if isinstance(retcode, int):
|
||||||
sys.exit(retcode)
|
sys.exit(retcode)
|
||||||
|
|
||||||
@ -1934,7 +1960,7 @@ class PodmanCompose:
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--parallel",
|
"--parallel",
|
||||||
type=int,
|
type=int,
|
||||||
default=os.environ.get("COMPOSE_PARALLEL_LIMIT", sys.maxsize)
|
default=os.environ.get("COMPOSE_PARALLEL_LIMIT", 2*cpu_count())
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -2101,7 +2127,7 @@ while in your project type `podman-compose systemd -a register`
|
|||||||
|
|
||||||
|
|
||||||
@cmd_run(podman_compose, "pull", "pull stack images")
|
@cmd_run(podman_compose, "pull", "pull stack images")
|
||||||
async def compose_pull(compose, args):
|
async def compose_pull(compose: PodmanCompose, args):
|
||||||
img_containers = [cnt for cnt in compose.containers if "image" in cnt]
|
img_containers = [cnt for cnt in compose.containers if "image" in cnt]
|
||||||
if args.services:
|
if args.services:
|
||||||
services = set(args.services)
|
services = set(args.services)
|
||||||
@ -2111,7 +2137,13 @@ async def compose_pull(compose, args):
|
|||||||
local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)}
|
local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)}
|
||||||
images -= local_images
|
images -= local_images
|
||||||
|
|
||||||
await asyncio.gather(*[compose.podman.run([], "pull", [image]) for image in images])
|
images_process = tqdm(images, desc="pulling images")
|
||||||
|
for image in images_process:
|
||||||
|
# images_process.set_postfix_str(image)
|
||||||
|
compose.pool.run([], "pull", [image])
|
||||||
|
# uncomment to see how progress work
|
||||||
|
# await asyncio.sleep(1)
|
||||||
|
return await compose.pool.join(desc="waiting pull")
|
||||||
|
|
||||||
|
|
||||||
@cmd_run(podman_compose, "push", "push stack images")
|
@cmd_run(podman_compose, "push", "push stack images")
|
||||||
@ -2191,27 +2223,20 @@ async def build_one(compose, args, cnt):
|
|||||||
|
|
||||||
|
|
||||||
@cmd_run(podman_compose, "build", "build stack images")
|
@cmd_run(podman_compose, "build", "build stack images")
|
||||||
async def compose_build(compose, args):
|
async def compose_build(compose: PodmanCompose, args):
|
||||||
tasks = []
|
|
||||||
|
|
||||||
if args.services:
|
if args.services:
|
||||||
container_names_by_service = compose.container_names_by_service
|
container_names_by_service = compose.container_names_by_service
|
||||||
compose.assert_services(args.services)
|
compose.assert_services(args.services)
|
||||||
for service in args.services:
|
for service in tqdm(args.services, desc="building"):
|
||||||
cnt = compose.container_by_name[container_names_by_service[service][0]]
|
cnt = compose.container_by_name[container_names_by_service[service][0]]
|
||||||
tasks.append(asyncio.create_task(build_one(compose, args, cnt)))
|
compose.pool.create_task(build_one(compose, args, cnt))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
for cnt in compose.containers:
|
for cnt in tqdm(compose.containers, desc="building"):
|
||||||
tasks.append(asyncio.create_task(build_one(compose, args, cnt)))
|
compose.pool.create_task(build_one(compose, args, cnt))
|
||||||
|
|
||||||
status = 0
|
return await compose.pool.join("waiting build")
|
||||||
for t in asyncio.as_completed(tasks):
|
|
||||||
s = await t
|
|
||||||
if s is not None:
|
|
||||||
status = s
|
|
||||||
|
|
||||||
return status
|
|
||||||
|
|
||||||
|
|
||||||
async def create_pods(compose, args): # pylint: disable=unused-argument
|
async def create_pods(compose, args): # pylint: disable=unused-argument
|
||||||
@ -2366,15 +2391,13 @@ def get_volume_names(compose, cnt):
|
|||||||
|
|
||||||
|
|
||||||
@cmd_run(podman_compose, "down", "tear down entire stack")
|
@cmd_run(podman_compose, "down", "tear down entire stack")
|
||||||
async def compose_down(compose, args):
|
async def compose_down(compose: PodmanCompose, args):
|
||||||
excluded = get_excluded(compose, args)
|
excluded = get_excluded(compose, args)
|
||||||
podman_args = []
|
podman_args = []
|
||||||
timeout_global = getattr(args, "timeout", None)
|
timeout_global = getattr(args, "timeout", None)
|
||||||
containers = list(reversed(compose.containers))
|
containers = list(reversed(compose.containers))
|
||||||
|
|
||||||
down_tasks = []
|
for cnt in tqdm(containers, "stopping ..."):
|
||||||
|
|
||||||
for cnt in containers:
|
|
||||||
if cnt["_service"] in excluded:
|
if cnt["_service"] in excluded:
|
||||||
continue
|
continue
|
||||||
podman_stop_args = [*podman_args]
|
podman_stop_args = [*podman_args]
|
||||||
@ -2384,8 +2407,9 @@ async def compose_down(compose, args):
|
|||||||
timeout = str_to_seconds(timeout_str)
|
timeout = str_to_seconds(timeout_str)
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
podman_stop_args.extend(["-t", str(timeout)])
|
podman_stop_args.extend(["-t", str(timeout)])
|
||||||
down_tasks.append(asyncio.create_task(compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]]), name=cnt["name"]))
|
compose.pool.run([], "stop", [*podman_stop_args, cnt["name"]], name=cnt["name"])
|
||||||
await asyncio.gather(*down_tasks)
|
|
||||||
|
await compose.pool.join(desc="waiting to be stopped")
|
||||||
for cnt in containers:
|
for cnt in containers:
|
||||||
if cnt["_service"] in excluded:
|
if cnt["_service"] in excluded:
|
||||||
continue
|
continue
|
||||||
@ -2407,7 +2431,7 @@ async def compose_down(compose, args):
|
|||||||
.splitlines()
|
.splitlines()
|
||||||
)
|
)
|
||||||
for name in names:
|
for name in names:
|
||||||
await compose.podman.run([], "stop", [*podman_args, name])
|
compose.podman.run([], "stop", [*podman_args, name])
|
||||||
for name in names:
|
for name in names:
|
||||||
await compose.podman.run([], "rm", [name])
|
await compose.podman.run([], "rm", [name])
|
||||||
if args.volumes:
|
if args.volumes:
|
||||||
@ -2556,7 +2580,7 @@ async def compose_exec(compose, args):
|
|||||||
sys.exit(p)
|
sys.exit(p)
|
||||||
|
|
||||||
|
|
||||||
async def transfer_service_status(compose, args, action):
|
async def transfer_service_status(compose: PodmanCompose, args, action):
|
||||||
# TODO: handle dependencies, handle creations
|
# TODO: handle dependencies, handle creations
|
||||||
container_names_by_service = compose.container_names_by_service
|
container_names_by_service = compose.container_names_by_service
|
||||||
if not args.services:
|
if not args.services:
|
||||||
@ -2571,8 +2595,7 @@ async def transfer_service_status(compose, args, action):
|
|||||||
targets = list(reversed(targets))
|
targets = list(reversed(targets))
|
||||||
podman_args = []
|
podman_args = []
|
||||||
timeout_global = getattr(args, "timeout", None)
|
timeout_global = getattr(args, "timeout", None)
|
||||||
tasks = []
|
for target in tqdm(targets):
|
||||||
for target in targets:
|
|
||||||
if action != "start":
|
if action != "start":
|
||||||
timeout = timeout_global
|
timeout = timeout_global
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
@ -2583,8 +2606,8 @@ async def transfer_service_status(compose, args, action):
|
|||||||
timeout = str_to_seconds(timeout_str)
|
timeout = str_to_seconds(timeout_str)
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
podman_args.extend(["-t", str(timeout)])
|
podman_args.extend(["-t", str(timeout)])
|
||||||
tasks.append(asyncio.create_task(compose.podman.run([], action, podman_args + [target])))
|
compose.pool.run([], action, podman_args + [target], name=target)
|
||||||
await asyncio.gather(*tasks)
|
await compose.pool.join()
|
||||||
|
|
||||||
|
|
||||||
@cmd_run(podman_compose, "start", "start specific services")
|
@cmd_run(podman_compose, "start", "start specific services")
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
version: "3"
|
version: "3"
|
||||||
services:
|
services:
|
||||||
redis:
|
redis:
|
||||||
image: redis:alpine
|
image: docker.io/library/redis:alpine
|
||||||
command: ["redis-server", "--appendonly yes", "--notify-keyspace-events", "Ex"]
|
command: ["redis-server", "--appendonly yes", "--notify-keyspace-events", "Ex"]
|
||||||
volumes:
|
volumes:
|
||||||
- ./data/redis:/data:z
|
- ./data/redis:/data:z
|
||||||
@ -12,7 +12,7 @@ services:
|
|||||||
- SECRET_KEY=aabbcc
|
- SECRET_KEY=aabbcc
|
||||||
- ENV_IS_SET
|
- ENV_IS_SET
|
||||||
web:
|
web:
|
||||||
image: busybox
|
image: docker.io/library/busybox
|
||||||
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8000"]
|
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8000"]
|
||||||
working_dir: /var/www/html
|
working_dir: /var/www/html
|
||||||
volumes:
|
volumes:
|
||||||
@ -21,19 +21,19 @@ services:
|
|||||||
- /run
|
- /run
|
||||||
- /tmp
|
- /tmp
|
||||||
web1:
|
web1:
|
||||||
image: busybox
|
image: docker.io/library/busybox
|
||||||
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8001"]
|
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8001"]
|
||||||
working_dir: /var/www/html
|
working_dir: /var/www/html
|
||||||
volumes:
|
volumes:
|
||||||
- ./data/web:/var/www/html:ro,z
|
- ./data/web:/var/www/html:ro,z
|
||||||
web2:
|
web2:
|
||||||
image: busybox
|
image: docker.io/library/busybox
|
||||||
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8002"]
|
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8002"]
|
||||||
working_dir: /var/www/html
|
working_dir: /var/www/html
|
||||||
volumes:
|
volumes:
|
||||||
- ~/Downloads/www:/var/www/html:ro,z
|
- ~/Downloads/www:/var/www/html:ro,z
|
||||||
web3:
|
web3:
|
||||||
image: busybox
|
image: docker.io/library/busybox
|
||||||
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8003"]
|
command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8003"]
|
||||||
working_dir: /var/www/html
|
working_dir: /var/www/html
|
||||||
volumes:
|
volumes:
|
||||||
|
Reference in New Issue
Block a user