# data.py: WRIME corpus download + bag-of-words LightningDataModule
import io
import os
import requests
from typing import Any, Callable, Optional

import numpy as np
import pandas as pd
import torch
import pytorch_lightning as pl
import fugashi
from tqdm.auto import tqdm
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer


class BowDataset(torch.utils.data.Dataset):
    def __init__(self, bow, labels=None, transform: Optional[Callable] = None,
                 dtype: Any = np.float32, label_index: Optional[int] = None):
        self.transform = transform
        self.dtype = dtype
        self.label_index = label_index
        self.data = self.bow2seq(bow.astype(self.dtype))
        self.vocab_size = bow.shape[1]
        # identity matrix as a one-hot lookup table; __getitem__ sums its rows
        # to rebuild a dense BoW vector
        self.lookuptable = torch.eye(self.vocab_size)
        self.labels = torch.from_numpy(labels) if labels is not None else None

    @staticmethod
    def bow2seq(bow, verbose: bool = False):
        """Convert a BoW matrix into a list of token-id sequences.

        bow: scipy.sparse.*_matrix, BoW-style (documents x vocabulary) matrix
        """
        bow = bow.tolil()
        D, _ = bow.shape
        seq = []
        for d in (tqdm(range(D)) if verbose else range(D)):
            data = bow.data[d]
            row = bow.rows[d]
            seq.append([])
            # repeat each word id as often as it occurs in the document
            for cnt, w in zip(data, row):
                for _ in range(int(cnt)):
                    seq[-1].append(w)
            seq[-1] = torch.tensor(seq[-1], dtype=torch.int64)
        return seq
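    # Illustrative example (not executed): a single BoW row [2, 0, 1] maps to
    # tensor([0, 0, 2]), i.e. word id 0 repeated twice, then word id 2 once.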

    def __getitem__(self, index):
        # summing one-hot rows reconstructs the dense BoW vector for this document
        x = self.lookuptable[self.data[index]].sum(0)
        if self.transform is not None:
            x = self.transform(x)
        if self.labels is None:
            return x,
        y = self.labels[index]
        if self.label_index is not None:
            y = y[self.label_index]
        return x, y

    def __len__(self):
        return len(self.data)


class WRIMEDataModule(pl.LightningDataModule):
    def __init__(self, fpath: str = ".", batch_size: int = 32, num_workers: int = 1,
                 vectorizer=None, max_features: int = 2000,
                 label_index: Optional[int] = None, dtype=np.float32):
        super().__init__()
        self.fpath = fpath
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.max_features = max_features
        self.label_index = label_index
        self.dtype = dtype
        self.urls = [
            "https://raw.githubusercontent.com/ids-cv/wrime/master/wrime-ver1.tsv",
            "https://raw.githubusercontent.com/ids-cv/wrime/master/wrime-ver2.tsv",
        ]
        self.df = self._load_corpus()
        self.vectorizer = self._build_vectorizer() if vectorizer is None else vectorizer
        # fit on the raw text ("Sentence" column) of the non-test rows only
        self.vectorizer.fit(self.df[self.df["Train/Dev/Test"] != "test"]["Sentence"])
        self.corpus = self._fit_vectorizer()
        self.num_features = self.corpus["X_train"].shape[1]

    def _load_corpus(self):
        files = os.listdir(self.fpath)
        if "wrime-ver1.tsv" in files and "wrime-ver2.tsv" in files:
            # use the locally cached copies
            df1 = pd.read_csv(os.path.join(self.fpath, "wrime-ver1.tsv"), sep="\t")
            df2 = pd.read_csv(os.path.join(self.fpath, "wrime-ver2.tsv"), sep="\t")
            dfs = [df1, df2]
            df = pd.concat(dfs)
            # keep only the columns the two versions share (drop the symmetric difference)
            return df.drop(columns=list(set(dfs[0].keys()) ^ set(dfs[1].keys())))
        dfs = []
        for url in self.urls:
            r = requests.get(url).content
            dfs += [pd.read_csv(io.BytesIO(r), sep="\t")]
        # ver1 misspells "Sadness" as "Saddness" in two reader columns; align with ver2
        fixed_keys = list(dfs[0].columns)
        fixed_keys[fixed_keys.index("Reader2_Saddness")] = "Reader2_Sadness"
        fixed_keys[fixed_keys.index("Reader3_Saddness")] = "Reader3_Sadness"
        dfs[0].columns = fixed_keys
        # cache the cleaned files locally
        dfs[0].to_csv(os.path.join(self.fpath, "wrime-ver1.tsv"), sep="\t", index=False)
        dfs[1].to_csv(os.path.join(self.fpath, "wrime-ver2.tsv"), sep="\t", index=False)
        df = pd.concat(dfs)
        return df.drop(columns=list(set(dfs[0].keys()) ^ set(dfs[1].keys())))  # xor
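    # Note on the columns used downstream (inferred from the loading code above):
    # "Sentence" holds the raw text, "Train/Dev/Test" the official split, and the
    # "Avg. Readers_*" columns the reader-averaged emotion intensities used as labels.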

    def _build_vectorizer(self):
        # fugashi (MeCab) in wakati mode returns a whitespace-joined string, so
        # split it into the token list that CountVectorizer's analyzer expects
        tagger = fugashi.Tagger("-Owakati")
        analyzer = lambda doc: tagger.parse(doc).strip().split()
        return CountVectorizer(
            input="content", encoding="utf-8", decode_error="strict",
            strip_accents=None, lowercase=True, preprocessor=None,
            tokenizer=None, stop_words=None, token_pattern=r"(?u)\b\w\w+\b",
            ngram_range=(1, 1), analyzer=analyzer, max_df=1.0, min_df=1,
            max_features=self.max_features, vocabulary=None, binary=False, dtype=self.dtype)
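    # For example (illustrative; the exact segmentation depends on the installed
    # MeCab dictionary): analyzer("私は猫が好きだ") -> ["私", "は", "猫", "が", "好き", "だ"]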

    def _fit_vectorizer(self):
        df = self.df
        label_keys = [key for key in df.keys() if "Avg. Readers" in key]
        return dict(
            X_train=self.vectorizer.transform(df[df["Train/Dev/Test"] == "train"]["Sentence"]),
            X_dev=self.vectorizer.transform(df[df["Train/Dev/Test"] == "dev"]["Sentence"]),
            X_test=self.vectorizer.transform(df[df["Train/Dev/Test"] == "test"]["Sentence"]),
            y_train=df[df["Train/Dev/Test"] == "train"][label_keys].to_numpy(),
            y_dev=df[df["Train/Dev/Test"] == "dev"][label_keys].to_numpy(),
            y_test=df[df["Train/Dev/Test"] == "test"][label_keys].to_numpy(),
        )

    def setup(self, stage=None):
        if stage == "fit" or stage is None:
            self.train_dataset = BowDataset(self.corpus["X_train"], self.corpus["y_train"], label_index=self.label_index)
            self.validate_dataset = BowDataset(self.corpus["X_dev"], self.corpus["y_dev"], label_index=self.label_index)
        if stage == "test" or stage is None:
            self.test_dataset = BowDataset(self.corpus["X_test"], self.corpus["y_test"], label_index=self.label_index)
        if stage == "predict" or stage is None:
            self.predict_dataset = BowDataset(self.corpus["X_test"])

    def train_dataloader(self):
        return torch.utils.data.DataLoader(self.train_dataset,
                                           batch_size=self.batch_size,
                                           shuffle=True,  # shuffle only the training split
                                           num_workers=self.num_workers)

    def val_dataloader(self):
        return torch.utils.data.DataLoader(self.validate_dataset,
                                           batch_size=self.batch_size,
                                           num_workers=self.num_workers)

    def test_dataloader(self):
        return torch.utils.data.DataLoader(self.test_dataset,
                                           batch_size=self.batch_size,
                                           num_workers=self.num_workers)

    def predict_dataloader(self):
        return torch.utils.data.DataLoader(self.predict_dataset,
                                           batch_size=self.batch_size,
                                           num_workers=self.num_workers)


if __name__ == "__main__":
    ...
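    # Minimal smoke test (a sketch; it downloads WRIME on first use and assumes
    # fugashi plus a MeCab dictionary such as unidic-lite are installed):
    dm = WRIMEDataModule(".", batch_size=4, max_features=500, label_index=0)
    dm.setup("fit")
    x, y = dm.train_dataset[0]
    print(x.shape, y)  # BoW vector of size dm.num_features and its intensity label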


# Training script (a separate file in the gist); it imports the DataModule
# above from data.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
import numpy as np


class BaselineMLP(pl.LightningModule):
    def __init__(self, vocab_size, hidden_size, num_class):
        super().__init__()
        self.feature_extractor = nn.Sequential(
            # layer 1
            nn.Linear(vocab_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            # layer 2
            nn.Linear(hidden_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
        )
        self.classifier = nn.Sequential(
            # layer 3: raw logits; softmax happens inside F.cross_entropy
            # during training and explicitly in predict_step
            nn.Linear(hidden_size, num_class),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        h = self.feature_extractor(x)
        return self.classifier(h)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits_y = self.forward(x)
        loss = F.cross_entropy(logits_y, y)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits_y = self.forward(x)
        loss = F.cross_entropy(logits_y, y)
        self.log("val_loss", loss)
        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch
        logits_y = self.forward(x)
        loss = F.cross_entropy(logits_y, y)
        self.log("test_loss", loss)
        return loss

    def predict_step(self, batch, batch_idx):
        # deterministic prediction; the model defines no dropout layers, so a
        # Monte Carlo Dropout scheme would first need dropout added and kept
        # active at inference time
        x, = batch
        logits_y = self.forward(x)
        return logits_y.softmax(1)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer


if __name__ == "__main__":
    from data import WRIMEDataModule

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    print(f"Using {device}")
    pl.seed_everything(0)
    dataset = WRIMEDataModule(".", batch_size=512, num_workers=8, max_features=5000, label_index=0)
    # 4 classes: the averaged reader intensity labels take values 0..3
    model = BaselineMLP(dataset.num_features, 2000, 4)
    # Lightning moves the model to the accelerator, so no manual .to(device) is needed
    trainer = pl.Trainer(max_epochs=10, gpus=1 if torch.cuda.is_available() else 0)
    trainer.fit(model, dataset)
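    # Inference sketch (assumes the pl.Trainer 1.x-style API used above):
    # trainer.predict runs predict_step over predict_dataloader and returns a
    # list of per-batch softmax outputs.
    preds = trainer.predict(model, datamodule=dataset)
    probs = torch.cat(preds)  # shape: (num_test_examples, num_class)
    print(probs.shape, probs[0])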