SpeedOptimizer類繼承自Task類,實現了繼承來的Execute()函式,又在Execute()中呼叫了其純虛成員函式Process()。而DpStSpeedOptimizer類又繼承了SpeedOptimizer類,並實現了Process()函式,在Process()中呼叫了成員函式SearchStGraph(),到這裡,才算進入了使用動態規劃演算法進行速度規劃的正題。總之,Apollo的這個層次設計是真的優美!以上,可見apollo\modules\planning\tasks\task。h 及 apollo\modules\planning\tasks\optimizers\speed_optimizer。h 及 apollo\modules\planning\tasks\optimizers\dp_st_speed\dp_st_speed_optimizer。h 檔案。
bool
DpStSpeedOptimizer
::
SearchStGraph
(
SpeedData
*
speed_data
)
const
{
DpStGraph
st_graph
(
reference_line_info_
->
st_graph_data
(),
dp_st_speed_config_
,
reference_line_info_
->
path_decision
()
->
obstacles
()。
Items
(),
init_point_
,
adc_sl_boundary_
);
if
(
!
st_graph
。
Search
(
speed_data
)。
ok
())
{
AERROR
<<
“failed to search graph with dynamic programming。”
;
return
false
;
}
return
true
;
}
在DpStSpeedOptimizer::SearchStGraph()中,首先構造了DpStGraph類例項,該類包含了所有可能的(s,t)節點的代價表,應用動態規劃演算法查找出代價最小的路徑,然後計算速度,就得到了速度規劃的結果。
Status
DpStGraph
::
Search
(
SpeedData
*
const
speed_data
)
{
constexpr
double
kBounadryEpsilon
=
1e-2
;
for
(
const
auto
&
boundary
:
st_graph_data_
。
st_boundaries
())
{
//KEEP_CLEAR的範圍不影響速度規劃
if
(
boundary
->
boundary_type
()
==
STBoundary
::
BoundaryType
::
KEEP_CLEAR
)
{
continue
;
}
//若boundary包含原點或非常接近原點(即自車現在的位置),自車應該stop,不再前進
//故s,v,a都是0,即隨著t推移,自車不動
if
(
boundary
->
IsPointInBoundary
({
0。0
,
0。0
})
||
(
std
::
fabs
(
boundary
->
min_t
())
<
kBounadryEpsilon
&&
std
::
fabs
(
boundary
->
min_s
())
<
kBounadryEpsilon
))
{
std
::
vector
<
SpeedPoint
>
speed_profile
;
double
t
=
0。0
;
for
(
int
i
=
0
;
i
<=
dp_st_speed_config_
。
matrix_dimension_t
();
++
i
,
t
+=
unit_t_
)
{
SpeedPoint
speed_point
;
speed_point
。
set_s
(
0。0
);
speed_point
。
set_t
(
t
);
speed_point
。
set_v
(
0。0
);
speed_point
。
set_a
(
0。0
);
speed_profile
。
emplace_back
(
speed_point
);
}
*
speed_data
=
SpeedData
(
speed_profile
);
return
Status
::
OK
();
}
}
if
(
!
InitCostTable
()。
ok
())
{
。。。
}
if
(
!
CalculateTotalCost
()。
ok
())
{
。。。
}
if
(
!
RetrieveSpeedProfile
(
speed_data
)。
ok
())
{
。。。
}
return
Status
::
OK
();
}
在對自車當前位置(原點)判斷後,速度規劃主要分為3步:構建代價表InitCostTable(),更新代價表cost_table_中每個節點的代價CalculateTotalCost(),查詢代價最小的規劃結束點並逆向構建路徑、求取速度RetrieveSpeedProfile()。
// cost_table_[t][s]
// row: s, col: t
//StGraphPoint的total_cost_被初始化為+inf
std
::
vector
<
std
::
vector
<
StGraphPoint
>>
cost_table_
;
DpStGraph::InitCostTable()沒什麼好說的,主要小心cost_table_外層是t,內層是s,行數是s,列數是t。
DpStGraph::CalculateTotalCost()的目的是對cost_table_中的每個節點更新其cost,在此,我們先說如何更新單個節點的cost,請看DpStGraph::CalculateCostAt()函式,動態規劃的核心都在這個函數里。
void
DpStGraph
::
CalculateCostAt
(
const
std
::
shared_ptr
<
StGraphMessage
>&
msg
)
{
//動態規劃
。。。
const
auto
&
cost_init
=
cost_table_
[
0
][
0
];
if
(
c
==
0
)
{
//設定起始點的cost為0,為什麼不再剛進入函式的時候檢查?
DCHECK_EQ
(
r
,
0
)
<<
“Incorrect。 Row should be 0 with col = 0。 row: ”
<<
r
;
cost_cr
。
SetTotalCost
(
0。0
);
return
;
}
。。。
if
(
c
==
1
)
{
//c=1需要單獨處理,因為c=0的1列只有1個節點,即初始節點
const
double
acc
=
(
r
*
unit_s_
/
unit_t_
-
init_point_
。
v
())
/
unit_t_
;
//如果加速度超出範圍,不計算cost
if
(
acc
<
dp_st_speed_config_
。
max_deceleration
()
||
acc
>
dp_st_speed_config_
。
max_acceleration
())
{
return
;
}
//如果從起點到當前點有障礙物,不計算cost
if
(
CheckOverlapOnDpStGraph
(
st_graph_data_
。
st_boundaries
(),
cost_cr
,
cost_init
)){
return
;
}
cost_cr
。
SetTotalCost
(
//點內cost
cost_cr
。
obstacle_cost
()
+
cost_init
。
total_cost
()
+
//點間cost,從起點到[c=1,r]的各項cost,即[t0,s0]->[t1,sn]的cost
CalculateEdgeCostForSecondCol
(
r
,
speed_limit
,
soft_speed_limit
));
cost_cr
。
SetPrePoint
(
cost_init
);
return
;
}
constexpr
double
kSpeedRangeBuffer
=
0。20
;
//單時間步長最大可能前進的s
const
uint32_t
max_s_diff
=
static_cast
<
uint32_t
>
(
FLAGS_planning_upper_speed_limit
*
(
1
+
kSpeedRangeBuffer
)
*
unit_t_
/
unit_s_
);
//縮小行範圍到[r_low,r],因為要更新[c,r]的cost,就要考察
//[c前一列即前一時刻,r前幾行]分別到[c,r]的代價,取最小值,鬆弛操作,動態規劃
const
uint32_t
r_low
=
(
max_s_diff
<
r
?
r
-
max_s_diff
:
0
);
const
auto
&
pre_col
=
cost_table_
[
c
-
1
];
if
(
c
==
2
)
{
for
(
uint32_t
r_pre
=
r_low
;
r_pre
<=
r
;
++
r_pre
)
{
const
double
acc
=
(
r
*
unit_s_
-
2
*
r_pre
*
unit_s_
)
/
(
unit_t_
*
unit_t_
);
if
(
acc
<
dp_st_speed_config_
。
max_deceleration
()
||
acc
>
dp_st_speed_config_
。
max_acceleration
())
{
continue
;
}
if
(
CheckOverlapOnDpStGraph
(
st_graph_data_
。
st_boundaries
(),
cost_cr
,
pre_col
[
r_pre
]))
{
continue
;
}
const
double
cost
=
//第二項是考察的s方向前面某點的cost
cost_cr
。
obstacle_cost
()
+
pre_col
[
r_pre
]。
total_cost
()
+
CalculateEdgeCostForThirdCol
(
r
,
r_pre
,
speed_limit
,
soft_speed_limit
);
//在未賦有效cost值之前,cost_cr的total_cost是+inf
if
(
cost
<
cost_cr
。
total_cost
())
{
cost_cr
。
SetTotalCost
(
cost
);
cost_cr
。
SetPrePoint
(
pre_col
[
r_pre
]);
}
}
return
;
}
//固定col即t,考察row即s
for
(
uint32_t
r_pre
=
r_low
;
r_pre
<=
r
;
++
r_pre
)
{
//[pre_col,r_pre]不可達,自然不必考慮從該點到[c,r]點
if
(
std
::
isinf
(
pre_col
[
r_pre
]。
total_cost
())
||
pre_col
[
r_pre
]。
pre_point
()
==
nullptr
)
{
continue
;
}
const
double
curr_a
=
(
cost_cr
。
index_s
()
*
unit_s_
+
pre_col
[
r_pre
]。
pre_point
()
->
index_s
()
*
unit_s_
-
2
*
pre_col
[
r_pre
]。
index_s
()
*
unit_s_
)
/
(
unit_t_
*
unit_t_
);
if
(
curr_a
>
vehicle_param_
。
max_acceleration
()
||
curr_a
<
vehicle_param_
。
max_deceleration
())
{
continue
;
}
if
(
CheckOverlapOnDpStGraph
(
st_graph_data_
。
st_boundaries
(),
cost_cr
,
pre_col
[
r_pre
]))
{
continue
;
}
//接下來的2個if判斷是為了確認triple_pre_point有效,以計算jerk cost
//因為jerk cost的計算方式不同,對col分3類討論(思路相同),對應DpStCost類
//中3種計算jerk cost的函式。jerk cost是速度規劃有效性的保障,若此處不計算,
//在後續的軌跡validaty check中排除無效軌跡也行,這種預先限制的方法更好
//accel的cost計算和範圍檢查也是同理,此處沒有對jerk範圍進行檢查
uint32_t
r_prepre
=
pre_col
[
r_pre
]。
pre_point
()
->
index_s
();
const
StGraphPoint
&
prepre_graph_point
=
cost_table_
[
c
-
2
][
r_prepre
];
if
(
std
::
isinf
(
prepre_graph_point
。
total_cost
()))
{
continue
;
}
if
(
!
prepre_graph_point
。
pre_point
())
{
continue
;
}
const
STPoint
&
triple_pre_point
=
prepre_graph_point
。
pre_point
()
->
point
();
const
STPoint
&
prepre_point
=
prepre_graph_point
。
point
();
const
STPoint
&
pre_point
=
pre_col
[
r_pre
]。
point
();
const
STPoint
&
curr_point
=
cost_cr
。
point
();
//triple_pre_point和prepre_point只是計算jerk cost需要
double
cost
=
cost_cr
。
obstacle_cost
()
+
pre_col
[
r_pre
]。
total_cost
()
+
CalculateEdgeCost
(
triple_pre_point
,
prepre_point
,
pre_point
,
curr_point
,
speed_limit
,
soft_speed_limit
);
if
(
cost
<
cost_cr
。
total_cost
())
{
cost_cr
。
SetTotalCost
(
cost
);
cost_cr
。
SetPrePoint
(
pre_col
[
r_pre
]);
}
}
}
DpStGraph::CalculateTotalCost()中是對每一行、每一列的遍歷,巧的地方在於使用了GetRowRange()來計算由當前節點可到達的s(行)範圍。
Status
DpStGraph
::
CalculateTotalCost
()
{
// col and row are for STGraph
// t corresponding to col
// s corresponding to row
size_t
next_highest_row
=
0
;
size_t
next_lowest_row
=
0
;
for
(
size_t
c
=
0
;
c
<
cost_table_
。
size
();
++
c
)
{
size_t
highest_row
=
0
;
auto
lowest_row
=
cost_table_
。
back
()。
size
()
-
1
;
//行數-1
int
count
=
static_cast
<
int
>
(
next_highest_row
)
-
static_cast
<
int
>
(
next_lowest_row
)
+
1
;
//count始終>0
if
(
count
>
0
)
{
std
::
vector
<
std
::
future
<
void
>>
results
;
for
(
size_t
r
=
next_lowest_row
;
r
<=
next_highest_row
;
++
r
)
{
auto
msg
=
std
::
make_shared
<
StGraphMessage
>
(
c
,
r
);
。。。
CalculateCostAt
(
msg
);
}
。。。
}
for
(
size_t
r
=
next_lowest_row
;
r
<=
next_highest_row
;
++
r
)
{
const
auto
&
cost_cr
=
cost_table_
[
c
][
r
];
if
(
cost_cr
。
total_cost
()
<
std
::
numeric_limits
<
double
>::
infinity
())
{
size_t
h_r
=
0
;
size_t
l_r
=
0
;
GetRowRange
(
cost_cr
,
&
h_r
,
&
l_r
);
//由第c列、第 next_lowest_row~next_highest_row 行的節點可到達的最遠的s
highest_row
=
std
::
max
(
highest_row
,
h_r
);
//可到達的最近的s
lowest_row
=
std
::
min
(
lowest_row
,
static_cast
<
size_t
>
(
l_r
));
}
}
next_highest_row
=
highest_row
;
next_lowest_row
=
lowest_row
;
}
return
Status
::
OK
();
}
DpStGraph::GetRowRange()中我有個疑惑的地方,就是對s的計算為什麼不採用
這個公式,而是使用了
?
不過,考慮到 max_acceleration>0,max_deceleration<0,從計算結果來說,前者計算的 [lower_bound, upper_bound] 是後者計算結果的子集,倒是不影響程式正常邏輯。
//計算從point出發可能到達的s範圍
void
DpStGraph
::
GetRowRange
(
const
StGraphPoint
&
point
,
size_t
*
next_highest_row
,
size_t
*
next_lowest_row
)
{
。。。
const
auto
max_s_size
=
cost_table_
。
back
()。
size
()
-
1
;
//行數-1
const
double
speed_coeff
=
unit_t_
*
unit_t_
;
//為什麼不是v0*t+0。5am*t^2?
const
double
delta_s_upper_bound
=
v0
*
unit_t_
+
vehicle_param_
。
max_acceleration
()
*
speed_coeff
;
*
next_highest_row
=
point
。
index_s
()
+
static_cast
<
int
>
(
delta_s_upper_bound
/
unit_s_
);
if
(
*
next_highest_row
>=
max_s_size
)
{
*
next_highest_row
=
max_s_size
;
}
const
double
delta_s_lower_bound
=
std
::
fmax
(
0。0
,
v0
*
unit_t_
+
vehicle_param_
。
max_deceleration
()
*
speed_coeff
);
*
next_lowest_row
=
point
。
index_s
()
+
static_cast
<
int
>
(
delta_s_lower_bound
/
unit_s_
);
if
(
*
next_lowest_row
>
max_s_size
)
{
*
next_lowest_row
=
max_s_size
;
}
else
if
(
*
next_lowest_row
<
0
)
{
*
next_lowest_row
=
0
;
}
}
CalculateCostAt()和GetRowRange()中都有基於當前點,判斷可到達s(行)範圍的操作。不同的是,GetRowRange()中是向s增大的方向查詢,因為要逐行的擴充套件節點、更新其cost。而CalculateCostAt()中是向s減小的方向查詢,即查詢哪些行可以到達該點,因為該函式用來計算某節點的cost,就要基於之前經過的節點的cost。
DpStGraph::RetrieveSpeedProfile()以及過程中的其他類與函式,程式碼都很簡潔清晰,就不在此贅述了。只以DpStCost::GetAccelCost()函式為例,提一下動態規劃中常用的計算結果快取技巧。具體請參考apollo\modules\planning\tasks\optimizers\dp_st_speed\dp_st_cost。h
//boundary_cost_也使用了快取機制
std
::
vector
<
std
::
vector
<
std
::
pair
<
double
,
double
>>>
boundary_cost_
;
//2個array用來快取資料,避免多次重複計算
std
::
array
<
double
,
200
>
accel_cost_
;
std
::
array
<
double
,
400
>
jerk_cost_
;
————————————————————————————————————————
double
DpStCost
::
GetAccelCost
(
const
double
accel
)
{
double
cost
=
0。0
;
constexpr
double
kEpsilon
=
0。1
;
constexpr
size_t
kShift
=
100
;
//將accel的數值和在accel_cost_中的index聯絡起來
//根據0<=10*accel+0。5+100<=200,得出程式預設accel正常在[-10,10]範圍內,符合實際
const
size_t
accel_key
=
static_cast
<
size_t
>
(
accel
/
kEpsilon
+
0。5
+
kShift
);
DCHECK_LT
(
accel_key
,
accel_cost_
。
size
());
//過大的加速度
if
(
accel_key
>=
accel_cost_
。
size
())
{
return
kInf
;
}
//accel_cost_初始化為-1,<0即從沒有計算過
if
(
accel_cost_
。
at
(
accel_key
)
<
0。0
)
{
const
double
accel_sq
=
accel
*
accel
;
。。。
if
(
accel
>
0。0
)
{
cost
=
accel_penalty
*
accel_sq
;
}
else
{
cost
=
decel_penalty
*
accel_sq
;
}
cost
+=
accel_sq
*
decel_penalty
*
decel_penalty
/
(
1
+
std
::
exp
(
1。0
*
(
accel
-
max_dec
)))
+
accel_sq
*
accel_penalty
*
accel_penalty
/
(
1
+
std
::
exp
(
-
1。0
*
(
accel
-
max_acc
)));
accel_cost_
。
at
(
accel_key
)
=
cost
;
}
else
{
//針對這個accel的cost早就被計算過,並儲存在accel_cost_內
cost
=
accel_cost_
。
at
(
accel_key
);
}
return
cost
*
unit_t_
;
}
DpStCost::JerkCost()的流程同理。這裡提一個有趣的點。
double
DpStCost
::
JerkCost
(
const
double
jerk
)
{
double
cost
=
0。0
;
constexpr
double
kEpsilon
=
0。1
;
constexpr
size_t
kShift
=
200
;
//同上面accel cost的處理,認為jerk正常情況下在[-20,20]範圍內
const
size_t
jerk_key
=
static_cast
<
size_t
>
(
jerk
/
kEpsilon
+
0。5
+
kShift
);
if
(
jerk_key
>=
jerk_cost_
。
size
())
{
return
kInf
;
}
。。。
}