Skip to content

Instantly share code, notes, and snippets.

@ksasao
Last active February 11, 2025 14:16
Show Gist options
  • Save ksasao/ecc46f08b45e47880ffa1953790ef32e to your computer and use it in GitHub Desktop.
Save ksasao/ecc46f08b45e47880ffa1953790ef32e to your computer and use it in GitHub Desktop.
M5Atom GPS と ENV Pro で、緯度経度に紐づいた匂いのログを記録します。 https://x.com/ksasao/status/1887864442442096790
"""
コマンドライン引数で指定されたTSVファイル(タブ区切り)を読み込み、
各行のデータ(タイムスタンプ, タイプ, JSON文字列)を解析します。
Scent行の場合、JSON内の "Vector" 配列の4~13番目(インデックス3~12)の10次元匂いベクトルを抽出し、
直前のLocation行のLatitude, Longitude情報と対応付けます。
その後、scikit-learnのTSNEで10次元の匂いベクトルを2次元に低次元化し、
t-SNEの両軸の出力を利用してHSVカラー空間で色付けします。
具体的には、1軸目をHue、2軸目をSaturationとしてマッピングし、よりカラフルに点を表示します。
結果をFoliumで地図上にCircleMarker(サイズは倍に設定)としてプロットし、scent_map.html に保存します。
使用方法:
python plot.py data.tsv
必要なパッケージ:
pip install pandas scikit-learn matplotlib folium
"""
import sys
import json
import pandas as pd
import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt # ここでは参考用にインポート
import folium
import colorsys # HSVをRGBに変換
def rgb_to_hex(rgb):
# rgb は (r, g, b) それぞれ 0〜1 の値
r, g, b = rgb
return '#{0:02x}{1:02x}{2:02x}'.format(int(r*255), int(g*255), int(b*255))
def main():
# コマンドライン引数でTSVファイルを指定
if len(sys.argv) != 2:
print("使用方法: python plot.py <tsvファイル>")
sys.exit(1)
tsv_file = sys.argv[1]
# --- 1. pandasでタブ区切りのTSVファイルを読み込む ---
df = pd.read_csv(tsv_file, sep="\t", header=None,
names=["timestamp", "type", "json_str"])
# --- 2. 各行の処理 ---
scent_vectors = [] # 各Scent行の10次元匂いベクトル
positions = [] # 対応する (Latitude, Longitude) タプル
current_location = None # 直前のLocation情報
for idx, row in df.iterrows():
data_type = row["type"]
json_str = row["json_str"]
# json_str が文字列でない場合は str() に変換
if not isinstance(json_str, str):
json_str = str(json_str)
# 期待するJSONは "{" で始まるはずなので、そうでなければスキップ
if not json_str.strip().startswith("{"):
print(f"JSON形式でないためスキップ: {json_str}")
continue
try:
data = json.loads(json_str)
except (json.JSONDecodeError, TypeError) as err:
print(f"JSONパースエラー: {json_str}\nエラー内容: {err}")
continue
if data_type == "Location":
if "Latitude" in data and "Longitude" in data:
current_location = (data["Latitude"], data["Longitude"])
elif data_type == "Scent":
if current_location is None:
continue # 直前のLocation情報がなければスキップ
vector = data.get("Vector", [])
if len(vector) < 13:
print(f"Vectorの要素数不足: {vector}")
continue
# 4~13番目(Pythonでインデックス3~12)を抽出
odor_vector = vector[3:13]
scent_vectors.append(odor_vector)
positions.append(current_location)
# その他のタイプは無視
if not scent_vectors:
print("有効なScentデータが見つかりませんでした。")
sys.exit(1)
# --- 3. TSNEで10次元の匂いベクトルを2次元に低次元化 ---
X = np.array(scent_vectors)
tsne = TSNE(n_components=2, random_state=0)
X_embedded = tsne.fit_transform(X)
# --- 4. t-SNE結果を用いたHSVカラーでの色付け ---
# 1軸目をHue, 2軸目をSaturationとして利用
tsne_x = X_embedded[:, 0]
tsne_y = X_embedded[:, 1]
# 各軸を0~1に正規化する
norm_x = (tsne_x - tsne_x.min()) / (tsne_x.max() - tsne_x.min())
norm_y = (tsne_y - tsne_y.min()) / (tsne_y.max() - tsne_y.min())
# HSVにおいて Hue = norm_x, Saturation = 0.5~1 (norm_yで補正), Value 固定 = 0.9 とする(彩度に変化が出るように調整)
hsv_colors = [colorsys.hsv_to_rgb(h, 0.5 + 0.5*s, 0.9) for h, s in zip(norm_x, norm_y)]
hex_colors = [rgb_to_hex(rgb) for rgb in hsv_colors]
# --- 5. Foliumで地図作成およびScentデータのプロット ---
lats = [pos[0] for pos in positions]
lons = [pos[1] for pos in positions]
# 中心位置を計算
center_lat = np.mean(lats)
center_lon = np.mean(lons)
# 緯度と経度の範囲を計算
lat_range = max(lats) - min(lats)
lon_range = max(lons) - min(lons)
# 適切なズームレベルを計算
zoom_level = min(
int(np.log2(360 / lat_range)),
int(np.log2(360 / lon_range))
) - 1
# 地図を作成
m = folium.Map(location=(center_lat, center_lon), zoom_start=zoom_level)
# 境界ボックスを作成
sw = [min(lats), min(lons)]
ne = [max(lats), max(lons)]
m.fit_bounds([sw, ne])
# CircleMarker の radius を 10 として点のサイズを倍にしています
for (lat, lon), hex_color in zip(positions, hex_colors):
folium.CircleMarker(
location=(lat, lon),
radius=10,
color=hex_color,
fill=True,
fill_color=hex_color,
fill_opacity=0.8,
).add_to(m)
output_name = tsv_file+".html"
m.save(output_name)
print(output_name + " を作成しました。ブラウザで開いて確認してください。")
if __name__ == "__main__":
main()
// Scent+GPS Logger for M5Atom Atomic GPS Kit
// Library version M5Atom 0.1.3
// Board version M5Stack 2.1.3
#include "bme68xLibrary.h"
#include "M5Atom.h"
#include <sys/time.h>
#include <SPI.h>
#include "FS.h"
#include <SD.h>
#include <TinyGPS++.h>
#include <BluetoothSerial.h>
// Bluetooth SPP
uint64_t chipid;
char chipname[256];
BluetoothSerial SerialBT;
// SD Card
File txtFile;
// M5Atom/Grove
#define SDA_PIN 26
#define SCL_PIN 32
// GPS
TinyGPSPlus gps;
const int _offsetSeconds = 9 * 3600;
#define TZ_JST "JST-9"
#define GPS_ID "Gps_05"
double Lat = -1000;
double Lon = -1000;
// BME688
#define BME688_I2C_ADDR 0x77
#define NEW_GAS_MEAS (BME68X_GASM_VALID_MSK | BME68X_HEAT_STAB_MSK | BME68X_NEW_DATA_MSK)
#define MEAS_DUR 140
#define GAS_ID "Gas_05"
Bme68x bme;
float _gas[10];
#define TEXT_LEN 512
char _text[TEXT_LEN];
char _logBuffer[TEXT_LEN*2];
char _filename[256];
void setTimeZone(){
setenv("TZ", TZ_JST, 1);
tzset();
}
bool writeLog(String logType, String logData) { //Write GPSdata to SDcard
struct tm t;
String buf = "";
if (!getLocalTime(&t)) {
Serial.println("Failed to obtain time");
return false;
}else if(t.tm_year + 1900 > 2020){
// ミリ秒まで時刻を合わせるため改めて時刻を取り直す
struct timeval tv;
gettimeofday(&tv, NULL);
t = *localtime(&tv.tv_sec);
int millisec = tv.tv_usec / 1000;
sprintf(_filename,"/%04d%02d%02d.tsv", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);
snprintf(_logBuffer,TEXT_LEN*2,"%04d-%02d-%02d %02d:%02d:%02d.%03d",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday,
t.tm_hour, t.tm_min, t.tm_sec, millisec);
buf.concat(String(_logBuffer));
buf.concat("\t");
buf.concat(logType);
buf.concat("\t");
buf.concat(logData);
Serial.println(buf);
SerialBT.println(buf);
txtFile = SD.open(_filename, FILE_APPEND);
if(txtFile){
txtFile.print(buf);
txtFile.println();
txtFile.close();
}else{
// 書き込み失敗時は水色
M5.dis.drawpix(0,0x004f4f);
SerialBT.println("** Write Error **");
return false;
}
}else{
return false;
}
return true;
}
void printUint16Hex(uint16_t value) {
Serial.print(value < 4096 ? "0" : "");
Serial.print(value < 256 ? "0" : "");
Serial.print(value < 16 ? "0" : "");
Serial.print(value, HEX);
}
void printSerialNumber(uint16_t serial0, uint16_t serial1, uint16_t serial2) {
Serial.print("Serial: 0x");
printUint16Hex(serial0);
printUint16Hex(serial1);
printUint16Hex(serial2);
Serial.println();
}
// This custom version of delay() ensures that the gps object
// is being "fed".
static void smartDelay(unsigned long ms)
{
unsigned long start = millis();
int count = 0;
do
{
while (Serial1.available() && (millis() - start) < ms){
int c = Serial1.read();
gps.encode(c);
if(count&0xff==0){
delay(1);
}
// Serial.print((char)c);
count++;
}
delay(1);
} while (millis() - start < ms);
}
int counter = 0;
void setLocalTime(tm t){
time_t epoch = mktime(&t);
struct timeval tv;
tv.tv_sec = (long)epoch + _offsetSeconds; // 時差分を秒数で追加
tv.tv_usec = 0;
settimeofday(&tv, NULL);
}
void setDateTime(){
struct tm t;
t.tm_year = gps.date.year() - 1900;
t.tm_mon = gps.date.month() - 1;
t.tm_mday = gps.date.day();
t.tm_hour = gps.time.hour();
t.tm_min = gps.time.minute();
t.tm_sec = gps.time.second();
t.tm_isdst= -1;
setLocalTime(t);
Serial.println("Save date");
}
int _oldSec = -1;
void readGnss(){
// GNSSによる秒が更新されたら後続の処理を行う
int nowSec = gps.time.second();
double latitude = gps.location.lat();
double longitude = gps.location.lng();
double altitude = gps.altitude.meters();
int satellites = gps.satellites.value();
if(nowSec & 1){
M5.dis.drawpix(0,0x000000);
}else{
if(satellites <=4){
M5.dis.drawpix(0,0x4f0000);
}else if(satellites < 10){
M5.dis.drawpix(0,0x4f4f00);
}else{
M5.dis.drawpix(0,0x00004f);
}
}
if(nowSec == _oldSec || satellites <= 4 || (abs(latitude)<0.00001 && abs(longitude)<0.00001) ){
return;
}
_oldSec = nowSec;
snprintf(_text,TEXT_LEN,"{\"Id\":\"%s\",\"Latitude\":%.7f,\"Longitude\":%.7f,\"Altitude\":%.2f,\"Satellites\":%d}",
GPS_ID,
latitude,longitude,altitude,satellites);
writeLog("Location",String(_text));
}
void writeSystemLog(char* txt){
snprintf(_text,TEXT_LEN,"{\"Message\":\"%s\"}", txt);
writeLog("System",String(_text));
}
// BME688関連
int BME_INIT_VALUE = 5;
void initBME688(){
/* initializes the sensor based on I2C library */
bme.begin(BME688_I2C_ADDR, Wire);
if(bme.checkStatus())
{
if (bme.checkStatus() == BME68X_ERROR)
{
Serial.println("Sensor error:" + bme.statusString());
return;
}
else if (bme.checkStatus() == BME68X_WARNING)
{
Serial.println("Sensor Warning:" + bme.statusString());
}
}
/* Set the default configuration for temperature, pressure and humidity */
bme.setTPH();
// ヒーターの温度(℃)の1サイクル分の温度変化。 200-400℃程度を指定。配列の長さは最大10。
// uint16_t tempProf[10] = { 320, 100, 100, 100, 200, 200, 200, 320, 320,320 };
uint16_t tempProf[10] = {210,265,265,320,320, 265,210,190,300,230};
// ヒーターの温度を保持する時間の割合。数値×MEAS_DUR(ms)保持される。保持時間は1~4032ms。指定温度に達するまで20-30ms程度が必要。
// uint16_t mulProf[10] = { 10, 5, 5, 5, 5, 5, 5, 5, 5, 5 };
uint16_t mulProf[10] = {5,2,10,2,10, 6,6,6,6,6};
/* 各測定(温度,湿度,気圧,抵抗値)の繰り返し間隔(MEAS_DUR)から測定にかかる正味時間を引いたものをsharedHeatrDurに設定 */
uint16_t sharedHeatrDur = MEAS_DUR - (bme.getMeasDur(BME68X_PARALLEL_MODE) / 1000);
bme.setHeaterProf(tempProf, mulProf, sharedHeatrDur, 10);
bme.setOpMode(BME68X_PARALLEL_MODE);
for(int i=0;i<10;i++){
_gas[i] = BME_INIT_VALUE;
}
}
String _bmeMessage = "";
bool _bmeUpdated = false;
char _bmeBuffer[TEXT_LEN];
bool _bmeFetched = false;
void readBME688(){
bme68xData data;
uint8_t nFieldsLeft = 0;
float current = 0;
int index = 0;
delay(MEAS_DUR);
if (bme.fetchData())
{
do
{
nFieldsLeft = bme.getData(data);
if (data.status == NEW_GAS_MEAS)
{
current = log(data.gas_resistance);
index = data.gas_index;
if(index<0 || index>9){
continue;
}
_gas[index] = current;
if(index == 2){ // バッファが溢れないようにするため、次の待ち時間が長いタイミングを狙って処理する
bool validData = true;
for(int i=0; i<10;i++){
if(_gas[i]<BME_INIT_VALUE+0.1){ // BME_INIT_VALUE以下は異常値とみなす
validData=false;
break;
}
}
if(validData){
snprintf(_bmeBuffer,TEXT_LEN,"{\"Id\":\"%s\",\"Pressure\":%.0f,\"Vector\":[%.2f,%.2f,%.0f,%.3f,%.3f,%.3f,%.3f,%.3f,%.3f,%.3f,%.3f,%.3f,%.3f]}",
GAS_ID,
data.pressure,data.temperature,data.humidity,data.pressure,
_gas[0],_gas[1],_gas[2],_gas[3],_gas[4],_gas[5],_gas[6],_gas[7],_gas[8],_gas[9]);
_bmeMessage = String(_bmeBuffer);
writeLog("Scent",_bmeMessage);
}
}
delay(1);
}
} while (nFieldsLeft);
}
}
void setup() {
// 初期化の順序に注意
M5.begin(true,false,true);
delay(500);
// 動作速度の変更
setCpuFrequencyMhz(80); // 標準 240, 160, 80, 40, 20, 10 が選択可能
delay(100);
// Bluetooth Serial
chipid = ESP.getEfuseMac();
sprintf( chipname, "SerialBT_%04X", (uint16_t)(chipid >> 32));
Serial.printf("Bluetooth: %s\n", chipname);
SerialBT.begin(chipname);
Wire.begin(SDA_PIN, SCL_PIN);
Wire.setClock(400000UL);
M5.dis.fillpix(0x4f0000);
// SDカードへの接続
SPI.begin(23,33,19,-1);
if(!SD.begin(15, SPI, 40000000)){ // -1を15に修正
Serial.println("initialization failed!");
}
sdcard_type_t Type = SD.cardType();
Serial.printf("SDCard Type = %d \r\n",Type);
Serial.printf("SDCard Size = %d MiB\r\n" , (int)(SD.cardSize()/1024/1024));
writeSystemLog("SD Card Initialized.");
// GNSSモジュールへの接続
Serial1.begin(9600, SERIAL_8N1,22, -1, false, 500); // invert = false, timeout_ms = 500;
Serial1.setRxBufferSize(2048);
Serial1.flush();
M5.dis.fillpix(0x4f4f4f);
// センサの初期化
writeSystemLog("Initializing sensors...");
initBME688();
// リブートを記録
writeSystemLog("Rebooted.");
}
int oldHour = -1;
void adjustDateTime(){
// 1時間に一度時刻の補正を試みる
struct tm t;
getLocalTime(&t);
int nowHour = t.tm_hour;
int sat = gps.satellites.value();
if (!getLocalTime(&t) || oldHour != nowHour){
if(gps.date.year() > 2020 && sat > 4){ // GNSSの時刻が利用できる場合はそちらに合わせる
setDateTime();
writeSystemLog("Adjust datetime from GNSS");
}else{
writeSystemLog("Adjust datetime failed");
}
oldHour = nowHour;
}
}
void loop() {
adjustDateTime();
readGnss();
readBME688();
smartDelay(20); // BME688 のバッファを処理するために頻繁に呼び出す必要がある
M5.update();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment