Add black-primer unittests (#1426)
* Add black-primer unittests
  - Get this tool covered with some decent unit tests for an all-unittests win
  - Have a CLI and a lib test class
  - Import it from `test_black.py` so we always run the tests
  - Revert typing asyncio.Queue as Queue[str] so we can keep working on 3.6
  - **mypy**: until black > 3.6, set disallow_any_generics=False for the primer code

Test:
- Run tests: `coverage run tests/test_primer.py` or `coverage run -m unittest`

```
(b) cooper-mbp1:black cooper$ coverage report
Name                      Stmts   Miss  Cover
---------------------------------------------
src/black_primer/cli.py      49      8    84%
src/black_primer/lib.py     148     28    81%
tests/test_primer.py        114      1    99%
---------------------------------------------
TOTAL                       311     37    88%
```

* Use ProactorEventLoop on Windows + fix the `false` binary path for Linux
  - Set Windows to use ProactorEventLoop up front so all callers benefit
  - Gating on sys.platform alone did not seem to get the loop applied, so add a type: ignore and gate on platform.system() as well
  - Have each test loop correctly set to ProactorEventLoop on Windows for < 3.8 too
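For context on the `asyncio.Queue[str]` revert: `from __future__ import annotations` only exists from Python 3.7, and `asyncio.Queue` was not subscriptable at runtime on the interpreters black still supported, so the typed annotation cannot be kept while 3.6 is in the support matrix; falling back to the bare `asyncio.Queue` annotation is what forces `disallow_any_generics=False` for the primer code. A minimal sketch of the 3.6-compatible pattern (the `build_queue` helper is illustrative only, not code from this commit):

```python
import asyncio

# On Python 3.6 the original workaround breaks at import time:
#   from __future__ import annotations             -> SyntaxError (feature added in 3.7)
# and without that import, the evaluated return annotation
#   -> Tuple[Dict[str, Any], asyncio.Queue[str]]   -> TypeError, asyncio.Queue was not
#                                                     runtime-subscriptable back then
# so the commit annotates with the bare class and lets mypy treat the item
# type as Any (hence disallow_any_generics=False for black_primer).


def build_queue(project_names):  # illustrative helper, not from lib.py
    queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names))
    for name in sorted(project_names):
        queue.put_nowait(name)
    return queue


if __name__ == "__main__":
    q = build_queue(["aioexabgp", "attrs", "black"])
    print(q.qsize())  # -> 3
```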
parent 03b8304abd
commit 8acc22f114

mypy.ini (3 changed lines):
@@ -21,7 +21,8 @@ strict_optional=True
 warn_no_return=True
 warn_redundant_casts=True
 warn_unused_ignores=True
-disallow_any_generics=True
+# Until we're not supporting 3.6 primer needs this
+disallow_any_generics=False

 # The following are off by default. Flip them on if you feel
 # adventurous.
src/black_primer/cli.py:

@@ -63,7 +63,7 @@ async def async_main(
             LOG.debug(f"Removing {work_path}")
             rmtree(work_path)

-    return -1
+    return -2


 @click.command(context_settings={"help_option_names": ["-h", "--help"]})
@@ -131,5 +131,5 @@ def main(ctx: click.core.Context, **kwargs: Any) -> None:
         loop.close()


-if __name__ == "__main__":
+if __name__ == "__main__":  # pragma: nocover
     main()
src/black_primer/lib.py:

@@ -1,12 +1,11 @@
 #!/usr/bin/env python3

-# Module '__future__' has no attribute 'annotations'
-from __future__ import annotations  # type: ignore
-
 import asyncio
 import json
 import logging
+import sys
 from pathlib import Path
+from platform import system
 from shutil import rmtree, which
 from subprocess import CalledProcessError
 from sys import version_info
@@ -16,9 +15,20 @@
 import click


+WINDOWS = system() == "Windows"
+BLACK_BINARY = "black.exe" if WINDOWS else "black"
+GIT_BIANRY = "git.exe" if WINDOWS else "git"
 LOG = logging.getLogger(__name__)


+# Windows needs a ProactorEventLoop if you want to exec subprocesses
+# Startng 3.8 this is the default - Can remove when black >= 3.8
+# mypy only respects sys.platform if directly in the evaluation
+# https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks  # noqa: B950
+if sys.platform == "win32":
+    asyncio.set_event_loop(asyncio.ProactorEventLoop())
+
+
 class Results(NamedTuple):
     stats: Dict[str, int] = {}
     failed_projects: Dict[str, CalledProcessError] = {}
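The "mypy only respects sys.platform" comment above is the reason the gate is not simply written as `if WINDOWS:`. A small sketch of the distinction, based on the mypy platform-check behaviour the linked docs describe (illustrative only, not part of the commit):

```python
import asyncio
import sys
from platform import system

WINDOWS = system() == "Windows"  # runtime truth, but opaque to mypy

if WINDOWS:
    # mypy still analyses this branch when checking on Linux/macOS, so any
    # Windows-only attributes referenced here would be flagged there.
    pass

if sys.platform == "win32":
    # mypy recognises the literal sys.platform comparison and skips this
    # branch when checking for other platforms, which is why lib.py gates
    # the ProactorEventLoop setup this way.
    asyncio.set_event_loop(asyncio.ProactorEventLoop())
```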
@@ -53,7 +63,7 @@ async def _gen_check_output(
     return (stdout, stderr)


-async def analyze_results(project_count: int, results: Results) -> int:
+def analyze_results(project_count: int, results: Results) -> int:
     failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
     success_pct = round(((results.stats["success"] / project_count) * 100), 2)

@@ -95,8 +105,8 @@ async def black_run(
     repo_path: Path, project_config: Dict[str, Any], results: Results
 ) -> None:
     """Run black and record failures"""
-    cmd = [str(which("black"))]
-    if project_config["cli_arguments"]:
+    cmd = [str(which(BLACK_BINARY))]
+    if "cli_arguments" in project_config and project_config["cli_arguments"]:
         cmd.extend(*project_config["cli_arguments"])
     cmd.extend(["--check", "--diff", "."])

@@ -106,12 +116,27 @@ async def black_run(
         results.stats["failed"] += 1
         LOG.error(f"Running black for {repo_path} timed out ({cmd})")
     except CalledProcessError as cpe:
-        # TODO: This might need to be tuned and made smarter for higher signal
-        if not project_config["expect_formatting_changes"] and cpe.returncode == 1:
-            results.stats["failed"] += 1
-            results.failed_projects[repo_path.name] = cpe
-        return
+        # TODO: Tune for smarter for higher signal
+        # If any other reutrn value than 1 we raise - can disable project in config
+        if cpe.returncode == 1:
+            if not project_config["expect_formatting_changes"]:
+                results.stats["failed"] += 1
+                results.failed_projects[repo_path.name] = cpe
+            else:
+                results.stats["success"] += 1
+            return
+
+        LOG.error(f"Unkown error with {repo_path}")
+        raise
+
+    # If we get here and expect formatting changes something is up
+    if project_config["expect_formatting_changes"]:
+        results.stats["failed"] += 1
+        results.failed_projects[repo_path.name] = CalledProcessError(
+            0, cmd, b"Expected formatting changes but didn't get any!", b""
+        )
+        return

     results.stats["success"] += 1


@@ -123,7 +148,7 @@ async def git_checkout_or_rebase(
     depth: int = 1,
 ) -> Optional[Path]:
     """git Clone project or rebase"""
-    git_bin = str(which("git"))
+    git_bin = str(which(GIT_BIANRY))
     if not git_bin:
         LOG.error("No git binary found")
         return None
@@ -151,7 +176,7 @@ async def git_checkout_or_rebase(

 async def load_projects_queue(
     config_path: Path,
-) -> Tuple[Dict[str, Any], asyncio.Queue[str]]:
+) -> Tuple[Dict[str, Any], asyncio.Queue]:
     """Load project config and fill queue with all the project names"""
     with config_path.open("r") as cfp:
         config = json.load(cfp)
@@ -159,7 +184,7 @@ async def load_projects_queue(
     # TODO: Offer more options here
     # e.g. Run on X random packages or specific sub list etc.
     project_names = sorted(config["projects"].keys())
-    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=len(project_names))
+    queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names))
     for project in project_names:
         await queue.put(project)

@@ -169,7 +194,7 @@ async def load_projects_queue(
 async def project_runner(
     idx: int,
     config: Dict[str, Any],
-    queue: asyncio.Queue[str],
+    queue: asyncio.Queue,
     work_path: Path,
     results: Results,
     long_checkouts: bool = False,
@@ -243,7 +268,7 @@ async def process_queue(
     config, queue = await load_projects_queue(Path(config_file))
     project_count = queue.qsize()
     LOG.info(f"{project_count} projects to run black over")
-    if not project_count:
+    if project_count < 1:
         return -1

     LOG.debug(f"Using {workers} parallel workers to run black")
@@ -258,4 +283,8 @@ async def process_queue(
     )

     LOG.info("Analyzing results")
-    return await analyze_results(project_count, results)
+    return analyze_results(project_count, results)
+
+
+if __name__ == "__main__":  # pragma: nocover
+    raise NotImplementedError("lib is a library, funnily enough.")
tests/test_black.py:

@@ -31,6 +31,10 @@

 from pathspec import PathSpec

+# Import other test classes
+from .test_primer import PrimerCLITests  # noqa: F401
+
+
 ff = partial(black.format_file_in_place, mode=black.FileMode(), fast=True)
 fs = partial(black.format_str, mode=black.FileMode())
 THIS_FILE = Path(__file__)
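Why the bare import in `test_black.py` is enough to run the new tests: unittest's loader collects every `TestCase` subclass bound in a module's namespace, whether or not it was defined there. A small sketch with hypothetical module names (`tests/sub_suite.py` and `tests/main_suite.py` are illustrative, not files from this commit):

```python
# tests/sub_suite.py (hypothetical)
import unittest


class SubSuiteTests(unittest.TestCase):
    def test_truthiness(self) -> None:
        self.assertTrue(True)


# tests/main_suite.py (hypothetical) would only need the re-export below;
# flake8 wants the noqa because the name is otherwise "unused":
# from .sub_suite import SubSuiteTests  # noqa: F401
#
# Running `python -m unittest tests.main_suite` then executes SubSuiteTests
# too, which is exactly how PrimerCLITests piggybacks on test_black.py.

if __name__ == "__main__":
    unittest.main()
```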
tests/test_primer.py (new file, 207 lines):

@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+
+import asyncio
+import sys
+import unittest
+from contextlib import contextmanager
+from copy import deepcopy
+from io import StringIO
+from os import getpid
+from pathlib import Path
+from platform import system
+from subprocess import CalledProcessError
+from tempfile import TemporaryDirectory, gettempdir
+from typing import Any, Callable, Generator, Iterator, Tuple
+from unittest.mock import Mock, patch
+
+from click.testing import CliRunner
+
+from black_primer import cli, lib
+
+
+EXPECTED_ANALYSIS_OUTPUT = """\
+-- primer results 📊 --
+
+68 / 69 succeeded (98.55%) ✅
+1 / 69 FAILED (1.45%) 💩
+ - 0 projects Disabled by config
+ - 0 projects skipped due to Python Version
+ - 0 skipped due to long checkout
+
+Failed Projects:
+
+## black:
+ - Returned 69
+ - stdout:
+black didn't work
+
+"""
+FAKE_PROJECT_CONFIG = {
+    "cli_arguments": ["--unittest"],
+    "expect_formatting_changes": False,
+    "git_clone_url": "https://github.com/psf/black.git",
+}
+
+
+@contextmanager
+def capture_stdout(command: Callable, *args: Any, **kwargs: Any) -> Generator:
+    old_stdout, sys.stdout = sys.stdout, StringIO()
+    try:
+        command(*args, **kwargs)
+        sys.stdout.seek(0)
+        yield sys.stdout.read()
+    finally:
+        sys.stdout = old_stdout
+
+
+@contextmanager
+def event_loop() -> Iterator[None]:
+    policy = asyncio.get_event_loop_policy()
+    loop = policy.new_event_loop()
+    asyncio.set_event_loop(loop)
+    if sys.platform == "win32":
+        asyncio.set_event_loop(asyncio.ProactorEventLoop())
+    try:
+        yield
+    finally:
+        loop.close()
+
+
+async def raise_subprocess_error(*args: Any, **kwargs: Any) -> None:
+    raise CalledProcessError(1, ["unittest", "error"], b"", b"")
+
+
+async def return_false(*args: Any, **kwargs: Any) -> bool:
+    return False
+
+
+async def return_subproccess_output(*args: Any, **kwargs: Any) -> Tuple[bytes, bytes]:
+    return (b"stdout", b"stderr")
+
+
+async def return_zero(*args: Any, **kwargs: Any) -> int:
+    return 0
+
+
+class PrimerLibTests(unittest.TestCase):
+    def test_analyze_results(self) -> None:
+        fake_results = lib.Results(
+            {
+                "disabled": 0,
+                "failed": 1,
+                "skipped_long_checkout": 0,
+                "success": 68,
+                "wrong_py_ver": 0,
+            },
+            {"black": CalledProcessError(69, ["black"], b"black didn't work", b"")},
+        )
+        with capture_stdout(lib.analyze_results, 69, fake_results) as analyze_stdout:
+            self.assertEqual(EXPECTED_ANALYSIS_OUTPUT, analyze_stdout)
+
+    @event_loop()
+    def test_black_run(self) -> None:
+        """Pretend run black to ensure we cater for all scenarios"""
+        loop = asyncio.get_event_loop()
+        repo_path = Path(gettempdir())
+        project_config = deepcopy(FAKE_PROJECT_CONFIG)
+        results = lib.Results({"failed": 0, "success": 0}, {})
+
+        # Test a successful black run
+        with patch("black_primer.lib._gen_check_output", return_subproccess_output):
+            loop.run_until_complete(lib.black_run(repo_path, project_config, results))
+        self.assertEqual(1, results.stats["success"])
+        self.assertFalse(results.failed_projects)
+
+        # Test a fail based on expecting formatting changes but not getting any
+        project_config["expect_formatting_changes"] = True
+        results = lib.Results({"failed": 0, "success": 0}, {})
+        with patch("black_primer.lib._gen_check_output", return_subproccess_output):
+            loop.run_until_complete(lib.black_run(repo_path, project_config, results))
+        self.assertEqual(1, results.stats["failed"])
+        self.assertTrue(results.failed_projects)
+
+        # Test a fail based on returning 1 and not expecting formatting changes
+        project_config["expect_formatting_changes"] = False
+        results = lib.Results({"failed": 0, "success": 0}, {})
+        with patch("black_primer.lib._gen_check_output", raise_subprocess_error):
+            loop.run_until_complete(lib.black_run(repo_path, project_config, results))
+        self.assertEqual(1, results.stats["failed"])
+        self.assertTrue(results.failed_projects)
+
+    @event_loop()
+    def test_gen_check_output(self) -> None:
+        loop = asyncio.get_event_loop()
+        stdout, stderr = loop.run_until_complete(
+            lib._gen_check_output([lib.BLACK_BINARY, "--help"])
+        )
+        self.assertTrue("The uncompromising code formatter" in stdout.decode("utf8"))
+        self.assertEqual(None, stderr)
+
+        # TODO: Add a test to see failure works on Windows
+        if lib.WINDOWS:
+            return
+
+        false_bin = "/usr/bin/false" if system() == "Darwin" else "/bin/false"
+        with self.assertRaises(CalledProcessError):
+            loop.run_until_complete(lib._gen_check_output([false_bin]))
+
+        with self.assertRaises(asyncio.TimeoutError):
+            loop.run_until_complete(
+                lib._gen_check_output(["/bin/sleep", "2"], timeout=0.1)
+            )
+
+    @event_loop()
+    def test_git_checkout_or_rebase(self) -> None:
+        loop = asyncio.get_event_loop()
+        project_config = deepcopy(FAKE_PROJECT_CONFIG)
+        work_path = Path(gettempdir())
+
+        expected_repo_path = work_path / "black"
+        with patch("black_primer.lib._gen_check_output", return_subproccess_output):
+            returned_repo_path = loop.run_until_complete(
+                lib.git_checkout_or_rebase(work_path, project_config)
+            )
+        self.assertEqual(expected_repo_path, returned_repo_path)
+
+    @patch("sys.stdout", new_callable=StringIO)
+    @event_loop()
+    def test_process_queue(self, mock_stdout: Mock) -> None:
+        loop = asyncio.get_event_loop()
+        config_path = Path(lib.__file__).parent / "primer.json"
+        with patch("black_primer.lib.git_checkout_or_rebase", return_false):
+            with TemporaryDirectory() as td:
+                return_val = loop.run_until_complete(
+                    lib.process_queue(str(config_path), td, 2)
+                )
+                self.assertEqual(0, return_val)
+
+
+class PrimerCLITests(unittest.TestCase):
+    @event_loop()
+    def test_async_main(self) -> None:
+        loop = asyncio.get_event_loop()
+        work_dir = Path(gettempdir()) / f"primer_ut_{getpid()}"
+        args = {
+            "config": "/config",
+            "debug": False,
+            "keep": False,
+            "long_checkouts": False,
+            "rebase": False,
+            "workdir": str(work_dir),
+            "workers": 69,
+        }
+        with patch("black_primer.cli.lib.process_queue", return_zero):
+            return_val = loop.run_until_complete(cli.async_main(**args))
+            self.assertEqual(0, return_val)
+
+    def test_handle_debug(self) -> None:
+        self.assertTrue(cli._handle_debug(None, None, True))
+
+    def test_help_output(self) -> None:
+        runner = CliRunner()
+        result = runner.invoke(cli.main, ["--help"])
+        self.assertEqual(result.exit_code, 0)
+
+
+if __name__ == "__main__":
+    unittest.main()