孤立森林模板
下图为项目概览:
项目分为四部分:
- 第一部分:数据生成,生成用于训练以及预测的数据
import numpy as np
import pandas as pd
import suanpan
from suanpan.app import app
from suanpan.app.arguments import Csv
def toDataFrame(X):
    """Wrap a 2-D array in a DataFrame with generated feature column names.

    Args:
        X: 2-D array-like exposing ``shape``; one column is created for each
            entry along axis 1.

    Returns:
        A ``pandas.DataFrame`` holding the values of ``X`` with columns named
        ``feature_0``, ``feature_1``, ... in order.
    """
    featureColumns = ["feature_{}".format(i) for i in range(X.shape[1])]
    return pd.DataFrame(X, columns=featureColumns)
@app.output(Csv(key="outputData1"))
@app.output(Csv(key="outputData2"))
@app.output(Csv(key="outputData3"))
def Demo(context):
    """Generate the training, regular-test and outlier datasets.

    A fixed seed (42) makes the generated data reproducible across runs.

    Args:
        context: Call context passed in by the suanpan app; it is not read
            here because this node declares no inputs.

    Returns:
        Tuple ``(X_train, X_test, X_outliers)`` of DataFrames with two feature
        columns each: 200 training rows, 40 regular test rows and 20 outlier
        rows.
    """
    rng = np.random.RandomState(42)

    # Training data: two Gaussian clusters, one shifted by +2 and one by -2.
    X = 0.3 * rng.randn(100, 2)
    X_train = np.r_[X + 2, X - 2]

    # Regular novel observations drawn around the same two cluster centres.
    X = 0.3 * rng.randn(20, 2)
    X_test = np.r_[X + 2, X - 2]

    # Abnormal novel observations spread uniformly over [-4, 4) on both axes.
    X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))

    return toDataFrame(X_train), toDataFrame(X_test), toDataFrame(X_outliers)
if __name__ == "__main__":
    # Run the data-generation node through suanpan when executed as a script.
    suanpan.run(app)
- 第二部分:孤立森林节点,接收第一部分生成的训练数据并训练模型
- 模型参数:max_samples=100, contamination=0.1
- 第三部分:Decision Function 输出,加载训练好的模型,调用其 decision_function 方法在网格点上计算得分并输出结果
import joblib
import numpy as np
import suanpan
from suanpan.app import app
from suanpan.app.arguments import File, Npy
@app.input(
    File(
        key="inputModel1", alias="inputModel", name="model", type="model", required=True
    )
)
@app.output(Npy(key="outputData1"))
@app.output(Npy(key="outputData2"))
@app.output(Npy(key="outputData3"))
def Demo(context):
    """Evaluate the trained model's decision function on a 50x50 grid.

    Args:
        context: Call context passed in by the suanpan app;
            ``context.args.inputModel`` is handed to ``joblib.load`` as the
            serialized model.

    Returns:
        Tuple ``(xx, yy, Z)``: the meshgrid coordinate arrays spanning
        [-5, 5] on both axes, and the decision-function values reshaped to
        the same grid shape.
    """
    args = context.args

    # NOTE(review): joblib.load unpickles the file and can therefore execute
    # arbitrary code; only load models written by a trusted upstream node.
    model = joblib.load(args.inputModel)

    xx, yy = np.meshgrid(np.linspace(-5, 5, 50), np.linspace(-5, 5, 50))
    # Flatten the grid into (x, y) rows, score them, then restore the grid shape.
    Z = model.decision_function(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)
    return xx, yy, Z
if __name__ == "__main__":
    # Run the decision-function node through suanpan when executed as a script.
    suanpan.run(app)
- 第四部分:结果展示
import os
from matplotlib import pyplot as plt
import suanpan
from suanpan.app import app
from suanpan.app.arguments import Csv, Folder, Npy
# Local directory the result plot is saved into; Demo returns this path as its
# folder output.
TMP_FOLDER = "/tmp/result"
@app.input(Csv(key="inputData1", alias="X_train"))
@app.input(Csv(key="inputData2", alias="X_test"))
@app.input(Csv(key="inputData3", alias="X_outliers"))
@app.input(Npy(key="inputData4", alias="xx"))
@app.input(Npy(key="inputData5", alias="yy"))
@app.input(Npy(key="inputData6", alias="Z"))
@app.output(Folder(key="outputData1"))
def Demo(context):
    """Plot the decision-function contours together with the three datasets.

    The figure is written to ``isolation_forest.png`` inside ``TMP_FOLDER``.

    Args:
        context: Call context passed in by the suanpan app. ``context.args``
            provides ``X_train``, ``X_test`` and ``X_outliers`` (objects with a
            ``.values`` array whose first two columns are plotted) and the
            grid arrays ``xx``, ``yy`` and ``Z`` used for the filled contour.

    Returns:
        ``TMP_FOLDER``, the directory containing the saved PNG.
    """
    args = context.args
    X_train = args.X_train.values
    X_test = args.X_test.values
    X_outliers = args.X_outliers.values

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(TMP_FOLDER, exist_ok=True)

    # Draw on a dedicated figure rather than pyplot's implicit global one, so
    # repeated calls neither stack artists onto an old plot nor leak figures.
    fig, ax = plt.subplots()
    try:
        ax.set_title("IsolationForest")
        ax.contourf(
            args.xx, args.yy, args.Z, cmap=plt.cm.Blues_r  # pylint: disable=no-member
        )

        b1 = ax.scatter(X_train[:, 0], X_train[:, 1], c="white", s=20, edgecolor="k")
        b2 = ax.scatter(X_test[:, 0], X_test[:, 1], c="green", s=20, edgecolor="k")
        c = ax.scatter(
            X_outliers[:, 0], X_outliers[:, 1], c="red", s=20, edgecolor="k"
        )

        ax.axis("tight")
        ax.set_xlim((-5, 5))
        ax.set_ylim((-5, 5))
        ax.legend(
            [b1, b2, c],
            [
                "training observations",
                "new regular observations",
                "new abnormal observations",
            ],
            loc="upper left",
        )
        fig.savefig(os.path.join(TMP_FOLDER, "isolation_forest.png"), format="png")
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)

    return TMP_FOLDER
if __name__ == "__main__":
    # Run the result-plotting node through suanpan when executed as a script.
    suanpan.run(app)
效果如下: