Module userland.models.message.api

Shared userland messages API

Functions

async def get_latest_messages(tags: list[str] | None = None, limit=100) ‑> Sequence[Tuple[int, str]]
Expand source code
async def get_latest_messages(
    tags: list[str] | None = None, limit=100
) -> Sequence[Tuple[int, str]]:
    """
    Fetch the most recent messages, newest first.

    Args:
        tags: Optional tag names passed to the message query as a filter
        limit: The maximum number of messages to return

    Returns:
        The (id, title) rows of the matching messages, ordered by
        descending message ID
    """

    async with db_session() as db:
        # Highest IDs first, capped at `limit` rows.
        newest_first = (
            get_messages_query(tags)
            .order_by(col(Message.id).desc())
            .limit(limit)
        )
        rows = await db.exec(newest_first)
        return rows.all()  # type: ignore

Get the latest messages (in descending order).

Args

tags
A list of tags to filter by, if any
limit
The number of messages to return

Returns

A sequence of (id, title) tuples for the messages matching the provided criteria

def get_messages_query(tags: list[str] | None = None)
Expand source code
def get_messages_query(tags: list[str] | None = None):
    """
    Query for pulling messages, optionally filtered by tag(s).

    Args:
        tags: A list of tag names to filter by (if any). ``None`` or an
            empty list leaves the query unfiltered.

    Returns:
        A query object selecting ``Message.id`` and ``Message.title``
    """

    query = select(Message.id, Message.title)

    # No tags requested: return the plain, unfiltered selection.
    if not tags:
        return query

    # The membership test must be built with the SQL ``IN`` operator
    # (``.in_``). Python's ``in`` is evaluated eagerly against the list and
    # would hand ``and_`` a plain bool instead of a SQL expression, so the
    # tag filter would never reach the database.
    # NOTE(review): a message carrying several of the requested tags
    # presumably matches once per joined tag row -- confirm whether callers
    # need de-duplication.
    return (
        query.select_from(Message)
        .join(MessageTags)
        .where(
            and_(
                MessageTags.message_id == Message.id,
                col(MessageTags.tag_name).in_(tags),
            )
        )
    )

Query for pulling messages, optionally filtered by tag(s).

Args

tags
A list of tags to filter by (if any)

Returns

A query object selecting message IDs and titles

async def get_newer_messages(id: int, tags: list[str] | None = None, limit=100) ‑> Sequence[Tuple[int, str]]
Expand source code
async def get_newer_messages(
    id: int, tags: list[str] | None = None, limit=100
) -> Sequence[Tuple[int, str]]:
    """
    Fetch messages whose ID is greater than the given one, oldest first.

    Args:
        id: Exclusive lower bound; only messages with a larger ID are returned
        tags: Optional tag names passed to the message query as a filter
        limit: The maximum number of messages to return

    Returns:
        The (id, title) rows of the matching messages, ordered by
        ascending message ID
    """

    async with db_session() as db:
        # Strictly newer than `id`, lowest IDs first, capped at `limit` rows.
        newer_than = (
            get_messages_query(tags)
            .where(Message.id > id)  # type: ignore
            .order_by(col(Message.id).asc())
            .limit(limit)
        )
        rows = await db.exec(newer_than)
        return rows.all()  # type: ignore

Get messages newer than the provided ID (in ascending order).

Args

id
The message ID used as an exclusive lower bound
tags
A list of tags to filter by, if any
limit
The number of messages to return

Returns

A sequence of (id, title) tuples for the messages matching the provided criteria

async def get_older_messages(id: int, tags: list[str] | None = None, limit=100) ‑> Sequence[Tuple[int, str]]
Expand source code
async def get_older_messages(
    id: int, tags: list[str] | None = None, limit=100
) -> Sequence[Tuple[int, str]]:
    """
    Fetch messages whose ID is smaller than the given one, newest first.

    Args:
        id: Exclusive upper bound; only messages with a smaller ID are returned
        tags: Optional tag names passed to the message query as a filter
        limit: The maximum number of messages to return

    Returns:
        The (id, title) rows of the matching messages, ordered by
        descending message ID
    """

    async with db_session() as db:
        # Strictly older than `id`, highest IDs first, capped at `limit` rows.
        older_than = (
            get_messages_query(tags)
            .where(Message.id < id)  # type: ignore
            .order_by(col(Message.id).desc())
            .limit(limit)
        )
        rows = await db.exec(older_than)
        return rows.all()

Get messages older than the provided ID (in descending order).

Args

id
The message ID used as an exclusive upper bound
tags
A list of tags to filter by, if any
limit
The number of messages to return

Returns

A sequence of (id, title) tuples for the messages matching the provided criteria