Skip to content

Instantly share code, notes, and snippets.

@lc-at
Created January 24, 2025 08:00
Show Gist options
  • Save lc-at/ef516cf62e87e269de792364ab7f7f07 to your computer and use it in GitHub Desktop.
Save lc-at/ef516cf62e87e269de792364ab7f7f07 to your computer and use it in GitHub Desktop.
Simple p0f client in Python
import ipaddress
import pathlib
import socket
import sys
# Protocol constants for the p0f API exchange. Each one is pre-encoded as a
# 4-byte field in the host's native byte order so it can be concatenated into
# a query or compared directly against raw bytes read from the socket.

# Leading 4 bytes of every query built by P0fClient._buildQuery.
QUERY_MAGIC = int.to_bytes(0x50304601, 4, byteorder=sys.byteorder)
# Leading 4 bytes P0fClient.query requires at the start of a response.
RESPONSE_MAGIC = int.to_bytes(0x50304602, 4, byteorder=sys.byteorder)

# Status field values (second 4-byte field of a response). Only STATUS_OK is
# compared against in this file; the other two are presumably the daemon's
# "malformed query" and "no data for this host" codes — confirm against the
# p0f API documentation before relying on them.
STATUS_BADQUERY = int.to_bytes(0x00, 4, byteorder=sys.byteorder)
STATUS_OK = int.to_bytes(0x10, 4, byteorder=sys.byteorder)
STATUS_NOMATCH = int.to_bytes(0x20, 4, byteorder=sys.byteorder)
class P0fClient:
    """Client that queries a p0f daemon through its Unix-domain API socket.

    Every call to :meth:`query` opens a fresh stream connection to
    ``socketPath``, sends one query and parses one response. Multi-byte
    integers are exchanged in host byte order (``sys.byteorder``).
    """

    # Layout of the response body, i.e. everything after the 4-byte magic and
    # the 4-byte status, in wire order: seven unsigned 32-bit integers, one
    # signed 16-bit integer ("distance"), two single bytes ("badSw",
    # "osMatchQ"), then six fixed-width NUL-padded strings.
    _UINT32_FIELDS = (
        "firstSeen",
        "lastSeen",
        "totalConn",
        "uptimeMin",
        "upModDays",
        "lastNAT",
        "lastChg",
    )
    _STRING_FIELDS = (
        "osName",
        "osFlavor",
        "httpName",
        "httpFlavor",
        "linkType",
        "language",
    )
    _STRING_SIZE = 32
    _BODY_SIZE = (
        4 * len(_UINT32_FIELDS) + 2 + 1 + 1 + _STRING_SIZE * len(_STRING_FIELDS)
    )

    def __init__(self, socketPath: str) -> None:
        """Remember the API socket path after checking that it exists.

        Args:
            socketPath: Filesystem path of the p0f API socket.

        Raises:
            FileNotFoundError: If nothing exists at ``socketPath``.
        """
        if not pathlib.Path(socketPath).exists():
            raise FileNotFoundError("p0f socket not found", socketPath)
        self.socketPath = socketPath

    @staticmethod
    def _buildQuery(ip: str) -> bytes:
        """Build the 21-byte query for the given IP address.

        The query is the 4-byte query magic, one byte holding the IP version
        (4 or 6), and the packed address right-padded with NULs to 16 bytes.

        Raises:
            ValueError: If ``ip`` is not a valid IPv4 or IPv6 address.
        """
        addr = ipaddress.ip_address(ip)
        # bytes([n]) yields the single version byte on every Python 3 release;
        # int.to_bytes(1) without a byteorder only works on 3.11+.
        return b"".join(
            (QUERY_MAGIC, bytes([addr.version]), addr.packed.ljust(16, b"\x00"))
        )

    @staticmethod
    def _recvExact(sock: socket.socket, size: int) -> bytes:
        """Read ``size`` bytes from ``sock``, looping over short reads.

        A stream socket may hand back fewer bytes than requested, so keep
        reading until the count is satisfied. If the peer closes first, the
        bytes received so far (possibly none) are returned; callers detect
        truncation by checking the length.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:  # peer closed the connection
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    @classmethod
    def _parseBody(cls, body: bytes) -> dict:
        """Decode the response body (everything after magic and status).

        Raises:
            ValueError: If ``body`` is not exactly ``_BODY_SIZE`` bytes long,
                or a string field is not valid UTF-8 (``UnicodeDecodeError``
                is a ``ValueError`` subclass).
        """
        if len(body) != cls._BODY_SIZE:
            raise ValueError("Received truncated response body", len(body))

        res = {}
        pos = 0
        for name in cls._UINT32_FIELDS:
            res[name] = int.from_bytes(body[pos:pos + 4], byteorder=sys.byteorder)
            pos += 4
        res["distance"] = int.from_bytes(
            body[pos:pos + 2],
            byteorder=sys.byteorder,
            signed=True,
        )
        pos += 2
        # Indexing bytes yields the unsigned value of a single byte directly.
        res["badSw"] = body[pos]
        res["osMatchQ"] = body[pos + 1]
        pos += 2
        for name in cls._STRING_FIELDS:
            raw = body[pos:pos + cls._STRING_SIZE]
            res[name] = raw.decode().strip("\x00")
            pos += cls._STRING_SIZE
        return res

    def query(self, ip: str) -> dict:
        """Query p0f for the given IP address.

        Args:
            ip: IPv4 or IPv6 address in textual form.

        Returns:
            A dict with the integer fields ``firstSeen``, ``lastSeen``,
            ``totalConn``, ``uptimeMin``, ``upModDays``, ``lastNAT``,
            ``lastChg``, ``distance``, ``badSw``, ``osMatchQ`` and the string
            fields ``osName``, ``osFlavor``, ``httpName``, ``httpFlavor``,
            ``linkType``, ``language``, in that order.

        Raises:
            ValueError: If ``ip`` is invalid, the response magic is wrong,
                the status is anything other than ``STATUS_OK``, or the
                response is truncated.
            OSError: If connecting to or talking over the socket fails.
        """
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.connect(self.socketPath)
            s.sendall(self._buildQuery(ip))

            magic = self._recvExact(s, 4)
            if magic != RESPONSE_MAGIC:
                raise ValueError("Received invalid response magic", magic.hex())

            status = self._recvExact(s, 4)
            if status != STATUS_OK:
                raise ValueError("Received not OK response status", status.hex())

            body = self._recvExact(s, self._BODY_SIZE)
        return self._parseBody(body)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment