Created
January 16, 2025 01:07
-
-
Save Romern/517d471edd971538093e454b4ab1eb0f to your computer and use it in GitHub Desktop.
Windows Search Protocol Python implementation (WIP: only CPMCreateQueryIn correctly implemented)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from wsp.packets import ( | |
CPMConnectIn, CPMCreateQueryIn, CPMDisconnect, | |
CColumnSet, CRestrictionArray, CRowsetProperties, CPidMapper, | |
CColumnGroupArray, PropSpec, CPropertyRestriction, CRestriction | |
) | |
import uuid | |
def hex_dump(data: bytes, prefix: str = "") -> str: | |
hex_str = ''.join([f'{b:02x}' for b in data]) | |
return f"{prefix}{hex_str}" | |
def create_connect_packet() -> bytes: | |
connect = CPMConnectIn( | |
_iClientVersion=0x00000102, # Protocol version | |
_fClientIsRemote=1, # Running on different machine | |
MachineName="PYTHON-CLIENT", | |
UserName="PythonUser", | |
cPropSets=0, | |
cExtPropSet=0 | |
) | |
return connect.to_bytes() | |
def create_query_packet() -> bytes: | |
column_set = CColumnSet(indexes=[0,1]) | |
storage_guid = uuid.UUID("b725f130-47ef-101a-a5f1-02608c9eebac") # Storage | |
scope_prop = PropSpec( | |
guid=storage_guid, | |
ulKind=0x01, # PRSPEC_PROPID | |
propid=0x16 # Scope | |
) | |
prop_restriction = CPropertyRestriction( | |
relop=4, # PREQ | |
Property=scope_prop, | |
prval="file:////WIN10VM2/SomeFolder", | |
lcid=0x2000 | |
) | |
restriction = CRestriction( | |
ulType=5, # RTProperty | |
Weight=1000, | |
Restriction=prop_restriction | |
) | |
restriction_array = CRestrictionArray( | |
restrictions=[restriction] | |
) | |
scope = PropSpec( | |
guid=storage_guid, | |
ulKind=0x01, | |
propid=0x16 | |
) | |
itemfolderpathdisplay = PropSpec( | |
guid=uuid.UUID("e3e0584c-b788-4a5a-bb20-7f5a44c9acdd"), | |
ulKind=0x01, | |
propid=0x06 | |
) | |
itemname = PropSpec( | |
guid=uuid.UUID("6b8da074-3b5c-43bc-886f-0a2cdce00b6f"), | |
ulKind=0x01, | |
propid=0x64 | |
) | |
pid_mapper = CPidMapper( | |
PropSpecs=[ | |
itemfolderpathdisplay, | |
itemname, | |
scope | |
] | |
) | |
rowset_props = CRowsetProperties( | |
uBooleanOptions=0x00000001, | |
ulMaxOpenRows=0, | |
ulMemUsage=0, | |
cMaxResults=10, | |
cCmdTimeout=30 | |
) | |
group_array = CColumnGroupArray() | |
packet = CPMCreateQueryIn( | |
ColumnSet=column_set, | |
RestrictionArray=restriction_array, | |
RowSetProperties=rowset_props, | |
PidMapper=pid_mapper, | |
GroupArray=group_array | |
) | |
return packet.to_bytes() | |
def create_disconnect_packet() -> bytes: | |
disconnect = CPMDisconnect() | |
return disconnect.to_bytes() | |
def main(): | |
print("\n1. CPMConnectIn packet") | |
connect_bytes = create_connect_packet() | |
print(hex_dump(connect_bytes, " ")) | |
print("\n2. CPMCreateQueryIn packet") | |
query_bytes = create_query_packet() | |
print(hex_dump(query_bytes, " ")) | |
print("\n3. CPMDisconnect packet") | |
disconnect_bytes = create_disconnect_packet() | |
print(hex_dump(disconnect_bytes, " ")) | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass, field | |
from enum import IntEnum | |
import struct | |
from typing import List, Optional, Any | |
import uuid | |
# TODO: Define constants for guids and property values | |
# TODO: Implement remaining classes needed for connectIn | |
# TODO: see what a minimal packet is to trigger wsp coerce | |
def AlignWrite(buffer: bytearray, alignment: int): | |
while (len(buffer) % alignment != 0): | |
buffer.extend(b"\x00") | |
def AddAlign(buffer: bytearray, t: bytes, alignment: int): | |
AlignWrite(buffer, alignment) | |
buffer.extend(t) | |
ChecksumMagicNumber = 0x59533959 | |
def CalculateChecksum(buffer: bytes, _msg: int): | |
checksum = sum(int.from_bytes(buffer[i:i+4], "little") for i in range(0, len(buffer), 4)) | |
checksum ^= ChecksumMagicNumber | |
checksum -= _msg | |
return checksum & 0xffffffff | |
class WspMessageType(IntEnum): | |
CPMConnectIn = 0x000000C8 | |
CPMConnectOut = 0x000000C8 | |
CPMDisconnect = 0x000000C9 | |
CPMCreateQueryIn = 0x000000CA | |
CPMCreateQueryOut = 0x000000CA | |
CPMFreeCursorIn = 0x000000CB | |
CPMFreeCursorOut = 0x000000CB | |
CPMGetRowsIn = 0x000000CC | |
CPMGetRowsOut = 0x000000CC | |
@dataclass | |
class WspMessageHeader: | |
_msg: WspMessageType | |
_status: int = 0 | |
_ulChecksum: int = 0 | |
_ulReserved2: int = 0 | |
def to_bytes(self) -> bytes: | |
return struct.pack("<IIII", | |
self._msg, | |
self._status, | |
self._ulChecksum, | |
self._ulReserved2 | |
) | |
@dataclass | |
class PropSpec: | |
guid: uuid.UUID | |
ulKind: int # PRSPEC_PROPID = 1 | |
propid: int | |
def to_bytes(self, buffer: bytearray): | |
AddAlign(buffer, self.guid.bytes_le, 8) | |
buffer.extend(struct.pack("<II", self.ulKind, self.propid)) | |
@dataclass | |
class CColumnSet: | |
indexes: List[int] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", len(self.indexes))) | |
for i in self.indexes: | |
buffer.extend(struct.pack("<I", i)) | |
class CDbColId_eKind_Values(IntEnum): | |
DBKIND_GUID_NAME = 0x00000000 | |
DBKIND_GUID_PROPID = 0x00000001 | |
@dataclass | |
class CDbColId: | |
eKind: CDbColId_eKind_Values | |
GUID: uuid.UUID | |
ulId: int | |
vString: str | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", self.eKind)) | |
AddAlign(buffer, struct.pack("<I", self.GUID.bytes_le), 8) | |
buffer.extend(struct.pack("<I", self.ulId)) | |
if self.eKind == CDbColId_eKind_Values.DBKIND_GUID_NAME: | |
raise NotImplementedError() | |
@dataclass | |
class CProp: | |
DBPROPID: int | |
DBPROPOPTIONS: int | |
DBPROPSTATUS: int | |
colid: CDbColId | |
vValue: CBaseStorageVariant | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", self.DBPROPID)) | |
buffer.extend(struct.pack("<I", self.DBPROPOPTIONS)) | |
self.colid.to_bytes(buffer) | |
self.vValue.to_bytes(buffer) | |
@dataclass | |
class CPropSet: | |
guidPropertySet: uuid.UUID | |
aProps: List[int] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(self.guidPropertySet.bytes_le) | |
AddAlign(buffer, struct.pack("<I", len(self.aProps))) | |
for i in self.aProps: | |
AlignWrite(buffer, 4) | |
i.to_bytes(buffer) | |
@dataclass | |
class CPropertyRestriction: | |
relop: int # PREQ = 4 | |
Property: PropSpec | |
prval: str # VT_LPWSTR value | |
lcid: int | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", self.relop)) | |
self.Property.to_bytes(buffer) | |
AlignWrite(buffer,4) | |
buffer.extend(struct.pack("<I", 0x1F)) # VT_LPWSTR type | |
str_bytes = (self.prval + "\0").encode('utf-16le') | |
str_len = len(str_bytes) // 2 # Length in characters (16-bit) | |
buffer.extend(struct.pack("<I", str_len)) # String length | |
buffer.extend(str_bytes) | |
AddAlign(buffer, struct.pack("<I", self.lcid), 4) | |
@dataclass | |
class CRestriction: | |
ulType: int = 0 | |
Weight: int = 0 | |
Restriction: Any = None | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", self.ulType)) # Type first | |
buffer.extend(struct.pack("<I", self.Weight)) # Weight second | |
if self.Restriction is not None: | |
self.Restriction.to_bytes(buffer) | |
@dataclass | |
class CRestrictionArray: | |
restrictions: List[CRestriction] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<B", len(self.restrictions))) | |
buffer.extend(struct.pack("<B", 1 if len(self.restrictions)>0 else 0)) | |
AlignWrite(buffer,4) | |
for restriction in self.restrictions: | |
restriction.to_bytes(buffer) | |
@dataclass | |
class CSortSet: | |
sortArray: List[int] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", len(self.sortArray))) | |
for sort in self.sortArray: | |
buffer.extend(struct.pack("<I", sort)) | |
@dataclass | |
class CInGroupSortAggregSets: | |
Reserved: int = 0 | |
SortSets: List[CSortSet] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<II", len(self.SortSets), self.Reserved)) | |
for sort_set in self.SortSets: | |
sort_set.to_bytes(buffer) | |
@dataclass | |
class CCategSpec: | |
def to_bytes(self) -> bytes: | |
return bytes() | |
@dataclass | |
class CCategorizationSpec: | |
csColumns: CColumnSet = field(default_factory=CColumnSet) | |
Spec: CCategSpec = field(default_factory=CCategSpec) | |
def to_bytes(self, buffer: bytearray): | |
self.csColumns.to_bytes(buffer) | |
self.Spec.to_bytes(buffer) | |
@dataclass | |
class CCategorizationSet: | |
categories: List[CCategorizationSpec] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", len(self.categories))) | |
for category in self.categories: | |
category.to_bytes(buffer) | |
@dataclass | |
class CRowsetProperties: | |
uBooleanOptions: int = 0x00000001 | |
ulMaxOpenRows: int = 0 | |
ulMemUsage: int = 0 | |
cMaxResults: int = 10 | |
cCmdTimeout: int = 30 | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<IIIII", | |
self.uBooleanOptions, | |
self.ulMaxOpenRows, | |
self.ulMemUsage, | |
self.cMaxResults, | |
self.cCmdTimeout | |
)) | |
@dataclass | |
class CPidMapper: | |
PropSpecs: List[PropSpec] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", len(self.PropSpecs))) | |
for prop_spec in self.PropSpecs: | |
prop_spec.to_bytes(buffer) | |
@dataclass | |
class CColumnGroup: | |
def to_bytes(self, buffer: bytearray): | |
pass | |
@dataclass | |
class CColumnGroupArray: | |
aGroupArray: List[CColumnGroup] = field(default_factory=list) | |
def to_bytes(self, buffer: bytearray): | |
buffer.extend(struct.pack("<I", len(self.aGroupArray))) | |
for group in self.aGroupArray: | |
group.to_bytes(buffer) | |
@dataclass | |
class CPMCreateQueryIn: | |
ColumnSet: Optional[CColumnSet] = None | |
RestrictionArray: Optional[CRestrictionArray] = None | |
SortSet: Optional[CSortSet] = None | |
CCategorizationSet: Optional[CCategorizationSet] = None | |
RowSetProperties: Optional[CRowsetProperties] = None | |
PidMapper: Optional[CPidMapper] = None | |
GroupArray: Optional[CColumnGroupArray] = None | |
Lcid: int = 0x2000 | |
def to_bytes(self) -> bytes: | |
header = WspMessageHeader(_msg=WspMessageType.CPMCreateQueryIn) | |
body = self._get_body_bytes() | |
# Calculate checksum | |
header._ulChecksum = CalculateChecksum(body, WspMessageType.CPMCreateQueryIn) | |
return header.to_bytes() + body | |
def _get_body_bytes(self) -> bytes: | |
temp_buffer = bytearray() | |
# First write size (will update later) | |
temp_buffer.extend(struct.pack("<I", 0)) | |
# Write present flags as individual bytes | |
temp_buffer.append(0x01 if self.ColumnSet else 0x00) # CColumnSetPresent | |
if self.ColumnSet: | |
AlignWrite(temp_buffer,4) | |
self.ColumnSet.to_bytes(temp_buffer) | |
temp_buffer.append(0x01 if self.RestrictionArray else 0x00) # CRestrictionPresent | |
if self.RestrictionArray: | |
self.RestrictionArray.to_bytes(temp_buffer) | |
temp_buffer.append(0x00) # CSortSetPresent (not used) | |
temp_buffer.append(0x00) # CCategorizationSetPresent (not used) | |
AlignWrite(temp_buffer,4) | |
# Add RowSetProperties | |
self.RowSetProperties.to_bytes(temp_buffer) | |
# Add PidMapper | |
self.PidMapper.to_bytes(temp_buffer) | |
# Add GroupArray | |
self.GroupArray.to_bytes(temp_buffer) | |
# Add Lcid | |
temp_buffer.extend(struct.pack("<I", self.Lcid)) | |
# Update size | |
size = len(temp_buffer) | |
temp_buffer[0:4] = struct.pack("<I", size) | |
return bytes(temp_buffer) | |
@dataclass | |
class CPMConnectIn: | |
_iClientVersion: int = 0x00000102 | |
_fClientIsRemote: int = 0x00000001 | |
_cbBlob1: int = 0 | |
_cbBlob2: int = 0 | |
MachineName: str = "PYTHON-CLIENT" | |
UserName: str = "PythonUser" | |
cPropSets: int = 0 | |
cExtPropSet: int = 0 | |
def to_bytes(self) -> bytes: | |
header = WspMessageHeader(_msg=WspMessageType.CPMConnectIn) | |
body = struct.pack("<IIII", | |
self._iClientVersion, | |
self._fClientIsRemote, | |
self._cbBlob1, | |
self._cbBlob2 | |
) | |
machine_name = (self.MachineName + "\0").encode('utf-16le') | |
user_name = (self.UserName + "\0").encode('utf-16le') | |
body += machine_name + user_name | |
body += struct.pack("<II", self.cPropSets, self.cExtPropSet) | |
header._ulChecksum = CalculateChecksum(body, WspMessageType.CPMCreateQueryIn) | |
return header.to_bytes() + body | |
@dataclass | |
class CPMDisconnect: | |
def to_bytes(self) -> bytes: | |
header = WspMessageHeader(_msg=WspMessageType.CPMDisconnect) | |
# No body for disconnect message | |
return header.to_bytes() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment