197 lines
7.9 KiB
Python
197 lines
7.9 KiB
Python
import collections
|
|
from typing import Union
|
|
import discord
|
|
from discord.ext import commands
|
|
import math
|
|
from datetime import datetime, timedelta
|
|
import pymysql
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
|
|
# Database password, read once at import time from ../sqlPass.txt (relative to the
# process working directory). The context manager closes the file handle, which the
# previous bare open(...).read() left to the garbage collector.
# NOTE(review): the content is used verbatim -- a trailing newline in the file would
# become part of the password; confirm the file has none.
with open("../sqlPass.txt", 'r') as _password_file:
    password = _password_file.read()

# Scheduler used to run get_poll_results after a poll has been posted. Jobs are kept
# in a SQLAlchemy job store backed by the discord_bot MySQL database.
scheduler = AsyncIOScheduler({
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': f'mysql+pymysql://discord_bot:{password}@localhost:5618/discord_bot?charset=utf8mb4',
    },
    'apscheduler.job_defaults.coalesce': 'True',
    'apscheduler.timezone': 'America/Chicago'
})

scheduler.start()

# Holds the McRoll cog instance once McRoll.__init__ has run; the module-level
# get_poll_results job uses it to call the cog's methods.
mcSelf = None
|
|
|
|
|
|
def get_con() -> pymysql.Connection:
    """Open and return a new connection to the local ``minecraft`` MySQL database.

    A fresh connection is created on every call and the caller is responsible
    for closing it. Cursors created from it are ``DictCursor`` instances, so
    fetched rows are addressed by column name.
    """
    connection_settings = {
        'host': 'localhost',
        'port': 5618,
        'user': 'discord_bot',
        'password': f'{password}',
        'db': 'minecraft',
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**connection_settings)
|
|
|
|
|
|
async def get_poll_results(choices: dict, pollMessageId: int, channelId: int):
    """Tally a poll's reactions, prune the losers and run the next round or announce winners.

    This is the job that McRoll.main_poll_recursion schedules after posting a poll.

    Args:
        choices: Mapping of serverIP -> choice dict (as built by get_choices). The
            'votes' and 'score' entries are updated in place and losing entries are
            removed by prune_results.
        pollMessageId: Id of the poll message whose reactions are counted.
        channelId: Id of the channel that contains the poll message; also where the
            next poll or the winner announcement is sent.
    """
    print("run")

    pollMessage = await McRoll.get_message(mcSelf, pollMessageId, channelId)
    for reaction in pollMessage.reactions:
        # The server behind an emoji does not depend on who reacted, so resolve it
        # once per reaction instead of issuing one database query per reacting user.
        try:
            serverIP = reaction_to_serverip(reaction)
        except TypeError:
            # reaction_to_serverip subscripts the fetched row, so an emoji without a
            # server_list row (e.g. an unrelated reaction a member added) ends up
            # here. Ignore it instead of aborting the whole tally.
            continue

        choice = choices.get(serverIP)
        if choice is None:
            # The emoji maps to a server that is not part of this round.
            continue

        async for user in reaction.users():
            if user.bot:
                continue
            choice['votes'] = choice['votes'] + 1
            # score = votes weighted by the "time since last rolled" multiplier.
            choice['score'] = choice['votes'] * round(
                2 * math.log10(choice['lastActivated'] + 1) + 1, 2)

    prune_results(choices)
    if len(choices) >= 3:
        # Still too many candidates: post another poll with the survivors.
        await McRoll.main_poll_recursion(mcSelf, choices, channelId)
    else:
        await McRoll.declare_winner(mcSelf, choices, channelId)
|
|
|
|
|
|
def reaction_to_serverip(reaction: Union[discord.Reaction, str]) -> int:
    """Look up the serverIP stored in minecraft.server_list for a reaction's emoji.

    Custom emoji (``discord.Emoji``) are matched by their id rendered as a string;
    any other emoji is matched by its ``unicode_escape`` encoded text.

    Args:
        reaction: The reaction to resolve. The body reads ``reaction.emoji``, so
            despite the annotation a plain ``str`` is not usable here.

    Returns:
        The ``serverIP`` column of the matching row.

    Raises:
        TypeError: If no row matches the emoji (``fetchone`` yields ``None``,
            which is then subscripted).
    """
    emoji = reaction.emoji
    if isinstance(emoji, discord.Emoji):
        reactionKey = str(emoji.id)
    else:
        reactionKey = emoji.encode('unicode_escape').decode('utf-8')

    # Open the connection only once the key is known, and always close it, so a
    # failing query no longer leaks the connection.
    con = get_con()
    try:
        with con.cursor() as cursor:
            cursor.execute("SELECT serverIP FROM minecraft.server_list WHERE reaction=%s;", (reactionKey,))
            serverIp = cursor.fetchone()
    finally:
        con.close()
    return serverIp['serverIP']
|
|
|
|
|
|
def get_choices() -> dict:
    """Load the servers that take part in a poll.

    Selects every row of minecraft.server_list whose ``lastActivated`` is not
    null and whose ``permanent`` flag is 0.

    Returns:
        Dict keyed by ``serverIP``. Each value is the row dict (serverIP,
        serverName, lastActivated, reaction, getEmoji, onlyServer) extended with
        ``'votes'`` and ``'score'`` entries, both initialised to 0.
    """
    con = get_con()
    try:
        with con.cursor() as cursor:
            cursor.execute("SELECT serverIP, serverName, lastActivated, reaction, getEmoji, onlyServer "
                           "FROM minecraft.server_list WHERE lastActivated is not null and permanent=0")
            rows = cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        con.close()

    choices = {}
    for row in rows:
        row['votes'] = 0
        row['score'] = 0
        choices[row['serverIP']] = row
    return choices
|
|
|
|
|
|
def pop_lowest(choices: dict):
    """Remove the entry with the lowest ``'score'`` from ``choices`` in place.

    When several entries share the lowest score, the first one in the dict's
    iteration order is removed. An empty dict is left untouched.

    The previous implementation compared against a fixed starting score of 10,
    so nothing was removed once every score was 10 or higher; the minimum is
    now taken over the actual entries.

    Args:
        choices: Mapping of key -> choice dict; every value must have a
            ``'score'`` entry.
    """
    if not choices:
        return
    lowestKey = min(choices, key=lambda key: choices[key]['score'])
    del choices[lowestKey]
|
|
|
|
|
|
def prune_results(choices: dict):
    """Drop the losing entries of a poll round from ``choices`` in place.

    First every entry with no votes (``'votes'`` <= 0) is removed. Of what is
    left, two entries are removed via pop_lowest when four or more remain, one
    when exactly three remain, and none otherwise.
    """
    voteless = [serverIp for serverIp, entry in choices.items() if entry['votes'] <= 0]
    for serverIp in voteless:
        choices.pop(serverIp)

    remaining = len(choices)
    if remaining < 3:
        return
    pop_lowest(choices)
    if remaining >= 4:
        pop_lowest(choices)
|
|
|
|
|
|
class McRoll(commands.Cog):
    """Cog that runs a multi-round reaction poll over the servers in server_list.

    ``mc_roll`` loads the choices and posts the first poll. Each posted poll
    schedules the module-level ``get_poll_results`` job, which tallies the
    reactions and then calls back into ``main_poll_recursion`` or
    ``declare_winner`` through the module-level ``mcSelf`` reference.
    """

    def __init__(self, client):
        """Store the bot client and publish this instance as ``mcSelf``."""
        global mcSelf
        self.client = client
        # Description shown in every poll embed.
        self.embedInfo = (
            "Each server has a multiplier based on how long it has been sense that server has been\n"
            "rolled. When this poll is over all servers with 0 votes will be disregarded. If there are 4 or more servers\n"
            "left the two with the lowest votes will be disregarded. Poll is rerun until there are only 2 servers,\n"
            "these two are the winners. Ties reroll the poll. You can only vote for one server per poll. "
        )
        # Id of the poll message currently accepting votes; None until a poll is posted.
        self.pollMessageId = None
        mcSelf = self  # This is a bad way of doing things but it works

    @staticmethod
    def _multiplier(lastActivated):
        """Return the vote multiplier for a ``lastActivated`` value, rounded to 2 places."""
        return round(2 * math.log10(lastActivated + 1) + 1, 2)

    def _emoji_for(self, choice: dict):
        """Return the emoji of a choice: a client emoji looked up by id, or decoded unicode text."""
        if choice['getEmoji']:
            return self.client.get_emoji(int(choice['reaction']))
        return bytes(choice['reaction'], "utf-8").decode("unicode_escape")

    @commands.command()
    async def mc_roll(self, ctx):
        """Start a new poll in the invoking channel.

        Only the user with id 305589587215122432 may run it (presumably the bot
        owner -- TODO confirm); everyone else is silently ignored.
        """
        if ctx.message.author.id != 305589587215122432:
            return
        choices = get_choices()
        await self.main_poll_recursion(choices, ctx.channel.id)

    async def main_poll_recursion(self, choices, channelId):
        """Post a poll for ``choices`` and schedule its evaluation 10 seconds later.

        The posted message's id is remembered in ``self.pollMessageId`` so that
        ``on_reaction_add`` knows which message to police.
        """
        self.pollMessageId = (await self.poll(choices, channelId)).id
        scheduler.add_job(get_poll_results, 'date',
                          run_date=(datetime.now() + timedelta(seconds=10)),
                          args=[choices, self.pollMessageId, channelId],
                          coalesce=True)

    @commands.Cog.listener()
    async def on_reaction_add(self, reaction, user):
        """Enforce one vote per user on the active poll message.

        When ``user`` ends up with more than one reaction on the poll message,
        the reaction that was just added is removed and the user is told why by
        direct message. Only the reacting user's own votes are counted, so
        another member's reactions can no longer cause this user's vote to be
        removed.
        """
        if self.pollMessageId is None:
            return
        # 533427166193385494 is exempt -- presumably this bot's own account, which
        # reacts with every option when the poll is posted (TODO confirm).
        if reaction.message.id != self.pollMessageId or user.id == 533427166193385494:
            return

        votesByUser = 0
        for iReaction in reaction.message.reactions:
            async for iUser in iReaction.users():
                if iUser.id == user.id:
                    votesByUser += 1
                    break
            if votesByUser > 1:
                await reaction.remove(user)
                await user.send(
                    "You can only vote for one option, please remove your previous vote to change it.")
                return

    async def poll(self, choices: dict, channel: int) -> discord.Message:
        """Send the poll embed plus an @everyone ping and add one reaction per choice.

        Args:
            choices: Mapping of serverIP -> choice dict; one embed field and one
                reaction is produced for each value.
            channel: Id of the channel to post in.

        Returns:
            The message that carries the poll embed and the reactions.
        """
        channel = await self.client.fetch_channel(channel)
        embed = discord.Embed(title="Poll", description=self.embedInfo,
                              timestamp=(datetime.utcnow() + timedelta(days=1)))
        for choice in choices.values():
            onlyServerNote = ("\nThis is an only server, meaning if it wins it will be the only winner"
                              if choice['onlyServer'] else " ")
            embed.add_field(
                name=choice['serverName'] + ' ' + str(self._emoji_for(choice)),
                value=("Multiplier : " + str(self._multiplier(choice['lastActivated'])) +
                       '\n' + "Last Rolled : " + str(choice['lastActivated'] * 2) + " weeks ago." +
                       onlyServerNote))

        pollMessage = await channel.send(embed=embed)
        await channel.send('@everyone')

        for choice in choices.values():
            await pollMessage.add_reaction(self._emoji_for(choice))
        return pollMessage

    async def declare_winner(self, choices, channelId):
        """Send a "Winner" embed with one field per remaining choice, then ping @everyone."""
        embed = discord.Embed(title="Winner", description="Congratulations")
        for winner in choices.values():
            multiplier = self._multiplier(winner['lastActivated'])
            embed.add_field(
                name=winner['serverName'],
                value="Multiplier : " + str(multiplier) + '\n' +
                      "Last Rolled : " + str(winner['lastActivated'] * 2) + " weeks ago." + '\n' +
                      "Votes : " + str(winner['votes']) + '\n' +
                      "Calculated Votes: " + str(winner['votes'] * multiplier)
            )
        channel = await self.client.fetch_channel(channelId)
        await channel.send(embed=embed)
        await channel.send("@everyone")

    async def get_message(self, messageId, channelId):
        """Fetch and return the message ``messageId`` from the channel ``channelId``."""
        channel = await self.client.fetch_channel(channelId)
        return await channel.fetch_message(messageId)
|
|
|
|
|
|
def setup(client):
    """Extension entry point: create a McRoll cog for ``client`` and register it."""
    cog = McRoll(client)
    client.add_cog(cog)
|