Last active
July 31, 2022 07:55
-
-
Save a3r0id/4a40f4a5b7c52ef789066685911acf2a to your computer and use it in GitHub Desktop.
Get IP + Geolocation info for all remote UDP/TCP connections of a process by name or substring.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import popen
from sys import argv
from threading import Thread
from time import sleep

import IPy
import psutil
from requests import get
from scapy.all import sniff
# By A3R0 - Get IP + Geolocation info for all remote UDP/TCP connections of a process by name. | |
# A process name (or fragment of one) is mandatory.
if len(argv) < 2:
    print("Usage: %s <process name or substring - not case sensitive>" % argv[0])
    # Example: python remote_tcp_udp.py chrome.exe
    # Raise SystemExit directly: the bare `exit` helper is injected by the
    # `site` module and is not guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(1)

# Substring that is matched case-insensitively against process names.
search_term = argv[1]

# Remote IPs that have already been reported, so each is printed only once.
positive_results = []
def get_ip_info(ip, timeout=10):
    """Look up ISP, location and usage flags for ``ip`` via ip-api.com.

    Args:
        ip: Remote IP address as a string.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        One display line of the form
        ``"<isp> | <city>, <region>, <country> | Proxy: .., Hosting: .., Mobile: .."``.

    Raises:
        requests.RequestException: on network failure, timeout or a non-2xx
            HTTP status.
        KeyError: if the JSON reply lacks one of the fields read below
            (presumably what a failed lookup looks like -- confirm against
            the ip-api documentation).
    """
    # Without a timeout a stalled connection would block the calling thread
    # indefinitely.
    response = get("http://ip-api.com/json/%s?fields=66846719" % ip, timeout=timeout)
    response.raise_for_status()
    data = response.json()

    isp = data["isp"]
    geo = f'{data["city"]}, {data["regionName"]}, {data["country"]}'
    usage_type = (
        f'Proxy: {data["proxy"]}, Hosting: {data["hosting"]}, Mobile: {data["mobile"]}'
    )
    return f"{isp} | {geo} | {usage_type}"
def processCurrentConnections(current):
    """Print every new public remote endpoint owned by a matching process.

    Args:
        current: Iterable of connection records as returned by
            ``psutil.net_connections`` (each exposing ``pid``, ``raddr`` and
            ``status``).

    Side effects:
        Appends each newly reported remote IP to the module-level
        ``positive_results`` list and prints one line per new IP.
    """
    wanted = search_term.lower()

    for conn in current:
        # Skip pid 0 and, importantly, pid None: psutil.Process(None) resolves
        # to the *current* process, which would misattribute the socket.
        if not conn.pid:
            continue
        # Sockets without a remote endpoint (e.g. listening) have nothing to report.
        if not conn.raddr:
            continue

        try:
            process_name = psutil.Process(conn.pid).name()
        except psutil.Error:
            # The process exited or is not accessible to us.
            continue
        if wanted not in process_name.lower():
            continue

        raddrIp, raddrPort = conn.raddr[0], conn.raddr[1]
        if raddrIp in positive_results:
            continue
        if raddrIp in ("127.0.0.1", "localhost", "::1", "0.0.0.0"):
            continue

        # NOTE(review): the "PUBLIC" comparison looks IPv4-oriented; confirm
        # how IPy classifies global IPv6 addresses.
        try:
            is_public = IPy.IP(raddrIp).iptype() == "PUBLIC"
        except ValueError:
            # Address text IPy cannot parse; treat it as not reportable.
            continue
        if not is_public:
            continue

        # Add this remote address to the list of positive results
        positive_results.append(raddrIp)

        try:
            # Get the remote address's info
            ip_info = get_ip_info(raddrIp)
        except Exception:
            # The lookup is best-effort; still report the connection itself.
            ip_info = "[Failed]"

        print(f"[{conn.pid}] - {process_name} - {conn.status} - {raddrIp}:{raddrPort} - {ip_info}")
def getPIDSys(port, netstat_output=None):
    """Return the PID (as a string) of the UDP socket bound to local ``port``.

    Assumes Windows-style ``netstat -a -n -o`` output, where a UDP row reads
    ``UDP  <local addr>:<port>  <foreign addr>  <pid>``.

    Args:
        port: Local UDP port number to look up.
        netstat_output: Optional pre-captured ``netstat -a -n -o`` text. When
            omitted, ``netstat`` is executed. Passing it lets a caller reuse
            one snapshot for several lookups.

    Returns:
        The PID column of the first UDP row whose local port equals ``port``.

    Raises:
        IndexError: if no UDP row matches (the same exception type the
            previous implementation raised on an empty result).
    """
    if netstat_output is None:
        # The context manager closes the pipe instead of leaking it.
        with popen("netstat -a -n -o") as pipe:
            netstat_output = pipe.read()

    wanted_port = str(port)
    for line in netstat_output.splitlines():
        fields = line.split()
        # Only UDP rows are relevant; TCP rows carry an extra State column,
        # so a fixed column index would not be the PID there.
        if len(fields) < 4 or fields[0].upper() != "UDP":
            continue
        # Compare the port exactly: a substring search would also match other
        # ports or PIDs that merely contain the same digits.
        if fields[1].rpartition(":")[2] == wanted_port:
            return fields[-1]

    raise IndexError(f"no UDP socket bound to local port {port}")
def udphandler(x):
    """Report the public peer of a sniffed UDP packet owned by a matching process.

    Args:
        x: A sniffed packet; its ``'IP'`` and ``'UDP'`` layers are read.

    Side effects:
        Appends each newly reported IP to the module-level
        ``positive_results`` list and prints one line per new IP.

    Any failure while handling a packet (missing layer, PID lookup failure,
    vanished process, ...) is ignored so that a single bad packet cannot stop
    the capture.
    """
    try:
        if not x['IP']:
            return

        # check if ip is public
        # The public side is the remote peer; the port on the other side of
        # the packet is therefore the local one used for the PID lookup.
        if IPy.IP(x['IP'].src).iptype() == 'PUBLIC':
            ipToCheck = x['IP'].src
            myPort = x['UDP'].dport
        elif IPy.IP(x['IP'].dst).iptype() == 'PUBLIC':
            ipToCheck = x['IP'].dst
            myPort = x['UDP'].sport
        else:
            return

        pid = getPIDSys(myPort)
        if pid == "0":
            return

        process_name = psutil.Process(int(pid)).name()
        if search_term.lower() not in process_name.lower():
            return
        if ipToCheck in positive_results:
            return

        # Add this remote address to the list of positive results
        positive_results.append(ipToCheck)

        try:
            # Get the remote address's info
            ip_info = get_ip_info(ipToCheck)
        except Exception:
            # The lookup is best-effort; still report the peer itself.
            ip_info = "[Failed]"

        print(f"[{pid}] - {process_name} - [UDP] - {ipToCheck} - {ip_info}")
    except Exception:
        # Deliberately best-effort, but no longer a bare `except`, so
        # KeyboardInterrupt and SystemExit are not swallowed.
        return
def startSniffThread():
    """Sniff UDP traffic, passing every captured packet to ``udphandler``.

    Once ``sniff`` returns, the summary of the captured packets is printed.
    """
    captured_packets = sniff(prn=udphandler, filter="udp")
    captured_packets.summary()
print("\nUse CTRL + BREAK To Stop Sniffing\n")

# Seconds to wait between TCP connection-table polls. Without a pause the
# loop below spins continuously and pins a CPU core.
TCP_POLL_INTERVAL_SECONDS = 0.5

# Starts UDP sniffing
# daemon=True so the blocking sniffer does not keep the process alive once
# the main thread ends (e.g. on Ctrl+C).
t = Thread(target=startSniffThread, daemon=True)
t.start()

# Starts TCP polling on the main thread
while True:
    processCurrentConnections(psutil.net_connections(kind='tcp'))
    sleep(TCP_POLL_INTERVAL_SECONDS)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
requirements.txt
requests==2.25.1
scapy==2.4.5
psutil==5.8.0
IPy==1.01
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment