Skip to content

Instantly share code, notes, and snippets.

@Romern
Created January 16, 2025 01:07
Show Gist options
  • Save Romern/517d471edd971538093e454b4ab1eb0f to your computer and use it in GitHub Desktop.
Save Romern/517d471edd971538093e454b4ab1eb0f to your computer and use it in GitHub Desktop.
Windows Search Protocol Python implementation (WIP: only CPMCreateQueryIn correctly implemented)
from wsp.packets import (
CPMConnectIn, CPMCreateQueryIn, CPMDisconnect,
CColumnSet, CRestrictionArray, CRowsetProperties, CPidMapper,
CColumnGroupArray, PropSpec, CPropertyRestriction, CRestriction
)
import uuid
def hex_dump(data: bytes, prefix: str = "") -> str:
    """Return ``data`` rendered as lowercase hex digits, preceded by ``prefix``.

    Every byte becomes exactly two hexadecimal characters and no separator
    is inserted between bytes.
    """
    digits = "".join(map("{:02x}".format, data))
    return "{}{}".format(prefix, digits)
def create_connect_packet() -> bytes:
    """Build the sample CPMConnectIn message and return its serialized bytes."""
    connect_fields = {
        "_iClientVersion": 0x00000102,  # Protocol version
        "_fClientIsRemote": 1,  # Running on different machine
        "MachineName": "PYTHON-CLIENT",
        "UserName": "PythonUser",
        "cPropSets": 0,
        "cExtPropSet": 0,
    }
    return CPMConnectIn(**connect_fields).to_bytes()
def create_query_packet() -> bytes:
    """Build the sample CPMCreateQueryIn message and return its serialized bytes.

    The query carries a single property restriction comparing the Storage
    "Scope" property against a hard-coded ``file:`` URL, a column set with
    indexes 0 and 1, and a PidMapper listing three property specs.
    """
    prspec_propid = 0x01  # PRSPEC_PROPID

    def by_propid(guid: uuid.UUID, propid: int) -> PropSpec:
        # Every property in this sample is addressed by numeric property id.
        return PropSpec(guid=guid, ulKind=prspec_propid, propid=propid)

    storage_guid = uuid.UUID("b725f130-47ef-101a-a5f1-02608c9eebac")  # Storage

    scope_restriction = CRestriction(
        ulType=5,  # RTProperty
        Weight=1000,
        Restriction=CPropertyRestriction(
            relop=4,  # PREQ
            Property=by_propid(storage_guid, 0x16),  # Scope
            prval="file:////WIN10VM2/SomeFolder",
            lcid=0x2000,
        ),
    )

    # PidMapper order: itemfolderpathdisplay, itemname, scope.
    pid_mapper = CPidMapper(
        PropSpecs=[
            by_propid(uuid.UUID("e3e0584c-b788-4a5a-bb20-7f5a44c9acdd"), 0x06),
            by_propid(uuid.UUID("6b8da074-3b5c-43bc-886f-0a2cdce00b6f"), 0x64),
            by_propid(storage_guid, 0x16),
        ]
    )

    query = CPMCreateQueryIn(
        ColumnSet=CColumnSet(indexes=[0, 1]),
        RestrictionArray=CRestrictionArray(restrictions=[scope_restriction]),
        RowSetProperties=CRowsetProperties(
            uBooleanOptions=0x00000001,
            ulMaxOpenRows=0,
            ulMemUsage=0,
            cMaxResults=10,
            cCmdTimeout=30,
        ),
        PidMapper=pid_mapper,
        GroupArray=CColumnGroupArray(),
    )
    return query.to_bytes()
def create_disconnect_packet() -> bytes:
    """Build the CPMDisconnect message and return its serialized bytes."""
    return CPMDisconnect().to_bytes()
def main():
    """Print a numbered heading and a hex dump for each sample packet."""
    sections = (
        ("\n1. CPMConnectIn packet", create_connect_packet),
        ("\n2. CPMCreateQueryIn packet", create_query_packet),
        ("\n3. CPMDisconnect packet", create_disconnect_packet),
    )
    for heading, build_packet in sections:
        print(heading)
        # Each packet is built only after its heading has been printed.
        print(hex_dump(build_packet(), " "))


if __name__ == "__main__":
    main()
