Created
April 25, 2012 15:32
-
-
Save dimitri/2490675 to your computer and use it in GitHub Desktop.
PGQ queue forwarding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
This script is used to forward events from source queues to | |
destination queue. Source and destination queues must be named | |
the same. The script applies to as many source databases as | |
needed. It allows for consuming all events from a distributed | |
system (think plproxy nodes) from a single federated queue | |
installed on the destination database. | |
Setup looks like: | |
[movers] | |
job_name = queue_mover | |
src_db = dbname=postgres port=5432 user=postgres | |
dst_db = host=10.10.10.10 port=6432 user=dbuser dbname=foo | |
pidfile = /var/tmp/%(job_name)s.pid | |
logfile = /var/tmp/%(job_name)s.log | |
loop_delay = 5 | |
connection_lifetime = 30 | |
pgq_lazy_fetch = 5000 | |
fl_p16 = dbname=fl_p16 port=5432 user=fl | |
fl_p17 = dbname=fl_p17 port=5432 user=fl | |
... | |
The extra connection strings are used to connect to the local databases where to find the queues you want to forward. The main src_db is mainly used to discover those databases. | |
""" | |
import sys, os, ConfigParser | |
import pkgloader | |
pkgloader.require('skytools', '3.0') | |
import skytools, pgq, skytools.config as config | |
class QueueMover(pgq.SerialConsumer):
    """Forward events from one source queue to the same-named destination queue.

    One instance handles a single (source database, queue) pair; the Movers
    script creates one QueueMover per discovered queue and calls work() on
    each from its own main loop, so every instance is forced into
    single-loop, non-daemon mode.

    NOTE(review): statement order in __init__ matters — load_config() runs
    before the SerialConsumer constructor (presumably so the base class
    picks up self.cf; confirm against skytools internals), and self.log is
    assigned last, apparently to override whatever logger the base class
    installed so all movers share the parent script's log.
    """

    def __init__(self, dst_db, db_name, qname, log, args):
        # dst_db:  connection string of the federated destination database
        # db_name: name of the local source database hosting the queue
        # qname:   queue name, identical on source and destination
        # log:     shared logger borrowed from the parent Movers script
        # args:    command-line args forwarded to the consumer framework
        self.service_name = "mover %s.%s" % (db_name, qname)
        self.dbname = db_name # keep a copy for logs output
        self.db_name = db_name
        self.dst_db = dst_db
        self.queue_name = qname
        self.batches = 0 # support for --mark-- info lines
        self.empty = 0
        # build self.cf before the base constructor runs
        self.load_config()
        pgq.SerialConsumer.__init__(self, "queue_mover", "src_db", "dst_db", args)
        self.dst_queue_name = self.queue_name
        # force non daemon mode, whatever the command line says
        self.set_single_loop(True)
        self.go_daemon = 0
        self.log = log

    def load_config(self):
        """Build an in-memory config overriding the on-disk one.

        Replaces the [movers] settings with per-queue values so each
        consumer targets its own source database and queue pair.
        NOTE(review): the 'user=fl' connstr and empty pidfile are
        hard-coded for this installation — confirm before reuse.
        """
        connstr = "dbname=%s user=fl" % self.dbname
        override = {
            "pidfile": '',
            "src_db": connstr,
            "dst_db": self.dst_db,
            "queue_name": self.queue_name,
            "dst_queue_name": self.queue_name
        }
        self.cf = config.Config(self.service_name, None, override = override)

    def process_remote_batch(self, db, batch_id, ev_list, dst_db):
        """Copy one batch of events from the source queue to the destination.

        Called by the SerialConsumer framework with the already-fetched
        event list; re-inserts the events' payload columns into the
        destination queue in bulk.
        """
        self.batches += 1
        # load data: flatten each event into the column order expected below
        rows = []
        for ev in ev_list:
            data = [ev.type, ev.data, ev.extra1, ev.extra2, ev.extra3, ev.extra4, ev.time]
            rows.append(data)
        fields = ['type', 'data', 'extra1', 'extra2', 'extra3', 'extra4', 'time']
        self.log.info("%9s %-15s batch: %d count: %d" %
                      (self.dbname, self.queue_name, batch_id, len(rows)))
        # insert data
        curs = dst_db.cursor()
        pgq.bulk_insert_events(curs, rows, fields, self.dst_queue_name)

    def work(self):
        """Run one consumer iteration; emit a --mark-- line when idle.

        Returns whatever SerialConsumer.work() returns (the parent loop
        accumulates it as a sleep hint). self.batches is bumped inside
        process_remote_batch, so an unchanged count means this iteration
        saw no batch; after 10 consecutive empty iterations an info-level
        --mark-- line is logged as a liveness marker.
        """
        self.empty += 1
        sleep = pgq.SerialConsumer.work(self)
        if self.batches:
            self.empty = 0
        else:
            self.log.debug("%9s %-15s saw %d consecutive empty batches" %
                           (self.dbname, self.queue_name, self.empty))
            # we got called to work but didn't enter process_remote_batch
            if self.empty > 10:
                self.log.info("%9s %-15s --mark--" %
                              (self.dbname, self.queue_name))
                self.empty = 0
        return sleep
class Movers(skytools.DBScript): | |
def __init__(self, args): | |
skytools.DBScript.__init__(self, "movers", args) | |
self.load_config() | |
self.db_name = self.cf.get("src_db") | |
self.dst_db = self.cf.get("dst_db") | |
self.queue_movers = [] | |
self.args = args | |
def init_optparse(self, parser = None): | |
p = skytools.DBScript.init_optparse(self, parser) | |
p.add_option('--list-databases', action='store_true', dest="listdb", | |
help = 'list local databases') | |
p.add_option('--register', action='store_true', | |
help = 'register consumer on queue') | |
p.add_option('--unregister', action='store_true', | |
help = 'unregister consumer from queue') | |
return p | |
def init_workers(self): | |
""" create the QueueMover objects """ | |
db = self.get_database("src_db") | |
curs = db.cursor() | |
curs.execute("select datname from pg_database where datname ~ 'fl_p' order by datname") | |
for part in curs.fetchall(): | |
datname = part['datname'] | |
self.log.info("adding movers for %s" % datname) | |
pdb = self.get_database(datname) | |
pcurs = pdb.cursor() | |
pcurs.execute("select queue_name from pgq.queue order by queue_name") | |
for q in pcurs.fetchall(): | |
qname = q['queue_name'] | |
qm = QueueMover(self.dst_db, datname, qname, self.log, self.args) | |
self.queue_movers.append(qm) | |
def startup(self): | |
"""Handle commands here. __init__ does not have error logging.""" | |
if self.options.listdb: | |
self.list_databases() | |
sys.exit(0) | |
if self.options.register: | |
self.register_consumer() | |
sys.exit(0) | |
if self.options.unregister: | |
self.unregister_consumer() | |
sys.exit(0) | |
# startup code is a good place to init our workers | |
self.init_workers() | |
return skytools.DBScript.startup(self) | |
def list_databases(self): | |
""" discover local databases """ | |
db = self.get_database("src_db") | |
curs = db.cursor() | |
curs.execute("select datname from pg_database where datname ~ 'fl_p' order by datname") | |
for part in curs.fetchall(): | |
datname = part['datname'] | |
print "%s = dbname=%s port=5432 user=fl" % (datname, datname) | |
def register_consumer(self): | |
self.init_workers() | |
for qm in self.queue_movers: | |
self.log.info("register consumer %s for queue %s in database %s" % | |
(qm.service_name, qm.queue_name, qm.dbname)) | |
qm.register_consumer() | |
def unregister_consumer(self): | |
self.init_workers() | |
for qm in self.queue_movers: | |
self.log.info("unregister consumer %s for queue %s in database %s" % | |
(qm.service_name, qm.queue_name, qm.dbname)) | |
qm.unregister_consumer() | |
def work(self): | |
sleep = 0 | |
for qm in self.queue_movers: | |
try: | |
sleep += qm.work() | |
except Exception, e: | |
print e | |
sys.exit(1) | |
self.log.debug("work is done, sleep = %d" % sleep) | |
return sleep | |
def start(self): | |
self.log.info("Starting process") | |
skytools.DBScript.start(self) | |
# Script entry point: parse argv (minus the program name) and run the loop.
if __name__ == '__main__':
    Movers(sys.argv[1:]).start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment