Skip to content

Instantly share code, notes, and snippets.

@sbp
Created August 15, 2012 21:46
Show Gist options
  • Save sbp/3364001 to your computer and use it in GitHub Desktop.
Save sbp/3364001 to your computer and use it in GitHub Desktop.
Bot sketch, intended for a phenny-like bot
import re
import socket
import threading
import queue
import multiprocessing
import time
import commands
class Client(object):
def __init__(self, options):
self.received = multiprocessing.Queue()
self.sending = multiprocessing.Queue()
self.messages = multiprocessing.Queue()
self.tasks = multiprocessing.Queue()
self.lock = threading.Lock() # @@ for debugging
self.commands = {}
for obj in commands.__dict__.values():
if not hasattr(obj, "__call__"): continue
if hasattr(obj, "commands"):
for command_name in obj.commands:
self.commands[command_name] = obj
else: self.commands[obj.__name__] = obj
self.options = options
self.socket = socket.socket(socket.AF_INET, socket.TCP_NODELAY)
self.socket.connect((options["host"], options["port"]))
args = (self.socket, self.received, self.lock)
receive = threading.Thread(target=self.receive_loop, args=args)
receive.start()
args = (self.socket, self.sending, self.lock)
send = threading.Thread(target=self.send_loop, args=args)
send.start()
args = (self.received, self.messages)
messages = multiprocessing.Process(target=self.receive_messages, args=args)
messages.start()
time.sleep(1)
self.message("NICK", ["srgsfaef"])
self.message("USER", ["sefesfa", "8", "*", "irc.py"])
for channel in self.options["channels"]:
self.message("JOIN", [channel])
while True:
message = self.messages.get()
self.received_message(message)
@staticmethod
def receive_loop(socket, received, lock):
with socket.makefile("rb") as sockfile:
for octets in sockfile:
received.put(octets)
with lock:
print("RECV:", octets)
@staticmethod
def send_loop(socket, sending, lock):
with socket.makefile("wb") as sockfile:
while True:
octets = sending.get()
if octets is StopIteration: break
with lock:
sockfile.write(octets)
sockfile.flush()
print("SENT:", octets)
@staticmethod
def receive_messages(input, output):
r_message = re.compile(br'(?:(:.*?) )?(.*?) (.*)')
r_address = re.compile(br':?([^!@]*)!?([^@]*)@?(.*)')
r_param = re.compile(br'(?:^|(?<= ))(:.*|[^ ]+)')
while True:
octets = input.get()
octets = octets.rstrip(b'\r\n')
a = r_message.match(octets)
if not a: continue
prefix, command, params = a.groups()
if prefix:
b = r_address.match(prefix)
if b: prefix = b.groups()
params = r_param.findall(params)
if params and params[-1].startswith(b':'):
params[-1] = params[-1][1:]
message = {}
message["command"] = command.decode("ascii", "replace")
message["prefix"] = {}
message["prefix"]["nick"] = prefix[0].decode("ascii", "replace")
message["prefix"]["user"] = prefix[1].decode("ascii", "replace")
message["prefix"]["host"] = prefix[2].decode("ascii", "replace")
def gamut(param):
encodings = ["utf-8", "iso-8859-1", "cp1252"]
for encoding in encodings:
try: return param.decode(encoding)
except UnicodeDecodeError as err: continue
return param.decode("utf-8", "replace")
message["parameters"] = [gamut(param) for param in params]
message["octets"] = octets
output.put(message)
def received_message(self, message):
if message["command"] == "PRIVMSG":
self.received_privmsg(message)
else:
self.received_other(message)
def received_privmsg(self, message):
input = message["parameters"][1]
if input.startswith(".") and " " in input:
command, arg = input.split(" ", 1)
command = command[1:]
if command in self.commands:
class Context(object):
def __init__(self, send_message, message, arg):
self.nick = message["prefix"]["nick"]
self.sender = message["parameters"][0]
self.input = message["parameters"][1]
self.message = message
self.arg = arg
self.send_message = send_message
def say(self, text):
self.send_message("PRIVMSG", [self.sender, text])
def reply(self, text):
text = self.nick + ": " + text
self.send_message("PRIVMSG", [self.sender, text])
c = multiprocessing.Process(
target= self.commands[command],
args=(Context(self.message, message, arg),)
)
c.start()
def received_other(self, message):
...
def _received_message(self, message):
print(message)
if message["command"] == "PRIVMSG":
message_text = message["parameters"][1]
if message_text.startswith(".") and " " in message_text:
command, arg = message_text.split(" ", 1)
command = command[1:]
if command in self.commands:
c = multiprocessing.Process(
target= self.commands[command],
args=(message, arg, self.sending)
)
c.start()
def message(self, command, params=None):
if params:
params[-1] = ":" + params[-1]
command += " " + " ".join(params)
command = command.encode("utf-8", "replace") + b"\r\n"
self.sending.put(command)
c = Client({
"host": "barjavel.freenode.net",
"port": 6667,
"channels": ["#swhack"]
})
# c.connect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment