import asyncio
from typing import cast
from harp import get_logger
from harp.settings import USE_PROMETHEUS
from harp.typing import Storage
from harp_apps.sqlalchemy_storage.storage import SqlAlchemyStorage
from ..sqlalchemy_storage.models.base import with_session
from .settings import OLD_AFTER, PERIOD
logger = get_logger(__name__)
[docs]
class JanitorWorker:
[docs]
def __init__(self, storage: Storage):
self.storage: SqlAlchemyStorage = cast(SqlAlchemyStorage, storage)
self.running = False
self.session_factory = self.storage.session_factory
if USE_PROMETHEUS:
from prometheus_client import Gauge
self._prometheus = {
"storage.transactions": Gauge("storage_transactions", "Transactions currently in storage."),
"storage.messages": Gauge("storage_messages", "Messages currently in storage."),
"storage.blobs": Gauge("storage_blobs", "Blob objects currently in storage."),
"storage.blobs.orphans": Gauge("storage_blobs_orphans", "Orphan blobs currently in storage."),
}
[docs]
def stop(self):
"""
Mark the loop for termination.
"""
self.running = False
[docs]
async def run(self):
"""
Once dependencies are ready, start the main loop (basically, run the `loop()` every PERIOD seconds), until
`stop()` is called.
"""
# do not start before storage is ready
await self.storage.ready()
self.running = True
while self.running:
try:
await self.loop()
except Exception as exc:
logger.exception(exc)
await asyncio.sleep(PERIOD)
[docs]
async def loop(self):
"""
One iteration of the janitor loop.
"""
# Delete old transactions
result = await self.delete_old_transactions()
if result.rowcount:
logger.debug("🧹 Deleted %d old transactions", result.rowcount)
# Delete orphan blobs
result = await self.delete_orphan_blobs()
if result.rowcount:
logger.debug("🧹 Deleted %d orphan blobs", result.rowcount)
# Compute and store stored objecg counts as metrics
logger.debug("🧹 Compute and store metrics...")
await self.compute_and_store_metrics()
[docs]
@with_session
async def delete_old_transactions(self, /, *, session):
"""
Remove transactions older than OLD_AFTER days. On correct database implementations (postgresql for example), it
will cascade to related objects. On sqlite, there will be garbage left, but it's not a big deal.
"""
result = await session.execute(self.storage.transactions.delete_old(OLD_AFTER))
await session.commit()
return result
[docs]
@with_session
async def delete_orphan_blobs(self, /, *, session):
"""
Find and remove blobs that are not referenced anymore by any transaction.
"""
result = await session.execute(self.storage.blobs.delete_orphans())
await session.commit()
return result
[docs]
@with_session
async def compute_and_store_metrics(self, /, *, session):
"""
Compute counts of objects in storage, and store them as metrics.
"""
await self.storage.metrics.insert_values(await self.compute_metrics(session))
[docs]
async def compute_metrics(self, session):
values = {
"storage.transactions": await self.do_count(session, "transactions"),
"storage.messages": await self.do_count(session, "messages"),
"storage.blobs": await self.do_count(session, "blobs"),
"storage.blobs.orphans": await self.do_count(session, "blobs", method="count_orphans"),
}
if USE_PROMETHEUS:
for key, value in values.items():
self._prometheus[key].set(value)
return values
[docs]
async def do_count(self, session, name: str, /, *, method="count"):
"""
Helper to count objects in storage, from different repositories and using different methods for building the
actual query.
:param session: sqlalchemy async session
:param name: repository name (should be available from storage)
:param method: method name to call on the repository to get the actual sqlalchemy query, default as "count"
:return: integer
"""
return (
await session.execute(
getattr(
getattr(self.storage, name),
method,
)()
)
).scalar()