Generic batch parquet template
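A Quart service that watches an input directory for Parquet files of prompts, runs everything it finds through vLLM in batches, writes the responses back out as per-file Parquet, and appends per-batch timings to a CSV. The test harness at the bottom fans a JSON list of prompts out into single-prompt Parquet files to feed the watcher.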
import asyncio
import csv
import os
import threading
import time

import pandas as pd
from quart import Quart
from vllm import LLM, SamplingParams
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
app = Quart(__name__)

input_directory = 'x'   # Directory to watch for incoming prompt files
output_directory = 'y'  # Directory to save output files

# Make sure both directories exist before the watcher and the writers touch them.
os.makedirs(input_directory, exist_ok=True)
os.makedirs(output_directory, exist_ok=True)
# Configure the vLLM engine (AWQ-quantized CodeLlama split across two GPUs).
llm = LLM(
    "TheBloke/CodeLlama-34B-Instruct-AWQ",
    tensor_parallel_size=2,
    gpu_memory_utilization=1.0,
    quantization="awq",
    dtype="float16",
    enforce_eager=True,
    max_model_len=2048,
)
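# Tuning note: gpu_memory_utilization=1.0 hands vLLM the entire GPU. If
# anything else shares these cards, a value like 0.9 leaves headroom for the
# CUDA context and other processes (assumes dedicated cards as written).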
# Create a sampling params object.
sampling_params = SamplingParams(temperature=0.1, top_p=0.95, max_tokens=100)
# Thread-safe structures for tracking files.
pending_files = asyncio.Queue()
processing_files = set()
lock = threading.Lock()

tokens_processed = 0   # rough throughput counter: whitespace-split words of output
prompts_processed = 0
start_time = time.time()

BATCH_SIZE = 60            # Start with a mandatory minimum batch size
BATCH_SIZE_INCREMENT = 10  # Step for growing the batch if that proves more efficient (not yet wired up)
MAX_BATCH_SIZE = 200       # Maximum allowed batch size (not yet wired up)

metrics_file_path = 'batch_metrics.csv'
# Initialize the metrics file with a header row.
with open(metrics_file_path, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['timestamp', 'batch_size', 'processing_time'])
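# Each completed batch appends one row, e.g. (illustrative values only):
#   2024-02-04 04:32:00,60,12.7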
# Placeholder for the event loop; set to the running loop at startup.
loop = None
class FileHandler(FileSystemEventHandler):
    def __init__(self, loop):
        self.loop = loop

    def on_created(self, event):
        if event.is_directory:
            return
        if not event.src_path.endswith('.parquet'):
            return
        # watchdog fires this from its own thread, so enqueue the path on the
        # asyncio queue via the passed event loop rather than touching it directly.
        asyncio.run_coroutine_threadsafe(pending_files.put(event.src_path), self.loop)
async def process_pending_files():
    global tokens_processed
    global prompts_processed
    while True:
        batch_start_time = time.time()
        batch_paths = []
        # Collect a batch of file paths.
        for _ in range(BATCH_SIZE):
            try:
                # Try to get a new path without waiting indefinitely.
                path = await asyncio.wait_for(pending_files.get(), timeout=0.1)
                batch_paths.append(path)
                with lock:
                    processing_files.add(path)
            except asyncio.TimeoutError:
                # No more files queued; process whatever has been collected.
                if batch_paths:
                    break
                # Nothing collected yet: sleep briefly to avoid a tight loop when idle.
                await asyncio.sleep(0.5)
                continue
        if not batch_paths:
            continue
        # Load prompts from each file, remembering how many came from each
        # so the results can be split back out per input file.
        all_prompts = []
        prompt_counts = []
        for path in batch_paths:
            df = pd.read_parquet(path)
            all_prompts.extend(df['prompt'].tolist())
            prompt_counts.append(len(df))
        # Process all prompts in a single batch. Note that llm.generate is
        # synchronous, so the event loop blocks while the batch runs.
        outputs = llm.generate(all_prompts, sampling_params)
        results = []
        for output in outputs:
            generated_text = output.outputs[0].text
            results.append({'prompt': output.prompt, 'response': generated_text})
            tokens_processed += len(generated_text.split())  # rough word count
            prompts_processed += 1
        # vLLM returns outputs in prompt order, so each input file's slice of
        # the results goes to its own output file.
        offset = 0
        for path, count in zip(batch_paths, prompt_counts):
            file_results = results[offset:offset + count]
            offset += count
            output_file = os.path.join(
                output_directory,
                os.path.basename(path).replace('.parquet', '_output.parquet'),
            )
            pd.DataFrame(file_results).to_parquet(output_file)
        # Calculate processing time for the batch and append it to the metrics CSV.
        batch_processing_time = time.time() - batch_start_time
        with open(metrics_file_path, 'a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow([
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                len(batch_paths),
                batch_processing_time,
            ])
        with lock:
            for path in batch_paths:
                processing_files.remove(path)
                os.remove(path)
@app.before_serving
async def startup():
    global loop  # Reference the global loop
    loop = asyncio.get_running_loop()  # Ensure we use the running loop
    event_handler = FileHandler(loop)  # Pass the loop to the FileHandler
    observer = Observer()
    observer.schedule(event_handler, input_directory, recursive=True)
    observer.start()
    asyncio.create_task(process_pending_files())
@app.route('/')
async def index():
    elapsed_time = time.time() - start_time
    tokens_per_second = tokens_processed / elapsed_time if elapsed_time > 0 else 0
    prompts_per_second = prompts_processed / elapsed_time if elapsed_time > 0 else 0
    with lock:
        return (
            f"MiladyOS is Running!<br>"
            f"Files pending: {pending_files.qsize()}<br>"
            f"Files being processed: {len(processing_files)}<br>"
            f"Tokens processed: {tokens_processed}<br>"
            f"Prompts processed: {prompts_processed}<br>"
            f"Tokens per second: {tokens_per_second:.2f}<br>"
            f"Prompts per second: {prompts_per_second:.2f}<br>"
        )
if __name__ == '__main__':
    app.run()
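# A rough way to try this locally (file names are hypothetical; assumes two
# GPUs for tensor_parallel_size=2 and Quart's default http://127.0.0.1:5000):
#   mkdir -p x y
#   python batch_server.py          # start the watcher + Quart app
#   python feed_prompts.py          # the test script below; drops files into x/
#   curl http://127.0.0.1:5000/     # live throughput stats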
##### TEST: fan a JSON list of prompts out into per-prompt Parquet files #####
import json
import os
import uuid

import pandas as pd

# Path to your JSON file: a flat list of prompt strings.
json_file_path = 'prompts.json'

# Load the JSON file.
with open(json_file_path, 'r') as f:
    prompt_lines = json.load(f)

# Ensure it's a list of strings.
assert all(isinstance(item, str) for item in prompt_lines), "All items in the JSON file must be strings"

# Write into the directory the service watches, so these files feed the batcher.
output_directory = 'x'
os.makedirs(output_directory, exist_ok=True)

# Save each prompt as its own single-row Parquet file with a unique filename.
for line in prompt_lines:
    df = pd.DataFrame({'prompt': [line]})
    filename = f"{uuid.uuid4()}.parquet"
    file_path = os.path.join(output_directory, filename)
    df.to_parquet(file_path, index=False)

print(f"Converted and saved prompts to the {output_directory} directory.")