Skip to content

Instantly share code, notes, and snippets.

@palash25
Created November 25, 2024 03:59
Show Gist options
  • Save palash25/b36459639643ad8bf2dd10d1bf436565 to your computer and use it in GitHub Desktop.
Save palash25/b36459639643ad8bf2dd10d1bf436565 to your computer and use it in GitHub Desktop.
qdrant rust client PR
import re
import sys
import subprocess
from pathlib import Path
def clean_types(code):
"""Clean up fully qualified types to their simple forms"""
# First handle the most specific patterns
replacements = [
(r'::derive_builder::export::core::option::Option', 'Option'),
(r'::derive_builder::export::core::result::Result', 'Result'),
(r'::core::option::Option', 'Option'),
(r'::core::result::Result', 'Result'),
(r'::prost::alloc::string::String', 'String'),
(r'::prost::alloc::vec::Vec', 'Vec'),
(r'::std::convert::Infallible', 'Infallible'),
# Add more specific patterns here
]
result = code
for pattern, replacement in replacements:
result = re.sub(pattern, replacement, result)
result = re.sub(
r'expect\(&std::__export::must_use\(\{\s*let res = std::fmt::format\(format_args!\((.*?)\)\);\s*res\s*\}\)\)',
r'expect(\1)',
result
)
# Alternative pattern if the above doesn't match all cases
result = re.sub(
r'&std::__export::must_use\(\{\s*let res = std::fmt::format\(format_args!\((.*?)\)\);\s*res\s*\}\)',
r'\1',
result
)
# Clean up expect with format patterns
result = re.sub(
r'expect\(&::alloc::__export::must_use\({[\s\S]*?let res = ::alloc::fmt::format\(format_args!\((.*?)\)\);[\s\S]*?res[\s\S]*?\}\)\)',
r'expect(\1)',
result
)
# Clean up remaining alloc prefixes
result = re.sub(r'::alloc::fmt::', 'std::fmt::', result)
result = re.sub(r'::alloc::', 'std::', result)
# Clean up any remaining ::derive_builder::export patterns
result = re.sub(r'::derive_builder::export::', '', result)
result = re.sub(
r'&std::__export::must_use\(\{\s*let res = std::fmt::format\(format_args!\((.*?)\)\);\s*res\s*\}\)',
r'&format!(\1)',
result
)
return result
def clean_file(file_path):
try:
# Read the file
with open(file_path, 'r') as f:
content = f.read()
# Clean the content
cleaned = clean_types(content)
# Write back to the file
with open(file_path, 'w') as f:
f.write(cleaned)
# Format the file with rustfmt
#subprocess.run(["rustfmt", file_path], check=True)
print(f"Successfully cleaned {file_path}")
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
sys.exit(1)
def clean_directory(directory_path):
builders_dir = Path(directory_path)
if not builders_dir.exists():
print(f"Directory {directory_path} does not exist!")
return
# Process all .rs files in the directory
success = True
for rust_file in builders_dir.glob('*.rs'):
print(f"Processing {rust_file}...")
if not clean_file(rust_file):
success = False
if success:
print("\nAll files processed successfully!")
else:
print("\nSome files had errors during processing.")
sys.exit(1)
if __name__ == "__main__":
#if len(sys.argv) != 2:
# print("Usage: python clean.py <path_to_rust_file>")
# sys.exit(1)
#file_path = sys.argv[1]
#clean_file(file_path)
clean_directory('src/builders')
import re
import os
from collections import defaultdict
def to_snake_case(name):
"""Convert CamelCase to snake_case"""
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
def extract_builder_code(expanded_file):
with open(expanded_file, 'r') as f:
content = f.read()
# Dictionary to group struct with its implementations
builder_groups = defaultdict(list)
def find_complete_block(content, start):
"""Find complete {} block with proper brace matching"""
brace_count = 0
in_string = False
string_char = None
for i in range(start, len(content)):
char = content[i]
if char in ['"', "'"] and (i == 0 or content[i-1] != '\\'):
if not in_string:
in_string = True
string_char = char
elif string_char == char:
in_string = False
if not in_string:
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
return content[start:i+1]
return None
# Pattern for Builder structs
struct_pattern = r'pub struct (\w+Builder)\s*\{'
for match in re.finditer(struct_pattern, content):
builder_name = match.group(1)
struct_block = find_complete_block(content, match.start())
if struct_block:
builder_groups[builder_name].append(('struct', struct_block))
# Pattern for impl blocks, excluding standard library derives
impl_pattern = r'impl(?:\s+(?:From<\w+Builder>|\w+Builder)(?:\s+for\s+\w+)?)\s*\{'
for match in re.finditer(impl_pattern, content):
impl_block = find_complete_block(content, match.start())
if impl_block:
# Check if the impl block belongs to any builder
for builder_name in builder_groups:
if builder_name in impl_block:
# Only print for DeletePointsBuilder
if builder_name == "DeletePointsBuilder":
print(f"Found impl block for {builder_name}:")
print(impl_block) # Print the impl block for debugging
# Filter out unwanted impl blocks
if any(x in impl_block for x in ['impl ::core::', 'impl ::prost::', 'impl ::derive_builder::']):
print(f"Filtering out impl block for {builder_name} due to unwanted traits.")
continue
builder_groups[builder_name].append(('impl', impl_block))
break
return builder_groups
def write_builder_files(builder_groups, output_dir='src/builders'):
# Create builders directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Create mod.rs to export all builders
with open(os.path.join(output_dir, 'mod.rs'), 'w') as f:
f.write("// Generated from macro expansions\n\n")
for builder_name in builder_groups:
module_name = to_snake_case(builder_name)
f.write(f"mod {module_name};\n")
f.write(f"pub use {module_name}::{builder_name};\n\n")
# Create individual files for each builder
for builder_name, blocks in builder_groups.items():
file_name = f"{to_snake_case(builder_name)}.rs"
file_path = os.path.join(output_dir, file_name)
with open(file_path, 'w') as f:
f.write("use crate::qdrant::*;\n\n")
# Write struct first
struct_blocks = [block for block_type, block in blocks if block_type == 'struct']
if struct_blocks:
f.write(struct_blocks[0])
f.write("\n\n")
# Write all impl blocks
impl_blocks = [block for block_type, block in blocks if block_type == 'impl']
for impl_block in impl_blocks:
f.write(impl_block)
f.write("\n\n")
if __name__ == "__main__":
builder_groups = extract_builder_code("expanded.rs")
write_builder_files(builder_groups)
import re
import sys
from pathlib import Path
def fix_must_use(code):
"""Replace &std::__export::must_use with a simplified format! usage."""
# Pattern to match the expect call with std::__export::must_use
pattern = r'(\.expect\(&std::__export::must_use\(\{\s*let\s+res\s*=\s*std::fmt::format\(format_args!\((.*?)\);\s*res\s*\}\)\))'
# Replacement pattern using format!
replacement = r'.expect(&format!(\2))'
# Perform the substitution
new_code = re.sub(pattern, replacement, code, flags=re.DOTALL)
return new_code
def fix_file(file_path):
try:
# Read the file
with open(file_path, 'r') as f:
content = f.read()
# Fix the content
fixed_content = fix_must_use(content)
# Write back to the file if changes were made
if fixed_content != content:
with open(file_path, 'w') as f:
f.write(fixed_content)
print(f"Successfully fixed {file_path}")
else:
print(f"No changes needed for {file_path}")
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
sys.exit(1)
def fix_directory(directory_path):
builders_dir = Path(directory_path)
if not builders_dir.exists():
print(f"Directory {directory_path} does not exist!")
return
# Process all .rs files in the directory
for rust_file in builders_dir.glob('*.rs'):
print(f"Processing {rust_file}...")
fix_file(rust_file)
print("\nAll files processed successfully!")
if __name__ == "__main__":
fix_directory('src/builders')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment