Skip to content

Instantly share code, notes, and snippets.

@sergeant-wizard
Created October 23, 2018 22:45
Show Gist options
  • Save sergeant-wizard/73be52137c63e5f51ea4afec8282814d to your computer and use it in GitHub Desktop.
Save sergeant-wizard/73be52137c63e5f51ea4afec8282814d to your computer and use it in GitHub Desktop.
import rospy
import numpy as np
import std_msgs.msg
from rx import Observable
class TopicObservable(object):
def __init__(self, topic_name, topic_type, max_msgs=None):
self._topic_name = topic_name
self._topic_type = topic_type
self._max_msgs = max_msgs
self._cnt = 0
def __call__(self, observer):
self._observer = observer
self._sub = rospy.Subscriber(
self._topic_name,
self._topic_type,
self._on_topic
)
def _on_topic(self, msg):
self._observer.on_next(msg)
self._cnt += 1
if self._cnt == self._max_msgs:
print('completed')
self._observer.on_completed()
self.close()
def close(self):
self._sub.unregister()
def _print(msg=''):
print('{}'.format(msg))
def _print_error(x):
print('error: {}'.format(x))
def _print_next(x):
print('next: {}'.format(x))
class BufferTimeout(Exception):
pass
def check_length(x):
assert len(x) > 0
if __name__ == '__main__':
rospy.init_node('hoge', anonymous=True)
topic_name = 'my_topic'
topic_type = std_msgs.msg.Int16
pub = rospy.Publisher(topic_name, topic_type, queue_size=1)
rospy.Timer(rospy.Duration(0.1), lambda _: pub.publish(3))
to = TopicObservable(topic_name, topic_type, 3)
source = Observable.create(to)
source \
.map(lambda x: x.data) \
.buffer_with_time_or_count(1000, 3) \
.take(1) \
.do_action(check_length) \
.map(np.median) \
.subscribe(
on_next=_print_next,
on_error=_print_error,
)
input('block...\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment