Last active
September 16, 2022 21:09
-
-
Save haizaar/63b941ec747f71d076494847fef49317 to your computer and use it in GitHub Desktop.
APIConnector for thread-safe google api python client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from dataclasses import dataclass, field | |
from typing import Any, Callable, Hashable, List, Optional | |
import google.auth | |
import httplib2 | |
import structlog | |
from google_auth_httplib2 import AuthorizedHttp | |
from googleapiclient.http import HttpRequest | |
logger = structlog.get_logger(__name__) | |
@dataclass | |
class MemCache: | |
data: dict[Hashable, Any] = field(default_factory=dict) | |
def get(self, key: Hashable) -> Any: | |
if hit := self.data.get(key, None): | |
logger.debug("Cache hit", key=key) | |
return hit | |
def set(self, key: Hashable, data: Any) -> None: | |
self.data[key] = data | |
def delete(self, key): | |
try: | |
del self.data[1] | |
except KeyError: | |
pass | |
@dataclass | |
class APIConnector: | |
""" | |
This class is a thread-safe wrapper around HttpRequest.execute() method. | |
It uses a pool of AuthorizedHttp objects and makes sure there is | |
only one in-flight request for each. | |
""" | |
factory: Callable[[], AuthorizedHttp] | |
pool: List[AuthorizedHttp] = field(default_factory=[]) | |
@classmethod | |
def new( | |
cls, | |
credentials: google.auth.Credentials, | |
initial_size: int = 5, | |
timeout_seconds: int = 3, | |
cache: Optional[MemCache] = None, | |
) -> APIConnector: | |
factory = lambda: AuthorizedHttp( | |
credentials, http=httplib2.Http(timeout=timeout_seconds, cache=cache) | |
) | |
pool: List[AuthorizedHttp] = [] | |
for i in range(initial_size): | |
pool.append(factory()) | |
return cls(factory, pool=pool) | |
def execute(self, request: HttpRequest) -> Any: | |
http: Optional[AuthorizedHttp] = None | |
try: | |
http = self._provision_http() | |
return request.execute(http=http) | |
finally: | |
if http: | |
self.pool.append(http) | |
def _provision_http(self) -> AuthorizedHttp: | |
# This function can run in parallel in multiple threads. | |
try: | |
return self.pool.pop() | |
except IndexError: | |
logger.info("Transport pool exhausted. Creating new transport") | |
return self.factory() | |
def close(self) -> None: | |
for ahttp in self.pool: | |
ahttp.http.close() | |
def __del__(self) -> None: | |
self.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey man, Im trying to use your code for something.
I have created a APICONNECTOR.py class and GSuiteUserManager.py class.
My question is, how am I making this function, to use the new code.
def connect_logic(config: dict[str]) -> googleapiclient.discovery.Resource:
logging.info("Connecting to google drive...")
# The token.json section of the config stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
credentials: Credentials = Credentials.from_authorized_user_info(
info=config[GD_CONFIG][TOKEN_CONFIG], scopes=config[GD_CONFIG][SCOPES])
service: googleapiclient.discovery.Resource = build("drive", "v3", credentials=credentials)
logging.info("Successfully logged into google drive")
return service