import atexit import json import subprocess import tarfile import tempfile import traceback import venv import zipfile from argparse import ArgumentParser, Namespace from collections.abc import Generator from concurrent.futures import ThreadPoolExecutor from functools import lru_cache, partial from pathlib import Path from typing import NamedTuple, Optional, Union, cast from urllib.request import urlopen, urlretrieve PYPI_INSTANCE = "https://pypi.org/pypi" PYPI_TOP_PACKAGES = ( "https://hugovk.github.io/top-pypi-packages/top-pypi-packages.min.json" ) INTERNAL_BLACK_REPO = f"{tempfile.gettempdir()}/__black" ArchiveKind = Union[tarfile.TarFile, zipfile.ZipFile] subprocess.run = partial(subprocess.run, check=True) # type: ignore # https://github.com/python/mypy/issues/1484 class BlackVersion(NamedTuple): version: str config: Optional[str] = None def get_pypi_download_url(package: str, version: Optional[str]) -> str: with urlopen(PYPI_INSTANCE + f"/{package}/json") as page: metadata = json.load(page) if version is None: sources = metadata["urls"] else: if version in metadata["releases"]: sources = metadata["releases"][version] else: raise ValueError( f"No releases found with version ('{version}') tag. " f"Found releases: {metadata['releases'].keys()}" ) for source in sources: if source["python_version"] == "source": break else: raise ValueError(f"Couldn't find any sources for {package}") return cast(str, source["url"]) def get_top_packages() -> list[str]: with urlopen(PYPI_TOP_PACKAGES) as page: result = json.load(page) return [package["project"] for package in result["rows"]] def get_package_source(package: str, version: Optional[str]) -> str: if package == "cpython": if version is None: version = "main" return f"https://github.com/python/cpython/archive/{version}.zip" elif package == "pypy": if version is None: version = "branch/default" return ( f"https://foss.heptapod.net/pypy/pypy/repository/{version}/archive.tar.bz2" ) else: return get_pypi_download_url(package, version) def get_archive_manager(local_file: str) -> ArchiveKind: if tarfile.is_tarfile(local_file): return tarfile.open(local_file) elif zipfile.is_zipfile(local_file): return zipfile.ZipFile(local_file) else: raise ValueError("Unknown archive kind.") def get_first_archive_member(archive: ArchiveKind) -> str: if isinstance(archive, tarfile.TarFile): return archive.getnames()[0] elif isinstance(archive, zipfile.ZipFile): return archive.namelist()[0] def download_and_extract(package: str, version: Optional[str], directory: Path) -> Path: source = get_package_source(package, version) local_file, _ = urlretrieve(source, directory / f"{package}-src") with get_archive_manager(local_file) as archive: archive.extractall(path=directory) result_dir = get_first_archive_member(archive) return directory / result_dir def get_package( package: str, version: Optional[str], directory: Path ) -> Optional[Path]: try: return download_and_extract(package, version, directory) except Exception: print(f"Caught an exception while downloading {package}.") traceback.print_exc() return None DEFAULT_SLICE = slice(None) # for flake8 def download_and_extract_top_packages( directory: Path, workers: int = 8, limit: slice = DEFAULT_SLICE, ) -> Generator[Path, None, None]: with ThreadPoolExecutor(max_workers=workers) as executor: bound_downloader = partial(get_package, version=None, directory=directory) for package in executor.map(bound_downloader, get_top_packages()[limit]): if package is not None: yield package def git_create_repository(repo: Path) -> None: subprocess.run(["git", "init"], cwd=repo) git_add_and_commit(msg="Initial commit", repo=repo) def git_add_and_commit(msg: str, repo: Path) -> None: subprocess.run(["git", "add", "."], cwd=repo) subprocess.run(["git", "commit", "-m", msg, "--allow-empty"], cwd=repo) def git_switch_branch( branch: str, repo: Path, new: bool = False, from_branch: Optional[str] = None ) -> None: args = ["git", "checkout"] if new: args.append("-b") args.append(branch) if from_branch: args.append(from_branch) subprocess.run(args, cwd=repo) def init_repos(options: Namespace) -> tuple[Path, ...]: options.output.mkdir(exist_ok=True) if options.top_packages: source_directories = tuple( download_and_extract_top_packages( directory=options.output, workers=options.workers, limit=slice(None, options.top_packages), ) ) else: source_directories = ( download_and_extract( package=options.pypi_package, version=options.version, directory=options.output, ), ) for source_directory in source_directories: git_create_repository(source_directory) if options.black_repo is None: subprocess.run( ["git", "clone", "https://github.com/psf/black.git", INTERNAL_BLACK_REPO], cwd=options.output, ) options.black_repo = options.output / INTERNAL_BLACK_REPO return source_directories @lru_cache(8) def black_runner(version: str, black_repo: Path) -> Path: directory = tempfile.TemporaryDirectory() venv.create(directory.name, with_pip=True) python = Path(directory.name) / "bin" / "python" subprocess.run([python, "-m", "pip", "install", "-e", black_repo]) atexit.register(directory.cleanup) return python def format_repo_with_version( repo: Path, from_branch: Optional[str], black_repo: Path, black_version: BlackVersion, input_directory: Path, ) -> str: current_branch = f"black-{black_version.version}" git_switch_branch(black_version.version, repo=black_repo) git_switch_branch(current_branch, repo=repo, new=True, from_branch=from_branch) format_cmd: list[Union[Path, str]] = [ black_runner(black_version.version, black_repo), (black_repo / "black.py").resolve(), ".", ] if black_version.config: format_cmd.extend(["--config", input_directory / black_version.config]) subprocess.run(format_cmd, cwd=repo, check=False) # ensure the process # continuess to run even it can't format some files. Reporting those # should be enough git_add_and_commit(f"Format with black:{black_version.version}", repo=repo) return current_branch def format_repos(repos: tuple[Path, ...], options: Namespace) -> None: black_versions = tuple( BlackVersion(*version.split(":")) for version in options.versions ) for repo in repos: from_branch = None for black_version in black_versions: from_branch = format_repo_with_version( repo=repo, from_branch=from_branch, black_repo=options.black_repo, black_version=black_version, input_directory=options.input, ) git_switch_branch("main", repo=repo) git_switch_branch("main", repo=options.black_repo) def main() -> None: parser = ArgumentParser(description="""Black Gallery is a script that automates the process of applying different Black versions to a selected PyPI package and seeing the results between versions.""") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-p", "--pypi-package", help="PyPI package to download.") group.add_argument( "-t", "--top-packages", help="Top n PyPI packages to download.", type=int ) parser.add_argument("-b", "--black-repo", help="Black's Git repository.", type=Path) parser.add_argument( "-v", "--version", help=( "Version for given PyPI package. Will be discarded if used with -t option." ), ) parser.add_argument( "-w", "--workers", help=( "Maximum number of threads to download with at the same time. " "Will be discarded if used with -p option." ), ) parser.add_argument( "-i", "--input", default=Path("/input"), type=Path, help="Input directory to read configuration.", ) parser.add_argument( "-o", "--output", default=Path("/output"), type=Path, help="Output directory to download and put result artifacts.", ) parser.add_argument("versions", nargs="*", default=("main",), help="") options = parser.parse_args() repos = init_repos(options) format_repos(repos, options) if __name__ == "__main__": main()