@pablosjv
Created August 27, 2021 11:55
Large Scale Pytorch Inference Pipeline: Spark vs Dask - Code Examples
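This file is the Dask version of the inference pipeline: it reads a Parquet dataset, tokenizes the body_masked column with a RoBERTa tokenizer, scores each partition in CPU batches through map_partitions, and writes the predictions back to Parquet. ClusterType, TokensDataset and get_cluster are imported from a sibling module of the package that is not shown here; a sketch of what it might contain follows the script.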
import os

import dask
import dask.dataframe as dd
import pandas as pd
import torch
from dask.distributed import Client
from transformers import RobertaForSequenceClassification, RobertaTokenizer

from . import ClusterType, TokensDataset, get_cluster

BATCH_SIZE: int = 250

def run(src, dst, model_path):
    # NOTE: load the model and distribute it to the workers as a Dask delayed object
    tokenizer, model_loaded = load_model(model_path)
    dmodel = dask.delayed(model_loaded.cpu())

    ddf = dd.read_parquet(src)
    # Tokenize the text column row by row; `meta` declares the name/dtype of the result for Dask
    ddf['input_ids'] = ddf.body_masked.apply(
        get_input_ids,
        tokenizer=tokenizer,
        meta=('body_masked', 'object'),
    )
    ddf['attention_mask'] = ddf.body_masked.apply(
        get_attention_mask,
        tokenizer=tokenizer,
        meta=('body_masked', 'object'),
    )
    # Score each partition as a whole, so the model is used once per task rather than once per row
    ddf['prediction'] = ddf.map_partitions(
        predict,
        model=dmodel,
        meta='float64',
    )

    # Drop the intermediate token columns before writing the results
    cols = list(ddf.columns)
    cols.remove("input_ids")
    cols.remove("attention_mask")
    ddf[cols].to_parquet(dst)

def load_model(model_path):
    """Return the tokenizer and the model"""
    detoke = RobertaTokenizer.from_pretrained(model_path)
    model = RobertaForSequenceClassification.from_pretrained(model_path, return_dict=True)
    return detoke, model

def get_input_ids(x, tokenizer):
    return tokenizer(str(x), padding=True, truncation=True)['input_ids']


def get_attention_mask(x, tokenizer):
    return tokenizer(str(x), padding=True, truncation=True)['attention_mask']

def predict(df: pd.DataFrame, model):
    # Wrap the tokenized columns of this partition in a Dataset/DataLoader
    # so inference runs in fixed-size batches
    final_tokens = TokensDataset(df.input_ids, df.attention_mask)
    loader = torch.utils.data.DataLoader(
        final_tokens,
        batch_size=BATCH_SIZE,
        num_workers=0,
    )

    all_predictions = []
    with torch.no_grad():
        for batch in loader:
            tensor_input_ids = torch.as_tensor(
                batch.input_ids, dtype=torch.long).to('cpu')
            tensor_attention_mask = torch.as_tensor(
                batch.attention_mask, dtype=torch.long).to('cpu')
            outputs = model(
                input_ids=tensor_input_ids,
                attention_mask=tensor_attention_mask,
            )
            all_predictions.extend(outputs.logits.flatten().tolist())
    # One prediction per input row, aligned with the partition's index
    return pd.Series(all_predictions, index=df.index, dtype="float64")

if __name__ == "__main__":
    # Cluster type, input/output paths and model location all come from environment variables
    cluster = get_cluster(ClusterType(os.getenv("CLUSTER", 'yarn')))
    client = Client(cluster)
    print(f"Dashboard link: {client.dashboard_link}")

    src = os.getenv('SRC')
    dst = os.getenv('DST')
    model_path = os.getenv('MODEL_PATH')
    run(src, dst, model_path)
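
The sibling module imported at the top (ClusterType, TokensDataset, get_cluster) is not included in this file. Below is only a minimal sketch of what it could look like, inferred from how those names are used in the script; the enum values beyond "yarn", the dataset internals, and the dask-yarn/LocalCluster usage are assumptions, not the gist's actual code.

# Hypothetical sketch of the sibling module imported above as
# `from . import ClusterType, TokensDataset, get_cluster`.
# Everything here is inferred from how the names are used in the script
# and is meant only to show the expected shapes, not the original code.
from collections import namedtuple
from enum import Enum

import torch
from torch.utils.data import Dataset

# DataLoader's default collate preserves namedtuples, so each batch exposes
# `.input_ids` and `.attention_mask`, which is what `predict` expects.
TokenRow = namedtuple("TokenRow", ["input_ids", "attention_mask"])


class ClusterType(Enum):
    # The script does `ClusterType(os.getenv("CLUSTER", 'yarn'))`,
    # so the enum values are the strings accepted by that env var.
    YARN = "yarn"
    LOCAL = "local"


class TokensDataset(Dataset):
    """Wraps the tokenized columns of one partition for the DataLoader."""

    def __init__(self, input_ids, attention_mask):
        self.input_ids = list(input_ids)
        self.attention_mask = list(attention_mask)

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        # NOTE: with variable-length sequences the real module would also need
        # a padding/collate strategy; this sketch leaves that out.
        return TokenRow(self.input_ids[idx], self.attention_mask[idx])


def get_cluster(cluster_type: ClusterType):
    """Return a Dask cluster for the requested deployment target."""
    if cluster_type is ClusterType.YARN:
        # Assumes dask-yarn is available on the cluster.
        from dask_yarn import YarnCluster
        return YarnCluster()
    from dask.distributed import LocalCluster
    return LocalCluster()

With such a module in place, the script only needs CLUSTER (defaulting to yarn), SRC, DST and MODEL_PATH set in the environment before it is launched against the cluster.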