Created
March 12, 2020 13:21
-
-
Save Kalki5/b0af050fcbc06b67f174182df2a7ee1d to your computer and use it in GitHub Desktop.
Monitor changes in listening TCP connections.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import SOCK_STREAM | |
from time import sleep | |
import psutil | |
# Ports that are never reported, even when something is listening on them.
# The service names are the conventional defaults for each port number; the
# script itself only compares port numbers.
standard_ports = {
    # Connections
    21,     # FTP (conventional)
    22,     # SSH (conventional)
    25,     # SMTP (conventional)
    53,     # DNS (conventional)
    3389,   # RDP (conventional)
    # Databases
    3306,   # MySQL (conventional)
    5432,   # PostgreSQL (conventional)
    1433,   # Microsoft SQL Server (conventional)
    6379,   # Redis (conventional)
    11211,  # memcached (conventional)
    # Unknowns
    6010,
    6011,
}
def get_listening_tcp_ports():
    """Return the listening TCP ports that are worth reporting.

    A socket is kept when it is a TCP (``SOCK_STREAM``) socket in the
    ``LISTEN`` state, is not bound to ``127.0.0.1`` or ``::1``, and its
    port is not in ``standard_ports``.

    Returns:
        dict: port number -> ``(ip, port, pid, process_name)``. When the
        same port shows up more than once (e.g. IPv4 and IPv6), the last
        entry seen wins. ``process_name`` is ``None`` when the owning
        process cannot be identified.
    """
    tcps = {}
    for connection in psutil.net_connections(kind='inet'):
        if connection.type != SOCK_STREAM or connection.status != 'LISTEN':
            continue
        ip, port = connection.laddr.ip, connection.laddr.port
        if ip in {'127.0.0.1', '::1'} or port in standard_ports:
            continue

        pid = connection.pid
        name = None
        # psutil reports pid=None when it cannot determine the owner, and
        # psutil.Process(None) would silently resolve to *this* process,
        # so only look the name up for a real pid.
        if pid is not None:
            try:
                name = psutil.Process(pid).name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The listener exited between enumeration and lookup, or
                # we lack permission to inspect it; keep the port anyway.
                name = None

        tcps[port] = (ip, port, pid, name)
    return tcps
def update_mappings(tcps_done, tcps_new):
    """Bring ``tcps_done`` in line with ``tcps_new``, announcing each change.

    Ports present in ``tcps_done`` but missing from ``tcps_new`` are
    announced as deletions and removed; ports present only in ``tcps_new``
    are announced as additions and copied over. Ports present in both are
    left untouched. ``tcps_done`` is modified in place; ``tcps_new`` is
    only read.
    """
    vanished = tcps_done.keys() - tcps_new.keys()
    for gone in vanished:
        print(f'Make Delete call - Port: {gone}')
        del tcps_done[gone]

    appeared = tcps_new.keys() - tcps_done.keys()
    for fresh in appeared:
        print(f'Make Add call - Port: {fresh}')
        tcps_done[fresh] = tcps_new[fresh]
def main(interval=1):
    """Poll the listening TCP ports forever and report every change.

    Each cycle takes a fresh snapshot with ``get_listening_tcp_ports`` and
    hands it to ``update_mappings`` together with the state remembered
    from earlier cycles, then sleeps.

    Args:
        interval: Seconds to sleep between polls. Defaults to 1, the
            value that used to be hard-coded.

    This function never returns; it loops until the process is stopped.
    """
    # State carried across cycles: port -> details of ports already reported.
    tcps_done = {}
    while True:
        tcps_new = get_listening_tcp_ports()
        update_mappings(tcps_done, tcps_new)
        sleep(interval)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment