Remove is_dict,is_str for better pyright inference

When support for python3.8 and python3.9 has been dropped, it will be possible to eat the cak and
have it due to PEP-647.

Signed-off-by: legobt <6wbvkn0j@anonaddy.me>
This commit is contained in:
legobt 2024-07-31 06:29:55 +00:00
parent fc90f60bf1
commit 462603383c
No known key found for this signature in database
GPG Key ID: C5F28D4B2B8927AB

View File

@ -42,16 +42,12 @@ script = os.path.realpath(sys.argv[0])
# helper functions
def is_str(string_object):
return isinstance(string_object, str)
def is_dict(dict_object):
return isinstance(dict_object, dict)
def is_list(list_object):
return not is_str(list_object) and not is_dict(list_object) and hasattr(list_object, "__iter__")
return (
not isinstance(list_object, str)
and not isinstance(list_object, dict)
and hasattr(list_object, "__iter__")
)
# identity filter
@ -263,8 +259,8 @@ def rec_subs(value, subs_dict):
"""
do bash-like substitution in value and if list of dictionary do that recursively
"""
if is_dict(value):
if 'environment' in value and is_dict(value['environment']):
if isinstance(value, dict):
if 'environment' in value and isinstance(value['environment'], dict):
# Load service's environment variables
subs_dict = subs_dict.copy()
svc_envs = {k: v for k, v in value['environment'].items() if k not in subs_dict}
@ -275,7 +271,7 @@ def rec_subs(value, subs_dict):
subs_dict.update(svc_envs)
value = {k: rec_subs(v, subs_dict) for k, v in value.items()}
elif is_str(value):
elif isinstance(value, str):
def convert(m):
if m.group("escaped") is not None:
@ -303,7 +299,7 @@ def norm_as_list(src):
"""
if src is None:
dst = []
elif is_dict(src):
elif isinstance(src, dict):
dst = [(f"{k}={v}" if v is not None else k) for k, v in src.items()]
elif is_list(src):
dst = list(src)
@ -319,13 +315,13 @@ def norm_as_dict(src):
"""
if src is None:
dst = {}
elif is_dict(src):
elif isinstance(src, dict):
dst = dict(src)
elif is_list(src):
dst = [i.split("=", 1) for i in src if i]
dst = [(a if len(a) == 2 else (a[0], None)) for a in dst]
dst = dict(dst)
elif is_str(src):
elif isinstance(src, str):
key, value = src.split("=", 1) if "=" in src else (src, None)
dst = {key: value}
else:
@ -334,7 +330,7 @@ def norm_as_dict(src):
def norm_ulimit(inner_value):
if is_dict(inner_value):
if isinstance(inner_value, dict):
if not inner_value.keys() & {"soft", "hard"}:
raise ValueError("expected at least one soft or hard limit")
soft = inner_value.get("soft", inner_value.get("hard", None))
@ -468,7 +464,7 @@ def mount_desc_to_mount_args(compose, mount_desc, srv_name, cnt_name): # pylint
def ulimit_to_ulimit_args(ulimit, podman_args):
if ulimit is not None:
# ulimit can be a single value, i.e. ulimit: host
if is_str(ulimit):
if isinstance(ulimit, str):
podman_args.extend(["--ulimit", ulimit])
# or a dictionary or list:
else:
@ -535,7 +531,7 @@ def mount_desc_to_volume_args(compose, mount_desc, srv_name, cnt_name): # pylin
def get_mnt_dict(compose, cnt, volume):
srv_name = cnt["_service"]
basedir = compose.dirname
if is_str(volume):
if isinstance(volume, str):
volume = parse_short_mount(volume, basedir)
return fix_mount_dict(compose, volume, srv_name)
@ -571,7 +567,7 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False):
podman_is_building: True if we are preparing arguments for an invocation of "podman build"
False if we are preparing for something else like "podman run"
"""
secret_name = secret if is_str(secret) else secret.get("source", None)
secret_name = secret if isinstance(secret, str) else secret.get("source", None)
if not secret_name or secret_name not in compose.declared_secrets.keys():
raise ValueError(f'ERROR: undeclared secret: "{secret}", service: {cnt["_service"]}')
declared_secret = compose.declared_secrets[secret_name]
@ -580,11 +576,11 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False):
dest_file = ""
secret_opts = ""
secret_target = None if is_str(secret) else secret.get("target", None)
secret_uid = None if is_str(secret) else secret.get("uid", None)
secret_gid = None if is_str(secret) else secret.get("gid", None)
secret_mode = None if is_str(secret) else secret.get("mode", None)
secret_type = None if is_str(secret) else secret.get("type", None)
secret_target = None if isinstance(secret, str) else secret.get("target", None)
secret_uid = None if isinstance(secret, str) else secret.get("uid", None)
secret_gid = None if isinstance(secret, str) else secret.get("gid", None)
secret_mode = None if isinstance(secret, str) else secret.get("mode", None)
secret_type = None if isinstance(secret, str) else secret.get("type", None)
if source_file:
# assemble path for source file first, because we need it for all cases
@ -827,7 +823,7 @@ def get_network_create_args(net_desc, proj_name, net_name):
if net_desc.get("enable_ipv6", None):
args.append("--ipv6")
if is_dict(ipam_config_ls):
if isinstance(ipam_config_ls, dict):
ipam_config_ls = [ipam_config_ls]
for ipam_config in ipam_config_ls:
subnet = ipam_config.get("subnet", None)
@ -852,13 +848,13 @@ async def assert_cnt_nets(compose, cnt):
if net and not net.startswith("bridge"):
return
cnt_nets = cnt.get("networks", None)
if cnt_nets and is_dict(cnt_nets):
if cnt_nets and isinstance(cnt_nets, dict):
cnt_nets = list(cnt_nets.keys())
cnt_nets = norm_as_list(cnt_nets or compose.default_net)
for net in cnt_nets:
net_desc = compose.networks[net] or {}
is_ext = net_desc.get("external", None)
ext_desc = is_ext if is_dict(is_ext) else {}
ext_desc = is_ext if isinstance(is_ext, dict) else {}
default_net_name = default_network_name_for_project(compose, net, is_ext)
net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name
try:
@ -917,7 +913,7 @@ def get_net_args(compose, cnt):
ip_assignments = 0
if cnt.get("_aliases", None):
aliases.extend(cnt.get("_aliases", None))
if cnt_nets and is_dict(cnt_nets):
if cnt_nets and isinstance(cnt_nets, dict):
prioritized_cnt_nets = []
# cnt_nets is {net_key: net_value, ...}
for net_key, net_value in cnt_nets.items():
@ -945,7 +941,7 @@ def get_net_args(compose, cnt):
for net in cnt_nets:
net_desc = compose.networks[net] or {}
is_ext = net_desc.get("external", None)
ext_desc = is_ext if is_dict(is_ext) else {}
ext_desc = is_ext if isinstance(is_ext, str) else {}
default_net_name = default_network_name_for_project(compose, net, is_ext)
net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name
net_names.append(net_name)
@ -981,7 +977,7 @@ def get_net_args(compose, cnt):
for net_, net_config_ in multiple_nets.items():
net_desc = compose.networks[net_] or {}
is_ext = net_desc.get("external", None)
ext_desc = is_ext if is_dict(is_ext) else {}
ext_desc = is_ext if isinstance(is_ext, str) else {}
default_net_name = default_network_name_for_project(compose, net_, is_ext)
net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name
@ -1075,10 +1071,10 @@ async def container_to_args(compose, cnt, detached=True):
for item in norm_as_list(cnt.get("dns_search", None)):
podman_args.extend(["--dns-search", item])
env_file = cnt.get("env_file", [])
if is_str(env_file) or is_dict(env_file):
if isinstance(env_file, (dict, str)):
env_file = [env_file]
for i in env_file:
if is_str(i):
if isinstance(i, str):
i = {"path": i}
path = i["path"]
required = i.get("required", True)
@ -1096,7 +1092,7 @@ async def container_to_args(compose, cnt, detached=True):
for e in env:
podman_args.extend(["-e", e])
tmpfs_ls = cnt.get("tmpfs", [])
if is_str(tmpfs_ls):
if isinstance(tmpfs_ls, str):
tmpfs_ls = [tmpfs_ls]
for i in tmpfs_ls:
podman_args.extend(["--tmpfs", i])
@ -1178,7 +1174,7 @@ async def container_to_args(compose, cnt, detached=True):
podman_args.extend(["--init-path", cnt["init-path"]])
entrypoint = cnt.get("entrypoint", None)
if entrypoint is not None:
if is_str(entrypoint):
if isinstance(entrypoint, str):
entrypoint = shlex.split(entrypoint)
podman_args.extend(["--entrypoint", json.dumps(entrypoint)])
platform = cnt.get("platform", None)
@ -1189,7 +1185,7 @@ async def container_to_args(compose, cnt, detached=True):
# WIP: healthchecks are still work in progress
healthcheck = cnt.get("healthcheck", None) or {}
if not is_dict(healthcheck):
if not isinstance(healthcheck, dict):
raise ValueError("'healthcheck' must be a key-value mapping")
healthcheck_disable = healthcheck.get("disable", False)
healthcheck_test = healthcheck.get("test", None)
@ -1197,7 +1193,7 @@ async def container_to_args(compose, cnt, detached=True):
healthcheck_test = ["NONE"]
if healthcheck_test:
# If it's a string, it's equivalent to specifying CMD-SHELL
if is_str(healthcheck_test):
if isinstance(healthcheck_test, str):
# podman does not add shell to handle command with whitespace
podman_args.extend([
"--healthcheck-command",
@ -1259,7 +1255,7 @@ async def container_to_args(compose, cnt, detached=True):
podman_args.append(cnt["image"]) # command, ..etc.
command = cnt.get("command", None)
if command is not None:
if is_str(command):
if isinstance(command, str):
podman_args.extend(shlex.split(command))
else:
podman_args.extend([str(i) for i in command])
@ -1302,9 +1298,9 @@ def flat_deps(services, with_extends=False):
deps.add(ext)
continue
deps_ls = srv.get("depends_on", None) or []
if is_str(deps_ls):
if isinstance(deps_ls, str):
deps_ls = [deps_ls]
elif is_dict(deps_ls):
elif isinstance(deps_ls, dict):
deps_ls = list(deps_ls.keys())
deps.update(deps_ls)
# parse link to get service name and remove alias
@ -1467,7 +1463,7 @@ class Podman:
def normalize_service(service, sub_dir=""):
if "build" in service:
build = service["build"]
if is_str(build):
if isinstance(build, str):
service["build"] = {"context": build}
if sub_dir and "build" in service:
build = service["build"]
@ -1482,19 +1478,19 @@ def normalize_service(service, sub_dir=""):
context = "."
service["build"]["context"] = context
if "build" in service and "additional_contexts" in service["build"]:
if is_dict(build["additional_contexts"]):
if isinstance(build["additional_contexts"], dict):
new_additional_contexts = []
for k, v in build["additional_contexts"].items():
new_additional_contexts.append(f"{k}={v}")
build["additional_contexts"] = new_additional_contexts
for key in ("command", "entrypoint"):
if key in service:
if is_str(service[key]):
if isinstance(service[key], str):
service[key] = shlex.split(service[key])
for key in ("env_file", "security_opt", "volumes"):
if key not in service:
continue
if is_str(service[key]):
if isinstance(service[key], str):
service[key] = [service[key]]
if "security_opt" in service:
sec_ls = service["security_opt"]
@ -1507,12 +1503,12 @@ def normalize_service(service, sub_dir=""):
service[key] = norm_as_dict(service[key])
if "extends" in service:
extends = service["extends"]
if is_str(extends):
if isinstance(extends, str):
extends = {"service": extends}
service["extends"] = extends
if "depends_on" in service:
deps = service["depends_on"]
if is_str(deps):
if isinstance(deps, str):
deps = [deps]
if is_list(deps):
deps_dict = {}
@ -1535,9 +1531,9 @@ def normalize(compose):
def normalize_service_final(service: dict, project_dir: str) -> dict:
if "build" in service:
build = service["build"]
context = build if is_str(build) else build.get("context", ".")
context = build if isinstance(build, str) else build.get("context", ".")
context = os.path.normpath(os.path.join(project_dir, context))
if not is_dict(service["build"]):
if not isinstance(service["build"], dict):
service["build"] = {}
service["build"]["context"] = context
return service
@ -1551,7 +1547,7 @@ def normalize_final(compose: dict, project_dir: str) -> dict:
def clone(value):
return value.copy() if is_list(value) or is_dict(value) else value
return value.copy() if is_list(value) or isinstance(value, dict) else value
def rec_merge_one(target, source):
@ -1589,7 +1585,7 @@ def rec_merge_one(target, source):
value.extend(value2)
else:
value.extend(value2)
elif is_dict(value2):
elif isinstance(value2, dict):
rec_merge_one(value, value2)
else:
target[key] = value2
@ -1609,7 +1605,7 @@ def resolve_extends(services, service_names, environ):
for name in service_names:
service = services[name]
ext = service.get("extends", {})
if is_str(ext):
if isinstance(ext, str):
ext = {"service": ext}
from_service_name = ext.get("service", None)
if not from_service_name:
@ -1694,7 +1690,7 @@ class PodmanCompose:
]
def assert_services(self, services):
if is_str(services):
if isinstance(services, str):
services = [services]
given = set(services or [])
missing = given - self.all_services
@ -1918,7 +1914,9 @@ class PodmanCompose:
allnets = set()
for name, srv in services.items():
srv_nets = srv.get("networks", None) or self.default_net
srv_nets = list(srv_nets.keys()) if is_dict(srv_nets) else norm_as_list(srv_nets)
srv_nets = (
list(srv_nets.keys()) if isinstance(srv_nets, dict) else norm_as_list(srv_nets)
)
allnets.update(srv_nets)
given_nets = set(nets.keys())
missing_nets = allnets - given_nets
@ -2592,7 +2590,7 @@ def get_volume_names(compose, cnt):
srv_name = cnt["_service"]
ls = []
for volume in cnt.get("volumes", []):
if is_str(volume):
if isinstance(volume, str):
volume = parse_short_mount(volume, basedir)
volume = fix_mount_dict(compose, volume, srv_name)
mount_type = volume["type"]