""" visualize_from_saved_features.py - Carga assignments.csv (contiene filename, cluster, split, fases). - Busca features (features.npy / feature_paths.npy / embeddings.npy / feature_paths.pkl) y objetos scaler.joblib / pca50.joblib. - Prepara representación (PCA50) aplicando scaler + pca si es necesario. - Reduce a 2D con UMAP o t-SNE. - Une resultados con assignments.csv (por basename) y guarda/visualiza scatter colored by cluster and/or fase. """ import os import sys import joblib import glob import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from sklearn.manifold import TSNE import umap # ========== CONFIG ========== ASSIGNMENTS_CSV = r"C:\Users\sof12\Desktop\ML\Datasets\Carciofo_GBIF\TrainingV2\assignments.csv" OUT_DIR = os.path.dirname(ASSIGNMENTS_CSV) # donde el pipeline guardó joblibs / features METHOD = "umap" # 'umap' o 'tsne' RANDOM_STATE = 42 UMAP_N_NEIGHBORS = 15 UMAP_MIN_DIST = 0.1 TSNE_PERPLEXITY = 30 TSNE_ITER = 1000 SAVE_PLOT = True PLOT_BY = ["cluster", "fase"] # lista de columnas de assignments.csv para colorear (usa lo que tengas) # ============================ def find_file(patterns, folder): for p in patterns: f = os.path.join(folder, p) matches = glob.glob(f) if matches: return matches[0] return None def try_load_features(folder): # candidates in order of preference candidates = [ "features.npy", "features_all.npy", "embeddings.npy", "emb_all.npy", "embeddings_all.npy", "feature_vectors.npy", ] feat_path = find_file(candidates, folder) paths_path = find_file(["feature_paths.npy", "feature_paths.pkl", "feature_paths.csv"], folder) if feat_path is None: # search recursively (sometimes saved in parent) for root, dirs, files in os.walk(folder): for name in files: if name.lower() in [c.lower() for c in candidates]: feat_path = os.path.join(root, name) break if feat_path: break return feat_path, paths_path def load_feature_paths(paths_path): if paths_path is None: return None if paths_path.endswith(".npy"): return np.load(paths_path, allow_pickle=True) elif paths_path.endswith(".pkl") or paths_path.endswith(".joblib"): return joblib.load(paths_path) elif paths_path.endswith(".csv"): dfp = pd.read_csv(paths_path) # attempt common columns for c in ["path", "filepath", "filename", "file"]: if c in dfp.columns: return dfp[c].values # else return first column return dfp.iloc[:,0].values else: return None def basename_from_path(p): try: return os.path.basename(str(p)) except Exception: return str(p) def find_and_load_scaler_pca(folder): scaler_path = find_file(["scaler.joblib","scaler.pkl","scaler.save"], folder) pca_path = find_file(["pca50.joblib","pca50.pkl","pca50.save","pca50.joblib"], folder) scaler = joblib.load(scaler_path) if scaler_path else None pca = joblib.load(pca_path) if pca_path else None return scaler, pca, scaler_path, pca_path def reduce_to_2d(X_for_umap, method="umap"): if method == "umap": reducer = umap.UMAP(n_components=2, random_state=RANDOM_STATE, n_neighbors=UMAP_N_NEIGHBORS, min_dist=UMAP_MIN_DIST) X2 = reducer.fit_transform(X_for_umap) elif method == "tsne": ts = TSNE(n_components=2, random_state=RANDOM_STATE, perplexity=TSNE_PERPLEXITY, n_iter=TSNE_ITER) X2 = ts.fit_transform(X_for_umap) else: raise ValueError("method must be 'umap' or 'tsne'") return X2 def main(): print("Cargando assignments:", ASSIGNMENTS_CSV) df_assign = pd.read_csv(ASSIGNMENTS_CSV, encoding="utf-8") print("Assignments loaded, rows:", len(df_assign)) feat_path, paths_path = try_load_features(OUT_DIR) print("Buscando features en:", 


def main():
    print("Loading assignments:", ASSIGNMENTS_CSV)
    df_assign = pd.read_csv(ASSIGNMENTS_CSV, encoding="utf-8")
    print("Assignments loaded, rows:", len(df_assign))

    print("Searching for features in:", OUT_DIR)
    feat_path, paths_path = try_load_features(OUT_DIR)
    print("Found features:", feat_path)
    print("Found feature paths:", paths_path)

    scaler, pca, scaler_path, pca_path = find_and_load_scaler_pca(OUT_DIR)
    print("Scaler:", scaler_path)
    print("PCA50:", pca_path)

    if feat_path is None:
        print("ERROR: could not find a features file in the directory. "
              "Look for 'features.npy' or saved embeddings.")
        sys.exit(1)

    # Load the raw feature matrix.
    feats = np.load(feat_path, allow_pickle=True)
    print("Features shape:", feats.shape)

    # Load the parallel file paths, if they were saved.
    feature_paths = load_feature_paths(paths_path)
    if feature_paths is not None:
        feature_basenames = [basename_from_path(p) for p in feature_paths]
    else:
        # Without feature_paths we cannot match by filename; the only fallback is
        # matching by position, which requires the same order as assignments.csv.
        feature_basenames = None
        print("WARNING: feature_paths not found. Rows can only be mapped by index, "
              "and only if the order matches assignments.csv.")

    # Determine whether the features are already the PCA50 representation.
    is_pca50 = pca is not None and feats.shape[1] == getattr(pca, "n_components_", None)
    if is_pca50:
        print("Features already appear to be PCA50 (same number of components as pca50).")
        feats_pca50 = feats
    else:
        if scaler is None or pca is None:
            print("ERROR: features are not PCA50 and scaler.joblib or pca50.joblib "
                  "is missing; cannot transform correctly.")
            sys.exit(1)
        print("Applying scaler.transform + pca.transform to obtain PCA50...")
        feats_pca50 = pca.transform(scaler.transform(feats))
    print("PCA50 shape:", feats_pca50.shape)

    # Reduce PCA50 -> 2D with UMAP or t-SNE.
    print(f"Reducing to 2D with {METHOD}...")
    X2 = reduce_to_2d(feats_pca50, method=METHOD)

    # Build a DataFrame of 2D coordinates to merge with assignments.csv.
    df_coords = pd.DataFrame({
        "feat_index": np.arange(len(X2)),
        "basename": (feature_basenames if feature_basenames is not None
                     else [f"idx_{i}" for i in range(len(X2))]),
        "dim1": X2[:, 0],
        "dim2": X2[:, 1],
    })

    # Find a filename-like column in assignments.csv to merge on.
    if "filename" in df_assign.columns:
        assign_basename = df_assign["filename"].astype(str)
    else:
        found = None
        for c in ["file", "filepath", "path", "image", "image_path", "file_name"]:
            if c in df_assign.columns:
                found = c
                break
        if found:
            assign_basename = df_assign[found].astype(str).apply(os.path.basename)
        else:
            assign_basename = None

    # Merge by basename when possible; otherwise fall back to merging by index.
    if assign_basename is not None and feature_basenames is not None:
        df_assign = df_assign.copy()
        df_assign["basename_assign"] = assign_basename.apply(lambda x: os.path.basename(str(x)))
        merged = pd.merge(df_assign, df_coords, left_on="basename_assign",
                          right_on="basename", how="inner")
        if len(merged) == 0:
            print("WARNING: merging by basename produced no matches. Check the file names.")
        else:
            print("Merge successful. Combined rows:", len(merged))
    elif len(df_assign) == len(df_coords):
        merged = pd.concat([df_assign.reset_index(drop=True),
                            df_coords.reset_index(drop=True)], axis=1)
        print("Could not merge by basename; merged by index instead (lengths match).")
    else:
        print("ERROR: cannot join assignments with features "
              "(neither basenames nor lengths match).")
        sys.exit(1)
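
    # Optional sanity check (an addition, not in the original script): report how many
    # assignment rows failed to match a feature vector. A large count usually means
    # assignments.csv stores full paths while feature_paths stores bare filenames
    # (or vice versa), so the basenames disagree.
    n_unmatched = len(df_assign) - len(merged)
    if n_unmatched > 0:
        print(f"Note: {n_unmatched} assignment rows did not match any feature vector.")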

    # Plotting: one scatter per requested column in PLOT_BY, if present.
    sns.set(style="white", rc={"figure.figsize": (10, 8)})
    for col in PLOT_BY:
        if col not in merged.columns:
            print(f"Column {col} not found in assignments; skipping.")
            continue
        plt.figure()
        unique_vals = merged[col].fillna("NA").unique()
        palette = sns.color_palette("tab10", n_colors=max(2, len(unique_vals)))
        sns.scatterplot(x="dim1", y="dim2", hue=col, data=merged, palette=palette,
                        s=20, alpha=0.8, linewidth=0, legend="full")
        plt.title(f"{METHOD.upper()} projection colored by {col}")
        plt.xlabel("dim1")
        plt.ylabel("dim2")
        plt.legend(title=col, bbox_to_anchor=(1.05, 1), loc="upper left")
        plt.tight_layout()
        if SAVE_PLOT:
            out_png = os.path.join(OUT_DIR, f"{METHOD}_k_visual_by_{col.replace(' ', '_')}.png")
            plt.savefig(out_png, dpi=300)
            print("Saved:", out_png)
        plt.show()

    # Save the merged coordinates alongside the assignments.
    merged_out = os.path.join(OUT_DIR, f"{METHOD}_coords_merged.csv")
    merged.to_csv(merged_out, index=False, encoding="utf-8")
    print("Merged CSV saved to:", merged_out)


if __name__ == "__main__":
    main()
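
# Usage (a minimal sketch; the package names below are the usual PyPI distributions
# for the imports above — adjust ASSIGNMENTS_CSV and METHOD in the CONFIG block first):
#   pip install numpy pandas matplotlib seaborn scikit-learn umap-learn joblib
#   python visualize_from_saved_features.py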