Created
October 23, 2018 22:45
-
-
Save sergeant-wizard/73be52137c63e5f51ea4afec8282814d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rospy | |
import numpy as np | |
import std_msgs.msg | |
from rx import Observable | |
class TopicObservable(object): | |
def __init__(self, topic_name, topic_type, max_msgs=None): | |
self._topic_name = topic_name | |
self._topic_type = topic_type | |
self._max_msgs = max_msgs | |
self._cnt = 0 | |
def __call__(self, observer): | |
self._observer = observer | |
self._sub = rospy.Subscriber( | |
self._topic_name, | |
self._topic_type, | |
self._on_topic | |
) | |
def _on_topic(self, msg): | |
self._observer.on_next(msg) | |
self._cnt += 1 | |
if self._cnt == self._max_msgs: | |
print('completed') | |
self._observer.on_completed() | |
self.close() | |
def close(self): | |
self._sub.unregister() | |
def _print(msg=''): | |
print('{}'.format(msg)) | |
def _print_error(x): | |
print('error: {}'.format(x)) | |
def _print_next(x): | |
print('next: {}'.format(x)) | |
class BufferTimeout(Exception): | |
pass | |
def check_length(x): | |
assert len(x) > 0 | |
if __name__ == '__main__': | |
rospy.init_node('hoge', anonymous=True) | |
topic_name = 'my_topic' | |
topic_type = std_msgs.msg.Int16 | |
pub = rospy.Publisher(topic_name, topic_type, queue_size=1) | |
rospy.Timer(rospy.Duration(0.1), lambda _: pub.publish(3)) | |
to = TopicObservable(topic_name, topic_type, 3) | |
source = Observable.create(to) | |
source \ | |
.map(lambda x: x.data) \ | |
.buffer_with_time_or_count(1000, 3) \ | |
.take(1) \ | |
.do_action(check_length) \ | |
.map(np.median) \ | |
.subscribe( | |
on_next=_print_next, | |
on_error=_print_error, | |
) | |
input('block...\n') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment