Skip to content

Instantly share code, notes, and snippets.

@azechi
Last active March 28, 2023 01:01
Show Gist options
  • Save azechi/2884a73698b1356b2c770b3d2e189f8f to your computer and use it in GitHub Desktop.
Save azechi/2884a73698b1356b2c770b3d2e189f8f to your computer and use it in GitHub Desktop.
google ドライブの共有フォルダをAPIで作成する azechi/azechi#3
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "45f413b8-8b8b-4656-87c3-9e41c66ebe3a",
"metadata": {},
"outputs": [],
"source": [
"from google.oauth2.credentials import Credentials\n",
"\n",
"credential = None\n",
"try:\n",
" credential = Credentials.from_authorized_user_file(\"token.json\")\n",
"except FileNotFoundError:\n",
" pass\n",
"\n",
"if credential is None:\n",
" SCOPE = 'https://www.googleapis.com/auth/drive.file'\n",
" import json\n",
" with open(\"client_secret.json\") as f:\n",
" client = {k:v for k, v in json.load(f).items() if k in [\"auth_uri\", \"token_uri\", \"client_id\", \"client_secret\"]}\n",
" %run oauth2_authz_code_grant.py\n",
" grant_response = await oauth2_authz_code_grant(SCOPE, **client)\n",
" credential = Credentials.from_authorized_user_info(client | grant_response)\n",
" # save credential\n",
" s = credential.to_json([\"expiry\", \"access_token\"])\n",
" with open(\"token.json\", \"w\") as f:\n",
" f.write(s)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e54d3da0-aca5-4cdc-beb5-c2d110bbbf0f",
"metadata": {},
"outputs": [],
"source": [
"from googleapiclient.discovery import build\n",
"\n",
"service = build('drive', 'v3', credentials=credential)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f8d42e84-fd79-403d-a23c-45de0200065f",
"metadata": {},
"outputs": [],
"source": [
"file_metadata = {\n",
" 'name': '共有したいフォルダ',\n",
" 'mimeType': 'application/vnd.google-apps.folder'\n",
"}\n",
"\n",
"file = service.files().create(body=file_metadata, fields='id,webViewLink').execute()\n",
"print(F'Folder ID: \"{file.get(\"id\")}\".')\n",
"file_id = file.get('id')\n",
"\n",
"service.permissions().list(fileId=file_id).execute()\n",
"\n",
"anyone_reader = {\n",
" 'type': 'anyone',\n",
" 'role': 'reader'\n",
"} \n",
"# permissionID='anyoneWithLink'で用意されてるっぽい\n",
"\n",
"print(service.permissions().create(fileId=file_id, body=anyone_reader, fields='').execute())\n",
"print(file.get('webViewLink'))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5703d5e9-7ddd-45d5-8ba7-429de5dc4d8f",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
async def oauth2_authz_code_grant(scope, client_id, client_secret, auth_uri, token_uri, redirect_port=0, timeout=60):
    """Run the authorization-code grant end to end and return the token response.

    First awaits ``get_authorization_code`` to obtain the authorization code
    and the redirect URI that was advertised to the authorization server,
    then hands both to ``get_token`` to exchange the code at ``token_uri``.

    Args:
        scope: Scope string sent with the authorization request.
        client_id: OAuth client identifier.
        client_secret: OAuth client secret, used only for the token exchange.
        auth_uri: Authorization endpoint URL.
        token_uri: Token endpoint URL.
        redirect_port: Port for the local redirect listener, forwarded as
            the ``port`` argument of ``get_authorization_code``.
        timeout: Seconds to wait for the redirect, forwarded unchanged.

    Returns:
        Whatever ``get_token`` returns (the decoded token-endpoint response).
    """
    authz_result = await get_authorization_code(
        scope, client_id, auth_uri, redirect_port, timeout
    )
    # The redirect URI must be repeated verbatim in the token request, so it
    # travels together with the code.
    code, redirect_uri = authz_result
    token_response = get_token(
        code, redirect_uri, client_id, client_secret, token_uri
    )
    return token_response
async def get_authorization_code(scope, client_id, auth_uri, port=0, timeout=60):
    """Obtain an OAuth 2.0 authorization code through a temporary local listener.

    Starts a TCP server, prints the authorization URL for the user to open,
    and waits until the authorization server redirects the browser back to
    the listener or until ``timeout`` seconds have elapsed.

    Args:
        scope: Scope string placed in the authorization request.
        client_id: OAuth client identifier.
        auth_uri: Authorization endpoint URL; the query string is appended
            to it with ``?``.
        port: Port to listen on. ``0`` lets the OS choose one; the port
            actually bound is the one used in the redirect URI.
        timeout: Seconds to wait for the redirect.

    Returns:
        A ``(code, redirect_uri)`` tuple. ``code`` is ``None`` when the wait
        timed out or when the redirect carried an ``error`` parameter
        instead of a ``code``.
    """
    from asyncio import start_server, wait_for, Event, TimeoutError
    from urllib.parse import urlencode, urlparse, parse_qs

    event = Event()
    # Initialised so the name is always bound, even if no redirect arrives.
    code = None

    async def respond(writer, status_line):
        # Send an empty-bodied response with CRLF line endings, then close.
        writer.write(status_line + b"\r\nContent-Length: 0\r\n\r\n")
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    async def handler(reader, writer):
        nonlocal code
        # Only the request line is needed: "<METHOD> <target> <version>".
        request_line = (await reader.readline()).decode(errors="replace")
        parts = request_line.split()
        query = parse_qs(urlparse(parts[1]).query) if len(parts) >= 2 else {}

        if "code" not in query and "error" not in query:
            # Unrelated or malformed request (e.g. /favicon.ico): answer it
            # and keep waiting for the real redirect instead of crashing.
            await respond(writer, b"HTTP/1.1 404 Not Found")
            return

        if "code" in query:
            code = query["code"][0]
        # An ``error`` redirect leaves ``code`` as None but still ends the
        # wait, so the caller does not sit idle until the timeout.
        try:
            await respond(writer, b"HTTP/1.1 200 OK")
        finally:
            # Wake the waiter even if the browser dropped the connection.
            event.set()

    redirect_uri: str
    # NOTE(review): the listener binds to all interfaces although the
    # redirect URI points at localhost — confirm whether loopback-only
    # binding would be acceptable for the environments this runs in.
    async with await start_server(handler, '0.0.0.0', port) as srv:
        # Read back the bound port (relevant when port=0 was requested).
        _, port = srv.sockets[0].getsockname()
        redirect_uri = f"http://localhost:{port}"
        params = urlencode({
            "client_id": client_id,
            "scope": scope,
            "response_type": "code",
            "redirect_uri": redirect_uri
        })
        print(f"{auth_uri}?{params}")
        from IPython.display import clear_output
        # Called with wait=True; per the IPython documentation this defers
        # the clear until new output is produced, so the URL stays visible
        # while waiting.
        clear_output(True)
        try:
            await wait_for(event.wait(), timeout=timeout)
        except TimeoutError:
            print("timeout")
            code = None
    print("server closed")
    return code, redirect_uri
def get_token(code, redirect_uri, client_id, client_secret, token_uri):
    """Exchange an authorization code for tokens at ``token_uri``.

    The grant parameters are form-encoded and passed as the request data;
    the response body is decoded as JSON.

    Args:
        code: Authorization code received on the redirect.
        redirect_uri: Redirect URI that was used in the authorization request.
        client_id: OAuth client identifier.
        client_secret: OAuth client secret.
        token_uri: Token endpoint URL.

    Returns:
        The JSON-decoded response body.

    Raises:
        Any exception raised by ``urllib.request.urlopen`` or by JSON
        decoding propagates to the caller unchanged.
    """
    import json
    import urllib.parse
    import urllib.request

    form_fields = {
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri
    }
    # urlencode output is percent-escaped, so ASCII encoding is sufficient.
    payload = urllib.parse.urlencode(form_fields).encode('ascii')
    request = urllib.request.Request(token_uri, payload)
    with urllib.request.urlopen(request) as response:
        body = response.read()
    return json.loads(body)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment