232 lines
8.9 KiB
Python
232 lines
8.9 KiB
Python
"""
|
|
visualize_from_saved_features.py
|
|
|
|
- Carga assignments.csv (contiene filename, cluster, split, fases).
|
|
- Busca features (features.npy / feature_paths.npy / embeddings.npy / feature_paths.pkl) y objetos scaler.joblib / pca50.joblib.
|
|
- Prepara representación (PCA50) aplicando scaler + pca si es necesario.
|
|
- Reduce a 2D con UMAP o t-SNE.
|
|
- Une resultados con assignments.csv (por basename) y guarda/visualiza scatter colored by cluster and/or fase.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import joblib
|
|
import glob
|
|
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
|
|
from sklearn.manifold import TSNE
|
|
import umap
|
|
|
|
# ========== CONFIG ==========
# Absolute path to the assignments CSV produced by the clustering pipeline.
ASSIGNMENTS_CSV = r"C:\Users\sof12\Desktop\ML\Datasets\Nocciola_GBIF\TrainingV7\assignments.csv"
# Directory where the pipeline saved joblib objects / feature arrays.
OUT_DIR = os.path.dirname(ASSIGNMENTS_CSV)
METHOD = "umap"  # 'umap' or 'tsne'
RANDOM_STATE = 42
# UMAP hyperparameters (used only when METHOD == 'umap').
UMAP_N_NEIGHBORS = 15
UMAP_MIN_DIST = 0.1
# t-SNE hyperparameters (used only when METHOD == 'tsne').
TSNE_PERPLEXITY = 30
TSNE_ITER = 1000
SAVE_PLOT = True
# assignments.csv columns used to colour the scatter plots (use whichever exist).
PLOT_BY = ["cluster", "fase V", "fase R"]
# ============================
|
|
|
|
def find_file(patterns, folder):
    """Return the first filesystem match for any of *patterns* under *folder*.

    Each pattern is a glob expression joined onto *folder*; patterns are
    tried in order and the first match of the first matching pattern is
    returned. Returns None when nothing matches.
    """
    match_lists = (glob.glob(os.path.join(folder, pattern)) for pattern in patterns)
    return next((hits[0] for hits in match_lists if hits), None)
|
|
|
|
def try_load_features(folder):
    """Locate the saved feature matrix and its companion path list.

    Tries a list of conventional filenames directly inside *folder* and,
    when none matches, walks the tree recursively looking for the feature
    file (it is sometimes saved in a subdirectory). Returns a tuple
    (feature_file_path, paths_file_path); either element may be None.
    """
    candidates = [
        "features.npy",
        "features_all.npy",
        "embeddings.npy",
        "emb_all.npy",
        "embeddings_all.npy",
        "feature_vectors.npy",
    ]
    feat_path = find_file(candidates, folder)
    paths_path = find_file(
        ["feature_paths.npy", "feature_paths.pkl", "feature_paths.csv"], folder
    )

    if feat_path is None:
        # Recursive fallback: accept the first case-insensitive filename hit.
        wanted = {c.lower() for c in candidates}
        for root, _dirs, files in os.walk(folder):
            hit = next((name for name in files if name.lower() in wanted), None)
            if hit is not None:
                feat_path = os.path.join(root, hit)
                break
    return feat_path, paths_path
|
|
|
|
def load_feature_paths(paths_path):
    """Load the per-row file-path list saved alongside the feature matrix.

    Supports .npy (object array), .pkl/.joblib (pickled sequence) and .csv
    (a conventionally named column, else the first column). Returns None
    when *paths_path* is None or has an unrecognised extension.
    """
    if paths_path is None:
        return None
    if paths_path.endswith(".npy"):
        return np.load(paths_path, allow_pickle=True)
    if paths_path.endswith((".pkl", ".joblib")):
        return joblib.load(paths_path)
    if paths_path.endswith(".csv"):
        table = pd.read_csv(paths_path)
        # Prefer a conventionally named column when present.
        for column in ["path", "filepath", "filename", "file"]:
            if column in table.columns:
                return table[column].values
        # Otherwise fall back to whatever the first column holds.
        return table.iloc[:, 0].values
    return None
|
|
|
|
def basename_from_path(p):
    """Best-effort basename extraction.

    Coerces *p* to str and returns its os.path.basename; if that fails for
    any reason, falls back to the plain string form of *p*.
    """
    try:
        return os.path.basename(str(p))
    except Exception:
        return str(p)
|
|
|
|
def find_and_load_scaler_pca(folder):
    """Locate and load the fitted scaler and PCA objects saved by the pipeline.

    Searches *folder* for conventional scaler / pca50 filenames and loads
    whichever is found with joblib.

    Returns (scaler, pca, scaler_path, pca_path); the object and its path
    are None when the corresponding file is not found.
    """
    scaler_path = find_file(["scaler.joblib", "scaler.pkl", "scaler.save"], folder)
    # Fix: the original candidate list repeated "pca50.joblib" twice.
    pca_path = find_file(["pca50.joblib", "pca50.pkl", "pca50.save"], folder)
    scaler = joblib.load(scaler_path) if scaler_path else None
    pca = joblib.load(pca_path) if pca_path else None
    return scaler, pca, scaler_path, pca_path
|
|
|
|
def reduce_to_2d(X_for_umap, method="umap"):
|
|
if method == "umap":
|
|
reducer = umap.UMAP(n_components=2, random_state=RANDOM_STATE,
|
|
n_neighbors=UMAP_N_NEIGHBORS, min_dist=UMAP_MIN_DIST)
|
|
X2 = reducer.fit_transform(X_for_umap)
|
|
elif method == "tsne":
|
|
ts = TSNE(n_components=2, random_state=RANDOM_STATE,
|
|
perplexity=TSNE_PERPLEXITY, n_iter=TSNE_ITER)
|
|
X2 = ts.fit_transform(X_for_umap)
|
|
else:
|
|
raise ValueError("method must be 'umap' or 'tsne'")
|
|
return X2
|
|
|
|
def main() -> None:
    """Load saved features, project them to 2-D and plot coloured by metadata.

    Steps: read assignments.csv, locate feature arrays and scaler/PCA
    objects in OUT_DIR, transform to PCA50 if needed, reduce to 2-D with
    UMAP or t-SNE, merge coordinates back onto assignments (by basename or
    by index), then save/show one scatter per column in PLOT_BY and write
    the merged coordinates CSV. Exits the process on unrecoverable errors.
    """
    print("Cargando assignments:", ASSIGNMENTS_CSV)
    df_assign = pd.read_csv(ASSIGNMENTS_CSV, encoding="utf-8")
    print("Assignments loaded, rows:", len(df_assign))

    feat_path, paths_path = try_load_features(OUT_DIR)
    print("Buscando features en:", OUT_DIR)
    print("Found features:", feat_path)
    print("Found feature paths:", paths_path)

    scaler, pca, scaler_path, pca_path = find_and_load_scaler_pca(OUT_DIR)
    print("Scaler:", scaler_path)
    print("PCA50:", pca_path)

    # Without a feature file there is nothing to visualise — abort.
    if feat_path is None:
        print("ERROR: No pude encontrar un archivo de features en el directorio. Busca 'features.npy' o embeddings guardados.")
        sys.exit(1)

    # Load the raw feature matrix (allow_pickle covers object arrays).
    feats = np.load(feat_path, allow_pickle=True)
    print("Features shape:", feats.shape)

    # Load the per-row file paths, when the pipeline saved them.
    feature_paths = load_feature_paths(paths_path) if paths_path else None
    if feature_paths is not None:
        feature_basenames = [basename_from_path(p) for p in feature_paths]
    else:
        # Without feature_paths we cannot match by filename; the later merge
        # can only rely on row order matching assignments.csv.
        feature_basenames = None
        print("ATENCIÓN: No se encontró feature_paths. Solo se podrá mapear por índice si el orden coincide con assignments.csv.")

    # Heuristic: if the feature width equals the PCA's component count,
    # assume the features were already PCA-transformed when saved.
    is_pca50 = False
    if pca is not None and feats.shape[1] == getattr(pca, "n_components_", None):
        is_pca50 = True
        print("Las features ya parecen ser PCA50 (mismo número de componentes que pca50).")

    # Otherwise apply scaler + PCA; both must be available to transform
    # consistently with how the pipeline fitted them.
    if not is_pca50:
        if scaler is None or pca is None:
            print("ERROR: Las features no son PCA50 y faltan scaler.joblib o pca50.joblib. No puedo transformar correctamente.")
            sys.exit(1)
        print("Aplicando scaler.transform + pca.transform para obtener PCA50...")
        feats_scaled = scaler.transform(feats)
        feats_pca50 = pca.transform(feats_scaled)
    else:
        feats_pca50 = feats

    print("PCA50 shape:", feats_pca50.shape)

    # Reduce PCA50 -> 2D using UMAP or t-SNE (per METHOD config).
    print(f"Reduciendo a 2D con {METHOD}...")
    X2 = reduce_to_2d(feats_pca50, method=METHOD)

    # Build a coordinate frame; synthetic "idx_N" basenames keep the column
    # well-formed when no real filenames are available.
    df_coords = pd.DataFrame({
        "feat_index": np.arange(len(X2)),
        "basename": feature_basenames if feature_basenames is not None else [f"idx_{i}" for i in range(len(X2))],
        "dim1": X2[:, 0],
        "dim2": X2[:, 1]
    })

    # Work out which assignments column holds the filename to merge on.
    if "filename" in df_assign.columns:
        assign_basename = df_assign["filename"].astype(str)
    else:
        # Try other commonly used column names.
        found = None
        for c in ["file", "filepath", "path", "image", "image_path", "file_name"]:
            if c in df_assign.columns:
                found = c
                break
        if found:
            assign_basename = df_assign[found].astype(str).apply(lambda p: os.path.basename(p))
        else:
            assign_basename = None

    if assign_basename is not None and feature_basenames is not None:
        # Merge by basename (inner join keeps only rows present on both sides).
        df_assign = df_assign.copy()
        df_assign["basename_assign"] = assign_basename.apply(lambda x: os.path.basename(str(x)))
        merged = pd.merge(df_assign, df_coords, left_on="basename_assign", right_on="basename", how="inner")
        if len(merged) == 0:
            print("WARNING: Merge por basename no produjo coincidencias. Chequea los nombres de archivo.")
        else:
            print("Merge exitoso. Filas combinadas:", len(merged))
    else:
        # Fallback: align row-by-row, but only when the lengths agree.
        if len(df_assign) == len(df_coords):
            merged = pd.concat([df_assign.reset_index(drop=True), df_coords.reset_index(drop=True)], axis=1)
            print("No se pudo hacer merge por basename; hice merge por índice (longitudes coinciden).")
        else:
            print("ERROR: No se puede unir assignments con features (ni basename ni longitud coinciden).")
            sys.exit(1)

    # Plotting: one scatter per requested column in PLOT_BY, if present.
    sns.set(style="white", rc={"figure.figsize": (10, 8)})
    for col in PLOT_BY:
        if col not in merged.columns:
            print(f"Columna {col} no encontrada en assignments; saltando.")
            continue
        plt.figure()
        # Palette sized to the number of distinct values (minimum 2).
        unique_vals = merged[col].fillna("NA").unique()
        palette = sns.color_palette("tab10", n_colors=max(2, len(unique_vals)))
        sns.scatterplot(x="dim1", y="dim2", hue=col, data=merged, palette=palette, s=20, alpha=0.8, linewidth=0, legend="full")
        plt.title(f"{METHOD.upper()} projection colored by {col}")
        plt.xlabel("dim1"); plt.ylabel("dim2")
        # Legend outside the axes so dense scatters stay readable.
        plt.legend(title=col, bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        if SAVE_PLOT:
            out_png = os.path.join(OUT_DIR, f"{METHOD}_k_visual_by_{col.replace(' ', '_')}.png")
            plt.savefig(out_png, dpi=300)
            print("Saved:", out_png)
        plt.show()

    # Save the merged coordinates next to the assignments file.
    merged_out = os.path.join(OUT_DIR, f"{METHOD}_coords_merged.csv")
    merged.to_csv(merged_out, index=False, encoding="utf-8")
    print("Merged CSV saved to:", merged_out)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|