Module userland.web.routes.chat
Web chat
Global variables
var REFRESH_THRESHOLD
-
Number of seconds for CSRF token refresh
var TOKEN_EXPIRY
-
Number of seconds for CSRF token expiration
Functions
def chat(user: Annotated[User, Depends(login_user)],
request: starlette.requests.Request) ‑> sse_starlette.sse.EventSourceResponse-
Expand source code
@api.get("/chat/") def chat( user: Annotated[User, Depends(login_user)], request: Request ) -> EventSourceResponse: """Server-sent events for chat EventSource.""" async def generate(): pubsub = redis.pubsub() pubsub.subscribe("chat") redis.publish( "chat", ChatMessage( user=None, message=f"{user.name} has joined" ).model_dump_json(), ) token = _refresh_token(user.name) then = datetime.utcnow() yield ChatToken(token=token).model_dump_json() try: while not await request.is_disconnected(): now = datetime.utcnow() if (now - then).total_seconds() > REFRESH_THRESHOLD: token = _refresh_token(user.name) then = now yield ChatToken(token=token).model_dump_json() message = pubsub.get_message(True) data: bytes if not message: await sleep(0.1) continue data = message["data"] yield data.decode("utf-8") finally: redis.delete(f"{user.name}.chat_csrf") redis.publish( "chat", ChatMessage( user=None, message=f"{user.name} has left" ).model_dump_json(), ) pubsub.close() return EventSourceResponse(generate())
Server-sent events for chat EventSource.
async def post_chat(message: ChatPost,
user: Annotated[User, Depends(login_user)]) ‑> None-
Expand source code
@api.post("/chat/") async def post_chat( message: ChatPost, user: Annotated[User, Depends(login_user)], ) -> None: """ Post a chat message. Args: message: The message object being posted. """ token_bytes: bytes = redis.get(f"{user.name}.chat_csrf") # type: ignore token = token_bytes.decode("utf-8") if token is None or token != message.token: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Missing or invalid CSRF token", ) if len(message.message) > MAX_LENGTH: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Too long; message must be <= {MAX_LENGTH}", ) redis.publish( "chat", ChatMessage(user=user.name, message=message.message).model_dump_json(), )
Post a chat message.
Args
message
- The message object being posted.