OPTICS聚类算法演示
下图为项目总览:
项目分为四个部分:
- 第一部分,数据生成,生成用于聚类的数据
import numpy as np
import pandas as pd
import suanpan
from suanpan.app import app
from suanpan.app.arguments import Csv
@app.output(Csv(key="outputData1"))
def Demo(_):
    """Build the synthetic two-feature data set used by the OPTICS demo.

    Six blobs of 250 points each are drawn from the global NumPy RNG (seeded
    with 0, so the output is reproducible) as ``center + spread * randn`` and
    stacked in definition order.

    Args:
        _: Unused; the function never reads it.

    Returns:
        A ``pandas.DataFrame`` with columns ``feature_1`` and ``feature_2``.
    """
    np.random.seed(0)
    points_per_blob = 250

    # (center, spread) for each blob. The order matters: all draws come
    # sequentially from the same seeded generator.
    blob_specs = [
        ([-5, -2], 0.8),
        ([4, -1], 0.1),
        ([1, -2], 0.2),
        ([-2, 3], 0.3),
        ([3, -2], 1.6),
        ([5, 6], 2),
    ]
    blobs = [
        center + spread * np.random.randn(points_per_blob, 2)
        for center, spread in blob_specs
    ]

    samples = np.vstack(blobs)
    print(samples.shape)
    return pd.DataFrame(samples, columns=["feature_1", "feature_2"])
# Script entry point: when executed directly, pass the app to suanpan.run.
if __name__ == "__main__":
    suanpan.run(app)
- 第二部分,OPTICS聚类,所用参数为 min_samples=50、xi=0.05、min_cluster_size=0.05
- 第三部分,模型信息提取
- 生成label
import joblib
from sklearn.cluster import cluster_optics_dbscan
import suanpan
from suanpan.app import app
from suanpan.app.arguments import File, Npy
def load_model(model_file):
    """Deserialize and return the object stored in ``model_file``.

    Args:
        model_file: Path (or file object) handed straight to ``joblib.load``.

    Returns:
        Whatever object ``joblib.load`` reconstructs from the file.

    Note:
        ``joblib.load`` is pickle-based, so only load files from a trusted
        source.
    """
    model = joblib.load(model_file)
    return model
@app.input(
    File(
        key="inputModel1", alias="inputModel", name="model", type="model", required=True
    )
)
@app.output(Npy(key="outputData1"))
@app.output(Npy(key="outputData2"))
def Demo(context):
    """Derive DBSCAN-style labels from a stored model at two epsilon cuts.

    The model is loaded from ``context.args.inputModel`` and its
    ``reachability_``, ``core_distances_`` and ``ordering_`` attributes are
    passed to ``cluster_optics_dbscan`` once with ``eps=0.5`` and once with
    ``eps=2``.

    Args:
        context: Object whose ``args.inputModel`` is the model file to load.

    Returns:
        A tuple ``(labels_at_eps_0_5, labels_at_eps_2)``.
    """
    fitted = load_model(context.args.inputModel)

    def cut_at(eps):
        # Same model attributes for every cut; only the threshold differs.
        return cluster_optics_dbscan(
            reachability=fitted.reachability_,
            core_distances=fitted.core_distances_,
            ordering=fitted.ordering_,
            eps=eps,
        )

    return cut_at(0.5), cut_at(2)
# Script entry point: when executed directly, pass the app to suanpan.run.
if __name__ == "__main__":
    suanpan.run(app)
- 获取模型信息
import joblib
import suanpan
from suanpan.app import app
from suanpan.app.arguments import File, Npy
def load_model(model_file):
    """Return the object that ``joblib.load`` reads from ``model_file``.

    Args:
        model_file: Location of the serialized model, forwarded unchanged to
            ``joblib.load``.

    Returns:
        The deserialized object.

    Note:
        ``joblib.load`` unpickles its input; only use it on trusted files.
    """
    loaded = joblib.load(model_file)
    return loaded
@app.input(
    File(
        key="inputModel1", alias="inputModel", name="model", type="model", required=True
    )
)
@app.output(Npy(key="outputData1"))
@app.output(Npy(key="outputData2"))
def HelloWorld(context):
    """Extract reachability values and labels from a stored model.

    Both ``reachability_`` and ``labels_`` are re-indexed by the model's
    ``ordering_`` attribute before being returned.

    Args:
        context: Object whose ``args.inputModel`` is the model file to load.

    Returns:
        A tuple ``(reachability, labels)``, each indexed by ``ordering_``.
    """
    fitted = load_model(context.args.inputModel)
    order = fitted.ordering_
    ordered_reachability = fitted.reachability_[order]
    ordered_labels = fitted.labels_[order]
    return ordered_reachability, ordered_labels
# Script entry point: when executed directly, pass the app to suanpan.run.
if __name__ == "__main__":
    suanpan.run(app)
- 第四部分,结果展示
import os
import matplotlib.gridspec as gridspec
import numpy as np
from matplotlib import pyplot as plt
import suanpan
from suanpan.app import app
from suanpan.app.arguments import Csv, Folder, Npy
# Directory the result figure is written to; Demo also returns this path as
# its Folder output.
TMP_FOLDER = "/tmp/result"
def _plot_label_groups(ax, points, labels, styles, **plot_kwargs):
    """Draw each labelled group of 2-D ``points`` on ``ax``, then the noise.

    Group ``k`` (rows where ``labels == k``) is drawn with ``styles[k]``; rows
    labelled ``-1`` are drawn as faint black crosses. Labels at or beyond
    ``len(styles)`` are not drawn.
    """
    for klass, style in enumerate(styles):
        members = points[labels == klass]
        ax.plot(members[:, 0], members[:, 1], style, alpha=0.3, **plot_kwargs)
    noise = points[labels == -1]
    ax.plot(noise[:, 0], noise[:, 1], "k+", alpha=0.1)


@app.input(Csv(key="inputData1"))
@app.input(Npy(key="inputData2"))
@app.input(Npy(key="inputData3"))
@app.input(Npy(key="inputData4"))
@app.input(Npy(key="inputData5"))
@app.output(Folder(key="outputData1"))
def Demo(context):
    """Render the OPTICS result figure and return the folder that holds it.

    The figure has a reachability plot on the top row and three scatter
    panels below it: the labels in ``inputData5``, the labels in
    ``inputData2`` (titled as the 0.5 epsilon cut) and the labels in
    ``inputData3`` (titled as the 2.0 epsilon cut). It is saved as
    ``optics.png`` inside ``TMP_FOLDER``.

    Args:
        context: Object whose ``args`` provides:
            ``inputData1`` - table whose ``.values`` are the points (the
            first two columns are plotted);
            ``inputData2`` / ``inputData3`` - label arrays for the 0.5 and
            2.0 epsilon panels;
            ``inputData4`` / ``inputData5`` - reachability values and labels
            for the reachability and OPTICS panels.
            NOTE(review): ``inputData5`` is used as a mask on both the
            reachability array and the raw points, so it must align with
            both - confirm against the upstream nodes.

    Returns:
        ``TMP_FOLDER``, the directory containing ``optics.png``.
    """
    args = context.args
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(TMP_FOLDER, exist_ok=True)

    X = args.inputData1.values
    labels_050 = args.inputData2
    labels_200 = args.inputData3
    reachability = args.inputData4
    labels = args.inputData5
    space = np.arange(len(X))

    fig = plt.figure(figsize=(12.8, 9.6))
    try:
        grid = gridspec.GridSpec(2, 3)
        ax1 = fig.add_subplot(grid[0, :])
        ax2 = fig.add_subplot(grid[1, 0])
        ax3 = fig.add_subplot(grid[1, 1])
        ax4 = fig.add_subplot(grid[1, 2])

        # Reachability plot, with the two epsilon cuts as horizontal lines.
        for klass, style in enumerate(["g.", "r.", "b.", "y.", "c."]):
            in_class = labels == klass
            ax1.plot(space[in_class], reachability[in_class], style, alpha=0.3)
        is_noise = labels == -1
        ax1.plot(space[is_noise], reachability[is_noise], "k.", alpha=0.3)
        ax1.plot(space, np.full_like(space, 2.0, dtype=float), "k-", alpha=0.5)
        ax1.plot(space, np.full_like(space, 0.5, dtype=float), "k-.", alpha=0.5)
        ax1.set_ylabel("Reachability (epsilon distance)")
        ax1.set_title("Reachability Plot")

        # OPTICS
        _plot_label_groups(ax2, X, labels, ["g.", "r.", "b.", "y.", "c."])
        ax2.set_title("Automatic Clustering\nOPTICS")

        # DBSCAN at 0.5. These styles are bare colour names, so without an
        # explicit linestyle the default solid line would connect the points
        # of each cluster; draw markers only, like the other panels.
        _plot_label_groups(
            ax3,
            X,
            labels_050,
            ["g", "greenyellow", "olive", "r", "b", "c"],
            marker=".",
            linestyle="none",
        )
        ax3.set_title("Clustering at 0.5 epsilon cut\nDBSCAN")

        # DBSCAN at 2.
        _plot_label_groups(ax4, X, labels_200, ["g.", "m.", "y.", "c."])
        ax4.set_title("Clustering at 2.0 epsilon cut\nDBSCAN")

        fig.savefig(os.path.join(TMP_FOLDER, "optics.png"), format="png")
    finally:
        # pyplot keeps every figure alive until it is closed; release it so
        # repeated calls do not accumulate figures.
        plt.close(fig)
    return TMP_FOLDER
# Script entry point: when executed directly, pass the app to suanpan.run.
if __name__ == "__main__":
    suanpan.run(app)