-
-
Save aveao/f6d40f41a424b1d9a54c9aba39742b76 to your computer and use it in GitHub Desktop.
Comic Chat fixer - ugly python3 fork with QoL features and optional twitch support
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Comic Chat fixer MITM proxy: fixes Comic Chat to (sort of) work with modern | |
# IRC servers. Tested with Microsoft Chat 2.5 on Windows XP, 8 and 10 | |
# | |
# This is a fork by ave with additional fixes and python3 support | |
# Alongside some other QoL features like color nicks being displayed in chat | |
# https://gist.github.com/aveao/f6d40f41a424b1d9a54c9aba39742b76 | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
import getopt | |
import re | |
import socket | |
import sys | |
import threading | |
import ssl | |
# The debug option prints all MITM'd messages. This can help with debugging. | |
debug = True | |
def gen_nick_color_index(nick): | |
# same as thelounge | |
# https://github.com/thelounge/thelounge/blob/master/client/js/helpers/colorClass.js | |
nick_hash = 0 | |
for char in nick: | |
nick_hash += ord(char) | |
nick_index = 1 + (nick_hash % 32) | |
# 8, 24: Yellow | |
# 7: Dark yellow | |
ugly_color_map = {8: 7, 24: 7} | |
# hack | |
nick_index = ugly_color_map.get(nick_index, nick_index) | |
return nick_index | |
def format_nick( | |
nick, brackets=True, color=True, underscore=True, colon=False, space=True | |
): | |
# <\x1f{}\x1f> | |
# "\x0320example red\x03" | |
out_nick = nick | |
if underscore: | |
out_nick = "\x1f{}\x1f".format(out_nick) | |
if color: | |
color_index = gen_nick_color_index(nick) | |
out_nick = "\x03{}{}\x03".format(color_index, out_nick) | |
if brackets: | |
out_nick = "<{}>".format(out_nick) | |
if colon: | |
out_nick = ":{}".format(out_nick) | |
if space: | |
out_nick = "{} ".format(out_nick) | |
return out_nick | |
def thread_c2s(client, client_addr, password, host, port, use_ssl): | |
f = client.makefile() | |
queued_lines = [] | |
if password: | |
client.sendall(b":cchat.proxy 464 * :Password incorrect\r\n") | |
while True: | |
line = f.readline().rstrip("\r\n") | |
if line[:12] == "OPER (null) ": | |
if password == line[12:]: | |
print("[-] {0}:{1} authenticated successfully".format(*client_addr)) | |
break | |
else: | |
print("[!] {0}:{1} failed to authenticate".format(*client_addr)) | |
client.sendall(b":cchat.proxy 464 * :Password incorrect\r\n") | |
else: | |
queued_lines.append(line) | |
irc = socket.create_connection((host, port)) | |
if use_ssl: | |
irc = ssl.wrap_socket(irc) | |
for line in queued_lines: | |
irc.sendall(line + b"\r\n") | |
t = threading.Thread(target=thread_s2c, args=(client, client_addr, irc)) | |
t.daemon = True | |
t.start() | |
try: | |
while True: | |
line = f.readline().replace("\n", "\r\n").encode() | |
if debug: | |
print("c2s: " + repr(line)) | |
irc.sendall(line) | |
if len(line) == 0 or line[:5] == b"QUIT ": | |
break | |
except KeyboardInterrupt: | |
sys.exit(1) | |
except: | |
pass | |
try: | |
irc.close() | |
except: | |
pass | |
try: | |
client.close() | |
except: | |
pass | |
def thread_s2c(client, client_addr, irc): | |
f = irc.makefile() | |
srv_prefix = "@+" | |
try: | |
while True: | |
line = f.readline() | |
split = line.split(" ") | |
if len(split) > 2: | |
if split[0] == "ERROR": | |
client.sendall(line) | |
break | |
elif split[1] == "005": | |
# Get PREFIX= to fix ranks in the NAMES response | |
match = re.search(""" PREFIX=\(([^\)]+)\)([^\s]+)""", line) | |
if match: | |
srv_prefix = match.group(2) | |
elif split[1] in "JOIN" and split[2][0] != ":": | |
# Main purpose of the proxy. Fixes a crash bug with newer | |
# ircds, which send JOIN confirmations like this: | |
# | |
# :nick!user@host JOIN #channel | |
# | |
# instead of this: | |
# | |
# :nick!user@host JOIN :#channel | |
# | |
# CChat expects the channel name to have a : before the | |
# name. If it doesn't, it will crash, since it somehow | |
# attempts a stricmp(0). | |
split[2] = ":" + split[2] | |
elif split[1] in "PART" and split[2][0] == ":": | |
# :nick!user@host PART :#channel | |
# -> | |
# :nick!user@host PART #channel :"" | |
split[2] = split[2][1:] | |
split.append(':""') | |
elif split[1] == "PRIVMSG": | |
# Prepend nick to PRIVMSGs | |
nick = split[0].split("!")[0][1:] | |
is_mschat_prefixed = split[3].startswith(":(#") | |
is_mschatappear_prefixed = split[3].startswith(":#") | |
if is_mschatappear_prefixed: | |
pass | |
elif is_mschat_prefixed: | |
# too lazy, just don't prefix on empty msgs | |
if len(split) > 4: | |
split[4] = format_nick(nick) + split[4] | |
else: | |
split[3] = format_nick(nick, colon=True) + split[3][1:] | |
elif split[1] == "353": | |
# Convert additional ranks to regular op | |
for i in range(5, len(split)): | |
rank = "" | |
nick = "" | |
for char in split[i]: | |
if char == "+" and rank != "@": | |
# voice | |
rank = "+" | |
elif char in srv_prefix: | |
# everything unknown to CChat becomes op | |
rank = "@" | |
elif char != ":": | |
# not a rank | |
nick += char | |
split[i] = (split[i][0] == ":" and ":" or "") + rank + nick | |
line = " ".join(split) | |
# Comic Chat will stop receiving if it receives a line longer than | |
# 512 bytes, including the trailing CRLF. | |
tosend = line.rstrip("\r\n")[:510].encode() + b"\r\n" | |
if debug: | |
print("s2c: " + repr(tosend)) | |
client.sendall(tosend) | |
except KeyboardInterrupt: | |
sys.exit(1) | |
try: | |
irc.close() | |
except: | |
pass | |
try: | |
client.close() | |
except: | |
pass | |
def main(): | |
bind_host = "" | |
bind_port = 6461 | |
password = None | |
options, remainder = getopt.getopt( | |
sys.argv[1:], "h:p:a:", ["bindhost=", "bindport=", "password="] | |
) | |
for opt, arg in options: | |
if opt in ("-h", "--bindhost"): | |
bind_host = arg | |
elif opt in ("-p", "--bindport"): | |
bind_port = int(arg) | |
elif opt in ("-a", "--password"): | |
password = arg | |
if bind_port < 0 or bind_port > 65535 or len(remainder) < 1: | |
print( | |
"Usage: proxy.py [-h bindhost] [-p bindport] [-a password] server [[+]port]" | |
) | |
sys.exit(1) | |
host = remainder[0] | |
if len(remainder) > 1: | |
if remainder[1][0] == "+": | |
use_ssl = True | |
port = int(remainder[1][1:]) | |
else: | |
use_ssl = False | |
port = int(remainder[1]) | |
else: | |
use_ssl = False | |
port = 6667 | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
server.bind((bind_host, bind_port)) | |
server.listen(5) | |
print(f"[-] Waiting for connections at {bind_host}:{bind_port}") | |
try: | |
while True: | |
client, client_addr = server.accept() | |
print("[-] Connection from {0}:{1}".format(*client_addr)) | |
t = threading.Thread( | |
target=thread_c2s, | |
args=(client, client_addr, password, host, port, use_ssl), | |
) | |
t.daemon = True | |
t.start() | |
except KeyboardInterrupt: | |
server.close() | |
sys.exit(1) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Comic Chat fixer MITM proxy: fixes Comic Chat to (sort of) work with modern | |
# IRC servers. Tested with Microsoft Chat 2.5 on Windows XP, 8 and 10 | |
# | |
# This is a fork by ave with additional fixes and python3 support | |
# Alongside some other QoL features like color nicks being displayed in chat | |
# This is the twitch variant of the code. | |
# https://gist.github.com/aveao/f6d40f41a424b1d9a54c9aba39742b76 | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
import getopt | |
import re | |
import socket | |
import sys | |
import threading | |
import ssl | |
# The debug option prints all MITM'd messages. This can help with debugging. | |
debug = True | |
twitch_oauth_token = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" | |
twitch_username = b"aaaaaa" | |
def gen_nick_color_index(nick): | |
# same as thelounge | |
# https://github.com/thelounge/thelounge/blob/master/client/js/helpers/colorClass.js | |
nick_hash = 0 | |
for char in nick: | |
nick_hash += ord(char) | |
nick_index = 1 + (nick_hash % 32) | |
# 8, 24: Yellow | |
# 7: Dark yellow | |
ugly_color_map = {8: 7, 24: 7} | |
# hack | |
nick_index = ugly_color_map.get(nick_index, nick_index) | |
return nick_index | |
def format_nick( | |
nick, brackets=True, color=True, underscore=True, colon=False, space=True | |
): | |
# <\x1f{}\x1f> | |
# "\x0320example red\x03" | |
out_nick = nick | |
if underscore: | |
out_nick = "\x1f{}\x1f".format(out_nick) | |
if color: | |
color_index = gen_nick_color_index(nick) | |
out_nick = "\x03{}{}\x03".format(color_index, out_nick) | |
if brackets: | |
out_nick = "<{}>".format(out_nick) | |
if colon: | |
out_nick = ":{}".format(out_nick) | |
if space: | |
out_nick = "{} ".format(out_nick) | |
return out_nick | |
def thread_c2s(client, client_addr, password, host, port, use_ssl): | |
f = client.makefile() | |
queued_lines = [b"PASS oauth:" + twitch_oauth_token, b"NICK " + twitch_username] | |
irc = socket.create_connection((host, port)) | |
if use_ssl: | |
irc = ssl.wrap_socket(irc) | |
for line in queued_lines: | |
irc.sendall(line + b"\r\n") | |
t = threading.Thread(target=thread_s2c, args=(client, client_addr, irc)) | |
t.daemon = True | |
t.start() | |
try: | |
while True: | |
line = f.readline().replace("\n", "\r\n") | |
linesplit = line.split(" ") | |
if linesplit[0] in ["WHO", "MODE", "NICK", "USER"]: | |
continue | |
bline = line.encode() | |
if debug: | |
print("c2s: " + repr(bline)) | |
irc.sendall(bline) | |
if len(bline) == 0 or bline[:5] == b"QUIT ": | |
break | |
except KeyboardInterrupt: | |
sys.exit(1) | |
except: | |
pass | |
try: | |
irc.close() | |
except: | |
pass | |
try: | |
client.close() | |
except: | |
pass | |
def thread_s2c(client, client_addr, irc): | |
f = irc.makefile() | |
srv_prefix = "@+" | |
present_users = [] | |
try: | |
while True: | |
line = f.readline() | |
split = line.split(" ") | |
if len(split) > 2: | |
if split[0] == "ERROR": | |
client.sendall(line) | |
break | |
elif split[1] == "005": | |
# Get PREFIX= to fix ranks in the NAMES response | |
match = re.search(""" PREFIX=\(([^\)]+)\)([^\s]+)""", line) | |
if match: | |
srv_prefix = match.group(2) | |
elif split[1] in "JOIN" and split[2][0] != ":": | |
# Main purpose of the proxy. Fixes a crash bug with newer | |
# ircds, which send JOIN confirmations like this: | |
# | |
# :nick!user@host JOIN #channel | |
# | |
# instead of this: | |
# | |
# :nick!user@host JOIN :#channel | |
# | |
# CChat expects the channel name to have a : before the | |
# name. If it doesn't, it will crash, since it somehow | |
# attempts a stricmp(0). | |
split[2] = ":" + split[2] | |
elif split[1] == "PART" and split[2][0] == ":": | |
# :nick!user@host PART :#channel | |
# -> | |
# :nick!user@host PART #channel :"" | |
split[2] = split[2][1:] | |
split.append(':""') | |
elif split[1] == "PRIVMSG": | |
# Prepend nick to PRIVMSGs | |
nick = split[0].split("!")[0][1:] | |
if nick not in present_users: | |
# CURSED | |
fake_join = ":{}!{}@twitch.tv JOIN :{}\r\n".format( | |
nick, nick, split[2] | |
).encode() | |
print("s2c: " + repr(fake_join)) | |
client.sendall(fake_join) | |
present_users.append(nick) | |
is_mschat_prefixed = split[3].startswith(":(#") | |
is_mschatappear_prefixed = split[3].startswith(":#") | |
if is_mschatappear_prefixed: | |
pass | |
elif is_mschat_prefixed: | |
# too lazy, just don't prefix on empty msgs | |
if len(split) > 4: | |
split[4] = format_nick(nick) + split[4] | |
else: | |
split[3] = format_nick(nick, colon=True) + split[3][1:] | |
elif split[1] == "353": | |
# Convert additional ranks to regular op | |
for i in range(5, len(split)): | |
rank = "" | |
nick = "" | |
for char in split[i]: | |
if char == "+" and rank != "@": | |
# voice | |
rank = "+" | |
elif char in srv_prefix: | |
# everything unknown to CChat becomes op | |
rank = "@" | |
elif char != ":": | |
# not a rank | |
nick += char | |
split[i] = (split[i][0] == ":" and ":" or "") + rank + nick | |
line = " ".join(split) | |
# Comic Chat will stop receiving if it receives a line longer than | |
# 512 bytes, including the trailing CRLF. | |
tosend_partone = line.rstrip("\r\n")[:510].encode() | |
tosend = tosend_partone + b"\r\n" | |
if debug and tosend_partone: | |
print("s2c: " + repr(tosend)) | |
client.sendall(tosend) | |
except KeyboardInterrupt: | |
sys.exit(1) | |
try: | |
irc.close() | |
except: | |
pass | |
try: | |
client.close() | |
except: | |
pass | |
def main(): | |
bind_host = "" | |
bind_port = 6461 | |
password = None | |
options, remainder = getopt.getopt( | |
sys.argv[1:], "h:p:a:", ["bindhost=", "bindport=", "password="] | |
) | |
for opt, arg in options: | |
if opt in ("-h", "--bindhost"): | |
bind_host = arg | |
elif opt in ("-p", "--bindport"): | |
bind_port = int(arg) | |
elif opt in ("-a", "--password"): | |
password = arg | |
if bind_port < 0 or bind_port > 65535 or len(remainder) < 1: | |
print( | |
"Usage: proxy.py [-h bindhost] [-p bindport] [-a password] server [[+]port]" | |
) | |
sys.exit(1) | |
host = remainder[0] | |
if len(remainder) > 1: | |
if remainder[1][0] == "+": | |
use_ssl = True | |
port = int(remainder[1][1:]) | |
else: | |
use_ssl = False | |
port = int(remainder[1]) | |
else: | |
use_ssl = False | |
port = 6667 | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
server.bind((bind_host, bind_port)) | |
server.listen(5) | |
print(f"[-] Waiting for connections at {bind_host}:{bind_port}") | |
try: | |
while True: | |
client, client_addr = server.accept() | |
print("[-] Connection from {0}:{1}".format(*client_addr)) | |
t = threading.Thread( | |
target=thread_c2s, | |
args=(client, client_addr, password, host, port, use_ssl), | |
) | |
t.daemon = True | |
t.start() | |
except KeyboardInterrupt: | |
server.close() | |
sys.exit(1) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment