Last active
February 20, 2022 22:31
-
-
Save YumNumm/61ab0c9f59db5dce9c6f9eace7b7623d to your computer and use it in GitHub Desktop.
強震モニター ExtensionからPOSTを受け取ってツイート/FCM POSTするプログラム
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from http.server import BaseHTTPRequestHandler, HTTPServer | |
from urllib.parse import parse_qs, urlparse | |
import firebase_admin | |
from firebase_admin import credentials | |
from firebase_admin import auth | |
from firebase_admin import messaging | |
import tweepy | |
import time | |
import reader | |
import json | |
CK = "" | |
CS = "" | |
AT = "" | |
AS = "" | |
auth = tweepy.OAuthHandler(CK, CS) | |
auth.set_access_token(AT, AS) | |
api = tweepy.API(auth) | |
cred_p = credentials.Certificate( | |
"/app/eqalert-prod-firebase-adminsdk-ruem1-5b09cba6fa.json" | |
) | |
fb_p = firebase_admin.initialize_app(cred_p, name="prod") | |
# DEVENV | |
cred_d = credentials.Certificate( | |
"/app/eqalert-dev-firebase-adminsdk-lyjhw-cc6d037b92.json" | |
) | |
fb_d = firebase_admin.initialize_app(cred_d) | |
print(fb_p.name) | |
print(fb_d.name) | |
address = ("0.0.0.0", 80) | |
class MyHTTPRequestHandler(BaseHTTPRequestHandler): | |
def do_GET(self): | |
parsed_path = urlparse(self.path) | |
self.send_response(200) | |
self.send_header("Content-Type", "text/plain; charset=utf-8") | |
self.end_headers() | |
self.wfile.write(b"GET func!") | |
def do_POST(self): | |
print("POSTED!!!!!!!!") | |
parsed_path = urlparse(self.path) | |
content_length = int(self.headers["content-length"]) | |
msg = reader.readJson( | |
"{}".format((self.rfile.read(content_length).decode("utf-8"))) | |
) | |
api.update_status(str(msg)) | |
message = messaging.Message( | |
notification=messaging.Notification( | |
title="地震通知", | |
body=str(msg), | |
), | |
topic="dev", | |
) | |
response = messaging.send(message) | |
self.send_response(200) | |
self.send_header("Content-Type", "text/plain; charset=utf-8") | |
self.end_headers() | |
self.wfile.write(b"POST") | |
#api.update_status("EEW Server Up!\n" + str(time.time())) | |
with HTTPServer(address, MyHTTPRequestHandler) as server: | |
server.serve_forever() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
def readJson(jsonData): | |
print(jsonData) | |
jsonData = json.loads(jsonData) | |
print(jsonData) | |
# get first eew message | |
if jsonData.get("type") == "eew": | |
magnitude = float(jsonData.get("magnitude")) | |
# get EQ Data | |
report = str(jsonData.get("report")) | |
if report == "final": | |
report = "最終" | |
else: | |
report = "第" + report | |
epicenter = str(jsonData.get("epicenter")) | |
depth = str(jsonData.get("depth")) | |
intensity = str(jsonData.get("intensity")) | |
time = str(jsonData.get("time")) | |
msg = str( | |
"""緊急地震速報(第{4}報) | |
震 源 : {0} | |
予想震度 : {1} | |
規 模 : M{2} | |
深 さ : {3} | |
時 刻 : {5}""".format( | |
epicenter, intensity, str(magnitude), depth, report, time | |
) | |
) | |
return msg | |
# Alert Cancel | |
if jsonData.get("type") == "pga_alert_cancel": | |
msg = "### Cancel Message ###" | |
return msg | |
# 地震検知 | |
if jsonData.get("type") == "pga_alert": | |
max_pga = str(jsonData.get("max_pga")) | |
est_intensity = str(jsonData.get("estimated_intensity")) | |
region_list = str(jsonData.get("region_list")) | |
time = str(jsonData.get("time")) | |
msg = str( | |
"""揺れを検知しました | |
最大表面加速度: {0} | |
予想最大震度 : {1} | |
地 域 : {2} | |
時 刻 : {3}""" | |
).format(max_pga, est_intensity, region_list, time) | |
return msg |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment