# /// script # requires-python = ">=3.9" # dependencies = [ # "githubkit", # "loguru", # ] # /// from __future__ import annotations import asyncio import os import re import sys from collections.abc import Iterator from contextlib import contextmanager from itertools import chain from githubkit import GitHub from githubkit.exception import RequestFailed from loguru import logger ORG_NAME = "ghostty-org" REPO_NAME = "ghostty" ALLOWED_PARENT_TEAM = "localization" LOCALIZATION_TEAM_NAME_PATTERN = re.compile(r"[a-z]{2}_[A-Z]{2}") LEVEL_MAP = {"DEBUG": "DBG", "WARNING": "WRN", "ERROR": "ERR"} logger.remove() logger.add( sys.stderr, format=lambda record: ( "{time:YYYY-MM-DD HH:mm:ss.SSS} | " f"{LEVEL_MAP[record['level'].name]} | " "{function}:{line} - " "{message}\n" ), backtrace=True, diagnose=True, ) @contextmanager def log_fail(message: str, *, die: bool = True) -> Iterator[None]: try: yield except RequestFailed as exc: logger.error(message) logger.error(exc) logger.error(exc.response.raw_response.json()) if die: sys.exit(1) gh = GitHub(os.environ["GITHUB_TOKEN"]) with log_fail("Invalid token"): # Do the simplest request as a test gh.rest.rate_limit.get() async def fetch_and_parse_codeowners() -> dict[str, str]: logger.debug("Fetching CODEOWNERS file...") with log_fail("Failed to fetch CODEOWNERS file"): content = ( await gh.rest.repos.async_get_content( ORG_NAME, REPO_NAME, "CODEOWNERS", headers={"Accept": "application/vnd.github.raw+json"}, ) ).text logger.debug("Parsing CODEOWNERS file...") codeowners: dict[str, str] = {} for line in content.splitlines(): if not line or line.lstrip().startswith("#"): continue # This assumes that all entries only list one owner # and that this owner is a team (ghostty-org/foobar) path, owner = line.split() path = path.lstrip("/") owner = owner.removeprefix(f"@{ORG_NAME}/") if not is_localization_team(owner): logger.debug(f"Skipping non-l11n codeowner {owner!r} for {path}") continue codeowners[path] = owner logger.debug(f"Found codeowner {owner!r} for {path}") return codeowners async def get_team_members(team_name: str) -> list[str]: logger.debug(f"Fetching team {team_name!r}...") with log_fail(f"Failed to fetch team {team_name!r}"): team = (await gh.rest.teams.async_get_by_name(ORG_NAME, team_name)).parsed_data if team.parent and team.parent.slug == ALLOWED_PARENT_TEAM: logger.debug(f"Fetching team {team_name!r} members...") with log_fail(f"Failed to fetch team {team_name!r} members"): resp = await gh.rest.teams.async_list_members_in_org(ORG_NAME, team_name) members = [m.login for m in resp.parsed_data] logger.debug(f"Team {team_name!r} members: {', '.join(members)}") return members logger.warning(f"Team {team_name} does not have a {ALLOWED_PARENT_TEAM!r} parent") return [] async def get_changed_files(pr_number: int) -> list[str]: logger.debug("Gathering changed files...") with log_fail("Failed to gather changed files"): diff_entries = ( await gh.rest.pulls.async_list_files( ORG_NAME, REPO_NAME, pr_number, per_page=3000, headers={"Accept": "application/vnd.github+json"}, ) ).parsed_data return [d.filename for d in diff_entries] async def request_review(pr_number: int, user: str, pr_author: str) -> None: if user == pr_author: logger.debug(f"Skipping review request for {user!r} (is PR author)") logger.debug(f"Requesting review from {user!r}...") with log_fail(f"Failed to request review from {user}", die=False): await gh.rest.pulls.async_request_reviewers( ORG_NAME, REPO_NAME, pr_number, headers={"Accept": "application/vnd.github+json"}, data={"reviewers": [user]}, ) def is_localization_team(team_name: str) -> bool: return LOCALIZATION_TEAM_NAME_PATTERN.fullmatch(team_name) is not None async def get_pr_author(pr_number: int) -> str: logger.debug("Fetching PR author...") with log_fail("Failed to fetch PR author"): resp = await gh.rest.pulls.async_get(ORG_NAME, REPO_NAME, pr_number) pr_author = resp.parsed_data.user.login logger.debug(f"Found author: {pr_author!r}") return pr_author async def main() -> None: logger.debug("Reading PR number...") pr_number = int(os.environ["PR_NUMBER"]) logger.debug(f"Starting review request process for PR #{pr_number}...") changed_files = await get_changed_files(pr_number) logger.debug(f"Changed files: {', '.join(map(repr, changed_files))}") pr_author = await get_pr_author(pr_number) codeowners = await fetch_and_parse_codeowners() found_owners = set[str]() for file in changed_files: logger.debug(f"Finding owner for {file!r}...") for path, owner in codeowners.items(): if file.startswith(path): logger.debug(f"Found owner: {owner!r}") break else: logger.debug("No owner found") continue found_owners.add(owner) member_lists = await asyncio.gather( *(get_team_members(owner) for owner in found_owners) ) await asyncio.gather( *( request_review(pr_number, user, pr_author) for user in chain.from_iterable(member_lists) ) ) if __name__ == "__main__": asyncio.run(main())