Overhaul IRC bot implementation

This commit is contained in:
Jake Bauer 2021-10-16 04:10:12 -04:00
parent e4a40c943e
commit d059540ed5
3 changed files with 373 additions and 119 deletions

388
bot.py Executable file → Normal file
View File

@ -1,146 +1,296 @@
#!/usr/bin/env python3
# TODO: Clean up message sending by consolidating all those PRIVMSG bits
# TODO: Make separate adminhelp command for admin-only commands
# Copyright (C) 2020 Jake Bauer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# For basic functionality
import sys
import signal
import socket
import ssl
# For fun and features
import json
import time
import base64
import random
import requests
from threading import Timer
from datetime import datetime
from pytz import timezone
VERSION = "v0.5.0"
AUTHOR = "Jake Bauer"
server = "irc.paritybit.ca"
port = 6697
channels = ["#test"]
botnick = "testbot"
quotesfile = "quotes.txt"
eightBallResponses = ["It is certain.",
"It is decidedly so.",
"Without a doubt.",
"Yes definitely.",
"You may rely on it.",
"As I see it, yes.",
"Most likely.",
"Yes.",
"Signs point to yes.",
"Reply hazy, try again",
"Ask again later.",
"Better not tell you now.",
"Cannot predict now.",
"Concentrate and ask again.",
"Don't count on it."
"My reply is no."
"My sources say no."
"Outlook not so good."
"Very doubtful."]
ircsock = None
# RepeatedTimer class from Stackoverflow question: https://stackoverflow.com/questions/3393612
# Author of Question: https://stackoverflow.com/users/374797/john-howard
# Answer: https://stackoverflow.com/a/13151299
# Author of Answer: https://stackoverflow.com/users/624066
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def send_msg(msg):
ircsock.sendall(msg.encode('utf-8'))
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
def get_channel(ircmsg):
return ircmsg.split(' ')[2]
class Bot:
def __init__(self):
self.VERSION = "v0.8.0"
self.AUTHOR = "Jake Bauer (jbauer)"
self.ircsock = None
self.server = ""
self.port = 0
self.nick = ""
self.password = ""
self.channels = []
self.admins = []
self.eightBallResponses = []
self.quotesfile = ""
self.commandList = "+help +version +quit +quote +time +roll +weather +8ball +repeat +addadmin +rmadmin +admins +chgpass"
self.exception = ""
def load_config(self):
try:
with open ("config.json", "r") as file:
config = json.load(file)
self.server = config.get("server")
self.port = config.get("port")
self.nick = config.get("nick")
self.password = config.get("password") or ""
self.channels = config.get("channels") or []
self.admins = config.get("admins") or []
self.eightBallResponses = config.get("eightBallResponses") or []
self.quotesfile = config.get("quotesfile") or ""
if not self.server or not self.port or not self.nick:
sys.stderr.write("Missing value for server, port, or nick, check your config file!\n")
sys.exit(1)
except Exception as e:
sys.stderr.write("Could not open config file: " + str(e) + "\n")
sys.exit(1)
def init():
global ircsock
# Set up the socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((server, port))
ircsock = ssl.wrap_socket(s)
def save_config(self):
try:
with open ("config.json", "w") as file:
config = {
"server": self.server,
"port": self.port,
"nick": self.nick,
"password": self.password,
"channels": self.channels,
"admins": self.admins,
"eightBallResponses": self.eightBallResponses,
"quotesfile": self.quotesfile
}
json.dump(config, file, indent=4)
except Exception as e:
sys.stderr.write("Could not open config file: " + str(e) + "\n")
# Perform the initial connection
message = "USER " + " " + botnick + " 0 * " + botnick +"\n"
send_msg(message)
message = "NICK "+ botnick +"\n"
send_msg(message)
def listen(self):
incoming = self.ircsock.recv(2048).decode('utf-8').strip('\n\r')
print("\033[32m>>\033[0m " + incoming, flush=True)
return incoming
# Join the configured channels
for channel in channels:
message = "JOIN "+ channel +"\n"
send_msg(message)
def waitfor(self, message):
while True:
incoming = self.listen()
if message in incoming:
return incoming
def main():
while 1:
ircmsg = ircsock.recv(2048).decode('utf-8')
ircmsg = ircmsg.strip('\n\r')
print(ircmsg)
def get_channel(self, message):
return message.split(' ')[2]
if ircmsg.find(":,tbhelp") != -1:
message = "PRIVMSG "+ get_channel(ircmsg) +" :Available commands: ,tbhelp ,tbversion ,tbwhereami ,tbquote ,tbtime ,tbdiceroll ,tbweather ,tb8ball\n"
send_msg(message)
def get_user(self, message):
return message.split(' ')[0].split('!')[0][1:]
elif ircmsg.find(":,tbversion") != -1:
message = "PRIVMSG " + get_channel(ircmsg) + " :" + botnick + "version " + VERSION + " by " + AUTHOR + ".\n"
send_msg(message)
def is_admin(self, user):
if user in self.admins:
return True
return False
elif ircmsg.find(":,tbwhereami") != -1:
message = "PRIVMSG "+ get_channel(ircmsg) +" :You are currently in " + get_channel(ircmsg) + ".\n"
send_msg(message)
def send_message(self, message):
self.ircsock.sendall(message.encode('utf-8'))
print("\033[31m<<\033[0m " + message, end='', flush=True)
elif ircmsg.find(":,tbquote") != -1:
with open(quotesfile, "r") as f:
lines = f.read().splitlines()
selectedLine = random.choice(lines)
message = "PRIVMSG " + get_channel(ircmsg) + " :" + selectedLine + "\n"
send_msg(message)
def announce(self, message):
for channel in self.channels:
self.send_message(message)
elif ircmsg.find(":,tbtime") != -1:
try:
message = "PRIVMSG " + get_channel(ircmsg) + " :" + datetime.now(timezone(ircmsg.split(' ')[4])).strftime('%Y-%m-%d %H:%M:%S') + "\n"
except Exception as e:
message = "PRIVMSG " + get_channel(ircmsg) + " :Please specify a valid timezone (e.g. ,tbtime America/Toronto).\n"
send_msg(message)
def handle_exception(self, incoming, exception):
self.exception = exception
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :An unexpected error occurred. Run +showexception to see what went wrong.\n")
elif ircmsg.find(":,tbdiceroll") != -1:
try:
message = "PRIVMSG " + get_channel(ircmsg) + " :" + str(random.randint(1, int(ircmsg.split(' ')[4]))) + "\n"
except Exception as e:
message = "PRIVMSG " + get_channel(ircmsg) + " :Please specify a maximum number (e.g. ,tbdiceroll 20).\n"
send_msg(message)
def repeat(self, channel, message):
self.send_message("PRIVMSG " + channel + " :" + message + "\n")
elif ircmsg.find(":,tbweather") != -1:
try:
city = ircmsg.split(' ')[4]
url = "https://wttr.in/" + city + "?m&format=3"
response = requests.get(url)
message = "PRIVMSG " + get_channel(ircmsg) + " :" + response.text.strip('\n\r') + "\n"
except Exception as e:
message = "PRIVMSG " + get_channel(ircmsg) + " :Please specify a city (e.g. ,tbweather Toronto).\n"
send_msg(message)
def connect(self):
sys.stderr.write("Connecting to: " + self.server + ":" + str(self.port) + "\n")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.server, self.port))
self.ircsock = ssl.wrap_socket(s)
self.send_message("USER " + self.nick + " 0 * " + self.nick + "\n")
self.send_message("NICK "+ self.nick + "\n")
self.sasl_auth() or self.nickserv_auth()
for channel in self.channels:
self.send_message("JOIN "+ channel +"\n")
elif ircmsg.find(":,tb8ball") != -1:
try:
ircmsg.split(' ')[4]
message = "PRIVMSG " + get_channel(ircmsg) + " :" + random.choice(eightBallResponses) + "\n"
except Exception as e:
message = "PRIVMSG " + get_channel(ircmsg) + " :Please ask me a question.\n"
send_msg(message)
def sasl_auth(self):
self.send_message("CAP REQ :sasl\n")
incoming = self.waitfor("CAP")
if "ACK :sasl" in incoming:
self.send_message("AUTHENTICATE PLAIN\n")
self.waitfor("+")
creds = self.nick + "\0" + self.nick + "\0" + self.password
creds = base64.b64encode(creds.encode('utf8')).decode('utf8')
self.send_message("AUTHENTICATE " + creds + "\n")
incoming = self.waitfor(":SASL authentication")
if ("903" in incoming):
self.send_message("CAP END\n")
return True
else:
return False
else:
return False
elif ircmsg.find("PING ") != -1:
print("Responding to PING.")
message = "PONG :" + botnick + "\n"
send_msg(message)
def nickserv_auth(self):
self.send_message("PRIVMSG NickServ :" + self.password + "\n")
incoming = self.waitfor("NickServ")
if ("You are now identified"):
return True
else:
return False
init()
main()
def disconnect(self):
self.send_message("QUIT :Quitting...\n")
self.ircsock.close()
def quit (self, incoming):
if self.is_admin(incoming):
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :*sigh* alright then...\n")
self.disconnect()
self.save_config()
return True
else:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :You don't have permission to use this command.\n")
return False
def command_help(self, incoming):
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Available commands: " + self.commandList + "\n")
def command_version(self, incoming):
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :" + self.nick + "version " + self.VERSION + " by " + self.AUTHOR + ".\n")
def command_quote(self, incoming):
with open(self.quotesfile, "r") as f:
lines = f.read().splitlines()
selectedLine = random.choice(lines)
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :" + selectedLine + "\n")
def command_time(self, incoming):
try:
message = "PRIVMSG " + self.get_channel(incoming) + " :" + datetime.now(timezone(incoming.split(' ')[4])).strftime('%Y-%m-%d %H:%M:%S') + "\n"
except:
message = "PRIVMSG " + self.get_channel(incoming) + " :Please specify a valid timezone (e.g. +time America/Toronto).\n"
self.send_message(message)
def command_roll(self, incoming):
try:
message = "PRIVMSG " + self.get_channel(incoming) + " :" + str(random.randint(1, int(incoming.split(' ')[4]))) + "\n"
except:
message = "PRIVMSG " + self.get_channel(incoming) + " :Please specify a maximum number (e.g. '+roll 20').\n"
self.send_message(message)
def command_weather(self, incoming):
try:
city = incoming.split(' ')[4]
url = "https://wttr.in/" + city + "?m&format=3"
response = requests.get(url)
message = "PRIVMSG " + self.get_channel(incoming) + " :" + response.text.strip('\n\r') + "\n"
except:
message = "PRIVMSG " + self.get_channel(incoming) + " :Please specify a city (e.g. '+weather Toronto').\n"
self.send_message(message)
def command_eightball(self, incoming):
try:
incoming.split(' ')[4]
message = "PRIVMSG " + self.get_channel(incoming) + " :" + random.choice(self.eightBallResponses) + "\n"
except Exception as e:
print(e)
message = "PRIVMSG " + self.get_channel(incoming) + " :Please ask me a question.\n"
self.send_message(message)
def command_repeatsetup(self, incoming):
if not self.is_admin(incoming):
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :You don't have permission to use this command.\n")
return
try:
channel = self.get_channel(incoming)
interval = int(incoming.split(' ')[4])
message = ' '.join(incoming.split(' ')[5:])
except Exception as e:
print(e)
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Invalid syntax. Expected +repeat <seconds> <message>\n")
rt = RepeatedTimer(interval, self.repeat, channel, message)
def command_admins(self, incoming):
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Admins: " + ', '.join(self.admins) + "\n")
def command_addadmin(self, incoming):
try:
candidate = incoming.split(' ')[4]
except:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Invalid syntax. Expected +addadmin <nick>\n")
if self.is_admin(self.get_user(incoming)):
if self.is_admin(candidate):
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :" + candidate + " is already an admin.\n")
self.admins.append(candidate)
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :" + candidate + " granted admin privileges.\n")
else:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :You don't have permission to use this command.\n")
def command_rmadmin(self, incoming):
try:
candidate = incoming.split(' ')[4]
except:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Invalid syntax. Expected +rmadmin <nick>\n")
if self.is_admin(self.get_user(incoming)):
if not self.is_admin(candidate):
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :" + candidate + " is not an admin.\n")
self.admins.remove(candidate)
self.send_message("PRIVMSG " + self.get_channel(incoming) + " : Revoked " + candidate + "'s admin privileges.\n")
else:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :You don't have permission to use this command.\n")
def command_showexception(self, incoming):
if self.is_admin(self.get_user(incoming)):
if not self.exception:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " : No exceptions occurred... yet.\n")
else:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :" + str(self.exception) + "\n")
else:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :You don't have permission to use this command.\n")
def command_chgpass(self, incoming):
try:
password = incoming.split(' ')[4]
if self.is_admin(self.get_user(incoming)):
self.send_message("PRIVMSG NickServ :SET PASSWORD " + password + "\n")
response = self.waitfor("NickServ")
if "successfully" in response:
self.password = password
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Password changed.\n")
else:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Failed to change password.\n")
except:
self.send_message("PRIVMSG " + self.get_channel(incoming) + " :Invalid syntax. Expected +chgpass <new_password>\n")

34
config.default.json Normal file
View File

@ -0,0 +1,34 @@
{
"server": "irc.example.com",
"port": 6697,
"nick": "botty_mcbotface",
"password": "password",
"channels": [
"#example"
],
"admins": [
"botmaster"
],
"eightBallResponses": [
"It is certain.",
"It is decidedly so.",
"Without a doubt.",
"Yes \u2013 definitely.",
"You may rely on it.",
"As I see it, yes.",
"Most likely.",
"Yes.",
"Signs point to yes.",
"Reply hazy, try again",
"Ask again later.",
"Better not tell you now.",
"Cannot predict now.",
"Concentrate and ask again.",
"Don't count on it.",
"My reply is no.",
"My sources say no.",
"Outlook not so good.",
"Very doubtful."
],
"quotesfile": "quotes.txt"
}

70
run.py Executable file
View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
import sys
import signal
from bot import Bot
def main():
bot = Bot()
bot.load_config()
bot.connect()
def signal_handler(sig, frame):
print("\n!!CAUGHT SIGINT!!")
bot.disconnect()
bot.save_config()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
while True:
try:
incoming = bot.listen()
# Properly respond to DMs
if "PING " in incoming:
print("Responding to PING.")
message = "PONG :" + bot.nick + "\n"
bot.send_message(message)
continue
if bot.get_channel(incoming) == bot.nick:
incoming = incoming.replace(bot.nick, bot.get_user(incoming))
if ":+quit" in incoming:
quit = bot.quit(incoming)
if quit:
sys.exit(0)
elif ":+help" in incoming:
bot.command_help(incoming)
elif ":+version" in incoming:
bot.command_version(incoming)
elif ":+quote" in incoming:
bot.command_quote(incoming)
elif ":+time" in incoming:
bot.command_time(incoming)
elif ":+roll" in incoming:
bot.command_roll(incoming)
elif ":+weather" in incoming:
bot.command_weather(incoming)
elif ":+8ball" in incoming:
bot.command_eightball(incoming)
elif ":+repeat" in incoming:
bot.command_repeatsetup(incoming)
elif ":+addadmin" in incoming:
bot.command_addadmin(incoming)
elif ":+rmadmin" in incoming:
bot.command_rmadmin(incoming)
elif ":+admins" in incoming:
bot.command_admins(incoming)
elif ":+showexception" in incoming:
bot.command_showexception(incoming)
elif ":+chgpass" in incoming:
bot.command_chgpass(incoming)
except Exception as e:
bot.handle_exception(incoming, e)
continue
if __name__ == '__main__':
main()