-
-
Save iandanforth/4172099 to your computer and use it in GitHub Desktop.
Python CSV batch import example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import optparse
import traceback
from threading import Thread

try:  # Python 3
    from queue import Queue
except ImportError:  # Python 2
    from Queue import Queue

import dateutil.parser
import tempodb
class Worker(Thread):
    """Daemon thread that executes ``(func, args, kwargs)`` tasks from a queue.

    The thread starts itself on construction and then loops forever, pulling
    tasks from the shared queue. It is marked as a daemon, so it does not keep
    the process alive once the main thread exits.
    """

    def __init__(self, tasks):
        """Store the shared task queue and start the thread immediately.

        :param tasks: queue yielding ``(func, args, kwargs)`` tuples. Every
            item taken from it is acknowledged with ``task_done()`` so that
            ``tasks.join()`` can wait for all work to finish.
        """
        super(Worker, self).__init__()
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Execute queued tasks forever, reporting (not propagating) failures."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception:
                # One failing task must not kill the worker. Print the full
                # traceback (to stderr) so the failure can be diagnosed.
                traceback.print_exc()
            finally:
                # Always acknowledge the task. Otherwise tasks.join() would
                # block forever waiting for it.
                self.tasks.task_done()
class ThreadPool(object):
    """Fixed-size pool of Worker threads consuming tasks from a shared queue."""

    def __init__(self, num_threads):
        """Start *num_threads* workers sharing one bounded task queue.

        The queue holds at most *num_threads* pending tasks, so ``add_task``
        blocks while the queue is full. This applies back-pressure to the
        producer instead of letting it buffer without limit.

        :param num_threads: number of worker threads; must be at least 1.
        :raises ValueError: if *num_threads* is less than 1. With no workers,
            nothing would drain the queue and ``wait_completion`` would hang.
        """
        if num_threads < 1:
            raise ValueError(
                "num_threads must be at least 1, got %r" % (num_threads,))
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Enqueue ``func(*args, **kargs)`` for execution by a worker thread."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Block until every task added so far has been processed."""
        self.tasks.join()
def main():
    """Bulk-load a CSV of time-series readings into TempoDB.

    Each CSV row is ``timestamp, value1, value2, ...``. Column 0 is parsed
    with ``dateutil.parser.parse``. Every following column ``i`` becomes a
    ``tempodb.DataPoint`` appended to the series key ``'series.<i>'``.
    Points are buffered per series and, once a buffer holds ``batch_size``
    points, handed to ``client.write_key`` on a small thread pool. Remaining
    partial buffers are flushed after the whole file has been read.
    """
    # This script assumes that the input file is sorted by key.
    parser = optparse.OptionParser(usage="usage: %prog [options] filename",
                                   version="%prog 0.1")
    parser.add_option("-i", "--input",
                      dest="filename",
                      help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret",
                      dest="secret",
                      help="tempodb database secret")
    parser.add_option("-H", "--host",
                      dest="host",
                      default="api.tempo-db.com",
                      help="tempodb host")
    parser.add_option("-P", "--port",
                      dest="port",
                      type="int",
                      default=443,
                      help="tempodb port")
    parser.add_option("-S",
                      "--secure",
                      action="store_true",
                      dest="secure",
                      default=True,
                      help="tempodb secure")
    # --secure already defaults to True, so that flag alone can never turn it
    # off. --insecure is the only way to pass secure=False to the client.
    parser.add_option("--insecure",
                      action="store_false",
                      dest="secure",
                      help="pass secure=False to the tempodb client")
    (options, args) = parser.parse_args()
    # The usage string advertises a positional filename. Honour it when -i
    # was not supplied.
    filename = options.filename or (args[0] if args else None)
    if not filename:
        parser.error("Enter a file to read from.")

    # Create a connection to TempoDB.
    # NOTE(review): this one client is shared by all worker threads; confirm
    # that tempodb.Client is safe to use concurrently.
    client = tempodb.Client(options.key,
                            options.secret,
                            options.host,
                            options.port,
                            options.secure)
    # Init a thread pool with the desired number of threads.
    pool = ThreadPool(3)
    # Number of points per series to send in each write_key call.
    batch_size = 60

    # series name -> buffered DataPoints not yet handed to the pool.
    # Buffers are created lazily, so rows with varying column counts and
    # empty files are handled without a separate discovery pass.
    series = {}
    with open(filename) as source_file:
        for row in csv.reader(source_file):
            if not row:
                # csv.reader yields [] for blank lines; nothing to record.
                continue
            input_date = dateutil.parser.parse(row[0])
            for i, value in enumerate(row[1:], 1):
                series_name = 'series.' + str(i)
                points = series.setdefault(series_name, [])
                # Append BEFORE checking the size. The original code skipped
                # the current value whenever it flushed, losing a point per
                # batch.
                points.append(tempodb.DataPoint(input_date, float(value)))
                if len(points) >= batch_size:
                    pool.add_task(client.write_key, series_name, points)
                    # Rebind rather than clear: the list just queued is still
                    # owned by the pending task.
                    series[series_name] = []

    # Pick up any partial batches left after the last row.
    for series_name, data in series.items():
        if data:
            pool.add_task(client.write_key, series_name, data)
    # Wait for all queued writes to finish.
    pool.wait_completion()
# Run the importer only when this file is executed as a script; importing the
# module (e.g. to reuse ThreadPool) has no side effects.
if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2012-04-10T19:43:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:44:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:45:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:46:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:47:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:48:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:49:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:50:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:51:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:52:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:53:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:54:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:55:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:56:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:57:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:58:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:59:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T20:00:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:01:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:02:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:03:17.000+0600, 47.234, 923.8, 25.01
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment