Added new servo support

This commit is contained in:
Patrick Goebel 2015-12-13 08:06:59 -08:00
parent fcffbda0f7
commit 33104433ca
15 changed files with 575 additions and 41 deletions

View File

@ -11,7 +11,7 @@
Created for the Pi Robot Project: http://www.pirobot.org Created for the Pi Robot Project: http://www.pirobot.org
and the Home Brew Robotics Club (HBRC): http://hbrobotics.org and the Home Brew Robotics Club (HBRC): http://hbrobotics.org
Authors: Patrick Goebel, James Nugen Authors: Patrick Goebel, James Nugen, Nathaniel Gallinger
Inspired and modeled after the ArbotiX driver by Michael Ferguson Inspired and modeled after the ArbotiX driver by Michael Ferguson
@ -62,12 +62,20 @@
//#define ARDUINO_ENC_COUNTER //#define ARDUINO_ENC_COUNTER
#endif #endif
#define USE_SERVOS // Enable/disable use of PWM servos as defined in servos.h //#define USE_SERVOS // Enable/disable use of old PWM servo support as defined in servos.h
/* Include servo support if required */ #define USE_SERVOS2 // Enable/disable use of new PWM servo support as defined in servos2.h
/* Include old servo support if required */
#ifdef USE_SERVOS #ifdef USE_SERVOS
#include "Servo.h" #include "Servo.h"
#include "servos.h" #include "servos.h"
/* Include new servo support if desired */
#elif defined(USE_SERVOS2)
#include "Servo.h"
#include "servos2.h"
int nServos = 0;
#endif #endif
#if defined(ARDUINO) && ARDUINO >= 100 #if defined(ARDUINO) && ARDUINO >= 100
@ -141,13 +149,13 @@ void resetCommand() {
/* Run a command. Commands are defined in commands.h */ /* Run a command. Commands are defined in commands.h */
int runCommand() { int runCommand() {
int i = 0; int i;
char *p = argv1; char *p = argv1;
char *str; char *str;
int pid_args[4]; int pid_args[4];
arg1 = atoi(argv1); arg1 = atoi(argv1);
arg2 = atoi(argv2); arg2 = atoi(argv2);
switch(cmd) { switch(cmd) {
case GET_BAUDRATE: case GET_BAUDRATE:
Serial.println(BAUDRATE); Serial.println(BAUDRATE);
@ -183,6 +191,33 @@ int runCommand() {
case SERVO_READ: case SERVO_READ:
Serial.println(servos[arg1].getServo().read()); Serial.println(servos[arg1].getServo().read());
break; break;
#elif defined(USE_SERVOS2)
case CONFIG_SERVO:
myServos[arg1].initServo(arg1, arg2);
myServoPins[nServos] = arg1;
nServos++;
Serial.println("OK");
break;
case SERVO_WRITE:
myServos[arg1].setTargetPosition(arg2);
Serial.println("OK");
break;
case SERVO_READ:
Serial.println(myServos[arg1].getCurrentPosition());
break;
case SERVO_DELAY:
myServos[arg1].setServoDelay(arg1, arg2);
Serial.println("OK");
break;
case DETACH_SERVO:
myServos[arg1].getServo().detach();
Serial.println("OK");
break;
case ATTACH_SERVO:
myServos[arg1].getServo().attach(arg1);
Serial.println("OK");
break;
#endif #endif
#ifdef USE_BASE #ifdef USE_BASE
@ -209,6 +244,7 @@ int runCommand() {
Serial.println("OK"); Serial.println("OK");
break; break;
case UPDATE_PID: case UPDATE_PID:
i = 0;
while ((str = strtok_r(p, ":", &p)) != '\0') { while ((str = strtok_r(p, ":", &p)) != '\0') {
pid_args[i] = atoi(str); pid_args[i] = atoi(str);
i++; i++;
@ -334,6 +370,13 @@ void loop() {
for (i = 0; i < N_SERVOS; i++) { for (i = 0; i < N_SERVOS; i++) {
servos[i].doSweep(); servos[i].doSweep();
} }
#endif
#elif defined(USE_SERVOS2)
int i, pin;
for (i = 0; i < nServos; i++) {
pin = myServoPins[i];
myServos[pin].moveServo();
}
#endif
} }

View File

@ -10,14 +10,18 @@
#define PIN_MODE 'c' #define PIN_MODE 'c'
#define DIGITAL_READ 'd' #define DIGITAL_READ 'd'
#define READ_ENCODERS 'e' #define READ_ENCODERS 'e'
#define CONFIG_SERVO 'j'
#define MOTOR_SPEEDS 'm' #define MOTOR_SPEEDS 'm'
#define PING 'p' #define PING 'p'
#define RESET_ENCODERS 'r' #define RESET_ENCODERS 'r'
#define SERVO_WRITE 's' #define SERVO_WRITE 's'
#define SERVO_READ 't' #define SERVO_READ 't'
#define UPDATE_PID 'u' #define UPDATE_PID 'u'
#define SERVO_DELAY 'v'
#define DIGITAL_WRITE 'w' #define DIGITAL_WRITE 'w'
#define ANALOG_WRITE 'x' #define ANALOG_WRITE 'x'
#define ATTACH_SERVO 'y'
#define DETACH_SERVO 'z'
#define LEFT 0 #define LEFT 0
#define RIGHT 1 #define RIGHT 1

View File

@ -0,0 +1,45 @@
#ifndef SERVOS2_H
#define SERVOS2_H
#define MAX_N_SERVOS 48
// This delay in milliseconds determines the pause
// between each one degree step the servo travels. Increasing
// this number will make the servo sweep more slowly.
// Decreasing this number will make the servo sweep more quickly.
// Zero is the default number and will make the servos spin at
// full speed. 150 ms makes them spin very slowly.
class SweepServo2
{
public:
SweepServo2();
void initServo(
int servoPin,
int stepDelayMs);
void setServoDelay(
int servoPin,
int stepDelayMs);
void moveServo();
void setTargetPosition(int position);
void setServoDelay(int delay);
int getCurrentPosition();
Servo getServo();
private:
Servo servo;
int stepDelayMs;
int currentPositionDegrees;
int targetPositionDegrees;
long lastSweepCommand;
};
SweepServo2 myServos [MAX_N_SERVOS];
int myServoPins [MAX_N_SERVOS];
#endif

View File

@ -0,0 +1,89 @@
/***************************************************************
Servo Sweep - by Nathaniel Gallinger
Sweep servos one degree step at a time with a user defined
delay in between steps. Supports changing direction
mid-sweep. Important for applications such as robotic arms
where the stock servo speed is too fast for the strength
of your system.
*************************************************************/
#ifdef USE_SERVOS2
// NOTE: ServoTimer2.cpp moves the servo to 90 degrees after
// an attach() commmand so we set targetPositionDegrees to 90
// when constructing the servo object.
// Constructor
SweepServo2::SweepServo2()
{
this->currentPositionDegrees = 0;
this->targetPositionDegrees = 90;
this->lastSweepCommand = 0;
}
// Init
void SweepServo2::initServo(
int servoPin,
int stepDelayMs)
{
this->stepDelayMs = stepDelayMs;
this->currentPositionDegrees = 0;
this->targetPositionDegrees = 90;
this->lastSweepCommand = millis();
this->servo.attach(servoPin);
}
// Init
void SweepServo2::setServoDelay(
int servoPin,
int stepDelayMs)
{
this->stepDelayMs = stepDelayMs;
}
// Perform Sweep
void SweepServo2::moveServo()
{
// Get ellapsed time
int delta = millis() - this->lastSweepCommand;
// Check if time for a step
if (delta > this->stepDelayMs) {
// Check step direction
if (this->targetPositionDegrees > this->currentPositionDegrees) {
this->currentPositionDegrees++;
this->servo.write(this->currentPositionDegrees);
}
else if (this->targetPositionDegrees < this->currentPositionDegrees) {
this->currentPositionDegrees--;
this->servo.write(this->currentPositionDegrees);
}
// if target == current position, do nothing
// reset timer
this->lastSweepCommand = millis();
}
}
// Set a new target position
void SweepServo2::setTargetPosition(int position)
{
this->targetPositionDegrees = position;
}
int SweepServo2::getCurrentPosition()
{
return this->currentPositionDegrees;
}
// Accessor for servo object
Servo SweepServo2::getServo()
{
return this->servo;
}
#endif

View File

@ -12,13 +12,17 @@ add_message_files(FILES
) )
add_service_files(FILES add_service_files(FILES
DigitalSetDirection.srv
DigitalWrite.srv
DigitalRead.srv
ServoRead.srv
ServoWrite.srv
AnalogWrite.srv AnalogWrite.srv
AnalogRead.srv AnalogRead.srv
DigitalRead.srv
DigitalSetDirection.srv
DigitalWrite.srv
Enable.srv
Relax.srv
ServoRead.srv
ServoWrite.srv
SetSpeed.srv
SetServoSpeed.srv
) )
generate_messages( generate_messages(

View File

@ -0,0 +1,3 @@
bool enable
---
bool state

2
ros_arduino_msgs/srv/Relax.srv Executable file
View File

@ -0,0 +1,2 @@
---

View File

@ -0,0 +1,3 @@
uint8 id
float32 value
---

View File

@ -0,0 +1,2 @@
float32 value
---

View File

@ -24,9 +24,12 @@ from ros_arduino_python.arduino_driver import Arduino
from ros_arduino_python.arduino_sensors import * from ros_arduino_python.arduino_sensors import *
from ros_arduino_msgs.srv import * from ros_arduino_msgs.srv import *
from ros_arduino_python.base_controller import BaseController from ros_arduino_python.base_controller import BaseController
from ros_arduino_python.servo_controller import Servo, ServoController
from ros_arduino_python.joint_state_publisher import JointStatePublisher
from geometry_msgs.msg import Twist from geometry_msgs.msg import Twist
import os, time import os, time
import thread import thread
from math import radians
class ArduinoROS(): class ArduinoROS():
def __init__(self): def __init__(self):
@ -86,11 +89,11 @@ class ArduinoROS():
# A service to read the value of an analog sensor # A service to read the value of an analog sensor
rospy.Service('~analog_read', AnalogRead, self.AnalogReadHandler) rospy.Service('~analog_read', AnalogRead, self.AnalogReadHandler)
# Initialize the controlller # Initialize the device
self.controller = Arduino(self.port, self.baud, self.timeout) self.device = Arduino(self.port, self.baud, self.timeout)
# Make the connection # Make the connection
self.controller.connect() self.device.connect()
rospy.loginfo("Connected to Arduino on port " + self.port + " at " + str(self.baud) + " baud") rospy.loginfo("Connected to Arduino on port " + self.port + " at " + str(self.baud) + " baud")
@ -100,8 +103,10 @@ class ArduinoROS():
# Initialize any sensors # Initialize any sensors
self.mySensors = list() self.mySensors = list()
# Read in the sensors parameter dictionary
sensor_params = rospy.get_param("~sensors", dict({})) sensor_params = rospy.get_param("~sensors", dict({}))
# Initialize individual sensors appropriately
for name, params in sensor_params.iteritems(): for name, params in sensor_params.iteritems():
# Set the direction to input if not specified # Set the direction to input if not specified
try: try:
@ -110,19 +115,19 @@ class ArduinoROS():
params['direction'] = 'input' params['direction'] = 'input'
if params['type'] == "Ping": if params['type'] == "Ping":
sensor = Ping(self.controller, name, params['pin'], params['rate'], self.base_frame) sensor = Ping(self.device, name, params['pin'], params['rate'], self.base_frame)
elif params['type'] == "GP2D12": elif params['type'] == "GP2D12":
sensor = GP2D12(self.controller, name, params['pin'], params['rate'], self.base_frame) sensor = GP2D12(self.device, name, params['pin'], params['rate'], self.base_frame)
elif params['type'] == 'Digital': elif params['type'] == 'Digital':
sensor = DigitalSensor(self.controller, name, params['pin'], params['rate'], self.base_frame, direction=params['direction']) sensor = DigitalSensor(self.device, name, params['pin'], params['rate'], self.base_frame, direction=params['direction'])
elif params['type'] == 'Analog': elif params['type'] == 'Analog':
sensor = AnalogSensor(self.controller, name, params['pin'], params['rate'], self.base_frame, direction=params['direction']) sensor = AnalogSensor(self.device, name, params['pin'], params['rate'], self.base_frame, direction=params['direction'])
elif params['type'] == 'PololuMotorCurrent': elif params['type'] == 'PololuMotorCurrent':
sensor = PololuMotorCurrent(self.controller, name, params['pin'], params['rate'], self.base_frame) sensor = PololuMotorCurrent(self.device, name, params['pin'], params['rate'], self.base_frame)
elif params['type'] == 'PhidgetsVoltage': elif params['type'] == 'PhidgetsVoltage':
sensor = PhidgetsVoltage(self.controller, name, params['pin'], params['rate'], self.base_frame) sensor = PhidgetsVoltage(self.device, name, params['pin'], params['rate'], self.base_frame)
elif params['type'] == 'PhidgetsCurrent': elif params['type'] == 'PhidgetsCurrent':
sensor = PhidgetsCurrent(self.controller, name, params['pin'], params['rate'], self.base_frame) sensor = PhidgetsCurrent(self.device, name, params['pin'], params['rate'], self.base_frame)
# if params['type'] == "MaxEZ1": # if params['type'] == "MaxEZ1":
# self.sensors[len(self.sensors)]['trigger_pin'] = params['trigger_pin'] # self.sensors[len(self.sensors)]['trigger_pin'] = params['trigger_pin']
@ -130,12 +135,37 @@ class ArduinoROS():
self.mySensors.append(sensor) self.mySensors.append(sensor)
rospy.loginfo(name + " " + str(params) + " published on topic " + rospy.get_name() + "/sensor/" + name) rospy.loginfo(name + " " + str(params) + " published on topic " + rospy.get_name() + "/sensor/" + name)
# Initialize any joints (servos)
self.joints = dict()
# Read in the joints (if any)
joint_params = rospy.get_param("~joints", dict())
if len(joint_params) != 0:
self.have_joints = True
# Configure each servo
for name, params in joint_params.iteritems():
self.joints[name] = Servo(self.device, name)
# Display the joint setup on the terminal
rospy.loginfo(name + " " + str(params))
# The servo controller determines when to read and write position values to the servos
self.servo_controller = ServoController(self.device, self.joints, "ServoController")
# The joint state publisher publishes the latest joint values on the /joint_states topic
self.joint_state_publisher = JointStatePublisher()
else:
self.have_joints = False
# Initialize the base controller if used # Initialize the base controller if used
if self.use_base_controller: if self.use_base_controller:
self.myBaseController = BaseController(self.controller, self.base_frame) self.myBaseController = BaseController(self.device, self.base_frame)
# Start polling the sensors and base controller # Start polling the sensors, base controller, and servo controller
while not rospy.is_shutdown(): while not rospy.is_shutdown():
for sensor in self.mySensors: for sensor in self.mySensors:
mutex.acquire() mutex.acquire()
@ -146,6 +176,12 @@ class ArduinoROS():
mutex.acquire() mutex.acquire()
self.myBaseController.poll() self.myBaseController.poll()
mutex.release() mutex.release()
if self.have_joints:
mutex.acquire()
self.servo_controller.poll()
self.joint_state_publisher.poll(self.joints.values())
mutex.release()
# Publish all sensor values on a single topic for convenience # Publish all sensor values on a single topic for convenience
now = rospy.Time.now() now = rospy.Time.now()
@ -168,42 +204,59 @@ class ArduinoROS():
# Service callback functions # Service callback functions
def ServoWriteHandler(self, req): def ServoWriteHandler(self, req):
self.controller.servo_write(req.id, req.value) self.device.servo_write(req.id, req.value)
return ServoWriteResponse() return ServoWriteResponse()
def SetServoSpeedWriteHandler(self, req):
index = self.joint.values['pin'].index(req.pin)
name = self.joints.keys[index]
# Convert servo speed in deg/s to a step delay in milliseconds
step_delay = self.joints[name].get_step_delay(req.value)
# Update the servo speed
self.device.config_servo(pin, step_delay)
return SetServoSpeedResponse()
def ServoReadHandler(self, req): def ServoReadHandler(self, req):
pos = self.controller.servo_read(req.id) pos = self.device.servo_read(req.id)
return ServoReadResponse(pos) return ServoReadResponse(pos)
def DigitalSetDirectionHandler(self, req): def DigitalSetDirectionHandler(self, req):
self.controller.pin_mode(req.pin, req.direction) self.device.pin_mode(req.pin, req.direction)
return DigitalSetDirectionResponse() return DigitalSetDirectionResponse()
def DigitalWriteHandler(self, req): def DigitalWriteHandler(self, req):
self.controller.digital_write(req.pin, req.value) self.device.digital_write(req.pin, req.value)
return DigitalWriteResponse() return DigitalWriteResponse()
def DigitalReadHandler(self, req): def DigitalReadHandler(self, req):
value = self.controller.digital_read(req.pin) value = self.device.digital_read(req.pin)
return DigitalReadResponse(value) return DigitalReadResponse(value)
def AnalogWriteHandler(self, req): def AnalogWriteHandler(self, req):
self.controller.analog_write(req.pin, req.value) self.device.analog_write(req.pin, req.value)
return AnalogWriteResponse() return AnalogWriteResponse()
def AnalogReadHandler(self, req): def AnalogReadHandler(self, req):
value = self.controller.analog_read(req.pin) value = self.device.analog_read(req.pin)
return AnalogReadResponse(value) return AnalogReadResponse(value)
def shutdown(self): def shutdown(self):
# Stop the robot
try:
rospy.loginfo("Stopping the robot...")
self.cmd_vel_pub.Publish(Twist())
rospy.sleep(2)
except:
pass
rospy.loginfo("Shutting down Arduino Node...") rospy.loginfo("Shutting down Arduino Node...")
# Stop the robot
rospy.loginfo("Stopping the robot...")
self.cmd_vel_pub.publish(Twist())
rospy.sleep(2)
# Detach any servos
if self.have_joints:
rospy.loginfo("Detaching servos...")
for joint in self.joints.values():
self.device.detach_servo(joint.pin)
rospy.sleep(0.1)
if __name__ == '__main__': if __name__ == '__main__':
myArduino = ArduinoROS() myArduino = ArduinoROS()

View File

@ -309,11 +309,15 @@ class Arduino:
def pin_mode(self, pin, mode): def pin_mode(self, pin, mode):
return self.execute_ack('c %d %d' %(pin, mode)) return self.execute_ack('c %d %d' %(pin, mode))
def config_servo(self, pin, step_delay):
''' Configure a PWM servo '''
return self.execute_ack('j %d %d' %(pin, step_delay))
def servo_write(self, id, pos): def servo_write(self, id, pos):
''' Usage: servo_write(id, pos) ''' Usage: servo_write(id, pos)
Position is given in radians and converted to degrees before sending Position is given in radians and converted to degrees before sending
''' '''
return self.execute_ack('s %d %d' %(id, min(SERVO_MAX, max(SERVO_MIN, degrees(pos))))) return self.execute_ack('s %d %d' %(id, min(SERVO_MAX, max(SERVO_MIN, degrees(pos)))))
def servo_read(self, id): def servo_read(self, id):
@ -321,7 +325,25 @@ class Arduino:
The returned position is converted from degrees to radians The returned position is converted from degrees to radians
''' '''
return radians(self.execute('t %d' %id)) return radians(self.execute('t %d' %id))
def set_servo_delay(self, id, delay):
''' Usage: set_servo_delay(id, delay)
Set the delay in ms inbetween servo position updates. Controls speed of servo movement.
'''
return self.execute_ack('v %d %d' %(id, delay))
def detach_servo(self, id):
''' Usage: detach_servo(id)
Detach a servo from control by the Arduino
'''
return self.execute_ack('z %d' %id)
def attach_servo(self, id):
''' Usage: attach_servo(id)
Attach a servo to the Arduino
'''
return self.execute_ack('y %d' %id)
def ping(self, pin): def ping(self, pin):
''' The srf05/Ping command queries an SRF05/Ping sonar sensor ''' The srf05/Ping command queries an SRF05/Ping sonar sensor
connected to the General Purpose I/O line pinId for a distance, connected to the General Purpose I/O line pinId for a distance,

View File

@ -19,7 +19,6 @@
http://www.gnu.org/licenses/gpl.html http://www.gnu.org/licenses/gpl.html
""" """
import roslib
import rospy import rospy
from sensor_msgs.msg import Range from sensor_msgs.msg import Range
from ros_arduino_msgs.msg import * from ros_arduino_msgs.msg import *

View File

@ -20,7 +20,7 @@
http://www.gnu.org/licenses http://www.gnu.org/licenses
""" """
import roslib
import rospy import rospy
import os import os

View File

@ -0,0 +1,80 @@
#!/usr/bin/env python
"""
Class to publish joint states used with the ROS Arduino Bridge package
Created for the Pi Robot Project: http://www.pirobot.org
Copyright (c) 2012 Patrick Goebel. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details at:
http://www.gnu.org/licenses/gpl.html
Borrowed heavily from Mike Ferguson's ArbotiX package.
Original copyright:
Copyright (c) 2011 Vanadium Labs LLC. All right reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Vanadium Labs LLC nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import rospy
from sensor_msgs.msg import JointState
class JointStatePublisher:
""" Class to publish joint_states message. """
def __init__(self):
self.rate = rospy.get_param("~joint_update_rate", 10.0)
self.t_delta = rospy.Duration(1.0 / self.rate)
self.t_next = rospy.Time.now() + self.t_delta
# Publisher
self.pub = rospy.Publisher('joint_states', JointState, queue_size=5)
def poll(self, joints):
""" Publish joint states. """
if rospy.Time.now() > self.t_next:
msg = JointState()
msg.header.stamp = rospy.Time.now()
msg.name = list()
msg.position = list()
msg.velocity = list()
for joint in joints:
msg.name.append(joint.name)
msg.position.append(joint.position)
msg.velocity.append(joint.velocity)
self.pub.publish(msg)
self.t_next = rospy.Time.now() + self.t_delta

View File

@ -0,0 +1,185 @@
#!/usr/bin/env python
"""
Servo class for the arudino_python package
Created for the Pi Robot Project: http://www.pirobot.org
Copyright (c) 2015 Patrick Goebel. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details at:
http://www.gnu.org/licenses/gpl.html
Borrowed heavily from Mike Feguson's ArbotiX servos_controller.py code.
"""
import rospy
from std_msgs.msg import Float64
from ros_arduino_msgs.srv import Relax, Enable, SetSpeed, SetSpeedResponse, RelaxResponse, EnableResponse
from math import radians, degrees
class Joint:
def __init__(self, device, name):
self.device = device
self.name = name
self.target_position = 0.0 # radians
self.position = 0.0 # radians
self.position_last = 0.0 # radians
self.velocity = 0.0 # rad/s
class Servo(Joint):
def __init__(self, device, name, ns="~joints"):
Joint.__init__(self, device, name)
# Construct the namespace for the joint
n = ns + "/" + name + "/"
# The Arduino pin used by this servo
self.pin = int(rospy.get_param(n + "pin"))
# Hobby servos have a rated speed giving in seconds per 60 degrees
# A value of 0.24 seconds per 60 degrees is typical.
self.rated_speed = rospy.get_param(n + "rated_speed", 0.24) # seconds per 60 degrees
self.max_deg_per_sec = 60.0 / self.rated_speed
self.ms_per_deg = 1000.0 / self.max_deg_per_sec
# Convert initial servo speed in deg/s to a step delay in milliseconds
step_delay = self.get_step_delay(rospy.get_param(n + "init_speed", 90.0))
# Update the servo speed
self.device.config_servo(self.pin, step_delay)
# Min/max/neutral values
self.neutral = radians(rospy.get_param(n + "neutral", 90.0)) # degrees
self.max_angle = radians(rospy.get_param(n + "max_angle", 90.0)) # degrees
self.min_angle = radians(rospy.get_param(n + "min_angle", -90.0)) # degrees
self.range = radians(rospy.get_param(n + "range", 180)) # degrees
self.max_speed = radians(rospy.get_param(n + "max_speed", 250.0)) # deg/s
# Do we want to reverse positive motion
self.invert = rospy.get_param(n + "invert", False)
# Where to we want the servo positioned
self.desired = self.neutral - radians(rospy.get_param(n + "init_position", 90))
# Where is the servo positioned now
self.position = 0.0
# Subscribe to the servo's command topic for setting its position
rospy.Subscriber("/" + name + '/command', Float64, self.command_cb)
# Provide a number of services for controlling the servos
rospy.Service(name + '/relax', Relax, self.relax_cb)
rospy.Service(name + '/enable', Enable, self.enable_cb)
rospy.Service(name + '/set_speed', SetSpeed, self.set_speed_cb)
def command_cb(self, req):
# Check limits
if req.data > self.max_angle:
req.data = self.max_angle
if req.data < self.min_angle:
req.data = self.min_angle
# Adjust for the neutral offset
if self.invert:
target_adjusted = self.neutral - req.data
else:
target_adjusted = self.neutral + req.data
# Set the target position for the next servo controller update
self.desired = target_adjusted
def get_step_delay(self, target_speed=90):
# Don't allow negative speeds
target_speed = abs(target_speed)
if target_speed > self.max_deg_per_sec:
rospy.logdebug("Target speed exceeds max servo speed. Using max.")
step_delay = 0
else:
# Catch division by zero and set to slowest speed possible
try:
step_delay = self.ms_per_deg * (self.max_deg_per_sec / target_speed - 1)
except:
step_delay = 32767
# Minimum step delay is 1 millisecond
step_delay = max(1, step_delay)
return int(step_delay)
def get_current_position(self):
return self.device.servo_read(self.pin) - self.neutral
def set_target_position(self):
return self.device.servo_read(self.pin)
def relax_cb(self, req):
self.device.detach_servo(self.pin)
return RelaxResponse()
def enable_cb(self, req):
if req.enable:
self.device.attach_servo(self.pin)
else:
self.device.detach_servo(self.pin)
return EnableResponse()
def set_speed_cb(self, req):
# Convert servo speed in deg/s to a step delay in milliseconds
step_delay = self.get_step_delay(req.value)
# Update the servo speed
self.device.set_servo_delay(self.pin, step_delay)
return SetSpeedResponse()
class ServoController():
def __init__(self, device, joints, name):
self.device = device
self.servos = list()
joint_update_rate = rospy.get_param("~joint_update_rate", 10.0)
# Get the servo objects from the joint list
for servo in joints.values():
self.servos.append(servo)
servo.position_last = servo.get_current_position()
self.w_delta = rospy.Duration(1.0 / joint_update_rate)
self.w_next = rospy.Time.now() + self.w_delta
self.r_delta = rospy.Duration(1.0 / joint_update_rate)
self.r_next = rospy.Time.now() + self.r_delta
def poll(self):
""" Read and write servo positions and velocities. """
if rospy.Time.now() > self.r_next:
for servo in self.servos:
servo.position = servo.get_current_position()
# Compute velocity
servo.velocity = (servo.position - servo.position_last) / self.r_delta.to_sec()
servo.position_last = servo.position
self.r_next = rospy.Time.now() + self.r_delta
if rospy.Time.now() > self.w_next:
for servo in self.servos:
self.device.servo_write(servo.pin, servo.desired)
self.w_next = rospy.Time.now() + self.w_delta