Created
January 24, 2025 08:00
-
-
Save lc-at/ef516cf62e87e269de792364ab7f7f07 to your computer and use it in GitHub Desktop.
Simple p0f client in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ipaddress
import pathlib
import socket
import struct
import sys
# p0f API protocol constants. Each value is a 4-byte field encoded in the
# host's native byte order (sys.byteorder), matching how the client compares
# them against the raw bytes read from the socket.
QUERY_MAGIC = (0x50304601).to_bytes(4, byteorder=sys.byteorder)
RESPONSE_MAGIC = (0x50304602).to_bytes(4, byteorder=sys.byteorder)
STATUS_BADQUERY = (0x00).to_bytes(4, byteorder=sys.byteorder)
STATUS_OK = (0x10).to_bytes(4, byteorder=sys.byteorder)
STATUS_NOMATCH = (0x20).to_bytes(4, byteorder=sys.byteorder)


class P0fClient:
    """Client for the p0f query API served over a Unix stream socket.

    Each call to :meth:`query` opens a fresh connection, sends one query and
    decodes the single fixed-size response.
    """

    # Layout of the response body that follows the 4-byte magic and the
    # 4-byte status: seven unsigned 32-bit integers, one signed 16-bit
    # integer, two unsigned bytes and six 32-byte NUL-padded strings.
    # "=" selects native byte order with standard sizes and no padding.
    _BODY = struct.Struct("=7IhBB32s32s32s32s32s32s")

    # Result keys, in wire order. The first ten are integers, the rest text.
    _INT_FIELDS = (
        "firstSeen",
        "lastSeen",
        "totalConn",
        "uptimeMin",
        "upModDays",
        "lastNAT",
        "lastChg",
        "distance",
        "badSw",
        "osMatchQ",
    )
    _STR_FIELDS = (
        "osName",
        "osFlavor",
        "httpName",
        "httpFlavor",
        "linkType",
        "language",
    )

    def __init__(self, socketPath: str) -> None:
        """Remember the path of the p0f API socket.

        Args:
            socketPath: Filesystem path of the Unix socket p0f listens on.

        Raises:
            FileNotFoundError: If nothing exists at ``socketPath``.
        """
        if not pathlib.Path(socketPath).exists():
            raise FileNotFoundError("p0f socket not found", socketPath)
        self.socketPath = socketPath

    @staticmethod
    def _buildQuery(ip: str) -> bytes:
        """Build the 21-byte p0f query for the given IP address.

        The query is the query magic, one byte holding the IP version
        (4 or 6), and the packed address right-padded with NULs to 16 bytes.

        Raises:
            ValueError: If ``ip`` is not a valid IPv4 or IPv6 address.
        """
        address = ipaddress.ip_address(ip)
        # bytes([n]) yields the single version byte on every Python 3
        # release; int.to_bytes(1) without a byteorder needs Python 3.11+.
        return b"".join(
            (
                QUERY_MAGIC,
                bytes([address.version]),
                address.packed.ljust(16, b"\x00"),
            )
        )

    @staticmethod
    def _recvExact(sock: socket.socket, size: int) -> bytes:
        """Read exactly ``size`` bytes from ``sock``.

        A single ``recv`` on a stream socket may return fewer bytes than
        requested, so keep reading until the full amount has arrived.

        Raises:
            ValueError: If the peer closes the connection early.
        """
        chunks = []
        remaining = size
        while remaining:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ValueError(
                    "Connection closed before full response was received",
                    size - remaining,
                    size,
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    @staticmethod
    def _decodeText(raw: bytes) -> str:
        """Decode a fixed-width NUL-padded field, stopping at the first NUL."""
        return raw.split(b"\x00", 1)[0].decode(errors="replace")

    def query(self, ip: str) -> dict:
        """Query p0f for what it knows about the given IP address.

        Args:
            ip: IPv4 or IPv6 address in textual form.

        Returns:
            A dict with integer entries ``firstSeen``, ``lastSeen``,
            ``totalConn``, ``uptimeMin``, ``upModDays``, ``lastNAT``,
            ``lastChg``, ``distance`` (signed), ``badSw`` and ``osMatchQ``,
            and string entries ``osName``, ``osFlavor``, ``httpName``,
            ``httpFlavor``, ``linkType`` and ``language``.

        Raises:
            ValueError: If ``ip`` is invalid, the response magic is wrong,
                the status is anything other than OK (including "no match"),
                or the response is truncated.
            OSError: If connecting to or talking over the socket fails.
        """
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.connect(self.socketPath)
            s.sendall(self._buildQuery(ip))

            magic = self._recvExact(s, 4)
            if magic != RESPONSE_MAGIC:
                raise ValueError("Received invalid response magic", magic.hex())

            status = self._recvExact(s, 4)
            if status != STATUS_OK:
                raise ValueError("Received not OK response status", status.hex())

            body = self._recvExact(s, self._BODY.size)

        values = self._BODY.unpack(body)
        intCount = len(self._INT_FIELDS)
        res = dict(zip(self._INT_FIELDS, values[:intCount]))
        for key, raw in zip(self._STR_FIELDS, values[intCount:]):
            res[key] = self._decodeText(raw)
        return res
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment