import datetime
import logging
import os
import sys
from itertools import islice
from typing import Any, Callable, List, Tuple

import numpy as np
from minio import Minio
from minio.error import MinioException
# LinearDiscriminantAnalysis is provided by sklearn.discriminant_analysis;
# sklearn.linear_model does not export it.
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import SVR
from sklearn.utils.class_weight import compute_class_weight
from transformers import BertTokenizer, BertForSequenceClassification


# Module-level logger; handlers and level are configured by init_logger().
logger = logging.getLogger(__name__)


def init_logger(log_level: str):
    """Configure root logging and emit a confirmation record.

    Args:
        log_level: Level name such as ``"INFO"`` (matched case-insensitively)
            or a numeric ``logging`` level.

    Raises:
        ValueError: May be raised by ``logging`` when ``log_level`` is not a
            level name it knows.
    """
    # logging only recognises upper-case level names, so normalise strings.
    level = log_level.upper() if isinstance(log_level, str) else log_level
    # basicConfig leaves an already-configured root logger untouched.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=level,
    )
    logger.info("Logger initialized")


# Initialize logger as early as possible
init_logger("INFO")


def check_env():
    """Verify that the MinIO connection settings exist in the environment.

    Raises:
        EnvironmentError: If any of ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY``
            or ``MINIO_SECRET_KEY`` is not set; the message lists every
            missing name.
    """
    required_vars = ('MINIO_ENDPOINT', 'MINIO_ACCESS_KEY', 'MINIO_SECRET_KEY')
    # Presence check only: a variable set to an empty string counts as set.
    missing_vars = [var for var in required_vars if var not in os.environ]
    if missing_vars:
        raise EnvironmentError(f"Missing environment variables: {', '.join(missing_vars)}")


def get_minio_client() -> Minio:
    """Build a MinIO client from the ``MINIO_*`` environment variables.

    Returns:
        A ``Minio`` client constructed from ``MINIO_ENDPOINT``,
        ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY``.

    Raises:
        KeyError: If one of the three variables is not set.
        MinioException: Logged and re-raised when the constructor raises it.
    """
    try:
        return Minio(
            endpoint=os.environ['MINIO_ENDPOINT'],
            access_key=os.environ['MINIO_ACCESS_KEY'],
            secret_key=os.environ['MINIO_SECRET_KEY'],
        )
    except MinioException as e:
        logger.error(f"Error initializing Minio client: {e}")
        raise


def download_files(bucket_name: str, prefix: str, limit: int = 1000) -> List[str]:
    """Download up to ``limit`` objects stored under ``prefix``.

    Each object is written to a local path equal to its object name, relative
    to the current working directory.

    Args:
        bucket_name: Bucket to read from.
        prefix: Only objects whose name starts with this prefix are fetched.
        limit: Maximum number of objects to download.

    Returns:
        The local paths of the downloaded files, in listing order.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        client = get_minio_client()
        # minio-py lists objects through Minio.list_objects(), which yields
        # lazily and has no max-keys argument, so the cap is applied with
        # islice below.
        listing = client.list_objects(bucket_name, prefix=prefix, recursive=True)
        # Skip directory placeholders so they neither count towards the
        # limit nor get passed to fget_object.
        downloadable = (obj for obj in listing if not obj.is_dir)
        file_paths = []
        for obj in islice(downloadable, limit):
            file_path = obj.object_name
            # Object names may contain '/', so make sure the local parent
            # directory exists before writing.
            parent_dir = os.path.dirname(file_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            client.fget_object(bucket_name, file_path, file_path)
            file_paths.append(file_path)
        return file_paths
    except Exception as e:
        logger.error(f"Error downloading files: {e}")
        raise


def process_text_files(file_paths: List[str], encoding: str = "utf-8") -> List[dict]:
    """Read each file and pair its path with its text content.

    Files that cannot be opened or decoded are logged and skipped, so one bad
    file does not abort the whole batch.

    Args:
        file_paths: Paths of the text files to read.
        encoding: Text encoding used to decode every file.

    Returns:
        One ``{'file_path': ..., 'text_content': ...}`` dict per file that
        was read successfully, in input order.
    """
    processed_files = []
    for file_path in file_paths:
        try:
            # Explicit encoding keeps decoding independent of the platform
            # locale.
            with open(file_path, 'r', encoding=encoding) as f:
                file_content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is a ValueError, not an OSError, so it has
            # to be listed explicitly to be skipped like an I/O failure.
            logger.error(f"Error reading file {file_path}: {e}")
        else:
            processed_files.append({'file_path': file_path, 'text_content': file_content})
    return processed_files


# Shared tokenizer for the 'bert-base-uncased' checkpoint, loaded once when
# the module is imported and reused by tokenize_text().
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')


def tokenize_text(text: str) -> Any:
    """Encode ``text`` into BERT token ids.

    Args:
        text: The raw text to encode.

    Returns:
        The token ids as a PyTorch tensor (``return_tensors='pt'``),
        truncated to at most 512 tokens.

    Raises:
        Exception: Any tokenizer failure is logged and re-raised unchanged.
    """
    try:
        # No dtype argument: ``encode`` does not define one, and ``np.long``
        # is absent from several NumPy releases, which would raise before
        # tokenization even starts.
        return tokenizer.encode(
            text,
            return_tensors='pt',
            truncation=True,
            padding=True,
            max_length=512,
        )
    except Exception as e:
        logger.error(f"Error tokenizing text: {e}")
        raise


def prepare_data(processed_files: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Placeholder for turning processed files into two lists of records.

    Args:
        processed_files: Records shaped like the output of
            ``process_text_files``.

    Raises:
        NotImplementedError: Always; the preparation logic has not been
            written yet.
    """
    # TODO: Implement the remaining logic here.
    # Failing loudly is safer than silently returning None, which would break
    # any caller that unpacks the annotated tuple.
    raise NotImplementedError("prepare_data is not implemented yet")


# Optional helper for removing files once they are no longer needed.
def cleanup_files(file_paths: List[str]):
    """Delete every path in ``file_paths``, logging failures instead of raising.

    Args:
        file_paths: Paths of the files to remove.
    """
    for file_path in file_paths:
        try:
            os.remove(file_path)
        except Exception as e:
            # Best-effort cleanup: one bad path must not stop the remaining
            # deletions.
            logger.error(f"Error cleaning up file {file_path}: {e}")


# Call the check_env function to ensure environment variables are set.
# This runs at import time, so a missing variable fails fast.
check_env()


# Illustrative entry point showing where the functions above would be called;
# the pipeline steps themselves are not wired up yet.
if __name__ == "__main__":
    try:
        # Your main logic here
        pass
    except Exception as e:
        logger.error(f"An error occurred: {e}")
    finally:
        # Optional cleanup
        cleanup_files([])  # Provide the list of files you want to cleanup


# Note: the official `minio` Python client has no `client.bucket()` or
# `bucket.list_objects(max_keys=...)` method; objects are listed with
# `Minio.list_objects()` and downloaded with `Minio.fget_object()`. Check every
# MinIO call in this module against the installed SDK's API reference.