Module aethersprite.extensions.base.poll
Poll cog
Expand source code
"Poll cog"
# stdlib
from datetime import datetime
from functools import partial
import re
# 3rd party
from discord import Color, Embed, Message, Member
from discord.ext.commands import check, command, Context
from discord.ext.commands.bot import Bot
from discord.raw_models import RawReactionActionEvent
from sqlitedict import SqliteDict
# api
from aethersprite import data_folder, log
from aethersprite.authz import channel_only, owner, require_roles_from_setting
from aethersprite.emotes import (
BUTTON_SUFFIX,
CHECK_MARK,
PROHIBITED,
SHADE_BLOCK,
SOLID_BLOCK,
THUMBS_DOWN,
WASTEBASKET,
)
from aethersprite.filters import RoleFilter
from aethersprite.settings import register, settings
# constants
BAR_WIDTH = 20
POLL_EXPIRY = 86400 * 90 # 90 days
bot: Bot
# database
polls = SqliteDict(
f"{data_folder}poll.sqlite3", tablename="polls", autocommit=True
)
# filters
create_filter = RoleFilter("poll.createroles")
vote_filter = RoleFilter("poll.voteroles")
# authz checks
authz_create = partial(require_roles_from_setting, setting="poll.createroles")
authz_vote = partial(require_roles_from_setting, setting="poll.voteroles")
@command()
@check(channel_only)
async def poll(ctx: Context, *, options: str):
"""
Create a poll
To create a poll, options must be provided. Separate options with commas. You may provide a prompt if you wish by encasing the first argument to the command in brackets.
To delete a poll, use both the Delete and Confirm reactions. Only a moderator, administrator, or the creator of the poll may delete it.
Examples:
!poll The dress is green, The dress is gold
!poll [Do you see what I see?] Yes, No
"""
match = re.match(r"^(?:\[([^\]]+)\]\s*)?(.+)$", options)
if match is None:
await ctx.message.add_reaction(THUMBS_DOWN)
log.warn(f"{ctx.author} Provided invalid arguments: {options}")
return
prompt, qstr = match.groups()
count = 1
opts = {}
for s in qstr.split(","):
emoji = f"{count}{BUTTON_SUFFIX}"
opt = s.strip()
opts[emoji] = {"text": opt, "count": 0, "votes": set([])}
count += 1
poll = {
"timestamp": datetime.utcnow(),
"author": ctx.author.display_name,
"author_id": ctx.author.id,
"avatar": ctx.author.display_avatar.url,
"prompt": prompt,
"options": opts,
"open": 1,
"delete": set([]),
"confirm": set([]),
}
msg: Message = await ctx.send(embed=_get_embed(poll))
for emoji in opts.keys():
await msg.add_reaction(emoji)
await msg.add_reaction(PROHIBITED)
await msg.add_reaction(WASTEBASKET)
await msg.add_reaction(CHECK_MARK)
polls[msg.id] = poll
log.info(f"{ctx.author} created poll: {poll!r}")
await ctx.message.delete()
def _get_embed(poll: dict):
total = sum([int(o["count"]) for _, o in poll["options"].items()])
open = "open" if poll["open"] else "closed"
prohib_text = "Close" if poll["open"] else "Open"
embed = Embed(
title=f':bar_chart: {poll["prompt"] or "Poll"}',
description=f"Poll is: {open}",
color=Color.blue(),
)
embed.set_author(name=poll["author"], icon_url=poll["avatar"])
embed.set_footer(
text=f"{PROHIBITED} {prohib_text} | "
f"{WASTEBASKET} Delete | {CHECK_MARK} Confirm"
)
for key, opt in poll["options"].items():
count = int(opt["count"])
rawpct = round(
0 if (total == 0 or count == 0) else (count / total) * 100, 2
)
pct = 0 if (total == 0 or count == 0) else round((count / total) * 20)
left = 20 - pct
bar = f"{SOLID_BLOCK * pct}{SHADE_BLOCK * left}"
embed.add_field(
name=f'{key} {opt["text"]}',
inline=False,
value=f'{bar} {opt["count"]} ({rawpct}%)',
)
return embed
async def _update_poll(
member: Member,
message: Message,
emoji: str,
adjustment: int,
):
poll = polls[message.id]
opts = poll["options"]
opt = opts[emoji]
opt["count"] += adjustment
acted = False
if adjustment > 0:
if member.id in opt["votes"]:
# correct count if they voted but reacts are out of sync
opt["count"] -= adjustment
else:
acted = True
opt["votes"].add(member.id)
elif adjustment < 0 and member.id in opt["votes"]:
acted = True
opt["votes"].remove(member.id)
opts[emoji] = opt
poll["options"] = opts
polls[message.id] = poll
verb = "voted" if adjustment > 0 else "retracted vote"
if acted:
log.info(f'{member} {verb} for {emoji} - {poll["prompt"]}')
else:
log.warn(
f"Ignored vote for {emoji} by {member} in {message.id} - "
f'{poll["prompt"]} (reacts are out of sync)'
)
await message.edit(embed=_get_embed(poll))
def _allowed(setting: str, message: Message, member: Member) -> bool:
perms = message.channel.permissions_for(member)
poll = polls[message.id]
# allow administrators, owners, moderators, bot owners, and poll author
if (
perms.administrator
or perms.manage_channels
or perms.manage_guild
or owner == str(member)
or member.id == poll["author_id"]
):
return True
stg = settings[setting].get(message, raw=True)
if stg is None:
log.debug(f"No roles configured ({setting}), allowing by default")
return True
role_ids = [int(r) for r in stg]
for r in member.roles:
if r.id in role_ids:
log.debug("Role found, allowing")
return True
log.debug("Not allowed")
return False
async def on_raw_reaction_add(payload: RawReactionActionEvent):
"""Handle on_reaction_add event."""
assert bot.user
assert payload.member
if payload.user_id == bot.user.id or payload.message_id not in polls:
return
poll = polls[payload.message_id]
channel = payload.member.guild.get_channel(payload.channel_id)
assert channel
msg: Message = await channel.fetch_message( # type: ignore
payload.message_id,
)
async def _delete():
assert payload.member
prompt = poll["prompt"]
delete = payload.member.id in poll["delete"]
confirm = payload.member.id in poll["confirm"]
if delete and confirm:
await msg.delete()
del polls[msg.id]
log.info(f"{payload.member} deleted poll {msg.id} - {prompt}")
if _allowed("poll.createroles", msg, payload.member):
if payload.emoji.name == WASTEBASKET:
poll["delete"].add(payload.member.id)
polls[msg.id] = poll
await _delete()
return
if payload.emoji.name == CHECK_MARK:
poll["confirm"].add(payload.member.id)
polls[msg.id] = poll
await _delete()
return
if payload.emoji.name == PROHIBITED:
poll["open"] = False
polls[msg.id] = poll
await msg.edit(embed=_get_embed(poll))
return
opts = poll["options"]
if (
payload.emoji.name not in opts
or not poll["open"]
or not _allowed("poll.voteroles", msg, payload.member)
):
await msg.remove_reaction(payload.emoji.name, payload.member)
return
await _update_poll(payload.member, msg, payload.emoji.name, 1)
async def on_raw_reaction_remove(payload: RawReactionActionEvent):
"Handle on_reaction_remove event."
assert bot.user
assert payload.guild_id
if payload.user_id == bot.user.id or payload.message_id not in polls:
return
poll = polls[payload.message_id]
guild = bot.get_guild(payload.guild_id)
assert guild
member = guild.get_member(payload.user_id)
assert member
channel = guild.get_channel(payload.channel_id)
assert channel
msg: Message = await channel.fetch_message( # type: ignore
payload.message_id,
)
if payload.emoji.name == WASTEBASKET and member.id in poll["delete"]:
poll["delete"].remove(member.id)
polls[msg.id] = poll
return
if payload.emoji.name == CHECK_MARK and member.id in poll["confirm"]:
poll["confirm"].remove(member.id)
polls[msg.id] = poll
return
if payload.emoji.name == PROHIBITED and _allowed(
"poll.createroles", msg, member
):
poll["open"] = True
polls[msg.id] = poll
await msg.edit(embed=_get_embed(poll))
return
if payload.emoji.name not in poll["options"] or not poll["open"]:
return
await _update_poll(member, msg, payload.emoji.name, -1)
async def on_ready():
# clear out old polls
now = datetime.utcnow()
for k, p in polls.items():
ts: datetime = p["timestamp"]
if (now - ts).total_seconds() >= POLL_EXPIRY:
del polls[k]
async def setup(bot_: Bot):
global bot
bot = bot_
# settings
register(
"poll.createroles",
None,
lambda _: True,
False,
"Roles allowed to create polls. Defaults to anyone.",
filter=create_filter,
)
register(
"poll.voteroles",
None,
lambda _: True,
False,
"Roles allowed to vote in polls. Defaults to anyone.",
filter=vote_filter,
)
# events
bot.add_listener(on_raw_reaction_add)
bot.add_listener(on_raw_reaction_remove)
bot.add_listener(on_ready)
bot.add_command(poll)
async def teardown(bot: Bot):
global settings
for key in (
"poll.createroles",
"poll.voteroles",
):
del settings[key]
Functions
async def on_raw_reaction_add(payload: discord.raw_models.RawReactionActionEvent)
-
Handle on_reaction_add event.
Expand source code
async def on_raw_reaction_add(payload: RawReactionActionEvent): """Handle on_reaction_add event.""" assert bot.user assert payload.member if payload.user_id == bot.user.id or payload.message_id not in polls: return poll = polls[payload.message_id] channel = payload.member.guild.get_channel(payload.channel_id) assert channel msg: Message = await channel.fetch_message( # type: ignore payload.message_id, ) async def _delete(): assert payload.member prompt = poll["prompt"] delete = payload.member.id in poll["delete"] confirm = payload.member.id in poll["confirm"] if delete and confirm: await msg.delete() del polls[msg.id] log.info(f"{payload.member} deleted poll {msg.id} - {prompt}") if _allowed("poll.createroles", msg, payload.member): if payload.emoji.name == WASTEBASKET: poll["delete"].add(payload.member.id) polls[msg.id] = poll await _delete() return if payload.emoji.name == CHECK_MARK: poll["confirm"].add(payload.member.id) polls[msg.id] = poll await _delete() return if payload.emoji.name == PROHIBITED: poll["open"] = False polls[msg.id] = poll await msg.edit(embed=_get_embed(poll)) return opts = poll["options"] if ( payload.emoji.name not in opts or not poll["open"] or not _allowed("poll.voteroles", msg, payload.member) ): await msg.remove_reaction(payload.emoji.name, payload.member) return await _update_poll(payload.member, msg, payload.emoji.name, 1)
async def on_raw_reaction_remove(payload: discord.raw_models.RawReactionActionEvent)
-
Handle on_reaction_remove event.
Expand source code
async def on_raw_reaction_remove(payload: RawReactionActionEvent): "Handle on_reaction_remove event." assert bot.user assert payload.guild_id if payload.user_id == bot.user.id or payload.message_id not in polls: return poll = polls[payload.message_id] guild = bot.get_guild(payload.guild_id) assert guild member = guild.get_member(payload.user_id) assert member channel = guild.get_channel(payload.channel_id) assert channel msg: Message = await channel.fetch_message( # type: ignore payload.message_id, ) if payload.emoji.name == WASTEBASKET and member.id in poll["delete"]: poll["delete"].remove(member.id) polls[msg.id] = poll return if payload.emoji.name == CHECK_MARK and member.id in poll["confirm"]: poll["confirm"].remove(member.id) polls[msg.id] = poll return if payload.emoji.name == PROHIBITED and _allowed( "poll.createroles", msg, member ): poll["open"] = True polls[msg.id] = poll await msg.edit(embed=_get_embed(poll)) return if payload.emoji.name not in poll["options"] or not poll["open"]: return await _update_poll(member, msg, payload.emoji.name, -1)
async def on_ready()
-
Expand source code
async def on_ready(): # clear out old polls now = datetime.utcnow() for k, p in polls.items(): ts: datetime = p["timestamp"] if (now - ts).total_seconds() >= POLL_EXPIRY: del polls[k]
async def setup(bot_: discord.ext.commands.bot.Bot)
-
Expand source code
async def setup(bot_: Bot): global bot bot = bot_ # settings register( "poll.createroles", None, lambda _: True, False, "Roles allowed to create polls. Defaults to anyone.", filter=create_filter, ) register( "poll.voteroles", None, lambda _: True, False, "Roles allowed to vote in polls. Defaults to anyone.", filter=vote_filter, ) # events bot.add_listener(on_raw_reaction_add) bot.add_listener(on_raw_reaction_remove) bot.add_listener(on_ready) bot.add_command(poll)
async def teardown(bot: discord.ext.commands.bot.Bot)
-
Expand source code
async def teardown(bot: Bot): global settings for key in ( "poll.createroles", "poll.voteroles", ): del settings[key]