Created
February 4, 2024 04:32
-
-
Save theycallmeloki/7d72c4f8a23eb06513b9daf8309e6921 to your computer and use it in GitHub Desktop.
Generic batch parquet template
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import csv
import os
import threading
import time
from collections import deque

import pandas as pd
from langchain.llms import VLLM
from quart import Quart
from vllm import LLM, SamplingParams
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

app = Quart(__name__)

input_directory = 'x'   # Directory watched for incoming .parquet prompt files
output_directory = 'y'  # Directory where per-file result parquets are written

# Sample prompts loaded as a DataFrame from a Parquet file.
# NOTE(review): prompts_df is never read anywhere else in this file — confirm
# it is still needed before deleting the load.
prompts_df = pd.read_parquet('prompts.parquet')

# Configure the vLLM engine (AWQ-quantized CodeLlama-34B over 2 GPUs).
# NOTE(review): gpu_memory_utilization=1.0 leaves vLLM no headroom — confirm
# this does not OOM under load; ~0.9 is the usual setting.
llm = LLM(
    "TheBloke/CodeLlama-34B-Instruct-AWQ",
    tensor_parallel_size=2,
    gpu_memory_utilization=1.0,
    quantization="awq",
    dtype="float16",
    enforce_eager=True,
    max_model_len=2048,
)

# Sampling parameters shared by every generation request.
sampling_params = SamplingParams(temperature=0.1, top_p=0.95, max_tokens=100)

# File-tracking structures: pending_files is consumed on the event loop;
# the threading.Lock guards processing_files, which is also read by the
# HTTP route while the worker mutates it.
pending_files = asyncio.Queue()
processing_files = set()
lock = threading.Lock()

# Throughput counters, updated by the worker and read by the '/' route.
tokens_processed = 0
prompts_processed = 0
start_time = time.time()

BATCH_SIZE = 60            # Mandatory minimum batch size
BATCH_SIZE_INCREMENT = 10  # Increment batch size if a larger one is more efficient
MAX_BATCH_SIZE = 200       # Maximum allowed batch size

metrics_file_path = 'batch_metrics.csv'

# Initialize the metrics CSV with a header row (truncates any previous run).
with open(metrics_file_path, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['timestamp', 'batch_size', 'processing_time'])

# Placeholder for the running event loop; assigned in startup().
# (Replaces a module-level asyncio.get_event_loop() call, which is deprecated
# when no loop is running and was unconditionally overwritten in startup().)
loop = None
class FileHandler(FileSystemEventHandler):
    """Watchdog handler that enqueues newly created ``.parquet`` files.

    Callbacks run on the watchdog observer thread, so paths are handed to
    the asyncio queue via ``run_coroutine_threadsafe`` on the loop supplied
    at construction time.
    """

    def __init__(self, loop):
        # Event loop that owns pending_files; used to bridge thread -> loop.
        self.loop = loop

    def on_created(self, event):
        # Ignore directories and anything that is not a parquet file.
        if event.is_directory or not event.src_path.endswith('.parquet'):
            return
        # NOTE(review): on_created may fire before the writer has finished
        # flushing the file — a partially written parquet could be read
        # later by the worker. Confirm producers write atomically (rename).
        asyncio.run_coroutine_threadsafe(
            pending_files.put(event.src_path), self.loop
        )
async def process_pending_files():
    """Background worker: batch queued parquet files through the LLM.

    Forever: drain up to BATCH_SIZE paths from ``pending_files``, run ONE
    batched ``llm.generate`` call over all their prompts, then write each
    input file's own slice of the results to ``<name>_output.parquet`` in
    ``output_directory``. Updates the global throughput counters, appends a
    row per batch to the metrics CSV, and deletes input files when done.
    """
    global tokens_processed
    global prompts_processed
    while True:
        batch_start_time = time.time()
        batch_paths = []
        # Collect up to BATCH_SIZE paths without blocking indefinitely.
        for _ in range(BATCH_SIZE):
            try:
                path = await asyncio.wait_for(pending_files.get(), timeout=0.1)
                batch_paths.append(path)
                with lock:
                    processing_files.add(path)
            except asyncio.TimeoutError:
                if batch_paths:
                    # Queue drained — process what we have so far.
                    break
                # Nothing queued yet: sleep briefly to avoid a tight idle loop.
                await asyncio.sleep(0.5)
                continue

        # Guard: if every attempt timed out, don't call llm.generate([]).
        if not batch_paths:
            continue

        # Load prompts, remembering how many came from each file so outputs
        # can be routed back to the correct per-file result parquet.
        all_prompts = []
        prompts_per_file = []
        for path in batch_paths:
            df = pd.read_parquet(path)
            file_prompts = df['prompt'].tolist()
            prompts_per_file.append(len(file_prompts))
            all_prompts.extend(file_prompts)

        # One batched generation call for the whole batch.
        # NOTE(review): llm.generate blocks the event loop for its duration;
        # the '/' route is unresponsive meanwhile. Consider an executor.
        outputs = llm.generate(all_prompts, sampling_params)

        # vLLM returns outputs in input order, so slicing by per-file counts
        # below recovers each file's results.
        results = []
        for output in outputs:
            generated_text = output.outputs[0].text
            results.append({'prompt': output.prompt, 'response': generated_text})
            # Whitespace split is an approximation of generated token count.
            tokens_processed += len(generated_text.split())
            prompts_processed += 1

        # BUGFIX: previously the FULL batch's results DataFrame was written
        # identically to every input file's output path. Write each file
        # only its own slice of the results instead.
        offset = 0
        for path, count in zip(batch_paths, prompts_per_file):
            file_results = pd.DataFrame(results[offset:offset + count])
            offset += count
            output_file = os.path.join(
                output_directory,
                os.path.basename(path).replace('.parquet', '_output.parquet'),
            )
            file_results.to_parquet(output_file)

        # Record batch-level metrics for offline tuning of BATCH_SIZE.
        batch_processing_time = time.time() - batch_start_time
        with open(metrics_file_path, 'a', newline='') as file:
            csv.writer(file).writerow([
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                len(batch_paths),
                batch_processing_time,
            ])

        # Inputs are consumed: untrack and delete them.
        with lock:
            for path in batch_paths:
                processing_files.remove(path)
                os.remove(path)
# Keep strong references: asyncio holds only weak refs to tasks, so a bare
# create_task() result can be garbage-collected mid-flight; the observer
# reference also allows a clean stop at shutdown if one is added later.
observer = None
_worker_task = None


@app.before_serving
async def startup():
    """Start the directory watcher and the batch-processing background task."""
    global loop, observer, _worker_task
    loop = asyncio.get_running_loop()  # Loop the watchdog thread must target
    event_handler = FileHandler(loop)  # Bridges thread events onto this loop
    observer = Observer()
    observer.schedule(event_handler, input_directory, recursive=True)
    observer.start()
    # BUGFIX: keep a reference — previously the task object was discarded
    # and could be garbage-collected, silently killing the worker.
    _worker_task = asyncio.create_task(process_pending_files())
@app.route('/')
async def index():
    """Return an HTML-ish status summary of the processing pipeline."""
    elapsed = time.time() - start_time
    # Guard against division by zero immediately after process start.
    tok_rate = tokens_processed / elapsed if elapsed > 0 else 0
    prompt_rate = prompts_processed / elapsed if elapsed > 0 else 0
    with lock:
        status_lines = [
            "MiladyOS is Running!",
            f"Files pending: {pending_files.qsize()}",
            f"Files being processed: {len(processing_files)}",
            f"Tokens processed: {tokens_processed}",
            f"Prompts processed: {prompts_processed}",
            f"Tokens per second: {tok_rate:.2f}",
            f"Prompts per second: {prompt_rate:.2f}",
        ]
    # Same string the original produced: every line terminated by <br>.
    return "<br>".join(status_lines) + "<br>"


if __name__ == '__main__':
    app.run()
#####TEST#####
# Helper script: explode a JSON array of prompt strings into one single-row
# parquet file per prompt, dropped into the directory the server watches.
import pandas as pd
import json
import uuid
import os

# Path to the JSON file holding a list of prompt strings.
json_file_path = 'prompts.json'

with open(json_file_path, 'r') as f:
    prompt_lines = json.load(f)

# Validate eagerly: every entry must be a string.
# (Explicit raise instead of `assert`, which is stripped under `python -O`.)
if not all(isinstance(item, str) for item in prompt_lines):
    raise ValueError("All items in the JSON file must be strings")

# Directory the watcher consumes from ('x' matches the server's input dir).
output_directory = 'x'
# exist_ok avoids the check-then-create race of the original exists()/makedirs.
os.makedirs(output_directory, exist_ok=True)

for line in prompt_lines:
    # One single-row DataFrame per prompt, under a collision-free UUID name.
    df = pd.DataFrame({'prompt': [line]})
    file_path = os.path.join(output_directory, f"{uuid.uuid4()}.parquet")
    df.to_parquet(file_path, index=False)

print(f"Converted and saved prompts to {output_directory} directory.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment