Removed DOS-like CR/LF line terminators in arduino_driver.py

This commit is contained in:
Patrick Goebel 2016-04-06 06:43:30 -07:00
parent 142e529de9
commit 59488c7b36

View File

@ -1,409 +1,429 @@
#!/usr/bin/env python #!/usr/bin/env python
""" """
A Python driver for the Arduino microcontroller running the A Python driver for the Arduino microcontroller running the
ROSArduinoBridge firmware. ROSArduinoBridge firmware.
Created for the Pi Robot Project: http://www.pirobot.org Created for the Pi Robot Project: http://www.pirobot.org
Copyright (c) 2012 Patrick Goebel. All rights reserved. Copyright (c) 2012 Patrick Goebel. All rights reserved.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or the Free Software Foundation; either version 2 of the License, or
(at your option) any later version. (at your option) any later version.
This program is distributed in the hope that it will be useful, This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details at: GNU General Public License for more details at:
http://www.gnu.org/licenses/gpl.html http://www.gnu.org/licenses/gpl.html
""" """
import thread import thread
from math import pi as PI, degrees, radians from math import pi as PI, degrees, radians
import os import os
import time import time
import sys, traceback import sys, traceback
from serial.serialutil import SerialException from serial.serialutil import SerialException
import serial import serial
class Arduino: class Arduino:
''' Configuration Parameters ''' Configuration Parameters
''' '''
N_ANALOG_PORTS = 6 N_ANALOG_PORTS = 6
N_DIGITAL_PORTS = 12 N_DIGITAL_PORTS = 12
def __init__(self, port="/dev/ttyUSB0", baudrate=57600, timeout=0.5): def __init__(self, port="/dev/ttyUSB0", baudrate=57600, timeout=0.5):
self.PID_RATE = 30 # Do not change this! It is a fixed property of the Arduino PID controller. self.PID_RATE = 30 # Do not change this! It is a fixed property of the Arduino PID controller.
self.PID_INTERVAL = 1000 / 30 self.PID_INTERVAL = 1000 / 30
self.port = port self.port = port
self.baudrate = baudrate self.baudrate = baudrate
self.timeout = timeout self.timeout = timeout
self.encoder_count = 0 self.encoder_count = 0
self.writeTimeout = timeout self.writeTimeout = timeout
self.interCharTimeout = timeout / 30. self.interCharTimeout = timeout / 30.
# Keep things thread safe # Keep things thread safe
self.mutex = thread.allocate_lock() self.mutex = thread.allocate_lock()
# An array to cache analog sensor readings # An array to cache analog sensor readings
self.analog_sensor_cache = [None] * self.N_ANALOG_PORTS self.analog_sensor_cache = [None] * self.N_ANALOG_PORTS
# An array to cache digital sensor readings # An array to cache digital sensor readings
self.digital_sensor_cache = [None] * self.N_DIGITAL_PORTS self.digital_sensor_cache = [None] * self.N_DIGITAL_PORTS
def connect(self): def connect(self):
try: try:
print "Connecting to Arduino on port", self.port, "..." print "Connecting to Arduino on port", self.port, "..."
# The port has to be open once with the default baud rate before opening again for real # The port has to be open once with the default baud rate before opening again for real
self.serial_port = serial.Serial(port=self.port) self.serial_port = serial.Serial(port=self.port)
# Needed for Leonardo only # Needed for Leonardo only
while not self.port: while not self.port:
time.sleep(self.timeout) time.sleep(self.timeout)
# Now open the port with the real settings # Now open the port with the real settings
self.serial_port = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout, writeTimeout=self.writeTimeout) self.serial_port = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout, writeTimeout=self.writeTimeout)
# Test the connection by reading the baudrate # Test the connection by reading the baudrate
test = self.get_baud() test = self.get_baud()
if test != self.baudrate: if test != self.baudrate:
time.sleep(self.timeout) time.sleep(self.timeout)
test = self.get_baud() test = self.get_baud()
if test != self.baudrate: if test != self.baudrate:
raise SerialException raise SerialException
print "Connected at", self.baudrate print "Connected at", self.baudrate
print "Arduino is ready." print "Arduino is ready."
except SerialException: except SerialException:
print "Serial Exception:" print "Serial Exception:"
print sys.exc_info() print sys.exc_info()
print "Traceback follows:" print "Traceback follows:"
traceback.print_exc(file=sys.stdout) traceback.print_exc(file=sys.stdout)
print "Cannot connect to Arduino!" print "Cannot connect to Arduino!"
os._exit(1) os._exit(1)
def open(self): def open(self):
''' Open the serial port. ''' Open the serial port.
''' '''
self.serial_port.open() self.serial_port.open()
def close(self): def close(self):
''' Close the serial port. ''' Close the serial port.
''' '''
self.serial_port.close() self.serial_port.close()
def send(self, cmd): def send(self, cmd):
''' This command should not be used on its own: it is called by the execute commands ''' This command should not be used on its own: it is called by the execute commands
below in a thread safe manner. below in a thread safe manner.
''' '''
self.serial_port.write(cmd + '\r') self.serial_port.write(cmd + '\r')
def recv(self, timeout=0.5): def recv(self, timeout=0.5):
timeout = min(timeout, self.timeout) timeout = min(timeout, self.timeout)
''' This command should not be used on its own: it is called by the execute commands ''' This command should not be used on its own: it is called by the execute commands
below in a thread safe manner. Note: we use read() instead of readline() since below in a thread safe manner. Note: we use read() instead of readline() since
readline() tends to return garbage characters from the Arduino readline() tends to return garbage characters from the Arduino
''' '''
c = '' c = ''
value = '' value = ''
attempts = 0 start = time.time()
while c != '\r':
c = self.serial_port.read(1) while c != '\r':
value += c c = self.serial_port.read(1)
attempts += 1 value += c
if attempts * self.interCharTimeout > timeout: if time.time() - start > timeout:
return None return None
value = value.strip('\r') value = value.strip('\r')
return value return value
def recv_ack(self): # def recv(self, timeout=0.5):
''' This command should not be used on its own: it is called by the execute commands # timeout = min(timeout, self.timeout)
below in a thread safe manner. # ''' This command should not be used on its own: it is called by the execute commands
''' # below in a thread safe manner. Note: we use read() instead of readline() since
ack = self.recv(self.timeout) # readline() tends to return garbage characters from the Arduino
return ack == 'OK' # '''
# c = ''
def recv_int(self): # value = ''
''' This command should not be used on its own: it is called by the execute commands # attempts = 0
below in a thread safe manner. # while c != '\r':
''' # c = self.serial_port.read(1)
value = self.recv(self.timeout) # value += c
try: # attempts += 1
return int(value) # if attempts * self.interCharTimeout > timeout:
except: # return None
return None #
# value = value.strip('\r')
def recv_array(self): #
''' This command should not be used on its own: it is called by the execute commands # return value
below in a thread safe manner.
''' def recv_ack(self):
try: ''' This command should not be used on its own: it is called by the execute commands
values = self.recv(self.timeout * self.N_ANALOG_PORTS).split() below in a thread safe manner.
return map(int, values) '''
except: ack = self.recv(self.timeout)
return [] return ack == 'OK'
def execute(self, cmd): def recv_int(self):
''' Thread safe execution of "cmd" on the Arduino returning a single integer value. ''' This command should not be used on its own: it is called by the execute commands
''' below in a thread safe manner.
self.mutex.acquire() '''
value = self.recv(self.timeout)
try: try:
self.serial_port.flushInput() return int(value)
except: except:
pass return None
ntries = 1 def recv_array(self):
attempts = 0 ''' This command should not be used on its own: it is called by the execute commands
below in a thread safe manner.
try: '''
self.serial_port.write(cmd + '\r') try:
value = self.recv(self.timeout) values = self.recv(self.timeout * self.N_ANALOG_PORTS).split()
while attempts < ntries and (value == '' or value == 'Invalid Command' or value == None): return map(int, values)
try: except:
self.serial_port.flushInput() return []
self.serial_port.write(cmd + '\r')
value = self.recv(self.timeout) def execute(self, cmd):
except: ''' Thread safe execution of "cmd" on the Arduino returning a single integer value.
print "Exception executing command: " + cmd '''
attempts += 1 self.mutex.acquire()
except:
self.mutex.release() try:
print "Exception executing command: " + cmd self.serial_port.flushInput()
value = None except:
pass
self.mutex.release()
return int(value) ntries = 1
attempts = 0
def execute_array(self, cmd):
''' Thread safe execution of "cmd" on the Arduino returning an array. try:
''' self.serial_port.write(cmd + '\r')
self.mutex.acquire() value = self.recv(self.timeout)
while attempts < ntries and (value == '' or value == 'Invalid Command' or value == None):
try: try:
self.serial_port.flushInput() self.serial_port.flushInput()
except: self.serial_port.write(cmd + '\r')
pass value = self.recv(self.timeout)
except:
ntries = 1 print "Exception executing command: " + cmd
attempts = 0 attempts += 1
except:
try: self.mutex.release()
self.serial_port.write(cmd + '\r') print "Exception executing command: " + cmd
values = self.recv_array() value = None
while attempts < ntries and (values == '' or values == 'Invalid Command' or values == [] or values == None):
try: self.mutex.release()
self.serial_port.flushInput() return int(value)
self.serial_port.write(cmd + '\r')
values = self.recv_array() def execute_array(self, cmd):
except: ''' Thread safe execution of "cmd" on the Arduino returning an array.
print("Exception executing command: " + cmd) '''
attempts += 1 self.mutex.acquire()
except:
self.mutex.release() try:
print "Exception executing command: " + cmd self.serial_port.flushInput()
raise SerialException except:
return [] pass
try: ntries = 1
values = map(int, values) attempts = 0
except:
values = [] try:
self.serial_port.write(cmd + '\r')
self.mutex.release() values = self.recv_array()
return values while attempts < ntries and (values == '' or values == 'Invalid Command' or values == [] or values == None):
try:
def execute_ack(self, cmd): self.serial_port.flushInput()
''' Thread safe execution of "cmd" on the Arduino returning True if response is ACK. self.serial_port.write(cmd + '\r')
''' values = self.recv_array()
self.mutex.acquire() except:
print("Exception executing command: " + cmd)
try: attempts += 1
self.serial_port.flushInput() except:
except: self.mutex.release()
pass print "Exception executing command: " + cmd
raise SerialException
ntries = 1 return []
attempts = 0
try:
try: values = map(int, values)
self.serial_port.write(cmd + '\r') except:
ack = self.recv(self.timeout) values = []
while attempts < ntries and (ack == '' or ack == 'Invalid Command' or ack == None):
try: self.mutex.release()
self.serial_port.flushInput() return values
self.serial_port.write(cmd + '\r')
ack = self.recv(self.timeout) def execute_ack(self, cmd):
except: ''' Thread safe execution of "cmd" on the Arduino returning True if response is ACK.
print "Exception executing command: " + cmd '''
attempts += 1 self.mutex.acquire()
except:
self.mutex.release() try:
print "execute_ack exception when executing", cmd self.serial_port.flushInput()
print sys.exc_info() except:
return 0 pass
self.mutex.release() ntries = 1
return ack == 'OK' attempts = 0
def update_pid(self, Kp, Kd, Ki, Ko): try:
''' Set the PID parameters on the Arduino self.serial_port.write(cmd + '\r')
''' ack = self.recv(self.timeout)
print "Updating PID parameters" while attempts < ntries and (ack == '' or ack == 'Invalid Command' or ack == None):
cmd = 'u ' + str(Kp) + ':' + str(Kd) + ':' + str(Ki) + ':' + str(Ko) try:
self.execute_ack(cmd) self.serial_port.flushInput()
self.serial_port.write(cmd + '\r')
def get_baud(self): ack = self.recv(self.timeout)
''' Get the current baud rate on the serial port. except:
''' print "Exception executing command: " + cmd
try: attempts += 1
return int(self.execute('b')) except:
except: self.mutex.release()
print "Failed to determine baud rate of Arduino so aborting!" print "execute_ack exception when executing", cmd
os._exit(1) print sys.exc_info()
return 0
def get_encoder_counts(self):
values = self.execute_array('e') self.mutex.release()
if len(values) != 2: return ack == 'OK'
print "Encoder count was not 2"
raise SerialException def update_pid(self, Kp, Kd, Ki, Ko):
return None ''' Set the PID parameters on the Arduino
else: '''
return values print "Updating PID parameters"
cmd = 'u ' + str(Kp) + ':' + str(Kd) + ':' + str(Ki) + ':' + str(Ko)
def reset_encoders(self): self.execute_ack(cmd)
''' Reset the encoder counts to 0
''' def get_baud(self):
return self.execute_ack('r') ''' Get the current baud rate on the serial port.
'''
def drive(self, right, left): try:
''' Speeds are given in encoder ticks per PID interval return int(self.execute('b'))
''' except:
return self.execute_ack('m %d %d' %(right, left)) print "Failed to determine baud rate of Arduino so aborting!"
os._exit(1)
def drive_m_per_s(self, right, left):
''' Set the motor speeds in meters per second. def get_encoder_counts(self):
''' values = self.execute_array('e')
left_revs_per_second = float(left) / (self.wheel_diameter * PI) if len(values) != 2:
right_revs_per_second = float(right) / (self.wheel_diameter * PI) print "Encoder count was not 2"
raise SerialException
left_ticks_per_loop = int(left_revs_per_second * self.encoder_resolution * self.PID_INTERVAL * self.gear_reduction) return None
right_ticks_per_loop = int(right_revs_per_second * self.encoder_resolution * self.PID_INTERVAL * self.gear_reduction) else:
return values
self.drive(right_ticks_per_loop , left_ticks_per_loop )
def reset_encoders(self):
def stop(self): ''' Reset the encoder counts to 0
''' Stop both motors. '''
''' return self.execute_ack('r')
self.drive(0, 0)
def drive(self, right, left):
def analog_pin_mode(self, pin, mode): ''' Speeds are given in encoder ticks per PID interval
return self.execute_ack('c A%d %d' %(pin, mode)) '''
return self.execute_ack('m %d %d' %(right, left))
def analog_read(self, pin):
return self.execute('a %d' %pin) def drive_m_per_s(self, right, left):
''' Set the motor speeds in meters per second.
def analog_write(self, pin, value): '''
return self.execute_ack('x %d %d' %(pin, value)) left_revs_per_second = float(left) / (self.wheel_diameter * PI)
right_revs_per_second = float(right) / (self.wheel_diameter * PI)
def digital_read(self, pin):
return self.execute('d %d' %pin) left_ticks_per_loop = int(left_revs_per_second * self.encoder_resolution * self.PID_INTERVAL * self.gear_reduction)
right_ticks_per_loop = int(right_revs_per_second * self.encoder_resolution * self.PID_INTERVAL * self.gear_reduction)
def digital_write(self, pin, value):
return self.execute_ack('w %d %d' %(pin, value)) self.drive(right_ticks_per_loop , left_ticks_per_loop )
def digital_pin_mode(self, pin, mode): def stop(self):
return self.execute_ack('c %d %d' %(pin, mode)) ''' Stop both motors.
'''
def config_servo(self, pin, step_delay): self.drive(0, 0)
''' Configure a PWM servo '''
return self.execute_ack('j %d %u' %(pin, step_delay)) def analog_pin_mode(self, pin, mode):
return self.execute_ack('c A%d %d' %(pin, mode))
def servo_write(self, id, pos):
''' Usage: servo_write(id, pos) def analog_read(self, pin):
Position is given in degrees from 0-180 return self.execute('a %d' %pin)
'''
return self.execute_ack('s %d %d' %(id, pos)) def analog_write(self, pin, value):
return self.execute_ack('x %d %d' %(pin, value))
def servo_read(self, id):
''' Usage: servo_read(id) def digital_read(self, pin):
The returned position is in degrees return self.execute('d %d' %pin)
'''
return int(self.execute('t %d' %id)) def digital_write(self, pin, value):
return self.execute_ack('w %d %d' %(pin, value))
def set_servo_delay(self, id, delay):
''' Usage: set_servo_delay(id, delay) def digital_pin_mode(self, pin, mode):
Set the delay in ms inbetween servo position updates. Controls speed of servo movement. return self.execute_ack('c %d %d' %(pin, mode))
'''
return self.execute_ack('v %d %d' %(id, delay)) def config_servo(self, pin, step_delay):
''' Configure a PWM servo '''
def detach_servo(self, id): return self.execute_ack('j %d %u' %(pin, step_delay))
''' Usage: detach_servo(id)
Detach a servo from control by the Arduino def servo_write(self, id, pos):
''' ''' Usage: servo_write(id, pos)
return self.execute_ack('z %d' %id) Position is given in degrees from 0-180
'''
def attach_servo(self, id): return self.execute_ack('s %d %d' %(id, pos))
''' Usage: attach_servo(id)
Attach a servo to the Arduino def servo_read(self, id):
''' ''' Usage: servo_read(id)
return self.execute_ack('y %d' %id) The returned position is in degrees
'''
def ping(self, pin): return int(self.execute('t %d' %id))
''' The srf05/Ping command queries an SRF05/Ping sonar sensor
connected to the General Purpose I/O line pinId for a distance, def set_servo_delay(self, id, delay):
and returns the range in cm. Sonar distance resolution is integer based. ''' Usage: set_servo_delay(id, delay)
''' Set the delay in ms inbetween servo position updates. Controls speed of servo movement.
return self.execute('p %d' %pin); '''
return self.execute_ack('v %d %d' %(id, delay))
# def get_maxez1(self, triggerPin, outputPin):
# ''' The maxez1 command queries a Maxbotix MaxSonar-EZ1 sonar def detach_servo(self, id):
# sensor connected to the General Purpose I/O lines, triggerPin, and ''' Usage: detach_servo(id)
# outputPin, for a distance, and returns it in Centimeters. NOTE: MAKE Detach a servo from control by the Arduino
# SURE there's nothing directly in front of the MaxSonar-EZ1 upon '''
# power up, otherwise it wont range correctly for object less than 6 return self.execute_ack('z %d' %id)
# inches away! The sensor reading defaults to use English units
# (inches). The sensor distance resolution is integer based. Also, the def attach_servo(self, id):
# maxsonar trigger pin is RX, and the echo pin is PW. ''' Usage: attach_servo(id)
# ''' Attach a servo to the Arduino
# return self.execute('z %d %d' %(triggerPin, outputPin)) '''
return self.execute_ack('y %d' %id)
""" Basic test for connectivity """ def ping(self, pin):
if __name__ == "__main__": ''' The srf05/Ping command queries an SRF05/Ping sonar sensor
if os.name == "posix": connected to the General Purpose I/O line pinId for a distance,
portName = "/dev/ttyACM0" and returns the range in cm. Sonar distance resolution is integer based.
else: '''
portName = "COM43" # Windows style COM port. return self.execute('p %d' %pin);
baudRate = 57600 # def get_maxez1(self, triggerPin, outputPin):
# ''' The maxez1 command queries a Maxbotix MaxSonar-EZ1 sonar
myArduino = Arduino(port=portName, baudrate=baudRate, timeout=0.5) # sensor connected to the General Purpose I/O lines, triggerPin, and
myArduino.connect() # outputPin, for a distance, and returns it in Centimeters. NOTE: MAKE
# SURE there's nothing directly in front of the MaxSonar-EZ1 upon
print "Sleeping for 1 second..." # power up, otherwise it wont range correctly for object less than 6
time.sleep(1) # inches away! The sensor reading defaults to use English units
# (inches). The sensor distance resolution is integer based. Also, the
print "Reading on analog port 0", myArduino.analog_read(0) # maxsonar trigger pin is RX, and the echo pin is PW.
print "Reading on digital port 0", myArduino.digital_read(0) # '''
print "Blinking the LED 3 times" # return self.execute('z %d %d' %(triggerPin, outputPin))
for i in range(3):
myArduino.digital_write(13, 1)
time.sleep(1.0) """ Basic test for connectivity """
#print "Current encoder counts", myArduino.encoders() if __name__ == "__main__":
if os.name == "posix":
print "Connection test successful.", portName = "/dev/ttyACM0"
else:
myArduino.stop() portName = "COM43" # Windows style COM port.
myArduino.close()
baudRate = 57600
print "Shutting down Arduino."
myArduino = Arduino(port=portName, baudrate=baudRate, timeout=0.5)
myArduino.connect()
print "Sleeping for 1 second..."
time.sleep(1)
print "Reading on analog port 0", myArduino.analog_read(0)
print "Reading on digital port 0", myArduino.digital_read(0)
print "Blinking the LED 3 times"
for i in range(3):
myArduino.digital_write(13, 1)
time.sleep(1.0)
#print "Current encoder counts", myArduino.encoders()
print "Connection test successful.",
myArduino.stop()
myArduino.close()
print "Shutting down Arduino."