Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ extend-ignore =
# # weird
PIE803 C101 FNE007 FNE008 N812 ANN101 ANN102 PT004 WPS110 WPS111 WPS114 WPS338 WPS407 WPS414 WPS440 VNE001 VNE002 CM001
# too many
WPS200 WPS201 WPS202 WPS203 WPS204 WPS210 WPS211 WPS212 WPS213 WPS214 WPS217 WPS218 WPS221 WPS222 WPS224 WPS230 WPS231 WPS234 WPS235 WPS238
WPS200 WPS201 WPS202 WPS203 WPS204 WPS210 WPS211 WPS212 WPS213 WPS214 WPS217 WPS218 WPS221 WPS222 WPS224 WPS229 WPS230 WPS231 WPS234 WPS235 WPS238
# "vague" imports
WPS347

Expand Down
55 changes: 55 additions & 0 deletions alembic/versions/058_notification_idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""notification-idempotency

Revision ID: 058
Revises: 057
Create Date: 2026-06-15 00:57:00.751009

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "058"
down_revision: Union[str, None] = "057"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"notifications",
sa.Column("idempotency_key", sa.String(length=100), nullable=True),
schema="xi_back_2",
)
op.add_column(
"notifications",
sa.Column("idempotency_expires_at", sa.DateTime(timezone=True), nullable=True),
schema="xi_back_2",
)
op.create_index(
"unique_index_notifications_idempotency",
"notifications",
["idempotency_key"],
unique=True,
schema="xi_back_2",
postgresql_where=sa.text("idempotency_key IS NOT NULL"),
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"unique_index_notifications_idempotency",
table_name="notifications",
schema="xi_back_2",
postgresql_where=sa.text("idempotency_key IS NOT NULL"),
)
op.drop_column("notifications", "idempotency_expires_at", schema="xi_back_2")
op.drop_column("notifications", "idempotency_key", schema="xi_back_2")
# ### end Alembic commands ###
25 changes: 22 additions & 3 deletions app/common/schemas/notifications_sch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ class NotificationKind(StrEnum):
STUDENT_RECIPIENT_INVOICE_PAYMENT_CONFIRMED_V1 = auto()

SINGLE_CLASSROOM_EVENT_CREATED_V1 = auto()
CLASSROOM_EVENT_INSTANCE_RESCHEDULED_V1 = auto()
CLASSROOM_EVENT_INSTANCE_RESCHEDULED_V1 = auto() # TODO add `PERSISTED_` for V2
CLASSROOM_EVENT_INSTANCE_CANCELLED_V1 = auto()
CLASSROOM_EVENT_INSTANCE_REMINDER_V1 = auto()

REPEATING_CLASSROOM_EVENT_CREATED_V1 = auto()
CLASSROOM_EVENT_REPETITION_UPDATED_V1 = auto()
CLASSROOM_EVENT_REPETITION_CANCELLED_V1 = auto()

PERSISTED_CLASSROOM_EVENT_INSTANCE_REMINDER_V1 = auto()
REPEATED_CLASSROOM_EVENT_INSTANCE_REMINDER_V1 = auto()

CUSTOM_V1 = auto()


Expand Down Expand Up @@ -62,17 +66,26 @@ class RecipientInvoiceNotificationPayloadSchema(BaseModel):
recipient_invoice_id: int


class ClassroomEventInstanceNotificationPayloadSchema(BaseModel):
class PersistedClassroomEventInstanceNotificationPayloadSchema(BaseModel):
kind: Literal[
NotificationKind.SINGLE_CLASSROOM_EVENT_CREATED_V1,
NotificationKind.CLASSROOM_EVENT_INSTANCE_RESCHEDULED_V1,
NotificationKind.CLASSROOM_EVENT_INSTANCE_CANCELLED_V1,
NotificationKind.PERSISTED_CLASSROOM_EVENT_INSTANCE_REMINDER_V1,
]

classroom_id: int
event_instance_id: UUID


class RepeatedClassroomEventInstanceNotificationPayloadSchema(BaseModel):
kind: Literal[NotificationKind.REPEATED_CLASSROOM_EVENT_INSTANCE_REMINDER_V1,]

classroom_id: int
repetition_mode_id: UUID
instance_index: int


class ClassroomScheduleFocusNotificationPayloadSchema(BaseModel):
kind: Literal[
NotificationKind.REPEATING_CLASSROOM_EVENT_CREATED_V1,
Expand Down Expand Up @@ -100,7 +113,8 @@ class CustomNotificationPayloadSchema(BaseModel):
| EnrollmentNotificationPayloadSchema
| ClassroomNotificationPayloadSchema
| RecipientInvoiceNotificationPayloadSchema
| ClassroomEventInstanceNotificationPayloadSchema
| PersistedClassroomEventInstanceNotificationPayloadSchema
| RepeatedClassroomEventInstanceNotificationPayloadSchema
| ClassroomScheduleFocusNotificationPayloadSchema
| CustomNotificationPayloadSchema,
Field(discriminator="kind"),
Expand Down Expand Up @@ -133,12 +147,17 @@ class ClassroomParticipantRecipientFilterSchema(BaseModel):
]


IdempotencyKeyType = Annotated[str | None, Field(min_length=1, max_length=100)]


class NotificationInputV2Schema(BaseModel):
payload: AnyNotificationPayloadSchema
recipient_filters: Annotated[
list[AnyRecipientFilterSchema],
Field(min_length=1, max_length=100),
]
idempotency_key: IdempotencyKeyType = None
idempotency_expires_at: AwareDatetime | None = None


# TODO (?) add recipient logic to payload instead?
2 changes: 1 addition & 1 deletion app/common/sqlalchemy_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def get_all_with_assumed_limit(
result = list(await self.get_all(stmt.limit(limit)))

if len(result) == limit:
logging.warning(
logging.error(
f"Reached the limit of {limit} in one query",
extra={"stmt": str(stmt)},
)
Expand Down
14 changes: 11 additions & 3 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,18 @@ async def consume_scope(
self,
call_next: Callable[[Any], Awaitable[Any]],
msg: StreamMessage[Any],
) -> Any:
async with sessionmaker.begin() as session:
) -> Any: # pragma: no cover
async with sessionmaker() as session:
session_context.set(session)
return await call_next(msg)
try:
result = await call_next(msg)
if session.in_transaction():
await session.commit()
return result
except Exception:
if session.in_transaction():
await session.rollback()
raise


faststream = RedisRouter(
Expand Down
38 changes: 37 additions & 1 deletion app/notifications/models/notifications_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pydantic import AwareDatetime, BaseModel, Field, TypeAdapter
from pydantic_marshals.sqlalchemy import MappedModel
from sqlalchemy import DateTime
from sqlalchemy import DateTime, Index, String
from sqlalchemy.orm import Mapped, mapped_column

from app.common.config import Base
Expand Down Expand Up @@ -36,10 +36,46 @@ class Notification(Base):
PydanticJSONType(TypeAdapter(AnyNotificationPayloadSchema))
)

idempotency_key: Mapped[str | None] = mapped_column(String(100))
idempotency_expires_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True),
default=None,
)

__table_args__ = (
Index(
"unique_index_notifications_idempotency",
idempotency_key,
unique=True,
postgresql_where=idempotency_key.is_not(None),
),
)

ResponseSchema = MappedModel.create(
columns=[
id,
(created_at, AwareDatetime),
(payload, AnyNotificationPayloadSchema),
]
)

@classmethod
async def is_idempotency_violated(
cls, idempotency_key: str | None
) -> bool: # pragma: no cover
if idempotency_key is None:
return False

result = await cls.find_first_by_kwargs(idempotency_key=idempotency_key)
if result is None:
return False

if (
result.idempotency_expires_at is not None
and result.idempotency_expires_at < datetime_utc_now()
):
result.idempotency_key = None
result.idempotency_expires_at = None
return False

return True
25 changes: 22 additions & 3 deletions app/notifications/routes/notifications_sub.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio

import sentry_sdk
from faststream.redis import RedisRouter

from app.common.config import settings
from app.common.faststream_ext import build_stream_sub
from app.common.schemas.notifications_sch import NotificationInputV2Schema
from app.common.sqlalchemy_ext import db
from app.notifications.models.notifications_db import Notification
from app.notifications.models.recipient_notifications_db import RecipientNotification
from app.notifications.routes.notifications_sio import NewNotificationEmitter
Expand All @@ -29,6 +31,10 @@ async def send_notification(
emitter: NewNotificationEmitter,
data: NotificationInputV2Schema,
) -> None:
if await Notification.is_idempotency_violated(idempotency_key=data.idempotency_key):
# TODO (?) catch the integrity error instead
return # pragma: no cover

recipient_user_ids = (
await recipients_svc.generate_recipient_user_ids_for_notification(
notification_data=data,
Expand All @@ -38,7 +44,11 @@ async def send_notification(
if len(recipient_user_ids) == 0:
return

notification = await Notification.create(payload=data.payload)
notification = await Notification.create(
payload=data.payload,
idempotency_key=data.idempotency_key,
idempotency_expires_at=data.idempotency_expires_at,
)

await RecipientNotification.create_batch(
{
Expand All @@ -48,7 +58,11 @@ async def send_notification(
for recipient_user_id in recipient_user_ids
)

await asyncio.gather(
await db.session.commit()
# TODO The commit is here to ensure idempotency, but that's not reliable
# in future split this into multiple events (first save to db, then send)

results = await asyncio.gather(
*platform_notification_sender.PlatformNotificationSender(
notification=notification,
emitter=emitter,
Expand All @@ -59,5 +73,10 @@ async def send_notification(
*telegram_notification_sender.TelegramNotificationSender(
notification=notification,
).generate_tasks(recipient_user_ids=recipient_user_ids),
# TODO handle partial failure with `return_exceptions=True`
return_exceptions=True,
)

for result in results:
if result is None:
continue
sentry_sdk.capture_exception(result)
Loading
Loading