TB3-源码剖析-5.颜色(交通灯)检测
import rospy import numpy as np import cv2 from enum import Enum from std_msgs.msg import UInt8, Float64 from sensor_msgs.msg import Image, CompressedImage from cv_bridge import CvBridge, CvBridgeError from dynamic_reconfigure.server import Server from turtlebot3_autorace_detect.cfg import DetectTrafficLightParamsConfig class DetectTrafficLight(): def __init__(self): self.hue_red_l = rospy.get_param("~detect/lane/red/hue_l", 0) self.hue_red_h = rospy.get_param("~detect/lane/red/hue_h", 26) self.saturation_red_l = rospy.get_param("~detect/lane/red/saturation_l", 239) self.saturation_red_h = rospy.get_param("~detect/lane/red/saturation_h", 255) self.lightness_red_l = rospy.get_param("~detect/lane/red/lightness_l", 123) self.lightness_red_h = rospy.get_param("~detect/lane/red/lightness_h", 250) self.hue_yellow_l = rospy.get_param("~detect/lane/yellow/hue_l", 19) self.hue_yellow_h = rospy.get_param("~detect/lane/yellow/hue_h", 33) self.saturation_yellow_l = rospy.get_param("~detect/lane/yellow/saturation_l", 237) self.saturation_yellow_h = rospy.get_param("~detect/lane/yellow/saturation_h", 255) self.lightness_yellow_l = rospy.get_param("~detect/lane/yellow/lightness_l", 231) self.lightness_yellow_h = rospy.get_param("~detect/lane/yellow/lightness_h", 255) self.hue_green_l = rospy.get_param("~detect/lane/green/hue_l", 40) self.hue_green_h = rospy.get_param("~detect/lane/green/hue_h", 113) self.saturation_green_l = rospy.get_param("~detect/lane/green/saturation_l", 210) self.saturation_green_h = rospy.get_param("~detect/lane/green/saturation_h", 255) self.lightness_green_l = rospy.get_param("~detect/lane/green/lightness_l", 131) self.lightness_green_h = rospy.get_param("~detect/lane/green/lightness_h", 255) self.is_calibration_mode = rospy.get_param("~is_detection_calibration_mode", False) if self.is_calibration_mode == True: srv_detect_lane = Server(DetectTrafficLightParamsConfig, self.cbGetDetectTrafficLightParam) self.sub_image_type = "compressed" # "compressed" / "raw" self.pub_image_type = "compressed" # "compressed" / "raw" self.counter = 1 if self.sub_image_type == "compressed": # subscribes compressed image self.sub_image_original = rospy.Subscriber('/detect/image_input/compressed', CompressedImage, self.cbGetImage, queue_size = 1) elif self.sub_image_type == "raw": # subscribes raw image self.sub_image_original = rospy.Subscriber('/detect/image_input', Image, self.cbGetImage, queue_size = 1) if self.pub_image_type == "compressed": # publishes compensated image in compressed type self.pub_image_traffic_light = rospy.Publisher('/detect/image_output/compressed', CompressedImage, queue_size = 1) elif self.pub_image_type == "raw": # publishes compensated image in raw type self.pub_image_traffic_light = rospy.Publisher('/detect/image_output', Image, queue_size = 1) if self.is_calibration_mode == True: if self.pub_image_type == "compressed": # publishes light image in compressed type self.pub_image_red_light = rospy.Publisher('/detect/image_output_sub1/compressed', CompressedImage, queue_size = 1) self.pub_image_yellow_light = rospy.Publisher('/detect/image_output_sub2/compressed', CompressedImage, queue_size = 1) self.pub_image_green_light = rospy.Publisher('/detect/image_output_sub3/compressed', CompressedImage, queue_size = 1) elif self.pub_image_type == "raw": # publishes light image in raw type self.pub_image_red_light = rospy.Publisher('/detect/image_output_sub1', Image, queue_size = 1) self.pub_image_yellow_light = rospy.Publisher('/detect/image_output_sub2', Image, queue_size = 1) self.pub_image_green_light = rospy.Publisher('/detect/image_output_sub3', Image, queue_size = 1) self.sub_traffic_light_finished = rospy.Subscriber('/control/traffic_light_finished', UInt8, self.cbTrafficLightFinished, queue_size = 1) self.pub_traffic_light_return = rospy.Publisher('/detect/traffic_light_stamped', UInt8, queue_size=1) self.pub_parking_start = rospy.Publisher('/control/traffic_light_start', UInt8, queue_size = 1) self.pub_traffic_light = rospy.Publisher('/detect/traffic_light', UInt8, queue_size=1) self.CurrentMode = Enum('CurrentMode', 'idle lane_following traffic_light') self.cvBridge = CvBridge() self.cv_image = None self.is_image_available = False self.is_traffic_light_finished = False self.green_count = 0 self.yellow_count = 0 self.red_count = 0 self.stop_count = 0 self.off_traffic = False rospy.sleep(1) loop_rate = rospy.Rate(10) while not rospy.is_shutdown(): if self.is_image_available == True: if self.is_traffic_light_finished == False: self.fnFindTrafficLight() loop_rate.sleep() def cbGetDetectTrafficLightParam(self, config, level): rospy.loginfo("[Detect Traffic Light] Detect Traffic Light Calibration Parameter reconfigured to") rospy.loginfo("hue_red_l : %d", config.hue_red_l) rospy.loginfo("hue_red_h : %d", config.hue_red_h) rospy.loginfo("saturation_red_l : %d", config.saturation_red_l) rospy.loginfo("saturation_red_h : %d", config.saturation_red_h) rospy.loginfo("lightness_red_l : %d", config.lightness_red_l) rospy.loginfo("lightness_red_h : %d", config.lightness_red_h) rospy.loginfo("hue_yellow_l : %d", config.hue_yellow_l) rospy.loginfo("hue_yellow_h : %d", config.hue_yellow_h) rospy.loginfo("saturation_yellow_l : %d", config.saturation_yellow_l) rospy.loginfo("saturation_yellow_h : %d", config.saturation_yellow_h) rospy.loginfo("lightness_yellow_l : %d", config.lightness_yellow_l) rospy.loginfo("lightness_yellow_h : %d", config.lightness_yellow_h) rospy.loginfo("hue_green_l : %d", config.hue_green_l) rospy.loginfo("hue_green_h : %d", config.hue_green_h) rospy.loginfo("saturation_green_l : %d", config.saturation_green_l) rospy.loginfo("saturation_green_h : %d", config.saturation_green_h) rospy.loginfo("lightness_green_l : %d", config.lightness_green_l) rospy.loginfo("lightness_green_h : %d", config.lightness_green_h) self.hue_red_l = config.hue_red_l self.hue_red_h = config.hue_red_h self.saturation_red_l = config.saturation_red_l self.saturation_red_h = config.saturation_red_h self.lightness_red_l = config.lightness_red_l self.lightness_red_h = config.lightness_red_h self.hue_yellow_l = config.hue_yellow_l self.hue_yellow_h = config.hue_yellow_h self.saturation_yellow_l = config.saturation_yellow_l self.saturation_yellow_h = config.saturation_yellow_h self.lightness_yellow_l = config.lightness_yellow_l self.lightness_yellow_h = config.lightness_yellow_h self.hue_green_l = config.hue_green_l self.hue_green_h = config.hue_green_h self.saturation_green_l = config.saturation_green_l self.saturation_green_h = config.saturation_green_h self.lightness_green_l = config.lightness_green_l self.lightness_green_h = config.lightness_green_h return config def cbGetImage(self, image_msg): # drop the frame to 1/5 (6fps) because of the processing speed. This is up to your computer's operating power. if self.counter % 3 != 0: self.counter += 1 return else: self.counter = 1 if self.sub_image_type == "compressed": np_arr = np.frombuffer(image_msg.data, np.uint8) self.cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) else: self.cv_image = self.cvBridge.imgmsg_to_cv2(image_msg, "bgr8") self.is_image_available = True def fnFindTrafficLight(self): cv_image_mask = self.fnMaskGreenTrafficLight() cv_image_mask = cv2.GaussianBlur(cv_image_mask,(5,5),0) status1 = self.fnFindCircleOfTrafficLight(cv_image_mask, 'green') if status1 == 1 or status1 == 5: rospy.loginfo("detect GREEN") self.stop_count = 0 self.green_count += 1 else: self.green_count = 0 cv_image_mask = self.fnMaskYellowTrafficLight() cv_image_mask = cv2.GaussianBlur(cv_image_mask,(5,5),0) status2 = self.fnFindCircleOfTrafficLight(cv_image_mask, 'yellow') if status2 == 2: rospy.loginfo("detect YELLOW") self.yellow_count += 1 else: self.yellow_count = 0 cv_image_mask = self.fnMaskRedTrafficLight() cv_image_mask = cv2.GaussianBlur(cv_image_mask,(5,5),0) status3 = self.fnFindCircleOfTrafficLight(cv_image_mask, 'red') if status3 == 3: rospy.loginfo("detect RED") self.red_count += 1 elif status3 == 4: self.red_count = 0 self.stop_count += 1 else: self.red_count = 0 self.stop_count = 0 if self.green_count >= 3: rospy.loginfo("GREEN") cv2.putText(self.cv_image,"GREEN", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (80, 255, 0)) msg_sign = UInt8() msg_sign.data = self.CurrentMode.lane_following.value self.pub_traffic_light.publish(msg_sign) self.is_traffic_light_finished = True if self.yellow_count >= 3: rospy.loginfo("YELLOW") cv2.putText(self.cv_image,"YELLOW", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (0, 255, 255)) if self.red_count >= 3: rospy.loginfo("RED") cv2.putText(self.cv_image,"RED", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (0, 0, 255)) if self.stop_count >= 8: rospy.loginfo("STOP") self.off_traffic = True cv2.putText(self.cv_image,"STOP", (self.point_col, self.point_low), cv2.FONT_HERSHEY_DUPLEX, 0.5, (0, 0, 255)) if self.pub_image_type == "compressed": # publishes traffic light image in compressed type self.pub_image_traffic_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(self.cv_image, "jpg")) elif self.pub_image_type == "raw": # publishes traffic light image in raw type self.pub_image_traffic_light.publish(self.cvBridge.cv2_to_imgmsg(self.cv_image, "bgr8")) def fnMaskRedTrafficLight(self): image = np.copy(self.cv_image) # Convert BGR to HSV hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) Hue_l = self.hue_red_l Hue_h = self.hue_red_h Saturation_l = self.saturation_red_l Saturation_h = self.saturation_red_h Lightness_l = self.lightness_red_l Lightness_h = self.lightness_red_h # define range of red color in HSV lower_red = np.array([Hue_l, Saturation_l, Lightness_l]) upper_red = np.array([Hue_h, Saturation_h, Lightness_h]) # Threshold the HSV image to get only red colors mask = cv2.inRange(hsv, lower_red, upper_red) # Bitwise-AND mask and original image res = cv2.bitwise_and(image, image, mask = mask) if self.is_calibration_mode == True: if self.pub_image_type == "compressed": # publishes red light filtered image in compressed type self.pub_image_red_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg")) elif self.pub_image_type == "raw": # publishes red light filtered image in raw type self.pub_image_red_light.publish(self.cvBridge.cv2_to_imgmsg(mask, "mono8")) mask = cv2.bitwise_not(mask) return mask def fnMaskYellowTrafficLight(self): image = np.copy(self.cv_image) # Convert BGR to HSV hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) Hue_l = self.hue_yellow_l Hue_h = self.hue_yellow_h Saturation_l = self.saturation_yellow_l Saturation_h = self.saturation_yellow_h Lightness_l = self.lightness_yellow_l Lightness_h = self.lightness_yellow_h # define range of yellow color in HSV lower_yellow = np.array([Hue_l, Saturation_l, Lightness_l]) upper_yellow = np.array([Hue_h, Saturation_h, Lightness_h]) # Threshold the HSV image to get only yellow colors mask = cv2.inRange(hsv, lower_yellow, upper_yellow) # Bitwise-AND mask and original image res = cv2.bitwise_and(image, image, mask = mask) if self.is_calibration_mode == True: if self.pub_image_type == "compressed": # publishes yellow light filtered image in compressed type self.pub_image_yellow_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg")) elif self.pub_image_type == "raw": # publishes yellow light filtered image in raw type self.pub_image_yellow_light.publish(self.cvBridge.cv2_to_imgmsg(mask, "mono8")) mask = cv2.bitwise_not(mask) return mask def fnMaskGreenTrafficLight(self): image = np.copy(self.cv_image) # Convert BGR to HSV hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) Hue_l = self.hue_green_l Hue_h = self.hue_green_h Saturation_l = self.saturation_green_l Saturation_h = self.saturation_green_h Lightness_l = self.lightness_green_l Lightness_h = self.lightness_green_h # define range of green color in HSV lower_green = np.array([Hue_l, Saturation_l, Lightness_l]) upper_green = np.array([Hue_h, Saturation_h, Lightness_h]) # Threshold the HSV image to get only green colors mask = cv2.inRange(hsv, lower_green, upper_green) # Bitwise-AND mask and original image res = cv2.bitwise_and(image, image, mask = mask) if self.is_calibration_mode == True: if self.pub_image_type == "compressed": # publishes green light filtered image in compressed type self.pub_image_green_light.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg")) elif self.pub_image_type == "raw": # publishes green light filtered image in raw type self.pub_image_green_light.publish(self.cvBridge.cv2_to_imgmsg(mask, "mono8")) mask = cv2.bitwise_not(mask) return mask def fnFindCircleOfTrafficLight(self, mask, find_color): status = 0 params=cv2.SimpleBlobDetector_Params() # Change thresholds params.minThreshold = 0 params.maxThreshold = 255 # Filter by Area. params.filterByArea = True params.minArea = 50 params.maxArea = 600 # Filter by Circularity params.filterByCircularity = True params.minCircularity = 0.4 # Filter by Convexity params.filterByConvexity = True params.minConvexity = 0.6 det=cv2.SimpleBlobDetector_create(params) keypts=det.detect(mask) frame=cv2.drawKeypoints(self.cv_image,keypts,np.array([]),(0,255,255),cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS) col1 = 180 col2 = 270 col3 = 305 low1 = 50 low2 = 170 low3 = 170 # if detected more than 1 light for i in range(len(keypts)): self.point_col = int(keypts[i].pt[0]) self.point_low = int(keypts[i].pt[1]) if self.point_col > col1 and self.point_col < col2 and self.point_low > low1 and self.point_low < low2: if find_color == 'green': status = 1 elif find_color == 'yellow': status = 2 elif find_color == 'red': status = 3 elif self.point_col > col2 and self.point_col < col3 and self.point_low > low1 and self.point_low < low3: if find_color == 'red': status = 4 elif find_color == 'green': status = 5 else: status = 6 return status def cbTrafficLightFinished(self, traffic_light_finished_msg): self.is_traffic_light_finished = True def main(self): rospy.spin() if __name__ == '__main__': rospy.init_node('detect_traffic_light') node = DetectTrafficLight() node.main() Footer
<< 上一篇
下一篇 >>