@dapangmao
Last active August 29, 2015 14:12
Spark practice (4): malicious web attack

Suppose a website tracks user activities to prevent robotic attacks. Design an algorithm to identify user IDs that have more than 500 activities within any 10-minute window.

Sample.txt (`AnonymousUserID TimeStamp ActivityCount`):

```
123    9:45am     10
234    9:46am     12
234    9:50am     20
456    9:53am     100
123    9:55am     33
456    9:56am     312
123    10:03am    110
123    10:16am    312
234    10:20am    201
456    10:23am    180
123    10:25am    393
456    10:27am    112
999    12:21pm    888
```
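The only fiddly part of the input is the timestamp column. A quick standalone check of the `strptime` format string (the helper name below is illustrative):

```python
from datetime import datetime

# '%I:%M%p' matches a 12-hour clock time such as '9:45am';
# strptime accepts a non-zero-padded hour and lowercase am/pm.
FMT = '%I:%M%p'

def minutes_between(s1, s2):
    # Whole minutes elapsed from s1 to s2 (same day assumed).
    t1 = datetime.strptime(s1, FMT)
    t2 = datetime.strptime(s2, FMT)
    return int((t2 - t1).total_seconds()) // 60

print(minutes_between('9:45am', '10:03am'))  # → 18
```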

###Thought This is a typical stream-processing problem. The key is to slide a fixed-length time window through the data, maintain a per-ID activity count inside the window, and report the IDs whose count ever exceeds the threshold.
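A minimal sketch of the sliding window, with timestamps already converted to minutes (the tuple layout and names here are illustrative, not the solution's):

```python
from collections import deque

def over_threshold(events, duration, maxcnt):
    """events: iterable of (minute, user_id, count), sorted by minute.
    Returns IDs whose summed count exceeds maxcnt inside any
    `duration`-minute window ending at an event."""
    window = deque()   # events still inside the current window
    totals = {}        # per-ID count within the window
    flagged = set()
    for minute, uid, cnt in events:
        window.append((minute, uid, cnt))
        totals[uid] = totals.get(uid, 0) + cnt
        # Evict events that have fallen out of the window.
        while minute - window[0][0] > duration:
            _, old_uid, old_cnt = window.popleft()
            totals[old_uid] -= old_cnt
        if totals[uid] > maxcnt:
            flagged.add(uid)
    return flagged

# Toy data: 'a' accumulates 600 within 5 minutes, then cools off.
events = [(0, 'a', 300), (3, 'b', 100), (4, 'a', 300), (20, 'a', 300)]
print(over_threshold(events, 10, 500))  # → {'a'}
```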

###Single machine solution

```python
from datetime import datetime
from collections import deque
import time

def get_minute(s, fmt='%I:%M%p'):
    # Convert a time string such as '9:45am' into epoch seconds.
    return time.mktime(datetime.strptime(s, fmt).timetuple())

def get_diff(s1, s2):
    # Whole minutes elapsed from s1 to s2.
    return int(get_minute(s2) - get_minute(s1)) // 60

def find_ids(infile, duration, maxcnt):
    # Slide a `duration`-minute window over the log and flag IDs whose
    # activity count inside the window exceeds `maxcnt`.
    queue, htable, ans = deque(), {}, set()
    with open(infile, 'rt') as _infile:
        for l in _infile:
            current_id, current_time, current_clk = l.split()
            current_clk = int(current_clk)
            htable[current_id] = htable.get(current_id, 0) + current_clk
            queue.append((current_id, current_time, current_clk))
            # Evict records that have fallen out of the window.
            while queue and get_diff(queue[0][1], current_time) > duration:
                past_id, _, past_clk = queue.popleft()
                htable[past_id] -= past_clk
            if htable[current_id] > maxcnt:
                ans.add(current_id)
    return ans

if __name__ == "__main__":
    print(find_ids('sample.txt', 10, 500))  # sample above yields {'123', '999'}
```

###Cluster solution The newest Spark release, version 1.2, has started to support Python streaming. However, the documentation is still scarce -- wait and see whether this problem can be solved with the new API.
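Spark Streaming's windowed operations (e.g. `reduceByKeyAndWindow`) maintain exactly this kind of running per-key sum over a sliding window of micro-batches. While the Python API's documentation catches up, its semantics can be simulated in plain Python (batch contents and names below are illustrative):

```python
from collections import deque

def windowed_counts(batches, window_len):
    """Simulate a windowed per-key sum over `batches`, a list of
    micro-batches of (user_id, count) pairs; window_len is how many of
    the most recent batches form the window. Yields the per-ID totals
    after each batch arrives."""
    window = deque(maxlen=window_len)  # oldest batch drops off automatically
    for batch in batches:
        window.append(batch)
        totals = {}
        for b in window:
            for uid, cnt in b:
                totals[uid] = totals.get(uid, 0) + cnt
        yield totals

batches = [
    [('123', 300)],
    [('123', 250), ('456', 100)],
    [('456', 50)],
]
for totals in windowed_counts(batches, window_len=2):
    flagged = {uid for uid, c in totals.items() if c > 500}
    print(totals, flagged)
```

After the second batch, ID `123` totals 550 inside the two-batch window and would be flagged; once the first batch slides out, its total drops back below the threshold.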

To be continued