Skip to content

Instantly share code, notes, and snippets.

@mnunberg
Last active December 24, 2015 04:59
Show Gist options
  • Save mnunberg/6747647 to your computer and use it in GitHub Desktop.
Save mnunberg/6747647 to your computer and use it in GitHub Desktop.
txcouchbase
#
# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This file contains the stub Async implementation.
This module is prefixed by an underscore and thus is not public API,
meaning the interface may change. Its presence is here primarily to
expose potential integrators to the mechanisms by which the library
may be extended to support other async frameworks
"""
import couchbase._bootstrap
import couchbase._libcouchbase
from couchbase._libcouchbase import AsyncResult, PYCBC_CONN_F_ASYNC
from couchbase.connection import Connection
from couchbase.views.iterator import View as _CBView
from couchbase.exceptions import CouchbaseError
class Async(Connection):
    """
    Connection subclass that is driven by an external IOPS object and
    operates in asynchronous mode.
    """

    def __init__(self, iops=None, **kwargs):
        """
        Create a new 'Async' connection.

        An async connection functions like a normal synchronous connection,
        except that its operations return future objects (i.e.
        :class:`couchbase.results.AsyncResult` objects) instead of
        :class:`couchbase.results.Result`.

        These objects are actually :class:`couchbase.results.MultiResult`
        objects which are empty upon return. As operations complete, the
        object becomes populated with the relevant data.

        Note that the 'AsyncResult' object must currently have a valid
        'callback' and 'errback' object set *after* it is returned from
        the API methods. If this is not the case then an exception will be
        raised when the callbacks are about to arrive. This behavior is the
        primary reason why this interface isn't public.

        :param iops: The IOPS implementation to use; mandatory.
        :param kwargs: Forwarded to the base constructor after '_iops'
            and '_flags' have been filled in.
        :raise ValueError: if `iops` is missing or falsy.
        """
        if not iops:
            raise ValueError("Must have IOPS")

        # The base constructor receives the IOPS implementation via '_iops'.
        kwargs['_iops'] = iops

        # Keep whatever flags the caller supplied and add the async bit.
        kwargs['_flags'] = kwargs.get('_flags', 0) | PYCBC_CONN_F_ASYNC

        # NOTE: forcing unlock_gil=False (the GIL enter/leave points are not
        # coordinated in async mode) is currently disabled.
        super(Async, self).__init__(**kwargs)

    def _ctor_do_connect(self):
        """
        Intentionally empty: an async connection does not connect on init.
        """
        pass
class AsyncViewBase(_CBView):
    """
    Callback-driven view query.

    Subclasses override the ``on_*`` hooks; :meth:`start_query` wires
    :meth:`_callback` into the underlying raw request so the hooks are
    invoked as data arrives.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize a new AsyncViewBase object. This is intended to be
        subclassed in order to implement the required methods to be
        invoked on error, data, and row events.

        The 'streaming' keyword is always forced to True; all other
        arguments are passed to the base view class unchanged.
        """
        kwargs['streaming'] = True
        # The base class is imported as `_CBView`; naming this class in
        # super() avoids referring to an undefined `View` symbol.
        super(AsyncViewBase, self).__init__(*args, **kwargs)

    def __iter__(self):
        """
        Iteration is not available on an async view.

        :raise NotImplementedError: always
        """
        raise NotImplementedError("Iteration not supported on async view")

    def on_error(self, ex):
        """
        Called when there is a failure with the response data.
        The default implementation re-raises the exception.

        :param Exception ex: The exception caught
        """
        raise ex

    def on_rows(self, rowiter):
        """
        Called when there are more processed views. Must be overridden.

        :param iterable rowiter: An iterable which will yield results
            as defined by the :class:`RowProcessor` implementation
        :raise NotImplementedError: unless overridden
        """
        raise NotImplementedError()

    def on_data(self, data):
        """
        Called for any non-view-row HTTP data. The default does nothing.

        :param list data: Raw HTTP data received from the request, in a list
            form. The type of elements in the list will be either JSON
            objects or string objects, depending on the content.
        """
        pass

    def on_done(self):
        """
        Called when this request has completed. The default does nothing.
        """

    def _callback(self, htres, rows, data):
        """
        Row/data callback installed on the raw request by
        :meth:`start_query`.

        Processes the payload, then dispatches to :meth:`on_rows` when a
        row iterator is available and to :meth:`on_data` otherwise. Any
        exception raised while doing so is handed to :meth:`on_error`.

        :param htres: The result object of the request; its ``done``
            attribute signals completion.
        :param rows: Passed through to ``_process_payload``.
        :param data: Passed through to ``_process_payload`` and, when no
            rows are available, to :meth:`on_data`.
        """
        try:
            self._process_payload(rows, data)
            if self._rp_iter:
                self.on_rows(self._rp_iter)
            else:
                self.on_data(data)
        except Exception as e:
            self.on_error(e)
        finally:
            # Always drop the per-chunk row iterator, even on failure.
            self._rp_iter = None

        # NOTE(review): on_done() is only reached when on_error() did not
        # re-raise -- confirm this is the intended completion semantics.
        if htres.done:
            self.on_done()

    def start_query(self):
        """
        Initiate the callbacks for this query. These callbacks
        will be invoked until the request has completed.
        """
        self._setup_streaming_request()
        self._do_iter = True
        self.raw._callback = self._callback
from twisted.internet import error as TxErrors
import couchbase._libcouchbase as LCB
from couchbase._libcouchbase import (
Event, TimerEvent, IOEvent,
LCB_READ_EVENT, LCB_WRITE_EVENT, LCB_RW_EVENT,
PYCBC_EVSTATE_ACTIVE,
PYCBC_EVACTION_WATCH,
PYCBC_EVACTION_UNWATCH
)
class TxIOEvent(IOEvent):
    """
    Event object handed to the Twisted reactor as a reader/writer.

    IOEvent is a class implemented in C. It already exposes a 'fileno()'
    method, so none is defined here.
    """

    def __init__(self):
        # Nothing extra to set up; fileno() comes from the base class.
        super(TxIOEvent, self).__init__()

    def doRead(self):
        """Report read readiness to the underlying event."""
        self.ready(LCB_READ_EVENT)

    def doWrite(self):
        """Report write readiness to the underlying event."""
        self.ready(LCB_WRITE_EVENT)

    def connectionLost(self, reason):
        """
        Connection-loss notification; deliberately ignored.

        :param reason: Unused.
        """
        return None

    def logPrefix(self):
        """Return a log prefix that identifies this event by its FD."""
        fd = self.fileno()
        return "Couchbase FD={0}".format(fd)
class v0Iops(object):
    """
    IOPS Implementation to be used with Twisted's "FD" based reactors
    """

    def __init__(self, reactor, is_sync=False):
        """
        :param reactor: The reactor used to register readers, writers and
            delayed calls.
        :param bool is_sync: When true, :meth:`start_watching` spins the
            reactor itself until :meth:`stop_watching` is called.
        """
        self.reactor = reactor
        self.is_sync = is_sync
        self._stop = False

    def update_event(self, event, action, flags, fd=None):
        """
        Called by libcouchbase to add/remove event watchers

        :param event: The event object registered with the reactor.
        :param action: PYCBC_EVACTION_WATCH or PYCBC_EVACTION_UNWATCH;
            any other value is ignored.
        :param flags: Bitmask of LCB_READ_EVENT / LCB_WRITE_EVENT that
            should be watched (only consulted for WATCH).
        :param fd: Unused here.
        """
        if action == PYCBC_EVACTION_UNWATCH:
            # Remove according to what the event is currently watching.
            if event.flags & LCB_READ_EVENT:
                self.reactor.removeReader(event)
            if event.flags & LCB_WRITE_EVENT:
                self.reactor.removeWriter(event)

        elif action == PYCBC_EVACTION_WATCH:
            # Register the requested directions first, then drop the ones
            # that are no longer requested.
            if flags & LCB_READ_EVENT:
                self.reactor.addReader(event)
            if flags & LCB_WRITE_EVENT:
                self.reactor.addWriter(event)

            if (flags & LCB_READ_EVENT) == 0:
                self.reactor.removeReader(event)
            if (flags & LCB_WRITE_EVENT) == 0:
                self.reactor.removeWriter(event)

    def _make_tmcb(self, tmev):
        """
        Build the callable scheduled on the reactor for a timer; when
        invoked (with any arguments) it calls ``tmev.ready(0)``.
        """
        def ret(*args, **kwargs):
            tmev.ready(0)
        return ret

    def update_timer(self, timer, action, usecs):
        """
        Called by libcouchbase to add/remove timers

        :param timer: The timer event. Its scheduled reactor call is kept
            in the ``txhandle`` attribute.
        :param action: PYCBC_EVACTION_UNWATCH cancels the timer; any other
            value schedules it.
        :param usecs: Delay in microseconds (only used when scheduling).
        """
        if action == PYCBC_EVACTION_UNWATCH:
            # The timer may never have been scheduled, or may already have
            # been unwatched, in which case there is nothing to cancel.
            handle = getattr(timer, 'txhandle', None)
            if handle is not None:
                try:
                    handle.cancel()
                except (TxErrors.AlreadyCalled, TxErrors.AlreadyCancelled):
                    pass
            timer.txhandle = None
        else:
            # Divide by a float so sub-second delays are not truncated to
            # zero by integer division on Python 2.
            timer.txhandle = self.reactor.callLater(usecs / 1000000.0,
                                                    self._make_tmcb(timer))

    def io_event_factory(self):
        """Return a new :class:`TxIOEvent`."""
        return TxIOEvent()

    def start_watching(self):
        """
        Start/Stop operations. This is a no-op in twisted because
        it's a continuously running async loop.

        Only when the object was created with ``is_sync=True`` does this
        iterate the reactor until :meth:`stop_watching` is called.
        """
        if not self.is_sync:
            return

        self._stop = False
        while not self._stop:
            self.reactor.doIteration(0)

    def stop_watching(self):
        """Request that a running :meth:`start_watching` loop exits."""
        self._stop = True
#
# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python
import argparse
from time import time
#from twisted.internet import epollreactor
#epollreactor.install()
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import LoopingCall
from txcouchbase.connection import Connection
from couchbase.connection import FMT_BYTES
from couchbase.transcoder import Transcoder
# Command-line options of the benchmark, in the order they are registered:
# each entry is (flag names, keyword settings for add_argument).
_ARG_SPECS = (
    (('-t', '--threads'), {
        'default': 4,
        'type': int,
        'help': ("Number of threads to spawn. 0 means no threads but "
                 "workload will still run in the main thread"),
    }),
    (('-d', '--delay'), {
        'default': 0,
        'type': float,
        'help': ("Number of seconds to wait between each op. may be a "
                 "fraction"),
    }),
    (('-C', '--clients'), {
        'default': 1,
        'type': int,
        'help': "Number of clients (nthreads are per-client)",
    }),
    (('-b', '--bucket'), {'default': 'default', 'type': str}),
    (('-p', '--password'), {'default': None, 'type': str}),
    (('-H', '--hostname'), {'default': 'localhost', 'type': str}),
    (('-D', '--duration'), {
        'default': 10,
        'type': int,
        'help': "Duration of run (in seconds)",
    }),
    (('-T', '--transcoder'), {
        'default': False,
        'action': 'store_true',
        'help': ("Use the Transcoder object rather than built-in conversion "
                 "routines"),
    }),
    (('--ksize',), {
        'default': 12,
        'type': int,
        'help': "Key size to use",
    }),
    (('--vsize',), {
        'default': 128,
        'type': int,
        'help': "Value size to use",
    }),
)

ap = argparse.ArgumentParser()
for _names, _spec in _ARG_SPECS:
    ap.add_argument(*_names, **_spec)

# Parsed once at import time; the rest of the script reads `options`.
options = ap.parse_args()

# The transcoder instance is created unconditionally.
TC = Transcoder()
class Runner(object):
    """
    One benchmark worker: repeatedly stores a fixed key/value pair through
    the given connection and counts the completed operations.
    """

    def __init__(self, cb):
        """
        Set up the worker and immediately start its operation loop.

        :param cb: Connection object whose ``set`` method returns something
            that can be yielded from an ``inlineCallbacks`` generator.
        """
        self.cb = cb
        # NOTE: `delay`, `wait_time` and `end_time` are recorded here but
        # are not consulted by the operation loop in start().
        self.delay = options.delay
        self.key = 'K' * options.ksize
        self.value = b'V' * options.vsize
        self.wait_time = 0
        self.opcount = 0
        self.end_time = time() + options.duration

        self._do_stop = False
        self.start()

    @inlineCallbacks
    def start(self):
        """
        Issue `set` operations back to back until stop() is called,
        incrementing `opcount` after each one completes.
        """
        while not self._do_stop:
            yield self.cb.set(self.key, self.value, format=FMT_BYTES)
            self.opcount += 1

    def stop(self):
        """Ask the loop in start() to finish after the current operation."""
        self._do_stop = True
# ---- Benchmark driver -----------------------------------------------------
global_begin = time()
runners = []
clients = []

kwargs = {
    'bucket': options.bucket,
    'host': options.hostname,
    'password': options.password,
}

# Only install the custom transcoder when -T/--transcoder was given.
# (Testing the Transcoder instance itself would ignore the option.)
if options.transcoder:
    kwargs['transcoder'] = TC


def on_connect(conn):
    """Register a connected client and start its runners."""
    clients.append(conn)
    for _ in range(options.threads):
        runners.append(Runner(conn))


for _ in range(options.clients):
    d = Connection.create_connection(**kwargs)
    d.addCallback(on_connect)


def stop_all():
    """Stop every runner, then shut the reactor down."""
    for r in runners:
        r.stop()
    reactor.stop()


# Run the reactor for the requested duration; reactor.run() blocks until
# stop_all() stops it.
reactor.callLater(options.duration, stop_all)
reactor.run()

global_duration = time() - global_begin
total_ops = sum(r.opcount for r in runners)
total_time = sum(r.wait_time for r in runners)

print("Total run took an absolute time of %0.2f seconds" % (global_duration,))
print("Did a total of %d operations" % (total_ops,))
print("[ABS] %0.2f ops/second" % (float(total_ops)/float(global_duration),))
# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from couchbase._async import Async
from txcouchbase.iops import v0Iops
from twisted.internet import reactor as tx_reactor
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
class Connection(Async):
    """
    Connection subclass for use within a Twisted application.

    The key/value operations defined here return :class:`Deferred`
    objects instead of the raw results of the :class:`Async` base class.
    """

    def __init__(self, reactor=None, **kwargs):
        """
        :param reactor: The Twisted reactor to use. Defaults to the global
            reactor when omitted or falsy.
        :param kwargs: Forwarded to :class:`Async`.
        """
        if not reactor:
            reactor = tx_reactor

        iops = v0Iops(reactor)
        super(Connection, self).__init__(iops=iops, **kwargs)

    @classmethod
    def create_connection(cls, *args, **kwargs):
        """
        Create a connection object and return a :class:`Deferred` for it.

        The Deferred fires with the connection object when the object's
        ``_conncb`` hook is invoked with a true value, and fails with a
        :class:`RuntimeError` when it is invoked with a false value.

        :return: A :class:`Deferred` object.
        """
        obj = cls(*args, **kwargs)
        d = Deferred()

        def on_connect_result(ok):
            if ok:
                d.callback(obj)
            else:
                # errback() without an argument needs an active exception
                # to capture, so supply an explicit one.
                d.errback(RuntimeError("Couldn't connect"))

        obj._conncb = on_connect_result
        return d

    @staticmethod
    def _d_fail(ft, exc):
        """
        Errback installed on operation results: fails the Deferred stored
        in ``ft.userdata`` and then clears that reference.

        :param ft: The operation result carrying the Deferred.
        :param exc: The error to report.
        """
        # NOTE(review): the exact shape of `exc` delivered by the extension
        # is not visible here; accept both a (type, value, traceback)
        # triple and a plain exception object -- confirm against the caller.
        if isinstance(exc, tuple) and len(exc) == 3:
            extype, exobj, extb = exc
            failure = Failure(exc_value=exobj, exc_type=extype, exc_tb=extb)
        else:
            failure = Failure(exc)

        ft.userdata.errback(failure)
        ft.userdata = None

    @staticmethod
    def _d_ok(ft):
        """
        Callback installed on operation results: fires the Deferred stored
        in ``ft.userdata`` with the result and then clears that reference.
        """
        ft.userdata.callback(ft)
        ft.userdata = None

    def _defer(self, opres):
        """
        Converts a return value from a :class:`couchbase.connection.Connection`
        object into a :class:`Deferred`. This should only be used on APIs
        which do not have a twisted wrapper, and in such case, the APIs
        should be called through the base class (e.g. `Connection.foo(cb, ..)`)
        rather than on the object itself (i.e. `cb.foo(..)`) in case this
        method is later wrapped.

        :param opres: The result returned by one of the base class' API methods
        :type opres: :class:`couchbase.results.AsyncResult`
        :return: A :class:`Deferred` object.
        """
        d = Deferred()
        opres.userdata = d
        opres.callback = self._d_ok
        opres.errback = self._d_fail
        return d

    def _retmulti(self, meth, *args, **kwargs):
        """
        Invoke the base-class method `meth` and return a Deferred which
        fires with the complete result object.
        """
        return self._defer(meth(self, *args, **kwargs))

    def _retone(self, meth, *args, **kwargs):
        """
        Invoke the base-class method `meth` and return a Deferred which
        fires with the result entry for the key, i.e. the result object
        indexed by the first positional argument.
        """
        d = self._defer(meth(self, *args, **kwargs))
        k = args[0]
        d.addCallback(lambda x: x[k])
        return d

    # Single-key operations: the Deferred fires with the entry for the key.

    def get(self, *args, **kwargs):
        return self._retone(Async.get, *args, **kwargs)

    def set(self, *args, **kwargs):
        return self._retone(Async.set, *args, **kwargs)

    def add(self, *args, **kwargs):
        return self._retone(Async.add, *args, **kwargs)

    def replace(self, *args, **kwargs):
        return self._retone(Async.replace, *args, **kwargs)

    def append(self, *args, **kwargs):
        return self._retone(Async.append, *args, **kwargs)

    def prepend(self, *args, **kwargs):
        return self._retone(Async.prepend, *args, **kwargs)

    def delete(self, *args, **kwargs):
        return self._retone(Async.delete, *args, **kwargs)

    def lock(self, *args, **kwargs):
        return self._retone(Async.lock, *args, **kwargs)

    def unlock(self, *args, **kwargs):
        return self._retone(Async.unlock, *args, **kwargs)

    # Multi-key operations: the Deferred fires with the whole result object.

    def get_multi(self, *args, **kwargs):
        return self._retmulti(Async.get_multi, *args, **kwargs)

    def set_multi(self, *args, **kwargs):
        return self._retmulti(Async.set_multi, *args, **kwargs)

    def add_multi(self, *args, **kwargs):
        return self._retmulti(Async.add_multi, *args, **kwargs)

    def replace_multi(self, *args, **kwargs):
        return self._retmulti(Async.replace_multi, *args, **kwargs)

    def append_multi(self, *args, **kwargs):
        return self._retmulti(Async.append_multi, *args, **kwargs)

    def prepend_multi(self, *args, **kwargs):
        return self._retmulti(Async.prepend_multi, *args, **kwargs)

    def lock_multi(self, *args, **kwargs):
        return self._retmulti(Async.lock_multi, *args, **kwargs)

    def unlock_multi(self, *args, **kwargs):
        return self._retmulti(Async.unlock_multi, *args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment