import io, os, requests
from typing import Callable, Optional, Any

import numpy as np
import pandas as pd
import torch
import pytorch_lightning as pl
import fugashi
from tqdm.auto import tqdm
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer


class BowDataset(torch.utils.data.Dataset):
    def __init__(self, bow, labels=None, transform: Optional[Callable] = None,
                 dtype: Any = np.float32, label_index: Optional[int] = None):
        self.transform = transform
        self.dtype = dtype
        self.label_index = label_index
        # Unroll the sparse BoW matrix into per-document token-index sequences.
        self.data = self.bow2seq(bow.astype(self.dtype))
        self.vocab_size = bow.shape[1]
        # One-hot lookup table; rows are summed in __getitem__ to rebuild
        # dense count vectors.
        self.lookuptable = torch.eye(self.vocab_size)
        self.labels = torch.from_numpy(labels) if labels is not None else None
    @staticmethod
    def bow2seq(bow, verbose: bool = False):
        """
        bow: scipy.sparse.*_matrix, BoW-style matrix.
        Returns a list of 1-D int64 tensors, one per document, in which each
        word index is repeated as many times as its count.
        """
        bow = bow.tolil()
        D, _ = bow.shape
        seq = []
        for d in (tqdm(range(D)) if verbose else range(D)):
            data = bow.data[d]
            row = bow.rows[d]
            seq.append([])
            for cnt, w in zip(data, row):
                for _ in range(int(cnt)):
                    seq[-1].append(w)
            seq[-1] = torch.tensor(seq[-1], dtype=torch.int64)
        return seq
    def __getitem__(self, index):
        # Rebuild a dense count vector by summing one-hot rows.
        x = self.lookuptable[self.data[index]].sum(0)
        if self.transform is not None:
            x = self.transform(x)
        if self.labels is None:
            return x,
        y = self.labels[index]
        if self.label_index is not None:
            y = y[self.label_index]
        return x, y

    def __len__(self):
        return len(self.data)
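
# Illustrative sketch (added; not part of the original gist): how BowDataset
# unrolls a BoW matrix into token-index sequences and reconstructs dense
# count vectors in __getitem__.
def _bowdataset_demo():
    from scipy.sparse import csr_matrix
    bow = csr_matrix(np.array([[2, 0, 1],
                               [0, 3, 0]]))
    ds = BowDataset(bow)
    print(ds.data[0])  # tensor([0, 0, 2]): word 0 twice, word 2 once
    print(ds[0])       # (tensor([2., 0., 1.]),) -- the dense counts again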


class WRIMEDataModule(pl.LightningDataModule):
    def __init__(self, fpath: str = ".", batch_size: int = 32, num_workers: int = 1,
                 vectorizer=None, max_features: int = 2000,
                 label_index: Optional[int] = None, dtype=np.float32):
        super().__init__()
        self.fpath = fpath
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.max_features = max_features
        self.label_index = label_index
        self.dtype = dtype
        self.urls = [
            "https://raw.githubusercontent.com/ids-cv/wrime/master/wrime-ver1.tsv",
            "https://raw.githubusercontent.com/ids-cv/wrime/master/wrime-ver2.tsv",
        ]
        self.df = self._load_corpus()
        self.vectorizer = self._build_vectorizer() if vectorizer is None else vectorizer
        # Fit on the raw sentences of the non-test split; "Sentence" is the
        # text column in the WRIME TSVs (fitting on the bare DataFrame would
        # iterate over column names instead of documents).
        self.vectorizer.fit(self.df.loc[self.df["Train/Dev/Test"] != "test", "Sentence"])
        self.corpus = self._fit_vectorizer()
        self.num_features = self.corpus["X_train"].shape[1]
    def _load_corpus(self):
        # Use the cached copies if both files have been saved already.
        files = os.listdir(self.fpath)
        if "wrime-ver1.tsv" in files and "wrime-ver2.tsv" in files:
            df1 = pd.read_csv(os.path.join(self.fpath, "wrime-ver1.tsv"), sep="\t")
            df2 = pd.read_csv(os.path.join(self.fpath, "wrime-ver2.tsv"), sep="\t")
            dfs = [df1, df2]
            df = pd.concat(dfs)
            # Keep only the columns the two versions share.
            return df.drop(columns=list(set(dfs[0].keys()) ^ set(dfs[1].keys())))
        dfs = []
        for url in self.urls:
            r = requests.get(url).content
            dfs += [pd.read_csv(io.BytesIO(r), sep="\t")]
        # ver1 misspells two column names ("Saddness"); fix them before caching.
        fixed_keys = list(dfs[0].columns)
        fixed_keys[fixed_keys.index("Reader2_Saddness")] = "Reader2_Sadness"
        fixed_keys[fixed_keys.index("Reader3_Saddness")] = "Reader3_Sadness"
        dfs[0].columns = fixed_keys
        # Cache to disk.
        dfs[0].to_csv(os.path.join(self.fpath, "wrime-ver1.tsv"), sep="\t", index=False)
        dfs[1].to_csv(os.path.join(self.fpath, "wrime-ver2.tsv"), sep="\t", index=False)
        df = pd.concat(dfs)
        return df.drop(columns=list(set(dfs[0].keys()) ^ set(dfs[1].keys())))  # symmetric difference
    def _build_vectorizer(self):
        tagger = fugashi.Tagger("-Owakati")
        # The analyzer must return a token list; without .split() sklearn
        # would iterate over the wakati string character by character.
        analyzer = lambda doc: tagger.parse(doc).strip().split()
        # With a callable analyzer, sklearn ignores its own preprocessing and
        # tokenization options, so only the pruning options are kept here.
        # max_df must be the float 1.0 (keep everything); the int 1 would
        # drop every term that appears in more than one document.
        return CountVectorizer(
            analyzer=analyzer, max_df=1.0, min_df=1,
            max_features=self.max_features, binary=False, dtype=self.dtype)
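
    # Example (added for illustration; the exact segmentation depends on the
    # installed MeCab dictionary, e.g. unidic-lite):
    #   fugashi.Tagger("-Owakati").parse("今日はいい天気です").split()
    #   -> ['今日', 'は', 'いい', '天気', 'です']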
    def _fit_vectorizer(self):
        df = self.df
        split = df["Train/Dev/Test"]
        # Emotion targets averaged over readers ("Avg. Readers_*" columns).
        label_keys = [key for key in df.keys() if "Avg. Readers" in key]
        return dict(
            X_train=self.vectorizer.transform(df.loc[split == "train", "Sentence"]),
            X_dev=self.vectorizer.transform(df.loc[split == "dev", "Sentence"]),
            X_test=self.vectorizer.transform(df.loc[split == "test", "Sentence"]),
            y_train=df[split == "train"][label_keys].to_numpy(),
            y_dev=df[split == "dev"][label_keys].to_numpy(),
            y_test=df[split == "test"][label_keys].to_numpy(),
        )
    def setup(self, stage=None):
        if stage == "fit" or stage is None:
            self.train_dataset = BowDataset(self.corpus["X_train"], self.corpus["y_train"], label_index=self.label_index)
            self.validate_dataset = BowDataset(self.corpus["X_dev"], self.corpus["y_dev"], label_index=self.label_index)
        if stage == "test" or stage is None:
            self.test_dataset = BowDataset(self.corpus["X_test"], self.corpus["y_test"], label_index=self.label_index)
        if stage == "predict" or stage is None:
            self.predict_dataset = BowDataset(self.corpus["X_test"])
    def train_dataloader(self):
        # Shuffle only the training split.
        return torch.utils.data.DataLoader(self.train_dataset,
                                           batch_size=self.batch_size,
                                           shuffle=True,
                                           num_workers=self.num_workers)

    def val_dataloader(self):
        return torch.utils.data.DataLoader(self.validate_dataset,
                                           batch_size=self.batch_size,
                                           num_workers=self.num_workers)

    def test_dataloader(self):
        return torch.utils.data.DataLoader(self.test_dataset,
                                           batch_size=self.batch_size,
                                           num_workers=self.num_workers)

    def predict_dataloader(self):
        return torch.utils.data.DataLoader(self.predict_dataset,
                                           batch_size=self.batch_size,
                                           num_workers=self.num_workers)


if __name__ == "__main__":
    ...
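    # Hedged smoke test (added; not in the original gist). Downloads the two
    # WRIME TSVs on first use, then inspects a single training batch.
    dm = WRIMEDataModule(".", batch_size=4, max_features=500, label_index=0)
    dm.setup("fit")
    xb, yb = next(iter(dm.train_dataloader()))
    print(xb.shape, yb.shape)  # expected: (4, dm.num_features) and (4,)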


# ===========================================================================
# Second file in this gist: the baseline MLP model. Its __main__ block
# imports the data module above as `data` (so the first file is data.py).
# ===========================================================================
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
import numpy as np


class BaselineMLP(pl.LightningModule):
    def __init__(self, vocab_size, hidden_size, num_class):
        super().__init__()
        self.feature_extractor = nn.Sequential(
            # layer 1
            nn.Linear(vocab_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            # layer 2
            nn.Linear(hidden_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
        )
        # layer 3: emits raw logits; cross_entropy applies log-softmax itself,
        # and predict_step applies softmax explicitly.
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, num_class),
        )
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        h = self.feature_extractor(x)
        return self.classifier(h)

    def _shared_step(self, batch, stage: str):
        # One pass shared by train/val/test: cross-entropy on the logits.
        x, y = batch
        logits_y = self.forward(x)
        loss = F.cross_entropy(logits_y, y)
        self.log(f"{stage}_loss", loss)
        return loss

    def training_step(self, batch, batch_idx):
        return self._shared_step(batch, "train")

    def validation_step(self, batch, batch_idx):
        return self._shared_step(batch, "val")

    def test_step(self, batch, batch_idx):
        return self._shared_step(batch, "test")

    def predict_step(self, batch, batch_idx):
        # The predict dataloader yields unlabeled 1-tuples.
        x, = batch
        logits_y = self.forward(x)
        return logits_y.softmax(1)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)


if __name__ == "__main__":
    from data import WRIMEDataModule

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    print(f"Using {device}")
    pl.seed_everything(0)
    dataset = WRIMEDataModule(".", batch_size=512, num_workers=8, max_features=5000, label_index=0)
    # WRIME intensity labels run 0..3, hence 4 classes.
    model = BaselineMLP(dataset.num_features, 2000, 4)
    # Lightning moves the model to the device itself, so no manual .to() is
    # needed; request a GPU only when one is actually available.
    trainer = pl.Trainer(max_epochs=10, gpus=1 if torch.cuda.is_available() else 0)
    trainer.fit(model, dataset)
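    # Added usage sketch (not in the original gist): Trainer.predict runs
    # predict_step over predict_dataloader(), returning per-batch softmax
    # probabilities for the unlabeled test split.
    probs = trainer.predict(model, datamodule=dataset)
    print(probs[0].shape)  # (batch_size, 4)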