Module userland.web.routes.chat

Web chat

Global variables

var REFRESH_THRESHOLD

Number of seconds for CSRF token refresh

var TOKEN_EXPIRY

Number of seconds for CSRF token expiration

Functions

def chat(user: Annotated[User, Depends(login_user)],
request: starlette.requests.Request) ‑> sse_starlette.sse.EventSourceResponse
Expand source code
@api.get("/chat/")
def chat(
    user: Annotated[User, Depends(login_user)], request: Request
) -> EventSourceResponse:
    """Server-sent events for chat EventSource."""

    async def generate():
        pubsub = redis.pubsub()
        pubsub.subscribe("chat")
        redis.publish(
            "chat",
            ChatMessage(
                user=None, message=f"{user.name} has joined"
            ).model_dump_json(),
        )
        token = _refresh_token(user.name)
        then = datetime.utcnow()
        yield ChatToken(token=token).model_dump_json()

        try:
            while not await request.is_disconnected():
                now = datetime.utcnow()

                if (now - then).total_seconds() > REFRESH_THRESHOLD:
                    token = _refresh_token(user.name)
                    then = now
                    yield ChatToken(token=token).model_dump_json()

                message = pubsub.get_message(True)
                data: bytes

                if not message:
                    await sleep(0.1)
                    continue

                data = message["data"]
                yield data.decode("utf-8")
        finally:
            redis.delete(f"{user.name}.chat_csrf")
            redis.publish(
                "chat",
                ChatMessage(
                    user=None, message=f"{user.name} has left"
                ).model_dump_json(),
            )
            pubsub.close()

    return EventSourceResponse(generate())

Server-sent events for chat EventSource.

async def post_chat(message: ChatPost,
user: Annotated[User, Depends(login_user)]) ‑> None
Expand source code
@api.post("/chat/")
async def post_chat(
    message: ChatPost,
    user: Annotated[User, Depends(login_user)],
) -> None:
    """
    Post a chat message.

    Args:
        message: The message object being posted.
    """

    token_bytes: bytes = redis.get(f"{user.name}.chat_csrf")  # type: ignore
    token = token_bytes.decode("utf-8")

    if token is None or token != message.token:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Missing or invalid CSRF token",
        )

    if len(message.message) > MAX_LENGTH:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Too long; message must be <= {MAX_LENGTH}",
        )

    redis.publish(
        "chat",
        ChatMessage(user=user.name, message=message.message).model_dump_json(),
    )

Post a chat message.

Args

message
The message object being posted.