Skip to content

Instantly share code, notes, and snippets.

@fparaggio
Forked from ajcronk/batch-import.py
Created October 31, 2012 04:38
Show Gist options
  • Save fparaggio/3984820 to your computer and use it in GitHub Desktop.
Save fparaggio/3984820 to your computer and use it in GitHub Desktop.
Python CSV batch import example
import dateutil.parser
import optparse
from Queue import Queue
import tempodb
from threading import Thread
class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    Each task is a ``(func, args, kargs)`` tuple taken from ``tasks``; the
    worker calls ``func(*args, **kargs)``. The thread marks itself as a
    daemon and starts itself from the constructor, so building a ``Worker``
    is enough to put it to work.
    """

    def __init__(self, tasks):
        """Create the worker and start it immediately.

        Args:
            tasks: queue object providing ``get()`` and ``task_done()`` and
                holding ``(func, args, kargs)`` tuples.
        """
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Pull tasks off the queue and execute them, forever."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # Best effort: report the failure and keep serving tasks.
                # (``as`` / ``print(...)`` is valid on Python 2.6+ and 3.)
                print(e)
            finally:
                # Acknowledge the task no matter how it ended; otherwise a
                # task that raises something other than Exception would
                # leave the queue's join() waiting forever.
                self.tasks.task_done()
class ThreadPool(object):
    """Pool of threads consuming tasks from a queue"""

    def __init__(self, num_threads):
        """Create the task queue and start ``num_threads`` workers.

        The queue is bounded to ``num_threads`` pending entries, so
        ``add_task`` blocks while the queue is full.

        Args:
            num_threads: number of worker threads to start; must be >= 1.

        Raises:
            ValueError: if ``num_threads`` is less than 1. Such a pool would
                have an unbounded queue and no workers, so
                ``wait_completion`` could never return once a task was added.
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue.

        The task is stored as ``(func, args, kargs)`` and later run by a
        worker as ``func(*args, **kargs)``.
        """
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
def _flush_batches(pool, client, series_keys, batches):
    """Queue one ``client.write_key`` task per series, then start new batches.

    Every series gets a brand-new list after it is queued, so a list that has
    been handed to a worker thread is never appended to (or sent) again.
    """
    for series_key in series_keys:
        pool.add_task(client.write_key, series_key, batches[series_key])
        batches[series_key] = []


def main():
    """Read a CSV file and write its readings to TempoDB in batches.

    Each non-blank input line must have exactly four comma-separated fields:
    ``timestamp, temperature, solar_radiation, humidity``. The timestamp is
    parsed with ``dateutil.parser.parse`` and the three readings with
    ``float``. Data points are collected per series and handed to a pool of
    three worker threads, 20 rows at a time, via ``client.write_key``.

    Command-line options supply the input file (``-i``, required) and the
    TempoDB key, secret, host, port and secure flag.
    """
    # This script assumes that the input file is sorted by key
    # NOTE(review): the rows carry no key column, only a timestamp --
    # presumably "sorted by timestamp" is meant; confirm.
    parser = optparse.OptionParser(usage="usage: %prog [options] filename", version="%prog 0.1")
    parser.add_option("-i", "--input", dest="filename", help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret", dest="secret", help="tempodb database secret")
    parser.add_option("-H", "--host", dest="host", default="api.tempo-db.com", help="tempodb host")
    parser.add_option("-P", "--port", dest="port", default=443, help="tempodb port")
    # NOTE(review): store_true with default=True means this flag can never
    # switch the secure setting off.
    parser.add_option("-S", "--secure", action="store_true", dest="secure", default=True, help="tempodb secure")
    (options, args) = parser.parse_args()

    if not options.filename:
        parser.error("Enter a file to read from.")

    client = tempodb.Client(options.key, options.secret, options.host, int(options.port), options.secure)

    # Series keys, in the same order as the value columns of the input file.
    series_keys = (
        "thermostat.1.temperature",
        "thermostat.1.solar_radiation",
        "thermostat.1.humidity",
    )
    batches = dict((series_key, []) for series_key in series_keys)
    batch_size = 20  # rows per write_key call

    # Init a Thread pool with the desired number of threads
    pool = ThreadPool(3)

    # The context manager closes the file even if a row fails to parse.
    with open(options.filename) as source_file:
        for line in source_file:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue

            timestamp, temperature, solar_radiation, humidity = line.split(',')
            input_date = dateutil.parser.parse(timestamp)
            readings = (temperature, solar_radiation, humidity)
            for series_key, reading in zip(series_keys, readings):
                batches[series_key].append(tempodb.DataPoint(input_date, float(reading)))

            # All series grow in lockstep, so checking the first is enough.
            if len(batches[series_keys[0]]) >= batch_size:
                _flush_batches(pool, client, series_keys, batches)

    # pick up any scraps
    if batches[series_keys[0]]:
        _flush_batches(pool, client, series_keys, batches)

    # Wait for completion
    pool.wait_completion()
# Run the import only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
2012-04-10T19:43:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:44:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:45:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:46:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:47:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:48:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:49:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:50:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:51:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:52:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:53:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:54:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:55:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:56:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:57:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:58:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:59:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T20:00:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:01:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:02:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:03:17.000+0600, 47.234, 923.8, 25.01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment