Skip to content

Instantly share code, notes, and snippets.

@shortstack
Created October 9, 2024 16:12
Show Gist options
  • Save shortstack/bb164a673820ec4d4d8132a392bb34c2 to your computer and use it in GitHub Desktop.
Save shortstack/bb164a673820ec4d4d8132a392bb34c2 to your computer and use it in GitHub Desktop.
Velociraptor API example
import json
import grpc
import app.libraries.pyvelociraptor
from app.libraries.pyvelociraptor import api_pb2
from app.libraries.pyvelociraptor import api_pb2_grpc
def query_endpoint_status(endpoint_id):
status = query_vr(
'SELECT last_seen_at as last_seen FROM clients() WHERE client_id="%s"'
% (endpoint_id),
dict(),
)
return status
def query_vr(query, env_dict):
config = app.libraries.pyvelociraptor.LoadConfigFile("/path/to/client.config.yaml")
creds = grpc.ssl_channel_credentials(
root_certificates=config["ca_certificate"].encode("utf8"),
private_key=config["client_private_key"].encode("utf8"),
certificate_chain=config["client_cert"].encode("utf8"),
)
options = (
(
"grpc.ssl_target_name_override",
"VelociraptorServer",
),
)
env = []
for k, v in env_dict.items():
env.append(dict(key=k, value=v))
data = []
with grpc.secure_channel(
config["api_connection_string"], creds, options
) as channel:
stub = api_pb2_grpc.APIStub(channel)
request = api_pb2.VQLCollectorArgs(
max_wait=1,
max_row=100,
Query=[
api_pb2.VQLRequest(
Name="Test",
VQL=query,
)
],
env=env,
)
for response in stub.Query(request):
if response.Response:
for item in json.loads(response.Response):
data.append(item)
return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment