Created
January 18, 2023 07:54
-
-
Save YoshiRi/7f3724cd5ab89cd51266d6f6d31b5841 to your computer and use it in GitHub Desktop.
python mock to publish dummy detected objects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env /usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
# ----------------------------------------------- | |
# ROS2 Node 送信側 | |
# | |
# The MIT License (MIT) | |
# Copyright (C) 2022 yoshi ri. | |
# ----------------------------------------------- | |
import rclpy | |
from rclpy.node import Node | |
from tf2_ros import TransformBroadcaster | |
import math | |
from autoware_auto_perception_msgs.msg import DetectedObject | |
from autoware_auto_perception_msgs.msg import DetectedObjects | |
from autoware_auto_perception_msgs.msg import ObjectClassification | |
from autoware_auto_perception_msgs.msg import Shape | |
import numpy as np | |
from geometry_msgs.msg import TransformStamped | |
from geometry_msgs.msg import Quaternion | |
# for debugging in odaiba spkadai55 | |
INIT_X=89570.6 | |
INIT_Y=42334.9 | |
INIT_Z=8.5 | |
INIT_YAW = 120/180*math.pi | |
INIT_X=12.0 | |
INIT_Y=0.0 | |
INIT_Z=0.0 | |
INIT_YAW = 0.0 | |
# self vehicle pose | |
SELF_POSE = [89571.414, 42301.364, 6.637] | |
SELF_QUAT = [-0.012, -0.002, 0.286, 0.958] | |
SELF_POSE = [0. ,0., 0.] | |
SELF_QUAT = [0., 0., 0., 1.] | |
def quaternion_from_euler(roll, pitch, yaw): | |
""" | |
Converts euler roll, pitch, yaw to quaternion (w in last place) | |
quat = [x, y, z, w] | |
Bellow should be replaced when porting for ROS 2 Python tf_conversions is done. | |
""" | |
cy = math.cos(yaw * 0.5) | |
sy = math.sin(yaw * 0.5) | |
cp = math.cos(pitch * 0.5) | |
sp = math.sin(pitch * 0.5) | |
cr = math.cos(roll * 0.5) | |
sr = math.sin(roll * 0.5) | |
q = Quaternion() | |
q.x = cy * cp * cr + sy * sp * sr | |
q.y = cy * cp * sr - sy * sp * cr | |
q.z = sy * cp * sr + cy * sp * cr | |
q.w = sy * cp * cr - cy * sp * sr | |
return q | |
# create bus object and set classification | |
def create_object(label = ObjectClassification.BUS): | |
# init | |
bus1 = DetectedObject() | |
# classification | |
classification = ObjectClassification() | |
classification.label = label | |
classification.probability = 0.8 | |
bus1.classification.append(classification) | |
return bus1 | |
def create_bus_object(): | |
bus1 = create_object(ObjectClassification.BUS) | |
bus1.shape.dimensions.x = 7.0 | |
bus1.shape.dimensions.y = 2.0 | |
bus1.shape.dimensions.z = 2.0 | |
return bus1 | |
def create_bike_object(): | |
bike = create_object(ObjectClassification.MOTORCYCLE) | |
bike.shape.dimensions.x = 1.4 | |
bike.shape.dimensions.y = 0.7 | |
bike.shape.dimensions.z = 1.7 | |
return bike | |
# Create Detection | |
def creating_shape_changing_object(timing_count): | |
bus1 = create_bus_object() | |
# set pose and shape | |
x_shape_offset = 2.0 * math.sin(2* math.pi * timing_count * 0.1 / 10) | |
bus1.shape.dimensions.x = 7.0 + x_shape_offset | |
bus1.shape.dimensions.y = 2.0 | |
bus1.shape.dimensions.z = 2.0 | |
bus1.kinematics.pose_with_covariance.pose.position.x = INIT_X + math.cos(INIT_YAW)*x_shape_offset/2 | |
bus1.kinematics.pose_with_covariance.pose.position.y = INIT_Y + math.sin(INIT_YAW)*x_shape_offset/2 | |
bus1.kinematics.pose_with_covariance.pose.position.z = INIT_Z | |
#bus1.kinematics.twist_with_covariance.twist.linear.x = 4/10*math.pi * math.cos(2* math.pi * self.count * 0.1 / 10) | |
bus1.kinematics.pose_with_covariance.pose.orientation = quaternion_from_euler(INIT_YAW,0,0) | |
# setting | |
bus1.shape.type = Shape.BOUNDING_BOX | |
bus1.existence_probability = 0.8 | |
bus1.kinematics.has_position_covariance = False | |
bus1.kinematics.has_twist = False | |
bus1.kinematics.has_twist_covariance = False | |
bus1.kinematics.orientation_availability = 1 | |
bus1.kinematics.pose_with_covariance.covariance = np.diag([16,16,0.4,0.2,0.2,0.2]).reshape(-1) | |
return bus1 | |
# create turning object | |
class BicycleModel(): | |
# state = x, y, yaw, v, beta | |
def __init__(self, init_state = [10., 0., 0., 5., 0.]) -> None: | |
self.state = init_state | |
self.lr = 3.5 | |
self.steer_angle = np.pi/40 | |
self.count = 0 | |
self.loop = 20 | |
def update(self, dt = 0.1): | |
x,y,yaw,v,beta = self.state | |
beta = np.arctan(0.5 *np.tan(self.steer_angle)) | |
x += v*np.cos(yaw+beta)*dt | |
y += v*np.sin(yaw+beta)*dt | |
yaw += v/self.lr * np.sin(beta) * dt | |
v_ = 5. * np.sin(2*np.pi*self.count/self.loop /4) | |
v = v_ if v_ > 0 else 0. | |
self.state = [x,y,yaw,v,beta] | |
self.count = self.count + 1 if self.count < 4 * self.loop else 0 | |
def return_turning_object(self): | |
#obj1 = create_bus_object() | |
obj1 = create_bike_object() | |
obj1.kinematics.pose_with_covariance.pose.position.x = self.state[0] | |
obj1.kinematics.pose_with_covariance.pose.position.y = self.state[1] | |
obj1.kinematics.pose_with_covariance.pose.orientation = quaternion_from_euler(self.state[2],0,0) | |
obj1.kinematics.twist_with_covariance.twist.linear.x = self.state[3] | |
x,y,yaw,v,beta = self.state | |
obj1.kinematics.twist_with_covariance.twist.angular.z = v/self.lr * np.sin(beta) * dt | |
return obj1 | |
class MyPublisher(Node): | |
""" | |
Test publisher | |
""" | |
# ノード名 | |
SELFNODE = "test_MOT" | |
# トピック名 | |
PUBTOPIC = "/perception/object_recognition/detection/objects" | |
SUBTOPIC = "/tracked_object" | |
def __init__(self): | |
# ノードの初期化 | |
super().__init__(self.SELFNODE) | |
# コンソールに表示 | |
self.get_logger().info("%s initializing..." % (self.SELFNODE)) | |
# publisherインスタンスを生成 | |
self.pub = self.create_publisher(DetectedObjects, self.PUBTOPIC, 1) | |
# tf broad caster | |
self.tf_broadcaster = TransformBroadcaster(self) | |
# timer | |
self.dt = 0.1 | |
# タイマーのインスタンスを生成(1秒ごとに発生) | |
self.create_timer(self.dt, self.callback) | |
# カウンタをリセット | |
self.count = 0 | |
self.get_logger().info("%s do..." % self.SELFNODE) | |
self.model = BicycleModel() | |
def __del__(self): | |
""" | |
デストラクタ | |
""" | |
# コンソールに表示 | |
self.get_logger().info("%s done." % self.SELFNODE) | |
def generate_detected_object(self): | |
msg = DetectedObjects() | |
msg.header.frame_id = "map" | |
msg.header.stamp = self.get_clock().now().to_msg() | |
# creating shape changing object | |
bus1 = creating_shape_changing_object(self.count) | |
self.model.update(self.dt) | |
bus1 = self.model.return_turning_object() | |
msg.objects.append(bus1) | |
return msg | |
def broadcast_self_vehicle(self): | |
seconds, _ = self.get_clock().now().seconds_nanoseconds() | |
t = TransformStamped() | |
t.header.stamp = self.get_clock().now().to_msg() | |
t.header.frame_id = 'map' | |
t.child_frame_id = 'base_link' | |
t.transform.translation.x = SELF_POSE[0] | |
t.transform.translation.y = SELF_POSE[1] | |
t.transform.translation.z = SELF_POSE[2] | |
t.transform.rotation.x = SELF_QUAT[0] | |
t.transform.rotation.y = SELF_QUAT[1] | |
t.transform.rotation.z = SELF_QUAT[2] | |
t.transform.rotation.w = SELF_QUAT[3] | |
self.tf_broadcaster.sendTransform(t) | |
def callback(self): | |
""" | |
タイマーの実行部 | |
""" | |
self.get_logger().info("Publish [%s]" % (self.count)) | |
# 送信するメッセージの作成 | |
msg = self.generate_detected_object() | |
# 送信 | |
self.pub.publish(msg) | |
# カウンタをインクリメント | |
self.count += 1 | |
# broadcast tf | |
self.broadcast_self_vehicle() | |
def main(args=None): | |
""" | |
メイン関数 | |
Parameters | |
---------- | |
""" | |
try: | |
# rclpyの初期化 | |
rclpy.init(args=args) | |
# インスタンスを生成 | |
node = MyPublisher() | |
# プロセス終了までアイドリング | |
rclpy.spin(node) | |
except KeyboardInterrupt: | |
pass | |
finally: | |
# 終了処理 | |
node.destroy_node() | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment