TB3-源码剖析-5.颜色(交通灯)检测
Python
import rospy
import numpy as np
import cv2
from enum import Enum
from std_msgs.msg import UInt8, Float64
from sensor_msgs.msg import Image, CompressedImage
from cv_bridge import CvBridge, CvBridgeError
from dynamic_reconfigure.server import Server
from turtlebot3_autorace_detect.cfg import DetectTrafficLightParamsConfig
class DetectTrafficLight():
def __init__(self):
self.hue_red_l = rospy.get_param("~detect/lane/red/hue_l", 0)
self.hue_red_h = rospy.get_param("~detect/lane/red/hue_h", 26)
self.saturation_red_l = rospy.get_param("~detect/lane/red/saturation_l", 239)
self.saturation_red_h = rospy.get_param("~detect/lane/red/saturation_h", 255)
self.lightness_red_l = rospy.get_param("~detect/lane/red/lightness_l", 123)
self.lightness_red_h = rospy.get_param("~detect/lane/red/lightness_h", 250)
self.hue_yellow_l = rospy.get_param("~detect/lane/yellow/hue_l", 19)
self.hue_yellow_h = rospy.get_param("~detect/lane/yellow/hue_h", 33)
self.saturation_yellow_l = rospy.get_param("~detect/lane/yellow/saturation_l", 237)
self.saturation_yellow_h = rospy.get_param("~detect/lane/yellow/saturation_h", 255)
self.lightness_yellow_l = rospy.get_param("~detect/lane/yellow/lightness_l", 231)
self.lightness_yellow_h = rospy.get_param("~detect/lane/yellow/lightness_h", 255)
self.hue_green_l = rospy.get_param("~detect/lane/green/hue_l", 40)
self.hue_green_h = rospy.get_param("~detect/lane/green/hue_h", 113)
self.saturation_green_l = rospy.get_param("~detect/lane/green/saturation_l", 210)
self.saturation_green_h = rospy.get_param("~detect/lane/green/saturation_h", 255)
self.lightness_green_l = rospy.get_param("~detect/lane/green/lightness_l", 131)
self.lightness_green_h = rospy.get_param("~detect/lane/green/lightness_h", 255)
self.is_calibration_mode = rospy.get_param("~is_detection_calibration_mode", False)
if self.is_calibration_mode == True:
srv_detect_lane = Server(DetectTrafficLightParamsConfig, self.cbGetDetectTrafficLightParam)
self.sub_image_type = "compressed" # "compressed" / "raw"
self.pub_image_type = "compressed" # "compressed" / "raw"
self.counter = 1
if self.sub_image_type == "compressed":
# subscribes compressed image
self.sub_image_original = rospy.Subscriber('/detect/image_input/compressed', CompressedImage, self.cbGetImage, queue_size = 1)
elif self.sub_image_type == "raw":
# subscribes raw image
self.sub_image_original = rospy.Subscriber('/detect/image_input', Image, self.cbGetImage, queue_size = 1)
if self.pub_image_type == "compressed":
# publishes compensated image in compressed type
self.pub_image_traffic_light = rospy.Publisher('/detect/image_output/compressed', CompressedImage, queue_size = 1)
elif self.pub_image_type == "raw":
# publishes compensated image in raw type
self.pub_image_traffic_light = rospy.Publisher('/detect/image_output', Image, queue_size = 1)
if self.is_calibration_mode == True:
if self.pub_image_type == "compressed":
# publishes light image in compressed type
self.pub_image_red_light = rospy.Publisher('/detect/image_output_sub1/compressed', CompressedImage, queue_size = 1)
self.pub_image_yellow_light = rospy.Publisher('/detect/image_output_sub2/compressed', CompressedImage, queue_size = 1)
self.pub_image_green_light = rospy.Publisher('/detect/image_output_sub3/compressed', CompressedImage, queue_size = 1)
elif self.pub_image_type == "raw":
# publishes light image in raw type
self.pub_image_red_light = rospy.Publisher('/detect/image_output_sub1', Image, queue_size = 1)
self.pub_image_yellow_light = rospy.Publisher('/detect/image_output_sub2', Image, queue_size = 1)
self.pub_image_green_light = rospy.Publisher('/detect/image_output_sub3', Image, queue_size = 1)
self.sub_traffic_light_finished = rospy.Subscriber('/control/traffic_light_finished', UInt8, self.cbTrafficLightFinished, queue_size = 1)
self.pub_traffic_light_return = rospy.Publisher('/detect/traffic_light_stamped', UInt8, queue_size=1)
self.pub_parking_start = rospy.Publisher('/control/traffic_light_start', UInt8, queue_size = 1)
self.pub_traffic_light = rospy.Publisher('/detect/traffic_light', UInt8, queue_size=1)
self.CurrentMode = Enum('CurrentMode', 'idle lane_following traffic_light')
self.cvBridge = CvBridge()
self.cv_image = None
self.is_image_available = False
self.is_traffic_light_finished = False
self.green_count = 0
self.yellow_count = 0
self.red_count = 0
self.stop_count = 0
self.off_traffic = False
rospy.sleep(1)
loop_rate = rospy.Rate(10)
while not rospy.is_shutdown():
if self.is_image_available == True:
if self.is_traffic_light_finished == False:
self.fnFindTrafficLight()
loop_rate.sleep()
def cbGetDetectTrafficLightParam(self, config, level):
rospy.loginfo("[Detect Traffic Light] Detect Traffic Light Calibration Parameter reconfigured to")
rospy.loginfo("hue_red_l : %d", config.hue_red_l)
rospy.loginfo("hue_red_h : %d", config.hue_red_h)
rospy.loginfo("saturation_red_l : %d", config.saturation_red_l)
rospy.loginfo("saturation_red_h : %d", config.saturation_red_h)
rospy.loginfo("lightness_red_l : %d", config.lightness_red_l)
rospy.loginfo("lightness_red_h : %d", config.lightness_red_h)
rospy.loginfo("hue_yellow_l : %d", config.hue_yellow_l)
rospy.loginfo("hue_yellow_h : %d", config.hue_yellow_h)
rospy.loginfo("saturation_yellow_l : %d", config.saturation_yellow_l)
rospy.loginfo("saturation_yellow_h : %d", config.saturation_yellow_h)
rospy.loginfo("lightness_yellow_l : %d", config.lightness_yellow_l)
rospy.loginfo("lightness_yellow_h : %d", config.lightness_yellow_h)
rospy.loginfo("hue_green_l : %d", config.hue_green_l)
rospy.loginfo("hue_green_h : %d", config.hue_green_h)
rospy.loginfo("saturation_green_l : %d", config.saturation_green_l)
rospy.loginfo("saturation_green_h : %d", config.saturation_green_h)
rospy.loginfo("lightness_green_l : %d", config.lightness_green_l)
rospy.loginfo("lightness_green_h : %d", config.lightness_green_h)
self.hue_red_l = config.hue_red_l
self.hue_red_h = config.hue_red_h
self.saturation_red_l = config.saturation_red_l
self.saturation_red_h = config.saturation_red_h
self.lightness_red_l = config.lightness_red_l
self.lightness_red_h = config.lightness_red_h
self.hue_yellow_l = config.hue_yellow_l
self.hue_yellow_h = config.hue_yellow_h
self.saturation_yellow_l = config.saturation_yellow_l
self.saturation_yellow_h = config.saturation_yellow_h
self.lightness_yellow_l = config.lightness_yellow_l
self.lightness_yellow_h = config.lightness_yellow_h
self.hue_green_l = config.hue_green_l
self.hue_green_h = config.hue_green_h
self.saturation_green_l = config.saturation_green_l
self.saturation_green_h = config.saturation_green_h
self.lightness_green_l = config.lightness_green_l
self.lightness_green_h = config.lightness_green_h
return config
def cbGetImage(self, image_msg):
# drop the frame to 1/5 (6fps) because of the processing speed. This is up to your computer's operating power.
if self.counter % 3 != 0:
self.counter += 1
return
else:
self.counter = 1
if self.sub_image_type == "compressed":
np_arr = np.frombuffer(image_msg.data, np.uint8)
self.cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
else:
self.cv_image = self.cvBridge.imgmsg_to_cv2(image_msg, "bgr8")
self.is_image_available = True
def fnFindTrafficLight(self):
cv_image_mask = self.fnMaskGreenTrafficLight()
cv_image_mask = cv2.GaussianBlur(cv_image_mask,(5,5),0)
status1 = self.fnFindCircleOfTrafficLight(cv_image_mask, 'green')
if status1 == 1 or status1 == 5:
rospy.loginfo("detect GREEN")
self.stop_count = 0
self.green_count += 1
else:
self.green_count = 0
cv_image_mask = self.fnMaskYellowTrafficLight()
cv_image_mask = cv2.GaussianBlur(cv_image_mask,(5,5),0)
status2 = self.fnFindCircleOfTrafficLight(cv_image_mask, 'yellow')
if status2 == 2:
rospy.loginfo("detect YELLOW")
self.yellow_count += 1
else:
self.yellow_count = 0
cv_image_mask = self.fnMaskRedTrafficLight()
cv_image_mask = cv2.GaussianBlur(cv_image_mask,(5,5),0)
status3 = self.fnFindCircleOfTrafficLight(cv_image_mask, 'red')
if status3 == 3:
rospy.loginfo("detect RED")
self.red_count += 1
elif status3 == 4:
self.red_count = 0
self.stop_count += 1
else:
self.red_count = 0
self.stop_count = 0
if self.green_count >= 3:
rospy.loginfo("GREEN")
cv2.putText(self.cv_image,"GREEN", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (80, 255, 0))
msg_sign = UInt8()
msg_sign.data = self.CurrentMode.lane_following.value
self.pub_traffic_light.publish(msg_sign)
self.is_traffic_light_finished = True
if self.yellow_count >= 3:
rospy.loginfo("YELLOW")
cv2.putText(self.cv_image,"YELLOW", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (0, 255, 255))
if self.red_count >= 3:
rospy.loginfo("RED")
cv2.putText(self.cv_image,"RED", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (0, 0, 255))
if self.stop_count >= 8:
rospy.loginfo("STOP")
self.off_traffic = True
cv2.putText(self.cv_image,"STOP", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (0, 0, 255))
if self.pub_image_type == "compressed":
# publishes traffic light image in compressed type
self.pub_image_traffic_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(self.cv_image, "jpg"))
elif self.pub_image_type == "raw":
# publishes traffic light image in raw type
self.pub_image_traffic_light.publish(self.cvBridge.cv2_to_imgmsg(self.cv_image, "bgr8"))
def fnMaskRedTrafficLight(self):
image = np.copy(self.cv_image)
# Convert BGR to HSV
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
Hue_l = self.hue_red_l
Hue_h = self.hue_red_h
Saturation_l = self.saturation_red_l
Saturation_h = self.saturation_red_h
Lightness_l = self.lightness_red_l
Lightness_h = self.lightness_red_h
# define range of red color in HSV
lower_red = np.array([Hue_l, Saturation_l, Lightness_l])
upper_red = np.array([Hue_h, Saturation_h, Lightness_h])
# Threshold the HSV image to get only red colors
mask = cv2.inRange(hsv, lower_red, upper_red)
# Bitwise-AND mask and original image
res = cv2.bitwise_and(image, image, mask = mask)
if self.is_calibration_mode == True:
if self.pub_image_type == "compressed":
# publishes red light filtered image in compressed type
self.pub_image_red_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg"))
elif self.pub_image_type == "raw":
# publishes red light filtered image in raw type
self.pub_image_red_light.publish(self.cvBridge.cv2_to_imgmsg(mask, "mono8"))
mask = cv2.bitwise_not(mask)
return mask
def fnMaskYellowTrafficLight(self):
image = np.copy(self.cv_image)
# Convert BGR to HSV
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
Hue_l = self.hue_yellow_l
Hue_h = self.hue_yellow_h
Saturation_l = self.saturation_yellow_l
Saturation_h = self.saturation_yellow_h
Lightness_l = self.lightness_yellow_l
Lightness_h = self.lightness_yellow_h
# define range of yellow color in HSV
lower_yellow = np.array([Hue_l, Saturation_l, Lightness_l])
upper_yellow = np.array([Hue_h, Saturation_h, Lightness_h])
# Threshold the HSV image to get only yellow colors
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# Bitwise-AND mask and original image
res = cv2.bitwise_and(image, image, mask = mask)
if self.is_calibration_mode == True:
if self.pub_image_type == "compressed":
# publishes yellow light filtered image in compressed type
self.pub_image_yellow_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg"))
elif self.pub_image_type == "raw":
# publishes yellow light filtered image in raw type
self.pub_image_yellow_light.publish(self.cvBridge.cv2_to_imgmsg(mask, "mono8"))
mask = cv2.bitwise_not(mask)
return mask
def fnMaskGreenTrafficLight(self):
image = np.copy(self.cv_image)
# Convert BGR to HSV
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
Hue_l = self.hue_green_l
Hue_h = self.hue_green_h
Saturation_l = self.saturation_green_l
Saturation_h = self.saturation_green_h
Lightness_l = self.lightness_green_l
Lightness_h = self.lightness_green_h
# define range of green color in HSV
lower_green = np.array([Hue_l, Saturation_l, Lightness_l])
upper_green = np.array([Hue_h, Saturation_h, Lightness_h])
# Threshold the HSV image to get only green colors
mask = cv2.inRange(hsv, lower_green, upper_green)
# Bitwise-AND mask and original image
res = cv2.bitwise_and(image, image, mask = mask)
if self.is_calibration_mode == True:
if self.pub_image_type == "compressed":
# publishes green light filtered image in compressed type
self.pub_image_green_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg"))
elif self.pub_image_type == "raw":
# publishes green light filtered image in raw type
self.pub_image_green_light.publish(self.cvBridge.cv2_to_imgmsg(mask, "mono8"))
mask = cv2.bitwise_not(mask)
return mask
def fnFindCircleOfTrafficLight(self, mask, find_color):
status = 0
params=cv2.SimpleBlobDetector_Params()
# Change thresholds
params.minThreshold = 0
params.maxThreshold = 255
# Filter by Area.
params.filterByArea = True
params.minArea = 50
params.maxArea = 600
# Filter by Circularity
params.filterByCircularity = True
params.minCircularity = 0.4
# Filter by Convexity
params.filterByConvexity = True
params.minConvexity = 0.6
det=cv2.SimpleBlobDetector_create(params)
keypts=det.detect(mask)
frame=cv2.drawKeypoints(self.cv_image,keypts,np.array([]),(0,255,255),cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
col1 = 180
col2 = 270
col3 = 305
low1 = 50
low2 = 170
low3 = 170
# if detected more than 1 light
for i in range(len(keypts)):
self.point_col = int(keypts[i].pt[0])
self.point_low = int(keypts[i].pt[1])
if self.point_col > col1 and self.point_col < col2 and self.point_low > low1 and self.point_low < low2:
if find_color == 'green':
status = 1
elif find_color == 'yellow':
status = 2
elif find_color == 'red':
status = 3
elif self.point_col > col2 and self.point_col < col3 and self.point_low > low1 and self.point_low < low3:
if find_color == 'red':
status = 4
elif find_color == 'green':
status = 5
else:
status = 6
return status
def cbTrafficLightFinished(self, traffic_light_finished_msg):
self.is_traffic_light_finished = True
def main(self):
rospy.spin()
if __name__ == '__main__':
rospy.init_node('detect_traffic_light')
node = DetectTrafficLight()
node.main()
Footer
<< 上一篇
下一篇 >>