@nabilm
Created January 29, 2021 02:05
A less memory-consuming Beam DoFn module, based on BigQuery load jobs from GCS
"""
Can be called like this:
emails_list = [
('amy', '[email protected]'),
('carl', '[email protected]'),
('julia', '[email protected]'),
('carl', '[email protected]'),
]
| "create" >> beam.Create(emails_list)
| "write to google storage" >> WriteToText("gs://<bucket>/bq_load/file")
| "load to bigquery" >> beam.ParDo(LoadJsonToBigquery(table_spec=table_spec, schema=table_schema))
"""
import logging
import apache_beam as beam
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

_LOGGER = logging.getLogger(__name__)


class LoadJsonToBigquery(beam.DoFn):
"""
This transform is created to substitute the current beam
bigquery writer which use more memory
this one now operate with json files and it lacks streaming
currently it support the same beam format for schema and
tablespec
TODO:
paritioned Table
CSV support
more flexiblity
"""
    def __init__(self, table_spec: str, schema: dict, dataset_location: str = "US"):
        # dataset location setting
        self._dataset_location = dataset_location
        # adjust the beam "project:dataset.table" spec to the google
        # client "project.dataset.table" format
        self._table_spec = table_spec.replace(":", ".")
        # adjust the beam schema dict to google client SchemaField objects
        self._schema = [
            bigquery.SchemaField(item["name"], item["type"], item["mode"])
            for item in schema["fields"]
        ]
    def start_bundle(self):
        # the client is created per bundle because it cannot be pickled
        # with the DoFn
        self._client = bigquery.Client()
    def process(self, record: str):
        """
        Load the GCS uri(s) in ``record`` into BigQuery
        using the configured schema.
        """
        # check if the table exists and create it if it doesn't
        try:
            self._client.get_table(self._table_spec)  # Make an API request.
            _LOGGER.info("Table {} already exists.".format(self._table_spec))
        except NotFound:
            _LOGGER.info("Table {} not found. Creating it.".format(self._table_spec))
            table = bigquery.Table(self._table_spec, schema=self._schema)
            table = self._client.create_table(table)  # Make an API request.
        job_config = bigquery.LoadJobConfig(
            schema=self._schema,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        )
        # Load API request
        _LOGGER.info("Initiating bigquery load job")
        load_job = self._client.load_table_from_uri(
            record,
            self._table_spec,
            location=self._dataset_location,  # Must match the destination dataset location.
            job_config=job_config,
        )
        load_job.result()  # Waits for the job to complete.
        dest_table = self._client.get_table(self._table_spec)
        num_of_rows = dest_table.num_rows
        _LOGGER.info("{} entries written".format(num_of_rows))
        return [num_of_rows]
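

# --- Usage sketch (not part of the original gist) ---
# A minimal, hedged example of wiring the DoFn into a pipeline. The
# project, dataset, table, and bucket names below are hypothetical
# placeholders. It relies on WriteToText emitting the paths of the GCS
# files it wrote, which LoadJsonToBigquery then receives one uri per
# element.
if __name__ == "__main__":
    import json

    from apache_beam.io import WriteToText

    table_spec = "my-project:my_dataset.emails"  # beam-style "project:dataset.table"
    table_schema = {
        "fields": [
            {"name": "name", "type": "STRING", "mode": "NULLABLE"},
            {"name": "email", "type": "STRING", "mode": "NULLABLE"},
        ]
    }

    emails_list = [
        ("amy", "amy@example.com"),
        ("carl", "carl@example.com"),
    ]

    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "create" >> beam.Create(emails_list)
            | "to json" >> beam.Map(lambda kv: json.dumps({"name": kv[0], "email": kv[1]}))
            | "write to google storage" >> WriteToText("gs://my-bucket/bq_load/file")
            | "load to bigquery" >> beam.ParDo(
                LoadJsonToBigquery(table_spec=table_spec, schema=table_schema)
            )
        )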