Source code for harp_apps.sqlalchemy_storage.models.blobs
from sqlalchemy import TIMESTAMP, LargeBinary, String, delete, func, select
from sqlalchemy.orm import aliased, mapped_column
from harp.models import Blob as BlobModel
from .base import Base, Repository, with_session
from .messages import Message
[docs]
class Blob(Base):
__tablename__ = "blobs"
id = mapped_column(String(40), primary_key=True, unique=True)
data = mapped_column(LargeBinary())
content_type = mapped_column(String(64))
created_at = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
[docs]
class BlobsRepository(Repository[Blob]):
Type = Blob
[docs]
def count_orphans(self):
MH = aliased(Message, name="mh")
MB = aliased(Message, name="mb")
subquery = (
select(Blob.id, func.count(MH.id) + func.count(MB.id))
.select_from(Blob)
.outerjoin(MH, MH.headers == Blob.id)
.outerjoin(MB, MB.body == Blob.id)
.group_by(Blob.id)
.subquery()
)
query = select(func.count(subquery.c.id)).where(subquery.c[1] == 0)
return query
[docs]
def delete_orphans(self):
MH = aliased(Message, name="mh")
MB = aliased(Message, name="mb")
subquery = (
select(Blob.id, func.count(MH.id) + func.count(MB.id))
.select_from(Blob)
.outerjoin(MH, MH.headers == Blob.id)
.outerjoin(MB, MB.body == Blob.id)
.group_by(Blob.id)
.subquery()
)
query = select(subquery.c.id).where(subquery.c[1] == 0)
return delete(Blob).where(Blob.id.in_(query))
[docs]
@with_session
async def create(self, values: dict | BlobModel, /, *, session):
if isinstance(values, BlobModel):
values = dict(
id=values.id,
data=values.data,
content_type=values.content_type,
)
return await super().create(values, session=session)