Created
November 25, 2024 03:59
-
-
Save palash25/b36459639643ad8bf2dd10d1bf436565 to your computer and use it in GitHub Desktop.
qdrant rust client PR
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import sys | |
import subprocess | |
from pathlib import Path | |
def clean_types(code): | |
"""Clean up fully qualified types to their simple forms""" | |
# First handle the most specific patterns | |
replacements = [ | |
(r'::derive_builder::export::core::option::Option', 'Option'), | |
(r'::derive_builder::export::core::result::Result', 'Result'), | |
(r'::core::option::Option', 'Option'), | |
(r'::core::result::Result', 'Result'), | |
(r'::prost::alloc::string::String', 'String'), | |
(r'::prost::alloc::vec::Vec', 'Vec'), | |
(r'::std::convert::Infallible', 'Infallible'), | |
# Add more specific patterns here | |
] | |
result = code | |
for pattern, replacement in replacements: | |
result = re.sub(pattern, replacement, result) | |
result = re.sub( | |
r'expect\(&std::__export::must_use\(\{\s*let res = std::fmt::format\(format_args!\((.*?)\)\);\s*res\s*\}\)\)', | |
r'expect(\1)', | |
result | |
) | |
# Alternative pattern if the above doesn't match all cases | |
result = re.sub( | |
r'&std::__export::must_use\(\{\s*let res = std::fmt::format\(format_args!\((.*?)\)\);\s*res\s*\}\)', | |
r'\1', | |
result | |
) | |
# Clean up expect with format patterns | |
result = re.sub( | |
r'expect\(&::alloc::__export::must_use\({[\s\S]*?let res = ::alloc::fmt::format\(format_args!\((.*?)\)\);[\s\S]*?res[\s\S]*?\}\)\)', | |
r'expect(\1)', | |
result | |
) | |
# Clean up remaining alloc prefixes | |
result = re.sub(r'::alloc::fmt::', 'std::fmt::', result) | |
result = re.sub(r'::alloc::', 'std::', result) | |
# Clean up any remaining ::derive_builder::export patterns | |
result = re.sub(r'::derive_builder::export::', '', result) | |
result = re.sub( | |
r'&std::__export::must_use\(\{\s*let res = std::fmt::format\(format_args!\((.*?)\)\);\s*res\s*\}\)', | |
r'&format!(\1)', | |
result | |
) | |
return result | |
def clean_file(file_path): | |
try: | |
# Read the file | |
with open(file_path, 'r') as f: | |
content = f.read() | |
# Clean the content | |
cleaned = clean_types(content) | |
# Write back to the file | |
with open(file_path, 'w') as f: | |
f.write(cleaned) | |
# Format the file with rustfmt | |
#subprocess.run(["rustfmt", file_path], check=True) | |
print(f"Successfully cleaned {file_path}") | |
except Exception as e: | |
print(f"Error processing {file_path}: {str(e)}") | |
sys.exit(1) | |
def clean_directory(directory_path): | |
builders_dir = Path(directory_path) | |
if not builders_dir.exists(): | |
print(f"Directory {directory_path} does not exist!") | |
return | |
# Process all .rs files in the directory | |
success = True | |
for rust_file in builders_dir.glob('*.rs'): | |
print(f"Processing {rust_file}...") | |
if not clean_file(rust_file): | |
success = False | |
if success: | |
print("\nAll files processed successfully!") | |
else: | |
print("\nSome files had errors during processing.") | |
sys.exit(1) | |
if __name__ == "__main__": | |
#if len(sys.argv) != 2: | |
# print("Usage: python clean.py <path_to_rust_file>") | |
# sys.exit(1) | |
#file_path = sys.argv[1] | |
#clean_file(file_path) | |
clean_directory('src/builders') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import os | |
from collections import defaultdict | |
def to_snake_case(name): | |
"""Convert CamelCase to snake_case""" | |
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) | |
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower() | |
def extract_builder_code(expanded_file): | |
with open(expanded_file, 'r') as f: | |
content = f.read() | |
# Dictionary to group struct with its implementations | |
builder_groups = defaultdict(list) | |
def find_complete_block(content, start): | |
"""Find complete {} block with proper brace matching""" | |
brace_count = 0 | |
in_string = False | |
string_char = None | |
for i in range(start, len(content)): | |
char = content[i] | |
if char in ['"', "'"] and (i == 0 or content[i-1] != '\\'): | |
if not in_string: | |
in_string = True | |
string_char = char | |
elif string_char == char: | |
in_string = False | |
if not in_string: | |
if char == '{': | |
brace_count += 1 | |
elif char == '}': | |
brace_count -= 1 | |
if brace_count == 0: | |
return content[start:i+1] | |
return None | |
# Pattern for Builder structs | |
struct_pattern = r'pub struct (\w+Builder)\s*\{' | |
for match in re.finditer(struct_pattern, content): | |
builder_name = match.group(1) | |
struct_block = find_complete_block(content, match.start()) | |
if struct_block: | |
builder_groups[builder_name].append(('struct', struct_block)) | |
# Pattern for impl blocks, excluding standard library derives | |
impl_pattern = r'impl(?:\s+(?:From<\w+Builder>|\w+Builder)(?:\s+for\s+\w+)?)\s*\{' | |
for match in re.finditer(impl_pattern, content): | |
impl_block = find_complete_block(content, match.start()) | |
if impl_block: | |
# Check if the impl block belongs to any builder | |
for builder_name in builder_groups: | |
if builder_name in impl_block: | |
# Only print for DeletePointsBuilder | |
if builder_name == "DeletePointsBuilder": | |
print(f"Found impl block for {builder_name}:") | |
print(impl_block) # Print the impl block for debugging | |
# Filter out unwanted impl blocks | |
if any(x in impl_block for x in ['impl ::core::', 'impl ::prost::', 'impl ::derive_builder::']): | |
print(f"Filtering out impl block for {builder_name} due to unwanted traits.") | |
continue | |
builder_groups[builder_name].append(('impl', impl_block)) | |
break | |
return builder_groups | |
def write_builder_files(builder_groups, output_dir='src/builders'): | |
# Create builders directory if it doesn't exist | |
os.makedirs(output_dir, exist_ok=True) | |
# Create mod.rs to export all builders | |
with open(os.path.join(output_dir, 'mod.rs'), 'w') as f: | |
f.write("// Generated from macro expansions\n\n") | |
for builder_name in builder_groups: | |
module_name = to_snake_case(builder_name) | |
f.write(f"mod {module_name};\n") | |
f.write(f"pub use {module_name}::{builder_name};\n\n") | |
# Create individual files for each builder | |
for builder_name, blocks in builder_groups.items(): | |
file_name = f"{to_snake_case(builder_name)}.rs" | |
file_path = os.path.join(output_dir, file_name) | |
with open(file_path, 'w') as f: | |
f.write("use crate::qdrant::*;\n\n") | |
# Write struct first | |
struct_blocks = [block for block_type, block in blocks if block_type == 'struct'] | |
if struct_blocks: | |
f.write(struct_blocks[0]) | |
f.write("\n\n") | |
# Write all impl blocks | |
impl_blocks = [block for block_type, block in blocks if block_type == 'impl'] | |
for impl_block in impl_blocks: | |
f.write(impl_block) | |
f.write("\n\n") | |
if __name__ == "__main__": | |
builder_groups = extract_builder_code("expanded.rs") | |
write_builder_files(builder_groups) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import sys | |
from pathlib import Path | |
def fix_must_use(code): | |
"""Replace &std::__export::must_use with a simplified format! usage.""" | |
# Pattern to match the expect call with std::__export::must_use | |
pattern = r'(\.expect\(&std::__export::must_use\(\{\s*let\s+res\s*=\s*std::fmt::format\(format_args!\((.*?)\);\s*res\s*\}\)\))' | |
# Replacement pattern using format! | |
replacement = r'.expect(&format!(\2))' | |
# Perform the substitution | |
new_code = re.sub(pattern, replacement, code, flags=re.DOTALL) | |
return new_code | |
def fix_file(file_path): | |
try: | |
# Read the file | |
with open(file_path, 'r') as f: | |
content = f.read() | |
# Fix the content | |
fixed_content = fix_must_use(content) | |
# Write back to the file if changes were made | |
if fixed_content != content: | |
with open(file_path, 'w') as f: | |
f.write(fixed_content) | |
print(f"Successfully fixed {file_path}") | |
else: | |
print(f"No changes needed for {file_path}") | |
except Exception as e: | |
print(f"Error processing {file_path}: {str(e)}") | |
sys.exit(1) | |
def fix_directory(directory_path): | |
builders_dir = Path(directory_path) | |
if not builders_dir.exists(): | |
print(f"Directory {directory_path} does not exist!") | |
return | |
# Process all .rs files in the directory | |
for rust_file in builders_dir.glob('*.rs'): | |
print(f"Processing {rust_file}...") | |
fix_file(rust_file) | |
print("\nAll files processed successfully!") | |
if __name__ == "__main__": | |
fix_directory('src/builders') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment