Module userland.scripts.messages.app

Messages Textual app

Global variables

var RATE_LIMIT_SECONDS

Time to wait before allowing refresh after empty query result

Classes

class MessageFilter
Expand source code
class MessageFilter:
    """Data class for filtering messages"""

    private = False
    """If `True`, show private messages; if `False`, show public messages"""

    tags: list[str] | None = None
    """Tag name(s) to filter for"""

Data class for filtering messages

Class variables

var private

If True, show private messages; if False, show public messages

var tags : list[str] | None

Tag name(s) to filter for

class MessagesApp (context: SSHContext,
**kwargs)
Expand source code
class MessagesApp(BannerApp):
    """Message bases Textual app"""

    BANNER_PADDING = 8

    BINDINGS = [
        ("escape", "quit", "Exit"),
        ("n", "compose", "Compose"),
        ("r", "reply", "Reply"),
        ("f", "filter", "Filter"),
    ]

    CSS_PATH = path.join(path.dirname(__file__), "styles.tcss")

    filter: MessageFilter
    """Current message filter"""

    _first = -1
    """Newest message ID"""

    _last = -1
    """Oldest message ID"""

    _recent_query: datetime | None = None
    """Time of last query"""

    _last_query_empty = False
    """If last query had no results"""

    def __init__(
        self,
        context: SSHContext,
        **kwargs,
    ):
        ""  # empty docstring
        self.filter = MessageFilter()
        super(MessagesApp, self).__init__(context, **kwargs)

    async def _allow_refresh(self) -> bool:
        """Avoid database call for a while if last refresh was empty."""

        now = datetime.utcnow()
        if (
            self._recent_query is not None
            and self._last_query_empty
            and (now - self._recent_query).total_seconds() < RATE_LIMIT_SECONDS
        ):
            return False

        self._recent_query = now
        return True

    async def _load_messages(self, newer=False) -> None:
        """Load messages and append/prepend to ListView."""

        lv: ListView = self.query_one(ListView)
        first = len(lv.children) == 0
        limit = lv.region.height * 2
        query_limit = limit if first else lv.region.height
        messages: Sequence[Tuple[int, str, str]]

        if first:
            messages = await get_latest_messages(self.filter.tags, query_limit)
        elif newer:
            messages = await get_newer_messages(
                self._last, self.filter.tags, query_limit
            )
        else:
            messages = await get_older_messages(
                self._first, self.filter.tags, query_limit
            )

        # remember if result was empty for rate limiting refresh
        if not messages:
            self._last_query_empty = True
        else:
            self._last_query_empty = False

        coros = []

        # append/prepend items to ListView
        for idx, [message_id, title, author] in enumerate(messages):
            if self._first == -1 or message_id < self._first:
                self._first = message_id

            if self._last == -1 or message_id > self._last:
                self._last = message_id

            item = ListItem(
                Label(
                    f"[italic][white]#{message_id}[/][/]",
                    classes="message_id",
                ),
                Label(title, classes="message_title", markup=False),
                Label(author, classes="message_author", markup=False),
                id=f"message_{message_id}",
                classes="even" if idx % 2 else "",
            )

            coros.append(lv.append(item))

            # add to top if pulling newer messages
            if newer and not first:
                lv.move_child(item, before=0)

        # if this is the first load, we're done!
        if first:
            for c in coros:
                await c

            return

        count = len(lv.children)
        last_index = lv.index or 0

        # trim ListView items to limit
        if count > limit:
            lv.index = None
            how_many = count - limit

            for _ in range(how_many):
                if newer:
                    # remove from bottom if loading newer items
                    coros.append(lv.children[-1].remove())
                else:
                    # remove from top if loading older items
                    coros.append(lv.children[0].remove())

            # adjust CSS striping
            for idx, c in enumerate(lv.children):
                c.set_classes("even" if idx % 2 else "")

            # set new first/last message ID
            assert lv.children[-1].id
            self._first = int(lv.children[-1].id.split("_")[1])
            assert lv.children[0].id
            self._last = int(lv.children[0].id.split("_")[1])

            # restore ListView selection index
            if newer:
                lv.index = how_many
            else:
                lv.index = last_index - how_many

        for c in coros:
            await c

        # keep selected item in view
        lv.scroll_to_widget(lv.children[lv.index], animate=False)  # type: ignore

    async def _update_tags(self, tags: list[str]) -> None:
        lv = self.query_one(ListView)
        await lv.clear()
        self.filter.tags = tags
        await self._load_messages()
        lv.index = 0
        lv.focus()

    def compose(self) -> ComposeResult:
        # load widgets from BannerApp
        for widget in super(MessagesApp, self).compose():
            yield widget

        yield ListView(id="messages_list")
        yield Footer()

    async def action_compose(self) -> None:
        if self.screen.id != "_default":
            # not in message list screen; pop screen first
            self.pop_screen()
            return await self.action_compose()

        await self.push_screen(EditorScreen())

    async def action_filter(self) -> None:
        if self.screen.id != "_default":
            return

        await self.push_screen(
            FilterModal(tags=self.filter.tags),
            self._update_tags,  # type: ignore
        )

    async def action_reply(self) -> None:
        # not in message list screen; pop screen first
        if self.screen.id != "_default":
            self.pop_screen()
            return await self.action_reply()

        lv: ListView = self.query_one(ListView)
        assert lv.index is not None
        selected = lv.children[lv.index]
        assert selected.id
        message_id = int(selected.id.split("_")[1])

        async with db_session() as db:
            message: Message = (
                await db.exec(
                    select(Message)
                    .where(Message.id == message_id)
                    .options(joinedload(Message.author))  # type: ignore
                )
            ).one()

            await self.push_screen(
                EditorScreen(
                    content=(
                        "\n\n---"
                        f"\n\n{
                            message.author.name if message.author else 'Unknown'
                        } wrote:"
                        f"\n\n{message.content}"
                    ),
                    reply_to=message,
                )
            )

    async def action_quit(self) -> None:
        ""  # empty docstring
        self.exit()

    async def on_key(self, event: events.Key) -> None:
        if self.screen.id != "_default":
            return

        if event.key not in [
            "down",
            "end",
            "home",
            "pagedown",
            "pageup",
            "up",
        ]:
            return

        lv: ListView = self.get_widget_by_id("messages_list")  # type: ignore

        if event.key in ("home", "pageup"):
            if lv.index == 0 and self._allow_refresh():
                # hit top boundary; load newer messages
                await self._load_messages(newer=True)

            lv.index = 0

        elif event.key in ("end", "pagedown"):
            last = len(lv.children) - 1

            if lv.index == last and self._allow_refresh():
                # hit bottom boundary; load older messages
                await self._load_messages()

            lv.index = last

        elif event.key == "up" and lv.index == 0 and self._allow_refresh():
            # hit top boundary; load newer messages
            await self._load_messages(newer=True)

        elif (
            event.key == "down"
            and lv.index == len(lv.children) - 1
            and self._allow_refresh()
        ):
            # hit bottom boundary; load older messages
            await self._load_messages()

    async def on_list_view_selected(self, event: ListView.Selected) -> None:
        """Load selected message in MarkdownViewer."""

        assert event.item.id
        message_id = int(event.item.id.split("_")[1])

        async with db_session() as db:
            message = (
                await db.exec(
                    select(Message)
                    .where(Message.id == message_id)
                    .options(
                        joinedload(Message.author),  # type: ignore
                        joinedload(Message.recipient),  # type: ignore
                    )
                )
            ).one()
            tags = (
                await db.exec(
                    select(MessageTags.tag_name).where(
                        MessageTags.message_id == message_id
                    )
                )
            ).all()

        assert message
        await self.push_screen(ViewScreen(message=message, tags=tags))

    async def on_event(self, event: events.Event | events.MouseScrollDown):
        ""  # empty docstring
        await super(MessagesApp, self).on_event(event)

        try:
            if self.screen.id != "_default":
                return
        except ScreenStackError:
            return

        down = False

        if isinstance(event, events.MouseScrollDown):
            down = True
        elif isinstance(event, events.MouseScrollUp):
            down = False
        else:
            return

        lv = self.query_one(ListView)

        if down and lv.is_vertical_scroll_end and await self._allow_refresh():
            await self._load_messages()
            lv.index = len(lv.children) - round(lv.region.height * 1.25)
            lv.scroll_end(animate=False)
            lv.scroll_to(None, lv.index, animate=False)

        elif (
            not down and lv.scroll_offset.y == 0 and await self._allow_refresh()
        ):
            await self._load_messages(newer=True)
            lv.index = round(lv.region.height * 1.25)
            lv.scroll_home(animate=False)
            lv.scroll_to(None, lv.index - lv.region.height + 1, animate=False)

    async def on_ready(self) -> None:
        """App is ready; load messages."""

        # halt briefly for banner to fully load
        await sleep(0.1)
        await self._load_messages()
        lv = self.query_one(ListView)
        lv.index = 0

Message bases Textual app

Ancestors

  • BannerApp
  • XthuluApp
  • textual.app.App
  • typing.Generic
  • textual.dom.DOMNode
  • textual.message_pump.MessagePump

Class variables

var BINDINGS
var CSS_PATH
var filterMessageFilter

Current message filter

Methods

async def action_compose(self) ‑> None
Expand source code
async def action_compose(self) -> None:
    if self.screen.id != "_default":
        # not in message list screen; pop screen first
        self.pop_screen()
        return await self.action_compose()

    await self.push_screen(EditorScreen())
async def action_filter(self) ‑> None
Expand source code
async def action_filter(self) -> None:
    if self.screen.id != "_default":
        return

    await self.push_screen(
        FilterModal(tags=self.filter.tags),
        self._update_tags,  # type: ignore
    )
async def action_quit(self) ‑> None
Expand source code
async def action_quit(self) -> None:
    ""  # empty docstring
    self.exit()
async def action_reply(self) ‑> None
Expand source code
async def action_reply(self) -> None:
    # not in message list screen; pop screen first
    if self.screen.id != "_default":
        self.pop_screen()
        return await self.action_reply()

    lv: ListView = self.query_one(ListView)
    assert lv.index is not None
    selected = lv.children[lv.index]
    assert selected.id
    message_id = int(selected.id.split("_")[1])

    async with db_session() as db:
        message: Message = (
            await db.exec(
                select(Message)
                .where(Message.id == message_id)
                .options(joinedload(Message.author))  # type: ignore
            )
        ).one()

        await self.push_screen(
            EditorScreen(
                content=(
                    "\n\n---"
                    f"\n\n{
                        message.author.name if message.author else 'Unknown'
                    } wrote:"
                    f"\n\n{message.content}"
                ),
                reply_to=message,
            )
        )
def compose(self) ‑> Iterable[textual.widget.Widget]
Expand source code
def compose(self) -> ComposeResult:
    # load widgets from BannerApp
    for widget in super(MessagesApp, self).compose():
        yield widget

    yield ListView(id="messages_list")
    yield Footer()
async def on_event(self, event: textual.events.Event | textual.events.MouseScrollDown)
Expand source code
async def on_event(self, event: events.Event | events.MouseScrollDown):
    ""  # empty docstring
    await super(MessagesApp, self).on_event(event)

    try:
        if self.screen.id != "_default":
            return
    except ScreenStackError:
        return

    down = False

    if isinstance(event, events.MouseScrollDown):
        down = True
    elif isinstance(event, events.MouseScrollUp):
        down = False
    else:
        return

    lv = self.query_one(ListView)

    if down and lv.is_vertical_scroll_end and await self._allow_refresh():
        await self._load_messages()
        lv.index = len(lv.children) - round(lv.region.height * 1.25)
        lv.scroll_end(animate=False)
        lv.scroll_to(None, lv.index, animate=False)

    elif (
        not down and lv.scroll_offset.y == 0 and await self._allow_refresh()
    ):
        await self._load_messages(newer=True)
        lv.index = round(lv.region.height * 1.25)
        lv.scroll_home(animate=False)
        lv.scroll_to(None, lv.index - lv.region.height + 1, animate=False)
async def on_key(self, event: textual.events.Key) ‑> None
Expand source code
async def on_key(self, event: events.Key) -> None:
    if self.screen.id != "_default":
        return

    if event.key not in [
        "down",
        "end",
        "home",
        "pagedown",
        "pageup",
        "up",
    ]:
        return

    lv: ListView = self.get_widget_by_id("messages_list")  # type: ignore

    if event.key in ("home", "pageup"):
        if lv.index == 0 and self._allow_refresh():
            # hit top boundary; load newer messages
            await self._load_messages(newer=True)

        lv.index = 0

    elif event.key in ("end", "pagedown"):
        last = len(lv.children) - 1

        if lv.index == last and self._allow_refresh():
            # hit bottom boundary; load older messages
            await self._load_messages()

        lv.index = last

    elif event.key == "up" and lv.index == 0 and self._allow_refresh():
        # hit top boundary; load newer messages
        await self._load_messages(newer=True)

    elif (
        event.key == "down"
        and lv.index == len(lv.children) - 1
        and self._allow_refresh()
    ):
        # hit bottom boundary; load older messages
        await self._load_messages()
async def on_list_view_selected(self, event: textual.widgets._list_view.ListView.Selected) ‑> None
Expand source code
async def on_list_view_selected(self, event: ListView.Selected) -> None:
    """Load selected message in MarkdownViewer."""

    assert event.item.id
    message_id = int(event.item.id.split("_")[1])

    async with db_session() as db:
        message = (
            await db.exec(
                select(Message)
                .where(Message.id == message_id)
                .options(
                    joinedload(Message.author),  # type: ignore
                    joinedload(Message.recipient),  # type: ignore
                )
            )
        ).one()
        tags = (
            await db.exec(
                select(MessageTags.tag_name).where(
                    MessageTags.message_id == message_id
                )
            )
        ).all()

    assert message
    await self.push_screen(ViewScreen(message=message, tags=tags))

Load selected message in MarkdownViewer.

async def on_ready(self) ‑> None
Expand source code
async def on_ready(self) -> None:
    """App is ready; load messages."""

    # halt briefly for banner to fully load
    await sleep(0.1)
    await self._load_messages()
    lv = self.query_one(ListView)
    lv.index = 0

App is ready; load messages.

Inherited members