Created
November 5, 2025 05:01
-
-
Save achantavy/d5f91238255076e11152acdbacd9eb81 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import time
from dataclasses import dataclass
from typing import Any
from typing import Dict
from typing import List

import neo4j
from cartography.client.core.tx import load_graph_data
from cartography.graph.model import CartographyNodeProperties
from cartography.graph.model import CartographyNodeSchema
from cartography.graph.model import CartographyRelProperties
from cartography.graph.model import CartographyRelSchema
from cartography.graph.model import LinkDirection
from cartography.graph.model import make_target_node_matcher
from cartography.graph.model import PropertyRef
from cartography.graph.model import TargetNodeMatcher
from cartography.graph.querybuilder import build_ingestion_query
from neo4j import GraphDatabase
from unificontrol import UnifiClient
# Connection settings for the UniFi controller.
# Each value can be overridden through an environment variable so that real
# credentials do not have to be committed to source control; the defaults are
# the values this script has always used.
HOST = os.environ.get('UNIFI_HOST', 'localhost')  # hostname where your unifi controller is
USERNAME = os.environ.get('UNIFI_USERNAME', 'admin')
PASSWORD = os.environ.get('UNIFI_PASSWORD', 'adminpwforunifi')
SITE = os.environ.get('UNIFI_SITE', 'default')
def get_unifi_clients(client: UnifiClient) -> List[Dict[str, Any]]:
    """Return whatever ``client.list_clients()`` reports, unmodified."""
    return client.list_clients()
def get_unifi_devices(client: UnifiClient) -> List[Dict[str, Any]]:
    """Return whatever ``client.list_devices_basic()`` reports, unmodified."""
    return client.list_devices_basic()
@dataclass(frozen=True)
class UnifiClientProps(CartographyNodeProperties):
    """Properties written onto each ``UnifiClient`` node.

    Every ``PropertyRef`` names the key that is read from a client record;
    ``lastupdated`` is the exception and is supplied as a keyword argument
    at load time.
    """

    # The node id and the ``mac`` property are both read from the 'mac' key.
    id: PropertyRef = PropertyRef("mac")
    lastupdated: PropertyRef = PropertyRef("lastupdated", set_in_kwargs=True)
    is_guest: PropertyRef = PropertyRef("is_guest")
    mac: PropertyRef = PropertyRef("mac")
    ip: PropertyRef = PropertyRef("ip")
    oui: PropertyRef = PropertyRef("oui")
    satisfaction: PropertyRef = PropertyRef("satisfaction")
    channel: PropertyRef = PropertyRef("channel")
    radio: PropertyRef = PropertyRef("radio")
    is_wired: PropertyRef = PropertyRef("is_wired")
    qos_policy_applied: PropertyRef = PropertyRef("qos_policy_applied")
@dataclass(frozen=True)
class UbntClientToDeviceRelProps(CartographyRelProperties):
    """Properties stored on the client/device relationship.

    Only ``lastupdated`` is tracked; it is supplied as a keyword argument at
    load time rather than read from the record.
    """

    lastupdated: PropertyRef = PropertyRef("lastupdated", set_in_kwargs=True)
@dataclass(frozen=True)
class UbntClientToDevice(CartographyRelSchema):
    """``RESOURCE`` relationship between a client and a ``UnifiDevice`` node.

    The target device is the ``UnifiDevice`` whose ``id`` equals the client
    record's ``ap_mac`` value.
    """

    target_node_label: str = "UnifiDevice"
    target_node_matcher: TargetNodeMatcher = make_target_node_matcher({"id": PropertyRef("ap_mac")})
    # NOTE(review): INWARD presumably means the edge points from the device
    # to the client -- confirm against cartography's LinkDirection docs.
    direction: LinkDirection = LinkDirection.INWARD
    rel_label: str = "RESOURCE"
    properties: UbntClientToDeviceRelProps = UbntClientToDeviceRelProps()
@dataclass(frozen=True)
class UbntClient(CartographyNodeSchema):
    """Schema for ``UnifiClient`` nodes.

    Combines the client properties with the client/device relationship,
    which is registered as the node's sub-resource relationship.
    """

    label: str = "UnifiClient"
    properties: UnifiClientProps = UnifiClientProps()
    sub_resource_relationship: UbntClientToDevice = UbntClientToDevice()
@dataclass(frozen=True)
class UbntDeviceProps(CartographyNodeProperties):
    """Properties written onto each ``UnifiDevice`` node.

    Every ``PropertyRef`` names the key that is read from a device record;
    ``lastupdated`` is supplied as a keyword argument at load time.
    """

    # The node id and the ``mac`` property are both read from the 'mac' key.
    id: PropertyRef = PropertyRef("mac")
    lastupdated: PropertyRef = PropertyRef("lastupdated", set_in_kwargs=True)
    mac: PropertyRef = PropertyRef("mac")
    adopted: PropertyRef = PropertyRef("adopted")
    # Stored as ``type_name`` on the node but read from the record's 'type' key.
    type_name: PropertyRef = PropertyRef("type")
    model: PropertyRef = PropertyRef("model")
    name: PropertyRef = PropertyRef("name")
@dataclass(frozen=True)
class UbntDevice(CartographyNodeSchema):
    """Schema for ``UnifiDevice`` nodes; declares no relationships."""

    label: str = "UnifiDevice"
    properties: UbntDeviceProps = UbntDeviceProps()
def load_unifi_devices(
    neo4j_session: neo4j.Session,
    device_data: List[Dict],
    update_tag: int,
) -> None:
    """Write device records to the graph as ``UnifiDevice`` nodes.

    :param neo4j_session: session used to run the ingestion query.
    :param device_data: device records to load.
    :param update_tag: value passed to the query as ``lastupdated``.
    """
    load_graph_data(
        neo4j_session,
        build_ingestion_query(UbntDevice()),
        device_data,
        lastupdated=update_tag,
    )
def load_unifi_clients(
    neo4j_session: neo4j.Session,
    device_data: List[Dict],
    update_tag: int,
) -> None:
    """Write client records to the graph using the ``UbntClient`` schema.

    :param neo4j_session: session used to run the ingestion query.
    :param device_data: the *client* records to load (the parameter name is
        historical and kept for caller compatibility).
    :param update_tag: value passed to the query as ``lastupdated``.
    """
    load_graph_data(
        neo4j_session,
        build_ingestion_query(UbntClient()),
        device_data,
        lastupdated=update_tag,
    )
def sync_unifi_clients(client: UnifiClient, neo4j_session: neo4j.Session, update_tag: int) -> None:
    """Fetch the controller's client list and load it into the graph.

    :param client: UniFi controller API client. This was previously
        annotated as ``UbntClient``, which is the graph node schema rather
        than the API client that callers actually pass.
    :param neo4j_session: session used to write the client records.
    :param update_tag: value stamped on everything written during this sync.
    """
    clients = get_unifi_clients(client)
    load_unifi_clients(neo4j_session, clients, update_tag)
def sync_unifi_devices(client: UnifiClient, neo4j_session: neo4j.Session, update_tag: int) -> None:
    """Fetch the controller's device list and load it into the graph.

    :param client: UniFi controller API client. This was previously
        annotated as ``UbntClient``, which is the graph node schema rather
        than the API client that callers actually pass.
    :param neo4j_session: session used to write the device records.
    :param update_tag: value stamped on everything written during this sync.
    """
    devices = get_unifi_devices(client)
    load_unifi_devices(neo4j_session, devices, update_tag)
def sync_unifi(neo4j_session: neo4j.Session, update_tag: int) -> None:
    """Run one sync pass and report clients that disconnected or roamed.

    Devices are loaded before clients. Afterwards, any ``UnifiClient`` node
    whose ``lastupdated`` differs from ``update_tag`` is printed as a
    disconnect and deleted, and any client/device ``RESOURCE`` relationship
    whose ``lastupdated`` differs is printed as a roam and deleted.

    :param neo4j_session: session used for loading and for the cleanup queries.
    :param update_tag: tag identifying this sync pass.
    """
    find_stale_clients = """
    MATCH (n:UnifiClient) WHERE n.lastupdated <> $UPDATE_TAG
    RETURN n.mac as mac, n.oui as oui, n.lastupdated as timestamp
    """
    drop_stale_clients = """
    MATCH (n:UnifiClient) WHERE n.lastupdated <> $UPDATE_TAG DETACH DELETE n
    """
    find_stale_links = """
    MATCH (n:UnifiClient)-[r:RESOURCE]-(d:UnifiDevice) WHERE r.lastupdated <> $UPDATE_TAG
    RETURN n.mac as mac, n.oui as oui, d.name as ap, n.lastupdated as timestamp
    """
    drop_stale_links = """
    MATCH (:UnifiClient)-[r:RESOURCE]-(:UnifiDevice) WHERE r.lastupdated <> $UPDATE_TAG DELETE r
    """

    controller = UnifiClient(host=HOST, username=USERNAME, password=PASSWORD, site=SITE)
    sync_unifi_devices(controller, neo4j_session, update_tag)
    sync_unifi_clients(controller, neo4j_session, update_tag)

    # Clients that this pass did not refresh are treated as disconnected.
    gone = neo4j_session.run(find_stale_clients, UPDATE_TAG=update_tag).data()
    if gone:
        print("disconnect", gone)
    neo4j_session.run(drop_stale_clients, UPDATE_TAG=update_tag)

    # A surviving client with a link this pass did not refresh has moved to
    # a different device.
    moved = neo4j_session.run(find_stale_links, UPDATE_TAG=update_tag).data()
    if moved:
        print('roam', moved)
    neo4j_session.run(drop_stale_links, UPDATE_TAG=update_tag)
if __name__ == '__main__':
    # Poll the controller forever, one sync pass roughly every five seconds.
    neo4j_driver = GraphDatabase.driver('bolt://localhost:7687')
    try:
        while True:
            update_tag = int(time.time())
            with neo4j_driver.session() as session:
                try:
                    sync_unifi(session, update_tag)
                except Exception as exc:
                    # Catch Exception rather than using a bare ``except`` so
                    # that Ctrl-C and SystemExit still stop the loop, and
                    # report the cause instead of hiding it.
                    print('fail, retrying', repr(exc))
            # Sleep after failures too, so an unreachable controller or
            # database is not retried in a tight loop.
            time.sleep(5)
    finally:
        # Release the driver's connections when the loop is interrupted.
        neo4j_driver.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment