A less memory-consuming Beam DoFn module, based on BigQuery load jobs from GCS
""" | |
Can be called like this: | |
emails_list = [ | |
('amy', '[email protected]'), | |
('carl', '[email protected]'), | |
('julia', '[email protected]'), | |
('carl', '[email protected]'), | |
] | |
| "create" >> beam.Create(emails_list) | |
| "write to google storage" >> WriteToText("gs://<bucket>/bq_load/file") | |
| "load to bigquery" >> beam.ParDo(LoadJsonToBigquery(table_spec=table_spec, schema=table_schema)) | |
""" | |
import logging

import apache_beam as beam
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

_LOGGER = logging.getLogger(__name__)

class LoadJsonToBigquery(beam.DoFn):
    """
    This transform was created as a substitute for the built-in Beam
    BigQuery writer, which uses more memory.
    It operates on newline-delimited JSON files and does not support
    streaming; it currently accepts the same formats Beam uses for the
    schema and table spec.
    TODO:
        partitioned tables
        CSV support
        more flexibility
    """
    def __init__(self, table_spec: str, schema: dict, dataset_location: str = "US"):
        # dataset location setting
        self._dataset_location = dataset_location
        # adjust the Beam table spec ("project:dataset.table") to the
        # google client format ("project.dataset.table")
        self._table_spec = table_spec.replace(":", ".")
        # adjust the Beam schema dict to google client schema objects
        self._schema = [
            bigquery.SchemaField(item["name"], item["type"], item["mode"])
            for item in schema["fields"]
        ]
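
    # For illustration (an assumption, not part of the original gist): given a
    # Beam-style schema dict such as
    #     {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]}
    # the comprehension above builds
    #     [bigquery.SchemaField("name", "STRING", "NULLABLE")]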
    def start_bundle(self):
        self._client = bigquery.Client()
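    # NOTE: the client is created in start_bundle rather than __init__ so the
    # DoFn stays picklable; bigquery.Client holds network state that does not
    # survive serialization to the Beam workers.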
    def process(self, record: str):
        """
        Take the Google Storage URIs created upstream and load them
        into BigQuery based on the schema
        """
        # check whether the table exists and create it if it doesn't
        try:
            self._client.get_table(self._table_spec)  # Make an API request.
            _LOGGER.info("Table {} already exists.".format(self._table_spec))
        except NotFound:
            _LOGGER.info("Table {} not found. Creating it.".format(self._table_spec))
            table = bigquery.Table(self._table_spec, schema=self._schema)
            table = self._client.create_table(table)  # Make an API request.
        job_config = bigquery.LoadJobConfig(
            schema=self._schema,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        )
        # Load API request
        _LOGGER.info("Initiating BigQuery load job")
        load_job = self._client.load_table_from_uri(
            record,
            self._table_spec,
            location=self._dataset_location,  # Must match the destination dataset location.
            job_config=job_config,
        )
        load_job.result()  # Waits for the job to complete.
        dest_table = self._client.get_table(self._table_spec)
        num_of_rows = dest_table.num_rows  # total rows now in the table
        _LOGGER.info("Destination table now has {} entries".format(num_of_rows))
        return [num_of_rows]
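
For reference, a minimal end-to-end pipeline around this DoFn could look like the sketch below. The project, dataset, table, and schema values are placeholders, not values from the gist, and the elements are newline-delimited JSON strings so the GCS files match the NEWLINE_DELIMITED_JSON load format. WriteToText emits the paths of the shards it wrote, which is what process() then receives as record.

# A sketch, assuming placeholder names throughout.
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

table_spec = "my-project:my_dataset.emails"  # hypothetical table
table_schema = {
    "fields": [
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "email", "type": "STRING", "mode": "NULLABLE"},
    ]
}

emails_list = [
    '{"name": "amy", "email": "amy@example.com"}',
    '{"name": "carl", "email": "carl@example.com"}',
]

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    (
        pipeline
        | "create" >> beam.Create(emails_list)
        | "write to google storage" >> WriteToText("gs://<bucket>/bq_load/file")
        | "load to bigquery" >> beam.ParDo(
            LoadJsonToBigquery(table_spec=table_spec, schema=table_schema)
        )
    )

Because the rows reach BigQuery through a batch load job from GCS rather than buffered streaming inserts, the workers only ever hold file paths in memory, which appears to be where the memory saving over the built-in writer comes from.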