# ohmyzsh/.github/workflows/dependencies/updater.py
#
# 599 lines
# 19 KiB
# Python

import json
import os
import re
import shutil
import subprocess
import sys
import timeit
from copy import deepcopy
from typing import Literal, NotRequired, Optional, TypedDict
import requests
import yaml
from semver import Version
# Scratch directory for upstream clones: "$TMP_DIR/ohmyzsh", falling back to
# "/tmp/ohmyzsh" when the TMP_DIR environment variable is not set
TMP_DIR = os.path.join(os.environ.get("TMP_DIR", "/tmp"), "ohmyzsh")
# Path to the dependencies.yml file, relative to the current working directory
DEPS_YAML_FILE = ".github/dependencies.yml"
# Dry run flag: enabled only when the DRY_RUN environment variable is exactly "1"
DRY_RUN = os.environ.get("DRY_RUN", "0") == "1"
# Loose version matcher used for tag comparison: an optional `v`/`V` prefix
# followed by MAJOR, optionally `.MINOR`, optionally `.PATCH`, each made of
# one or more digits. Components that are absent are captured as None.
# Compiled with re.VERBOSE, so whitespace inside the pattern is ignored.
BASEVERSION = re.compile(
    r"""[vV]?
(?P<major>(0|[1-9])\d*)
(\.
(?P<minor>(0|[1-9])\d*)
(\.
(?P<patch>(0|[1-9])\d*)
)?
)?
""",
    re.VERBOSE,
)
def coerce(version: str) -> Optional[Version]:
    """
    Build a `Version` from the first version-looking substring of `version`.

    The string is searched with BASEVERSION; components it does not find
    (for example the patch number in `2.1`) default to 0.
    Returns None when the string contains no version at all.
    """
    found = BASEVERSION.search(version)
    if found is None:
        return None

    # Named groups are `major`, `minor` and `patch`, which are exactly the
    # keyword arguments Version accepts
    parts = {}
    for component, text in found.groupdict().items():
        parts[component] = text if text is not None else 0

    return Version(**parts)  # pyright: ignore[reportArgumentType]
class CodeTimer:
    """
    Context manager that prints how long its `with` body took.

    The elapsed time is measured with `timeit.default_timer`, stored in
    milliseconds on `self.took` and printed to stdout when the block exits.
    Exceptions raised inside the block are not suppressed.
    """

    def __init__(self, name=None):
        # Pre-format the optional label as " '<name>'" so the report line can
        # splice it in unconditionally
        self.name = " '" + name + "'" if name else ""

    def __enter__(self):
        self.start = timeit.default_timer()
        # Return the timer so `with CodeTimer(...) as timer:` gives access
        # to `timer.took` after the block
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.took = (timeit.default_timer() - self.start) * 1000.0
        print(f"Code block{self.name} took: {self.took} ms")
### YAML representation
def str_presenter(dumper, data):
    """
    YAML representer for `str` that keeps multiline text readable.

    Strings spanning more than one line are emitted in literal block
    style (`|`); every other string uses the dumper's default style.
    Ref: https://stackoverflow.com/a/33300001
    """
    str_tag = "tag:yaml.org,2002:str"
    spans_several_lines = len(data.splitlines()) > 1
    if not spans_several_lines:
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style="|")
# Register the string presenter twice: once through yaml.add_representer and
# once directly on SafeRepresenter, so that yaml.safe_dump (which is what
# DependencyStore.write_store calls) also picks it up
yaml.add_representer(str, str_presenter)
yaml.representer.SafeRepresenter.add_representer(str, str_presenter)
# Types
class DependencyDict(TypedDict):
    """Settings of a single entry under `dependencies` in dependencies.yml."""

    # Repository slug, interpolated into https://github.com/{repo}.git and
    # https://api.github.com/repos/{repo}/... URLs
    repo: str
    # Upstream branch that gets cloned and compared against `version`
    branch: str
    # Currently vendored version: "tag:<tag>" for tag-tracked dependencies,
    # otherwise a ref used as the base of the GitHub compare request
    version: str
    # Optional script run with `bash -c` inside the fresh upstream clone,
    # before files are copied
    precopy: NotRequired[str]
    # Optional script run with `bash -c` inside the dependency's own path,
    # after files are copied
    postcopy: NotRequired[str]
class DependencyYAML(TypedDict):
    """Top-level structure of the parsed dependencies.yml file."""

    # Maps each dependency's path (for example "plugins/<name>" or
    # "themes/<name>") to its settings
    dependencies: dict[str, DependencyDict]
class UpdateStatusFalse(TypedDict):
    """Result of an update check that found nothing newer upstream."""

    # Always False; lets callers tell the two status shapes apart
    has_updates: Literal[False]
class UpdateStatusTrue(TypedDict):
    """Result of an update check that found a newer upstream version."""

    # Always True; lets callers tell the two status shapes apart
    has_updates: Literal[True]
    # New version: the latest tag name for tag checks, the head commit SHA
    # for branch checks
    version: str
    # URL showing the changes between the stored version and the new one
    compare_url: str
    # Commit SHA the new version points to
    head_ref: str
    # URL of the new release tag (tag checks) or of the head commit
    # (branch checks)
    head_url: str
class CommandRunner:
    """Runs external commands and turns non-zero exit codes into exceptions."""

    class Exception(Exception):
        """
        Raised by `run_or_fail` when a command exits with a non-zero code.

        Carries the exit code, the caller-supplied stage label and the
        decoded stdout/stderr of the failed command.
        """

        def __init__(self, message, returncode, stage, stdout, stderr):
            super().__init__(message)
            self.returncode = returncode
            self.stage = stage
            self.stdout = stdout
            self.stderr = stderr

    @staticmethod
    def run_or_fail(command: list[str], stage: str, *args, **kwargs):
        """
        Run `command` with captured output and return the completed process.

        `stage` is a label used in error reporting. Extra positional and
        keyword arguments are forwarded to `subprocess.run`.
        When DRY_RUN is set, `gh` commands are echoed instead of executed.

        Raises `CommandRunner.Exception` if the command exits non-zero.
        """
        if DRY_RUN and command and command[0] == "gh":
            # Build a new list instead of inserting in place, so the list
            # owned by the caller is left untouched
            command = ["echo", *command]

        result = subprocess.run(command, *args, capture_output=True, **kwargs)

        if result.returncode != 0:
            # errors="replace" keeps undecodable output from raising a
            # UnicodeDecodeError that would hide the actual command failure
            raise CommandRunner.Exception(
                f"{stage} command failed with exit code {result.returncode}",
                returncode=result.returncode,
                stage=stage,
                stdout=result.stdout.decode("utf-8", errors="replace"),
                stderr=result.stderr.decode("utf-8", errors="replace"),
            )

        return result
class DependencyStore:
    """Holds the parsed dependencies.yml contents for the whole run."""

    # Cached YAML contents; replaced wholesale by `set`
    store: DependencyYAML = {"dependencies": {}}

    @staticmethod
    def set(data: DependencyYAML):
        """Replace the cached contents with `data`."""
        DependencyStore.store = data

    @staticmethod
    def update_dependency_version(path: str, version: str) -> DependencyYAML:
        """
        Return a deep copy of the store in which `path` has `version`.

        The cached store itself is not modified.
        Raises ValueError when `path` is not a known dependency.
        """
        with CodeTimer(f"store deepcopy: {path}"):
            snapshot = deepcopy(DependencyStore.store)

        entries = snapshot["dependencies"]
        if entries.get(path) is None:
            raise ValueError(f"Dependency {path} {version} not found")

        entries[path]["version"] = version
        return snapshot

    @staticmethod
    def write_store(file: str, data: DependencyYAML):
        """Dump `data` as YAML into `file`, keeping the key order of `data`."""
        with open(file, "w") as handle:
            yaml.safe_dump(data, handle, sort_keys=False)
class Dependency:
def __init__(self, path: str, values: DependencyDict):
self.path = path
self.values = values
self.name: str = ""
self.desc: str = ""
self.kind: str = ""
match path.split("/"):
case ["plugins", name]:
self.name = name
self.kind = "plugin"
self.desc = f"{name} plugin"
case ["themes", name]:
self.name = name.replace(".zsh-theme", "")
self.kind = "theme"
self.desc = f"{self.name} theme"
case _:
self.name = self.desc = path
def __str__(self):
output: str = ""
for key in DependencyDict.__dict__["__annotations__"].keys():
if key not in self.values:
output += f"{key}: None\n"
continue
value = self.values[key]
if "\n" not in value:
output += f"{key}: {value}\n"
else:
output += f"{key}:\n "
output += value.replace("\n", "\n ", value.count("\n") - 1)
return output
def update_or_notify(self):
# Print dependency settings
print(f"Processing {self.desc}...", file=sys.stderr)
print(self, file=sys.stderr)
# Check for updates
repo = self.values["repo"]
remote_branch = self.values["branch"]
version = self.values["version"]
is_tag = version.startswith("tag:")
try:
with CodeTimer(f"update check: {repo}"):
if is_tag:
status = GitHub.check_newer_tag(repo, version.replace("tag:", ""))
else:
status = GitHub.check_updates(repo, remote_branch, version)
if status["has_updates"] is True:
short_sha = status["head_ref"][:8]
new_version = status["version"] if is_tag else short_sha
try:
branch_name = f"update/{self.path}/{new_version}"
# Create new branch
branch = Git.checkout_or_create_branch(branch_name)
# Update dependencies.yml file
self.__update_yaml(
f"tag:{new_version}" if is_tag else status["version"]
)
# Update dependency files
self.__apply_upstream_changes()
# Add all changes and commit
has_new_commit = Git.add_and_commit(self.name, short_sha)
if has_new_commit:
# Push changes to remote
Git.push(branch)
# Create GitHub PR
GitHub.create_pr(
branch,
f"feat({self.name}): update to version {new_version}",
f"""## Description
Update for **{self.desc}**: update to version [{new_version}]({status['head_url']}).
Check out the [list of changes]({status['compare_url']}).
""",
)
# Clean up repository
Git.clean_repo()
except (CommandRunner.Exception, shutil.Error) as e:
# Handle exception on automatic update
match type(e):
case CommandRunner.Exception:
# Print error message
print(
f"Error running {e.stage} command: {e.returncode}", # pyright: ignore[reportAttributeAccessIssue]
file=sys.stderr,
)
print(e.stderr, file=sys.stderr) # pyright: ignore[reportAttributeAccessIssue]
case shutil.Error:
print(f"Error copying files: {e}", file=sys.stderr)
try:
Git.clean_repo()
except CommandRunner.Exception as e:
print(
f"Error reverting repository to clean state: {e}",
file=sys.stderr,
)
sys.exit(1)
# Create a GitHub issue to notify maintainer
title = f"{self.path}: update to {new_version}"
body = f"""## Description
There is a new version of `{self.name}` {self.kind} available.
New version: [{new_version}]({status['head_url']})
Check out the [list of changes]({status['compare_url']}).
"""
print("Creating GitHub issue", file=sys.stderr)
print(f"{title}\n\n{body}", file=sys.stderr)
GitHub.create_issue(title, body)
except Exception as e:
print(e, file=sys.stderr)
def __update_yaml(self, new_version: str) -> None:
dep_yaml = DependencyStore.update_dependency_version(self.path, new_version)
DependencyStore.write_store(DEPS_YAML_FILE, dep_yaml)
def __apply_upstream_changes(self) -> None:
# Patterns to ignore in copying files from upstream repo
GLOBAL_IGNORE = [".git", ".github", ".gitignore"]
path = os.path.abspath(self.path)
precopy = self.values.get("precopy")
postcopy = self.values.get("postcopy")
repo = self.values["repo"]
branch = self.values["branch"]
remote_url = f"https://github.com/{repo}.git"
repo_dir = os.path.join(TMP_DIR, repo)
# Clone repository
Git.clone(remote_url, branch, repo_dir, reclone=True)
# Run precopy on tmp repo
if precopy is not None:
print("Running precopy script:", end="\n ", file=sys.stderr)
print(
precopy.replace("\n", "\n ", precopy.count("\n") - 1), file=sys.stderr
)
CommandRunner.run_or_fail(
["bash", "-c", precopy], cwd=repo_dir, stage="Precopy"
)
# Copy files from upstream repo
print(f"Copying files from {repo_dir} to {path}", file=sys.stderr)
shutil.copytree(
repo_dir,
path,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns(*GLOBAL_IGNORE),
)
# Run postcopy on our repository
if postcopy is not None:
print("Running postcopy script:", end="\n ", file=sys.stderr)
print(
postcopy.replace("\n", "\n ", postcopy.count("\n") - 1),
file=sys.stderr,
)
CommandRunner.run_or_fail(
["bash", "-c", postcopy], cwd=path, stage="Postcopy"
)
class Git:
    """Helpers that run git commands through `CommandRunner`."""

    # Branch `clean_repo` returns to; overwritten with the current branch
    # every time `checkout_or_create_branch` runs
    default_branch = "master"

    @staticmethod
    def clone(remote_url: str, branch: str, repo_dir: str, reclone=False):
        """
        Shallow-clone `branch` of `remote_url` into `repo_dir`.

        Nothing is cloned if `repo_dir` already exists, unless `reclone` is
        true, in which case the directory is deleted first.
        """
        # If repo needs to be fresh
        if reclone and os.path.exists(repo_dir):
            shutil.rmtree(repo_dir)

        # Clone repo in tmp directory and checkout branch
        if not os.path.exists(repo_dir):
            print(
                f"Cloning {remote_url} to {repo_dir} and checking out {branch}",
                file=sys.stderr,
            )
            CommandRunner.run_or_fail(
                ["git", "clone", "--depth=1", "-b", branch, remote_url, repo_dir],
                stage="Clone",
            )

    @staticmethod
    def checkout_or_create_branch(branch_name: str):
        """
        Switch to `branch_name`, creating it if it does not exist yet.

        The branch that was checked out beforehand is remembered in
        `Git.default_branch`. Returns `branch_name`.
        """
        # Get current branch name
        result = CommandRunner.run_or_fail(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"], stage="GetDefaultBranch"
        )
        Git.default_branch = result.stdout.decode("utf-8").strip()

        # Create new branch and return created branch name
        try:
            # try to checkout already existing branch
            CommandRunner.run_or_fail(
                ["git", "checkout", branch_name], stage="CreateBranch"
            )
        except CommandRunner.Exception:
            # otherwise create new branch
            CommandRunner.run_or_fail(
                ["git", "checkout", "-b", branch_name], stage="CreateBranch"
            )
        return branch_name

    @staticmethod
    def add_and_commit(scope: str, version: str) -> bool:
        """
        Returns `True` if there were changes and were indeed commited.
        Returns `False` if the repo was clean and no changes were commited.

        The commit author is taken from the GIT_APP_NAME and GIT_APP_EMAIL
        environment variables. Raises `CommandRunner.Exception` if any git
        command fails.
        """
        # check if repo is clean (clean => no error, no commit)
        try:
            CommandRunner.run_or_fail(
                ["git", "diff", "--exit-code"], stage="CheckRepoClean"
            )
            return False
        except CommandRunner.Exception as error:
            # `git diff --exit-code` exits with 1 when there are differences;
            # any other non-zero code is a real failure and must not be
            # mistaken for "there are changes to commit"
            if error.returncode != 1:
                raise
        # NOTE(review): plain `git diff` does not report untracked files, so
        # a change made only of new files would count as clean - confirm that
        # dependencies.yml is always modified before this is called

        user_name = os.environ.get("GIT_APP_NAME")
        user_email = os.environ.get("GIT_APP_EMAIL")

        # Add all files to git staging
        CommandRunner.run_or_fail(["git", "add", "-A", "-v"], stage="AddFiles")

        # Reset environment and git config
        clean_env = os.environ.copy()
        clean_env["LANG"] = "C.UTF-8"
        clean_env["GIT_CONFIG_GLOBAL"] = "/dev/null"
        clean_env["GIT_CONFIG_NOSYSTEM"] = "1"

        # Commit with settings above
        CommandRunner.run_or_fail(
            [
                "git",
                "-c",
                f"user.name={user_name}",
                "-c",
                f"user.email={user_email}",
                "commit",
                "-m",
                f"feat({scope}): update to {version}",
            ],
            stage="CreateCommit",
            env=clean_env,
        )
        return True

    @staticmethod
    def push(branch: str):
        """Push `branch` to `origin` and set it as upstream."""
        CommandRunner.run_or_fail(
            ["git", "push", "-u", "origin", branch], stage="PushBranch"
        )

    @staticmethod
    def clean_repo():
        """Discard uncommitted changes and check out `Git.default_branch`."""
        CommandRunner.run_or_fail(
            ["git", "reset", "--hard", "HEAD"], stage="ResetRepository"
        )
        CommandRunner.run_or_fail(
            ["git", "checkout", Git.default_branch], stage="CheckoutDefaultBranch"
        )
class GitHub:
    """Update checks through the GitHub REST API and actions through `gh`."""

    # Seconds to wait on api.github.com, so that a stalled connection cannot
    # hang the whole run
    REQUEST_TIMEOUT = 30

    @staticmethod
    def check_newer_tag(repo, current_tag) -> UpdateStatusFalse | UpdateStatusTrue:
        """
        Look for a tag in `repo` with a higher version than `current_tag`.

        Tags are compared after being coerced with `coerce`; tags that
        cannot be coerced are ignored.

        Raises ValueError if `current_tag` cannot be coerced or if the
        repository has tags but none of them can, and Exception if the API
        request does not return status 200.
        """
        # Validate the stored tag before spending an API request on it
        current_version = coerce(current_tag)
        if current_version is None:
            raise ValueError(
                f"Stored {current_tag} from {repo} does not follow semver"
            )

        # GET /repos/:owner/:repo/git/refs/tags
        url = f"https://api.github.com/repos/{repo}/git/refs/tags"

        # Send a GET request to the GitHub API
        response = requests.get(url, timeout=GitHub.REQUEST_TIMEOUT)

        if response.status_code != 200:
            # If the request was not successful, raise an exception
            raise Exception(
                f"GitHub API request failed with status code {response.status_code}: {response.json()}"
            )

        # Parse the JSON response
        data = response.json()
        if not data:
            return {
                "has_updates": False,
            }

        latest_ref = None
        latest_version: Optional[Version] = None

        for ref in data:
            # we find the tag since GitHub returns it as plain git ref
            tag_version = coerce(ref["ref"].replace("refs/tags/", ""))
            if tag_version is None:
                # we skip every tag that is not semver-compliant
                continue
            if latest_version is None or tag_version.compare(latest_version) > 0:
                # if we have a "greater" semver version, set it as latest
                latest_version = tag_version
                latest_ref = ref

        # raise if no valid semver tag is found
        if latest_ref is None or latest_version is None:
            raise ValueError(f"No tags following semver found in {repo}")

        # we get the tag since GitHub returns it as plain git ref
        latest_tag = latest_ref["ref"].replace("refs/tags/", "")

        if latest_version.compare(current_version) <= 0:
            return {
                "has_updates": False,
            }

        return {
            "has_updates": True,
            "version": latest_tag,
            "compare_url": f"https://github.com/{repo}/compare/{current_tag}...{latest_tag}",
            "head_ref": latest_ref["object"]["sha"],
            "head_url": f"https://github.com/{repo}/releases/tag/{latest_tag}",
        }

    @staticmethod
    def check_updates(repo, branch, version) -> UpdateStatusFalse | UpdateStatusTrue:
        """
        Check whether `branch` of `repo` has commits that `version` lacks.

        Uses the compare endpoint with `version` as base and `branch` as
        head; the last commit of the comparison is reported as the new
        version.

        Raises Exception if the API request does not return status 200.
        """
        url = f"https://api.github.com/repos/{repo}/compare/{version}...{branch}"

        # Send a GET request to the GitHub API
        response = requests.get(url, timeout=GitHub.REQUEST_TIMEOUT)

        if response.status_code != 200:
            # If the request was not successful, raise an exception
            raise Exception(
                f"GitHub API request failed with status code {response.status_code}: {response.json()}"
            )

        # Parse the JSON response
        data = response.json()
        commits = data["commits"]

        # A comparison can differ from "identical" and still list no commits
        # (the branch has nothing the stored version lacks); there is nothing
        # to update to in that case, and no last commit to read
        if data["status"] == "identical" or not commits:
            return {
                "has_updates": False,
            }

        head_commit = commits[-1]
        return {
            "has_updates": True,
            "version": head_commit["sha"],
            "compare_url": data["permalink_url"],
            "head_ref": head_commit["sha"],
            "head_url": head_commit["html_url"],
        }

    @staticmethod
    def create_issue(title: str, body: str) -> None:
        """Create a GitHub issue with `gh issue create`."""
        cmd = ["gh", "issue", "create", "-t", title, "-b", body]
        CommandRunner.run_or_fail(cmd, stage="CreateIssue")

    @staticmethod
    def create_pr(branch: str, title: str, body: str) -> None:
        """
        Open a pull request from `branch` into `Git.default_branch`.

        Does nothing if an open pull request for `branch` already exists.
        """
        # first of all let's check if PR is already open
        check_cmd = [
            "gh",
            "pr",
            "list",
            "--state",
            "open",
            "--head",
            branch,
            "--json",
            "title",
        ]

        # returncode is 0 also if no PRs are found
        listing = CommandRunner.run_or_fail(check_cmd, stage="CheckPullRequestOpen")
        try:
            open_prs = json.loads(listing.stdout.decode("utf-8").strip())
        except json.JSONDecodeError:
            # In dry-run mode `gh` commands are only echoed, so stdout holds
            # the command line instead of JSON; carry on as if no PR existed
            open_prs = []

        # we have PR in this case!
        if len(open_prs) > 0:
            return

        cmd = [
            "gh",
            "pr",
            "create",
            "-B",
            Git.default_branch,
            "-H",
            branch,
            "-t",
            title,
            "-b",
            body,
        ]

        CommandRunner.run_or_fail(cmd, stage="CreatePullRequest")
def main():
    """
    Process every dependency listed in dependencies.yml.

    Loads the file, caches its contents in DependencyStore and runs
    `update_or_notify` on each entry in file order.

    Raises Exception if the file has no `dependencies` mapping.
    """
    # Load the YAML file
    with open(DEPS_YAML_FILE, "r") as yaml_file:
        data: DependencyYAML = yaml.safe_load(yaml_file)

    # safe_load returns None for an empty file, and `dependencies:` without
    # entries parses as None too; report both as a formatting problem
    # instead of failing later with an unrelated TypeError
    if not isinstance(data, dict) or not isinstance(data.get("dependencies"), dict):
        raise Exception("dependencies.yml not properly formatted")

    # Cache YAML version
    DependencyStore.set(data)

    for path, values in data["dependencies"].items():
        dependency = Dependency(path, values)
        dependency.update_or_notify()


if __name__ == "__main__":
    main()