black/src/black_primer/lib.py
Nipunn Koorapati 467efe1556
Add --projects cli flag to black-primer (#2555)
* Add --projects cli flag to black-primer

Makes it possible to run a subset of projects on black primer

* Refactor into click callback
2021-10-27 11:31:34 -07:00

420 lines
14 KiB
Python

import asyncio
import errno
import json
import logging
import os
import stat
import sys
from functools import partial
from pathlib import Path
from platform import system
from shutil import rmtree, which
from subprocess import CalledProcessError
from sys import version_info
from tempfile import TemporaryDirectory
from typing import (
Any,
Callable,
Dict,
List,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
)
from urllib.parse import urlparse
import click
# Default ceiling, in seconds, for any subprocess started by this module.
TEN_MINUTES_SECONDS = 10 * 60

# Executable names carry an ".exe" suffix when running on Windows.
WINDOWS = system() == "Windows"
BLACK_BINARY, GIT_BINARY = ("black.exe", "git.exe") if WINDOWS else ("black", "git")

LOG = logging.getLogger(__name__)

# Executing subprocesses on Windows needs a ProactorEventLoop. From Python 3.8
# onwards that is the default loop, so this can be dropped once Black >= 3.8.
# The condition tests sys.platform directly because mypy only respects the
# platform check when it appears in the evaluation itself:
# https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks # noqa: B950
if sys.platform == "win32":
    asyncio.set_event_loop(asyncio.ProactorEventLoop())
class _ResultsFields(NamedTuple):
    """Field layout of `Results`; split out so `Results` can define `__new__`."""

    stats: Dict[str, int]
    failed_projects: Dict[str, CalledProcessError]


class Results(_ResultsFields):
    """Mutable accumulators for one primer run.

    `stats` maps a counter name to its count and `failed_projects` maps a
    project name to the `CalledProcessError` describing its failure.

    Both fields are optional. When omitted, each instance receives its own
    fresh, empty dict. Declaring `= {}` directly on the NamedTuple fields would
    create the dicts once at class-definition time and share them between every
    `Results()` instance, leaking counters and failures from one run to the next.
    """

    __slots__ = ()

    def __new__(
        cls,
        stats: Optional[Dict[str, int]] = None,
        failed_projects: Optional[Dict[str, CalledProcessError]] = None,
    ) -> "Results":
        return super().__new__(
            cls,
            {} if stats is None else stats,
            {} if failed_projects is None else failed_projects,
        )
async def _gen_check_output(
    cmd: Sequence[str],
    timeout: float = TEN_MINUTES_SECONDS,
    env: Optional[Dict[str, str]] = None,
    cwd: Optional[Path] = None,
    stdin: Optional[bytes] = None,
) -> Tuple[bytes, bytes]:
    """Run `cmd` as a subprocess and return the `(stdout, stderr)` pair.

    The child's stderr is redirected into its stdout. `stdin`, when given, is
    fed to the child. `env` and `cwd` are passed straight to the subprocess.

    Raises `asyncio.TimeoutError` (after killing the child) when `timeout`
    seconds elapse, and `CalledProcessError` when the exit status is non-zero.
    """
    pipe = asyncio.subprocess.PIPE
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=pipe,
        stdout=pipe,
        stderr=asyncio.subprocess.STDOUT,
        env=env,
        cwd=cwd,
    )
    try:
        out, err = await asyncio.wait_for(proc.communicate(stdin), timeout)
    except asyncio.TimeoutError:
        # Make sure the child is gone before reporting the timeout upwards.
        proc.kill()
        await proc.wait()
        raise

    # wait_for was given a real timeout, so we either timed out above or the
    # process has finished; a finished process always has a returncode set.
    exit_code = proc.returncode
    assert exit_code is not None
    if exit_code == 0:
        return (out, err)

    raise CalledProcessError(exit_code, " ".join(cmd), output=out, stderr=err)
def analyze_results(project_count: int, results: Results) -> int:
    """Print a human-readable summary of `results` and return the failure count.

    Percentages are computed against `project_count`, so it must be non-zero.
    Every entry in `results.failed_projects` is printed with its return code
    and any captured stderr/stdout (decoded as UTF-8).
    """
    stats = results.stats
    failed = stats["failed"]
    succeeded = stats["success"]
    failed_pct = round(((failed / project_count) * 100), 2)
    success_pct = round(((succeeded / project_count) * 100), 2)

    click.secho("-- primer results 📊 --\n", bold=True)
    click.secho(
        f"{succeeded} / {project_count} succeeded ({success_pct}%) ✅",
        bold=True,
        fg="green",
    )
    # Only embolden the failure line when there is something to shout about.
    click.secho(
        f"{failed} / {project_count} FAILED ({failed_pct}%) 💩",
        bold=bool(failed),
        fg="red",
    )

    disabled = stats["disabled"]
    plural = "" if disabled == 1 else "s"
    click.echo(f" - {disabled} project{plural} disabled by config")

    wrong_py_ver = stats["wrong_py_ver"]
    plural = "" if wrong_py_ver == 1 else "s"
    click.echo(f" - {wrong_py_ver} project{plural} skipped due to Python version")

    click.echo(f" - {stats['skipped_long_checkout']} skipped due to long checkout")

    if results.failed_projects:
        click.secho("\nFailed projects:\n", bold=True)
        for name, error in results.failed_projects.items():
            print(f"## {name}:")
            print(f" - Returned {error.returncode}")
            if error.stderr:
                print(f" - stderr:\n{error.stderr.decode('utf8')}")
            if error.stdout:
                print(f" - stdout:\n{error.stdout.decode('utf8')}")
            print("")

    return failed
def _flatten_cli_args(cli_args: List[Union[Sequence[str], str]]) -> List[str]:
"""Allow a user to put long arguments into a list of strs
to make the JSON human readable"""
flat_args = []
for arg in cli_args:
if isinstance(arg, str):
flat_args.append(arg)
continue
args_as_str = "".join(arg)
flat_args.append(args_as_str)
return flat_args
async def black_run(
    project_name: str,
    repo_path: Optional[Path],
    project_config: Dict[str, Any],
    results: Results,
    no_diff: bool = False,
) -> None:
    """Run Black on one project and record exactly one outcome in `results`.

    The command is the Black binary, the project's optional `cli_arguments`,
    `--check`, `--diff` (unless `no_diff`), and a target. The target is `-` with
    the contents of `repo_path` piped on stdin for the "STDIN" project,
    otherwise the configured `base_path`, otherwise `.`.

    Outcomes:
    - no `repo_path`: failure, keyed by `project_name`, return code 69
    - timeout: counted as failed (no entry in `failed_projects`)
    - exit 1: success if `expect_formatting_changes`, otherwise failure
    - exit > 1: failure
    - any other `CalledProcessError`: logged and re-raised
    - exit 0: failure if `expect_formatting_changes`, otherwise success

    Failures from an actual run are keyed by `repo_path.name`.
    """
    if not repo_path:
        results.stats["failed"] += 1
        results.failed_projects[project_name] = CalledProcessError(
            69, [], f"{project_name} has no repo_path: {repo_path}".encode(), b""
        )
        return

    stdin_test = project_name.upper() == "STDIN"
    cmd = [str(which(BLACK_BINARY))]
    cli_arguments = project_config.get("cli_arguments")
    if cli_arguments:
        cmd.extend(_flatten_cli_args(cli_arguments))
    cmd.append("--check")
    if not no_diff:
        cmd.append("--diff")

    # Work out if we should read in a python file or search from cwd
    stdin = None
    if stdin_test:
        cmd.append("-")
        stdin = repo_path.read_bytes()
    elif "base_path" in project_config:
        cmd.append(project_config["base_path"])
    else:
        cmd.append(".")

    timeout = project_config.get("timeout_seconds", TEN_MINUTES_SECONDS)
    with TemporaryDirectory() as tmp_path:
        # Prevent reading top-level user configs by manipulating environment variables
        env = {
            **os.environ,
            "XDG_CONFIG_HOME": tmp_path,  # Unix-like
            "USERPROFILE": tmp_path,  # Windows (changes `Path.home()` output)
        }

        cwd_path = repo_path.parent if stdin_test else repo_path
        try:
            LOG.debug(f"Running black for {project_name}: {' '.join(cmd)}")
            _stdout, _stderr = await _gen_check_output(
                cmd, cwd=cwd_path, env=env, stdin=stdin, timeout=timeout
            )
        except asyncio.TimeoutError:
            results.stats["failed"] += 1
            LOG.error(f"Running black for {repo_path} timed out ({cmd})")
            # Stop here: falling through to the exit-0 handling below would
            # count this project a second time (as a failure or a success).
            return
        except CalledProcessError as cpe:
            # TODO: Tune for smarter for higher signal
            # If any other return value than 1 we raise - can disable project in config
            if cpe.returncode == 1:
                if not project_config["expect_formatting_changes"]:
                    results.stats["failed"] += 1
                    results.failed_projects[repo_path.name] = cpe
                else:
                    results.stats["success"] += 1
                return
            elif cpe.returncode > 1:
                results.stats["failed"] += 1
                results.failed_projects[repo_path.name] = cpe
                return

            LOG.error(f"Unknown error with {repo_path}")
            raise

    # If we get here and expect formatting changes something is up
    if project_config["expect_formatting_changes"]:
        results.stats["failed"] += 1
        results.failed_projects[repo_path.name] = CalledProcessError(
            0, cmd, b"Expected formatting changes but didn't get any!", b""
        )
        return

    results.stats["success"] += 1
async def git_checkout_or_rebase(
    work_path: Path,
    project_config: Dict[str, Any],
    rebase: bool = False,
    *,
    depth: int = 1,
) -> Optional[Path]:
    """Clone the project's repository into `work_path`, or update an existing clone.

    The checkout directory is `work_path / <repo name>`, where the repo name is
    the last path component of `project_config["git_clone_url"]` with one
    trailing ".git" removed.

    - No existing checkout: `git clone --depth <depth> <url>` run in `work_path`.
    - Existing checkout and `rebase`: `git pull --rebase` run inside it.
    - Existing checkout and not `rebase`: returned as-is, git is not invoked.

    Returns the checkout path, or `None` when git is not on PATH, the URL has
    no usable repository name, or the git command fails or times out.
    """
    # Keep the raw result of which(): wrapping it in str() would turn a missing
    # binary (None) into the truthy string "None" and defeat this check.
    git_bin = which(GIT_BINARY)
    if not git_bin:
        LOG.error("No git binary found")
        return None

    clone_url = project_config["git_clone_url"]
    # Mirror the directory name `git clone` picks: the final path component,
    # minus a ".git" suffix. Only the suffix is stripped so that names which
    # merely contain ".git" (e.g. "user.github.io") stay intact.
    repo_name = urlparse(clone_url).path.rstrip("/").rsplit("/", maxsplit=1)[-1]
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]
    if not repo_name:
        # An empty name would make repo_path equal work_path itself.
        LOG.error(f"Unable to work out a repository name from {clone_url}")
        return None
    repo_path: Path = work_path / repo_name

    cmd = [git_bin, "clone", "--depth", str(depth), clone_url]
    cwd = work_path
    if repo_path.exists() and rebase:
        cmd = [git_bin, "pull", "--rebase"]
        cwd = repo_path
    elif repo_path.exists():
        return repo_path

    try:
        _stdout, _stderr = await _gen_check_output(cmd, cwd=cwd)
    except (asyncio.TimeoutError, CalledProcessError) as e:
        LOG.error(f"Unable to git clone / pull {project_config['git_clone_url']}: {e}")
        return None

    return repo_path
def handle_PermissionError(
    func: Callable[..., None], path: Path, exc: Tuple[Any, Any, Any]
) -> None:
    """
    Error callback for shutil.rmtree that retries permission-denied deletions.

    `func` is the operation that failed, `path` the path it failed on and `exc`
    the exception triple, of which only the exception value is inspected.

    When `func` is 'os.rmdir' or 'os.unlink' and the error is EACCES (Permission
    denied), `path` is made readable, writable and executable by everyone and
    `func(path)` is attempted once more.

    In every other case the error is not ours to handle and is re-raised.
    """
    error = exc[1]
    LOG.debug(f"Handling {error} from {func.__name__}... ")

    retryable = func in (os.rmdir, os.unlink) and error.errno == errno.EACCES
    if not retryable:
        # Bare raise: relies on being called while the original exception is
        # still being handled, so that it is the one re-raised.
        raise

    LOG.debug(f"Setting {path} writable, readable, and executable by everyone... ")
    os.chmod(path, 0o777)  # same bits as S_IRWXU | S_IRWXG | S_IRWXO
    func(path)  # Try the error causing delete operation again
async def load_projects_queue(
    config_path: Path,
    projects_to_run: List[str],
) -> Tuple[Dict[str, Any], asyncio.Queue]:
    """Parse the JSON config and queue up the requested project names.

    Returns the decoded config together with a queue whose capacity equals
    `len(projects_to_run)` and which holds those names in the given order.
    """
    config = json.loads(config_path.read_text())

    # TODO: Offer more options here
    # e.g. Run on X random packages etc.
    pending: asyncio.Queue = asyncio.Queue(maxsize=len(projects_to_run))
    for name in projects_to_run:
        await pending.put(name)

    return config, pending
async def project_runner(
    idx: int,
    config: Dict[str, Any],
    queue: asyncio.Queue,
    work_path: Path,
    results: Results,
    long_checkouts: bool = False,
    rebase: bool = False,
    keep: bool = False,
    no_diff: bool = False,
) -> None:
    """Worker `idx`: drain `queue`, checking out each project and running Black.

    A project is skipped, and the matching counter in `results.stats` bumped,
    when it is disabled, not enabled for the running Python version, or marked
    as a long checkout while `long_checkouts` is off. A project whose checkout
    fails is skipped without touching `results`.

    The project named "STDIN" (case-insensitive) is not checked out; this
    module's own file is handed to `black_run` instead. Checkouts are deleted
    afterwards unless `keep` is set. The worker returns once the queue is empty.
    """
    loop = asyncio.get_event_loop()
    py_version = f"{version_info[0]}.{version_info[1]}"

    while True:
        try:
            project_name = queue.get_nowait()
        except asyncio.QueueEmpty:
            LOG.debug(f"project_runner {idx} exiting")
            return

        LOG.debug(f"worker {idx} working on {project_name}")
        project_config = config["projects"][project_name]

        # Disabled via config?
        if project_config.get("disabled"):
            results.stats["disabled"] += 1
            LOG.info(f"Skipping {project_name} as it's disabled via config")
            continue

        # Enabled for the Python version we are running on?
        enabled_versions = project_config["py_versions"]
        if not ("all" in enabled_versions or py_version in enabled_versions):
            results.stats["wrong_py_ver"] += 1
            LOG.debug(f"Skipping {project_name} as it's not enabled for {py_version}")
            continue

        # Big projects / long checkouts only run on request
        if not long_checkouts and project_config["long_checkout"]:
            results.stats["skipped_long_checkout"] += 1
            LOG.debug(f"Skipping {project_name} as it's configured as a long checkout")
            continue

        stdin_project = project_name.upper() == "STDIN"
        if stdin_project:
            repo_path: Optional[Path] = Path(__file__)
        else:
            repo_path = await git_checkout_or_rebase(work_path, project_config, rebase)
            if not repo_path:
                continue

        await black_run(project_name, repo_path, project_config, results, no_diff)

        if not (keep or stdin_project):
            LOG.debug(f"Removing {repo_path}")
            # rmtree blocks, so push it onto the loop's default executor.
            await loop.run_in_executor(
                None, partial(rmtree, path=repo_path, onerror=handle_PermissionError)
            )

        LOG.info(f"Finished {project_name}")
async def process_queue(
    config_file: str,
    work_path: Path,
    workers: int,
    projects_to_run: List[str],
    keep: bool = False,
    long_checkouts: bool = False,
    rebase: bool = False,
    no_diff: bool = False,
) -> int:
    """
    Run Black over `projects_to_run` using `workers` concurrent runners, then
    print and evaluate the results.

    - Success is gauged via the config "expect_formatting_changes"

    Returns the number of failed projects, or -1 when there is nothing to run.
    """
    results = Results()
    for counter in (
        "disabled",
        "failed",
        "skipped_long_checkout",
        "success",
        "wrong_py_ver",
    ):
        results.stats[counter] = 0

    config, queue = await load_projects_queue(Path(config_file), projects_to_run)
    project_count = queue.qsize()
    plural = "" if project_count == 1 else "s"
    LOG.info(f"{project_count} project{plural} to run Black over")
    if project_count < 1:
        return -1

    plural = "" if workers == 1 else "s"
    LOG.debug(f"Using {workers} parallel worker{plural} to run Black")

    runners = [
        project_runner(
            worker_idx,
            config,
            queue,
            work_path,
            results,
            long_checkouts=long_checkouts,
            rebase=rebase,
            keep=keep,
            no_diff=no_diff,
        )
        for worker_idx in range(workers)
    ]
    # Every project must be done before the results can be analyzed
    await asyncio.gather(*runners)

    LOG.info("Analyzing results")
    return analyze_results(project_count, results)
# This module is import-only: executing it directly fails immediately.
if __name__ == "__main__":  # pragma: nocover
    raise NotImplementedError("lib is a library, funnily enough.")