Module userland.scripts.messages.app

Messages Textual app

Global variables

var RATE_LIMIT_SECONDS

Time to wait before allowing refresh after empty query result

Classes

class MessageFilter

Data class for filtering messages

Expand source code
class MessageFilter:
    """Data class for filtering messages"""

    private = False
    """If `True`, show private messages; if `False`, show public messages"""

    tags: list[str] | None = None
    """Tag name(s) to filter for"""

Class variables

var private

If True, show private messages; if False, show public messages

var tags : list[str] | None

Tag name(s) to filter for

class MessagesApp (context: SSHContext, **kwargs)

Message bases Textual app

Create an instance of an app.

Args

driver_class
Driver class or None to auto-detect. This will be used by some Textual tools.
css_path
Path to CSS or None to use the CSS_PATH class variable. To load multiple CSS files, pass a list of strings or paths which will be loaded in order.
watch_css
Reload CSS if the files changed. This is set automatically if you are using textual run with the dev switch.

Raises

CssPathError
When the supplied CSS path(s) are an unexpected type.
Expand source code
class MessagesApp(BannerApp):
    """Message bases Textual app"""

    BINDINGS = [
        ("f", "filter", "Filter"),
        ("n", "compose", "Compose"),
        ("r", "reply", "Reply"),
        ("escape", "", "Exit"),
    ]

    CSS = """
        $highlight: ansi_yellow;

        ListItem {
            background: $primary-background;
            layout: horizontal;
        }

        ListItem.even {
            background: $secondary-background;
        }

        ListView ListItem.--highlight {
            background: $highlight 50%;
        }

        ListView:focus ListItem.--highlight {
            background: $highlight;
        }

        ListItem Label.message_id {
            margin-right: 1;
        }
    """
    """Stylesheet"""

    filter: MessageFilter
    """Current message filter"""

    _first = -1
    """Newest message ID"""

    _last = -1
    """Oldest message ID"""

    _recent_query: datetime | None = None
    """Time of last query"""

    _last_query_empty = False
    """If last query had no results"""

    def __init__(
        self,
        context: SSHContext,
        **kwargs,
    ):
        self.filter = MessageFilter()
        super().__init__(context, **kwargs)

    async def _allow_refresh(self) -> bool:
        """Avoid database call for a while if last refresh was empty."""

        now = datetime.utcnow()
        if (
            self._recent_query is not None
            and self._last_query_empty
            and (now - self._recent_query).total_seconds() < RATE_LIMIT_SECONDS
        ):
            return False

        self._recent_query = now
        return True

    async def _load_messages(self, newer=False) -> None:
        """Load messages and append/prepend to ListView."""

        lv: ListView = self.query_one(ListView)
        first = len(lv.children) == 0
        limit = lv.region.height * 2
        query_limit = limit if first else lv.region.height
        messages: dict

        if first:
            messages = await get_latest_messages(self.filter.tags, query_limit)
        elif newer:
            messages = await get_newer_messages(
                self._last, self.filter.tags, query_limit
            )
        else:
            messages = await get_older_messages(
                self._first, self.filter.tags, query_limit
            )

        # remember if result was empty for rate limiting refresh
        if not messages:
            self._last_query_empty = True
        else:
            self._last_query_empty = False

        coros = []

        # append/prepend items to ListView
        for idx, m in enumerate(messages):
            message_id = int(m["id"])

            if self._first == -1 or message_id < self._first:
                self._first = message_id

            if self._last == -1 or message_id > self._last:
                self._last = message_id

            item = ListItem(
                Label(
                    f"[italic][white]({m['id']})[/][/]", classes="message_id"
                ),
                Label(m["title"], classes="message_title", markup=False),
                id=f"message_{m['id']}",
                classes="even" if idx % 2 else "",
            )

            coros.append(lv.append(item))

            # add to top if pulling newer messages
            if newer and not first:
                lv.move_child(item, before=0)

        # if this is the first load, we're done!
        if first:
            for c in coros:
                await c

            return

        count = len(lv.children)
        last_index = lv.index or 0

        # trim ListView items to limit
        if count > limit:
            lv.index = None
            how_many = count - limit

            for _ in range(how_many):
                if newer:
                    # remove from bottom if loading newer items
                    coros.append(lv.children[-1].remove())
                else:
                    # remove from top if loading older items
                    coros.append(lv.children[0].remove())

            # fix CSS striping
            for idx, c in enumerate(lv.children):
                c.set_classes("even" if idx % 2 else "")

            # set new first/last message ID
            assert lv.children[-1].id
            self._first = int(lv.children[-1].id.split("_")[1])
            assert lv.children[0].id
            self._last = int(lv.children[0].id.split("_")[1])

            # restore ListView selection index
            if newer:
                lv.index = how_many
            else:
                lv.index = last_index - how_many

        for c in coros:
            await c

        # keep selected item in view
        lv.scroll_to_widget(lv.children[lv.index], animate=False)  # type: ignore

    async def _update_tags(self, tags: list[str]) -> None:
        lv = self.query_one(ListView)
        await lv.clear()
        self.filter.tags = [t for t in tags if t != ""]
        await self._load_messages()
        lv.index = 0
        lv.focus()

    def compose(self) -> ComposeResult:
        # load widgets from BannerApp
        for widget in super().compose():
            yield widget

        yield ListView(id="messages_list")
        yield Footer()

    async def action_compose(self) -> None:
        try:
            self.query_one(ListView)
        except NoMatches:
            # not in message list screen; pop screen first
            self.pop_screen()
            return await self.action_compose()

        await self.push_screen(EditorScreen())

    async def action_filter(self) -> None:
        try:
            self.query_one(ListView)
        except NoMatches:
            # not in message list screen; pop screen first
            self.pop_screen()
            return await self.action_filter()

        await self.push_screen(
            FilterModal(tags=self.filter.tags), self._update_tags
        )

    async def action_reply(self) -> None:
        try:
            lv: ListView = self.query_one(ListView)
        except NoMatches:
            # not in message list screen; pop screen first
            self.pop_screen()
            return await self.action_reply()

        assert lv.index is not None
        selected = lv.children[lv.index]
        assert selected.id
        message_id = int(selected.id.split("_")[1])
        message: Message = await db.one(
            Message.load(author=User.on(Message.author_id == User.id)).where(
                Message.id == message_id
            )
        )
        await self.push_screen(
            EditorScreen(
                content=(
                    f"\n\n---\n\n{message.author.name} wrote:"
                    f"\n\n{message.content}"
                ),
                reply_to=message,
            )
        )

    async def on_key(self, event: events.Key) -> None:
        if event.key not in [
            "down",
            "end",
            "escape",
            "home",
            "pagedown",
            "pageup",
            "up",
        ]:
            return

        try:
            lv: ListView = self.get_widget_by_id("messages_list")  # type: ignore
        except NoMatches:
            # we're not in the messages list screen; bail out
            return

        if event.key == "escape":
            self.exit()

        if event.key in ("home", "pageup"):
            if lv.index == 0 and self._allow_refresh():
                # hit top boundary; load newer messages
                await self._load_messages(newer=True)

            lv.index = 0

        elif event.key in ("end", "pagedown"):
            last = len(lv.children) - 1

            if lv.index == last and self._allow_refresh():
                # hit bottom boundary; load older messages
                await self._load_messages()

            lv.index = last

        elif event.key == "up" and lv.index == 0 and self._allow_refresh():
            # hit top boundary; load newer messages
            await self._load_messages(newer=True)

        elif (
            event.key == "down"
            and lv.index == len(lv.children) - 1
            and self._allow_refresh()
        ):
            # hit bottom boundary; load older messages
            await self._load_messages()

    async def on_list_view_selected(self, event: ListView.Selected) -> None:
        """Load selected message in MarkdownViewer."""

        assert event.item.id
        message_id = int(event.item.id.split("_")[1])
        message: Message = await Message.get(message_id)
        await self.push_screen(ViewScreen(message=message))

    async def on_event(self, event: events.Event | events.MouseScrollDown):
        await super().on_event(event)
        scroll = False
        down = False

        if isinstance(event, events.MouseScrollDown):
            scroll = True
            down = True

        elif isinstance(event, events.MouseScrollUp):
            scroll = True
            down = False

        if not scroll:
            return

        try:
            lv = self.query_one(ListView)
        except NoMatches:
            # no ListView; might be in modal/editor
            return

        if down and lv.is_vertical_scroll_end and await self._allow_refresh():
            await self._load_messages()
            lv.index = len(lv.children) - round(lv.region.height * 1.25)
            lv.scroll_end(animate=False)
            lv.scroll_to(None, lv.index, animate=False)

        elif (
            not down and lv.scroll_offset.y == 0 and await self._allow_refresh()
        ):
            await self._load_messages(newer=True)
            lv.index = round(lv.region.height * 1.25)
            lv.scroll_home(animate=False)
            lv.scroll_to(None, lv.index - lv.region.height + 1, animate=False)

    async def on_ready(self) -> None:
        """App is ready; load messages."""

        # halt briefly for banner to fully load
        await sleep(0.1)
        await self._load_messages()
        lv = self.query_one(ListView)
        lv.index = 0
        lv.focus()

Ancestors

  • BannerApp
  • XthuluApp
  • textual.app.App
  • typing.Generic
  • textual.dom.DOMNode
  • textual.message_pump.MessagePump

Class variables

var BINDINGS
var CSS

Stylesheet

var filterMessageFilter

Current message filter

Methods

async def action_compose(self) ‑> None
async def action_filter(self) ‑> None
async def action_reply(self) ‑> None
async def on_event(self, event: textual.events.Event | textual.events.MouseScrollDown)

Called to process an event.

Args

event
An Event object.
async def on_key(self, event: textual.events.Key) ‑> None
async def on_list_view_selected(self, event: textual.widgets._list_view.ListView.Selected) ‑> None

Load selected message in MarkdownViewer.

async def on_ready(self) ‑> None

App is ready; load messages.

Inherited members