SpeedOptimizer類繼承自Task類,實現了繼承來的Execute()函式,又在Execute()中呼叫了其純虛成員函式Process()。而DpStSpeedOptimizer類又繼承了SpeedOptimizer類,並實現了Process()函式,在Process()中呼叫了成員函式SearchStGraph(),到這裡,才算進入了使用動態規劃演算法進行速度規劃的正題。總之,Apollo的這個層次設計是真的優美!以上,可見apollo\modules\planning\tasks\task。h 及 apollo\modules\planning\tasks\optimizers\speed_optimizer。h 及 apollo\modules\planning\tasks\optimizers\dp_st_speed\dp_st_speed_optimizer。h 檔案。

bool

DpStSpeedOptimizer

::

SearchStGraph

SpeedData

*

speed_data

const

{

DpStGraph

st_graph

reference_line_info_

->

st_graph_data

(),

dp_st_speed_config_

reference_line_info_

->

path_decision

()

->

obstacles

()。

Items

(),

init_point_

adc_sl_boundary_

);

if

st_graph

Search

speed_data

)。

ok

())

{

AERROR

<<

“failed to search graph with dynamic programming。”

return

false

}

return

true

}

在DpStSpeedOptimizer::SearchStGraph()中,首先構造了DpStGraph類例項,該類包含了所有可能的(s,t)節點的代價表,應用動態規劃演算法查找出代價最小的路徑,然後計算速度,就得到了速度規劃的結果。

Status

DpStGraph

::

Search

SpeedData

*

const

speed_data

{

constexpr

double

kBounadryEpsilon

=

1e-2

for

const

auto

&

boundary

st_graph_data_

st_boundaries

())

{

//KEEP_CLEAR的範圍不影響速度規劃

if

boundary

->

boundary_type

()

==

STBoundary

::

BoundaryType

::

KEEP_CLEAR

{

continue

}

//若boundary包含原點或非常接近原點(即自車現在的位置),自車應該stop,不再前進

//故s,v,a都是0,即隨著t推移,自車不動

if

boundary

->

IsPointInBoundary

({

0。0

0。0

})

||

std

::

fabs

boundary

->

min_t

())

<

kBounadryEpsilon

&&

std

::

fabs

boundary

->

min_s

())

<

kBounadryEpsilon

))

{

std

::

vector

<

SpeedPoint

>

speed_profile

double

t

=

0。0

for

int

i

=

0

i

<=

dp_st_speed_config_

matrix_dimension_t

();

++

i

t

+=

unit_t_

{

SpeedPoint

speed_point

speed_point

set_s

0。0

);

speed_point

set_t

t

);

speed_point

set_v

0。0

);

speed_point

set_a

0。0

);

speed_profile

emplace_back

speed_point

);

}

*

speed_data

=

SpeedData

speed_profile

);

return

Status

::

OK

();

}

}

if

InitCostTable

()。

ok

())

{

。。。

}

if

CalculateTotalCost

()。

ok

())

{

。。。

}

if

RetrieveSpeedProfile

speed_data

)。

ok

())

{

。。。

}

return

Status

::

OK

();

}

在對自車當前位置(原點)判斷後,速度規劃主要分為3步:構建代價表InitCostTable(),更新代價表cost_table_中每個節點的代價CalculateTotalCost(),查詢代價最小的規劃結束點並逆向構建路徑、求取速度RetrieveSpeedProfile()。

// cost_table_[t][s]

// row: s, col: t

//StGraphPoint的total_cost_被初始化為+inf

std

::

vector

<

std

::

vector

<

StGraphPoint

>>

cost_table_

DpStGraph::InitCostTable()沒什麼好說的,主要小心cost_table_外層是t,內層是s,行數是s,列數是t。

DpStGraph::CalculateTotalCost()的目的是對cost_table_中的每個節點更新其cost,在此,我們先說如何更新單個節點的cost,請看DpStGraph::CalculateCostAt()函式,動態規劃的核心都在這個函數里。

void

DpStGraph

::

CalculateCostAt

const

std

::

shared_ptr

<

StGraphMessage

>&

msg

{

//動態規劃

。。。

const

auto

&

cost_init

=

cost_table_

0

][

0

];

if

c

==

0

{

//設定起始點的cost為0,為什麼不再剛進入函式的時候檢查?

DCHECK_EQ

r

0

<<

“Incorrect。 Row should be 0 with col = 0。 row: ”

<<

r

cost_cr

SetTotalCost

0。0

);

return

}

。。。

if

c

==

1

{

//c=1需要單獨處理,因為c=0的1列只有1個節點,即初始節點

const

double

acc

=

r

*

unit_s_

/

unit_t_

-

init_point_

v

())

/

unit_t_

//如果加速度超出範圍,不計算cost

if

acc

<

dp_st_speed_config_

max_deceleration

()

||

acc

>

dp_st_speed_config_

max_acceleration

())

{

return

}

//如果從起點到當前點有障礙物,不計算cost

if

CheckOverlapOnDpStGraph

st_graph_data_

st_boundaries

(),

cost_cr

cost_init

)){

return

}

cost_cr

SetTotalCost

//點內cost

cost_cr

obstacle_cost

()

+

cost_init

total_cost

()

+

//點間cost,從起點到[c=1,r]的各項cost,即[t0,s0]->[t1,sn]的cost

CalculateEdgeCostForSecondCol

r

speed_limit

soft_speed_limit

));

cost_cr

SetPrePoint

cost_init

);

return

}

constexpr

double

kSpeedRangeBuffer

=

0。20

//單時間步長最大可能前進的s

const

uint32_t

max_s_diff

=

static_cast

<

uint32_t

>

FLAGS_planning_upper_speed_limit

*

1

+

kSpeedRangeBuffer

*

unit_t_

/

unit_s_

);

//縮小行範圍到[r_low,r],因為要更新[c,r]的cost,就要考察

//[c前一列即前一時刻,r前幾行]分別到[c,r]的代價,取最小值,鬆弛操作,動態規劃

const

uint32_t

r_low

=

max_s_diff

<

r

r

-

max_s_diff

0

);

const

auto

&

pre_col

=

cost_table_

c

-

1

];

if

c

==

2

{

for

uint32_t

r_pre

=

r_low

r_pre

<=

r

++

r_pre

{

const

double

acc

=

r

*

unit_s_

-

2

*

r_pre

*

unit_s_

/

unit_t_

*

unit_t_

);

if

acc

<

dp_st_speed_config_

max_deceleration

()

||

acc

>

dp_st_speed_config_

max_acceleration

())

{

continue

}

if

CheckOverlapOnDpStGraph

st_graph_data_

st_boundaries

(),

cost_cr

pre_col

r_pre

]))

{

continue

}

const

double

cost

=

//第二項是考察的s方向前面某點的cost

cost_cr

obstacle_cost

()

+

pre_col

r_pre

]。

total_cost

()

+

CalculateEdgeCostForThirdCol

r

r_pre

speed_limit

soft_speed_limit

);

//在未賦有效cost值之前,cost_cr的total_cost是+inf

if

cost

<

cost_cr

total_cost

())

{

cost_cr

SetTotalCost

cost

);

cost_cr

SetPrePoint

pre_col

r_pre

]);

}

}

return

}

//固定col即t,考察row即s

for

uint32_t

r_pre

=

r_low

r_pre

<=

r

++

r_pre

{

//[pre_col,r_pre]不可達,自然不必考慮從該點到[c,r]點

if

std

::

isinf

pre_col

r_pre

]。

total_cost

())

||

pre_col

r_pre

]。

pre_point

()

==

nullptr

{

continue

}

const

double

curr_a

=

cost_cr

index_s

()

*

unit_s_

+

pre_col

r_pre

]。

pre_point

()

->

index_s

()

*

unit_s_

-

2

*

pre_col

r_pre

]。

index_s

()

*

unit_s_

/

unit_t_

*

unit_t_

);

if

curr_a

>

vehicle_param_

max_acceleration

()

||

curr_a

<

vehicle_param_

max_deceleration

())

{

continue

}

if

CheckOverlapOnDpStGraph

st_graph_data_

st_boundaries

(),

cost_cr

pre_col

r_pre

]))

{

continue

}

//接下來的2個if判斷是為了確認triple_pre_point有效,以計算jerk cost

//因為jerk cost的計算方式不同,對col分3類討論(思路相同),對應DpStCost類

//中3種計算jerk cost的函式。jerk cost是速度規劃有效性的保障,若此處不計算,

//在後續的軌跡validaty check中排除無效軌跡也行,這種預先限制的方法更好

//accel的cost計算和範圍檢查也是同理,此處沒有對jerk範圍進行檢查

uint32_t

r_prepre

=

pre_col

r_pre

]。

pre_point

()

->

index_s

();

const

StGraphPoint

&

prepre_graph_point

=

cost_table_

c

-

2

][

r_prepre

];

if

std

::

isinf

prepre_graph_point

total_cost

()))

{

continue

}

if

prepre_graph_point

pre_point

())

{

continue

}

const

STPoint

&

triple_pre_point

=

prepre_graph_point

pre_point

()

->

point

();

const

STPoint

&

prepre_point

=

prepre_graph_point

point

();

const

STPoint

&

pre_point

=

pre_col

r_pre

]。

point

();

const

STPoint

&

curr_point

=

cost_cr

point

();

//triple_pre_point和prepre_point只是計算jerk cost需要

double

cost

=

cost_cr

obstacle_cost

()

+

pre_col

r_pre

]。

total_cost

()

+

CalculateEdgeCost

triple_pre_point

prepre_point

pre_point

curr_point

speed_limit

soft_speed_limit

);

if

cost

<

cost_cr

total_cost

())

{

cost_cr

SetTotalCost

cost

);

cost_cr

SetPrePoint

pre_col

r_pre

]);

}

}

}

DpStGraph::CalculateTotalCost()中是對每一行、每一列的遍歷,巧的地方在於使用了GetRowRange()來計算由當前節點可到達的s(行)範圍。

Status

DpStGraph

::

CalculateTotalCost

()

{

// col and row are for STGraph

// t corresponding to col

// s corresponding to row

size_t

next_highest_row

=

0

size_t

next_lowest_row

=

0

for

size_t

c

=

0

c

<

cost_table_

size

();

++

c

{

size_t

highest_row

=

0

auto

lowest_row

=

cost_table_

back

()。

size

()

-

1

//行數-1

int

count

=

static_cast

<

int

>

next_highest_row

-

static_cast

<

int

>

next_lowest_row

+

1

//count始終>0

if

count

>

0

{

std

::

vector

<

std

::

future

<

void

>>

results

for

size_t

r

=

next_lowest_row

r

<=

next_highest_row

++

r

{

auto

msg

=

std

::

make_shared

<

StGraphMessage

>

c

r

);

。。。

CalculateCostAt

msg

);

}

。。。

}

for

size_t

r

=

next_lowest_row

r

<=

next_highest_row

++

r

{

const

auto

&

cost_cr

=

cost_table_

c

][

r

];

if

cost_cr

total_cost

()

<

std

::

numeric_limits

<

double

>::

infinity

())

{

size_t

h_r

=

0

size_t

l_r

=

0

GetRowRange

cost_cr

&

h_r

&

l_r

);

//由第c列、第 next_lowest_row~next_highest_row 行的節點可到達的最遠的s

highest_row

=

std

::

max

highest_row

h_r

);

//可到達的最近的s

lowest_row

=

std

::

min

lowest_row

static_cast

<

size_t

>

l_r

));

}

}

next_highest_row

=

highest_row

next_lowest_row

=

lowest_row

}

return

Status

::

OK

();

}

DpStGraph::GetRowRange()中我有個疑惑的地方,就是對s的計算為什麼不採用

s = v_{0}t + \frac{1}{2}at^{2}

這個公式,而是使用了

s = v_{0}t + at^{2}

不過,考慮到 max_acceleration>0,max_deceleration<0,從計算結果來說,前者計算的 [lower_bound, upper_bound] 是後者計算結果的子集,倒是不影響程式正常邏輯。

//計算從point出發可能到達的s範圍

void

DpStGraph

::

GetRowRange

const

StGraphPoint

&

point

size_t

*

next_highest_row

size_t

*

next_lowest_row

{

。。。

const

auto

max_s_size

=

cost_table_

back

()。

size

()

-

1

//行數-1

const

double

speed_coeff

=

unit_t_

*

unit_t_

//為什麼不是v0*t+0。5am*t^2?

const

double

delta_s_upper_bound

=

v0

*

unit_t_

+

vehicle_param_

max_acceleration

()

*

speed_coeff

*

next_highest_row

=

point

index_s

()

+

static_cast

<

int

>

delta_s_upper_bound

/

unit_s_

);

if

*

next_highest_row

>=

max_s_size

{

*

next_highest_row

=

max_s_size

}

const

double

delta_s_lower_bound

=

std

::

fmax

0。0

v0

*

unit_t_

+

vehicle_param_

max_deceleration

()

*

speed_coeff

);

*

next_lowest_row

=

point

index_s

()

+

static_cast

<

int

>

delta_s_lower_bound

/

unit_s_

);

if

*

next_lowest_row

>

max_s_size

{

*

next_lowest_row

=

max_s_size

}

else

if

*

next_lowest_row

<

0

{

*

next_lowest_row

=

0

}

}

CalculateCostAt()和GetRowRange()中都有基於當前點,判斷可到達s(行)範圍的操作。不同的是,GetRowRange()中是向s增大的方向查詢,因為要逐行的擴充套件節點、更新其cost。而CalculateCostAt()中是向s減小的方向查詢,即查詢哪些行可以到達該點,因為該函式用來計算某節點的cost,就要基於之前經過的節點的cost。

DpStGraph::RetrieveSpeedProfile()以及過程中的其他類與函式,程式碼都很簡潔清晰,就不在此贅述了。只以DpStCost::GetAccelCost()函式為例,提一下動態規劃中常用的計算結果快取技巧。具體請參考apollo\modules\planning\tasks\optimizers\dp_st_speed\dp_st_cost。h

//boundary_cost_也使用了快取機制

std

::

vector

<

std

::

vector

<

std

::

pair

<

double

double

>>>

boundary_cost_

//2個array用來快取資料,避免多次重複計算

std

::

array

<

double

200

>

accel_cost_

std

::

array

<

double

400

>

jerk_cost_

————————————————————————————————————————

double

DpStCost

::

GetAccelCost

const

double

accel

{

double

cost

=

0。0

constexpr

double

kEpsilon

=

0。1

constexpr

size_t

kShift

=

100

//將accel的數值和在accel_cost_中的index聯絡起來

//根據0<=10*accel+0。5+100<=200,得出程式預設accel正常在[-10,10]範圍內,符合實際

const

size_t

accel_key

=

static_cast

<

size_t

>

accel

/

kEpsilon

+

0。5

+

kShift

);

DCHECK_LT

accel_key

accel_cost_

size

());

//過大的加速度

if

accel_key

>=

accel_cost_

size

())

{

return

kInf

}

//accel_cost_初始化為-1,<0即從沒有計算過

if

accel_cost_

at

accel_key

<

0。0

{

const

double

accel_sq

=

accel

*

accel

。。。

if

accel

>

0。0

{

cost

=

accel_penalty

*

accel_sq

}

else

{

cost

=

decel_penalty

*

accel_sq

}

cost

+=

accel_sq

*

decel_penalty

*

decel_penalty

/

1

+

std

::

exp

1。0

*

accel

-

max_dec

)))

+

accel_sq

*

accel_penalty

*

accel_penalty

/

1

+

std

::

exp

-

1。0

*

accel

-

max_acc

)));

accel_cost_

at

accel_key

=

cost

}

else

{

//針對這個accel的cost早就被計算過,並儲存在accel_cost_內

cost

=

accel_cost_

at

accel_key

);

}

return

cost

*

unit_t_

}

DpStCost::JerkCost()的流程同理。這裡提一個有趣的點。

double

DpStCost

::

JerkCost

const

double

jerk

{

double

cost

=

0。0

constexpr

double

kEpsilon

=

0。1

constexpr

size_t

kShift

=

200

//同上面accel cost的處理,認為jerk正常情況下在[-20,20]範圍內

const

size_t

jerk_key

=

static_cast

<

size_t

>

jerk

/

kEpsilon

+

0。5

+

kShift

);

if

jerk_key

>=

jerk_cost_

size

())

{

return

kInf

}

。。。

}