Doccano Sequence Labelling Active Learning
"""Interact with Doccano Sequence Labelling projects.

Trains on checked (approved) annotations, pushes back predictions for unchecked annotations.

### Notes

- Doccano might implement active learning themselves in the future: https://github.com/doccano/doccano/issues/191
- To support multilingual NER projects, split_sentence() below uses polyglot, which in turn requires PyICU.
- See main() for a single cycle of active learning. Manual finetuning is out of scope for this PoC, but
  can be controlled by passing existing_model_path, or by amending get_model() and/or train_model().

### TODO

- This is a PoC. Model changes are needed, e.g. increase num_train_epochs (currently set to 1).
- Avoid blindly overwriting doc annotations:
    - All documents in the selected project(s) that don't have the checkmark will be predicted, cleared and overwritten!
    - Create a new User 'model' (annotator) in the Django admin.
    - Only clear/overwrite annotations from the 'model' user, not from actual users.

### Usage

A simple example set-up would be a remote Jupyter Notebook (instructions: https://gist.github.com/ddelange/a4b2771707aa492683081f3a8e2c42d4)
running in the background that periodically does something like:

```
import time

from doccano_active_learning import main

while True:
    main(
        project_ids=[1, 2, 3],
        existing_model_path="/path/to/latest/handmade/model/checkpoint"
    )
    time.sleep(12 * 60 * 60)  # 12hrs
```

### Installation

PyICU installation instructions for Mac OS X: https://gist.github.com/ddelange/6e04e81b99fae08e817a00515d4a378d
Linux should also be straightforward with an `apt-get install libicu-dev && pip install pyicu` or similar.

$ python -c 'import icu'  # check if PyICU is importable

With PyICU available, get the remaining dependencies.
For PyTorch, 600MB of disk space can be saved on machines without a CUDA-enabled GPU:

$ pip install --no-cache-dir 'torch==1.7.1;sys_platform=="darwin"' 'torch==1.7.1+cpu;sys_platform=="linux"' -f https://download.pytorch.org/whl/torch_stable.html
$ pip install simpletransformers doccano_client==1.0.0 git+https://github.com/aboSamoor/polyglot

### Testing

No environment variables are required to run the doctests:

$ pytest --doctest-modules doccano_active_learning.py

Source: https://git.io/doccano_active_learning

Copyright (c) 2021 ddelange
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT>.
"""
import itertools
import logging
import os
import re
import string
from io import BytesIO
from pathlib import Path
from typing import Dict, Iterable, Tuple, Type, Union

import numpy as np  # installed by simpletransformers
import pandas as pd  # installed by simpletransformers
import requests  # installed by doccano_client
import torch  # installed by simpletransformers
from doccano_api_client import DoccanoClient
from polyglot.text import Sentence
from simpletransformers.ner import NERArgs, NERModel  # use any library you like!
from tqdm.auto import tqdm  # installed by simpletransformers
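
# Doccano connection settings, read from the environment so no credentials live in this file;
# get_client() below asserts they are non-empty before logging in. Example (values are placeholders):
#   export DOCCANO_HOST=https://doccano.example.com
#   export DOCCANO_USERNAME=admin
#   export DOCCANO_PASSWORD=...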
DOCCANO_HOST = os.environ.get("DOCCANO_HOST", "")
DOCCANO_USERNAME = os.environ.get("DOCCANO_USERNAME", "")
DOCCANO_PASSWORD = os.environ.get("DOCCANO_PASSWORD", "")

logger = logging.getLogger(__name__)
ModelType = Type[NERModel]


def split_sentence(sentence: str) -> Iterable[str]:
    """Split a sentence into tokens. Used in convert_doccano_annotations_to_model and predict_unchecked_docs.

    >>> split_sentence(' Yes!! a. b bc ')
    ['Yes', '!', '!', 'a', '.', 'b', 'bc']
    """
    return list(Sentence(sentence).words)


RE_PUNCTUATION_CORRECTION = re.compile(f' (?=[{re.escape(string.punctuation)}])')


def join_sentence(sentence: Iterable[str]) -> str:
    """Join a set of tokens into a sentence. Currently unused.

    >>> join_sentence(['Yes', '!', '!', 'a', '.', 'b', 'bc'])
    'Yes!! a. b bc'
    """
    return RE_PUNCTUATION_CORRECTION.sub("", " ".join(sentence))


def get_client(
    *,
    doccano_host: str = DOCCANO_HOST,
    doccano_username: str = DOCCANO_USERNAME,
    doccano_password: str = DOCCANO_PASSWORD,
):
    """Instantiate a client and log in to a Doccano instance."""
    assert all(
        [doccano_host, doccano_username, doccano_password]
    ), "Invalid Doccano host or credentials passed"

    def clear_annotations(
        self,
        project_id: int,
        document_id: int,
    ) -> requests.models.Response:
        """Clear all annotations for a doc."""
        url = "v1/projects/{}/docs/{}/annotations".format(project_id, document_id)
        return self.delete(url)

    def delete_annotation(
        self,
        project_id: int,
        document_id: int,
        annotation_id: int,
    ) -> requests.models.Response:
        """Delete a single annotation for a doc."""
        url = "v1/projects/{}/docs/{}/annotations/{}".format(
            project_id, document_id, annotation_id
        )
        return self.delete(url)

    # overwrite because doccano-client==1.0.0 accepts no kwargs and master won't install
    def add_annotation(
        self, project_id: int, annotation_id: int, document_id: int, **kwargs
    ) -> requests.models.Response:
        """Add an annotation to a given document.

        Variable keyword arguments **kwargs give support to doccano
        annotations for different project types.
        For example, for SequenceLabeling one should call using start_offset
        and end_offset keyword arguments.

        Args:
            project_id (int): Project database identifier.
            annotation_id (int): Annotation identifier.
            document_id (int): Document identifier.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            requests.models.Response: The request response.
        """
        url = "/v1/projects/{p_id}/docs/{d_id}/annotations".format(
            p_id=project_id, d_id=document_id
        )
        payload = {"label": annotation_id, "projectId": project_id, **kwargs}
        return self.post(url, json=payload)

    def download_project_docs(self, project_id: int) -> pd.DataFrame:
        """Download all project docs including annotations and approvals."""
        resp = self.get_doc_download(project_id=project_id)  # jsonl
        resp.raise_for_status()
        docs = pd.read_json(BytesIO(resp.content), lines=True)
        docs["project_id"] = project_id
        return docs

    def get_labels_mappings(
        self,
        project_ids: Iterable[int],
    ) -> Tuple[Dict[int, str], Dict[str, int]]:
        """Get mappings to convert a Doccano label_id to string and vice versa."""
        ints_to_labels, labels_to_ints = {}, {}
        for project_id in project_ids:
            label_list = self.get_label_list(project_id)
            ints_to_labels.update({lab["id"]: lab["text"] for lab in label_list})
            labels_to_ints.update({lab["text"]: lab["id"] for lab in label_list})
        return ints_to_labels, labels_to_ints

    def upload_docs(self, docs: pd.DataFrame) -> None:
        """Update existing docs by clearing annotations and adding updated ones back."""
        for doc in tqdm(docs.itertuples(), unit="doc_updates"):
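            # NOTE: per the module-level TODO, this clears *all* existing annotations on the doc before re-adding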
            self.clear_annotations(
                project_id=doc.project_id,
                document_id=doc.id,
            ).raise_for_status()
            for annotation in doc.annotations:
                self.add_annotation(
                    project_id=doc.project_id,
                    document_id=doc.id,
                    annotation_id=annotation["label"],
                    start_offset=annotation["start_offset"],
                    end_offset=annotation["end_offset"],
                )
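
    # attach the helpers above as methods on DoccanoClient, so the rest of this module
    # can call them on the client instance returned below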
    DoccanoClient.clear_annotations = clear_annotations
    DoccanoClient.delete_annotation = delete_annotation
    DoccanoClient.add_annotation = add_annotation
    DoccanoClient.download_project_docs = download_project_docs
    DoccanoClient.get_labels_mappings = get_labels_mappings
    DoccanoClient.upload_docs = upload_docs

    client = DoccanoClient(
        baseurl=doccano_host,
        username=doccano_username,
        password=doccano_password,
    )
    # get basic information about the authorized user
    client.me = client.get_me()
    logger.info("Logged in as %s", client.me)
    return client


def split_checked_unchecked(docs: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split docs into two sub-frames based on whether the docs have an approver."""
    na_mask = docs["annotation_approver"].isna()
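    # annotation_approver is set when a user approves (checkmarks) a doc in the Doccano UI; NaN means unchecked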
    checked_docs = docs[~na_mask]
    unchecked_docs = docs[na_mask]
    return checked_docs, unchecked_docs


def split_training_data(
    training_data: pd.DataFrame,
    train_ratio: float = 0.8,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split training_data into two sub-frames randomly without breaking up sentences."""
    sentence_ids = training_data["sentence_id"].unique()
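    # sample train_ratio of the unique sentence_ids so that a sentence never ends up in both splits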
    mask = training_data["sentence_id"].isin(
        np.random.choice(
            sentence_ids,
            size=int(train_ratio * sentence_ids.size),
            replace=False,
        )
    )
    train_data = training_data[mask]
    eval_data = training_data[~mask]
    return train_data, eval_data


def convert_model_predictions_to_doccano(
    predictions: pd.Series,
    original_sentences: pd.Series,
    labels_mapping: Dict[str, int],
) -> pd.Series:
    """Transform model predictions into Doccano annotations.

    >>> predictions = pd.Series([[{'Hermione': 'I-employee'},{"'": 'B-employee'},{'s': 'I-employee'},{'class': 'B-product_category'},{'was': 'I-product_category'},{'the': 'O'},{'best': 'O'},{'class': 'B-product'},{'in': 'O'},{'her': 'B-material'},{'class': 'I-material'}, {'.': 'O'}]])
    >>> original_sentences = pd.Series(["Hermione's class was the best class in her class."])
    >>> labels_to_ints = {'application_industry': 196, 'manufacturing_process': 197, 'product_category': 198, 'product': 199, 'material': 200, 'employee': 201, 'company_type': 202, 'operational_country': 203, 'revenue': 204, 'company_name': 205, 'contact': 206, 'age': 207, 'rnd_capacity': 208}
    >>> doccano_annotations = convert_model_predictions_to_doccano(predictions, original_sentences, labels_to_ints)
    >>> import pprint; pprint.pprint(doccano_annotations.to_list(), sort_dicts=False)
    [[{'label': 201, 'start_offset': 0, 'end_offset': 8},
      {'label': 201, 'start_offset': 8, 'end_offset': 10},
      {'label': 198, 'start_offset': 11, 'end_offset': 20},
      {'label': 199, 'start_offset': 30, 'end_offset': 35},
      {'label': 200, 'start_offset': 39, 'end_offset': 48}]]

    Args:
        predictions: model predictions, one sentence per row.
        original_sentences: corresponding sentences, used for slicing.
        labels_mapping: mapping to translate the labels in predictions to label_ids.

    Returns:
        corresponding column of Doccano annotations.

    Raises:
        ValueError: labels other than B-I-O, or if I is encountered before B.
    """
    annotations = pd.DataFrame(
        {
            "prediction": predictions.reset_index(drop=True),
            "original_sentence": original_sentences.reset_index(drop=True),
        }
    )

    def _get_annotation(row):
        pred = row["prediction"]
        sent = row["original_sentence"]
        sentence_index = 0
        annotations_found = []
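        # walk the BIO predictions left to right; sentence_index tracks how far into the
        # original sentence we have already matched, so repeated words resolve to the right offsets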
        for word_pred in pred:
            word, label_raw = list(word_pred.items())[0]
            if label_raw == "O":
                continue
            label_type, label = label_raw.split("-", 1)
            label_id = labels_mapping[label]
            # sent_slice = sent[sentence_index:]
            if label_type == "I":
                # model can predict I even though there is no preceding B. convert to B.
                if not annotations_found or annotations_found[-1]["label"] != label_id:
                    label_type = "B"
            if label_type == "B":
                # beginning: create new annotation
                start_offset = sent.index(word, sentence_index)
                end_offset = start_offset + len(word)
                sentence_index = end_offset
                annotations_found.append(
                    {
                        "label": label_id,
                        "start_offset": start_offset,
                        "end_offset": end_offset,
                    }
                )
            elif label_type == "I":
                # intermediate: amend last annotation
                annotation = annotations_found[-1]
                if annotation["label"] != label_id:
                    # if I is encountered before B
                    raise ValueError(
                        f"Bad label '{label}' ({label_id}) encountered (expected {annotation['label']}) in prediction {pred} for sentence '{sent}'"
                    )
                end_offset = sent.index(word, sentence_index) + len(word)
                sentence_index = end_offset
                annotation["end_offset"] = end_offset
            else:
                raise ValueError(
                    f"Bad label type '{label_type}' encountered in prediction {pred} for sentence '{sent}'"
                )
        return annotations_found

    return annotations.apply(_get_annotation, axis=1)


def convert_doccano_annotations_to_model(
    docs: pd.DataFrame,
    labels_mapping: Dict[int, str],
) -> pd.DataFrame:
    """Transform Doccano annotations into model training data.

    >>> df = pd.DataFrame([{'id': 104256, 'text': 'Unsere Leistungen Qualität und Service Profilift unser Hubgerät für alle Fälle Informieren Sie sich über unseren Profilift!', 'annotations': [{'label': 199, 'start_offset': 113, 'end_offset': 122, 'user': 1, 'created_at': '2021-01-05T12:36:44.327223Z', 'updated_at': '2021-01-05T12:36:44.327250Z'}, {'label': 197, 'start_offset': 18, 'end_offset': 38, 'user': 1, 'created_at': '2021-01-05T11:59:19.929916Z', 'updated_at': '2021-01-05T11:59:19.929935Z'}, {'label': 199, 'start_offset': 55, 'end_offset': 63, 'user': 1, 'created_at': '2020-12-18T09:32:55.264966Z', 'updated_at': '2020-12-18T09:32:55.264985Z'}, {'label': 199, 'start_offset': 39, 'end_offset': 48, 'user': 1, 'created_at': '2020-12-18T09:32:50.234131Z', 'updated_at': '2020-12-18T09:32:50.234156Z'}], 'meta': {}, 'annotation_approver': 'admin', 'project_id': 18}])
    >>> ints_to_labels = {196: 'application_industry', 197: 'manufacturing_process', 198: 'product_category', 199: 'product', 200: 'material', 201: 'employee', 202: 'company_type', 203: 'operational_country', 204: 'revenue', 205: 'company_name', 206: 'contact', 207: 'age', 208: 'rnd_capacity'}
    >>> convert_doccano_annotations_to_model(df, ints_to_labels)
        sentence_id        words                   labels
    0        104256       Unsere                        O
    1        104256   Leistungen                        O
    2        104256     Qualität  B-manufacturing_process
    3        104256          und  I-manufacturing_process
    4        104256      Service  I-manufacturing_process
    5        104256    Profilift                B-product
    6        104256        unser                        O
    7        104256     Hubgerät                B-product
    8        104256          für                        O
    9        104256         alle                        O
    10       104256        Fälle                        O
    11       104256  Informieren                        O
    12       104256          Sie                        O
    13       104256         sich                        O
    14       104256         über                        O
    15       104256      unseren                        O
    16       104256    Profilift                B-product
    17       104256            !                        O

    Args:
        docs: annotated sentences.
        labels_mapping: mapping to translate the label_ids in docs to strings.

    Returns:
        training data for the model.

    Raises:
        ValueError: when an annotation begins/ends in the middle of a word.
    """
    docs = docs[docs["annotations"].str.len() > 0]  # drop rows without annotations
    assert not docs.empty, "Expected non-empty DataFrame with Doccano annotations"

    def _get_annotation(row):
        """Get the list of model annotations for a row (sentence + annotations list)."""
        sentence_id = row["id"]
        sentence = row["text"]
        # sort annotations by occurrence in the sentence
        annotations = pd.DataFrame(row.annotations).sort_values("start_offset")
        # convert numeric labels to string labels
        annotations["label"] = annotations["label"].apply(labels_mapping.get)
        annotations["text"] = annotations.apply(
            # get the list of words for each annotation offset
            lambda annotation: split_sentence(
                sentence[annotation["start_offset"] : annotation["end_offset"]]
            ),
            axis=1,
        )
        # for each word (sorting matters) generate the correct B- and I- labels
        annotations = annotations[["label", "text"]].explode("text")
        label_prefixes = [
            "I-" if dupe else "B-" for dupe in annotations.index.duplicated("first")
        ]
        annotations["label"] = label_prefixes + annotations["label"]
        # for each word in the sentence, pop from the start of the list, or take the "O" (other) label
        # can't use to_dict and pop from OrderedDict, as there could be duplicate keys
        annotated_words = annotations["text"].to_list()
        annotated_labels = annotations["label"].to_list()
        sentence_words = split_sentence(sentence)
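        # sanity check: every annotated token must also appear among the sentence tokens,
        # otherwise an annotation offset cut through the middle of a word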
        bad_annotations = set(annotated_words).difference(sentence_words)
        if bad_annotations:
            raise ValueError(
                f"Annotations {bad_annotations} aren't present in sentence words {sentence_words}"
            )
        final = []
        for word in sentence_words:
            if annotated_words and word == annotated_words[0]:
                annotated_words.pop(0)
                label = annotated_labels.pop(0)
            else:
                label = "O"
            final.append({"sentence_id": sentence_id, "words": word, "labels": label})
        return final

    # explode annotations
    annotations = (
        docs.apply(_get_annotation, axis=1)
        .explode()
        .apply(pd.Series)
        .reset_index(drop=True)
    )
    return annotations


def get_model(
    labels: Iterable[str], existing_model_path: Union[None, Path, str] = None
) -> ModelType:
    """Initiate model object with custom labels, optionally loading from checkpoint."""
    if existing_model_path is None:
        existing_model_path = "distilbert-base-multilingual-cased"
    logger.info(f"Loading model {existing_model_path}")
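    # build the full BIO tag set: a B- and I- variant for every Doccano label, plus the catch-all "O"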
    custom_labels = list(
        itertools.chain(*[[f"B-{label}", f"I-{label}"] for label in labels])
    ) + ["O"]
    # Configure the model
    model_args = NERArgs()
    model_args.train_batch_size = 4
    model_args.evaluate_during_training = True
    model_args.num_train_epochs = 1
    model_args.overwrite_output_dir = True
    model = NERModel(
        "distilbert",
        existing_model_path,
        args=model_args,
        use_cuda=bool(torch.cuda.device_count()),
        labels=custom_labels,
    )
    return model


def train_model(
    docs: pd.DataFrame,
    labels_mapping: Dict[int, str],
    existing_model_path: Union[None, Path, str] = None,
) -> ModelType:
    """Train model with latest checked docs from one or multiple projects."""
    training_data = convert_doccano_annotations_to_model(docs, labels_mapping)
    train_data, eval_data = split_training_data(training_data)
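    # default 80/20 split by sentence_id (see split_training_data); eval results are logged after training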
    model = get_model(
        labels=labels_mapping.values(),
        existing_model_path=existing_model_path,
    )
    logger.info("Training model")
    model.train_model(train_data=train_data, eval_data=eval_data)
    result, model_outputs, preds_list = model.eval_model(eval_data=eval_data)
    logger.info(f"Eval results:\n{result}")
    return model


def predict_unchecked_docs(
    model: ModelType,
    docs: pd.DataFrame,
    labels_mapping: Dict[str, int],
) -> pd.DataFrame:
    """Predict and overwrite the annotations column."""
    predictions, raw_outputs = model.predict(
        docs["text"].apply(split_sentence),
        split_on_space=False,
    )
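    # predictions come back per word (tokenized with the same split_sentence() as the training data);
    # convert them back to character offsets and Doccano label ids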
    docs["annotations"] = convert_model_predictions_to_doccano(
        predictions=pd.Series(predictions),
        original_sentences=docs["text"],
        labels_mapping=labels_mapping,
    ).to_list()
    return docs


def main(
    project_ids: Iterable[int],
    existing_model_path: Union[None, Path, str] = None,
):
    """Run a single cycle, overwriting unannotated (not manually marked as checked) documents.

    1. Fetch checked_docs and unchecked_docs for project_ids
    2. Train the model with the latest annotations (checked_docs) from project_ids
    3. Annotate unchecked_docs using the latest model, put them back in the DataFrame
    4. Push the new annotations for the unapproved rows

    Args:
        project_ids: iterable of Doccano project ids to train with and to update.
        existing_model_path: optional path to a model checkpoint to finetune instead of the base model.
    """
    # 1
    client = get_client()
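    # download every doc (text, annotations, approval status) for the selected projects and concatenate them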
    docs = pd.concat(
        tqdm(
            (client.download_project_docs(project_id) for project_id in project_ids),
            unit="project_exports",
        )
    )
    checked_docs, unchecked_docs = split_checked_unchecked(docs)
    # 2
    ints_to_labels, labels_to_ints = client.get_labels_mappings(
        project_ids=project_ids,
    )
    model = train_model(
        docs=checked_docs,
        labels_mapping=ints_to_labels,
        existing_model_path=existing_model_path,
    )
    # 3
    annotated_docs = predict_unchecked_docs(
        model=model,
        docs=unchecked_docs,
        labels_mapping=labels_to_ints,
    )
    logger.info(f"Annotations:\n{annotated_docs}")
    # 4
    client.upload_docs(docs=annotated_docs)
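

# running this module directly performs one full active-learning cycle on the example project ids below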
if __name__ == "__main__":
    logging.basicConfig(level="INFO")
    main(project_ids=[1, 2, 3])