home_assistant/custom_components/hacs/hacsbase/hacs.py

339 lines
13 KiB
Python

"""Initialize the HACS base."""
from datetime import timedelta
from aiogithubapi import GitHubException
from aiogithubapi.exceptions import GitHubNotModifiedException
from custom_components.hacs.helpers import HacsHelpers
from custom_components.hacs.helpers.functions.get_list_from_default import (
async_get_list_from_default,
)
from custom_components.hacs.helpers.functions.register_repository import (
register_repository,
)
from custom_components.hacs.helpers.functions.store import (
async_load_from_store,
async_save_to_store,
)
from custom_components.hacs.share import (
get_removed,
is_removed,
list_removed_repositories,
)
from ..base import HacsBase
from ..enums import HacsCategory, HacsStage
from ..exceptions import HacsExecutionStillInProgress
from ..share import get_factory, get_queue
from ..utils.queue_manager import QueueManager
class Hacs(HacsBase, HacsHelpers):
"""The base class of HACS, nested throughout the project."""
factory = get_factory()
queue = get_queue()
@property
def repositories(self):
"""Return the full repositories list."""
return self._repositories
def async_set_repositories(self, repositories):
"""Set the list of repositories."""
self._repositories = []
self._repositories_by_id = {}
self._repositories_by_full_name = {}
for repository in repositories:
self.async_add_repository(repository)
def async_set_repository_id(self, repository, repo_id):
"""Update a repository id."""
existing_repo_id = str(repository.data.id)
if existing_repo_id == repo_id:
return
if existing_repo_id != "0":
raise ValueError(
f"The repo id for {repository.data.full_name_lower} is already set to {existing_repo_id}"
)
repository.data.id = repo_id
self._repositories_by_id[repo_id] = repository
def async_add_repository(self, repository):
"""Add a repository to the list."""
if repository.data.full_name_lower in self._repositories_by_full_name:
return
self._repositories.append(repository)
repo_id = str(repository.data.id)
if repo_id != "0":
self._repositories_by_id[repo_id] = repository
self._repositories_by_full_name[repository.data.full_name_lower] = repository
def async_remove_repository(self, repository):
"""Remove a repository from the list."""
if repository.data.full_name_lower not in self._repositories_by_full_name:
return
self._repositories.remove(repository)
repo_id = str(repository.data.id)
if repo_id in self._repositories_by_id:
del self._repositories_by_id[repo_id]
del self._repositories_by_full_name[repository.data.full_name_lower]
def get_by_id(self, repository_id):
"""Get repository by ID."""
return self._repositories_by_id.get(str(repository_id))
def get_by_name(self, repository_full_name):
"""Get repository by full_name."""
if repository_full_name is None:
return None
return self._repositories_by_full_name.get(repository_full_name.lower())
def is_known(self, repository_id):
"""Return a bool if the repository is known."""
return str(repository_id) in self._repositories_by_id
@property
def sorted_by_name(self):
"""Return a sorted(by name) list of repository objects."""
return sorted(self.repositories, key=lambda x: x.display_name)
@property
def sorted_by_repository_name(self):
"""Return a sorted(by repository_name) list of repository objects."""
return sorted(self.repositories, key=lambda x: x.data.full_name)
async def register_repository(self, full_name, category, check=True):
"""Register a repository."""
await register_repository(full_name, category, check=check)
async def startup_tasks(self, _event=None):
"""Tasks that are started after startup."""
await self.async_set_stage(HacsStage.STARTUP)
self.status.background_task = True
self.hass.bus.async_fire("hacs/status", {})
await self.handle_critical_repositories_startup()
await self.async_load_default_repositories()
await self.clear_out_removed_repositories()
self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.recurring_tasks_installed, timedelta(hours=2)
)
)
self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.recurring_tasks_all, timedelta(hours=25)
)
)
self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.prosess_queue, timedelta(minutes=10)
)
)
self.hass.bus.async_fire("hacs/reload", {"force": True})
await self.recurring_tasks_installed()
await self.prosess_queue()
self.status.startup = False
self.status.background_task = False
self.hass.bus.async_fire("hacs/status", {})
await self.async_set_stage(HacsStage.RUNNING)
async def handle_critical_repositories_startup(self):
"""Handled critical repositories during startup."""
alert = False
critical = await async_load_from_store(self.hass, "critical")
if not critical:
return
for repo in critical:
if not repo["acknowledged"]:
alert = True
if alert:
self.log.critical("URGENT!: Check the HACS panel!")
self.hass.components.persistent_notification.create(
title="URGENT!", message="**Check the HACS panel!**"
)
async def handle_critical_repositories(self):
"""Handled critical repositories during runtime."""
# Get critical repositories
critical_queue = QueueManager()
instored = []
critical = []
was_installed = False
try:
critical = await self.async_github_get_hacs_default_file("critical")
except GitHubNotModifiedException:
return
except GitHubException:
pass
if not critical:
self.log.debug("No critical repositories")
return
stored_critical = await async_load_from_store(self.hass, "critical")
for stored in stored_critical or []:
instored.append(stored["repository"])
stored_critical = []
for repository in critical:
removed_repo = get_removed(repository["repository"])
removed_repo.removal_type = "critical"
repo = self.get_by_name(repository["repository"])
stored = {
"repository": repository["repository"],
"reason": repository["reason"],
"link": repository["link"],
"acknowledged": True,
}
if repository["repository"] not in instored:
if repo is not None and repo.installed:
self.log.critical(
"Removing repository %s, it is marked as critical",
repository["repository"],
)
was_installed = True
stored["acknowledged"] = False
# Remove from HACS
critical_queue.add(repository.uninstall())
repo.remove()
stored_critical.append(stored)
removed_repo.update_data(stored)
# Uninstall
await critical_queue.execute()
# Save to FS
await async_save_to_store(self.hass, "critical", stored_critical)
# Restart HASS
if was_installed:
self.log.critical("Resarting Home Assistant")
self.hass.async_create_task(self.hass.async_stop(100))
async def prosess_queue(self, _notarealarg=None):
"""Recurring tasks for installed repositories."""
if not self.queue.has_pending_tasks:
self.log.debug("Nothing in the queue")
return
if self.queue.running:
self.log.debug("Queue is already running")
return
can_update = await self.async_can_update()
self.log.debug(
"Can update %s repositories, items in queue %s",
can_update,
self.queue.pending_tasks,
)
if can_update != 0:
self.status.background_task = True
self.hass.bus.async_fire("hacs/status", {})
try:
await self.queue.execute(can_update)
except HacsExecutionStillInProgress:
pass
self.status.background_task = False
self.hass.bus.async_fire("hacs/status", {})
async def recurring_tasks_installed(self, _notarealarg=None):
"""Recurring tasks for installed repositories."""
self.log.debug("Starting recurring background task for installed repositories")
self.status.background_task = True
self.hass.bus.async_fire("hacs/status", {})
for repository in self.repositories:
if self.status.startup and repository.data.full_name == "hacs/integration":
continue
if repository.data.installed and repository.data.category in self.common.categories:
self.queue.add(self.factory.safe_update(repository))
await self.handle_critical_repositories()
self.status.background_task = False
self.hass.bus.async_fire("hacs/status", {})
await self.data.async_write()
self.log.debug("Recurring background task for installed repositories done")
async def recurring_tasks_all(self, _notarealarg=None):
"""Recurring tasks for all repositories."""
self.log.debug("Starting recurring background task for all repositories")
self.status.background_task = True
self.hass.bus.async_fire("hacs/status", {})
for repository in self.repositories:
if repository.data.category in self.common.categories:
self.queue.add(self.factory.safe_common_update(repository))
await self.async_load_default_repositories()
await self.clear_out_removed_repositories()
self.status.background_task = False
await self.data.async_write()
self.hass.bus.async_fire("hacs/status", {})
self.hass.bus.async_fire("hacs/repository", {"action": "reload"})
self.log.debug("Recurring background task for all repositories done")
async def clear_out_removed_repositories(self):
"""Clear out blaclisted repositories."""
need_to_save = False
for removed in list_removed_repositories():
repository = self.get_by_name(removed.repository)
if repository is not None:
if repository.data.installed and removed.removal_type != "critical":
self.log.warning(
f"You have {repository.data.full_name} installed with HACS "
+ "this repository has been removed, please consider removing it. "
+ f"Removal reason ({removed.removal_type})"
)
else:
need_to_save = True
repository.remove()
if need_to_save:
await self.data.async_write()
async def async_load_default_repositories(self):
"""Load known repositories."""
self.log.info("Loading known repositories")
for item in await async_get_list_from_default(HacsCategory.REMOVED):
removed = get_removed(item["repository"])
removed.reason = item.get("reason")
removed.link = item.get("link")
removed.removal_type = item.get("removal_type")
for category in self.common.categories or []:
self.queue.add(self.async_get_category_repositories(HacsCategory(category)))
await self.prosess_queue()
async def async_get_category_repositories(self, category: HacsCategory):
"""Get repositories from category."""
repositories = await async_get_list_from_default(category)
for repo in repositories:
if self.common.renamed_repositories.get(repo):
repo = self.common.renamed_repositories[repo]
if is_removed(repo):
continue
if repo in self.common.archived_repositories:
continue
repository = self.get_by_name(repo)
if repository is not None:
if str(repository.data.id) not in self.common.default:
self.common.default.append(str(repository.data.id))
else:
continue
continue
self.queue.add(self.factory.safe_register(repo, category))