TB3-源码剖析-3.车道线检测功能
Python
import rospy
import numpy as np
import cv2
from cv_bridge import CvBridge
from std_msgs.msg import UInt8, Float64
from sensor_msgs.msg import Image, CompressedImage
from dynamic_reconfigure.server import Server
from turtlebot3_autorace_detect.cfg import DetectLaneParamsConfig
class DetectLane():
def __init__(self):
self.hue_white_l = rospy.get_param("~detect/lane/white/hue_l", 0)
self.hue_white_h = rospy.get_param("~detect/lane/white/hue_h", 179)
self.saturation_white_l = rospy.get_param("~detect/lane/white/saturation_l", 0)
self.saturation_white_h = rospy.get_param("~detect/lane/white/saturation_h", 70)
self.lightness_white_l = rospy.get_param("~detect/lane/white/lightness_l", 105)
self.lightness_white_h = rospy.get_param("~detect/lane/white/lightness_h", 255)
self.hue_yellow_l = rospy.get_param("~detect/lane/yellow/hue_l", 10)
self.hue_yellow_h = rospy.get_param("~detect/lane/yellow/hue_h", 127)
self.saturation_yellow_l = rospy.get_param("~detect/lane/yellow/saturation_l", 70)
self.saturation_yellow_h = rospy.get_param("~detect/lane/yellow/saturation_h", 255)
self.lightness_yellow_l = rospy.get_param("~detect/lane/yellow/lightness_l", 95)
self.lightness_yellow_h = rospy.get_param("~detect/lane/yellow/lightness_h", 255)
self.is_calibration_mode = rospy.get_param("~is_detection_calibration_mode", False)
if self.is_calibration_mode == True:
srv_detect_lane = Server(DetectLaneParamsConfig, self.cbGetDetectLaneParam)
self.sub_image_type = "raw" # you can choose image type "compressed", "raw"
self.pub_image_type = "compressed" # you can choose image type "compressed", "raw"
if self.sub_image_type == "compressed":
# subscribes compressed image
self.sub_image_original = rospy.Subscriber('/detect/image_input/compressed', CompressedImage, self.cbFindLane, queue_size = 1)
elif self.sub_image_type == "raw":
# subscribes raw image
self.sub_image_original = rospy.Subscriber('/detect/image_input', Image, self.cbFindLane, queue_size = 1)
if self.pub_image_type == "compressed":
# publishes lane image in compressed type
self.pub_image_lane = rospy.Publisher('/detect/image_output/compressed', CompressedImage, queue_size = 1)
elif self.pub_image_type == "raw":
# publishes lane image in raw type
self.pub_image_lane = rospy.Publisher('/detect/image_output', Image, queue_size = 1)
if self.is_calibration_mode == True:
if self.pub_image_type == "compressed":
# publishes lane image in compressed type
self.pub_image_white_lane = rospy.Publisher('/detect/image_output_sub1/compressed', CompressedImage, queue_size = 1)
self.pub_image_yellow_lane = rospy.Publisher('/detect/image_output_sub2/compressed', CompressedImage, queue_size = 1)
elif self.pub_image_type == "raw":
# publishes lane image in raw type
self.pub_image_white_lane = rospy.Publisher('/detect/image_output_sub1', Image, queue_size = 1)
self.pub_image_yellow_lane = rospy.Publisher('/detect/image_output_sub2', Image, queue_size = 1)
self.pub_lane = rospy.Publisher('/detect/lane', Float64, queue_size = 1)
# subscribes state : yellow line reliability
self.pub_yellow_line_reliability = rospy.Publisher('/detect/yellow_line_reliability', UInt8, queue_size=1)
# subscribes state : white line reliability
self.pub_white_line_reliability = rospy.Publisher('/detect/white_line_reliability', UInt8, queue_size=1)
self.cvBridge = CvBridge()
self.counter = 1
self.window_width = 1000.
self.window_height = 600.
self.reliability_white_line = 100
self.reliability_yellow_line = 100
def cbGetDetectLaneParam(self, config, level):
rospy.loginfo("[Detect Lane] Detect Lane Calibration Parameter reconfigured to")
rospy.loginfo("hue_white_l : %d", config.hue_white_l)
rospy.loginfo("hue_white_h : %d", config.hue_white_h)
rospy.loginfo("saturation_white_l : %d", config.saturation_white_l)
rospy.loginfo("saturation_white_h : %d", config.saturation_white_h)
rospy.loginfo("lightness_white_l : %d", config.lightness_white_l)
rospy.loginfo("lightness_white_h : %d", config.lightness_white_h)
rospy.loginfo("hue_yellow_l : %d", config.hue_yellow_l)
rospy.loginfo("hue_yellow_h : %d", config.hue_yellow_h)
rospy.loginfo("saturation_yellow_l : %d", config.saturation_yellow_l)
rospy.loginfo("saturation_yellow_h : %d", config.saturation_yellow_h)
rospy.loginfo("lightness_yellow_l : %d", config.lightness_yellow_l)
rospy.loginfo("lightness_yellow_h : %d", config.lightness_yellow_h)
self.hue_white_l = config.hue_white_l
self.hue_white_h = config.hue_white_h
self.saturation_white_l = config.saturation_white_l
self.saturation_white_h = config.saturation_white_h
self.lightness_white_l = config.lightness_white_l
self.lightness_white_h = config.lightness_white_h
self.hue_yellow_l = config.hue_yellow_l
self.hue_yellow_h = config.hue_yellow_h
self.saturation_yellow_l = config.saturation_yellow_l
self.saturation_yellow_h = config.saturation_yellow_h
self.lightness_yellow_l = config.lightness_yellow_l
self.lightness_yellow_h = config.lightness_yellow_h
return config
def cbFindLane(self, image_msg):
# Change the frame rate by yourself. Now, it is set to 1/3 (10fps).
# Unappropriate value of frame rate may cause huge delay on entire recognition process.
# This is up to your computer's operating power.
if self.counter % 3 != 0:
self.counter += 1
return
else:
self.counter = 1
if self.sub_image_type == "compressed":
#converting compressed image to opencv image
np_arr = np.frombuffer(image_msg.data, np.uint8)
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
elif self.sub_image_type == "raw":
cv_image = self.cvBridge.imgmsg_to_cv2(image_msg, "bgr8")
# find White and Yellow Lanes
white_fraction, cv_white_lane = self.maskWhiteLane(cv_image)
yellow_fraction, cv_yellow_lane = self.maskYellowLane(cv_image)
try:
if yellow_fraction > 3000:
self.left_fitx, self.left_fit = self.fit_from_lines(self.left_fit, cv_yellow_lane)
self.mov_avg_left = np.append(self.mov_avg_left,np.array([self.left_fit]), axis=0)
if white_fraction > 3000:
self.right_fitx, self.right_fit = self.fit_from_lines(self.right_fit, cv_white_lane)
self.mov_avg_right = np.append(self.mov_avg_right,np.array([self.right_fit]), axis=0)
except:
if yellow_fraction > 3000:
self.left_fitx, self.left_fit = self.sliding_windown(cv_yellow_lane, 'left')
self.mov_avg_left = np.array([self.left_fit])
if white_fraction > 3000:
self.right_fitx, self.right_fit = self.sliding_windown(cv_white_lane, 'right')
self.mov_avg_right = np.array([self.right_fit])
MOV_AVG_LENGTH = 5
self.left_fit = np.array([np.mean(self.mov_avg_left[::-1][:, 0][0:MOV_AVG_LENGTH]),
np.mean(self.mov_avg_left[::-1][:, 1][0:MOV_AVG_LENGTH]),
np.mean(self.mov_avg_left[::-1][:, 2][0:MOV_AVG_LENGTH])])
self.right_fit = np.array([np.mean(self.mov_avg_right[::-1][:, 0][0:MOV_AVG_LENGTH]),
np.mean(self.mov_avg_right[::-1][:, 1][0:MOV_AVG_LENGTH]),
np.mean(self.mov_avg_right[::-1][:, 2][0:MOV_AVG_LENGTH])])
if self.mov_avg_left.shape[0] > 1000:
self.mov_avg_left = self.mov_avg_left[0:MOV_AVG_LENGTH]
if self.mov_avg_right.shape[0] > 1000:
self.mov_avg_right = self.mov_avg_right[0:MOV_AVG_LENGTH]
self.make_lane(cv_image, white_fraction, yellow_fraction)
def maskWhiteLane(self, image):
# Convert BGR to HSV
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
Hue_l = self.hue_white_l
Hue_h = self.hue_white_h
Saturation_l = self.saturation_white_l
Saturation_h = self.saturation_white_h
Lightness_l = self.lightness_white_l
Lightness_h = self.lightness_white_h
# define range of white color in HSV
lower_white = np.array([Hue_l, Saturation_l, Lightness_l])
upper_white = np.array([Hue_h, Saturation_h, Lightness_h])
# Threshold the HSV image to get only white colors
mask = cv2.inRange(hsv, lower_white, upper_white)
# Bitwise-AND mask and original image
res = cv2.bitwise_and(image, image, mask = mask)
fraction_num = np.count_nonzero(mask)
if self.is_calibration_mode == False:
if fraction_num > 35000:
if self.lightness_white_l < 250:
self.lightness_white_l += 5
elif fraction_num < 5000:
if self.lightness_white_l > 50:
self.lightness_white_l -= 5
how_much_short = 0
for i in range(0, 600):
if np.count_nonzero(mask[i,::]) > 0:
how_much_short += 1
how_much_short = 600 - how_much_short
if how_much_short > 100:
if self.reliability_white_line >= 5:
self.reliability_white_line -= 5
elif how_much_short <= 100:
if self.reliability_white_line <= 99:
self.reliability_white_line += 5
msg_white_line_reliability = UInt8()
msg_white_line_reliability.data = self.reliability_white_line
self.pub_white_line_reliability.publish(msg_white_line_reliability)
if self.is_calibration_mode == True:
if self.pub_image_type == "compressed":
# publishes white lane filtered image in compressed type
self.pub_image_white_lane.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg"))
elif self.pub_image_type == "raw":
# publishes white lane filtered image in raw type
self.pub_image_white_lane.publish(self.cvBridge.cv2_to_imgmsg(mask, "bgr8"))
return fraction_num, mask
def maskYellowLane(self, image):
# Convert BGR to HSV
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
Hue_l = self.hue_yellow_l
Hue_h = self.hue_yellow_h
Saturation_l = self.saturation_yellow_l
Saturation_h = self.saturation_yellow_h
Lightness_l = self.lightness_yellow_l
Lightness_h = self.lightness_yellow_h
# define range of yellow color in HSV
lower_yellow = np.array([Hue_l, Saturation_l, Lightness_l])
upper_yellow = np.array([Hue_h, Saturation_h, Lightness_h])
# Threshold the HSV image to get only yellow colors
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
# Bitwise-AND mask and original image
res = cv2.bitwise_and(image, image, mask = mask)
fraction_num = np.count_nonzero(mask)
if self.is_calibration_mode == False:
if fraction_num > 35000:
if self.lightness_yellow_l < 250:
self.lightness_yellow_l += 20
elif fraction_num < 5000:
if self.lightness_yellow_l > 90:
self.lightness_yellow_l -= 20
how_much_short = 0
for i in range(0, 600):
if np.count_nonzero(mask[i,::]) > 0:
how_much_short += 1
how_much_short = 600 - how_much_short
if how_much_short > 100:
if self.reliability_yellow_line >= 5:
self.reliability_yellow_line -= 5
elif how_much_short <= 100:
if self.reliability_yellow_line <= 99:
self.reliability_yellow_line += 5
msg_yellow_line_reliability = UInt8()
msg_yellow_line_reliability.data = self.reliability_yellow_line
self.pub_yellow_line_reliability.publish(msg_yellow_line_reliability)
if self.is_calibration_mode == True:
if self.pub_image_type == "compressed":
# publishes yellow lane filtered image in compressed type
self.pub_image_yellow_lane.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg"))
elif self.pub_image_type == "raw":
# publishes yellow lane filtered image in raw type
self.pub_image_yellow_lane.publish(self.cvBridge.cv2_to_imgmsg(mask, "bgr8"))
return fraction_num, mask
def fit_from_lines(self, lane_fit, image):
nonzero = image.nonzero()
nonzeroy = np.array(nonzero[0])
nonzerox = np.array(nonzero[1])
margin = 100
lane_inds = ((nonzerox > (lane_fit[0] * (nonzeroy ** 2) + lane_fit[1] * nonzeroy + lane_fit[2] - margin)) & (
nonzerox < (lane_fit[0] * (nonzeroy ** 2) + lane_fit[1] * nonzeroy + lane_fit[2] + margin)))
# Again, extract line pixel positions
x = nonzerox[lane_inds]
y = nonzeroy[lane_inds]
# Fit a second order polynomial to each
lane_fit = np.polyfit(y, x, 2)
# Generate x and y values for plotting
ploty = np.linspace(0, image.shape[0] - 1, image.shape[0])
lane_fitx = lane_fit[0] * ploty ** 2 + lane_fit[1] * ploty + lane_fit[2]
return lane_fitx, lane_fit
def sliding_windown(self, img_w, left_or_right):
histogram = np.sum(img_w[int(img_w.shape[0] / 2):, :], axis=0)
# Create an output image to draw on and visualize the result
out_img = np.dstack((img_w, img_w, img_w)) * 255
# Find the peak of the left and right halves of the histogram
# These will be the starting point for the left and right lines
midpoint = np.int(histogram.shape[0] / 2)
if left_or_right == 'left':
lane_base = np.argmax(histogram[:midpoint])
elif left_or_right == 'right':
lane_base = np.argmax(histogram[midpoint:]) + midpoint
# Choose the number of sliding windows
nwindows = 20
# Set height of windows
window_height = np.int(img_w.shape[0] / nwindows)
# Identify the x and y positions of all nonzero pixels in the image
nonzero = img_w.nonzero()
nonzeroy = np.array(nonzero[0])
nonzerox = np.array(nonzero[1])
# Current positions to be updated for each window
x_current = lane_base
# Set the width of the windows +/- margin
margin = 50
# Set minimum number of pixels found to recenter window
minpix = 50
# Create empty lists to receive lane pixel indices
lane_inds = []
# Step through the windows one by one
for window in range(nwindows):
# Identify window boundaries in x and y
win_y_low = img_w.shape[0] - (window + 1) * window_height
win_y_high = img_w.shape[0] - window * window_height
win_x_low = x_current - margin
win_x_high = x_current + margin
# Draw the windows on the visualization image
cv2.rectangle(out_img, (win_x_low, win_y_low), (win_x_high, win_y_high), (0, 255, 0), 2)
# Identify the nonzero pixels in x and y within the window
good_lane_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_x_low) & (
nonzerox < win_x_high)).nonzero()[0]
# Append these indices to the lists
lane_inds.append(good_lane_inds)
# If you found > minpix pixels, recenter next window on their mean position
if len(good_lane_inds) > minpix:
x_current = np.int(np.mean(nonzerox[good_lane_inds]))
# Concatenate the arrays of indices
lane_inds = np.concatenate(lane_inds)
# Extract line pixel positions
x = nonzerox[lane_inds]
y = nonzeroy[lane_inds]
# Fit a second order polynomial to each
try:
lane_fit = np.polyfit(y, x, 2)
self.lane_fit_bef = lane_fit
except:
lane_fit = self.lane_fit_bef
# Generate x and y values for plotting
ploty = np.linspace(0, img_w.shape[0] - 1, img_w.shape[0])
lane_fitx = lane_fit[0] * ploty ** 2 + lane_fit[1] * ploty + lane_fit[2]
return lane_fitx, lane_fit
def make_lane(self, cv_image, white_fraction, yellow_fraction):
# Create an image to draw the lines on
warp_zero = np.zeros((cv_image.shape[0], cv_image.shape[1], 1), dtype=np.uint8)
color_warp = np.dstack((warp_zero, warp_zero, warp_zero))
color_warp_lines = np.dstack((warp_zero, warp_zero, warp_zero))
ploty = np.linspace(0, cv_image.shape[0] - 1, cv_image.shape[0])
if yellow_fraction > 3000:
pts_left = np.array([np.flipud(np.transpose(np.vstack([self.left_fitx, ploty])))])
cv2.polylines(color_warp_lines, np.int_([pts_left]), isClosed=False, color=(0, 0, 255), thickness=25)
if white_fraction > 3000:
pts_right = np.array([np.transpose(np.vstack([self.right_fitx, ploty]))])
cv2.polylines(color_warp_lines, np.int_([pts_right]), isClosed=False, color=(255, 255, 0), thickness=25)
self.is_center_x_exist = True
if self.reliability_white_line > 50 and self.reliability_yellow_line > 50:
if white_fraction > 3000 and yellow_fraction > 3000:
centerx = np.mean([self.left_fitx, self.right_fitx], axis=0)
pts = np.hstack((pts_left, pts_right))
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)
# Draw the lane onto the warped blank image
cv2.fillPoly(color_warp, np.int_([pts]), (0, 255, 0))
if white_fraction > 3000 and yellow_fraction <= 3000:
centerx = np.subtract(self.right_fitx, 320)
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)
if white_fraction <= 3000 and yellow_fraction > 3000:
centerx = np.add(self.left_fitx, 320)
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)
elif self.reliability_white_line <= 50 and self.reliability_yellow_line > 50:
centerx = np.add(self.left_fitx, 320)
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)
elif self.reliability_white_line > 50 and self.reliability_yellow_line <= 50:
centerx = np.subtract(self.right_fitx, 320)
pts_center = np.array([np.transpose(np.vstack([centerx, ploty]))])
cv2.polylines(color_warp_lines, np.int_([pts_center]), isClosed=False, color=(0, 255, 255), thickness=12)
else:
self.is_center_x_exist = False
pass
# Combine the result with the original image
final = cv2.addWeighted(cv_image, 1, color_warp, 0.2, 0)
final = cv2.addWeighted(final, 1, color_warp_lines, 1, 0)
if self.pub_image_type == "compressed":
if self.is_center_x_exist == True:
# publishes lane center
msg_desired_center = Float64()
msg_desired_center.data = centerx.item(350)
self.pub_lane.publish(msg_desired_center)
self.pub_image_lane.publish(self.cvBridge.cv2_to_compressed_imgmsg(final, "jpg"))
elif self.pub_image_type == "raw":
if self.is_center_x_exist == True:
# publishes lane center
msg_desired_center = Float64()
msg_desired_center.data = centerx.item(350)
self.pub_lane.publish(msg_desired_center)
self.pub_image_lane.publish(self.cvBridge.cv2_to_imgmsg(final, "bgr8"))
def main(self):
rospy.spin()
if __name__ == '__main__':
rospy.init_node('detect_lane')
node = DetectLane()
node.main()
<< 上一篇
下一篇 >>