Last active
December 24, 2015 04:59
-
-
Save mnunberg/6747647 to your computer and use it in GitHub Desktop.
txcouchbase
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Copyright 2013, Couchbase, Inc. | |
# All Rights Reserved | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License") | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
""" | |
This file contains the stub Async implementation. | |
This module is prefixed by an underscore and thus is not public API, | |
meaning the interface may change. Its presence is here primarily to | |
expose potential integrators to the mechanisms by which the library | |
may be extended to support other async frameworks | |
""" | |
import couchbase._bootstrap | |
import couchbase._libcouchbase | |
from couchbase._libcouchbase import AsyncResult, PYCBC_CONN_F_ASYNC | |
from couchbase.connection import Connection | |
from couchbase.views.iterator import View as _CBView | |
from couchbase.exceptions import CouchbaseError | |
class Async(Connection): | |
def __init__(self, iops=None, **kwargs): | |
""" | |
Create a new 'Async' connection. An async connection is an object | |
which functions like a normal synchronous connection, except that it | |
returns future objects (i.e. :class:`couchbase.results.AsyncResult` | |
objects) instead of :class:`couchbase.results.Result`. | |
These objects are actually :class:`couchbase.results.MultiResult` | |
objects which are empty upon retun. As operations complete, this | |
object becomes populated with the relevant data. | |
Note that the 'AsyncResult' object must currently have a valid | |
'callback' and 'errback' object set *after* they are returned from | |
the API methods. If this is not the case then an exception will be | |
raised when the callbacks are about to arrive. This behavior is the | |
primary reason why this interface isn't public, too :) | |
""" | |
if not iops: | |
raise ValueError("Must have IOPS") | |
kwargs.setdefault('_flags', 0) | |
# Must have an IOPS implementation | |
kwargs['_iops'] = iops | |
# Flags should be async | |
kwargs['_flags'] |= PYCBC_CONN_F_ASYNC | |
# Don't lock/unlock GIL as the enter/leave points are not coordinated | |
#kwargs['unlock_gil'] = False | |
super(Async, self).__init__(**kwargs) | |
def _ctor_do_connect(self): | |
# Don't connect on init | |
pass | |
class AsyncViewBase(_CBView): | |
def __init__(self, *args, **kwargs): | |
""" | |
Initialize a new AsyncViewBase object. This is intended to be | |
subclassed in order to implement the require methods to be | |
invoked on error, data, and row events | |
""" | |
kwargs['streaming'] = True | |
super(View, self).__init__(*args, **kwargs) | |
def __iter__(self): | |
raise NotImplementedError("Iteration not supported on async view") | |
def on_error(self, ex): | |
""" | |
Called when there is a failure with the response data | |
:param Exception ex: The exception caught | |
""" | |
raise ex | |
def on_rows(self, rowiter): | |
""" | |
Called when there are more processed views. | |
:param iterable rowiter: An iterable which will yield results | |
as defined by the :class:`RowProcessor` implementation | |
""" | |
raise NotImplementedError() | |
def on_data(self, data): | |
""" | |
Called for any non-view-row HTTP data | |
:param list data: Raw HTTP data received from the request, in a list | |
form. The type of elements in the list will be either JSON objects | |
or string objects, depending on the content. | |
""" | |
pass | |
def on_done(self): | |
""" | |
Called when this request has completed | |
""" | |
def _callback(self, htres, rows, data): | |
try: | |
self._process_payload(rows, data) | |
if self._rp_iter: | |
self.on_rows(self._rp_iter) | |
else: | |
self.on_data(data) | |
except Exception as e: | |
self.on_error(e) | |
finally: | |
self._rp_iter = None | |
if htres.done: | |
self.on_done() | |
def start_query(): | |
""" | |
Initiate the callbacks for this query. These callbacks | |
will be invoked until the request has completed | |
""" | |
self._setup_streaming_request() | |
self._do_iter = True | |
self.raw._callback = self._callback |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import error as TxErrors | |
import couchbase._libcouchbase as LCB | |
from couchbase._libcouchbase import ( | |
Event, TimerEvent, IOEvent, | |
LCB_READ_EVENT, LCB_WRITE_EVENT, LCB_RW_EVENT, | |
PYCBC_EVSTATE_ACTIVE, | |
PYCBC_EVACTION_WATCH, | |
PYCBC_EVACTION_UNWATCH | |
) | |
class TxIOEvent(IOEvent): | |
""" | |
IOEvent is a class implemented in C. It exposes | |
a 'fileno()' method, so we don't have to. | |
""" | |
def __init__(self): | |
super(TxIOEvent, self).__init__() | |
# We have fileno | |
def doRead(self): | |
self.ready(LCB_READ_EVENT) | |
def doWrite(self): | |
self.ready(LCB_WRITE_EVENT) | |
def connectionLost(self, reason): | |
pass | |
#print "Argh..." + str(reason) | |
#self.ready(LCB_READ_EVENT) | |
def logPrefix(self): | |
return "Couchbase FD={0}".format(self.fileno()) | |
class v0Iops(object): | |
""" | |
IOPS Implementation to be used with Twisted's "FD" based reactors | |
""" | |
def __init__(self, reactor, is_sync=False): | |
self.reactor = reactor | |
self.is_sync = is_sync | |
self._stop = False | |
def update_event(self, event, action, flags, fd=None): | |
""" | |
Called by libcouchbase to add/remove event watchers | |
""" | |
if action == PYCBC_EVACTION_UNWATCH: | |
if event.flags & LCB_READ_EVENT: | |
self.reactor.removeReader(event) | |
if event.flags & LCB_WRITE_EVENT: | |
self.reactor.removeWriter(event) | |
elif action == PYCBC_EVACTION_WATCH: | |
if flags & LCB_READ_EVENT: | |
self.reactor.addReader(event) | |
if flags & LCB_WRITE_EVENT: | |
self.reactor.addWriter(event) | |
if flags & LCB_READ_EVENT == 0: | |
self.reactor.removeReader(event) | |
if flags & LCB_WRITE_EVENT == 0: | |
self.reactor.removeWriter(event) | |
def _make_tmcb(self, tmev): | |
def ret(*args, **kwargs): | |
tmev.ready(0) | |
return ret | |
def update_timer(self, timer, action, usecs): | |
""" | |
Called by libcouchbase to add/remove timers | |
""" | |
if action == PYCBC_EVACTION_UNWATCH: | |
try: | |
timer.txhandle.cancel() | |
except TxErrors.AlreadyCalled: | |
pass | |
timer.txhandle = None | |
else: | |
ret = self.reactor.callLater(usecs / 1000000, | |
self._make_tmcb(timer)) | |
timer.txhandle = ret | |
def io_event_factory(self): | |
return TxIOEvent() | |
def start_watching(self): | |
""" | |
Start/Stop operations. This is a no-op in twisted because | |
it's a continuously running async loop | |
""" | |
if not self.is_sync: | |
return | |
self._stop = False | |
while not self._stop: | |
self.reactor.doIteration(0) | |
def stop_watching(self): | |
self._stop = True |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Copyright 2013, Couchbase, Inc. | |
# All Rights Reserved | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License") | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
#!/usr/bin/env python | |
import argparse | |
from time import time | |
#from twisted.internet import epollreactor | |
#epollreactor.install() | |
from twisted.internet import reactor | |
from twisted.internet.defer import inlineCallbacks | |
from twisted.internet.task import LoopingCall | |
from txcouchbase.connection import Connection | |
from couchbase.connection import FMT_BYTES | |
from couchbase.transcoder import Transcoder | |
ap = argparse.ArgumentParser() | |
ap.add_argument('-t', '--threads', default=4, type=int, | |
help="Number of threads to spawn. 0 means no threads " | |
"but workload will still run in the main thread") | |
ap.add_argument('-d', '--delay', default=0, type=float, | |
help="Number of seconds to wait between each op. " | |
"may be a fraction") | |
ap.add_argument('-C', '--clients', default=1, type=int, | |
help="Number of clients (nthreads are per-client)") | |
ap.add_argument('-b', '--bucket', default='default', type=str) | |
ap.add_argument('-p', '--password', default=None, type=str) | |
ap.add_argument('-H', '--hostname', default='localhost', type=str) | |
ap.add_argument('-D', '--duration', default=10, type=int, | |
help="Duration of run (in seconds)") | |
ap.add_argument('-T', '--transcoder', default=False, | |
action='store_true', | |
help="Use the Transcoder object rather than built-in " | |
"conversion routines") | |
ap.add_argument('--ksize', default=12, type=int, | |
help="Key size to use") | |
ap.add_argument('--vsize', default=128, type=int, | |
help="Value size to use") | |
options = ap.parse_args() | |
TC = Transcoder() | |
class Runner(object): | |
def __init__(self, cb): | |
self.cb = cb | |
self.delay = options.delay | |
self.key = 'K' * options.ksize | |
self.value = b'V' * options.vsize | |
self.wait_time = 0 | |
self.opcount = 0 | |
self.end_time = time() + options.duration | |
def lproc(): | |
self.opcount += 1 | |
return self.cb.set(self.key, self.value, format=FMT_BYTES) | |
#self.lc = LoopingCall(lproc) | |
#self.lc.start(options.delay) | |
self._do_stop = False | |
self.start() | |
@inlineCallbacks | |
def start(self): | |
while not self._do_stop: | |
yield self.cb.set(self.key, self.value, format=FMT_BYTES) | |
self.opcount += 1 | |
def stop(self): | |
self._do_stop = True | |
#self.lc.stop() | |
global_begin = time() | |
runners = [] | |
clients = [] | |
kwargs = { | |
'bucket': options.bucket, | |
'host': options.hostname, | |
'password': options.password | |
} | |
if TC: | |
kwargs['transcoder'] = TC | |
for _ in range(options.clients): | |
d = Connection.create_connection(**kwargs) | |
def on_connect(conn): | |
clients.append(conn) | |
for _ in range(options.threads): | |
r = Runner(conn) | |
runners.append(r) | |
d.addCallback(on_connect) | |
def stop_all(): | |
[r.stop() for r in runners] | |
reactor.stop() | |
reactor.callLater(options.duration, stop_all) | |
reactor.run() | |
global_duration = time() - global_begin | |
total_ops = sum([r.opcount for r in runners]) | |
total_time = 0 | |
for r in runners: | |
total_time += r.wait_time | |
print("Total run took an absolute time of %0.2f seconds" % (global_duration,)) | |
print("Did a total of %d operations" % (total_ops,)) | |
print("[ABS] %0.2f ops/second" % (float(total_ops)/float(global_duration),)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2013, Couchbase, Inc. | |
# All Rights Reserved | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License") | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
from couchbase._async import Async | |
from txcouchbase.iops import v0Iops | |
from twisted.internet import reactor as tx_reactor | |
from twisted.internet.defer import Deferred | |
from twisted.python.failure import Failure | |
class Connection(Async): | |
def __init__(self, reactor=None, **kwargs): | |
""" | |
Connection subclass for uses within a Twisted application. | |
This extends the base Connection object with a single method | |
that generates Deferred objects for operation results. | |
""" | |
if not reactor: | |
reactor = tx_reactor | |
iops = v0Iops(reactor) | |
super(Connection, self).__init__(iops=iops, **kwargs) | |
@classmethod | |
def create_connection(cls, *args, **kwargs): | |
obj = cls(*args, **kwargs) | |
d = Deferred() | |
obj._conncb = lambda x: d.callback(obj) if x else d.errback(None) | |
return d | |
@staticmethod | |
def _d_fail(ft, exc): | |
failure = Failure(exc_value=exobj, exc_type=extype, exc_tb=extb) | |
ft.userdata.errback(failure) | |
ft.userdata = None | |
@staticmethod | |
def _d_ok(ft): | |
ft.userdata.callback(ft) | |
ft.userdata = None | |
def _defer(self, opres): | |
""" | |
Converts a return value from a :class:`couchbase.connection.Connection` | |
object into a :class:`Deferred`. This should only be used on APIs | |
which do not have a twisted wrapper, and in such case, the APIs | |
should be called through the base class (e.g. `Connection.foo(cb, ..)`) | |
rather than on the object itself (i.e. `cb.foo(..)`) in case this | |
method is later wrapped. | |
:param opres: The result returned by one of the base class' API methods | |
:type opres: :class:`couchbase.results.AsyncResult` | |
:return: A :class:`Deferred` object. | |
""" | |
d = Deferred() | |
opres.userdata = d | |
opres.callback = self._d_ok | |
opres.errback = self._d_fail | |
return d | |
def _retmulti(self, meth, *args, **kwargs): | |
return self._defer(meth(self, *args, **kwargs)) | |
def _retone(self, meth, *args, **kwargs): | |
d = self._defer(meth(self, *args, **kwargs)) | |
k = args[0] | |
d.addCallback(lambda x: x[k]) | |
return d | |
def get(self, *args, **kwargs): | |
return self._retone(Async.get, *args, **kwargs) | |
def set(self, *args, **kwargs): | |
return self._retone(Async.set, *args, **kwargs) | |
def add(self, *args, **kwargs): | |
return sef._retone(Async.add, *args, **kwargs) | |
def replace(self, *args, **kwargs): | |
return self._retone(Async.replace, *args, **kwargs) | |
def append(self, *args, **kwargs): | |
return self._retone(Async.append, *args, **kwargs) | |
def prepend(self, *args, **kwargs): | |
return self._retone(Async.prepend, *args, **kwargs) | |
def delete(self, *args, **kwargs): | |
return self._retone(Async.delete, *args, **kwargs) | |
def lock(self, *args, **kwargs): | |
return self._retone(Async.lock, *args, **kwargs) | |
def unlock(self, *args, **kwargs): | |
return self._retone(Async.unlock, *args, **kwargs) | |
def get_multi(self, *args, **kwargs): | |
return self._retmulti(Async.get_multi, *args, **kwargs) | |
def set_multi(self, *args, **kwargs): | |
return self._retmulti(Async.set_multi, *args, **kwargs) | |
def add_multi(self, *args, **kwargs): | |
return self._retmulti(Async.add_multi, *args, **kwargs) | |
def replace_multi(self, *args, **kwargs): | |
return self._retmulti(Async.replace_multi, *args, **kwargs) | |
def append_multi(self, *args, **kwargs): | |
return self._retmulti(Async.append_multi, *args, **kwargs) | |
def prepend_multi(self, *args, **kwargs): | |
return self._retmulti(Async.prepend_multi, *args, **kwargs) | |
def lock_multi(self, *args, **kwargs): | |
return self._retmulti(Async.lock_multi, *args, **kwargs) | |
def unlock_multi(self, *args, **kwargs): | |
return self._retmulti(Async.unlock_multi, *args, **kwargs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment