Simplified statements to retrieve keys from dictionaries

Signed-off-by: Felix Rubio <felix@kngnt.org>
This commit is contained in:
Felix Rubio 2024-12-01 20:24:40 +01:00 committed by Povilas Kanapickas
parent a67fa0beb5
commit 145ae47c48

View File

@ -205,9 +205,9 @@ def fix_mount_dict(compose, mount_dict, srv_name):
return mount_dict
if mount_dict["type"] == "volume":
vols = compose.vols
source = mount_dict.get("source", None)
vol = (vols.get(source, None) or {}) if source else {}
name = vol.get("name", None)
source = mount_dict.get("source")
vol = (vols.get(source, {}) or {}) if source else {}
name = vol.get("name")
mount_dict["_vol"] = vol
# handle anonymous or implied volume
if not source:
@ -218,7 +218,7 @@ def fix_mount_dict(compose, mount_dict, srv_name):
hashlib.sha256(mount_dict["target"].encode("utf-8")).hexdigest(),
])
elif not name:
external = vol.get("external", None)
external = vol.get("external")
if isinstance(external, dict):
vol["name"] = external.get("name", f"{source}")
elif external:
@ -337,8 +337,8 @@ def norm_ulimit(inner_value):
if isinstance(inner_value, dict):
if not inner_value.keys() & {"soft", "hard"}:
raise ValueError("expected at least one soft or hard limit")
soft = inner_value.get("soft", inner_value.get("hard", None))
hard = inner_value.get("hard", inner_value.get("soft", None))
soft = inner_value.get("soft", inner_value.get("hard"))
hard = inner_value.get("hard", inner_value.get("soft"))
return f"{soft}:{hard}"
if is_list(inner_value):
return norm_ulimit(norm_as_dict(inner_value))
@ -384,7 +384,7 @@ async def assert_volume(compose, mount_dict):
inspect volume to get directory
create volume if needed
"""
vol = mount_dict.get("_vol", None)
vol = mount_dict.get("_vol")
if mount_dict["type"] == "bind":
basedir = os.path.realpath(compose.dirname)
mount_src = mount_dict["source"]
@ -395,10 +395,10 @@ async def assert_volume(compose, mount_dict):
except OSError:
pass
return
if mount_dict["type"] != "volume" or not vol or not vol.get("name", None):
if mount_dict["type"] != "volume" or not vol or not vol.get("name"):
return
vol_name = vol["name"]
is_ext = vol.get("external", None)
is_ext = vol.get("external")
log.debug("podman volume inspect %s || podman volume create %s", vol_name, vol_name)
# TODO: might move to using "volume list"
# podman volume list --format '{{.Name}}\t{{.MountPoint}}' \
@ -408,7 +408,7 @@ async def assert_volume(compose, mount_dict):
except subprocess.CalledProcessError as e:
if is_ext:
raise RuntimeError(f"External volume [{vol_name}] does not exists") from e
labels = vol.get("labels", None) or []
labels = vol.get("labels", [])
args = [
"create",
"--label",
@ -418,10 +418,10 @@ async def assert_volume(compose, mount_dict):
]
for item in norm_as_list(labels):
args.extend(["--label", item])
driver = vol.get("driver", None)
driver = vol.get("driver")
if driver:
args.extend(["--driver", driver])
driver_opts = vol.get("driver_opts", None) or {}
driver_opts = vol.get("driver_opts", {})
for opt, value in driver_opts.items():
args.extend(["--opt", f"{opt}={value}"])
args.append(vol_name)
@ -430,29 +430,29 @@ async def assert_volume(compose, mount_dict):
def mount_desc_to_mount_args(compose, mount_desc, srv_name, cnt_name): # pylint: disable=unused-argument
mount_type = mount_desc.get("type", None)
vol = mount_desc.get("_vol", None) if mount_type == "volume" else None
source = vol["name"] if vol else mount_desc.get("source", None)
mount_type = mount_desc.get("type")
vol = mount_desc.get("_vol") if mount_type == "volume" else None
source = vol["name"] if vol else mount_desc.get("source")
target = mount_desc["target"]
opts = []
if mount_desc.get(mount_type, None):
# TODO: we might need to add mount_dict[mount_type]["propagation"] = "z"
mount_prop = mount_desc.get(mount_type, {}).get("propagation", None)
mount_prop = mount_desc.get(mount_type, {}).get("propagation")
if mount_prop:
opts.append(f"{mount_type}-propagation={mount_prop}")
if mount_desc.get("read_only", False):
opts.append("ro")
if mount_type == "tmpfs":
tmpfs_opts = mount_desc.get("tmpfs", {})
tmpfs_size = tmpfs_opts.get("size", None)
tmpfs_size = tmpfs_opts.get("size")
if tmpfs_size:
opts.append(f"tmpfs-size={tmpfs_size}")
tmpfs_mode = tmpfs_opts.get("mode", None)
tmpfs_mode = tmpfs_opts.get("mode")
if tmpfs_mode:
opts.append(f"tmpfs-mode={tmpfs_mode}")
if mount_type == "bind":
bind_opts = mount_desc.get("bind", {})
selinux = bind_opts.get("selinux", None)
selinux = bind_opts.get("selinux")
if selinux is not None:
opts.append(selinux)
opts = ",".join(opts)
@ -486,7 +486,7 @@ def container_to_ulimit_args(cnt, podman_args):
def container_to_ulimit_build_args(cnt, podman_args):
build = cnt.get("build", None)
build = cnt.get("build")
if build is not None:
ulimit_to_ulimit_args(build.get("ulimits", []), podman_args)
@ -496,8 +496,8 @@ def mount_desc_to_volume_args(compose, mount_desc, srv_name, cnt_name): # pylin
mount_type = mount_desc["type"]
if mount_type not in ("bind", "volume"):
raise ValueError("unknown mount type:" + mount_type)
vol = mount_desc.get("_vol", None) if mount_type == "volume" else None
source = vol["name"] if vol else mount_desc.get("source", None)
vol = mount_desc.get("_vol") if mount_type == "volume" else None
source = vol["name"] if vol else mount_desc.get("source")
if not source:
raise ValueError(f"missing mount source for {mount_type} on {srv_name}")
target = mount_desc["target"]
@ -517,12 +517,12 @@ def mount_desc_to_volume_args(compose, mount_desc, srv_name, cnt_name): # pylin
# [nosuid|suid]
# [O]
# [U]
read_only = mount_desc.get("read_only", None)
read_only = mount_desc.get("read_only")
if read_only is not None:
opts.append("ro" if read_only else "rw")
if mount_type == "bind":
bind_opts = mount_desc.get("bind", {})
selinux = bind_opts.get("selinux", None)
selinux = bind_opts.get("selinux")
if selinux is not None:
opts.append(selinux)
@ -551,10 +551,10 @@ async def get_mount_args(compose, cnt, volume):
args = volume["target"]
tmpfs_opts = volume.get("tmpfs", {})
opts = []
size = tmpfs_opts.get("size", None)
size = tmpfs_opts.get("size")
if size:
opts.append(f"size={size}")
mode = tmpfs_opts.get("mode", None)
mode = tmpfs_opts.get("mode")
if mode:
opts.append(f"mode={mode}")
if opts:
@ -571,12 +571,12 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False):
podman_is_building: True if we are preparing arguments for an invocation of "podman build"
False if we are preparing for something else like "podman run"
"""
secret_name = secret if isinstance(secret, str) else secret.get("source", None)
secret_name = secret if isinstance(secret, str) else secret.get("source")
if not secret_name or secret_name not in compose.declared_secrets.keys():
raise ValueError(f'ERROR: undeclared secret: "{secret}", service: {cnt["_service"]}')
declared_secret = compose.declared_secrets[secret_name]
source_file = declared_secret.get("file", None)
source_file = declared_secret.get("file")
dest_file = ""
secret_opts = ""
@ -586,11 +586,11 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False):
secret_mode = None
secret_type = None
if isinstance(secret, dict):
secret_target = secret.get("target", None)
secret_uid = secret.get("uid", None)
secret_gid = secret.get("gid", None)
secret_mode = secret.get("mode", None)
secret_type = secret.get("type", None)
secret_target = secret.get("target")
secret_uid = secret.get("uid")
secret_gid = secret.get("gid")
secret_mode = secret.get("mode")
secret_type = secret.get("type")
if source_file:
# assemble path for source file first, because we need it for all cases
@ -636,7 +636,7 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False):
# since these commands are directly translated to
# podman-create commands, albeit we can only support a 1:1 mapping
# at the moment
if declared_secret.get("external", False) or declared_secret.get("name", None):
if declared_secret.get("external", False) or declared_secret.get("name"):
secret_opts += f",uid={secret_uid}" if secret_uid else ""
secret_opts += f",gid={secret_gid}" if secret_gid else ""
secret_opts += f",mode={secret_mode}" if secret_mode else ""
@ -647,7 +647,7 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False):
# for type=mount as well.
# having a custom name for the external secret
# has the same problem as well
ext_name = declared_secret.get("name", None)
ext_name = declared_secret.get("name")
err_str = (
'ERROR: Custom name/target reference "{}" '
'for mounted external secret "{}" is not supported'
@ -680,17 +680,17 @@ def container_to_gpu_res_args(cnt, podman_args):
# https://docs.docker.com/compose/gpu-support/
# https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/cdi-support.html
deploy = cnt.get("deploy", None) or {}
res = deploy.get("resources", None) or {}
reservations = res.get("reservations", None) or {}
deploy = cnt.get("deploy", {})
res = deploy.get("resources", {})
reservations = res.get("reservations", {})
devices = reservations.get("devices", [])
gpu_on = False
for device in devices:
driver = device.get("driver", None)
driver = device.get("driver")
if driver is None:
continue
capabilities = device.get("capabilities", None)
capabilities = device.get("capabilities")
if capabilities is None:
continue
@ -730,21 +730,21 @@ def container_to_gpu_res_args(cnt, podman_args):
def container_to_cpu_res_args(cnt, podman_args):
# v2: https://docs.docker.com/compose/compose-file/compose-file-v2/#cpu-and-other-resources
# cpus, cpu_shares, mem_limit, mem_reservation
cpus_limit_v2 = try_float(cnt.get("cpus", None), None)
cpu_shares_v2 = try_int(cnt.get("cpu_shares", None), None)
mem_limit_v2 = cnt.get("mem_limit", None)
mem_res_v2 = cnt.get("mem_reservation", None)
cpus_limit_v2 = try_float(cnt.get("cpus"), None)
cpu_shares_v2 = try_int(cnt.get("cpu_shares"), None)
mem_limit_v2 = cnt.get("mem_limit")
mem_res_v2 = cnt.get("mem_reservation")
# v3: https://docs.docker.com/compose/compose-file/compose-file-v3/#resources
# spec: https://github.com/compose-spec/compose-spec/blob/master/deploy.md#resources
# deploy.resources.{limits,reservations}.{cpus, memory}
deploy = cnt.get("deploy", None) or {}
res = deploy.get("resources", None) or {}
limits = res.get("limits", None) or {}
cpus_limit_v3 = try_float(limits.get("cpus", None), None)
mem_limit_v3 = limits.get("memory", None)
reservations = res.get("reservations", None) or {}
deploy = cnt.get("deploy", {})
res = deploy.get("resources", {})
limits = res.get("limits", {})
cpus_limit_v3 = try_float(limits.get("cpus"), None)
mem_limit_v3 = limits.get("memory")
reservations = res.get("reservations", {})
# cpus_res_v3 = try_float(reservations.get('cpus', None), None)
mem_res_v3 = reservations.get("memory", None)
mem_res_v3 = reservations.get("memory")
# add args
cpus = cpus_limit_v3 or cpus_limit_v2
if cpus:
@ -773,10 +773,10 @@ def container_to_cpu_res_args(cnt, podman_args):
def port_dict_to_str(port_desc):
# NOTE: `mode: host|ingress` is ignored
cnt_port = port_desc.get("target", None)
published = port_desc.get("published", None) or ""
host_ip = port_desc.get("host_ip", None)
protocol = port_desc.get("protocol", None) or "tcp"
cnt_port = port_desc.get("target")
published = port_desc.get("published", "")
host_ip = port_desc.get("host_ip")
protocol = port_desc.get("protocol", "tcp")
if not cnt_port:
raise ValueError("target container port must be specified")
if host_ip:
@ -814,31 +814,31 @@ def get_network_create_args(net_desc, proj_name, net_name):
f"com.docker.compose.project={proj_name}",
]
# TODO: add more options here, like dns, ipv6, etc.
labels = net_desc.get("labels", None) or []
labels = net_desc.get("labels", [])
for item in norm_as_list(labels):
args.extend(["--label", item])
if net_desc.get("internal", None):
if net_desc.get("internal"):
args.append("--internal")
driver = net_desc.get("driver", None)
driver = net_desc.get("driver")
if driver:
args.extend(("--driver", driver))
driver_opts = net_desc.get("driver_opts", None) or {}
driver_opts = net_desc.get("driver_opts", {})
for key, value in driver_opts.items():
args.extend(("--opt", f"{key}={value}"))
ipam = net_desc.get("ipam", None) or {}
ipam_driver = ipam.get("driver", None)
ipam = net_desc.get("ipam", {})
ipam_driver = ipam.get("driver")
if ipam_driver and ipam_driver != "default":
args.extend(("--ipam-driver", ipam_driver))
ipam_config_ls = ipam.get("config", None) or []
if net_desc.get("enable_ipv6", None):
ipam_config_ls = ipam.get("config", [])
if net_desc.get("enable_ipv6"):
args.append("--ipv6")
if isinstance(ipam_config_ls, dict):
ipam_config_ls = [ipam_config_ls]
for ipam_config in ipam_config_ls:
subnet = ipam_config.get("subnet", None)
ip_range = ipam_config.get("ip_range", None)
gateway = ipam_config.get("gateway", None)
subnet = ipam_config.get("subnet")
ip_range = ipam_config.get("ip_range")
gateway = ipam_config.get("gateway")
if subnet:
args.extend(("--subnet", subnet))
if ip_range:
@ -854,19 +854,19 @@ async def assert_cnt_nets(compose, cnt):
"""
create missing networks
"""
net = cnt.get("network_mode", None)
net = cnt.get("network_mode")
if net and not net.startswith("bridge"):
return
cnt_nets = cnt.get("networks", None)
cnt_nets = cnt.get("networks")
if cnt_nets and isinstance(cnt_nets, dict):
cnt_nets = list(cnt_nets.keys())
cnt_nets = norm_as_list(cnt_nets or compose.default_net)
for net in cnt_nets:
net_desc = compose.networks[net] or {}
is_ext = net_desc.get("external", None)
is_ext = net_desc.get("external")
ext_desc = is_ext if isinstance(is_ext, dict) else {}
default_net_name = default_network_name_for_project(compose, net, is_ext)
net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name
net_name = ext_desc.get("name") or net_desc.get("name") or default_net_name
try:
await compose.podman.output([], "network", ["exists", net_name])
except subprocess.CalledProcessError as e:
@ -881,8 +881,8 @@ def get_net_args(compose, cnt):
service_name = cnt["service_name"]
net_args = []
is_bridge = False
mac_address = cnt.get("mac_address", None)
net = cnt.get("network_mode", None)
mac_address = cnt.get("mac_address")
net = cnt.get("network_mode")
if net:
if net == "none":
is_bridge = False
@ -911,7 +911,7 @@ def get_net_args(compose, cnt):
sys.exit(1)
else:
is_bridge = True
cnt_nets = cnt.get("networks", None)
cnt_nets = cnt.get("networks")
aliases = [service_name]
# NOTE: from podman manpage:
@ -921,23 +921,23 @@ def get_net_args(compose, cnt):
ip = None
ip6 = None
ip_assignments = 0
if cnt.get("_aliases", None):
aliases.extend(cnt.get("_aliases", None))
if cnt.get("_aliases"):
aliases.extend(cnt.get("_aliases"))
if cnt_nets and isinstance(cnt_nets, dict):
prioritized_cnt_nets = []
# cnt_nets is {net_key: net_value, ...}
for net_key, net_value in cnt_nets.items():
net_value = net_value or {}
aliases.extend(norm_as_list(net_value.get("aliases", None)))
if net_value.get("ipv4_address", None) is not None:
aliases.extend(norm_as_list(net_value.get("aliases")))
if net_value.get("ipv4_address") is not None:
ip_assignments = ip_assignments + 1
if net_value.get("ipv6_address", None) is not None:
if net_value.get("ipv6_address") is not None:
ip_assignments = ip_assignments + 1
if not ip:
ip = net_value.get("ipv4_address", None)
ip = net_value.get("ipv4_address")
if not ip6:
ip6 = net_value.get("ipv6_address", None)
ip6 = net_value.get("ipv6_address")
net_priority = net_value.get("priority", 0)
prioritized_cnt_nets.append((
net_priority,
@ -950,10 +950,10 @@ def get_net_args(compose, cnt):
net_names = []
for net in cnt_nets:
net_desc = compose.networks[net] or {}
is_ext = net_desc.get("external", None)
is_ext = net_desc.get("external")
ext_desc = is_ext if isinstance(is_ext, str) else {}
default_net_name = default_network_name_for_project(compose, net, is_ext)
net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name
net_name = ext_desc.get("name") or net_desc.get("name") or default_net_name
net_names.append(net_name)
net_names_str = ",".join(net_names)
@ -963,7 +963,7 @@ def get_net_args(compose, cnt):
# podman currently ignores this if a per-container network-alias is set; as pdoman-compose
# always sets a network-alias to the container name, is currently doesn't make sense to
# implement this.
multiple_nets = cnt.get("networks", None)
multiple_nets = cnt.get("networks")
if multiple_nets and len(multiple_nets) > 1:
# networks can be specified as a dict with config per network or as a plain list without
# config. Support both cases by converting the plain list to a dict with empty config.
@ -976,7 +976,7 @@ def get_net_args(compose, cnt):
# specified on the network level as well
if mac_address is not None:
for net_config_ in multiple_nets.values():
network_mac = net_config_.get("x-podman.mac_address", None)
network_mac = net_config_.get("x-podman.mac_address")
if network_mac is not None:
raise RuntimeError(
f"conflicting mac addresses {mac_address} and {network_mac}:"
@ -986,15 +986,15 @@ def get_net_args(compose, cnt):
for net_, net_config_ in multiple_nets.items():
net_desc = compose.networks[net_] or {}
is_ext = net_desc.get("external", None)
is_ext = net_desc.get("external")
ext_desc = is_ext if isinstance(is_ext, str) else {}
default_net_name = default_network_name_for_project(compose, net_, is_ext)
net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name
net_name = ext_desc.get("name") or net_desc.get("name") or default_net_name
ipv4 = net_config_.get("ipv4_address", None)
ipv6 = net_config_.get("ipv6_address", None)
ipv4 = net_config_.get("ipv4_address")
ipv6 = net_config_.get("ipv6_address")
# custom extension; not supported by docker-compose v3
mac = net_config_.get("x-podman.mac_address", None)
mac = net_config_.get("x-podman.mac_address")
# if a mac_address was specified on the container level, apply it to the first network
# This works for Python > 3.6, because dict insert ordering is preserved, so we are
@ -1039,7 +1039,7 @@ def get_net_args(compose, cnt):
async def container_to_args(compose, cnt, detached=True):
# TODO: double check -e , --add-host, -v, --read-only
dirname = compose.dirname
pod = cnt.get("pod", None) or ""
pod = cnt.get("pod", "")
name = cnt["name"]
podman_args = [f"--name={name}"]
@ -1049,20 +1049,20 @@ async def container_to_args(compose, cnt, detached=True):
if pod:
podman_args.append(f"--pod={pod}")
deps = []
for dep_srv in cnt.get("_deps", None) or []:
deps.extend(compose.container_names_by_service.get(dep_srv.name, None) or [])
for dep_srv in cnt.get("_deps", []):
deps.extend(compose.container_names_by_service.get(dep_srv.name, []))
if deps:
deps_csv = ",".join(deps)
podman_args.append(f"--requires={deps_csv}")
sec = norm_as_list(cnt.get("security_opt", None))
sec = norm_as_list(cnt.get("security_opt"))
for sec_item in sec:
podman_args.extend(["--security-opt", sec_item])
ann = norm_as_list(cnt.get("annotations", None))
ann = norm_as_list(cnt.get("annotations"))
for a in ann:
podman_args.extend(["--annotation", a])
if cnt.get("read_only", None):
if cnt.get("read_only"):
podman_args.append("--read-only")
if cnt.get("http_proxy", None) is False:
if cnt.get("http_proxy") is False:
podman_args.append("--http-proxy=false")
for i in cnt.get("labels", []):
podman_args.extend(["--label", i])
@ -1074,11 +1074,11 @@ async def container_to_args(compose, cnt, detached=True):
podman_args.extend(["--group-add", item])
for item in cnt.get("devices", []):
podman_args.extend(["--device", item])
for item in norm_as_list(cnt.get("dns", None)):
for item in norm_as_list(cnt.get("dns")):
podman_args.extend(["--dns", item])
for item in norm_as_list(cnt.get("dns_opt", None)):
for item in norm_as_list(cnt.get("dns_opt")):
podman_args.extend(["--dns-opt", item])
for item in norm_as_list(cnt.get("dns_search", None)):
for item in norm_as_list(cnt.get("dns_search")):
podman_args.extend(["--dns-search", item])
env_file = cnt.get("env_file", [])
if isinstance(env_file, (dict, str)):
@ -1112,10 +1112,10 @@ async def container_to_args(compose, cnt, detached=True):
await assert_cnt_nets(compose, cnt)
podman_args.extend(get_net_args(compose, cnt))
log_config = cnt.get("logging", None)
log_config = cnt.get("logging")
if log_config is not None:
podman_args.append(f'--log-driver={log_config.get("driver", "k8s-file")}')
log_opts = log_config.get("options") or {}
log_opts = log_config.get("options", {})
podman_args += [f"--log-opt={name}={value}" for name, value in log_opts.items()]
for secret in cnt.get("secrets", []):
podman_args.extend(get_secret_args(compose, cnt, secret))
@ -1123,9 +1123,9 @@ async def container_to_args(compose, cnt, detached=True):
podman_args.extend(["--add-host", i])
for i in cnt.get("expose", []):
podman_args.extend(["--expose", i])
if cnt.get("publishall", None):
if cnt.get("publishall"):
podman_args.append("-P")
ports = cnt.get("ports", None) or []
ports = cnt.get("ports", [])
if isinstance(ports, str):
ports = [ports]
for port in ports:
@ -1135,22 +1135,22 @@ async def container_to_args(compose, cnt, detached=True):
raise TypeError("port should be either string or dict")
podman_args.extend(["-p", port])
userns_mode = cnt.get("userns_mode", None)
userns_mode = cnt.get("userns_mode")
if userns_mode is not None:
podman_args.extend(["--userns", userns_mode])
user = cnt.get("user", None)
user = cnt.get("user")
if user is not None:
podman_args.extend(["-u", user])
if cnt.get("working_dir", None) is not None:
if cnt.get("working_dir") is not None:
podman_args.extend(["-w", cnt["working_dir"]])
if cnt.get("hostname", None):
if cnt.get("hostname"):
podman_args.extend(["--hostname", cnt["hostname"]])
if cnt.get("shm_size", None):
if cnt.get("shm_size"):
podman_args.extend(["--shm-size", str(cnt["shm_size"])])
if cnt.get("stdin_open", None):
if cnt.get("stdin_open"):
podman_args.append("-i")
if cnt.get("stop_signal", None):
if cnt.get("stop_signal"):
podman_args.extend(["--stop-signal", cnt["stop_signal"]])
sysctls = cnt.get("sysctls")
@ -1164,41 +1164,41 @@ async def container_to_args(compose, cnt, detached=True):
else:
raise TypeError("sysctls should be either dict or list")
if cnt.get("tty", None):
if cnt.get("tty"):
podman_args.append("--tty")
if cnt.get("privileged", None):
if cnt.get("privileged"):
podman_args.append("--privileged")
if cnt.get("pid", None):
if cnt.get("pid"):
podman_args.extend(["--pid", cnt["pid"]])
pull_policy = cnt.get("pull_policy", None)
pull_policy = cnt.get("pull_policy")
if pull_policy is not None and pull_policy != "build":
podman_args.extend(["--pull", pull_policy])
if cnt.get("restart", None) is not None:
if cnt.get("restart") is not None:
podman_args.extend(["--restart", cnt["restart"]])
container_to_ulimit_args(cnt, podman_args)
container_to_res_args(cnt, podman_args)
# currently podman shipped by fedora does not package this
if cnt.get("init", None):
if cnt.get("init"):
podman_args.append("--init")
if cnt.get("init-path", None):
if cnt.get("init-path"):
podman_args.extend(["--init-path", cnt["init-path"]])
entrypoint = cnt.get("entrypoint", None)
entrypoint = cnt.get("entrypoint")
if entrypoint is not None:
if isinstance(entrypoint, str):
entrypoint = shlex.split(entrypoint)
podman_args.extend(["--entrypoint", json.dumps(entrypoint)])
platform = cnt.get("platform", None)
platform = cnt.get("platform")
if platform is not None:
podman_args.extend(["--platform", platform])
if cnt.get("runtime", None):
if cnt.get("runtime"):
podman_args.extend(["--runtime", cnt["runtime"]])
# WIP: healthchecks are still work in progress
healthcheck = cnt.get("healthcheck", None) or {}
healthcheck = cnt.get("healthcheck", {})
if not isinstance(healthcheck, dict):
raise ValueError("'healthcheck' must be a key-value mapping")
healthcheck_disable = healthcheck.get("disable", False)
healthcheck_test = healthcheck.get("test", None)
healthcheck_test = healthcheck.get("test")
if healthcheck_disable:
healthcheck_test = ["NONE"]
if healthcheck_test:
@ -1257,7 +1257,7 @@ async def container_to_args(compose, cnt, detached=True):
podman_args.extend(["--gidmap", gidmap])
if cnt.get("x-podman.no_hosts", False):
podman_args.extend(["--no-hosts"])
rootfs = cnt.get('x-podman.rootfs', None)
rootfs = cnt.get('x-podman.rootfs')
if rootfs is not None:
rootfs_mode = True
podman_args.extend(["--rootfs", rootfs])
@ -1265,7 +1265,7 @@ async def container_to_args(compose, cnt, detached=True):
if not rootfs_mode:
podman_args.append(cnt["image"]) # command, ..etc.
command = cnt.get("command", None)
command = cnt.get("command")
if command is not None:
if isinstance(command, str):
podman_args.extend(shlex.split(command))
@ -1341,7 +1341,7 @@ def rec_deps(services, service_name, start_point=None):
# avoid A depens on A
if dep_name.name == service_name:
continue
dep_srv = services.get(dep_name.name, None)
dep_srv = services.get(dep_name.name)
if not dep_srv:
continue
# NOTE: avoid creating loops, A->B->A
@ -1362,7 +1362,7 @@ def flat_deps(services, with_extends=False):
srv["_deps"] = deps
# TODO: manage properly the dependencies coming from base services when extended
if with_extends:
ext = srv.get("extends", {}).get("service", None)
ext = srv.get("extends", {}).get("service")
if ext:
if ext != name:
deps.add(ServiceDependency(ext, "service_started"))
@ -1374,7 +1374,7 @@ def flat_deps(services, with_extends=False):
deps_ls = [ServiceDependency(k, v["condition"]) for k, v in deps_ls.items()]
deps.update(deps_ls)
# parse link to get service name and remove alias
links_ls = srv.get("links", None) or []
links_ls = srv.get("links", [])
if not is_list(links_ls):
links_ls = [links_ls]
deps.update([ServiceDependency(c.split(":")[0], "service_started") for c in links_ls])
@ -1557,7 +1557,7 @@ def normalize_service(service, sub_dir=""):
service["build"] = {"context": build}
if sub_dir and "build" in service:
build = service["build"]
context = build.get("context", None) or ""
context = build.get("context", "")
if context or sub_dir:
if context.startswith("./"):
context = context[2:]
@ -1616,7 +1616,7 @@ def normalize(compose):
"""
convert compose dict of some keys from string or dicts into arrays
"""
services = compose.get("services", None) or {}
services = compose.get("services", {})
for service in services.values():
normalize_service(service)
return compose
@ -1634,7 +1634,7 @@ def normalize_service_final(service: dict, project_dir: str) -> dict:
def normalize_final(compose: dict, project_dir: str) -> dict:
services = compose.get("services", None) or {}
services = compose.get("services", {})
for service in services.values():
normalize_service_final(service, project_dir)
return compose
@ -1701,10 +1701,10 @@ def resolve_extends(services, service_names, environ):
ext = service.get("extends", {})
if isinstance(ext, str):
ext = {"service": ext}
from_service_name = ext.get("service", None)
from_service_name = ext.get("service")
if not from_service_name:
continue
filename = ext.get("file", None)
filename = ext.get("file")
if filename:
if filename.startswith("./"):
filename = filename[2:]
@ -1798,7 +1798,7 @@ class PodmanCompose:
for args in self.global_args.podman_args:
xargs.extend(shlex.split(args))
cmd_norm = cmd if cmd != "create" else "run"
cmd_args = self.global_args.__dict__.get(f"podman_{cmd_norm}_args", None) or []
cmd_args = self.global_args.__dict__.get(f"podman_{cmd_norm}_args", [])
for args in cmd_args:
xargs.extend(shlex.split(args))
return xargs
@ -1850,12 +1850,12 @@ class PodmanCompose:
def _parse_compose_file(self):
args = self.global_args
# cmd = args.command
dirname = os.environ.get("COMPOSE_PROJECT_DIR", None)
dirname = os.environ.get("COMPOSE_PROJECT_DIR")
if dirname and os.path.isdir(dirname):
os.chdir(dirname)
pathsep = os.environ.get("COMPOSE_PATH_SEPARATOR", None) or os.pathsep
pathsep = os.environ.get("COMPOSE_PATH_SEPARATOR", os.pathsep)
if not args.file:
default_str = os.environ.get("COMPOSE_FILE", None)
default_str = os.environ.get("COMPOSE_FILE")
if default_str:
default_ls = default_str.split(pathsep)
else:
@ -1943,7 +1943,7 @@ class PodmanCompose:
content = rec_subs(content, self.environ)
rec_merge(compose, content)
# If `include` is used, append included files to files
include = compose.get("include", None)
include = compose.get("include")
if include:
files.extend(include)
# As compose obj is updated and tested with every loop, not deleting `include`
@ -1963,16 +1963,14 @@ class PodmanCompose:
# debug mode
if len(files) > 1:
log.debug(" ** merged:\n%s", json.dumps(compose, indent=2))
# ver = compose.get('version', None)
# ver = compose.get('version')
if not project_name:
project_name = compose.get("name", None)
project_name = compose.get("name")
if project_name is None:
# More strict then actually needed for simplicity:
# podman requires [a-zA-Z0-9][a-zA-Z0-9_.-]*
project_name = (
self.environ.get("COMPOSE_PROJECT_NAME", None) or dir_basename.lower()
)
project_name = self.environ.get("COMPOSE_PROJECT_NAME", dir_basename.lower())
project_name = norm_re.sub("", project_name)
if not project_name:
raise RuntimeError(f"Project name [{dir_basename}] normalized to empty")
@ -1980,7 +1978,7 @@ class PodmanCompose:
self.project_name = project_name
self.environ.update({"COMPOSE_PROJECT_NAME": self.project_name})
services = compose.get("services", None)
services = compose.get("services")
if services is None:
services = {}
log.warning("WARNING: No services defined")
@ -1995,7 +1993,7 @@ class PodmanCompose:
flat_deps(services)
service_names = sorted([(len(srv["_deps"]), name) for name, srv in services.items()])
service_names = [name for _, name in service_names]
nets = compose.get("networks", None) or {}
nets = compose.get("networks", {})
if not nets:
nets["default"] = None
self.networks = nets
@ -2007,7 +2005,7 @@ class PodmanCompose:
self.default_net = None
allnets = set()
for name, srv in services.items():
srv_nets = srv.get("networks", None) or self.default_net
srv_nets = srv.get("networks", self.default_net)
srv_nets = (
list(srv_nets.keys()) if isinstance(srv_nets, dict) else norm_as_list(srv_nets)
)
@ -2057,12 +2055,12 @@ class PodmanCompose:
"service_name": service_name,
**service_desc,
}
x_podman = service_desc.get("x-podman", None)
rootfs_mode = x_podman is not None and x_podman.get("rootfs", None) is not None
x_podman = service_desc.get("x-podman")
rootfs_mode = x_podman is not None and x_podman.get("rootfs") is not None
if "image" not in cnt and not rootfs_mode:
cnt["image"] = f"{project_name}_{service_name}"
labels = norm_as_list(cnt.get("labels", None))
cnt["ports"] = norm_ports(cnt.get("ports", None))
labels = norm_as_list(cnt.get("labels"))
cnt["ports"] = norm_ports(cnt.get("ports"))
labels.extend(podman_compose_labels)
labels.extend([
f"com.docker.compose.container-number={num}",
@ -2072,11 +2070,11 @@ class PodmanCompose:
cnt["_service"] = service_name
cnt["_project"] = project_name
given_containers.append(cnt)
volumes = cnt.get("volumes", None) or []
volumes = cnt.get("volumes", [])
for volume in volumes:
mnt_dict = get_mnt_dict(self, cnt, volume)
if (
mnt_dict.get("type", None) == "volume"
mnt_dict.get("type") == "volume"
and mnt_dict["source"]
and mnt_dict["source"] not in self.vols
):
@ -2087,7 +2085,7 @@ class PodmanCompose:
container_by_name = {c["name"]: c for c in given_containers}
# log("deps:", [(c["name"], c["_deps"]) for c in given_containers])
given_containers = list(container_by_name.values())
given_containers.sort(key=lambda c: len(c.get("_deps", None) or []))
given_containers.sort(key=lambda c: len(c.get("_deps", [])))
# log("sorted:", [c["name"] for c in given_containers])
self.x_podman = compose.get("x-podman", {})
@ -2452,7 +2450,7 @@ async def build_one(compose, args, cnt):
if not hasattr(build_desc, "items"):
build_desc = {"context": build_desc}
ctx = build_desc.get("context", ".")
dockerfile = build_desc.get("dockerfile", None)
dockerfile = build_desc.get("dockerfile")
if dockerfile:
dockerfile = os.path.join(ctx, dockerfile)
else:
@ -2548,7 +2546,7 @@ async def create_pods(compose, args): # pylint: disable=unused-argument
podman_args.extend(shlex.split(args.pod_args))
# if compose.podman_version and not strverscmp_lt(compose.podman_version, "3.4.0"):
# podman_args.append("--infra-name={}_infra".format(pod["name"]))
ports = pod.get("ports", None) or []
ports = pod.get("ports", [])
if isinstance(ports, str):
ports = [ports]
for i in ports:
@ -2669,7 +2667,7 @@ async def compose_up(compose: PodmanCompose, args):
# TODO: handle already existing
# TODO: if error creating do not enter loop
# TODO: colors if sys.stdout.isatty()
exit_code_from = args.__dict__.get("exit_code_from", None)
exit_code_from = args.__dict__.get("exit_code_from")
if exit_code_from:
args.abort_on_container_exit = True
@ -2747,7 +2745,7 @@ def get_volume_names(compose, cnt):
mount_type = volume["type"]
if mount_type != "volume":
continue
volume_name = (volume.get("_vol", None) or {}).get("name", None)
volume_name = volume.get("_vol", {}).get("name")
ls.append(volume_name)
return ls
@ -2767,7 +2765,7 @@ async def compose_down(compose: PodmanCompose, args):
podman_stop_args = [*podman_args]
timeout = timeout_global
if timeout is None:
timeout_str = cnt.get("stop_grace_period", None) or STOP_GRACE_PERIOD
timeout_str = cnt.get("stop_grace_period", STOP_GRACE_PERIOD)
timeout = str_to_seconds(timeout_str)
if timeout is not None:
podman_stop_args.extend(["-t", str(timeout)])
@ -2912,7 +2910,7 @@ def compose_run_update_container_from_args(compose, cnt, args):
cnt["ports"] = ports
if args.volume:
# TODO: handle volumes
volumes = clone(cnt.get("volumes", None) or [])
volumes = clone(cnt.get("volumes", []))
volumes.extend(args.volume)
cnt["volumes"] = volumes
cnt["tty"] = not args.T
@ -2978,9 +2976,8 @@ async def transfer_service_status(compose, args, action):
if action != "start":
timeout = timeout_global
if timeout is None:
timeout_str = (
compose.container_by_name[target].get("stop_grace_period", None)
or STOP_GRACE_PERIOD
timeout_str = compose.container_by_name[target].get(
"stop_grace_period", STOP_GRACE_PERIOD
)
timeout = str_to_seconds(timeout_str)
if timeout is not None: