Added caching (#136)

Black will cache already formatted files using their file size and
modification timestamp. The cache is per-user and will always be used
unless Black is used with --diff or with code provided via standard
input.
This commit is contained in:
Jonas Obrist 2018-04-19 09:27:07 +09:00 committed by Łukasz Langa
parent f7fd36b228
commit 639b62dcd3
6 changed files with 410 additions and 116 deletions

View File

@ -7,6 +7,7 @@ name = "pypi"
attrs = ">=17.4.0"
click = "*"
setuptools = ">=38.6.0"
appdirs = "*"
[dev-packages]
pre-commit = "*"

61
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "40e1fcca5bf4adcd0e688675714c4b2a771009b6e2005a0f375de1a46a64c906"
"sha256": "b6412a09cc7dd70b0dcd83aa9f1ab659f4c0e2ba413060ab03f7ba4b064bebce"
},
"pipfile-spec": 6,
"requires": {},
@ -14,6 +14,14 @@
]
},
"default": {
"appdirs": {
"hashes": [
"sha256:9e5896d1372858f8dd3344faf4e5014d21849c756c8d5701f78f8a103b372d92",
"sha256:d8b24664561d0d34ddfaec54636d502d7cea6e29c3eaf68f3df6180863e2166e"
],
"index": "pypi",
"version": "==1.4.3"
},
"attrs": {
"hashes": [
"sha256:1c7960ccfd6a005cd9f7ba884e6316b5e430a3f1a6c37c5f87d8b43f83b54ec9",
@ -41,10 +49,10 @@
},
"aspy.yaml": {
"hashes": [
"sha256:6215f44900ff65f27dbd00a36b06a7926276436ed377320cfd4febd69bbd4a94",
"sha256:be70cc0ccd1ee1d30f589f2403792eb2ffa7546470af0a17179541b13d8374df"
"sha256:c959530fab398e2391516bc8d5146489f9273b07d87dd8ba5e8b73406f7cc1fa",
"sha256:da95110d120a9168c9f43601b9cb732f006d8f193ee2c9b402c823026e4a9387"
],
"version": "==1.0.0"
"version": "==1.1.0"
},
"attrs": {
"hashes": [
@ -63,17 +71,17 @@
},
"cached-property": {
"hashes": [
"sha256:6e6935ec62567fbcbc346fad84fcea9bc77e3547b7267e62bc5b7ed8e5438ae8",
"sha256:a2fa0f89dd422f7e5dd992a4a3e0ce209d5d1e47a4db28fd0a7b5273ec8da3f0"
"sha256:67acb3ee8234245e8aea3784a492272239d9c4b487eba2fdcce9d75460d34520",
"sha256:bf093e640b7294303c7cc7ba3212f00b7a07d0416c1d923465995c9ef860a139"
],
"version": "==1.4.0"
"version": "==1.4.2"
},
"certifi": {
"hashes": [
"sha256:14131608ad2fd56836d33a71ee60fa1c82bc9d2c8d98b7bdbc631fe1b3cd1296",
"sha256:edbc3f203427eef571f79a7692bb160a2b0f7ccaa31953e99bd17e307cf63f7d"
"sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7",
"sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0"
],
"version": "==2018.1.18"
"version": "==2018.4.16"
},
"cfgv": {
"hashes": [
@ -171,10 +179,10 @@
},
"identify": {
"hashes": [
"sha256:53be6ea950a5f40e13be2dd87e67413eb6879527b831333196ab2a54de38f499",
"sha256:c0bfb29634e04cde8e54aee2d55aff9dad30d6ea1f3e9e3ce731934d78635aa1"
"sha256:5cbcc7fca1263bd87fc4dea1abfd7abbbc3807c9b9f09e6f999da6857a0fe35a",
"sha256:a42f6ed9c3ad02b187c8a17027bb9042a54f463d8e617ca208038a25ec69faa7"
],
"version": "==1.0.8"
"version": "==1.0.13"
},
"idna": {
"hashes": [
@ -212,11 +220,11 @@
},
"mypy": {
"hashes": [
"sha256:3bd95a1369810f7693366911d85be9f0a0bd994f6cb7162b7a994e5ded90e3d9",
"sha256:7247f9948d7cdaae9408a4ee1662a01853c24e668117b4419acf025b05fbe3ce"
"sha256:04bffef22377b3f56f96da2d032e5d0b2e8a9062a127afc008dc4b0e64cede7a",
"sha256:cdd3ddf96a2cc2e955bcc7b2a16b25e400f132393375b45a2d719c91ac1a8291"
],
"index": "pypi",
"version": "==0.580"
"version": "==0.590"
},
"nodeenv": {
"hashes": [
@ -248,6 +256,8 @@
},
"pycodestyle": {
"hashes": [
"sha256:1ec08a51c901dfe44921576ed6e4c1f5b7ecbad403f871397feedb5eb8e4fa14",
"sha256:5ff2fbcbab997895ba9ead77e1b38b3ebc2e5c3b8a6194ef918666e4c790a00e",
"sha256:682256a5b318149ca0d2a9185d365d8864a768a28db66a84a2ea946bcc426766",
"sha256:6c4245ade1edfad79c3446fadfc96b0de2759662dc29d07d80a6f27ad1ca6ba9"
],
@ -281,17 +291,10 @@
},
"pytz": {
"hashes": [
"sha256:07edfc3d4d2705a20a6e99d97f0c4b61c800b8232dc1c04d87e8554f130148dd",
"sha256:3a47ff71597f821cd84a162e71593004286e5be07a340fd462f0d33a760782b5",
"sha256:410bcd1d6409026fbaa65d9ed33bf6dd8b1e94a499e32168acfc7b332e4095c0",
"sha256:5bd55c744e6feaa4d599a6cbd8228b4f8f9ba96de2c38d56f08e534b3c9edf0d",
"sha256:61242a9abc626379574a166dc0e96a66cd7c3b27fc10868003fa210be4bff1c9",
"sha256:887ab5e5b32e4d0c86efddd3d055c1f363cbaa583beb8da5e22d2fa2f64d51ef",
"sha256:ba18e6a243b3625513d85239b3e49055a2f0318466e0b8a92b8fb8ca7ccdf55f",
"sha256:ed6509d9af298b7995d69a440e2822288f2eca1681b8cce37673dbb10091e5fe",
"sha256:f93ddcdd6342f94cea379c73cddb5724e0d6d0a1c91c9bdef364dc0368ba4fda"
"sha256:65ae0c8101309c45772196b21b74c46b2e5d11b6275c45d251b150d5da334555",
"sha256:c06425302f2cf668f1bba7a0a03f3c1d34d4ebeef2c72003da308b3947c7f749"
],
"version": "==2018.3"
"version": "==2018.4"
},
"pyyaml": {
"hashes": [
@ -365,10 +368,10 @@
},
"tqdm": {
"hashes": [
"sha256:4f2eb1d14804caf7095500fe11da0e481a47af912e7b57c93f886ac3c40a49dd",
"sha256:91ac47ec2ba6bb92b7ba37706f4dea37019ddd784b22fd279a4b12d93327191d"
"sha256:597e7526c85df881d51e094360181a84533aede1cb3f5a1cada8bbd4de557efd",
"sha256:fe3d218d5b61993d415aa2a9db6dd64c0e4cefb90164ebb197ef3b1d99f531dc"
],
"version": "==4.20.0"
"version": "==4.23.0"
},
"twine": {
"hashes": [

View File

@ -439,6 +439,20 @@ the line length if you really need to. If you're already using Python
that is pinned to the latest release on PyPI. If you'd rather run on
master, this is also an option.
## Ignoring non-modified files
*Black* remembers files it already formatted, unless the `--diff` flag is used or
code is passed via standard input. This information is stored per-user. The exact
location of the file depends on the black version and the system on which black
is run. The file is non-portable. The standard location on common operating systems
is:
* Windows: `C:\\Users\<username>\AppData\Local\black\black\Cache\<version>\cache.pkl`
* macOS: `/Users/<username>/Library/Caches/black/<version>/cache.pkl`
* Linux: `/home/<username>/.cache/black/<version>/cache.pkl`
## Testimonials
**Dusty Phillips**, [writer](https://smile.amazon.com/s/ref=nb_sb_noss?url=search-alias%3Daps&field-keywords=dusty+phillips):

249
black.py
View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import asyncio
import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
from enum import Enum
@ -32,6 +33,7 @@
Union,
)
from appdirs import user_cache_dir
from attr import dataclass, Factory
import click
@ -54,6 +56,10 @@
Index = int
LN = Union[Leaf, Node]
SplitFunc = Callable[["Line", bool], Iterator["Line"]]
Timestamp = float
FileSize = int
CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)
@ -104,6 +110,12 @@ class WriteBack(Enum):
DIFF = 2
class Changed(Enum):
NO = 0
CACHED = 1
YES = 2
@click.command()
@click.option(
"-l",
@ -185,35 +197,70 @@ def main(
write_back = WriteBack.YES
if len(sources) == 0:
ctx.exit(0)
return
elif len(sources) == 1:
p = sources[0]
report = Report(check=check, quiet=quiet)
try:
if not p.is_file() and str(p) == "-":
changed = format_stdin_to_stdout(
line_length=line_length, fast=fast, write_back=write_back
)
else:
changed = format_file_in_place(
p, line_length=line_length, fast=fast, write_back=write_back
)
report.done(p, changed)
except Exception as exc:
report.failed(p, str(exc))
ctx.exit(report.return_code)
return_code = run_single_file_mode(
line_length, check, fast, quiet, write_back, sources[0]
)
else:
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
return_code = 1
try:
return_code = loop.run_until_complete(
schedule_formatting(
sources, line_length, write_back, fast, quiet, loop, executor
)
return_code = run_multi_file_mode(line_length, fast, quiet, write_back, sources)
ctx.exit(return_code)
def run_single_file_mode(
line_length: int,
check: bool,
fast: bool,
quiet: bool,
write_back: WriteBack,
src: Path,
) -> int:
report = Report(check=check, quiet=quiet)
try:
if not src.is_file() and str(src) == "-":
changed = format_stdin_to_stdout(
line_length=line_length, fast=fast, write_back=write_back
)
finally:
shutdown(loop)
ctx.exit(return_code)
else:
changed = Changed.NO
cache: Cache = {}
if write_back != WriteBack.DIFF:
cache = read_cache()
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
if changed is not Changed.CACHED:
changed = format_file_in_place(
src, line_length=line_length, fast=fast, write_back=write_back
)
if write_back != WriteBack.DIFF and changed is not Changed.NO:
write_cache(cache, [src])
report.done(src, changed)
except Exception as exc:
report.failed(src, str(exc))
return report.return_code
def run_multi_file_mode(
line_length: int,
fast: bool,
quiet: bool,
write_back: WriteBack,
sources: List[Path],
) -> int:
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
return_code = 1
try:
return_code = loop.run_until_complete(
schedule_formatting(
sources, line_length, write_back, fast, quiet, loop, executor
)
)
finally:
shutdown(loop)
return return_code
async def schedule_formatting(
@ -232,41 +279,55 @@ async def schedule_formatting(
`line_length`, `write_back`, and `fast` options are passed to
:func:`format_file_in_place`.
"""
lock = None
if write_back == WriteBack.DIFF:
# For diff output, we need locks to ensure we don't interleave output
# from different processes.
manager = Manager()
lock = manager.Lock()
tasks = {
src: loop.run_in_executor(
executor, format_file_in_place, src, line_length, fast, write_back, lock
)
for src in sources
}
_task_values = list(tasks.values())
loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
await asyncio.wait(tasks.values())
cancelled = []
report = Report(check=write_back is WriteBack.NO, quiet=quiet)
for src, task in tasks.items():
if not task.done():
report.failed(src, "timed out, cancelling")
task.cancel()
cancelled.append(task)
elif task.cancelled():
cancelled.append(task)
elif task.exception():
report.failed(src, str(task.exception()))
else:
report.done(src, task.result())
cache: Cache = {}
if write_back != WriteBack.DIFF:
cache = read_cache()
sources, cached = filter_cached(cache, sources)
for src in cached:
report.done(src, Changed.CACHED)
cancelled = []
formatted = []
if sources:
lock = None
if write_back == WriteBack.DIFF:
# For diff output, we need locks to ensure we don't interleave output
# from different processes.
manager = Manager()
lock = manager.Lock()
tasks = {
src: loop.run_in_executor(
executor, format_file_in_place, src, line_length, fast, write_back, lock
)
for src in sources
}
_task_values = list(tasks.values())
loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
await asyncio.wait(_task_values)
for src, task in tasks.items():
if not task.done():
report.failed(src, "timed out, cancelling")
task.cancel()
cancelled.append(task)
elif task.cancelled():
cancelled.append(task)
elif task.exception():
report.failed(src, str(task.exception()))
else:
formatted.append(src)
report.done(src, task.result())
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
elif not quiet:
out("All done! ✨ 🍰 ✨")
if not quiet:
click.echo(str(report))
if write_back != WriteBack.DIFF and formatted:
write_cache(cache, formatted)
return report.return_code
@ -276,12 +337,13 @@ def format_file_in_place(
fast: bool,
write_back: WriteBack = WriteBack.NO,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
) -> Changed:
"""Format file under `src` path. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
`line_length` and `fast` options are passed to :func:`format_file_contents`.
"""
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
@ -289,7 +351,7 @@ def format_file_in_place(
src_contents, line_length=line_length, fast=fast
)
except NothingChanged:
return False
return Changed.NO
if write_back == write_back.YES:
with open(src, "w", encoding=src_buffer.encoding) as f:
@ -305,12 +367,12 @@ def format_file_in_place(
finally:
if lock:
lock.release()
return True
return Changed.YES
def format_stdin_to_stdout(
line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
) -> bool:
) -> Changed:
"""Format file on stdin. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
@ -320,10 +382,10 @@ def format_stdin_to_stdout(
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast)
return True
return Changed.YES
except NothingChanged:
return False
return Changed.NO
finally:
if write_back == WriteBack.YES:
@ -2201,16 +2263,20 @@ class Report:
same_count: int = 0
failure_count: int = 0
def done(self, src: Path, changed: bool) -> None:
def done(self, src: Path, changed: Changed) -> None:
"""Increment the counter for successful reformatting. Write out a message."""
if changed:
if changed is Changed.YES:
reformatted = "would reformat" if self.check else "reformatted"
if not self.quiet:
out(f"{reformatted} {src}")
self.change_count += 1
else:
if not self.quiet:
out(f"{src} already well formatted, good job.", bold=False)
if changed is Changed.NO:
msg = f"{src} already well formatted, good job."
else:
msg = f"{src} wasn't modified on disk since last run."
out(msg, bold=False)
self.same_count += 1
def failed(self, src: Path, message: str) -> None:
@ -2409,5 +2475,62 @@ def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
return regex.sub(replacement, regex.sub(replacement, original))
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
CACHE_FILE = CACHE_DIR / "cache.pickle"
def read_cache() -> Cache:
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
if not CACHE_FILE.exists():
return {}
with CACHE_FILE.open("rb") as fobj:
try:
cache: Cache = pickle.load(fobj)
except pickle.UnpicklingError:
return {}
return cache
def get_cache_info(path: Path) -> CacheInfo:
"""Return the information used to check if a file is already formatted or not."""
stat = path.stat()
return stat.st_mtime, stat.st_size
def filter_cached(
cache: Cache, sources: Iterable[Path]
) -> Tuple[List[Path], List[Path]]:
"""Split a list of paths into two.
The first list contains paths of files that modified on disk or are not in the
cache. The other list contains paths to non-modified files.
"""
todo, done = [], []
for src in sources:
src = src.resolve()
if cache.get(src) != get_cache_info(src):
todo.append(src)
else:
done.append(src)
return todo, done
def write_cache(cache: Cache, sources: List[Path]) -> None:
"""Update the cache file."""
try:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)
new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
with CACHE_FILE.open("wb") as fobj:
pickle.dump(new_cache, fobj, protocol=pickle.HIGHEST_PROTOCOL)
except OSError:
pass
if __name__ == "__main__":
main()

View File

@ -40,7 +40,7 @@ def get_version():
package_data={"blib2to3": ["*.txt"]},
python_requires=">=3.6",
zip_safe=False,
install_requires=["click", "attrs>=17.4.0"],
install_requires=["click", "attrs>=17.4.0", "appdirs"],
test_suite="tests.test_black",
classifiers=[
"Development Status :: 3 - Alpha",

View File

@ -1,14 +1,18 @@
#!/usr/bin/env python3
import asyncio
from contextlib import contextmanager
from functools import partial
from io import StringIO
import os
from pathlib import Path
import sys
from typing import Any, List, Tuple
from tempfile import TemporaryDirectory
from typing import Any, List, Tuple, Iterator
import unittest
from unittest.mock import patch
from click import unstyle
from click.testing import CliRunner
import black
@ -46,6 +50,32 @@ def read_data(name: str) -> Tuple[str, str]:
return "".join(_input).strip() + "\n", "".join(_output).strip() + "\n"
@contextmanager
def cache_dir(exists: bool = True) -> Iterator[Path]:
with TemporaryDirectory() as workspace:
cache_dir = Path(workspace)
if not exists:
cache_dir = cache_dir / "new"
cache_file = cache_dir / "cache.pkl"
with patch("black.CACHE_DIR", cache_dir), patch("black.CACHE_FILE", cache_file):
yield cache_dir
@contextmanager
def event_loop(close: bool) -> Iterator[None]:
policy = asyncio.get_event_loop_policy()
old_loop = policy.get_event_loop()
loop = policy.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield
finally:
policy.set_event_loop(old_loop)
if close:
loop.close()
class BlackTestCase(unittest.TestCase):
maxDiff = None
@ -75,7 +105,7 @@ def test_self(self) -> None:
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, line_length=ll)
self.assertFalse(ff(THIS_FILE))
self.assertIs(ff(THIS_FILE), black.Changed.NO)
@patch("black.dump_to_file", dump_to_stderr)
def test_black(self) -> None:
@ -84,7 +114,7 @@ def test_black(self) -> None:
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, line_length=ll)
self.assertFalse(ff(THIS_DIR / ".." / "black.py"))
self.assertIs(ff(THIS_DIR / ".." / "black.py"), black.Changed.NO)
def test_piping(self) -> None:
source, expected = read_data("../black")
@ -127,7 +157,7 @@ def test_setup(self) -> None:
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, line_length=ll)
self.assertFalse(ff(THIS_DIR / ".." / "setup.py"))
self.assertIs(ff(THIS_DIR / ".." / "setup.py"), black.Changed.NO)
@patch("black.dump_to_file", dump_to_stderr)
def test_function(self) -> None:
@ -291,67 +321,76 @@ def err(msg: str, **kwargs: Any) -> None:
err_lines.append(msg)
with patch("black.out", out), patch("black.err", err):
report.done(Path("f1"), changed=False)
report.done(Path("f1"), black.Changed.NO)
self.assertEqual(len(out_lines), 1)
self.assertEqual(len(err_lines), 0)
self.assertEqual(out_lines[-1], "f1 already well formatted, good job.")
self.assertEqual(unstyle(str(report)), "1 file left unchanged.")
self.assertEqual(report.return_code, 0)
report.done(Path("f2"), changed=True)
report.done(Path("f2"), black.Changed.YES)
self.assertEqual(len(out_lines), 2)
self.assertEqual(len(err_lines), 0)
self.assertEqual(out_lines[-1], "reformatted f2")
self.assertEqual(
unstyle(str(report)), "1 file reformatted, 1 file left unchanged."
)
report.done(Path("f3"), black.Changed.CACHED)
self.assertEqual(len(out_lines), 3)
self.assertEqual(len(err_lines), 0)
self.assertEqual(
out_lines[-1], "f3 wasn't modified on disk since last run."
)
self.assertEqual(
unstyle(str(report)), "1 file reformatted, 2 files left unchanged."
)
self.assertEqual(report.return_code, 0)
report.check = True
self.assertEqual(report.return_code, 1)
report.check = False
report.failed(Path("e1"), "boom")
self.assertEqual(len(out_lines), 2)
self.assertEqual(len(out_lines), 3)
self.assertEqual(len(err_lines), 1)
self.assertEqual(err_lines[-1], "error: cannot format e1: boom")
self.assertEqual(
unstyle(str(report)),
"1 file reformatted, 1 file left unchanged, "
"1 file reformatted, 2 files left unchanged, "
"1 file failed to reformat.",
)
self.assertEqual(report.return_code, 123)
report.done(Path("f3"), changed=True)
self.assertEqual(len(out_lines), 3)
report.done(Path("f3"), black.Changed.YES)
self.assertEqual(len(out_lines), 4)
self.assertEqual(len(err_lines), 1)
self.assertEqual(out_lines[-1], "reformatted f3")
self.assertEqual(
unstyle(str(report)),
"2 files reformatted, 1 file left unchanged, "
"2 files reformatted, 2 files left unchanged, "
"1 file failed to reformat.",
)
self.assertEqual(report.return_code, 123)
report.failed(Path("e2"), "boom")
self.assertEqual(len(out_lines), 3)
self.assertEqual(len(err_lines), 2)
self.assertEqual(err_lines[-1], "error: cannot format e2: boom")
self.assertEqual(
unstyle(str(report)),
"2 files reformatted, 1 file left unchanged, "
"2 files failed to reformat.",
)
self.assertEqual(report.return_code, 123)
report.done(Path("f4"), changed=False)
self.assertEqual(len(out_lines), 4)
self.assertEqual(len(err_lines), 2)
self.assertEqual(out_lines[-1], "f4 already well formatted, good job.")
self.assertEqual(err_lines[-1], "error: cannot format e2: boom")
self.assertEqual(
unstyle(str(report)),
"2 files reformatted, 2 files left unchanged, "
"2 files failed to reformat.",
)
self.assertEqual(report.return_code, 123)
report.done(Path("f4"), black.Changed.NO)
self.assertEqual(len(out_lines), 5)
self.assertEqual(len(err_lines), 2)
self.assertEqual(out_lines[-1], "f4 already well formatted, good job.")
self.assertEqual(
unstyle(str(report)),
"2 files reformatted, 3 files left unchanged, "
"2 files failed to reformat.",
)
self.assertEqual(report.return_code, 123)
report.check = True
self.assertEqual(
unstyle(str(report)),
"2 files would be reformatted, 2 files would be left unchanged, "
"2 files would be reformatted, 3 files would be left unchanged, "
"2 files would fail to reformat.",
)
@ -442,6 +481,120 @@ def err(msg: str, **kwargs: Any) -> None:
self.assertTrue("Actual tree:" in out_str)
self.assertEqual("".join(err_lines), "")
def test_cache_broken_file(self) -> None:
with cache_dir() as workspace:
with black.CACHE_FILE.open("w") as fobj:
fobj.write("this is not a pickle")
self.assertEqual(black.read_cache(), {})
src = (workspace / "test.py").resolve()
with src.open("w") as fobj:
fobj.write("print('hello')")
result = CliRunner().invoke(black.main, [str(src)])
self.assertEqual(result.exit_code, 0)
cache = black.read_cache()
self.assertIn(src, cache)
def test_cache_single_file_already_cached(self) -> None:
with cache_dir() as workspace:
src = (workspace / "test.py").resolve()
with src.open("w") as fobj:
fobj.write("print('hello')")
black.write_cache({}, [src])
result = CliRunner().invoke(black.main, [str(src)])
self.assertEqual(result.exit_code, 0)
with src.open("r") as fobj:
self.assertEqual(fobj.read(), "print('hello')")
@event_loop(close=False)
def test_cache_multiple_files(self) -> None:
with cache_dir() as workspace:
one = (workspace / "one.py").resolve()
with one.open("w") as fobj:
fobj.write("print('hello')")
two = (workspace / "two.py").resolve()
with two.open("w") as fobj:
fobj.write("print('hello')")
black.write_cache({}, [one])
result = CliRunner().invoke(black.main, [str(workspace)])
self.assertEqual(result.exit_code, 0)
with one.open("r") as fobj:
self.assertEqual(fobj.read(), "print('hello')")
with two.open("r") as fobj:
self.assertEqual(fobj.read(), 'print("hello")\n')
cache = black.read_cache()
self.assertIn(one, cache)
self.assertIn(two, cache)
def test_no_cache_when_writeback_diff(self) -> None:
with cache_dir() as workspace:
src = (workspace / "test.py").resolve()
with src.open("w") as fobj:
fobj.write("print('hello')")
result = CliRunner().invoke(black.main, [str(src), "--diff"])
self.assertEqual(result.exit_code, 0)
self.assertFalse(black.CACHE_FILE.exists())
def test_no_cache_when_stdin(self) -> None:
with cache_dir():
result = CliRunner().invoke(black.main, ["-"], input="print('hello')")
self.assertEqual(result.exit_code, 0)
self.assertFalse(black.CACHE_FILE.exists())
def test_read_cache_no_cachefile(self) -> None:
with cache_dir():
self.assertEqual(black.read_cache(), {})
def test_write_cache_read_cache(self) -> None:
with cache_dir() as workspace:
src = (workspace / "test.py").resolve()
src.touch()
black.write_cache({}, [src])
cache = black.read_cache()
self.assertIn(src, cache)
self.assertEqual(cache[src], black.get_cache_info(src))
def test_filter_cached(self) -> None:
with TemporaryDirectory() as workspace:
path = Path(workspace)
uncached = (path / "uncached").resolve()
cached = (path / "cached").resolve()
cached_but_changed = (path / "changed").resolve()
uncached.touch()
cached.touch()
cached_but_changed.touch()
cache = {cached: black.get_cache_info(cached), cached_but_changed: (0.0, 0)}
todo, done = black.filter_cached(
cache, [uncached, cached, cached_but_changed]
)
self.assertEqual(todo, [uncached, cached_but_changed])
self.assertEqual(done, [cached])
def test_write_cache_creates_directory_if_needed(self) -> None:
with cache_dir(exists=False) as workspace:
self.assertFalse(workspace.exists())
black.write_cache({}, [])
self.assertTrue(workspace.exists())
@event_loop(close=False)
def test_failed_formatting_does_not_get_cached(self) -> None:
with cache_dir() as workspace:
failing = (workspace / "failing.py").resolve()
with failing.open("w") as fobj:
fobj.write("not actually python")
clean = (workspace / "clean.py").resolve()
with clean.open("w") as fobj:
fobj.write('print("hello")\n')
result = CliRunner().invoke(black.main, [str(workspace)])
self.assertEqual(result.exit_code, 123)
cache = black.read_cache()
self.assertNotIn(failing, cache)
self.assertIn(clean, cache)
def test_write_cache_write_fail(self):
with cache_dir(), patch.object(Path, "open") as mock:
mock.side_effect = OSError
black.write_cache({}, [])
if __name__ == "__main__":
unittest.main()