Source code for harp_apps.sqlalchemy_storage.models.messages

from datetime import UTC
from typing import TYPE_CHECKING

from sqlalchemy import TIMESTAMP, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from harp.http import get_serializer_for
from harp.models.messages import Message as MessageModel

from .base import Base, Repository, with_session

if TYPE_CHECKING:
    from .transactions import Transaction


[docs] class Message(Base): __tablename__ = "messages" id = mapped_column(Integer(), primary_key=True, unique=True, autoincrement=True) kind = mapped_column(String(10)) summary = mapped_column(String(255)) headers = mapped_column(String(40)) body = mapped_column(String(40)) created_at = mapped_column(TIMESTAMP(timezone=True)) transaction_id = mapped_column(ForeignKey("transactions.id", ondelete="CASCADE")) transaction: Mapped["Transaction"] = relationship(back_populates="messages")
[docs] def to_model(self): return MessageModel( id=self.id, transaction_id=self.transaction_id, kind=self.kind, summary=self.summary, headers=self.headers, body=self.body, created_at=self.created_at.replace(tzinfo=UTC) if self.created_at else self.created_at, )
[docs] @classmethod def from_models(cls, transaction, message, headers, content): serializer = get_serializer_for(message) obj = cls() obj.transaction_id = transaction.id obj.kind = message.kind obj.summary = serializer.summary obj.headers = headers.id obj.body = content.id obj.created_at = message.created_at.astimezone(UTC) return obj
[docs] class MessagesRepository(Repository[Message]): Type = Message
[docs] @with_session async def create(self, values: dict | MessageModel, /, *, session): if isinstance(values, MessageModel): values = dict( id=values.id, transaction_id=values.transaction_id, kind=values.kind, summary=values.summary, headers=values.headers, body=values.body, created_at=values.created_at, ) return await super().create(values, session=session)