Skip to content

Instantly share code, notes, and snippets.

@emersonford
Last active July 13, 2022 11:13
Show Gist options
  • Save emersonford/85cc74f7f90c81d1b75c3c246124591d to your computer and use it in GitHub Desktop.
Save emersonford/85cc74f7f90c81d1b75c3c246124591d to your computer and use it in GitHub Desktop.
Python script to rewrite the `mac_smb_port` setting for Google Drive to a stable port number, thereby allowing the Spotlight index for it to be stable.
#!/usr/bin/env python3
# see https://apple.stackexchange.com/questions/420959/spotlight-indexing-of-google-drive-file-stream-disabled-on-restart/424445#424445 for an explanation
import os
from copy import deepcopy
from math import log
from pathlib import Path
DEBUG = 0
DRIVEFS_PATH = os.environ["HOME"] + "/Library/Application Support/Google/DriveFS"
def print_field(field_num, wire_type, pl, byts):
    """Print one decoded field as a tab-separated debug line.

    Does nothing unless the module-level ``DEBUG`` flag is truthy.

    ``pl`` is the payload triple ``(value, payload_start, payload_end)``.
    For wire type 2 the raw payload bytes are sliced out of ``byts``;
    for every other wire type the decoded value ``pl[0]`` is shown.
    """
    if not DEBUG:
        return

    start = pl[1]
    end = pl[2]
    if wire_type == 2:
        shown = byts[start:end]
    else:
        shown = pl[0]

    print(
        "field",
        field_num,
        ": wire_type",
        wire_type,
        ": payload",
        shown,
        ": byte_size",
        end - start,
        sep="\t",
    )
# byte -> (byte, byte)
def split_msb(byt):
    """Split a byte into ``(continuation_bit, low_seven_bits)``."""
    top_bit = (byt >> 7) & 0b1
    low_bits = byt & 0x7F
    return top_bit, low_bits
def get_wire_type(varint):
    """Return the low three bits of a decoded field key (the wire type)."""
    wire_type_mask = 0x07
    return varint & wire_type_mask
def get_field_number(varint):
    """Return the field number: the decoded key with its three wire-type bits dropped."""
    wire_type_bits = 3
    return varint >> wire_type_bits
# (bytearray, int cursor) -> (decoded int, cursor just past the varint)
def decode_varint(bytarr, cursor):
    """Decode a little-endian base-128 varint starting at ``cursor``.

    Each byte contributes its low seven bits, least-significant group
    first; a set high bit means another byte follows.

    Returns:
        ``(value, next_cursor)`` where ``next_cursor`` is the index of the
        first byte after the varint.

    Raises:
        RuntimeError: if the buffer ends before a byte with a clear
            continuation bit is seen. Callers in this file treat
            ``RuntimeError`` as "malformed file", so a truncated varint is
            reported that way instead of surfacing as an ``IndexError``.
    """
    value = 0
    shift = 0
    pos = cursor
    limit = len(bytarr)
    while True:
        if pos >= limit:
            raise RuntimeError("Unexpected file format (truncated varint)")
        byte = bytarr[pos]
        pos += 1
        value |= (byte & 0b0111_1111) << shift
        shift += 7
        # A clear high bit marks the final byte of the varint.
        if not byte & 0b1000_0000:
            return value, pos
def decode_fixed(bytarr, cursor, byts):
    """Decode a little-endian unsigned integer of ``byts`` bytes at ``cursor``.

    Returns:
        ``(value, next_cursor)`` where ``next_cursor == cursor + byts``.

    Raises:
        RuntimeError: if fewer than ``byts`` bytes remain in the buffer.
            Callers in this file treat ``RuntimeError`` as "malformed
            file", so truncation is reported that way instead of surfacing
            as an ``IndexError``.
    """
    end = cursor + byts
    if end > len(bytarr):
        raise RuntimeError("Unexpected file format (truncated fixed-width field)")
    return int.from_bytes(bytarr[cursor:end], byteorder="little", signed=False), end
# (bytearray, int cursor) -> (field_num, wire_type, next_cursor, (value or None, payload_start, payload_end))
def decode_bytes(bytarr, cursor):
    """Decode one field (key plus payload) starting at ``cursor``.

    The key is a varint whose low three bits are the wire type and whose
    remaining bits are the field number.

    Returns:
        ``(field_num, wire_type, next_cursor, payload)`` where ``payload``
        is ``(value, payload_start, payload_end)``.

        * wire type 0 (varint), 1 (8-byte fixed) and 5 (4-byte fixed):
          ``value`` is the decoded integer.
        * wire type 2 (length-delimited): ``value`` is ``None`` and the
          start/end indices delimit the raw payload bytes.

    Raises:
        RuntimeError: on any other wire type, or when a length-delimited
            payload claims more bytes than the buffer holds.
    """
    key, cursor = decode_varint(bytarr, cursor)
    field_num = get_field_number(key)
    wire_type = get_wire_type(key)

    if wire_type == 0:
        start = cursor
        val, cursor = decode_varint(bytarr, start)
        return field_num, wire_type, cursor, (val, start, cursor)

    if wire_type in (1, 5):
        start = cursor
        width = 8 if wire_type == 1 else 4
        val, cursor = decode_fixed(bytarr, start, width)
        return field_num, wire_type, cursor, (val, start, cursor)

    if wire_type == 2:
        length, start = decode_varint(bytarr, cursor)
        end = start + length
        # A declared length that runs past the buffer means the data is
        # corrupt; reject it here rather than hand out-of-range indices
        # to the caller.
        if end > len(bytarr):
            raise RuntimeError("Unexpected file format")
        return field_num, wire_type, end, (None, start, end)

    raise RuntimeError("Unexpected file format")
def encode_int_to_varint(num, size):
    """Encode ``num`` as a varint that occupies exactly ``size`` bytes.

    The result is padded with zero-valued continuation groups when
    ``num`` needs fewer than ``size`` bytes, so it can overwrite an
    existing varint in place without changing the length of the file.

    Negative numbers are encoded as their 64-bit two's-complement value,
    which needs all ten varint bytes; asking for a negative number in
    fewer bytes is an error rather than a silent wrap-around.

    Args:
        num: integer in the range ``-2**63 <= num < 2**64``.
        size: exact number of bytes to produce.

    Returns:
        ``bytes`` of length ``size``.

    Raises:
        OverflowError: if ``num`` is out of range or does not fit in
            ``size`` varint bytes (7 payload bits per byte).
    """
    remaining = num
    if remaining < 0:
        # 64-bit two's complement; stays negative if num < -2**63.
        remaining += 1 << 64

    if remaining < 0 or remaining >> 64 or remaining.bit_length() > size * 7:
        print("failed to pack integer", num, "to encode size", size)
        raise OverflowError(
            f"integer {num} does not fit in a {size}-byte varint"
        )

    encoded = bytearray()
    for index in range(size):
        group = remaining & 0b111_1111
        remaining >>= 7
        # Every byte except the last carries the continuation bit.
        if index != size - 1:
            group |= 0b1000_0000
        encoded.append(group)
    return bytes(encoded)
def check_valid_file(filebytes):
    """Return True if ``filebytes`` has the expected two-struct layout.

    The buffer must consist of exactly two top-level length-delimited
    fields, numbered 1 then 2, each of which decodes cleanly into a
    sequence of fields that ends exactly at the struct boundary, with no
    bytes left over after the second struct.

    Any decoding failure (unknown wire type, truncated data, a nested
    field that overruns its enclosing struct) yields ``False`` instead of
    an exception.
    """
    try:
        cursor = 0
        for expected_field_num in (1, 2):
            field_num, wire_type, cursor, pl = decode_bytes(filebytes, cursor)
            if wire_type != 2 or field_num != expected_field_num:
                return False

            cursor = pl[1]
            end_of_struct = pl[2]
            # `<` rather than `!=`: a nested field that overshoots the
            # struct end must stop the walk instead of running on to the
            # end of the buffer.
            while cursor < end_of_struct:
                field_num, wire_type, cursor, pl = decode_bytes(filebytes, cursor)
                print_field(field_num, wire_type, pl, filebytes)
            if cursor != end_of_struct:
                return False

        if cursor != len(filebytes):
            return False
    except (RuntimeError, IndexError):
        # IndexError covers reads past the end of a truncated buffer.
        return False
    return True
def get_to_port_field(filebytes):
    """Locate field number 100 inside the first top-level struct.

    The first top-level field must be length-delimited (wire type 2); its
    payload is then walked field by field until one with field number 100
    is found. If the first top-level field itself is numbered 100 it is
    returned without descending into it.

    Returns:
        The ``(field_num, wire_type, next_cursor, payload)`` tuple produced
        by ``decode_bytes`` for the matching field.

    Raises:
        RuntimeError: if the first field is not length-delimited, or the
            struct ends without containing field 100.
    """
    field_num, wire_type, cursor, pl = decode_bytes(filebytes, 0)
    if wire_type != 2:
        raise RuntimeError("Unexpected file format")
    if DEBUG:
        print("found first struct with field_num:", field_num, "start:", pl[1])

    cursor = pl[1]
    end_of_struct = pl[2]
    while field_num != 100:
        # Stop at the struct boundary: without this bound a file lacking
        # field 100 would be read past its end.
        if cursor >= end_of_struct:
            raise RuntimeError("Unexpected file format (field 100 not found)")
        field_num, wire_type, cursor, pl = decode_bytes(filebytes, cursor)
        print_field(field_num, wire_type, pl, filebytes)
    return field_num, wire_type, cursor, pl
def from_twos_complement(num):
    """Reinterpret an unsigned 64-bit integer as a signed 64-bit integer.

    Raises ``OverflowError`` (from ``int.to_bytes``) if ``num`` is negative
    or does not fit in eight bytes.
    """
    raw = num.to_bytes(8, "little", signed=False)
    signed_value = int.from_bytes(raw, "little", signed=True)
    return signed_value
if __name__ == "__main__":
    # Every entry under the DriveFS directory that contains a
    # `core_feature_config` file is treated as one Google Drive account.
    for account in os.listdir(DRIVEFS_PATH):
        fp = DRIVEFS_PATH + "/" + account + "/core_feature_config"
        if not os.path.isfile(fp):
            continue

        try:
            with open(fp, "rb") as f:
                filebytes = bytearray(f.read())

            field_num, wire_type, cursor, pl = get_to_port_field(filebytes)
            # The port is rewritten in place as a varint, so refuse to
            # touch the field if it is stored with any other wire type.
            if wire_type != 0:
                raise RuntimeError("Unexpected file format")

            print(
                "Found Google Drive FS Account",
                account,
                "with mac_smb_port:",
                str(from_twos_complement(pl[0])) + "...",
            )

            confirm = input("Change this account's port (yes/no)? ")
            if confirm != "yes":
                print("Skipping...")
                continue

            answer = input("To what unused port (-1 for random OS port)? ")
            try:
                new_port = int(answer)
            except ValueError:
                # Non-numeric input is handled like an out-of-range port
                # instead of aborting the whole run with a traceback.
                new_port = None

            if new_port is None or not (
                new_port == -1 or (1024 < new_port <= 65535)
            ):
                print(
                    "Port number must either be -1 or between 1024 (exclusive) and 65535 (inclusive)."
                )
                raise RuntimeError("Invalid port number given!")

            # Work on a copy so the original bytes are kept until the
            # patched buffer has been validated.
            newfilebytes = bytearray(filebytes)
            newfilebytes[pl[1] : pl[2]] = encode_int_to_varint(
                new_port, pl[2] - pl[1]
            )

            print(
                "Checking if proposed write to",
                fp,
                "is valid... ",
                end="",
            )
            if not check_valid_file(newfilebytes):
                print("FAILED!")
                raise RuntimeError(
                    "Proposed writes are not valid, core_feature_config will not be changed."
                )
            print("OK!")

            print("Writing changes... ", end="")
            with open(fp, "wb") as f:
                f.write(newfilebytes)
            print("OK!")

            print(
                "**Please manually run `mdutil -i on",
                str(Path(DRIVEFS_PATH + "/" + account + "/my-drive").resolve().parent)
                + "` AFTER restarting Google Drive to enable Spotlight for this account.**",
            )
        except (OverflowError, RuntimeError, IndexError) as e:
            # IndexError: a truncated or malformed file was read past its end.
            print("Failed to process account", account + ":", e)
            continue

    print(">> All accounts processed, done! <<")
    print(
        ">> If any account's port was changed, please quit and relaunch Google Drive. <<"
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment