selfLearning,有時也被稱作selfTraining,中文譯作自學習,是一個比較簡單的半監督演算法框架:
假設我們有標籤資料
和無標籤資料
;
1)首先利用有標籤資料訓練一個模型
;
2)利用模型對無標籤資料預測,得到無標籤資料的機率;
3)設定一個機率閾值(如0。8),將標籤為1的機率大於閾值的無標籤樣本打上1的標籤,同理打上0的標籤,並從無標籤的資料中剔除;
4)將3中打上標籤的無標籤樣本加入訓練集,重新訓練得到模型;
重複2-4步驟,直到沒有資料可以加入訓練集為止(或達到迭代閾值);
從上面的步驟中其實可以看出這個演算法的做法,即將預測可能性最大的無標籤樣本加到訓練集中。所以這個演算法的缺陷也很明顯:如果前面加入的樣本的標籤出現偏差,則會導致後面的迭代越來越偏離正確的解,“garbage in,garbage out”,所以這個演算法本身不算是太穩定。
另外,在演算法的第三步,需要設定一個機率閾值,這種做法在均衡標籤中是沒問題的,但如果是在極度非均衡標籤中(1很少),可能會導致迭代的過程中基本沒有1的樣本加入,完全照搬是有點問題的,這種情況下,按一定比例取首尾的樣本可能更合適些,但是迭代的次數就需要自己設定,如下。
from
sklearn。base
import
BaseEstimator
import
sklearn。metrics
#import sys
import
numpy
import
numpy
as
np
#from sklearn。linear_model import LogisticRegression as LR
from
xgboost。sklearn
import
XGBClassifier
from
sklearn。metrics
import
roc_curve
,
auc
import
copy
import
pandas
as
pd
def
auc_score
(
y_true
,
predict_proba
):
‘’‘
y_true: numpy。ndarray,不能是帶索引的series
’‘’
false_positive_rate
,
recall
,
thresholds
=
roc_curve
(
y_true
,
predict_proba
)
roc_auc
=
auc
(
false_positive_rate
,
recall
)
return
roc_auc
class
SelfLearningModel
(
BaseEstimator
):
“”“
self-training的簡版框架;
基礎模型需要是一些類sklearn的模型,主要是train和predict的方法等等;
self-training的一些資料: 如:http://pages。cs。wisc。edu/~jerryzhu/pub/sslicml07。pdf
Parameters
——————
basemodel : 基模型;
max_iter : int,最大迭代次數;
prob_threshold_pos : float, 即將unlabeled的樣本加入訓練樣本的閾值(正例);
prob_threshold_neg : float, 即將unlabeled的樣本加入訓練樣本的閾值(負例);
unlabeled_sample_weight: float, 當無標籤樣本加入訓練時,給予的權重;
top_n:int,即每次加入1標籤的樣本數
NPratio:float,即每次加入1標籤樣本與0標籤樣本數之比,即標籤不均衡時需調整
de_ratio:float,即這次加入的樣本數與前次的比值,即加入的樣本數在逐漸衰減。
”“”
def
__init__
(
self
,
basemodel
,
max_iter
=
200
,
prob_threshold_pos
=
0。8
,
prob_threshold_neg
=
0。8
,
unlabeled_sample_weight
=
0。8
\
,
top_n
=
10
,
NPratio
=
1
,
de_ratio
=
0。9
,
predictors
=
[],
dev_data
=
None
,
stopping_t
=
30
):
self
。
model
=
basemodel
self
。
max_iter
=
max_iter
self
。
prob_threshold_pos
=
prob_threshold_pos
self
。
prob_threshold_neg
=
prob_threshold_neg
self
。
unlabeled_sample_weight
=
unlabeled_sample_weight
self
。
top_n
=
top_n
self
。
NPratio
=
NPratio
self
。
de_ratio
=
de_ratio
self
。
predictors
=
predictors
self
。
dev_data
=
dev_data
self
。
stopping_t
=
stopping_t
def
fit
(
self
,
df
):
“”“
Basemodel的train方法;
df:DataFrame,dep=‘y’,無標籤樣本的y=-1
Returns
————-
self : returns an instance of self。
”“”
X
=
df
[
self
。
predictors
]
y_
=
df
[
‘y’
]
y
=
copy
。
deepcopy
(
y_
)
unlabeledX
=
X
。
loc
[
y
==-
1
,
:]
#取無標籤的變數
labeledX
=
X
。
loc
[
y
!=-
1
,
:]
#取有標籤的變數
labeledy
=
y
[
y
!=-
1
]
#取有標籤的y
sample_weight_
=
np
。
array
([
1。0
]
*
(
X
。
shape
[
0
]))
sample_weight_
[
y
==-
1
]
=
self
。
unlabeled_sample_weight
self
。
model
。
fit
(
labeledX
。
values
,
labeledy
。
values
)
#先將有標籤的樣本進行訓練
unlabeledy
=
self
。
predict
(
unlabeledX
。
values
)
#對無標籤的樣本進行標籤預測
unlabeledprob
=
self
。
predict_proba
(
unlabeledX
。
values
)
#對無標籤的樣本進行機率進行預測
pos_prob
=
pd
。
Series
(
unlabeledprob
[:,
1
],
index
=
unlabeledX
。
index
)
neg_prob
=
pd
。
Series
(
unlabeledprob
[:,
0
],
index
=
unlabeledX
。
index
)
pos_top
=
pos_prob
。
sort_values
(
ascending
=
False
)
。
head
(
int
(
self
。
top_n
))
uidx_pos
=
pos_top
。
index
neg_top
=
neg_prob
。
sort_values
(
ascending
=
False
)
。
head
(
int
((
self
。
top_n
/
self
。
NPratio
)))
uidx_neg
=
neg_top
。
index
(
“pos_top:min_prob=
%f
,mean_prob=
%f
”
%
(
pos_top
。
min
(),
pos_top
。
mean
()))
(
“neg_top:min_prob=
%f
,mean_prob=
%f
”
%
(
neg_top
。
min
(),
neg_top
。
mean
()))
df_pos
=
df
。
loc
[
uidx_pos
,:]
df_neg
=
df
。
loc
[
uidx_neg
,:]
# print(“pos_top: ss:%i, y_10:%i, y_30:%i”%(df_pos[df_pos[‘y_10’]。isin([0,1])]。shape[0],df_pos[df_pos[‘y_10’]。isin([0,1])][‘y_10’]。sum(),df_pos[df_pos[‘y_10’]。isin([0,1])][‘y_30’]。sum()))
# print(“neg_top: ss:%i, y_10:%i, y_30:%i”%(df_neg[df_neg[‘y_10’]。isin([0,1])]。shape[0],df_neg[df_neg[‘y_10’]。isin([0,1])][‘y_10’]。sum(),df_neg[df_neg[‘y_10’]。isin([0,1])][‘y_30’]。sum()))
#uidx_pos = pos_prob[pos_prob > self。prob_threshold_pos]。index #unlabeled判斷為正例的樣本
#uidx_neg = neg_prob[neg_prob > self。prob_threshold_neg]。index #unlabeled判斷為負例的樣本
uidx
=
np
。
hstack
((
uidx_pos
,
uidx_neg
))
self
。
uidx_pos
=
{}
self
。
uidx_pos
[
0
]
=
uidx_pos
self
。
uidx_neg
=
{}
self
。
uidx_neg
[
0
]
=
uidx_neg
self
。
auc
=
{}
#re-train, labeling unlabeled instances with model predictions, until convergence
i
=
0
(
‘iter:
%i
, n_pos:
%i
, n_neg:
%i
。’
%
(
i
,
uidx_pos
。
shape
[
0
],
uidx_neg
。
shape
[
0
]))
#組合
(
‘uidx num: ’
,
uidx
。
shape
[
0
])
#dev_auc
# max_auc_10 = auc_score(self。dev_data[self。dev_data[‘y_10’]。isin([0,1])][‘y_10’], self。predict_proba(self。dev_data[self。dev_data[‘y_10’]。isin([0,1])][self。predictors]。values)[:,1])
# max_auc_30 = auc_score(self。dev_data[self。dev_data[‘y_30’]。isin([0,1])][‘y_30’], self。predict_proba(self。dev_data[self。dev_data[‘y_30’]。isin([0,1])][self。predictors]。values)[:,1])
# stop_t = 0
while
(
len
(
uidx
)
!=
0
)
and
i
<
self
。
max_iter
:
#當樣本不滿足閾值或達到迭代閾值時,停止迭代
#部分U重新分配
y
[
uidx_pos
]
=
1
y
[
uidx_neg
]
=
0
unlabeledX
=
X
。
loc
[
y
==-
1
,
:]
labeledX
=
X
。
loc
[
y
!=-
1
,
:]
labeledy
=
y
[
y
!=-
1
]
#訓練新的資料的樣本,並加上樣本的weight
self
。
model
。
fit
(
labeledX
。
values
,
labeledy
。
values
,
sample_weight
=
sample_weight_
[
y
!=-
1
])
unlabeledprob
=
self
。
predict_proba
(
unlabeledX
。
values
)
pos_prob
=
pd
。
Series
(
unlabeledprob
[:,
1
],
index
=
unlabeledX
。
index
)
neg_prob
=
pd
。
Series
(
unlabeledprob
[:,
0
],
index
=
unlabeledX
。
index
)
pos_top
=
pos_prob
。
sort_values
(
ascending
=
False
)
。
head
(
int
(
self
。
top_n
*
(
self
。
de_ratio
**
(
i
+
1
))))
uidx_pos
=
pos_top
。
index
neg_top
=
neg_prob
。
sort_values
(
ascending
=
False
)
。
head
(
int
((
self
。
top_n
/
self
。
NPratio
)
*
(
self
。
de_ratio
**
(
i
+
1
))))
uidx_neg
=
neg_top
。
index
(
“pos_top:min_prob=
%f
,mean_prob=
%f
”
%
(
pos_top
。
min
(),
pos_top
。
mean
()))
(
“neg_top:min_prob=
%f
,mean_prob=
%f
”
%
(
neg_top
。
min
(),
neg_top
。
mean
()))
df_pos
=
df
。
loc
[
uidx_pos
,:]
df_neg
=
df
。
loc
[
uidx_neg
,:]
# print(“pos_top: ss:%i, y_10:%i, y_30:%i”%(df_pos[df_pos[‘y_10’]。isin([0,1])]。shape[0],df_pos[df_pos[‘y_10’]。isin([0,1])][‘y_10’]。sum(),df_pos[df_pos[‘y_10’]。isin([0,1])][‘y_30’]。sum()))
# print(“neg_top: ss:%i, y_10:%i, y_30:%i”%(df_neg[df_neg[‘y_10’]。isin([0,1])]。shape[0],df_neg[df_neg[‘y_10’]。isin([0,1])][‘y_10’]。sum(),df_neg[df_neg[‘y_10’]。isin([0,1])][‘y_30’]。sum()))
# uidx_pos = pos_prob[pos_prob > self。prob_threshold_pos]。index #unlabeled判斷為正例的樣本
# uidx_neg = neg_prob[neg_prob > self。prob_threshold_neg]。index #unlabeled判斷為負例的樣本
uidx
=
np
。
hstack
((
uidx_pos
,
uidx_neg
))
i
+=
1
(
‘iter:
%i
, n_pos:
%i
, n_neg:
%i
。’
%
(
i
,
uidx_pos
。
shape
[
0
],
uidx_neg
。
shape
[
0
]))
#組合
(
‘uidx num: ’
,
uidx
。
shape
[
0
])
self
。
uidx_pos
[
i
]
=
uidx_pos
self
。
uidx_neg
[
i
]
=
uidx_neg
# auc_10 = auc_score(self。dev_data[self。dev_data[‘y_10’]。isin([0,1])][‘y_10’], self。predict_proba(self。dev_data[self。dev_data[‘y_10’]。isin([0,1])][self。predictors]。values)[:,1])
# auc_30 = auc_score(self。dev_data[self。dev_data[‘y_30’]。isin([0,1])][‘y_30’], self。predict_proba(self。dev_data[self。dev_data[‘y_30’]。isin([0,1])][self。predictors]。values)[:,1])
# if auc_10>max_auc_10:
# max_auc_10 = auc_10
# stop_t=0
# if auc_30>max_auc_30:
# max_auc_30 = auc_30
# stop_t=0
# else:
# stop_t+=1
# print (‘auc_10: %f’%auc_10)
# print (‘auc_30: %f’%auc_30)
# self。auc[i] = [auc_10,auc_30]
return
self
def
predict_proba
(
self
,
X
):
return
self
。
model
。
predict_proba
(
X
)
def
predict
(
self
,
X
):
return
self
。
model
。
predict
(
X
)
def
score
(
self
,
X
,
y
,
sample_weight
=
None
):
return
sklearn
。
metrics
。
accuracy_score
(
y
,
self
。
predict
(
X
),
sample_weight
=
sample_weight
)
def
auc_score
(
self
,
X
,
y
):
false_positive_rate
,
recall
,
thresholds
=
roc_curve
(
y
,
self
。
predict_proba
(
X
))
return
auc
(
false_positive_rate
,
recall
)