Module aethersprite.extensions.base.roles
Roles self-service cog
Expand source code
"""Roles self-service cog"""
# stdlib
import asyncio as aio
from datetime import datetime, timedelta
# 3rd party
from discord import Color, Embed, Message
from discord.errors import NotFound
from discord.ext.commands import Bot, check, command, Context
from discord.raw_models import RawReactionActionEvent
from sqlitedict import SqliteDict
# local
from aethersprite import bot, data_folder, log
from aethersprite.authz import channel_only, require_admin
from aethersprite.common import FakeContext, seconds_to_str
from aethersprite.filters import RoleFilter
from aethersprite.settings import register, settings
loop = aio.get_event_loop()
# constants
DIGIT_SUFFIX = "\ufe0f\u20e3"
# database
postdb_file = f"{data_folder}roles.sqlite3"
posts = SqliteDict(postdb_file, tablename="selfserv_posts", autocommit=True)
directories = SqliteDict(postdb_file, tablename="catalog", autocommit=True)
class DirectoryUpdateFilter(RoleFilter):
"""Automatically update directory post when roles.catalog is updated"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def in_(self, ctx: Context, value: str) -> list[int] | None:
"""Filter input."""
assert ctx.guild
val = super().in_(ctx, value)
directory = (
directories[ctx.guild.id] if ctx.guild.id in directories else None
)
if directory is None:
return val
chan = ctx.guild.get_channel(directory["channel"])
if chan is None:
return val
async def update():
try:
msg = await chan.fetch_message( # type: ignore
directory["message"],
)
await _get_message(ctx, msg)
except NotFound:
pass
aio.ensure_future(update())
return val
roles_filter = DirectoryUpdateFilter("roles.catalog")
async def _get_message(
ctx: Context,
msg: Message | None = None,
expiry: str | None = None,
):
roles_: list[str] = settings["roles.catalog"].get(ctx)[:10] # type: ignore
embed = Embed(
title=":billed_cap: Available roles",
description="Use post reactions to manage role membership",
color=Color.purple(),
)
if expiry is not None:
embed.set_footer(text=f"This post will be deleted in {expiry}.")
count = 0
for role in sorted(roles_, key=lambda x: x.lower()):
embed.add_field(name=f"{count}{DIGIT_SUFFIX} {role}", value="\u200b")
count += 1
if msg is None:
msg = await ctx.send(embed=embed)
else:
await msg.edit(embed=embed)
await msg.clear_reactions()
for i in range(0, count):
await msg.add_reaction(f"{i}{DIGIT_SUFFIX}")
return msg
@command()
@check(channel_only)
async def roles(ctx: Context):
"""Manage your membership in available roles"""
assert ctx.guild
expiry_raw: int = settings["roles.postexpiry"].get(ctx) # type: ignore
expiry = seconds_to_str(expiry_raw)
roles_ = settings["roles.catalog"].get(ctx)
if roles_ is None or len(roles_) == 0:
await ctx.send(
":person_shrugging: There are no available self-service roles."
)
log.warn(
f"{ctx.author} invoked roles self-service, but no roles are available"
)
return
msg = await _get_message(ctx, expiry=expiry)
posts[msg.id] = {
"guild": ctx.guild.id,
"channel": ctx.channel.id,
"expiry": datetime.utcnow() + timedelta(seconds=expiry_raw),
}
log.info(f"{ctx.author} invoked roles self-service")
loop.call_later(expiry_raw, _delete, msg.id)
await ctx.message.delete()
@command()
@check(channel_only)
@check(require_admin)
async def catalog(ctx: Context):
"""Create a permanent roles catalog post in the current channel."""
assert ctx.guild
roles_ = settings["roles.catalog"].get(ctx)
if roles_ is None or len(roles_) == 0:
await ctx.send(
":person_shrugging: There are no available " "self-service roles."
)
log.warn(
f"{ctx.author} attempted to post roles catalog, but no roles "
"are available"
)
return
guild_id = str(ctx.guild.id)
if guild_id in directories:
existing = directories[guild_id]
chan = ctx.guild.get_channel(existing["channel"])
if chan is not None:
try:
msg = await chan.fetch_message( # type: ignore
existing["message"],
)
await msg.delete()
except NotFound:
pass
msg = await _get_message(ctx)
directories[guild_id] = {"message": msg.id, "channel": ctx.channel.id}
log.info(f"{ctx.author} posted roles catalog to {ctx.channel}")
await ctx.message.delete()
async def on_raw_reaction_add(payload: RawReactionActionEvent):
"""Handle on_reaction_add event."""
assert bot.user
assert payload.guild_id
if payload.user_id == bot.user.id:
return
directory = (
directories[payload.guild_id]
if payload.guild_id in directories
else None
)
if payload.message_id not in posts and (
directory is None or payload.message_id != directory["message"]
):
return
guild = bot.get_guild(payload.guild_id)
assert guild
channel = guild.get_channel(payload.channel_id)
assert channel
message = await channel.fetch_message( # type: ignore
payload.message_id,
)
member = guild.get_member(payload.user_id)
assert member
split = str(payload.emoji).split("\ufe0f")
if len(split) != 2:
await message.remove_reaction(payload.emoji, member)
return
fake_ctx = FakeContext(guild=guild)
setting: list[int] = settings["roles.catalog"].get(
fake_ctx,
raw=True, # type: ignore
)
roles_ = sorted(
[r for r in guild.roles if r.id in setting],
key=lambda x: x.name.lower(),
)
which = int(split[0])
if which < 0 or which > len(roles_):
await message.remove_reaction(payload.emoji, member)
return
role = roles_[which]
await member.add_roles(role)
log.info(f"{member} added role {role}")
async def on_raw_reaction_remove(payload: RawReactionActionEvent):
"""Handle on_reaction_remove event."""
assert bot.user
assert payload.guild_id
if payload.user_id == bot.user.id:
return
directory = (
directories[payload.guild_id]
if payload.guild_id in directories
else None
)
if payload.message_id not in posts and (
directory is None or payload.message_id != directory["message"]
):
return
split = str(payload.emoji).split("\ufe0f")
if len(split) != 2:
return
guild = bot.get_guild(payload.guild_id)
assert guild
member = guild.get_member(payload.user_id)
assert member
fake_ctx = FakeContext(guild=guild)
setting: list[int] = settings["roles.catalog"].get(
fake_ctx,
raw=True, # type: ignore
)
roles_ = sorted(
[r for r in guild.roles if r.id in setting],
key=lambda x: x.name.lower(),
)
which = int(split[0])
if which < 0 or which > len(roles_):
return
role = roles_[which]
await member.remove_roles(role)
log.info(f"{member} removed role {role}")
async def on_ready():
"""Clear expired/missing roles posts on startup."""
# clean up missing directories
for guild_id, directory in directories.items():
try:
guild = bot.get_guild(int(guild_id))
assert guild
chan = guild.get_channel(directory["channel"])
assert chan
msg = await chan.fetch_message( # type: ignore
directory["message"],
)
except NotFound:
log.warn(f"Deleted missing directory post for {guild_id}")
del directories[guild_id]
# clean up expired posts
now = datetime.utcnow()
for id, msg in posts.items():
if msg["expiry"] <= now:
_delete(id)
else:
expiry: datetime = msg["expiry"]
diff = (expiry - now).total_seconds()
loop.call_later(diff, _delete, id)
log.debug(f"Scheduled deletion of self-service post {id}")
def _delete(id: int):
if id not in posts:
return
post = posts[id]
guild = bot.get_guild(post["guild"])
assert guild
channel = guild.get_channel(post["channel"])
async def f():
try:
msg: Message = await channel.fetch_message( # type: ignore
id,
)
await msg.delete()
except NotFound:
pass
del posts[id]
aio.ensure_future(f())
log.info(f"Deleted roles self-service post {id}")
async def setup(bot: Bot):
# settings
register(
"roles.catalog",
None,
lambda x: True,
False,
"The roles members are allowed to add/remove themselves",
filter=roles_filter,
)
register(
"roles.postexpiry",
60,
lambda x: True,
False,
"The length of time (in seconds) to keep self-service posts",
)
# events
bot.add_listener(on_raw_reaction_add)
bot.add_listener(on_raw_reaction_remove)
bot.add_listener(on_ready)
bot.add_command(catalog)
bot.add_command(roles)
async def teardown(bot):
global settings
del settings["roles.catalog"]
Functions
async def on_raw_reaction_add(payload: discord.raw_models.RawReactionActionEvent)
-
Handle on_reaction_add event.
Expand source code
async def on_raw_reaction_add(payload: RawReactionActionEvent): """Handle on_reaction_add event.""" assert bot.user assert payload.guild_id if payload.user_id == bot.user.id: return directory = ( directories[payload.guild_id] if payload.guild_id in directories else None ) if payload.message_id not in posts and ( directory is None or payload.message_id != directory["message"] ): return guild = bot.get_guild(payload.guild_id) assert guild channel = guild.get_channel(payload.channel_id) assert channel message = await channel.fetch_message( # type: ignore payload.message_id, ) member = guild.get_member(payload.user_id) assert member split = str(payload.emoji).split("\ufe0f") if len(split) != 2: await message.remove_reaction(payload.emoji, member) return fake_ctx = FakeContext(guild=guild) setting: list[int] = settings["roles.catalog"].get( fake_ctx, raw=True, # type: ignore ) roles_ = sorted( [r for r in guild.roles if r.id in setting], key=lambda x: x.name.lower(), ) which = int(split[0]) if which < 0 or which > len(roles_): await message.remove_reaction(payload.emoji, member) return role = roles_[which] await member.add_roles(role) log.info(f"{member} added role {role}")
async def on_raw_reaction_remove(payload: discord.raw_models.RawReactionActionEvent)
-
Handle on_reaction_remove event.
Expand source code
async def on_raw_reaction_remove(payload: RawReactionActionEvent): """Handle on_reaction_remove event.""" assert bot.user assert payload.guild_id if payload.user_id == bot.user.id: return directory = ( directories[payload.guild_id] if payload.guild_id in directories else None ) if payload.message_id not in posts and ( directory is None or payload.message_id != directory["message"] ): return split = str(payload.emoji).split("\ufe0f") if len(split) != 2: return guild = bot.get_guild(payload.guild_id) assert guild member = guild.get_member(payload.user_id) assert member fake_ctx = FakeContext(guild=guild) setting: list[int] = settings["roles.catalog"].get( fake_ctx, raw=True, # type: ignore ) roles_ = sorted( [r for r in guild.roles if r.id in setting], key=lambda x: x.name.lower(), ) which = int(split[0]) if which < 0 or which > len(roles_): return role = roles_[which] await member.remove_roles(role) log.info(f"{member} removed role {role}")
async def on_ready()
-
Clear expired/missing roles posts on startup.
Expand source code
async def on_ready(): """Clear expired/missing roles posts on startup.""" # clean up missing directories for guild_id, directory in directories.items(): try: guild = bot.get_guild(int(guild_id)) assert guild chan = guild.get_channel(directory["channel"]) assert chan msg = await chan.fetch_message( # type: ignore directory["message"], ) except NotFound: log.warn(f"Deleted missing directory post for {guild_id}") del directories[guild_id] # clean up expired posts now = datetime.utcnow() for id, msg in posts.items(): if msg["expiry"] <= now: _delete(id) else: expiry: datetime = msg["expiry"] diff = (expiry - now).total_seconds() loop.call_later(diff, _delete, id) log.debug(f"Scheduled deletion of self-service post {id}")
async def setup(bot: discord.ext.commands.bot.Bot)
-
Expand source code
async def setup(bot: Bot): # settings register( "roles.catalog", None, lambda x: True, False, "The roles members are allowed to add/remove themselves", filter=roles_filter, ) register( "roles.postexpiry", 60, lambda x: True, False, "The length of time (in seconds) to keep self-service posts", ) # events bot.add_listener(on_raw_reaction_add) bot.add_listener(on_raw_reaction_remove) bot.add_listener(on_ready) bot.add_command(catalog) bot.add_command(roles)
async def teardown(bot)
-
Expand source code
async def teardown(bot): global settings del settings["roles.catalog"]
Classes
class DirectoryUpdateFilter (*args, **kwargs)
-
Automatically update directory post when roles.catalog is updated
Expand source code
class DirectoryUpdateFilter(RoleFilter): """Automatically update directory post when roles.catalog is updated""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def in_(self, ctx: Context, value: str) -> list[int] | None: """Filter input.""" assert ctx.guild val = super().in_(ctx, value) directory = ( directories[ctx.guild.id] if ctx.guild.id in directories else None ) if directory is None: return val chan = ctx.guild.get_channel(directory["channel"]) if chan is None: return val async def update(): try: msg = await chan.fetch_message( # type: ignore directory["message"], ) await _get_message(ctx, msg) except NotFound: pass aio.ensure_future(update()) return val
Ancestors
Methods
def in_(self, ctx: discord.ext.commands.context.Context, value: str) ‑> list[int] | None
-
Filter input.
Expand source code
def in_(self, ctx: Context, value: str) -> list[int] | None: """Filter input.""" assert ctx.guild val = super().in_(ctx, value) directory = ( directories[ctx.guild.id] if ctx.guild.id in directories else None ) if directory is None: return val chan = ctx.guild.get_channel(directory["channel"]) if chan is None: return val async def update(): try: msg = await chan.fetch_message( # type: ignore directory["message"], ) await _get_message(ctx, msg) except NotFound: pass aio.ensure_future(update()) return val
Inherited members