Skip to content

Instantly share code, notes, and snippets.

@mnunberg
Created September 30, 2013 15:14
Show Gist options
  • Save mnunberg/6765280 to your computer and use it in GitHub Desktop.
Save mnunberg/6765280 to your computer and use it in GitHub Desktop.
#
# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import select
import time
import couchbase._libcouchbase as LCB
from couchbase._libcouchbase import (
Event, TimerEvent, IOEvent,
LCB_READ_EVENT, LCB_WRITE_EVENT, LCB_RW_EVENT,
PYCBC_EVSTATE_ACTIVE,
PYCBC_EVACTION_WATCH,
PYCBC_EVACTION_UNWATCH
)
class SelectIOPS(object):
    """
    IOPS implementation built on top of select.select().

    The library registers I/O events and timers through update_event()
    and update_timer(); start_watching() then runs a blocking loop which
    dispatches them until stop_watching() is called.
    """

    def __init__(self):
        self._do_watch = False
        # NOTE(review): not read anywhere in this class; kept so the set of
        # instance attributes stays unchanged.
        self._next_timeout = 0
        self._ioevents = set()
        self._timers = set()
        # Active readers and writers
        self._evwr = set()
        self._evrd = set()

    def _unregister_timer(self, event):
        """Stop tracking a timer; a timer which is not tracked is ignored."""
        self._timers.discard(event)

    def _unregister_event(self, event):
        """Stop tracking an I/O event for reading, writing and in general."""
        self._evrd.discard(event)
        self._evwr.discard(event)
        self._ioevents.discard(event)

    def update_timer(self, timer, action, usecs):
        """
        Arm or disarm a timer.

        :param timer: the timer event object
        :param action: PYCBC_EVACTION_UNWATCH disarms the timer, any other
            value (re)arms it
        :param usecs: delay in microseconds, relative to now
        """
        if action == PYCBC_EVACTION_UNWATCH:
            self._unregister_timer(timer)
            return
        self._timers.add(timer)
        # Divide by a float: without 'from __future__ import division'
        # Python 2 would truncate every sub-second delay down to zero.
        timer.pydata = time.time() + usecs / 1000000.0

    def update_event(self, event, action, flags):
        """
        Start or stop watching an I/O event.

        :param event: the I/O event object (must expose fileno())
        :param action: PYCBC_EVACTION_WATCH or PYCBC_EVACTION_UNWATCH;
            any other value is ignored
        :param flags: combination of LCB_READ_EVENT / LCB_WRITE_EVENT
            selecting the directions to watch
        """
        if action == PYCBC_EVACTION_UNWATCH:
            self._unregister_event(event)
            return
        if action != PYCBC_EVACTION_WATCH:
            return

        # Mirror of the removal performed by _unregister_event()
        self._ioevents.add(event)

        if flags & LCB_READ_EVENT:
            self._evrd.add(event)
        else:
            self._evrd.discard(event)

        if flags & LCB_WRITE_EVENT:
            self._evwr.add(event)
        else:
            self._evwr.discard(event)

    def _next_deadline(self):
        """
        Return the earliest absolute expiry time among the active timers,
        or None when no timer is active.
        """
        deadlines = [tm.pydata for tm in self._timers
                     if tm.state == PYCBC_EVSTATE_ACTIVE]
        if not deadlines:
            return None
        return min(deadlines)

    def _poll(self):
        """
        Run a single iteration: wait for I/O (at most until the next timer
        is due), then dispatch the ready I/O events and the expired timers.

        :return: False when there is neither an fd nor an active timer to
            wait for (so waiting could never end), True otherwise
        """
        # Snapshot the sets; callbacks invoked below may modify them
        readers = list(self._evrd)
        writers = list(self._evwr)

        deadline = self._next_deadline()
        if deadline is None:
            timeout = None
        else:
            timeout = max(0.0, deadline - time.time())

        if readers or writers:
            watched = list(set(readers + writers))
            # A timeout of None blocks until an fd is ready; otherwise we
            # wake up in time for the earliest timer.
            rout, wout, eout = select.select(readers, writers, watched,
                                             timeout)
        elif timeout is None:
            # Nothing to wait for at all. Calling select() here would block
            # forever on POSIX and raise on Windows.
            return False
        else:
            # Only timers are pending; select() with three empty lists is
            # not portable, so simply sleep until the first one is due.
            time.sleep(timeout)
            rout = wout = eout = ()

        now = time.time()

        ready_events = {}
        for ev in rout:
            ready_events[ev] = LCB_READ_EVENT
        for ev in wout:
            ready_events[ev] = ready_events.get(ev, 0) | LCB_WRITE_EVENT
        for ev in eout:
            # An exceptional condition is reported in both directions
            ready_events[ev] = LCB_RW_EVENT

        for ev, flags in list(ready_events.items()):
            if ev.state == PYCBC_EVSTATE_ACTIVE:
                ev.ready(flags)

        # Iterate over a copy; ready() may lead to timers being removed
        for ev in tuple(self._timers):
            if ev.state == PYCBC_EVSTATE_ACTIVE and ev.pydata <= now:
                ev.ready(0)

        return True

    def start_watching(self):
        """
        Run the event loop until stop_watching() is called or until there
        is nothing left to wait for. Does nothing if the loop is already
        running.
        """
        if self._do_watch:
            return
        self._do_watch = True
        try:
            while self._do_watch:
                if not self._poll():
                    break
        finally:
            # Always clear the flag, so the loop can be started again even
            # if a callback raised.
            self._do_watch = False

    def stop_watching(self):
        """Ask the loop started by start_watching() to exit."""
        self._do_watch = False
from twisted.internet import error as TxErrors
import couchbase._libcouchbase as LCB
from couchbase._libcouchbase import (
Event, TimerEvent, IOEvent,
LCB_READ_EVENT, LCB_WRITE_EVENT, LCB_RW_EVENT,
PYCBC_EVSTATE_ACTIVE,
PYCBC_EVACTION_WATCH,
PYCBC_EVACTION_UNWATCH
)
class TxIOEvent(IOEvent):
    """
    I/O event object handed to the Twisted reactor.

    The base IOEvent class is implemented in C and already exposes
    a 'fileno()' method, so only the reactor callbacks are defined here.
    """

    def __init__(self):
        # fileno() comes from the base class
        IOEvent.__init__(self)

    def doRead(self):
        """Signal the event as ready for reading."""
        self.ready(LCB_READ_EVENT)

    def doWrite(self):
        """Signal the event as ready for writing."""
        self.ready(LCB_WRITE_EVENT)

    def connectionLost(self, reason):
        """No action is taken here; 'reason' is ignored."""
        return None

    def logPrefix(self):
        """Return a log prefix containing the file descriptor number."""
        fd = self.fileno()
        return "Couchbase FD={0}".format(fd)
class v0Iops(object):
    """
    IOPS Implementation to be used with Twisted's "FD" based reactors
    """

    def __init__(self, reactor, is_sync=False):
        """
        :param reactor: the Twisted reactor used for fd watching and timers
        :param is_sync: if true, start_watching() drives the reactor itself
            until stop_watching() is called
        """
        self.reactor = reactor
        self.is_sync = is_sync
        self._stop = False

    def update_event(self, event, action, flags, fd=None):
        """
        Called by libcouchbase to add/remove event watchers

        :param event: the I/O event object registered with the reactor
        :param action: PYCBC_EVACTION_WATCH or PYCBC_EVACTION_UNWATCH;
            any other value is ignored
        :param flags: combination of LCB_READ_EVENT / LCB_WRITE_EVENT
            selecting the directions to watch
        :param fd: unused
        """
        reactor = self.reactor

        if action == PYCBC_EVACTION_UNWATCH:
            # Remove whatever the event itself says it is watching
            if event.flags & LCB_READ_EVENT:
                reactor.removeReader(event)
            if event.flags & LCB_WRITE_EVENT:
                reactor.removeWriter(event)

        elif action == PYCBC_EVACTION_WATCH:
            # Bring the reactor in line with the requested flags: watch the
            # directions which are set and drop those which are not.
            if flags & LCB_READ_EVENT:
                reactor.addReader(event)
            else:
                reactor.removeReader(event)

            if flags & LCB_WRITE_EVENT:
                reactor.addWriter(event)
            else:
                reactor.removeWriter(event)

    def _make_tmcb(self, tmev):
        """
        Build the callable passed to callLater() for a timer. It ignores
        its arguments and signals the timer as ready.
        """
        def ret(*args, **kwargs):
            tmev.ready(0)
        return ret

    def update_timer(self, timer, action, usecs):
        """
        Called by libcouchbase to add/remove timers

        :param timer: the timer event object; the pending delayed call is
            stored on it as 'txhandle'
        :param action: PYCBC_EVACTION_UNWATCH cancels the timer, any other
            value (re)schedules it
        :param usecs: delay in microseconds
        """
        # Cancel any delayed call which is still outstanding. This is done
        # when re-arming as well, so an earlier schedule cannot fire in
        # addition to the new one. The handle may be missing or None if the
        # timer was never armed or has already been cancelled.
        pending = getattr(timer, 'txhandle', None)
        if pending is not None:
            try:
                pending.cancel()
            except (TxErrors.AlreadyCalled, TxErrors.AlreadyCancelled):
                pass
        timer.txhandle = None

        if action == PYCBC_EVACTION_UNWATCH:
            return

        # Divide by a float: without 'from __future__ import division'
        # Python 2 would truncate every sub-second delay down to zero.
        timer.txhandle = self.reactor.callLater(usecs / 1000000.0,
                                                self._make_tmcb(timer))

    def io_event_factory(self):
        """Return a new I/O event object usable with the reactor."""
        return TxIOEvent()

    def start_watching(self):
        """
        Start/Stop operations. This is a no-op in twisted because
        it's a continuously running async loop

        When 'is_sync' was requested, the reactor is driven from here
        until stop_watching() is called.
        """
        if not self.is_sync:
            return

        self._stop = False
        while not self._stop:
            # iterate() runs the due delayed calls before polling the fds;
            # doIteration() alone only polls, so timers created by
            # update_timer() would never fire in sync mode.
            self.reactor.iterate(0)

    def stop_watching(self):
        """Ask the loop run by start_watching() to exit."""
        self._stop = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment