Created
October 12, 2019 09:15
-
-
Save yume-yu/032ff8d9a8a210debd8fb8627205bf55 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import datetime as dt | |
import os.path | |
import pickle | |
import re | |
import sys | |
import googleapiclient.errors as g_errors | |
from google.auth.transport.requests import Request | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import build | |
from pytz import timezone | |
from workmanage import DrawShiftImg, Shift, Worker, Worktime | |
# OAuth scopes requested from Google.
# If you change these, delete the saved token file (token.pickle) so that a
# token with the new scopes is issued.
SCOPES = [
    "https://www.googleapis.com/auth/calendar",
    "https://www.googleapis.com/auth/spreadsheets",
]

# Local files used by the OAuth flow.
CLIENT_SECRET = "./client_secret.json"
TOKEN_FILENAME = "token.pickle"

# Calendar to operate on; the CALENDERID environment variable, when set,
# overrides the "primary" default.
CALENDERID = os.environ.get("CALENDERID", "primary")

TIMEZONE = "Asia/Tokyo"
SHEETID = ""

# Hours of the day that bound the per-day event query.
BEFORE_OPEN_TIME = 8
AFTER_CLOSE_TIME = 19
class GoogleServices:
    """Bundle of Google API clients built from a single set of credentials."""

    def __init__(self, creds):
        """Build the Calendar v3 and Sheets v4 clients.

        Args:
            creds: Credentials object handed to ``build`` for both clients.
        """
        calendar_client = build("calendar", "v3", credentials=creds)
        sheets_client = build("sheets", "v4", credentials=creds)
        # The attribute names (including the "calender" spelling) are what
        # the rest of the file reads, so they are kept as-is.
        self.calender = calendar_client
        self.sheet = sheets_client
class ConnectGoogle:
    """Wrapper around the Google Calendar client used to read and edit shifts.

    Constructing an instance refreshes (or creates) the OAuth token stored in
    ``TOKEN_FILENAME`` and builds the API clients. ``self.service`` is a
    ``GoogleServices`` instance, or ``None`` when no valid credentials could
    be obtained.
    """

    # Marker looked for in an event summary; events carrying it are flagged
    # ``requested=True`` and the marker is stripped from the worker name.
    _REQUEST_MARK = "-代行"

    def __init__(self):
        self.service = self.connect_google()

    @staticmethod
    def update_token(token_name):
        """Load, refresh or create the OAuth token stored at ``token_name``.

        Args:
            token_name: Path of the pickle file that holds the credentials.

        Returns:
            The credentials object that is now stored in ``token_name``.
        """
        creds = None
        if os.path.exists(token_name):
            # NOTE(review): pickle executes arbitrary code on load; the token
            # file must only ever come from this program itself.
            with open(token_name, "rb") as token:
                creds = pickle.load(token)

        if creds and creds.valid:
            print("ok, token is valid.")
            return creds

        # No (valid) credentials available: refresh them if possible,
        # otherwise let the user log in through the local-server flow.
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET, SCOPES)
            creds = flow.run_local_server(port=0)

        # Save the credentials for the next run.
        with open(token_name, "wb") as token:
            pickle.dump(creds, token)
        print("update or create token.")
        return creds

    @staticmethod
    def _now():
        """Return the current time as an aware datetime in ``TIMEZONE``."""
        return dt.datetime.now(timezone(TIMEZONE))

    def connect_google(self):
        """Build the Google API clients from the stored token.

        Returns:
            A ``GoogleServices`` instance, or ``None`` (after printing an
            error to stderr) when the token is missing or invalid.
        """
        creds = ConnectGoogle.update_token(TOKEN_FILENAME)
        # Abort when there is no token or it is not valid.
        if not creds or not creds.valid:
            print("Error: no or invalid token.", file=sys.stderr)
            return None
        return GoogleServices(creds)

    def get_calenderID(self):
        """Return a ``{calendar summary: calendar id}`` dict for the account."""
        calendar_list = self.service.calender.calendarList().list().execute()
        return {
            entry["summary"]: entry["id"]
            for entry in calendar_list.get("items", [])
        }

    def convert_utc_format(self, target: dt.datetime):
        """Format ``target`` as a UTC ISO-8601 string ending in ``Z``."""
        as_utc = target.astimezone(timezone("UTC"))
        return as_utc.isoformat().replace("+00:00", "") + "Z"

    def calc_opening_hours(self, target: dt.datetime, when: str):
        """Return ``target``'s day at the opening or closing boundary hour.

        Args:
            target: Any moment on the day of interest.
            when: ``"open"`` for ``BEFORE_OPEN_TIME`` or ``"close"`` for
                ``AFTER_CLOSE_TIME``.

        Returns:
            ``target`` moved to exactly that hour (minutes, seconds and
            microseconds zeroed), or ``None`` for an unknown ``when``.
        """
        if when == "open":
            hour = BEFORE_OPEN_TIME
        elif when == "close":
            hour = AFTER_CLOSE_TIME
        else:
            print("Error: 'when':{} is wrong value.".format(when), file=sys.stderr)
            return None
        # Pin the boundary to the top of the hour so the queried window does
        # not drift with the minute/second of ``target``.
        return target.replace(hour=hour, minute=0, second=0, microsecond=0)

    def calc_nearly_monday(self, target: dt.datetime):
        """Return ``target`` moved back to the Monday of its week (same time)."""
        return target - dt.timedelta(days=target.weekday())

    def convert_event_to_worker(self, event) -> "Worker":
        """Turn one calendar event into a ``Worker`` with a single ``Worktime``.

        Args:
            event: Calendar event dict; its ``summary``, ``start.dateTime``,
                ``end.dateTime`` and ``id`` entries are read.

        Returns:
            The new ``Worker``, or ``None`` when ``event`` is falsy.
        """
        if not event:
            return None

        summary = str(event["summary"])
        requested = self._REQUEST_MARK in summary
        worker_name = summary.replace(self._REQUEST_MARK, "")

        work_start = dt.datetime.fromisoformat(event["start"]["dateTime"])
        work_end = dt.datetime.fromisoformat(event["end"]["dateTime"])
        return Worker(
            worker_name,
            [Worktime(work_start, work_end, eventid=event["id"], requested=requested)],
        )

    def generate_shift_aday(self, events: list):
        """Group one day's events by worker.

        Args:
            events: Calendar event dicts that all belong to the same day.

        Returns:
            ``{weekday name: [Worker, ...]}`` keyed by the weekday of the
            first work time, or ``None`` when there is nothing to group.
        """
        if not events:
            return None

        day = []
        for event in events:
            new_worker = self.convert_event_to_worker(event)
            if new_worker is None:
                # Falsy entries cannot be converted; skip them.
                continue
            worker_index = Shift.has_worker(new_worker.name, day)
            if worker_index is not None:
                # Same worker already listed: add this slot to that entry.
                day[worker_index].append_worktime(new_worker.worktime[0])
            else:
                day.append(new_worker)

        if not day:
            return None
        weekday = Shift.WORKDAYS[day[0].worktime[0].start.weekday()]
        return {weekday: day}

    def get_day_schedule(self, date: dt.datetime = None):
        """Fetch the calendar events between the open and close boundaries.

        Args:
            date: Any moment on the day to query; defaults to the current
                time in ``TIMEZONE``.

        Returns:
            The list of event dicts ordered by start time, or ``None`` when
            there are no events or no calendar client is available.
        """
        if self.service is None or self.service.calender is None:
            return None
        if not date:
            date = self._now()

        before_open = self.convert_utc_format(self.calc_opening_hours(date, "open"))
        after_close = self.convert_utc_format(self.calc_opening_hours(date, "close"))
        events_result = (
            self.service.calender.events()
            .list(
                calendarId=CALENDERID,
                timeMin=before_open,
                timeMax=after_close,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )
        events = events_result.get("items", [])
        if not events:
            print("No upcoming events found.")
            return None
        return events

    def delete_schedule(self, eventid):
        """Delete the event ``eventid`` from the configured calendar."""
        self.service.calender.events().delete(
            calendarId=CALENDERID, eventId=eventid
        ).execute()

    def insert_schedule(self, name: str, start: dt.datetime, end: dt.datetime):
        """Create an event and return it in ``generate_shift_aday`` form.

        Args:
            name: Event summary.
            start: Start of the event.
            end: End of the event.
        """
        start = start.astimezone(timezone(TIMEZONE)).isoformat()
        end = end.astimezone(timezone(TIMEZONE)).isoformat()
        event = {
            "summary": name,
            "end": {"dateTime": end, "timeZone": TIMEZONE},
            "start": {"dateTime": start, "timeZone": TIMEZONE},
        }
        event = (
            self.service.calender.events()
            .insert(calendarId=CALENDERID, body=event)
            .execute()
        )
        return self.generate_shift_aday([event])

    def update_schedule(
        self, eventid: str, summary: str, start: dt.datetime, end: dt.datetime
    ):
        """Overwrite summary, start and end of ``eventid``; return the API result."""
        start = start.astimezone(timezone(TIMEZONE)).isoformat()
        end = end.astimezone(timezone(TIMEZONE)).isoformat()
        # Fetch the current event first so the fields not touched here are
        # sent back unchanged in the update body.
        event = (
            self.service.calender.events()
            .get(calendarId=CALENDERID, eventId=eventid)
            .execute()
        )
        event["summary"] = summary
        event["start"]["dateTime"] = start
        event["end"]["dateTime"] = end
        return (
            self.service.calender.events()
            .update(calendarId=CALENDERID, eventId=eventid, body=event)
            .execute()
        )

    def get_schedule(self, eventid: str):
        """Return the event dict for ``eventid``, or ``None`` on an HTTP error."""
        try:
            return (
                self.service.calender.events()
                .get(calendarId=CALENDERID, eventId=eventid)
                .execute()
            )
        except g_errors.HttpError:
            return None

    def get_day_shift(self, date: dt.datetime = None):
        """Return the grouped shift for ``date``'s day, or ``None`` if empty.

        ``date`` defaults to the current time in ``TIMEZONE``.
        """
        events = self.get_day_schedule(date)
        return self.generate_shift_aday(events)

    def get_week_shift(self, date: dt.datetime = None):
        """Return the grouped shifts for Monday to Friday of ``date``'s week.

        Args:
            date: Any moment in the week of interest; defaults to the current
                time in ``TIMEZONE``.

        Returns:
            A dict merging the per-day results; days without events are
            left out.
        """
        if not date:
            date = self._now()

        shift_dict = {}
        nearly_monday = self.calc_nearly_monday(date)
        for count in range(5):
            aday = self.get_day_shift(nearly_monday + dt.timedelta(days=count))
            if aday is not None:
                shift_dict.update(aday)
        return shift_dict
if __name__ == "__main__":
    # Manual check: authenticate, then request the calendar name -> id
    # mapping (the result itself is not used here).
    ConnectGoogle().get_calenderID()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
from pprint import pprint | |
from googleapiclient.discovery import build | |
# Scratch script: read two columns from a spreadsheet using the saved token
# and print the resulting lookup table.
TOKEN_FILENAME = "token.pickle"

with open(TOKEN_FILENAME, "rb") as token:
    creds = pickle.load(token)

service = build("sheets", "v4", credentials=creds)

# Select the grid from row index 1 / column index 0 onward and ask for the
# values grouped by column.
request_body = {
    "dataFilters": [{"gridRange": {"startColumnIndex": 0, "startRowIndex": 1}}],
    "majorDimension": "COLUMNS",
}
values_api = service.spreadsheets().values()
request = values_api.batchGetByDataFilter(
    spreadsheetId="1iung0Vi3DNKOlb_IIV2oYya_0YjUsZaP2oBkjKekvbI",
    body=request_body,
)
response = request.execute()
pprint(response)

# The first returned column supplies the keys and the second the values.
first_range = response.get("valueRanges")[0]
values_list = first_range.get("valueRange").get("values")
key_column, value_column = values_list[0], values_list[1]
id2name_dict = dict(zip(key_column, value_column))
pprint(id2name_dict["U1ENCSW65"])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
~/Documents/tmp/daiko-doc | |
❯ 4 | |
cd: no such entry in dir stack | |
~/Documents/tmp/daiko-doc | |
❯ 1 | |
cd: no such entry in dir stack | |
~/Documents/tmp/daiko-doc | |
❯ 2 | |
cd: no such entry in dir stack | |
~/Documents/tmp/daiko-doc | |
❯ 3 | |
cd: no such entry in dir stack | |
~/Documents/tmp/daiko-doc | |
❯ 4 | |
cd: no such entry in dir stack | |
~/Documents/tmp/daiko-doc | |
❯ cd ../../git-dir/daiko-bot | |
~/Documents/git-dir/daiko-bot forheroku* | |
❯ ls | |
Procfile id-name.db now.json run.py shift.json use_db.py | |
__pycache__ img.jpg now_a.json runtime.txt slackbot workmanage.py | |
client_secret.json interactivemessages.py package-lock.json sample.jpg static | |
connectgoogle.py main.py page.py sample.json tags | |
export.json make_memberlist.py postrest.py sample.py templates | |
file maketoken.py readme.md setup.py token.pickle | |
gorilla.txt node_modules requirements.txt sheetAPI-test.py token.pickle.old | |
~/Documents/git-dir/daiko-bot forheroku* | |
❯ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment