AI・IoT PBL 2024
import json
from collections import deque
# import pickle

import skl2onnx
import onnxruntime
# https://pypi.org/project/skl2onnx/
# https://onnxruntime.ai/docs/install/#python-installs
# Example of installation:
#   pip install skl2onnx onnxruntime

import numpy as np
from sklearn.svm import OneClassSVM

# Global variables
DATA_QUEUE = deque()
THRESHOLD = 0.0
SVM_NU = 0.5
MODEL_FILENAME = 'svm.model.onnx'

def init():
    global DATA_QUEUE

    def _load_data(filename):
        with open(filename, 'r') as f:
            for line in f:
                _, s = line.rstrip().split('\t')
                # print(s)  # for debug
                DATA_QUEUE.append(json.loads(s))

    _load_data('train.txt')  # 100 records
    _load_data('test.txt')   # 50 records
    # print(DATA_QUEUE)  # for debug

def train():
    global DATA_QUEUE

    clf = OneClassSVM(gamma='auto', nu=SVM_NU)

    tr_list = list()
    for _ in range(100):
        _d = DATA_QUEUE.popleft()
        _feat = np.array([_d['ax'], _d['ay'], _d['az']], dtype=np.float32)
        tr_list.append(_feat)
    X = np.array(tr_list)
    # print(X)  # for debug

    # Training
    clf.fit(X)

    # Save the SVM's parameters as a model file
    onx = skl2onnx.to_onnx(clf, X[:1])
    # The 2nd argument is used to infer the input type,
    # e.g. Array(float32, float32, float32).
    with open(MODEL_FILENAME, "wb") as f:
        f.write(onx.SerializeToString())

def test():
    global DATA_QUEUE

    # Load the SVM's parameters from the model file
    sess = onnxruntime.InferenceSession(MODEL_FILENAME, providers=["CPUExecutionProvider"])
    sess_x = sess.get_inputs()[0]
    sess_y1 = sess.get_outputs()[0]
    sess_y2 = sess.get_outputs()[1]

    while DATA_QUEUE:  # i.e. while the queue is not empty
        _d = DATA_QUEUE.popleft()
        _feat = np.array([_d['ax'], _d['ay'], _d['az']], dtype=np.float32)
        _feat = _feat.reshape(-1, 3)
        # _feat is a matrix of DATA_NUM x FEATURES

        # Prediction
        y, score = sess.run(
            [sess_y1.name, sess_y2.name],  # the outputs we want (label, scores)
            {sess_x.name: _feat}           # the input to classify
        )
        y, score = np.squeeze(y), np.squeeze(score)
        # y and score are matrices of "DATA_NUM x 1";
        # equivalently: y = 1 if score > THRESHOLD else -1

        print('{:s}\t{:s} (score = {:.4f})'.format(
            _d['date'],
            'Normal' if y > 0 else '*Anomaly!*',
            score))

if __name__ == "__main__":
    init()
    train()
    test()
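
# Optional sanity check (a minimal sketch, assuming train() above has
# already written MODEL_FILENAME): inspect the exported model's I/O
# signature with the same onnxruntime API that test() uses. For the
# OneClassSVM converter the input is typically named 'X' and the outputs
# are a label tensor and a scores tensor, which is why test() reads
# get_outputs()[0] and get_outputs()[1].
#
#   sess = onnxruntime.InferenceSession(MODEL_FILENAME, providers=["CPUExecutionProvider"])
#   print([(i.name, i.shape, i.type) for i in sess.get_inputs()])
#   print([(o.name, o.shape, o.type) for o in sess.get_outputs()])
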
import json
from collections import deque
import pickle

import numpy as np
from sklearn.svm import OneClassSVM

# Global variables
DATA_QUEUE = deque()
THRESHOLD = 0.0
SVM_NU = 0.5
MODEL_FILENAME = 'svm.model.pkl'

def init():
    global DATA_QUEUE

    def _load_data(filename):
        with open(filename, 'r') as f:
            for line in f:
                _, s = line.rstrip().split('\t')
                # print(s)  # for debug
                DATA_QUEUE.append(json.loads(s))

    _load_data('train.txt')  # 100 records
    _load_data('test.txt')   # 50 records
    # print(DATA_QUEUE)  # for debug

def train():
    global DATA_QUEUE

    clf = OneClassSVM(gamma='auto', nu=SVM_NU)

    tr_list = list()
    for _ in range(100):
        _d = DATA_QUEUE.popleft()
        _feat = np.array([_d['ax'], _d['ay'], _d['az']], dtype=np.float32)
        tr_list.append(_feat)
    X = np.array(tr_list)
    # print(X)  # for debug

    # Training
    clf.fit(X)

    # Save the SVM's parameters as a model file
    with open(MODEL_FILENAME, 'wb') as f:
        pickle.dump(clf, f)

def test():
    global DATA_QUEUE

    # Load the SVM's parameters from the model file
    with open(MODEL_FILENAME, 'rb') as f:
        clf = pickle.load(f)

    while DATA_QUEUE:  # i.e. while the queue is not empty
        _d = DATA_QUEUE.popleft()
        _feat = np.array([_d['ax'], _d['ay'], _d['az']], dtype=np.float32)
        _feat = _feat.reshape(-1, 3)
        # _feat is a matrix of DATA_NUM x FEATURES

        # Prediction
        score = clf.decision_function(_feat)[0]
        # score is a matrix of "DATA_NUM x 1"
        y = 1 if score > THRESHOLD else -1
        # or, more concisely:
        #   y = clf.predict(_feat)[0]

        print('{:s}\t{:s} (score = {:.4f})'.format(
            _d['date'],
            'Normal' if y > 0 else '*Anomaly!*',
            score))

if __name__ == "__main__":
    init()
    train()
    test()
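
# Note (a small check, assuming the clf trained above): with THRESHOLD = 0.0,
# the manual comparison in test() agrees with predict(), because
# OneClassSVM.predict() applies the same zero threshold to decision_function():
#
#   score = clf.decision_function(_feat)[0]
#   assert (1 if score > 0.0 else -1) == clf.predict(_feat)[0]
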
"""
Example of abnormaly detection for 3D accelerometer
- Original source code was written by S. Kobayashi.
- Then, the source code was modified by by S. Hara.
"""
import json
import time
import argparse
import datetime
# KafkaConsumer document -> https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
from kafka import KafkaConsumer
import pandas as pd
import numpy as np
TIME_TERM = datetime.timedelta(seconds=10)
def anomaly_score_by_value(df, current_time, current_value, value_fn):
# extract data for specified time range (TIME_TERM)
current_value = value_fn(current_value)
usual_values = df[df.index > current_time-TIME_TERM].apply(value_fn,axis=1).to_numpy()
# calculate anomaly score
return np.abs(current_value - np.average(usual_values)) - np.std(usual_values) * 2
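
# Worked example for anomaly_score_by_value (illustrative numbers, not from
# the data files): if the values inside TIME_TERM are [1.0, 1.1, 0.9]
# (mean 1.0, std ~0.082) and the current value is 1.5, the score is
# |1.5 - 1.0| - 2 * 0.082 ~= +0.34 > 0, which test_value_anomaly() below
# reports as "*Anomaly!*".
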
def test_value_anomaly(df, current_time, current_value, value_fn):
    anomaly_score = anomaly_score_by_value(df, current_time, current_value, value_fn)
    anomaly_message = "Normal" if anomaly_score < 0 else "*Anomaly!*"
    print("> Value anomaly score: {0}, {1}".format(anomaly_score, anomaly_message))

def anomaly_score_by_collection(df, current_time, current_value, value_fn=None):
    # Skip this check if there are not enough messages yet
    if len(df.index) < 3:
        return 9999

    # Calculate the time intervals of the data collection
    tmp = []
    for i in range(len(df.index) - 1):
        tmp.append((df.index[i+1] - df.index[i]).total_seconds())
    delta = np.array(tmp)
    new_delta = delta[-1]
    # delta = np.array([(df.index[i+1] - df.index[i]).total_seconds() for i in range(len(df.index) - 1)])

    # Calculate the anomaly score
    return np.abs(new_delta - np.average(delta)) - np.std(delta) * 2
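
# Worked example for anomaly_score_by_collection (illustrative numbers):
# messages at t = 0..5 s followed by one at t = 12 s give
# delta = [1, 1, 1, 1, 1, 7], new_delta = 7, mean = 2.0, std ~= 2.24,
# so the score is |7 - 2.0| - 2 * 2.24 ~= +0.53 > 0, which
# test_collection_anomaly() below reports as "*Anomaly!*".
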
def test_collection_anomaly(df, current_time, current_value, value_fn=None):
    anomaly_score = anomaly_score_by_collection(df, current_time, current_value)
    anomaly_message = "Normal" if anomaly_score < 0 else "*Anomaly!*"
    # print("Time since last message: {0} seconds".format(new_delta))
    print("> Data collection anomaly score: {0}, {1}".format(anomaly_score, anomaly_message))

def main_loop(consumer: KafkaConsumer, key_spec: str, value_spec: list, value_fn):
    df_key_spec = value_spec + [key_spec]  # concatenate the lists
    df = pd.DataFrame([], columns=df_key_spec)

    try:
        for message in consumer:
            print("DEBUG: key: {0}, value: {1}".format(message.key, message.value))

            # Update the history (dataframe)
            _key = pd.to_datetime(message.value[key_spec])
            df.loc[_key] = message.value

            # Anomaly detection
            test_value_anomaly(df, _key, message.value, value_fn)
            test_collection_anomaly(df, _key, message.value)
            print("")
    except KeyboardInterrupt:
        print("process interrupted")
        return

if __name__ == "__main__":
    consumer = KafkaConsumer(
        "sensor1",
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset='earliest',
        # Convert the message key (bytes) to str
        key_deserializer=lambda m: m.decode("utf-8"),
        # Convert the message value (bytes) to dict
        value_deserializer=lambda m: json.loads(m.decode("utf-8"))
    )
    main_loop(consumer,
              key_spec='date',
              value_spec=['ax', 'ay', 'az'],
              value_fn=lambda v: v['ax'])
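
# To feed this consumer without kafka-console-producer.sh, a minimal
# kafka-python producer sketch (assumes the same local broker and topic;
# the serializers mirror the deserializers above):
#
#   from kafka import KafkaProducer
#   producer = KafkaProducer(
#       bootstrap_servers="127.0.0.1:9092",
#       key_serializer=lambda k: k.encode("utf-8"),
#       value_serializer=lambda v: json.dumps(v).encode("utf-8"))
#   producer.send("sensor1", key="some-key",
#                 value={"date": "2024-01-01T00:00:00+00:00",
#                        "ax": 2.0, "ay": 0.0, "az": -9.8})
#   producer.flush()
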
import sys
import math
import random
import json
from datetime import datetime, timedelta, timezone
import uuid

device_id = 'sensor1'

is_test = sys.argv[1] != 'train'
if is_test:
    count = 50
    _seed = 10000
else:
    count = 200
    _seed = 10001
random.seed(_seed)

# starttime = datetime.now(tz=timezone(timedelta(hours=+9), 'JST'))
starttime = datetime.now(tz=timezone.utc)

factor = 1
for n in range(count):
    # For the test data, inject noisy (anomalous) samples for n = 30..39
    if is_test and n == 30: factor = 100.0
    if is_test and n == 40: factor = 1

    x = 2 * math.cos(2.0*3.14*n/10.0) + random.gauss(0.0, factor*0.05)
    y = 2 * math.sin(2.0*3.14*n/10.0) + random.gauss(0.0, factor*0.05)
    z = -9.8 + random.gauss(0.0, factor*0.05)

    _date = starttime + timedelta(seconds=n + random.gauss(0.0, 0.0005))
    _key = str(uuid.uuid4())
    _value = {
        "date": _date.isoformat(),
        "ax": x,
        "ay": y,
        "az": z
    }
    # Print one "Key[TAB]Value" record per line.
    # (kafka-console-producer.sh then also needs --property "parse.key=true".)
    print('\t'.join([_key, json.dumps(_value)]))
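
# Usage sketch (the file name make_data.py is an assumption, not given here):
#
#   python make_data.py train > train.txt
#   python make_data.py test  > test.txt
#
# or pipe the records straight into Kafka; the default key.separator of
# kafka-console-producer.sh is already a TAB:
#
#   python make_data.py test | kafka-console-producer.sh \
#       --bootstrap-server 127.0.0.1:9092 --topic sensor1 \
#       --property "parse.key=true"
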
"""
Example of abnormaly detection for 3D accelerometer
- Original source code was written by S. Kobayashi.
- Then, the source code was modified by by S. Hara.
"""
import json
import time
import argparse
import datetime
# KafkaConsumer document -> https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
from kafka import KafkaConsumer
import pandas as pd
import numpy as np
import streamlit as st
#import matplotlib.pyplot as plt
TIME_TERM = datetime.timedelta(seconds=10)
import det1
def main_loop(consumer: KafkaConsumer, key_spec: str, value_spec: list, value_fn):
df_key_spec = value_spec + [ key_spec ] # concat list
df = pd.DataFrame([], columns=df_key_spec)
try :
for message in consumer:
# print("DEBUG: key: {0}, value: {1}".format(message.key, message.value))
## Update history (dataframe)
_key = pd.to_datetime(message.value[key_spec])
df.loc[_key] = message.value
## Anomaly detection (note: score calculation only)
_score = det1.anomaly_score_by_value(df, _key, message.value, value_fn)
# _score = det1.anomaly_score_by_collection(df, _key, message.value, value_fn)
## Update chart of Streamlit App
update_chart(_score)
print("")
except KeyboardInterrupt:
print("process interrupted")
return
# グラフを描画する
chart_y = [0.0]
chart_obj = None
def update_chart(new_score=None):
global chart_y, chart_obj
if new_score is None or chart_obj is None:
chart_obj = st.empty()
else:
chart_y.append(new_score)
chart_obj.line_chart(chart_y)
time.sleep(0.01)
if __name__ == "__main__":
st.title('streamlit example')
update_chart(None)
consumer = KafkaConsumer(
"sensor1",
bootstrap_servers = "127.0.0.1:9092",
auto_offset_reset='earliest',
## convert message key to datetime.datetime
key_deserializer=lambda m: m.decode("utf-8"),
# convert message value to dict
value_deserializer=lambda m: json.loads(m.decode("utf-8"))
)
main_loop(consumer,
key_spec='date',
value_spec=['ax', 'ay', 'az'],
value_fn=lambda v: v['ax'])
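
# Usage sketch (the file name app.py is an assumption; the detector module
# above must be saved as det1.py to match the import):
#
#   streamlit run app.py
#
# Streamlit serves the page at http://localhost:8501 by default; the
# st.empty() placeholder created in update_chart() is what lets the line
# chart be redrawn in place as new scores arrive from the Kafka consumer.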