home_assistant/custom_components/hacs/utils/queue_manager.py

73 lines
1.9 KiB
Python

"""The QueueManager class."""
import asyncio
import time
from ..exceptions import HacsExecutionStillInProgress
from .logger import getLogger
_LOGGER = getLogger()
class QueueManager:
"""The QueueManager class."""
running = False
queue = []
@property
def pending_tasks(self) -> int:
"""Return a count of pending tasks in the queue."""
return len(self.queue)
@property
def has_pending_tasks(self) -> bool:
"""Return a count of pending tasks in the queue."""
return self.pending_tasks != 0
def clear(self) -> None:
"""Clear the queue."""
self.queue = []
def add(self, task) -> None:
"""Add a task to the queue."""
self.queue.append(task)
async def execute(self, number_of_tasks=None) -> None:
"""Execute the tasks in the queue."""
if self.running:
_LOGGER.debug("Execution is allreay running")
raise HacsExecutionStillInProgress
if len(self.queue) == 0:
_LOGGER.debug("The queue is empty")
return
self.running = True
_LOGGER.debug("Checking out tasks to execute")
local_queue = []
if number_of_tasks:
for task in self.queue[:number_of_tasks]:
local_queue.append(task)
else:
for task in self.queue:
local_queue.append(task)
for task in local_queue:
self.queue.remove(task)
_LOGGER.debug("Starting queue execution for %s tasks", len(local_queue))
start = time.time()
await asyncio.gather(*local_queue)
end = time.time() - start
_LOGGER.debug(
"Queue execution finished for %s tasks finished in %.2f seconds",
len(local_queue),
end,
)
if self.has_pending_tasks:
_LOGGER.debug("%s tasks remaining in the queue", len(self.queue))
self.running = False