from dataclasses import dataclass, field
from enum import IntEnum
import struct
from typing import List, Optional, Any
import uuid
# TODO: Define named constants for the GUIDs and property values.
# TODO: Implement the remaining classes needed for CPMConnectIn.
# TODO: Determine the minimal packet needed to trigger WSP coercion.
def AlignWrite(buffer: bytearray, alignment: int):
    """Zero-pad ``buffer`` in place until its length is a multiple of ``alignment``.

    Nothing is appended when the length is already aligned.
    """
    # abs() keeps the pad count non-negative even for a negative alignment;
    # an alignment of 0 still raises ZeroDivisionError.
    missing = -len(buffer) % abs(alignment)
    buffer.extend(bytes(missing))
def AddAlign(buffer: bytearray, t: bytes, alignment: int):
    """Append ``t`` to ``buffer`` at the next ``alignment``-byte boundary.

    The buffer is first zero-padded via ``AlignWrite`` and ``t`` is then
    written directly after the padding.
    """
    AlignWrite(buffer, alignment)
    buffer.extend(t)
ChecksumMagicNumber = 0x59533959


def CalculateChecksum(buffer: bytes, _msg: int):
    """Compute the 32-bit checksum of ``buffer`` for message type ``_msg``.

    ``buffer`` is read as consecutive little-endian 4-byte words (a short
    trailing chunk is read as-is), the words are summed, the sum is XORed
    with ``ChecksumMagicNumber``, ``_msg`` is subtracted, and the result is
    truncated to 32 bits.
    """
    total = 0
    for offset in range(0, len(buffer), 4):
        total += int.from_bytes(buffer[offset:offset + 4], "little")
    return ((total ^ ChecksumMagicNumber) - _msg) & 0xffffffff
class WspMessageType(IntEnum):
    """Numeric identifiers for the WSP messages used in this module.

    Each ``...Out`` name carries the same value as its ``...In`` counterpart,
    so it is an enum alias of that member rather than a distinct member.
    """

    # Connect
    CPMConnectIn = 0xC8
    CPMConnectOut = 0xC8
    # Disconnect
    CPMDisconnect = 0xC9
    # Create query
    CPMCreateQueryIn = 0xCA
    CPMCreateQueryOut = 0xCA
    # Free cursor
    CPMFreeCursorIn = 0xCB
    CPMFreeCursorOut = 0xCB
    # Get rows
    CPMGetRowsIn = 0xCC
    CPMGetRowsOut = 0xCC
@dataclass
class WspMessageHeader:
    """Fixed 16-byte header placed in front of every message body."""

    _msg: WspMessageType
    _status: int = 0
    _ulChecksum: int = 0
    _ulReserved2: int = 0

    def to_bytes(self) -> bytes:
        """Return the four header fields as little-endian unsigned 32-bit values."""
        header_fields = (
            self._msg,
            self._status,
            self._ulChecksum,
            self._ulReserved2,
        )
        return struct.pack("<4I", *header_fields)
@dataclass
class PropSpec:
    """Property identifier made of a property-set GUID, a kind, and a property id."""

    guid: uuid.UUID
    ulKind: int  # PRSPEC_PROPID = 1
    propid: int

    def to_bytes(self, buffer: bytearray):
        """Append this spec to ``buffer``.

        The GUID is written in its little-endian byte layout at the next
        8-byte boundary, followed by ``ulKind`` and ``propid`` as
        little-endian unsigned 32-bit values.
        """
        AddAlign(buffer, self.guid.bytes_le, 8)
        kind_and_id = struct.pack("<II", self.ulKind, self.propid)
        buffer.extend(kind_and_id)
@dataclass
class CColumnSet:
    """Counted list of column indexes."""

    indexes: List[int] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the index count, then each index, as little-endian uint32 values."""
        pack_u32 = struct.Struct("<I").pack
        buffer.extend(pack_u32(len(self.indexes)))
        for column_index in self.indexes:
            buffer.extend(pack_u32(column_index))
class CDbColId_eKind_Values(IntEnum):
    """Values accepted for ``CDbColId.eKind``."""

    DBKIND_GUID_NAME = 0x00000000
    DBKIND_GUID_PROPID = 0x00000001


@dataclass
class CDbColId:
    """Column identifier: a kind, a GUID, and a numeric id.

    Only ``DBKIND_GUID_PROPID`` can be serialized; ``vString`` is stored but
    never written because the name-based kind is not implemented.
    """

    eKind: CDbColId_eKind_Values
    GUID: uuid.UUID
    ulId: int
    vString: str

    def to_bytes(self, buffer: bytearray):
        """Append this column id to ``buffer``.

        Layout: ``eKind`` (uint32), zero padding to an 8-byte boundary, the
        GUID in little-endian byte layout, then ``ulId`` (uint32).

        Raises:
            NotImplementedError: if ``eKind`` is ``DBKIND_GUID_NAME``.
        """
        # Reject the unsupported kind before writing anything so the caller's
        # buffer is never left half-written.
        if self.eKind == CDbColId_eKind_Values.DBKIND_GUID_NAME:
            raise NotImplementedError(
                "CDbColId serialization for DBKIND_GUID_NAME is not implemented"
            )
        buffer.extend(struct.pack("<I", self.eKind))
        # bytes_le is already the 16 raw GUID bytes; it must not be passed
        # through struct.pack("<I", ...), which only accepts an integer.
        AddAlign(buffer, self.GUID.bytes_le, 8)
        buffer.extend(struct.pack("<I", self.ulId))
@dataclass
class CProp:
    """Single property entry: id, options, status, column id, and value."""

    DBPROPID: int
    DBPROPOPTIONS: int
    DBPROPSTATUS: int
    # Quoted annotations: CBaseStorageVariant is not defined or imported in
    # this module, so an unquoted reference raises NameError as soon as the
    # class body is executed.
    colid: "CDbColId"
    vValue: "CBaseStorageVariant"

    def to_bytes(self, buffer: bytearray):
        """Append this property to ``buffer``.

        Writes ``DBPROPID``, ``DBPROPOPTIONS`` and ``DBPROPSTATUS`` as
        little-endian uint32 values, then delegates to ``colid.to_bytes`` and
        ``vValue.to_bytes``.
        """
        buffer.extend(struct.pack("<I", self.DBPROPID))
        buffer.extend(struct.pack("<I", self.DBPROPOPTIONS))
        # The status field was declared but never written, which shifted every
        # following field by four bytes.
        # NOTE(review): field order follows the MS-WSP CDbProp layout - confirm.
        buffer.extend(struct.pack("<I", self.DBPROPSTATUS))
        self.colid.to_bytes(buffer)
        self.vValue.to_bytes(buffer)
@dataclass
class CPropSet:
    """Property set: a GUID followed by a counted list of properties."""

    guidPropertySet: uuid.UUID
    # Elements are serialized through their own to_bytes(buffer) method, so
    # they are property objects rather than plain integers.
    aProps: List["CProp"] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the GUID, the property count, and every property to ``buffer``.

        The count and each property start on a 4-byte boundary.
        """
        buffer.extend(self.guidPropertySet.bytes_le)
        # AddAlign requires an explicit alignment; the call previously omitted
        # it and raised TypeError. 4 matches the alignment used for each
        # property below.
        AddAlign(buffer, struct.pack("<I", len(self.aProps)), 4)
        for prop in self.aProps:
            AlignWrite(buffer, 4)
            prop.to_bytes(buffer)
@dataclass
class CPropertyRestriction:
    """Restriction comparing one property against a string value."""

    relop: int  # PREQ = 4
    Property: PropSpec
    prval: str  # VT_LPWSTR value
    lcid: int

    def to_bytes(self, buffer: bytearray):
        """Append this restriction to ``buffer``.

        Layout: ``relop`` (uint32), the property spec, padding to 4 bytes,
        the value type 0x1F (uint32), the character count including the
        terminator (uint32), the NUL-terminated UTF-16LE string, padding to
        4 bytes, and ``lcid`` (uint32).
        """
        buffer.extend(struct.pack("<I", self.relop))
        self.Property.to_bytes(buffer)

        AlignWrite(buffer, 4)
        buffer.extend(struct.pack("<I", 0x1F))  # VT_LPWSTR type
        encoded_value = (self.prval + "\0").encode('utf-16le')
        # Length is counted in 16-bit characters, not bytes.
        buffer.extend(struct.pack("<I", len(encoded_value) // 2))
        buffer.extend(encoded_value)

        lcid_bytes = struct.pack("<I", self.lcid)
        AlignWrite(buffer, 4)
        buffer.extend(lcid_bytes)
@dataclass
class CRestriction:
    """Typed, weighted wrapper around an optional inner restriction."""

    ulType: int = 0
    Weight: int = 0
    Restriction: Any = None

    def to_bytes(self, buffer: bytearray):
        """Append ``ulType`` and ``Weight`` (uint32 each), then the inner restriction.

        The inner restriction is skipped when it is ``None``.
        """
        # Type first, weight second.
        for header_value in (self.ulType, self.Weight):
            buffer.extend(struct.pack("<I", header_value))
        if self.Restriction is None:
            return
        self.Restriction.to_bytes(buffer)
@dataclass
class CRestrictionArray:
    """Counted list of restrictions preceded by a presence flag."""

    restrictions: List[CRestriction] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the array to ``buffer``.

        Layout: count (1 byte), presence flag (1 byte, 1 when the list is
        non-empty), padding to 4 bytes, then each restriction in order.
        """
        count = len(self.restrictions)
        buffer.extend(struct.pack("<B", count))
        buffer.extend(struct.pack("<B", int(count > 0)))
        AlignWrite(buffer, 4)
        for entry in self.restrictions:
            entry.to_bytes(buffer)
@dataclass
class CSortSet:
    """Counted list of sort entries, each stored as an integer."""

    sortArray: List[int] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the entry count, then each entry, as little-endian uint32 values."""
        pack_u32 = struct.Struct("<I").pack
        buffer.extend(pack_u32(len(self.sortArray)))
        for sort_entry in self.sortArray:
            buffer.extend(pack_u32(sort_entry))
@dataclass
class CInGroupSortAggregSets:
    """Counted list of sort sets with a reserved field."""

    Reserved: int = 0
    SortSets: List[CSortSet] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the count and ``Reserved`` (uint32 each), then every sort set."""
        count_and_reserved = struct.pack("<II", len(self.SortSets), self.Reserved)
        buffer.extend(count_and_reserved)
        for entry in self.SortSets:
            entry.to_bytes(buffer)
@dataclass
class CCategSpec:
    """Placeholder categorization spec that serializes to nothing."""

    def to_bytes(self, buffer: Optional[bytearray] = None) -> bytes:
        """Return the (empty) serialized form.

        Args:
            buffer: Optional output buffer. It is accepted so this placeholder
                can be invoked the same way as the other structures, which
                receive the buffer to append to (CCategorizationSpec calls
                ``Spec.to_bytes(buffer)``). Nothing is appended to it.

        Returns:
            An empty ``bytes`` object.
        """
        return bytes()
@dataclass
class CCategorizationSpec:
    """Pairs a column set with a categorization spec."""

    csColumns: CColumnSet = field(default_factory=CColumnSet)
    Spec: CCategSpec = field(default_factory=CCategSpec)

    def to_bytes(self, buffer: bytearray):
        """Serialize ``csColumns`` and then ``Spec`` into ``buffer``."""
        for part in (self.csColumns, self.Spec):
            part.to_bytes(buffer)
@dataclass
class CCategorizationSet:
    """Counted list of categorization specs."""

    categories: List[CCategorizationSpec] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the category count (uint32), then every category in order."""
        count_bytes = struct.pack("<I", len(self.categories))
        buffer.extend(count_bytes)
        for spec in self.categories:
            spec.to_bytes(buffer)
@dataclass
class CRowsetProperties:
    """Five unsigned 32-bit rowset settings sent with a query."""

    uBooleanOptions: int = 0x00000001
    ulMaxOpenRows: int = 0
    ulMemUsage: int = 0
    cMaxResults: int = 10
    cCmdTimeout: int = 30

    def to_bytes(self, buffer: bytearray):
        """Append the five fields, in declaration order, as little-endian uint32."""
        settings = (
            self.uBooleanOptions,
            self.ulMaxOpenRows,
            self.ulMemUsage,
            self.cMaxResults,
            self.cCmdTimeout,
        )
        buffer.extend(struct.pack("<5I", *settings))
@dataclass
class CPidMapper:
    """Counted list of property specs."""

    PropSpecs: List[PropSpec] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the spec count (uint32), then every property spec in order."""
        count_bytes = struct.pack("<I", len(self.PropSpecs))
        buffer.extend(count_bytes)
        for spec in self.PropSpecs:
            spec.to_bytes(buffer)
@dataclass
class CColumnGroup:
    """Placeholder column group with no fields."""

    def to_bytes(self, buffer: bytearray):
        """Write nothing; ``buffer`` is left unchanged."""
        return None
@dataclass
class CColumnGroupArray:
    """Counted list of column groups."""

    aGroupArray: List[CColumnGroup] = field(default_factory=list)

    def to_bytes(self, buffer: bytearray):
        """Append the group count (uint32), then every group in order."""
        count_bytes = struct.pack("<I", len(self.aGroupArray))
        buffer.extend(count_bytes)
        for column_group in self.aGroupArray:
            column_group.to_bytes(buffer)
@dataclass
class CPMCreateQueryIn:
    """CPMCreateQueryIn message: header plus a size-prefixed query body.

    ``SortSet`` and ``CCategorizationSet`` are accepted but never serialized;
    their presence flags are always written as 0x00.
    """

    ColumnSet: Optional[CColumnSet] = None
    RestrictionArray: Optional[CRestrictionArray] = None
    SortSet: Optional[CSortSet] = None
    # Quoted: this field shares its name with the CCategorizationSet class, so
    # an unquoted annotation inside the class body would resolve to the
    # field's default (None) instead of the class.
    CCategorizationSet: "Optional[CCategorizationSet]" = None
    RowSetProperties: Optional[CRowsetProperties] = None
    PidMapper: Optional[CPidMapper] = None
    GroupArray: Optional[CColumnGroupArray] = None
    Lcid: int = 0x2000

    def to_bytes(self) -> bytes:
        """Return the full message: 16-byte header followed by the body.

        The header checksum is computed over the body using the
        CPMCreateQueryIn message id.
        """
        body = self._get_body_bytes()
        header = WspMessageHeader(_msg=WspMessageType.CPMCreateQueryIn)
        header._ulChecksum = CalculateChecksum(body, WspMessageType.CPMCreateQueryIn)
        return header.to_bytes() + body

    def _get_body_bytes(self) -> bytes:
        """Serialize the message body, including its leading size field.

        ``RowSetProperties``, ``PidMapper`` and ``GroupArray`` are always
        written; when one of them is ``None`` a default-constructed instance
        is serialized instead of failing with AttributeError.
        """
        body = bytearray()
        # Size placeholder; patched once the total length is known.
        body.extend(struct.pack("<I", 0))

        # Each optional section is announced by a one-byte presence flag.
        body.append(0x01 if self.ColumnSet is not None else 0x00)  # CColumnSetPresent
        if self.ColumnSet is not None:
            AlignWrite(body, 4)
            self.ColumnSet.to_bytes(body)

        body.append(0x01 if self.RestrictionArray is not None else 0x00)  # CRestrictionPresent
        if self.RestrictionArray is not None:
            self.RestrictionArray.to_bytes(body)

        body.append(0x00)  # CSortSetPresent (not used)
        body.append(0x00)  # CCategorizationSetPresent (not used)
        AlignWrite(body, 4)

        # Mandatory sections: fall back to empty/default instances when unset.
        rowset_props = (
            self.RowSetProperties
            if self.RowSetProperties is not None
            else CRowsetProperties()
        )
        pid_mapper = self.PidMapper if self.PidMapper is not None else CPidMapper()
        group_array = (
            self.GroupArray if self.GroupArray is not None else CColumnGroupArray()
        )
        rowset_props.to_bytes(body)
        pid_mapper.to_bytes(body)
        group_array.to_bytes(body)

        body.extend(struct.pack("<I", self.Lcid))

        # The size field counts the whole body, itself included.
        body[0:4] = struct.pack("<I", len(body))
        return bytes(body)
@dataclass
class CPMConnectIn:
    """CPMConnectIn message: header plus client/version/name fields.

    NOTE(review): the gist description marks everything except
    CPMCreateQueryIn as work in progress, so the body layout produced here
    has not been confirmed against the protocol specification.
    """

    _iClientVersion: int = 0x00000102
    _fClientIsRemote: int = 0x00000001
    _cbBlob1: int = 0
    _cbBlob2: int = 0
    MachineName: str = "PYTHON-CLIENT"
    UserName: str = "PythonUser"
    cPropSets: int = 0
    cExtPropSet: int = 0

    def to_bytes(self) -> bytes:
        """Return the full message: 16-byte header followed by the body.

        The body holds four uint32 fields, the NUL-terminated UTF-16LE
        machine and user names, and the two property-set counts.
        """
        header = WspMessageHeader(_msg=WspMessageType.CPMConnectIn)
        body = struct.pack(
            "<IIII",
            self._iClientVersion,
            self._fClientIsRemote,
            self._cbBlob1,
            self._cbBlob2,
        )
        machine_name = (self.MachineName + "\0").encode('utf-16le')
        user_name = (self.UserName + "\0").encode('utf-16le')
        body += machine_name + user_name
        body += struct.pack("<II", self.cPropSets, self.cExtPropSet)
        # The checksum subtracts the id of the message being sent, so it must
        # use this message's own type (the same one stored in the header),
        # not CPMCreateQueryIn.
        header._ulChecksum = CalculateChecksum(body, WspMessageType.CPMConnectIn)
        return header.to_bytes() + body
@dataclass
class CPMDisconnect:
    """CPMDisconnect message, which consists of a header only."""

    def to_bytes(self) -> bytes:
        """Return the serialized header; the disconnect message has no body."""
        return WspMessageHeader(_msg=WspMessageType.CPMDisconnect).to_bytes()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment