Graceful shutdown in case of cancellation

This commit is contained in:
Łukasz Langa 2018-03-30 19:31:05 -07:00
parent 2ad642754b
commit ba61bfe386

View File

@ -5,9 +5,11 @@
from concurrent.futures import Executor, ProcessPoolExecutor from concurrent.futures import Executor, ProcessPoolExecutor
from functools import partial, wraps from functools import partial, wraps
import keyword import keyword
import logging
import os import os
from pathlib import Path from pathlib import Path
import tokenize import tokenize
import signal
import sys import sys
from typing import ( from typing import (
Callable, Callable,
@ -167,7 +169,7 @@ def main(
) )
) )
finally: finally:
loop.close() shutdown(loop)
ctx.exit(return_code) ctx.exit(return_code)
@ -192,6 +194,9 @@ async def schedule_formatting(
) )
for src in sources for src in sources
} }
_task_values = list(tasks.values())
loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
await asyncio.wait(tasks.values()) await asyncio.wait(tasks.values())
cancelled = [] cancelled = []
report = Report(check=not write_back) report = Report(check=not write_back)
@ -200,13 +205,16 @@ async def schedule_formatting(
report.failed(src, 'timed out, cancelling') report.failed(src, 'timed out, cancelling')
task.cancel() task.cancel()
cancelled.append(task) cancelled.append(task)
elif task.cancelled():
cancelled.append(task)
elif task.exception(): elif task.exception():
report.failed(src, str(task.exception())) report.failed(src, str(task.exception()))
else: else:
report.done(src, task.result()) report.done(src, task.result())
if cancelled: if cancelled:
await asyncio.wait(cancelled, timeout=2) await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
out('All done! ✨ 🍰 ✨') else:
out('All done! ✨ 🍰 ✨')
click.echo(str(report)) click.echo(str(report))
return report.return_code return report.return_code
@ -1986,5 +1994,34 @@ def diff(a: str, b: str, a_name: str, b_name: str) -> str:
) )
def cancel(tasks: List[asyncio.Task]) -> None:
"""asyncio signal handler that cancels all `tasks` and reports to stderr."""
err("Aborted!")
for task in tasks:
task.cancel()
def shutdown(loop: BaseEventLoop) -> None:
"""Cancel all pending tasks on `loop`, wait for them, and close the loop."""
try:
# This part is borrowed from asyncio/runners.py in Python 3.7b2.
to_cancel = [task for task in asyncio.Task.all_tasks(loop) if not task.done()]
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(
asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
)
finally:
# `concurrent.futures.Future` objects cannot be cancelled once they
# are already running. There might be some when the `shutdown()` happened.
# Silence their logger's spew about the event loop being closed.
cf_logger = logging.getLogger("concurrent.futures")
cf_logger.setLevel(logging.CRITICAL)
loop.close()
if __name__ == '__main__': if __name__ == '__main__':
main() main()