GCP Dataflow processing with no external IPs

GCP Dataflow Pipelines

This gist is a detailed walkthrough on how to deploy Python Dataflow pipelines in GCP so they run without external IPs. Full code samples are available below.

This walkthrough assumes you have already authenticated with the gcloud auth login commands and have the appropriate IAM privileges to execute these operations.

Step 1 - Gather application dependencies

Since the Dataflow worker nodes will have no external IPs, we must package all of our application dependencies for an offline deployment. I highly recommend using a virtual environment, since your global Python environment will contain far more dependencies than this single application requires.
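If you don't already have a virtual environment, a minimal sketch of creating one and installing your dependencies into it looks like this (the environment name is arbitrary, and the package shown is just the Beam SDK pinned to the version used later in setup.py):

$ python3 -m venv dataflow-env
$ source dataflow-env/bin/activate
$ pip install 'apache-beam[gcp]==2.22.0'   # plus the rest of your application dependencies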

Dump your application dependencies into a single file.

$ pip freeze > requirements.txt 

Transfer your requirements.txt file contents to a setup.py file.

import setuptools

setuptools.setup(
        name='package-import',
        version='0.0.1',
        install_requires=[
            'apache-beam[gcp]==2.22.0',
            'future==0.18.2',
            'ffmpeg==1.4',
            'pytz==2020.1',
            'python-dateutil==2.8.1',
            'six==1.15.0',
            'numpy==1.18.5',
            ],
        packages=setuptools.find_packages(),
        )

Step 2 - Build application dependencies for deployment

We will now build a source distribution containing our required PyPI application dependencies, packed into a tar file compressed with gzip to save space. More information on sdist can be found here. NOTE: this same process also works for non-PyPI dependencies if they are included in the setup.py file.

$ python setup.py sdist --formats=gztar

This will create a dist/ directory containing the tarball.

$ cd dist/
$ ls
package-import-0.0.1.tar.gz
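Optionally, you can sanity-check what went into the source distribution before uploading it:

$ tar -tzf package-import-0.0.1.tar.gz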

Step 3 - Upload application dependencies to Google Cloud Storage (GCS)

Create a GCS bucket that will store your tar file.

$ gsutil mb gs://<globally_unique_bucket_name>
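Optionally, you can create the bucket in the same region you plan to run Dataflow in (us-east1 in the deploy command later in this walkthrough) by passing a location flag:

$ gsutil mb -l us-east1 gs://<globally_unique_bucket_name>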

Upload the tarball into the GCS bucket.

$ gsutil cp package-import-0.0.1.tar.gz gs://<globally_unique_bucket_name>

Verify the object has been uploaded.

$ gsutil ls gs://<globally_unique_bucket_name>

Step 4 - Add two functions to your existing pipeline code

These functions handle the "offline" installation of the PyPI packages in our tarball, pulled from the GCS bucket onto the Dataflow worker nodes.

def run_command(self, description, command):
    """Run a shell command on the Dataflow worker and log the result."""
    # Imports are local so they resolve when this method runs on the worker.
    import logging
    import subprocess
    logger = logging

    try:
        status = subprocess.call(command)
    except Exception as e:
        raise Exception(description + ' caught exception: ' + str(e))
    if status == 0:
        logger.debug(description + ': `' + ' '.join(command) +
                     '` completed successfully')
        return status
    else:
        raise Exception(description + ' failed with exit status ' +
                        str(status))


def start_bundle(self):
    """Pull the dependency tarball from GCS and pip install it on the worker."""
    import logging
    logger = logging
    logger.debug('start_bundle firing')

    try:
        update_status = self.run_command('apt-get update',
                                         ['sudo', 'apt-get', 'update'])
        gcs_status = self.run_command('pull pypi packages from gcs bucket',
                                      ['gsutil', 'cp', 'gs://<replace>/package-import-0.0.1.tar.gz', '.'])
        pip_status = self.run_command('pip install',
                                      ['pip', 'install', '--user', 'package-import-0.0.1.tar.gz'])
    except Exception as e:
        raise e

    if update_status == 0 and pip_status == 0 and gcs_status == 0:
        logger.debug('start_bundle completed successfully')
        return True
    else:
        return False
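For context, here is a minimal sketch (not taken from the original pipeline) of where these two methods live: as methods on the DoFn that does the actual work. The class name and process body are illustrative; the method bodies are the ones shown above.

import apache_beam as beam


class OfflineInstallDoFn(beam.DoFn):
    """Hypothetical DoFn that installs the GCS tarball before processing elements."""

    def run_command(self, description, command):
        ...  # body as defined above

    def start_bundle(self):
        ...  # body as defined above

    def process(self, element):
        # Import the offline-installed packages here (or in start_bundle),
        # so the import happens on the worker after pip install has run.
        import numpy as np  # example dependency from setup.py
        yield element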

Step 5 - Deploy your pipeline code to Dataflow

The sample pipeline used in this example is an extension of the wordcount example from the Apache Beam GitHub repository located here.

Be sure that your subnetwork has Private Google Access enabled, or the worker nodes won't be able to reach GCS and other GCP APIs without external IP addresses.
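If it isn't already enabled, Private Google Access can be turned on from the command line. The subnetwork name (dataflow) and region below match the example deploy command and may differ in your project:

$ gcloud compute networks subnets update dataflow --region=us-east1 --enable-private-ip-google-access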

$ python short_pipeline.py --runner DataflowRunner \
    --output_topic projects/<projectID>/topics/<topicName> \
    --input_topic projects/<projectID>/topics/<topicName> \
    --input_subscription projects/<projectID>/subscriptions/<subName> \
    --project <projectID> --region us-east1 \
    --temp_location gs://<globally_unique_bucket_name> \
    --staging_location gs://<globally_unique_bucket_name> \
    --no_use_public_ips --network dataflow \
    --subnetwork regions/us-east1/subnetworks/dataflow
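For reference, here is a minimal sketch (assumed, not the actual short_pipeline.py) of how such a pipeline might consume the custom topic flags while letting Beam parse Dataflow-specific flags such as --no_use_public_ips, --network, and --subnetwork:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic', required=True)
    parser.add_argument('--output_topic', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Flags like --runner, --no_use_public_ips, --network, and --subnetwork are
    # recognized by Beam itself and passed through in pipeline_args.
    options = PipelineOptions(pipeline_args)
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
         | 'WriteToPubSub' >> beam.io.WriteToPubSub(known_args.output_topic))


if __name__ == '__main__':
    run()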