Skip to content

Instantly share code, notes, and snippets.

@SalemHarrache
Created August 10, 2012 16:49
Show Gist options
  • Save SalemHarrache/3315513 to your computer and use it in GitHub Desktop.
Save SalemHarrache/3315513 to your computer and use it in GitHub Desktop.
Ejabberd extauth : Authenticate phpbb users against PostgreSQL with Peewee
#!/usr/bin/env python
# Ejabberd extauth : Authenticate phpbb users against PostgreSQL with Peewee
# Original Author: Lukas Kolbe <[email protected]>
import sys
import logging
import struct
from hashlib import md5
from peewee import PrimaryKeyField, CharField, PostgresqlDatabase, Model,\
DoesNotExist, VarCharColumn, ForeignKeyField
LOGGER = logging.getLogger('ejabberd-extauth')

# Log to stderr: stdout carries the binary answers written by ejabberd_out(),
# so diagnostics must never be mixed into it.
# NOTE(review): no level is set on LOGGER here, so unless logging is
# configured elsewhere the DEBUG/INFO messages used throughout this script
# are filtered out by the default (WARNING) threshold -- confirm intent.
stream_handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s ')
stream_handler.setFormatter(formatter)
LOGGER.addHandler(stream_handler)
# Connection settings for the PostgreSQL database that holds the phpBB tables.
# 'name' is the database name; 'args' is forwarded verbatim as keyword
# arguments to PostgresqlDatabase.
CONFIG = {
    'name': 'dbname',
    'args': {
        'user': 'userdb',
        'password': 'passworddb',
        'host': 'localhost',
    },
}

# Shared database handle used by the models below and by get_user().
db = PostgresqlDatabase(CONFIG['name'], **CONFIG['args'])
class User(Model):
    """Row of the ``phpbb_users`` table (only the columns this script needs)."""

    class Meta:
        db_table = 'phpbb_users'
        database = db

    # Primary key, stored in the ``user_id`` column.
    id = PrimaryKeyField(db_column='user_id')
    # Login name, stored in the ``username`` column.
    username = CharField()
    # Stored password hash (``user_password`` column), checked by
    # phpbb_check_hash().
    password = CharField(db_column='user_password')
class Session(Model):
    """Row of the ``phpbb_sessions`` table, linked to its owning User."""

    class Meta:
        db_table = 'phpbb_sessions'
        database = db

    # Session identifier (``session_id`` column), declared as a varchar key.
    id = PrimaryKeyField(db_column='session_id', column_class=VarCharColumn)
    # Owner of the session (``session_user_id`` column).
    # NOTE(review): related_name='id' names the reverse accessor on User the
    # same as User's primary key field -- looks unintended, confirm.
    user = ForeignKeyField(User, db_column='session_user_id',
                           related_name='id')

    @classmethod
    def authentificate(cls, username, session_id):
        """Return True when a session ``session_id`` exists for ``username``.

        At most one row is fetched; the result is True only if exactly one
        row matched both the session id and the owner's username.
        """
        matches = cls.filter(id=session_id, user__username=username).limit(1)
        return len(list(matches)) == 1
# 64-character alphabet used by hash_encode64(): "./", then digits, then
# upper-case letters, then lower-case letters.
ITOA64 = (
    './'
    '0123456789'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    'abcdefghijklmnopqrstuvwxyz'
)
def raw_md5(*args):
    """Return the binary MD5 digest of all ``args`` fed in order.

    Each argument is passed to the hash object's ``update()``, so the result
    equals the digest of the arguments concatenated.
    """
    hasher = md5()
    for chunk in args:
        hasher.update(chunk)
    return hasher.digest()
def hex_md5(*args):
    """Return the hexadecimal MD5 digest of all ``args`` fed in order.

    Same as raw_md5() but rendered as a 32-character lower-case hex string.
    """
    hasher = md5()
    for chunk in args:
        hasher.update(chunk)
    return hasher.hexdigest()
def phpbb_check_hash(password, password_hash):
    """Return True if ``password`` matches the stored ``password_hash``.

    A 34-character stored value is treated as a salted hash and recomputed
    with hash_crypt_private(); anything else is compared against the plain
    hexadecimal MD5 of the password.
    """
    if len(password_hash) != 34:
        candidate = hex_md5(password)
    else:
        candidate = hash_crypt_private(password, password_hash)
    return candidate == password_hash
def hash_encode64(raw_hash, count, itoa64=ITOA64):
    """Encode the first ``count`` bytes of ``raw_hash`` with ``itoa64``.

    This is the phpass/phpBB "encode64" routine: bytes are consumed in
    little-endian groups of three and each group is emitted as four 6-bit
    symbols (a trailing group of one or two bytes yields two or three
    symbols respectively).

    The break conditions are tested *before* the index is advanced, exactly
    like the reference ``if ($i++ >= $count) break;``.  Testing after the
    increment dropped the last symbol whenever ``count % 3 != 1``; the only
    length used by this script (16) was unaffected.

    :param raw_hash: byte string to encode (``str`` on Python 2, ``bytes`` or
        ``bytearray`` on Python 3).
    :param count: number of leading bytes to encode; ``count <= 0`` yields
        an empty string.
    :param itoa64: 64-symbol alphabet indexed by each 6-bit value.
    :returns: the encoded text.
    :raises IndexError: if ``raw_hash`` holds fewer than ``count`` bytes.
    """
    # bytearray yields integers on both Python 2 and Python 3.
    data = bytearray(raw_hash)
    symbols = []
    i = 0
    while i < count:
        value = data[i]
        i += 1
        symbols.append(itoa64[value & 0x3f])
        if i < count:
            value |= data[i] << 8
        symbols.append(itoa64[(value >> 6) & 0x3f])
        if i >= count:
            break
        i += 1
        if i < count:
            value |= data[i] << 16
        symbols.append(itoa64[(value >> 12) & 0x3f])
        if i >= count:
            break
        i += 1
        symbols.append(itoa64[(value >> 18) & 0x3f])
    return ''.join(symbols)
def hash_crypt_private(password, setting, itoa64=ITOA64):
    """Compute the phpBB/phpass portable hash of ``password``.

    ``setting`` is an existing hash (or at least its first 12 characters):
    a 3-character identifier, one character encoding log2 of the iteration
    count, and an 8-character salt.  The result is those 12 characters
    followed by the encoded digest, so it can be compared directly with the
    stored hash.

    :param password: the candidate password.
    :param setting: stored hash supplying identifier, cost and salt.
    :param itoa64: alphabet used for the cost character and the encoding.
    :returns: the computed hash, or ``'*'`` when ``setting`` is malformed
        (unknown identifier, cost outside 7..30, or salt shorter than 8).
    """
    output = '*'
    # phpBB writes '$H$'; '$P$' is the phpass spelling of the same scheme.
    if setting[0:3] not in ('$H$', '$P$'):
        return output
    # Slice instead of index so a too-short setting is rejected (find('')
    # returns 0, which fails the range check) rather than raising IndexError.
    count_log2 = itoa64.find(setting[3:4])
    if count_log2 < 7 or count_log2 > 30:
        return output
    salt = setting[4:12]
    if len(salt) != 8:
        return output
    raw_hash = raw_md5(salt, password)
    # Key stretching: re-hash 2**count_log2 times.  A countdown loop keeps
    # memory constant and runs unchanged on Python 2 and Python 3.
    remaining = 1 << count_log2
    while remaining > 0:
        raw_hash = raw_md5(raw_hash, password)
        remaining -= 1
    return setting[0:12] + hash_encode64(raw_hash, 16, itoa64)
class EjabberdInputError(Exception):
    """Raised when the data read from ejabberd is not what was expected."""

    def __init__(self, value):
        # Keep the payload on the instance; __str__ renders it.
        self.value = value

    def __str__(self):
        # Same text as repr(self.value).
        return '%r' % (self.value,)
def ejabberd_out(bool):
    """Write the answer for the current request to stdout.

    The answer is four bytes: two big-endian 16-bit integers, the first
    always 2 and the second 1 when ``bool`` is truthy, otherwise 0.  stdout
    is flushed immediately after writing.
    """
    LOGGER.debug("Ejabberd gets: %s" % bool)
    answer = 1 if bool else 0
    token = struct.pack('>hh', 2, answer)
    # Decode the four bytes back to integers purely for the debug message.
    values = struct.unpack('4B', token)
    LOGGER.debug("sent bytes: %#x %#x %#x %#x" % values)
    sys.stdout.write(token)
    sys.stdout.flush()
def ejabberd_in():
    """Read one request from ejabberd on stdin and split it into fields.

    A request is a 2-byte big-endian unsigned length followed by that many
    bytes of colon-separated text.

    :returns: list of at most four fields.  Splitting stops after the third
        colon so a trailing password field keeps any colons it contains.
    :raises EjabberdInputError: if reading fails, the length prefix is not
        exactly two bytes (e.g. end of input), or the payload is shorter
        than announced.
    """
    LOGGER.debug("trying to read 2 bytes from ejabberd:")
    try:
        input_length = sys.stdin.read(2)
    except IOError:
        LOGGER.debug("ioerror")
        # Previously execution fell through with input_length unbound.
        raise EjabberdInputError('I/O error while reading from ejabberd!')
    if len(input_length) != 2:
        LOGGER.debug("ejabberd sent us wrong things!")
        raise EjabberdInputError('Wrong input from ejabberd!')
    LOGGER.debug('got 2 bytes via stdin')
    # Unsigned: a signed read would turn lengths >= 32768 into a negative
    # size, and read() with a negative size blocks until end of input.
    (size,) = struct.unpack('>H', input_length)
    payload = sys.stdin.read(size)
    if len(payload) != size:
        LOGGER.debug("ejabberd sent us wrong things!")
        raise EjabberdInputError('Truncated input from ejabberd!')
    return payload.split(':', 3)
def get_user(username):
    """Fetch the User whose ``username`` matches, or None if there is none.

    A database connection is opened for the lookup and is always closed
    again, including when the query raises something other than
    DoesNotExist (which previously left the connection open).

    :param username: value to match against ``User.username``.
    :returns: the matching User instance, or ``None``.
    """
    db.connect()
    try:
        return User.get(username=username)
    except DoesNotExist:
        return None
    finally:
        db.close()
def auth_pass(username, password):
    """Return True if ``username`` exists and ``password`` matches its hash.

    The user is looked up with get_user() and the stored hash is verified
    with phpbb_check_hash(); an unknown user yields False.
    """
    LOGGER.debug("%s wants authentication ..." % username)
    account = get_user(username)
    if account is None:
        return False
    return bool(phpbb_check_hash(password, account.password))
def auth_session(username, session_id):
    """Return the result of Session.authentificate() for the given pair.

    Lets a client authenticate by presenting a session id in place of a
    password.
    """
    LOGGER.debug("%s wants authentication ..." % username)
    session_ok = Session.authentificate(username, session_id)
    return session_ok
def isuser(username):
    """Return True if get_user() finds a user named ``username``."""
    LOGGER.debug("do we know %s?" % username)
    found = get_user(username)
    return found is not None
def log_success(method, username, success):
    """Log at INFO level whether ``method`` succeeded for ``username``."""
    if success:
        template = "%s successful for %s"
    else:
        template = "%s unsuccessful for %s"
    LOGGER.info(template % (method, username))
# Main loop: answer ejabberd requests one at a time until the input stream
# ends or delivers something unreadable (ejabberd_in raises
# EjabberdInputError in both cases).
while True:
    LOGGER.debug("start of infinite loop")
    try:
        data = ejabberd_in()
    except EjabberdInputError as inst:
        LOGGER.info("Exception occured: %s", inst)
        break
    LOGGER.debug('Method: %s' % data[0])
    success = False
    handled = None  # name of the operation that was actually processed
    try:
        # Field layout: operation, user, server[, password].  The length
        # checks make a malformed request answer "no" instead of crashing
        # the script with an IndexError.
        if data[0] == "auth" and len(data) > 3:
            handled = "auth"
            # data[3] may be a phpBB session id or a plain password.
            success = auth_session(data[1], data[3])
            if not success:
                success = auth_pass(data[1], data[3])
        elif data[0] == "isuser" and len(data) > 1:
            handled = "isuser"
            success = isuser(data[1])
    except Exception:
        # Top-level boundary: a backend failure must still produce exactly
        # one (negative) answer, otherwise ejabberd is left waiting.
        LOGGER.exception("request failed, denying")
        success = False
    ejabberd_out(success)
    if handled is not None:
        log_success(handled, data[1], success)
    LOGGER.debug("end of infinite loop")
LOGGER.info('extauth script terminating')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment