Describes configuration and scripts to parse Evtx files and MS-DNS debug query logs and ship them to the ELK stack.

Setup Docker ELK Stack

  • Pull the recipe:
cd /tmp
git clone https://github.com/deviantony/docker-elk
  • Add the following to elasticsearch/config/elasticsearch.yml:
node.ml: true
xpack.security.authc.api_key.enabled: true
  • Generate a 32-character random key (see the note after this list for a one-liner) and add it to kibana/config/kibana.yml:
xpack.encryptedSavedObjects.encryptionKey: "0123456789abcdefghijklmnopqrstuv"
  • Make sure Docker can allocate enough memory (through its preferences).

  • Give Elasticsearch more memory by replacing ES_JAVA_OPTS: "-Xmx256m -Xms256m" with ES_JAVA_OPTS: "-Xmx1g -Xms1g" in docker-compose.yml (see the fragment after this list)

  • Start the environment: docker-compose up
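
  • The kibana.yml key can be generated with e.g. openssl rand -hex 16 (32 hex characters). The memory change from the list above lands in the elasticsearch service; a minimal docker-compose.yml fragment, assuming the stock docker-elk layout:
elasticsearch:
  environment:
    # Default is "-Xmx256m -Xms256m"; a 1 GB heap keeps larger imports from running out of memory.
    ES_JAVA_OPTS: "-Xmx1g -Xms1g"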

Obtain Evtx Files

  • This is a dirty method to copy the logs to a single machine:
# Collect every AD computer that has a DNS host name.
$ComputerNames = Get-ADComputer -Filter "*" |% {if ($_.DNSHostName) {$_.DNSHostName}}

$LogFiles = @"
Security
Application
System
"@

function Get-AliveComputers {
    param (
        [string[]] $ComputerNames
    )

    # Keep only the hosts that answer a single ping.
    return $ComputerNames |? {Test-Connection -ComputerName $_ -Count 1 -Quiet}
}

$ComputerNames = Get-AliveComputers $ComputerNames

$BaseDstPath = "c:\log_collection"
$BaseSrcPath = "C$\windows\system32\winevt\Logs"
mkdir $BaseDstPath -Force | Out-Null
$ComputerNames |% {
    $ComputerName = $_
    $DstPath = $("{0}\{1}" -f $BaseDstPath, $ComputerName)
    mkdir $DstPath -Force | Out-Null
    $LogFiles.Trim().Split("`n") |% {
        $LogFile = $_.Trim()
        # Copy the Evtx file over the administrative C$ share.
        $SrcPath = $("\\{0}\{1}\{2}.evtx" -f $ComputerName, $BaseSrcPath, $LogFile)
        Write-Host $("Copying {0}" -f $SrcPath)
        Copy-Item -ErrorAction "Continue" -Force -Path $SrcPath -Destination $DstPath
    }
}

Import Evtx Files

  • Create the following Winlogbeat configuration (saved as winlogbeat-security.yml, as referenced by the import script below). The drop_event processor discards the noisy 4672 "special privileges assigned to new logon" events for LOCAL SERVICE, and the script processor applies Winlogbeat's security-module enrichment:
winlogbeat.event_logs:
  - name: ${EVTX_FILE}
    no_more_events: stop
    processors:
      - drop_event.when.and:
        - equals.event.code: '4672'
        - equals.winlog.event_data.SubjectUserName: 'LOCAL SERVICE'
      - script:
          lang: javascript
          id: security
          file: ${path.home}/module/security/config/winlogbeat-security.js

winlogbeat.shutdown_timeout: 30s
winlogbeat.registry_file: evtx-registry.yml

setup.kibana:
  host: "192.168.0.1:5601"
  username: "elastic"
  password: "changeme"

output.elasticsearch:
  hosts: ["192.168.0.1:9200"]
  protocol: "http"
  username: "elastic"
  password: "changeme"
  bulk_max_size: 500
  • Import the logs using the following PowerShell script; change the -Filter value (or use the variant after it) to import more than one type of log.
$log_directory = '\\VBOXSVR\logs'
$winlogbeat_directory = "C:\winlogbeat"
cd $winlogbeat_directory

gci -Recurse $log_directory -Filter 'Security.evtx' |% {
    # Call winlogbeat directly instead of building a string for IEX; this avoids quoting issues with spaces in paths.
    & .\winlogbeat.exe -e -c .\winlogbeat-security.yml -E ("EVTX_FILE=" + $_.FullName)
}
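
  • A hypothetical variant that imports several log types in one pass (assuming the same config fits them all; the security-module script only applies to Security logs, so separate configs per type may be cleaner):
gci -Recurse $log_directory -Include 'Security.evtx','System.evtx','Application.evtx' |% {
    & .\winlogbeat.exe -e -c .\winlogbeat-security.yml -E ("EVTX_FILE=" + $_.FullName)
}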

Import Microsoft DNS requests

  • Use the following script to import Microsoft DNS debug logs (requests only).
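  • The regex in the script expects debug-log lines roughly like this hypothetical sample; note that the date format, and hence the strptime format below, depends on the server's locale:
14-12-2020 12:57:01 0D0C PACKET  00000045A1B2C3 UDP Rcv 192.168.0.10    4bd2   Q [0001   D   NOERROR] A      (3)www(7)example(3)com(0)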
import re
from elasticsearch import Elasticsearch, helpers, client
from glob import glob
from datetime import datetime
import time


files = glob('/tmp/logs/DNSLog*.txt')
ELK_index = 'dns-1.0'
ELK_host = "http://192.168.0.1:9200"
http_auth=('elastic', 'changeme')
batch_size = 500
es = Elasticsearch([ELK_host], http_auth=http_auth)


def sendbatch(es, batch):
    # Bulk-index a batch, retrying up to 10 times before giving up.
    success = False
    i = 0
    while (not success):
        try:
            print('Sending {0} - try: {1}'.format(len(batch), i))
            helpers.bulk(es, batch)
            success = True
        except Exception as error:
            i += 1
            time.sleep(4)
            if (i >= 10):
                print("Error: {0}".format(error))
                return False
    return True

def create_index(es):
    # Create index definition
    body = {
        'settings': {
            'index' : {
                'max_regex_length': 65535,
            }
        },
        'mappings': {
            'properties': {
                '@timestamp': {'type':'date'},
                'client': {'type':'ip'},
                'query': {'type':'keyword'},
                'rtype': {'type':'keyword'},
            },
        },
    }
    es.indices.create(index=ELK_index, body=body, ignore=400)

create_index(es)
batch = []
for file in files:
    with open(file, 'rb') as f:
        for line in f:
            line = line.strip()
            pattern = rb'(\d+-\d+-\d+) (\d+:\d+:\d+) .*? PACKET .*? (.*?) .* (\d+\.\d+\.\d+\.\d+).*?(\[.*?\]) (\w+)\s*(.*)'
            matches = re.findall(pattern, line)
            if len(matches) > 0:
                d, t, _, ip, _, rtype, q = matches[0]
                q = q.decode(encoding="ascii", errors="backslashreplace")
                q = re.sub(r'\(.*?\)', '.', q)  # turn the (length) label markers into dots
                q = q.strip('.') # strip the first and last dot

                ip = ip.decode()
                rtype = rtype.decode()

                timestr = ('{0} {1}'.format(d.decode(), t.decode()))
                timeformat = '%d-%m-%Y %H:%M:%S'
                timestamp = datetime.strptime(timestr, timeformat)
                timestamp = str(timestamp.isoformat())

                output = ({
                    'client': ip,
                    'query': q,
                    'rtype': rtype,

                    '@timestamp': timestamp,
                    # _index/_type are bulk metadata consumed by helpers.bulk():
                    '_index': ELK_index,
                    '_type': '_doc'
                })

                batch.append(output)
                if len(batch) >= batch_size:
                    sendbatch(es, batch)
                    batch = []

if (len(batch)) > 0:
    sendbatch(es, batch)
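
  • Once imported, create a Kibana index pattern for dns-1.0 (with @timestamp as the time field) to browse the queries in Discover.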

Kibana SIEM

  • You might try the Kibana SIEM interface at http://localhost:5601/app/siem, but it won't understand the dns-1.0 index.

DNS Query Exclude

  • This can definitely be improved, but this is a way to ignore domains in the search.
exclude_domains = '''
microsoft.com
apple.com
'''

subdomainpattern = '(.*\\.){0,1}'
exclude_domains = exclude_domains.strip()
exclude_domains = exclude_domains.replace('.', '\\.') #Escape the dots.
exclude_domains = exclude_domains.split('\n') # Split into an array
exclude_domains = sorted(set(exclude_domains))
exclude_domains = ['{0}{1}'.format(subdomainpattern, i) for i in exclude_domains]
pattern = '|'.join(exclude_domains)
pattern = 'NOT query:/.*dns-sd._udp.*/ AND NOT query:/{0}/'.format(pattern)

print(pattern)
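
  • For the two example domains this prints the following, ready to paste into the Kibana search bar:
NOT query:/.*dns-sd._udp.*/ AND NOT query:/(.*\.){0,1}apple\.com|(.*\.){0,1}microsoft\.com/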

Process Query Exclude

  • Same for this one... needs improvement.
exclude_cmds = '''
Sophos
'''

exclude_cmds = exclude_cmds.strip()
exclude_cmds = exclude_cmds.replace('.', '\\.') #Escape the dots.
# exclude_cmds = exclude_cmds.replace('\\', '\\\\') #Escape '\'
exclude_cmds = exclude_cmds.split('\n') # Split into an array
exclude_cmds = sorted(set(exclude_cmds))
exclude_cmds = ['.*{0}.*'.format(i) for i in exclude_cmds]
pattern = '|'.join(exclude_cmds)
pattern = 'NOT winlog.event_data.CommandLine:/{0}/'.format(pattern)

print(pattern)
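
  • For the single example entry this prints:
NOT winlog.event_data.CommandLine:/.*Sophos.*/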

AlienVault OTX Query the DNS log

  • Again, this needs improvement, but it can be useful to track the reputation of the DNS queries.
import re
from glob import glob
from datetime import datetime
import time
from OTXv2 import OTXv2
from OTXv2 import IndicatorTypes


otx = OTXv2("API_KEY")  # fill in your AlienVault OTX API key
files = glob('/tmp/logs/DNSLog*.txt')
outputfile = '/tmp/DNS-pulses.txt'

def parseline(line):
    line = line.strip()
    pattern = rb'(\d+-\d+-\d+) (\d+:\d+:\d+) .*? PACKET .*? (.*?) .* (\d+\.\d+\.\d+\.\d+).*?(\[.*?\]) (\w+)\s*(.*)'
    matches = re.findall(pattern, line)
    if len(matches) > 0:
        d, t, _, ip, _, rtype, q = matches[0]
        q = q.decode(encoding="ascii", errors="backslashreplace")
        q = re.sub(r'\(.*?\)', '.', q)  # turn the (length) label markers into dots
        q = q.strip('.') # strip the first and last dot
        ip = ip.decode()
        rtype = rtype.decode()

        timestr = ('{0} {1}'.format(d.decode(), t.decode()))
        timeformat = '%d-%m-%Y %H:%M:%S'
        timestamp = datetime.strptime(timestr, timeformat)
        timestamp = str(timestamp.isoformat())

        output = ({
            'client': ip,
            'query': q,
            'rtype': rtype,
            '@timestamp': timestamp,
        })

        return output
    return None

with open(outputfile, 'w', 256) as fout:
    pulses = {}
    for file in files:
        with open(file, 'rb') as f:
            for line in f:
                data = parseline(line)
                if (data):
                    query = data['query'].lower()
                    rtype = data['rtype'].upper()
                    if ('.' in query and
                        not query.endswith('.local') and
                        not '._dns-sd._udp' in query and
                        not '._tcp.dc.' in query and
                        (rtype == 'A' or rtype == 'AAAA')):
                        # print(data['query'])
                        try:
                            if query not in pulses.keys():
                                result = otx.get_indicator_details_full(IndicatorTypes.DOMAIN, query)
                                count = result['general']['pulse_info']['count']
                                pulses[query] = count
                            outline = '{0}\t{1}\t{2}\t{3}'.format(data['client'], rtype, query, pulses[query])
                            print(outline)
                            fout.write(outline + '\n')
                        except Exception:
                            # Ignore lookup failures (e.g. OTX rate limiting) and move on.
                            pass
  • Look through the output file like: cat /tmp/DNS-pulses.txt | grep -vE '0$' | sort -u. Note that this grep also hides counts that merely end in 0 (10, 20, ...); awk '$4 != 0' is stricter.

Elasticsearch Unique DNS

  • Never finished this, but it could be extended later, for example to combine OTX and DNS information.
  • Request unique DNS entries from ES:
from elasticsearch import Elasticsearch, helpers, client
import json


ELK_index = 'dns-1.0'
ELK_host = "http://127.0.0.1:9200"
http_auth=('elastic', 'changeme')
batch_size = 500
es = Elasticsearch([ELK_host], http_auth=http_auth)


unique_results = []
def nextbatch(body, data, agg_id, field):
    # Page the composite aggregation by feeding the last bucket key back in as 'after'.
    body['aggs'][agg_id]['composite']['after'] = {}
    body['aggs'][agg_id]['composite']['after'][field] = data['aggregations'][agg_id]['buckets'][-1]['key'][field]
    return body


field = 'query'
agg_id = 'keys'
body = {
    'size': 0,
    'aggs': {
        agg_id: {
            'composite': {
                'sources': [{
                    field: {
                        'terms': {
                            'field': field,
                        }
                    }
                }],
                'size': batch_size,
            }
        }
    },
    "query": {
        "bool": {
            "must": [{
                "query_string": {
                    "query": "rtype: /A|AAAA/ AND NOT query: /.*\\.local|.*\\.lcl/" # AND client: 192.168.7.139
                }
            }]
        }
    }
}

data = es.search(index=ELK_index, body=body, filter_path=['aggregations'])

while (len(data['aggregations'][agg_id]['buckets']) > 0):
    unique_results += [i['key'][field] for i in data['aggregations'][agg_id]['buckets']]

    # Next batch
    body = nextbatch(body, data, agg_id, field)
    data = es.search(index=ELK_index, body=body, filter_path=['aggregations'])


print(json.dumps(unique_results, indent=4))
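
  • The composite aggregation is used because it can page through all unique terms by feeding the last bucket key back in as after; a plain terms aggregation is capped by its size, and from/size paging does not apply to aggregations.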

Add OTX information from DNS requests to ES

  • This is probably not a good method, but it does add value.
  • This adds the malware and pulse counter to the dns-1.0 index.
from elasticsearch import Elasticsearch, helpers, client
import json
from OTXv2 import OTXv2
from OTXv2 import IndicatorTypes

ELK_input_index = 'dns-1.0'
ELK_host = "http://127.0.0.1:9200"
http_auth=('elastic', 'changeme')
input_batch_size = 500
es = Elasticsearch([ELK_host], http_auth=http_auth)


unique_results = []
def nextbatch(body, data, agg_id, field):
    body['aggs'][agg_id]['composite']['after'] = {}
    body['aggs'][agg_id]['composite']['after'][field] = data['aggregations'][agg_id]['buckets'][-1]['key'][field]
    return body


field = 'query'
agg_id = 'keys'
body = {
    'size': 0,
    'aggs': {
        agg_id: {
            'composite': {
                'sources': [{
                    field: {
                        'terms': {
                            'field': field,
                        }
                    }
                }],
                'size': input_batch_size,
            }
        }
    },
    "query": {
        "bool": {
            "must": [{
                "query_string": {
                    "query": "rtype: /A|AAAA/ AND query: /.*\\..*/ AND NOT query: /.*\\.local|.*\\.lcl/"
                }
            }]
        }
    }
}

data = es.search(index=ELK_input_index, body=body, filter_path=['aggregations'])

while (len(data['aggregations']['keys']['buckets']) > 0):
    unique_results += [i['key'][field] for i in data['aggregations'][agg_id]['buckets']]

    # Next
    body = nextbatch(body, data, agg_id, field)
    data = es.search(index=ELK_input_index, body=body, filter_path=['aggregations'])

# Poor performance for large datasets, but it will do for now... hacky.
otx = OTXv2('')  # fill in your AlienVault OTX API key
index = 'otx-1.0'
for domain in unique_results:
    if not es.exists(index=index, id=domain):
        print("Checking: {0}".format(domain))
        try:
            result = otx.get_indicator_details_full(IndicatorTypes.DOMAIN, domain)
            body = {
                '_index': index,
                '_type': '_doc',
                '_id': domain,
                'otx': result,
            }
            helpers.bulk(es, [body])
        except Exception as e:
            print(e)
    else:
        print("Already exists: {0}".format(domain))

Add OTX index information to DNS index

  • Is this the right way? I have no idea ;)
from elasticsearch import Elasticsearch, helpers, client
import sys, traceback
import json
import time
from OTXv2 import OTXv2
from OTXv2 import IndicatorTypes

ELK_input_index = 'otx-1.0'
ELK_output_index = 'dns-1.0'
ELK_host = "http://127.0.0.1:9200"
http_auth=('elastic', 'changeme')
input_batch_size = 10
es = Elasticsearch([ELK_host], http_auth=http_auth)


### Update requests based on: otx.general.pulse_info.count

field = 'otx.general.pulse_info.count'
field_min = 1
index = 0
body = {
    "from": index,
    "size": input_batch_size,
    "query": {
        "range": {
            field: {
                "gte": field_min
            }
        }
    }
}

data = es.search(index=ELK_input_index, body=body)
while (len(data['hits']['hits']) > 0):
    index += len(data['hits']['hits'])

    for hit in data['hits']['hits']:
        domain = hit['_id']
        count = hit['_source']['otx']['general']['pulse_info']['count']

        fieldname = 'pulse_count'
        fieldvalue = count

        print('{0} - {1} = {2}'.format(domain, fieldname, fieldvalue))
        tries = 0
        while tries < 10:
            try:
                tries += 1
                es.update_by_query(
                    index=ELK_output_index,
                    body = {
                        "script" : "ctx._source.{0} = {1}".format(fieldname, fieldvalue),
                        "query": {
                            "term": {
                                "query": domain
                            }
                        },
                    }
                )
                break
            except:
                traceback.print_exc(file=sys.stdout)
                time.sleep(1 * tries)

    # Next
    body['from'] = index
    data = es.search(index=ELK_input_index, body=body)


### Update requests based on: otx.malware.count

field = 'otx.malware.count'
field_min = 1
index = 0
body = {
    "from": index,
    "size": input_batch_size,
    "query": {
        "range": {
            field: {
                "gte": field_min
            }
        }
    }
}

data = es.search(index=ELK_input_index, body=body)
while (len(data['hits']['hits']) > 0):
    index += len(data['hits']['hits'])

    for hit in data['hits']['hits']:
        domain = hit['_id']
        count = hit['_source']['otx']['malware']['count']

        fieldname = 'malware_count'
        fieldvalue = count

        print('{0} - {1} = {2}'.format(domain, fieldname, fieldvalue))
        tries = 0
        while tries < 10:
            try:
                tries += 1
                es.update_by_query(
                    index=ELK_output_index,
                    body = {
                        "script" : "ctx._source.{0} = {1}".format(fieldname, fieldvalue),
                        "query": {
                            "term": {
                                "query": domain
                            }
                        },
                    }
                )
                break
            except:
                traceback.print_exc(file=sys.stdout)
                time.sleep(1 * tries)

    # Next
    body['from'] = index
    data = es.search(index=ELK_input_index, body=body)
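
  • After both passes, enriched entries can be found in Kibana with a filter like pulse_count > 0 OR malware_count > 0 on the dns-1.0 index.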

SIEM Events - export.ndjson:

{
    "attributes":
    {
        "columns": ["event.code", "event.action", "related.user", "host.name", "source.ip"],
        "description": "",
        "hits": 0,
        "kibanaSavedObjectMeta":
        {
            "searchSourceJSON": "{\"highlightAll\":true,\"version\":true,\"query\":{\"language\":\"kuery\",\"query\":\"\"},\"indexRefName\":\"kibanaSavedObjectMeta.searchSourceJSON.index\",\"filter\":[{\"$state\":{\"store\":\"appState\"},\"meta\":{\"alias\":null,\"disabled\":false,\"key\":\"event.code\",\"negate\":false,\"params\":[\"4624 \",\"4625\",\"4740\",\"4776\",\"4720\",\"4732\",\"4728\",\"4756\",\"4735\",\"4648\",\"4688\"],\"type\":\"phrases\",\"value\":\"4624 , 4625, 4740, 4776, 4720, 4732, 4728, 4756, 4735, 4648, 4688\",\"indexRefName\":\"kibanaSavedObjectMeta.searchSourceJSON.filter[0].meta.index\"},\"query\":{\"bool\":{\"minimum_should_match\":1,\"should\":[{\"match_phrase\":{\"event.code\":\"4624 \"}},{\"match_phrase\":{\"event.code\":\"4625\"}},{\"match_phrase\":{\"event.code\":\"4740\"}},{\"match_phrase\":{\"event.code\":\"4776\"}},{\"match_phrase\":{\"event.code\":\"4720\"}},{\"match_phrase\":{\"event.code\":\"4732\"}},{\"match_phrase\":{\"event.code\":\"4728\"}},{\"match_phrase\":{\"event.code\":\"4756\"}},{\"match_phrase\":{\"event.code\":\"4735\"}},{\"match_phrase\":{\"event.code\":\"4648\"}},{\"match_phrase\":{\"event.code\":\"4688\"}}]}}}]}"
        },
        "sort": [],
        "title": "SIEM Events",
        "version": 1
    },
    "id": "3616e530-3e09-11eb-84bc-7bc8dad84735",
    "migrationVersion":
    {
        "search": "7.4.0"
    },
    "references": [
    {
        "id": "08d264c0-39fc-11eb-84bc-7bc8dad84735",
        "name": "kibanaSavedObjectMeta.searchSourceJSON.index",
        "type": "index-pattern"
    },
    {
        "id": "08d264c0-39fc-11eb-84bc-7bc8dad84735",
        "name": "kibanaSavedObjectMeta.searchSourceJSON.filter[0].meta.index",
        "type": "index-pattern"
    }],
    "type": "search",
    "updated_at": "2020-12-14T12:52:11.921Z",
    "version": "WzEzOTY3NywxXQ=="
}
{
    "exportedCount": 1,
    "missingRefCount": 0,
    "missingReferences": []
}
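  • To load this saved search, import the export.ndjson file through Kibana's Saved Objects management page.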