Skip to content

Instantly share code, notes, and snippets.

@steeve85
Created May 28, 2012 22:41
Show Gist options
  • Save steeve85/2821500 to your computer and use it in GitHub Desktop.
Save steeve85/2821500 to your computer and use it in GitHub Desktop.
[broken] Dionaea module to upload files/malwares to malware.lu
#********************************************************************************
#* Dionaea
#* - catches bugs -
#*
#*
#*
#* Copyright (C) 2009 Paul Baecher & Markus Koetter
#*
#* This program is free software; you can redistribute it and/or
#* modify it under the terms of the GNU General Public License
#* as published by the Free Software Foundation; either version 2
#* of the License, or (at your option) any later version.
#*
#* This program is distributed in the hope that it will be useful,
#* but WITHOUT ANY WARRANTY; without even the implied warranty of
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#* GNU General Public License for more details.
#*
#* You should have received a copy of the GNU General Public License
#* along with this program; if not, write to the Free Software
#* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#*
#*
#* contact [email protected]
#*
#*******************************************************************************/
from dionaea.core import ihandler, incident, g_dionaea
from dionaea.core import connection
import logging
#import json
import re, hashlib, urllib.request, urllib.parse, urllib.error, mimetypes, os, stat, sys
from http.cookiejar import CookieJar
#import mimetools
import email.message
logger = logging.getLogger('malwarelu')
logger.setLevel(logging.DEBUG)
"""
Malware.lu part
"""
# python upload file stuff to make it all in one
# from here http://pipe.scs.fsu.edu/PostHandler/MultipartPostHandler.py
class Callable:
def __init__(self, anycallable):
self.__call__ = anycallable
# Controls how sequences are uncoded.
#If true, elements may be given multiple values by
# assigning a sequence.
doseq = 1
class MultipartPostHandler(urllib.request.BaseHandler):
# needs to run first
handler_order = urllib.request.HTTPHandler.handler_order - 10
def http_request(self, request):
data = request.get_data()
if data is not None and type(data) != str:
v_files = []
v_vars = []
try:
for(key, value) in list(data.items()):
if type(value) == file:
v_files.append((key, value))
else:
v_vars.append((key, value))
except TypeError:
systype, value, traceback = sys.exc_info()
#raise TypeError, "not a valid non-string sequence or mapping object", traceback
logger.debug("not a valid non-string sequence or mapping object : %s" % traceback)
if len(v_files) == 0:
data = urllib.parse.urlencode(v_vars, doseq)
else:
boundary, data = self.multipart_encode(v_vars, v_files)
contenttype = 'multipart/form-data; boundary=%s' % boundary
if(request.has_header('Content-Type')
and request.get_header('Content-Type').\
find('multipart/form-data') != 0):
logger.debug("Replacing %s with %s" % (request.get_header('content-type'), 'multipart/form-data'))
request.add_unredirected_header('Content-Type', contenttype)
request.add_data(data)
return request
def multipart_encode(vars, files, boundary = None, buffer = None):
if boundary is None:
#boundary = mimetools.choose_boundary()
boundary = email.message.get_boundary()
if buffer is None:
buffer = ''
for(key, value) in vars:
buffer += '--%s\r\n' % boundary
buffer += 'Content-Disposition: form-data; name="%s"' % key
buffer += '\r\n\r\n' + value + '\r\n'
for(key, fd) in files:
file_size = os.fstat(fd.fileno())[stat.ST_SIZE]
filename = fd.name.split('/')[-1]
contenttype = mimetypes.guess_type(filename)[0] or \
'application/octet-stream'
buffer += '--%s\r\n' % boundary
buffer += 'Content-Disposition: form-data; name="%s";'\
' filename="%s"\r\n' % (key, filename)
buffer += 'Content-Type: %s\r\n' % contenttype
# buffer += 'Content-Length: %s\r\n' % file_size
fd.seek(0)
buffer += '\r\n' + fd.read() + '\r\n'
buffer += '--%s--\r\n\r\n' % boundary
return boundary, buffer
multipart_encode = Callable(multipart_encode)
https_request = http_request
class MalwareLuAuthError(Exception):
def __str__(self):
return "Wrong username or password"
class MalwareLuAlreadyHere(Exception):
def __init__(self, md5):
self.md5 = md5
def __str__(self):
return "Hash %s already in database" % self.md5
class MalwareLu():
url_auth = "http://malware.lu/_auth.php"
url_download = "http://malware.lu/_download.php"
url_upload = "http://malware.lu/_upload.php"
url_search = "http://malware.lu/_search.php"
url_logout = "http://malware.lu/_logout.php"
def __init__(self, username, password):
self.cj = CookieJar()
self.opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(self.cj))
if self.login(username, password) == False:
raise MalwareLuAuthError
def __del__(self):
self.logout()
def filetohash(self, filename):
file_content = open(filename, "rb").read()
sha = hashlib.md5(file_content).hexdigest()
return sha
def get_content_type(self, filename):
return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
def login(self,username, password):
params = urllib.parse.urlencode({'username': username, 'password': password})
f = self.opener.open(self.url_auth, params)
data = f.read()
if not re.search("<input type=\"text\" name=\"md5\"/>", data):
return False
return True
def logout(self):
f = self.opener.open(self.url_logout)
data = f.read()
def download(self, md5):
params = urllib.parse.urlencode({'md5': md5})
f = self.opener.open(self.url_download + "?%s" % params)
headers = f.info()
if headers['content-type'] == "application/octet-stream":
filename = re.findall("filename=(\S+)",
headers['Content-Disposition'])[0]
fp = open(filename, 'w')
fp.write(f.read())
fp.close()
return True
return False
def check(self, md5):
params = urllib.parse.urlencode({'md5': md5})
f = self.opener.open(self.url_search + "?%s" % params)
data = f.read()
if re.search("not found in the database", data):
return False
return True
def upload(self, filename):
if self.check(self.filetohash(filename)) == True:
raise MalwareLuAlreadyHere(self.filetohash(filename))
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cj),
MultipartPostHandler)
params = {'file': open(filename, 'rb'), }
f = opener.open(self.url_upload, params)
data = f.read()
if not re.search("Upload done", data):
return False
return True
"""
dionaea part
"""
#logger = logging.getLogger('malwarelu')
#logger.setLevel(logging.DEBUG)
class malwareluhandler(ihandler):
def __init__(self, path):
logger.debug("%s ready!" % (self.__class__.__name__))
ihandler.__init__(self, path)
self.username = g_dionaea.config()['modules']['python']['malwarelu']['username']
self.password = g_dionaea.config()['modules']['python']['malwarelu']['password']
self.m = MalwareLu(self.username, self.password)
def handle_incident(self, icd):
logger.debug("Submitting file to Malware.lu")
if self.m.upload(icd.path):
logger.debug("Upload %s successfully" % icd.path)
else:
logger.debug("Upload %s failed" % icd.path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment