Created
November 30, 2019 11:49
-
-
Save suraj2596/d14fb28f7e92fc1fc50e7e2c07b60626 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
import time | |
import numpy as np | |
import rospy | |
from sensor_msgs.msg import Imu | |
import mavros_msgs | |
from mavros_msgs.msg import ActuatorControl, State, Thrust | |
from mavros_msgs.srv import CommandBool, SetMode | |
from geometry_msgs.msg import PoseStamped | |
from tf.transformations import euler_from_quaternion | |
class Drone(): | |
def __init__(self): | |
rospy.init_node('test_node', anonymous=True) | |
# Variables | |
self.state = State() | |
# ROS services | |
service_timeout = 30 | |
rospy.loginfo("waiting for ROS services") | |
try: | |
rospy.wait_for_service('mavros/cmd/arming', service_timeout) | |
rospy.wait_for_service('mavros/set_mode', service_timeout) | |
rospy.loginfo("ROS services are up") | |
except rospy.ROSException: | |
self.fail("failed to connect to services") | |
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',CommandBool) | |
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode) | |
# ROS Subscribers | |
rospy.Subscriber("/mavros/state", State, self.state_callback) | |
# rospy.Subscriber("/mavros/imu/data", Imu, self.imu_callback) | |
# ROS publishers | |
# self.thrust_pub = rospy.Publisher("/mavros/setpoint_attitude/thrust", Thrust, queue_size=10) | |
# self.actuator_pub = rospy.Publisher("/mavros/target_actuator_control", ActuatorControl, queue_size=10) | |
self.rate = rospy.Rate(50) | |
def set_arm(self, arm, timeout): | |
"""arm: True to arm or False to disarm, timeout(int): seconds""" | |
rospy.loginfo("setting FCU arm: {}".format(arm)) | |
old_arm = self.state.armed | |
loop_freq = 1 # Hz | |
rate = rospy.Rate(loop_freq) | |
arm_set = False | |
for i in xrange(timeout * loop_freq): | |
if self.state.armed == arm: | |
arm_set = True | |
rospy.loginfo("set arm success | seconds: {0} of {1}".format( | |
i / loop_freq, timeout)) | |
break | |
else: | |
try: | |
res = self.set_arming_srv(arm) | |
if not res.success: | |
rospy.logerr("failed to send arm command") | |
except rospy.ServiceException as e: | |
rospy.logerr(e) | |
# try: | |
rate.sleep() | |
# except rospy.ROSException as e: | |
# self.fail(e) | |
# self.assertTrue(arm_set, ( | |
# "failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}". | |
# format(arm, old_arm, timeout))) | |
def set_mode(self, mode, timeout): | |
"""mode: PX4 mode string, timeout(int): seconds""" | |
rospy.loginfo("setting FCU mode: {0}".format(mode)) | |
old_mode = self.state.mode | |
loop_freq = 1 # Hz | |
rate = rospy.Rate(loop_freq) | |
mode_set = False | |
for i in xrange(timeout * loop_freq): | |
if self.state.mode == mode: | |
mode_set = True | |
rospy.loginfo("set mode success | seconds: {0} of {1}".format( | |
i / loop_freq, timeout)) | |
break | |
else: | |
try: | |
res = self.set_mode_srv(0, mode) # 0 is custom mode | |
if not res.mode_sent: | |
rospy.logerr("failed to send mode command") | |
except rospy.ServiceException as e: | |
rospy.logerr(e) | |
# try: | |
rate.sleep() | |
# except rospy.ROSException as e: | |
# self.fail(e) | |
# self.assertTrue(mode_set, ( | |
# "failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}". | |
# format(mode, old_mode, timeout))) | |
# CALLBACKS | |
def state_callback(self, data): | |
self.state = data | |
print(self.state.mode) | |
def imu_callback(self, data): | |
print(data.mode) | |
# rospy.loginfo(rospy.get_caller_id() + "\nlinear acceleration:\nx: [{}]\ny: [{}]\nz: [{}]" | |
# .format(data.linear_acceleration.x, data.linear_acceleration.y, data.linear_acceleration.z)) | |
# def command_actuator(self, a, inputs): | |
# a.header.stamp = rospy.Time.now() | |
# a.group_mix = 2 # ActuatorControl.PX4_MIX_PAYLOAD | |
# a.controls = inputs | |
if __name__ == '__main__': | |
drone1 = Drone() | |
# Wait for heartbeat | |
# send junky setpoints | |
junkmsg = Thrust() | |
junkmsg.thrust = 0.5 | |
# print(junkmsg) | |
thrust_pub = rospy.Publisher("mavros/setpoint_attitude/thrust", Thrust, queue_size=3) | |
time.sleep(1) | |
# thrust_pub.publish(junkmsg) | |
for i in range(100): | |
time.sleep(0.05) | |
thrust_pub.publish(junkmsg) | |
# set mode to OFFBOARD | |
drone1.set_mode("OFFBOARD", 10) | |
# time.sleep(2) | |
# Arm the drone | |
# drone1.set_arm(True,5) | |
# time.sleep(2) | |
# Initialize Actuator | |
# drone1.actuator_control_message_1 = ActuatorControl() | |
# # Set desired rpm | |
# inputs = np.array((0.0, 0.3, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) | |
# drone1.actuator_control_message_1.header.stamp = rospy.Time.now() | |
# drone1.actuator_control_message_1.group_mix = 3 # ActuatorControl.PX4_MIX_PAYLOAD | |
# drone1.actuator_control_message_1.controls = inputs | |
# drone1.actuator_pub.publish(drone1.actuator_control_message_1) | |
# # listener() | |
rospy.spin() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment