This gist is a detailed walkthrough of how to deploy Python Dataflow pipelines in GCP that run without external IPs. Full code samples are available below.
This walkthrough assumes you have already authenticated with the gcloud login commands and have the appropriate IAM privileges to execute these operations.
Since we are not using external IPs on our Dataflow worker nodes, we must package up all of our application dependencies for an offline deployment. I highly recommend using a virtual environment, since your global environment will contain far more dependencies than your single application actually requires.
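For example, a minimal setup might look like the following (the environment directory name venv is arbitrary):
$ python3 -m venv venv
$ source venv/bin/activate
$ pip install 'apache-beam[gcp]==2.22.0'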
Dump your application dependencies into a single file.
$ pip freeze > requirements.txt
Transfer your requirements.txt file contents to a setup.py file.
import setuptools

setuptools.setup(
    name='package-import',
    version='0.0.1',
    install_requires=[
        'apache-beam[gcp]==2.22.0',
        'future==0.18.2',
        'ffmpeg==1.4',
        'pytz==2020.1',
        'python-dateutil==2.8.1',
        'six==1.15.0',
        'numpy==1.18.5',
    ],
    packages=setuptools.find_packages(),
)
We will now build a source distribution containing our required PyPI application dependencies, packed into a tar file and compressed with gzip to save space. More information on sdist can be found in the setuptools documentation. NOTE: this same process also works for packaging non-PyPI dependencies if they are included in the setup.py file.
$ python setup.py sdist --formats=gztar
This will create a dist/ directory containing the tarball.
$ cd dist/
$ ls
package-import-0.0.1.tar.gz
Create a GCS bucket that will store your tar file.
$ gsutil mb gs://<globally_unique_bucket_name>
Upload the tarball into the GCS bucket.
$ gsutil cp package-import-0.0.1.tar.gz gs://<globally_unique_bucket_name>
Verify that the object has been uploaded.
$ gsutil ls gs://<globally_unique_bucket_name>
These functions will aid in the "offline" installation of our tarballed PyPI packages, pulled from the GCS bucket onto the Dataflow worker nodes.
def run_command(self, description, command):
    """Runs a shell command on the worker and raises if it exits non-zero."""
    import logging
    import subprocess
    logger = logging
    try:
        status = subprocess.call(command)
    except Exception as e:
        raise Exception(description + ' caught exception: ' + str(e))
    if status == 0:
        logger.debug(description + ': `' + ' '.join(command) +
                     '` completed successfully')
        return status
    else:
        raise Exception(description + ' failed with signal ' +
                        str(status))
def start_bundle(self):
    """Installs the packaged dependencies on the worker before the bundle is processed."""
    import logging
    logger = logging
    logger.debug('start_bundle firing')
    try:
        # Refresh the package index, pull the tarball from GCS, then pip
        # install it into the worker's user site-packages.
        update_status = self.run_command('apt-get update',
            ['sudo', 'apt-get', 'update'])
        gcs_status = self.run_command('pull from gcs bucket pypi packages',
            ['gsutil', 'cp', 'gs://<replace>/package-import-0.0.1.tar.gz', '.'])
        pip_status = self.run_command('pip install',
            ['pip', 'install', '--user', 'package-import-0.0.1.tar.gz'])
    except Exception as e:
        raise e
    if update_status == 0 and \
       pip_status == 0 and \
       gcs_status == 0:
        logger.debug('start_bundle completed successfully')
        return True
    else:
        return False
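Because start_bundle is a DoFn lifecycle method (note the self parameter), these helpers are meant to be defined on a beam.DoFn subclass so the install runs on each worker before its first bundle. A minimal sketch of that wrapping, with a hypothetical class name InstallPackagesDoFn and a pass-through process method for illustration:

import apache_beam as beam


class InstallPackagesDoFn(beam.DoFn):
    """Illustrative wrapper: installs the GCS-hosted tarball on each worker."""

    # run_command and start_bundle from above are defined here, unchanged.

    def process(self, element):
        # Imports of the freshly installed packages (e.g. dateutil, numpy)
        # belong here, inside process, so they resolve on the worker only
        # after start_bundle has run.
        yield element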
The sample pipeline used in this example is an extension of the wordcount example from the Apache Beam GitHub repository.
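For orientation, here is a heavily abbreviated sketch of what such a streaming Pub/Sub pipeline might look like, reusing the hypothetical InstallPackagesDoFn from the sketch above. This is not the actual short_pipeline.py; the real wordcount-style transforms are omitted.

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_subscription', required=True)
    parser.add_argument('--output_topic', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Dataflow-specific flags (--runner, --no_use_public_ips, --network, ...)
    # are forwarded untouched via pipeline_args; see the launch command below.
    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (p
         | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
         | 'Process' >> beam.ParDo(InstallPackagesDoFn())  # DoFn sketched above
         | 'Write' >> beam.io.WriteToPubSub(known_args.output_topic))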
Be sure that your subnetwork has Private Google Access enabled, or the worker nodes won't be able to reach GCS and other Google Cloud APIs without external IP addresses.
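If it is not already enabled, Private Google Access can be turned on with gcloud; the subnetwork name dataflow and region us-east1 below simply mirror the launch command that follows.
$ gcloud compute networks subnets update dataflow --region=us-east1 --enable-private-ip-google-access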
$ python short_pipeline.py --runner DataflowRunner --output_topic projects/<projectID>/topics/<topicName> --input_topic projects/<projectID>/topics/<topicName> --input_subscription projects/<projectID>/subscriptions/<subName> --project <projectID> --region us-east1 --temp_location gs://<globally_unique_bucket_name> --staging_location gs://<globally_unique_bucket_name> --no_use_public_ips --network dataflow --subnetwork regions/us-east1/subnetworks/dataflow