mirror of
synced 2025-03-29 00:56:08 +01:00
This commit is contained in:
@ -1,17 +1,18 @@
# C0111 missing-docstring
# missing-class-docstring, missing-function-docstring, missing-method-docstring, missing-module-docstrin
# C0111 missing-docstring: missing-class-docstring, missing-function-docstring, missing-method-docstring, missing-module-docstrin
# invalid-name
# allow _ for ignored variables
# allow generic names like a,b,c and i,j,k,l,m,n and x,y,z
# allow k,v for key/value
# allow e for exceptions, it for iterator
# ip for ip address
# allow e for exceptions, it for iterator, ix for index
# allow ip for ip address
# allow w,h for width, height
# allow op for operation/operator/opcode
# allow t, t0, t1, t2, and t3 for time
# allow dt for delta time
# allow db for database
# allow ls for list
# allow p for pipe
# allow ex for examples, exists ..etc
@ -193,7 +193,7 @@ def rec_subs(value, subs_dict):
do bash-like substitution in value and if list of dictionary do that recursively
if is_dict(value):
value = dict([(k, rec_subs(v, subs_dict)) for k, v in value.items()])
value = {k: rec_subs(v, subs_dict) for k, v in value.items()}
elif is_str(value):
def convert(m):
if m.group("escaped") is not None:
@ -239,7 +239,8 @@ def norm_as_dict(src):
dst = dict(src)
elif is_list(src):
dst = [i.split("=", 1) for i in src if i]
dst = dict([(a if len(a) == 2 else (a[0], None)) for a in dst])
dst = [(a if len(a) == 2 else (a[0], None)) for a in dst]
dst = dict(dst)
elif is_str(src):
key, value = src.split("=", 1) if "=" in src else (src, None)
dst = {key: value}
@ -254,7 +255,7 @@ def norm_ulimit(inner_value):
soft = inner_value.get("soft", inner_value.get("hard", None))
hard = inner_value.get("hard", inner_value.get("soft", None))
return "{}:{}".format(soft, hard)
elif is_list(inner_value): return norm_ulimit(norm_as_dict(inner_value))
if is_list(inner_value): return norm_ulimit(norm_as_dict(inner_value))
# if int or string return as is
return inner_value
@ -343,19 +344,18 @@ def mount_desc_to_mount_args(compose, mount_desc, srv_name, cnt_name): # pylint:
elif mount_type == 'volume':
if mount_type == 'volume':
return "type=volume,source={source},destination={target},{opts}".format(
elif mount_type == 'tmpfs':
if mount_type == 'tmpfs':
return "type=tmpfs,destination={target},{opts}".format(
raise ValueError("unknown mount type:"+mount_type)
raise ValueError("unknown mount type:"+mount_type)
def container_to_ulimit_args(cnt, podman_args):
ulimit = cnt.get('ulimits', [])
@ -372,7 +372,7 @@ def container_to_ulimit_args(cnt, podman_args):
def mount_desc_to_volume_args(compose, mount_desc, srv_name, cnt_name): # pylint: disable=unused-argument
mount_type = mount_desc["type"]
if mount_type != 'bind' and mount_type != 'volume':
if mount_type not in ('bind', 'volume'):
raise ValueError("unknown mount type:"+mount_type)
vol = mount_desc.get("_vol", None) if mount_type=="volume" else None
source = vol["name"] if vol else mount_desc.get("source", None)
@ -430,9 +430,8 @@ def get_mount_args(compose, cnt, volume):
return ['--tmpfs', args]
args = mount_desc_to_volume_args(compose, volume, srv_name, cnt['name'])
return ['-v', args]
args = mount_desc_to_mount_args(compose, volume, srv_name, cnt['name'])
return ['--mount', args]
args = mount_desc_to_mount_args(compose, volume, srv_name, cnt['name'])
return ['--mount', args]
def get_secret_args(compose, cnt, secret):
@ -490,9 +489,9 @@ def get_secret_args(compose, cnt, secret):
err_str = 'ERROR: Custom name/target reference "{}" for mounted external secret "{}" is not supported'
if ext_name and ext_name != secret_name:
raise ValueError(err_str.format(secret_name, ext_name))
elif target and target != secret_name:
if target and target != secret_name:
raise ValueError(err_str.format(target, secret_name))
elif target:
if target:
log('WARNING: Service "{}" uses target: "{}" for secret: "{}".'
.format(cnt['_service'], target, secret_name)
+ ' That is un-supported and a no-op and is ignored.')
@ -680,8 +679,8 @@ def container_to_args(compose, cnt, detached=True):
if deps:
sec = norm_as_list(cnt.get("security_opt", None))
for s in sec:
podman_args.extend(['--security-opt', s])
for sec_item in sec:
podman_args.extend(['--security-opt', sec_item])
ann = norm_as_list(cnt.get("annotations", None))
for a in ann:
podman_args.extend(['--annotation', a])
@ -693,14 +692,14 @@ def container_to_args(compose, cnt, detached=True):
podman_args.extend(['--cap-add', c])
for c in cnt.get('cap_drop', []):
podman_args.extend(['--cap-drop', c])
for d in cnt.get('devices', []):
podman_args.extend(['--device', d])
for d in norm_as_list(cnt.get('dns', None)):
podman_args.extend(['--dns', d])
for d in norm_as_list(cnt.get('dns_opt', None)):
podman_args.extend(['--dns-opt', d])
for d in norm_as_list(cnt.get('dns_search', None)):
podman_args.extend(['--dns-search', d])
for item in cnt.get('devices', []):
podman_args.extend(['--device', item])
for item in norm_as_list(cnt.get('dns', None)):
podman_args.extend(['--dns', item])
for item in norm_as_list(cnt.get('dns_opt', None)):
podman_args.extend(['--dns-opt', item])
for item in norm_as_list(cnt.get('dns_search', None)):
podman_args.extend(['--dns-search', item])
env_file = cnt.get('env_file', [])
if is_str(env_file): env_file = [env_file]
for i in env_file:
@ -907,11 +906,11 @@ class Podman:
# subprocess.Popen(args, bufsize = 0, executable = None, stdin = None, stdout = None, stderr = None, preexec_fn = None, close_fds = False, shell = False, cwd = None, env = None, universal_newlines = False, startupinfo = None, creationflags = 0)
if log_formatter is not None:
# Pipe podman process output through log_formatter (which can add colored prefix)
p = subprocess.Popen(cmd_ls, stdout=subprocess.PIPE)
_ = subprocess.Popen(log_formatter, stdin=p.stdout)
p = subprocess.Popen(cmd_ls, stdout=subprocess.PIPE) # pylint: disable=consider-using-with
_ = subprocess.Popen(log_formatter, stdin=p.stdout) # pylint: disable=consider-using-with
p.stdout.close() # Allow p_process to receive a SIGPIPE if logging process exits.
p = subprocess.Popen(cmd_ls)
p = subprocess.Popen(cmd_ls) # pylint: disable=consider-using-with
if wait:
exit_code = p.wait()
@ -956,7 +955,7 @@ def normalize_service(service, sub_dir=''):
if "security_opt" in service:
sec_ls = service["security_opt"]
for ix, item in enumerate(sec_ls):
if item=="seccomp:unconfined" or item=="apparmor:unconfined":
if item in ('seccomp:unconfined', 'apparmor:unconfined'):
sec_ls[ix] = item.replace(":", "=")
for key in ("environment", "labels"):
if key not in service: continue
@ -990,12 +989,12 @@ def rec_merge_one(target, source):
if key in done: continue
if key not in source: continue
value2 = source[key]
if type(value2)!=type(value):
if isinstance(value2, type(value)):
raise ValueError("can't merge value of {} of type {} and {}".format(key, type(value), type(value2)))
if is_list(value2):
if key == 'volumes':
# clean duplicate mount targets
pts = set([ v.split(':', 1)[1] for v in value2 if ":" in v ])
pts = { v.split(':', 1)[1] for v in value2 if ":" in v }
del_ls = [ ix for (ix, v) in enumerate(value) if ":" in v and v.split(':', 1)[1] in pts ]
for ix in reversed(del_ls):
del value[ix]
@ -1100,7 +1099,7 @@ class PodmanCompose:
if missing:
missing_csv = ",".join(missing)
log(f"missing services [{missing_csv}]")
def get_podman_args(self, cmd):
xargs = []
@ -1121,9 +1120,9 @@ class PodmanCompose:
podman_path = os.path.realpath(podman_path)
# this also works if podman hasn't been installed now
if args.dry_run == False:
if args.dry_run is False:
sys.stderr.write("Binary {} has not been found.\n".format(podman_path))
self.podman = Podman(self, podman_path, args.dry_run)
if not args.dry_run:
# just to make sure podman is running
@ -1134,10 +1133,10 @@ class PodmanCompose:
self.podman_version = None
if not self.podman_version:
sys.stderr.write("it seems that you do not have `podman` installed\n")
log("using podman version: "+self.podman_version)
cmd_name = args.command
if (cmd_name != "version"):
if cmd_name != "version":
cmd = self.commands[cmd_name]
cmd(self, args)
@ -1156,12 +1155,12 @@ class PodmanCompose:
files = args.file
if not files:
log("no compose.yaml, docker-compose.yml or container-compose.yml file found, pass files with -f")
ex = map(os.path.exists, files)
missing = [ fn0 for ex0, fn0 in zip(ex, files) if not ex0 ]
if missing:
log("missing files: ", missing)
# make absolute
relative_files = files
files = list(map(os.path.realpath, files))
@ -1206,7 +1205,7 @@ class PodmanCompose:
#log(filename, json.dumps(content, indent = 2))
if not isinstance(content, dict):
sys.stderr.write("Compose file does not contain a top level object: %s\n"%filename)
content = normalize(content)
#log(filename, json.dumps(content, indent = 2))
content = rec_subs(content, self.environ)
@ -1311,7 +1310,7 @@ class PodmanCompose:
raise RuntimeError(f"volume [{vol_name}] not defined in top level")
self.container_names_by_service = container_names_by_service
self.all_services = set(container_names_by_service.keys())
container_by_name = dict([(c["name"], c) for c in given_containers])
container_by_name = {c["name"]: c for c in given_containers}
#log("deps:", [(c["name"], c["_deps"]) for c in given_containers])
given_containers = list(container_by_name.values())
given_containers.sort(key=lambda c: len(c.get('_deps', None) or []))
@ -1319,7 +1318,7 @@ class PodmanCompose:
pods, containers = tr_identity(project_name, given_containers)
self.pods = pods
self.containers = containers
self.container_by_name = dict([ (c["name"], c) for c in containers])
self.container_by_name = {c["name"]: c for c in containers}
def _parse_args(self):
parser = argparse.ArgumentParser(
@ -1337,10 +1336,11 @@ class PodmanCompose:
self.global_args.command = "version"
if not self.global_args.command or self.global_args.command=='help':
return self.global_args
def _init_global_parser(self, parser):
def _init_global_parser(parser):
parser.add_argument("-v", "--version",
help="show version", action='store_true')
parser.add_argument("-f", "--file",
@ -1372,7 +1372,7 @@ podman_compose = PodmanCompose()
# decorators to add commands and parse options
class cmd_run: # pylint: disable=invalid-name
class cmd_run: # pylint: disable=invalid-name,too-few-public-methods
def __init__(self, compose, cmd_name, cmd_desc):
self.compose = compose
self.cmd_name = cmd_name
@ -1387,7 +1387,7 @@ class cmd_run: # pylint: disable=invalid-name
self.compose.commands[self.cmd_name] = wrapped
return wrapped
class cmd_parse: # pylint: disable=invalid-name
class cmd_parse: # pylint: disable=invalid-name,too-few-public-methods
def __init__(self, compose, cmd_names):
self.compose = compose
self.cmd_names = cmd_names if is_list(cmd_names) else [cmd_names]
@ -1588,7 +1588,7 @@ def compose_up(compose, args):
if args.abort_on_container_exit:
exit_code = compose.exit_code if compose.exit_code is not None else -1
def get_volume_names(compose, cnt):
proj_name = compose.project_name
@ -1638,7 +1638,7 @@ def compose_down(compose, args):
@cmd_run(podman_compose, 'ps', 'show status of containers')
def compose_ps(compose, args):
proj_name = compose.project_name
if args.quiet == True:
if args.quiet is True:
compose.podman.run([], "ps", ["-a", "--format", "{{.ID}}", "--filter", f"label=io.podman.compose.project={proj_name}"])
compose.podman.run([], "ps", ["-a", "--filter", f"label=io.podman.compose.project={proj_name}"])
@ -1677,7 +1677,7 @@ def compose_run(compose, args):
if args.volume:
# TODO: handle volumes
cnt['tty']=False if args.T else True
cnt['tty'] = not args.T
if args.cnt_command is not None and len(args.cnt_command) > 0:
# can't restart and --rm
@ -1690,7 +1690,7 @@ def compose_run(compose, args):
if args.rm:
podman_args.insert(1, '--rm')
p = compose.podman.run([], 'run', podman_args, sleep=0)
@cmd_run(podman_compose, 'exec', 'execute a command in a running container')
def compose_exec(compose, args):
@ -1713,7 +1713,7 @@ def compose_exec(compose, args):
if args.cnt_command is not None and len(args.cnt_command) > 0:
podman_args += args.cnt_command
p = compose.podman.run([], 'exec', podman_args, sleep=0)
def transfer_service_status(compose, args, action):
Reference in New Issue
Block a user