Created
August 27, 2015 18:02
-
-
Save cbare/246c2b6a10fb7d4db77b to your computer and use it in GitHub Desktop.
An example of how to use Synapse Tables to store intermediate results in a feature extraction pipeline.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ##========================================================== | |
| ## Tables Pipeline Example | |
| ## | |
| ## An example of using Synapse tables to store raw data and | |
| ## intermediate steps to extract features for downstream | |
| ## analysis | |
| ##========================================================== | |
| import synapseclient | |
| from synapseclient import Activity | |
| from synapseclient import Entity, Project, Folder, File | |
| from synapseclient import Evaluation, Submission, SubmissionStatus | |
| from synapseclient import Schema, Column, Table, Row, RowSet | |
| import pandas as pd | |
| import uuid | |
| import os | |
| import random | |
| import tempfile | |
## Get a logged-in connection to Synapse. login() is called with no
## arguments; presumably the client falls back to cached or configured
## credentials -- confirm for your environment.
syn = synapseclient.Synapse()
syn.login()

##----------------------------------------------------------
## project: Table_pipeline_demo (syn4908630)
##----------------------------------------------------------
## Every table created by this script is stored with this project as parent.
project = syn.get('syn4908630')

##----------------------------------------------------------
## Create Table 1 - raw files
##----------------------------------------------------------
## Column models for the raw-data table. maximumSize is a character count,
## so it is passed as an integer rather than a numeric string. 36 characters
## is exactly the length of the str(uuid4()) values stored in id/otherid.
cols = [
    Column(name='id', columnType='STRING', maximumSize=36),
    Column(name='otherid', columnType='STRING', maximumSize=36),
    Column(name='junk', columnType='STRING', maximumSize=100),
    Column(name='rawdata', columnType='FILEHANDLEID'),
]

## Storing the schema creates the table definition; rows are added later.
schema1 = syn.store(Schema(name='Raw Data', columns=cols, parent=project))
## grab a handful of image files to be used as an example
image_dir = '/Users/chris/Documents/graphics'
rows = []
for filename in os.listdir(image_dir):
    ## endswith accepts a tuple, so a single call covers both extensions
    if not filename.endswith(('.jpg', '.png')):
        continue
    path = os.path.join(image_dir, filename)
    ## Skip directories, broken links and other non-regular entries before
    ## asking for a size or attempting an upload; also skip files of 1 MiB
    ## or more to keep the example small.
    if not os.path.isfile(path) or os.path.getsize(path) >= 1024*1024:
        continue
    ## NOTE(review): _chunkedUploadFile is a private client method (leading
    ## underscore) -- confirm it exists in the synapseclient version in use.
    fileHandle = syn._chunkedUploadFile(path)
    ## one row per image, in schema order: id, otherid, junk, rawdata
    rows.append(Row([
        str(uuid.uuid4()),
        str(uuid.uuid4()),
        ''.join(chr(random.randint(ord('a'), ord('z'))) for _ in range(25)),
        fileHandle['id']]))
    ## stop once 20 images have been uploaded
    if len(rows) >= 20:
        break

## write the image files to Synapse as a Rowset object
## (Alternatives are using CSV files as is done below or Pandas DataFrames.)
row_reference_set = syn.store(RowSet(columns=cols, schema=schema1, rows=rows))
##----------------------------------------------------------
## Create Table 2 - processed files
##----------------------------------------------------------
## which columns do we want to carry over from the source table
cols_to_copy = ['id', 'otherid']

## pretend this is running separately: read the raw table back from Synapse
## instead of reusing the rows built in the previous step
raw_table_query = 'select * from %s' % schema1.id
results = syn.tableQuery(raw_table_query)

## look-up from each result column's name to its position within a row
col_indices = {}
for position, header in enumerate(results.headers):
    col_indices[header['name']] = position
| ## function to create a thumbnail of the given image | |
| from PIL import Image | |
def thumbnail(filepath, outdir, size=(300,300)):
    """Write a PNG thumbnail of an image and return the thumbnail's path.

    The output file is named ``<basename>.thumbnail.png``, where
    ``<basename>`` is the input file name without its last extension.

    Args:
        filepath: path of the source image, opened with PIL.
        outdir: directory the thumbnail is written into; it is not created
            here, so it must already exist.
        size: bounding box passed to ``Image.thumbnail``.

    Returns:
        Path of the PNG file that was written.
    """
    basename, _ = os.path.splitext(os.path.basename(filepath))
    outfile = os.path.join(outdir, basename + ".thumbnail.png")
    ## Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter
    ## under its current name. The with-block closes the source file once
    ## the thumbnail has been saved.
    with Image.open(filepath) as image:
        image.thumbnail(size, Image.LANCZOS)
        image.save(outfile, "PNG")
    return outfile
## process the files
outdir = tempfile.mkdtemp()
out_table = os.path.join(outdir,"processed_files_table.csv")
csv_header = cols_to_copy + ['thumbnail']
with open(out_table,'w') as f:
    ## header line first, then one line per row of the raw table
    f.write(','.join(csv_header) + '\n')
    for row in results:
        ## ROW_ID / ROW_VERSION are passed as rowId / versionNumber to pick
        ## the cell whose file gets downloaded
        row_id = row[col_indices['ROW_ID']]
        row_version = row[col_indices['ROW_VERSION']]
        file_info = syn.downloadTableFile(
            schema1,
            column='rawdata',
            rowId=row_id,
            versionNumber=row_version)
        ## build the thumbnail locally, then upload it to get a file handle
        outfile = thumbnail(file_info['path'], outdir)
        file_handle = syn._chunkedUploadFile(outfile)
        ## copied columns followed by the new thumbnail file handle id
        fields = [row[col_indices[name]] for name in cols_to_copy] + [file_handle['id']]
        f.write(','.join(fields) + '\n')

## columns in table 2: the copied source columns plus the thumbnail handle
cols = [hdr for hdr in results.headers if hdr['name'] in cols_to_copy] + [
    Column(name='thumbnail', columnType='FILEHANDLEID')]

## write to Synapse table
schema2 = syn.store(Schema(name='thumbnails', columns=cols, parent=project))
table2 = syn.store(Table(schema2, out_table))
##----------------------------------------------------------
## Create Table 3 - features
##----------------------------------------------------------
## read back what the previous step stored in the thumbnails table
thumbnails_query = 'select * from %s' % schema2.id
results = syn.tableQuery(thumbnails_query)

## rebuild the column name -> row position look-up for this result set
col_indices = {}
for position, header in enumerate(results.headers):
    col_indices[header['name']] = position
| ## a function for generating fake features | |
def generate_features(n):
    """Return a list of ``n`` placeholder feature values.

    Each value is an independent draw from ``random.random()``; the list is
    empty when ``n`` is zero or negative.
    """
    features = []
    for _ in range(n):
        features.append(random.random())
    return features
## write output feature table to a csv file
outdir = tempfile.mkdtemp()
out_table = os.path.join(outdir, "feature_table.csv")
n = 5 # number of features
## feature_1 .. feature_n, shared by the csv header and the table schema
feature_names = ['feature_'+str(k+1) for k in range(n)]
with open(out_table,'w') as f:
    ## write csv header
    f.write(','.join(cols_to_copy + feature_names) + '\n')
    for row in results:
        ## copied columns first, then n freshly generated feature values
        copied = [row[col_indices[name]] for name in cols_to_copy]
        formatted = ['%f'%value for value in generate_features(n)]
        f.write(','.join(copied + formatted) + '\n')

## columns in table 3: the copied source columns plus one DOUBLE per feature
cols = [hdr for hdr in results.headers if hdr['name'] in cols_to_copy]
cols.extend(Column(name=name, columnType='DOUBLE') for name in feature_names)

## write to Synapse table
schema3 = syn.store(Schema(name='features', columns=cols, parent=project))
table3 = syn.store(Table(schema3, out_table))
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See the Table_pipeline_demo project on Synapse.