# Phenology/Code/Unsupervised_learning/Past_codes/PCA.py
import os
import argparse
import random
import json
from contextlib import nullcontext
from pathlib import Path
from collections import Counter, defaultdict

import numpy as np
import pandas as pd
from PIL import Image
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA, IncrementalPCA  # IncrementalPCA: drop-in for datasets that do not fit in memory
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.metrics import (
    adjusted_rand_score,
    normalized_mutual_info_score,
    homogeneity_score,
    completeness_score,
)
from sklearn.preprocessing import StandardScaler

# Optional dependencies: the pipeline degrades gracefully when these are missing.
try:
    import umap
except ImportError:
    umap = None
try:
    import hdbscan
except ImportError:
    hdbscan = None
# -------------------------
# Utilities / Dataset
# -------------------------
class ImageCSV_Dataset(Dataset):
    """Dataset backed by a DataFrame holding image paths and (optional) labels."""

    def __init__(self, records, image_root, image_col='image_path', label_col='label', transform=None):
        self.records = records.reset_index(drop=True)
        self.image_root = Path(image_root)
        self.image_col = image_col
        self.label_col = label_col
        self.transform = transform

    def __len__(self):
        return len(self.records)

    def _load_image(self, img_path):
        p = Path(img_path)
        if not p.is_absolute():
            p = self.image_root / img_path
        # Some CSVs store Windows backslashes; normalize them so paths also resolve on POSIX
        p = Path(str(p).replace("\\", "/"))
        img = Image.open(p).convert('RGB')
        return img

    def __getitem__(self, idx):
        row = self.records.loc[idx]
        img = self._load_image(row[self.image_col])
        if self.transform:
            img = self.transform(img)
        label = row[self.label_col] if self.label_col in row.index else -1
        return img, label, idx

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
# -------------------------
# Feature extractor
# -------------------------
class FeatureExtractor:
    """Frozen ImageNet-pretrained ResNet50 used as a fixed 2048-d feature extractor."""

    def __init__(self, device='cpu', batch_size=64, num_workers=4, no_grad=True):
        self.device = torch.device(device)
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.no_grad = no_grad
        # Pretrained ResNet50 without the final fc layer
        # (weights API replaces the deprecated pretrained=True)
        model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        modules = list(model.children())[:-1]  # drop the last fc layer
        self.backbone = nn.Sequential(*modules).to(self.device)
        self.backbone.eval()

    def extract(self, dataset):
        loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.num_workers)
        feats = []
        labels = []
        indices = []
        with torch.no_grad() if self.no_grad else nullcontext():
            for imgs, labs, idxs in loader:
                imgs = imgs.to(self.device)
                out = self.backbone(imgs)  # shape [B, 2048, 1, 1]
                out = out.view(out.size(0), -1).cpu().numpy()
                feats.append(out)
                labels.extend(labs.numpy().tolist() if isinstance(labs, torch.Tensor) else labs)
                indices.extend(idxs.numpy().tolist() if isinstance(idxs, torch.Tensor) else idxs)
        feats = np.vstack(feats)
        labels = np.array(labels)
        indices = np.array(indices)
        return feats, labels, indices
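# Minimal usage sketch for the extractor (hypothetical: `ds` is any
# ImageCSV_Dataset built with the transform defined in main()):
#   extractor = FeatureExtractor(device='cpu', batch_size=32)
#   feats, labels, idxs = extractor.extract(ds)
#   # feats -> float32 array of shape [len(ds), 2048]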
# -------------------------
# Clustering & evaluation
# -------------------------
def compute_purity(y_true, y_pred):
    # purity = sum(max count for each cluster) / N
    contingency = defaultdict(Counter)
    for t, p in zip(y_true, y_pred):
        contingency[p][t] += 1
    total = len(y_true)
    pure = sum(max(c.values()) for c in contingency.values())
    return pure / total
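# Worked example (hypothetical labels): if cluster 0 holds true labels
# ['a', 'a', 'b'] and cluster 1 holds ['b', 'b'], the per-cluster majority
# counts are 2 and 2, so purity = (2 + 2) / 5 = 0.8.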

def evaluate_clusters(y_true, y_pred):
    results = {}
    results['ARI'] = float(adjusted_rand_score(y_true, y_pred))
    results['NMI'] = float(normalized_mutual_info_score(y_true, y_pred))
    results['homogeneity'] = float(homogeneity_score(y_true, y_pred))
    results['completeness'] = float(completeness_score(y_true, y_pred))
    results['purity'] = float(compute_purity(y_true, y_pred))
    return results
# -------------------------
# Main experiment flow
# -------------------------
def main(args):
    set_seed(args.seed)
    os.makedirs(args.out_dir, exist_ok=True)
    # Read CSV
    df = pd.read_csv(args.csv)
    if args.image_col not in df.columns or args.label_col not in df.columns:
        raise ValueError(f"Column names not found in CSV. Available: {list(df.columns)}")
    # Drop rows with a missing image path; keep missing labels as the string "NA"
    df = df.dropna(subset=[args.image_col])
    df[args.label_col] = df[args.label_col].fillna("NA").astype(str)
    # Stratified splits for even label representation (labels are used only for stratify/eval)
    train_val, test = train_test_split(df, test_size=args.test_size, stratify=df[args.label_col], random_state=args.seed)
    train, val = train_test_split(train_val, test_size=args.val_size / (1 - args.test_size),
                                  stratify=train_val[args.label_col], random_state=args.seed)
    for name, part in [('train', train), ('val', val), ('test', test)]:
        print(f"{name} size: {len(part)}")
    # Transforms (ResNet50 expects 224x224 inputs normalized with ImageNet statistics)
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])
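    # Shape sanity check (sketch): the pipeline above maps any RGB PIL image
    # to a normalized float tensor of shape [3, 224, 224], e.g.
    #   transform(Image.new('RGB', (500, 333))).shape == torch.Size([3, 224, 224])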
    datasets = {
        'train': ImageCSV_Dataset(train, args.image_root, image_col=args.image_col, label_col=args.label_col, transform=transform),
        'val': ImageCSV_Dataset(val, args.image_root, image_col=args.image_col, label_col=args.label_col, transform=transform),
        'test': ImageCSV_Dataset(test, args.image_root, image_col=args.image_col, label_col=args.label_col, transform=transform),
    }
    extractor = FeatureExtractor(device=args.device, batch_size=args.batch_size, num_workers=args.num_workers)
    feats_all = {}
    labels_all = {}
    indices_all = {}
    for split in ['train', 'val', 'test']:
        print(f"Extracting features for {split} ...")
        feats, labs, idxs = extractor.extract(datasets[split])
        feats_all[split] = feats
        labels_all[split] = labs
        indices_all[split] = idxs
        print(f"  features shape: {feats.shape}")
    # Combine features for unsupervised clustering (train+val used to fit PCA/cluster, test for final eval)
    X_fit = np.vstack([feats_all['train'], feats_all['val']])
    y_fit = np.concatenate([labels_all['train'], labels_all['val']])
    X_test = feats_all['test']
    y_test = labels_all['test']
    # Standardize before PCA/clustering
    scaler = StandardScaler().fit(X_fit)
    X_fit_s = scaler.transform(X_fit)
    X_test_s = scaler.transform(X_test)
    # PCA (swap in IncrementalPCA for datasets that do not fit in memory)
    n_components = min(args.pca_components, X_fit_s.shape[1])
    print(f"Running PCA -> {n_components} components")
    pca = PCA(n_components=n_components, random_state=args.seed)
    X_fit_pca = pca.fit_transform(X_fit_s)
    X_test_pca = pca.transform(X_test_s)
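    # Optional diagnostic (sketch, not part of the original flow): check how much
    # variance the retained components explain before trusting the projection.
    #   cum = np.cumsum(pca.explained_variance_ratio_)
    #   print(f"cumulative explained variance: {cum[-1]:.3f}")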
    # Optionally UMAP for visualization
    if args.do_umap and umap is not None:
        print("Computing UMAP embeddings for visualization")
        reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, n_components=2, random_state=args.seed)
        umap_fit = reducer.fit_transform(X_fit_pca)
        umap_test = reducer.transform(X_test_pca)
        np.save(os.path.join(args.out_dir, "umap_fit.npy"), umap_fit)
        np.save(os.path.join(args.out_dir, "umap_test.npy"), umap_test)
    elif args.do_umap:
        print("UMAP not available. Install umap-learn to enable.")
    # Clustering: KMeans and GMM
    n_clusters = args.n_clusters if args.n_clusters > 0 else len(np.unique(y_fit))
    print(f"Clustering with {n_clusters} clusters (KMeans + GMM)")
    kmeans = KMeans(n_clusters=n_clusters, random_state=args.seed).fit(X_fit_pca)
    gmm = GaussianMixture(n_components=n_clusters, random_state=args.seed).fit(X_fit_pca)
    # Predict cluster labels on the test set
    kmeans_test = kmeans.predict(X_test_pca)
    gmm_test = gmm.predict(X_test_pca)
    results = {
        'kmeans_test': evaluate_clusters(y_test, kmeans_test),
        'gmm_test': evaluate_clusters(y_test, gmm_test),
    }
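    # If soft assignments are ever needed, GaussianMixture also exposes
    # per-cluster posteriors (sketch):
    #   proba = gmm.predict_proba(X_test_pca)  # shape [n_test, n_clusters]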
    # Optional: HDBSCAN (density-based; does not require a preset cluster count)
    if args.use_hdbscan and hdbscan is not None:
        print("Running HDBSCAN on fit data")
        clusterer = hdbscan.HDBSCAN(min_cluster_size=args.hdbscan_min_cluster_size, prediction_data=True)
        clusterer.fit(X_fit_pca)
        # Predict on test if possible
        try:
            hdb_test = hdbscan.prediction.approximate_predict(clusterer, X_test_pca)[0]
            results['hdbscan_test'] = evaluate_clusters(y_test, hdb_test)
        except Exception as e:
            print("HDBSCAN prediction failed:", e)
    elif args.use_hdbscan:
        print("HDBSCAN requested but the hdbscan package is not installed.")
    # Save results
    with open(os.path.join(args.out_dir, "cluster_results.json"), 'w', encoding='utf8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    # Save cluster assignments for the test set. The dataset indices refer to the
    # (reset) test split, not to the original df, so index into `test` here.
    test_indices = indices_all['test']
    test_rows = test.reset_index(drop=True).iloc[test_indices].copy().reset_index(drop=True)
    test_rows['kmeans_cluster'] = kmeans_test
    test_rows['gmm_cluster'] = gmm_test
    test_rows.to_csv(os.path.join(args.out_dir, "test_cluster_assignments.csv"), index=False)
    print("Results saved to", args.out_dir)
    print("Summary (test):")
    for k, v in results.items():
        print(f"  {k}: {v}")
# -------------------------
# Argument parsing
# -------------------------
def parse_args():
    p = argparse.ArgumentParser(description="Unsupervised clustering pipeline for an image dataset (uses a pretrained ResNet50).")
    p.add_argument('--csv', default=r"C:\Users\sof12\Desktop\ML\Datasets\Nocciola\GBIF\tags.csv")
    p.add_argument('--image-root', default=r"C:\Users\sof12\Desktop\ML\Datasets\Nocciola\GBIF")
    p.add_argument('--image-col', type=str, default='id_img', help="CSV column for image relative path")
    p.add_argument('--label-col', type=str, default='fase V', help="CSV column that contains labels (used only for stratify/eval)")
    p.add_argument('--out-dir', type=str, default='./results', help="Directory to save results")
    p.add_argument('--batch-size', type=int, default=64)
    p.add_argument('--num-workers', type=int, default=4)
    p.add_argument('--device', type=str, default='cuda' if torch.cuda.is_available() else 'cpu')
    p.add_argument('--seed', type=int, default=42)
    p.add_argument('--test-size', type=float, default=0.2)
    p.add_argument('--val-size', type=float, default=0.1)
    p.add_argument('--pca-components', type=int, default=128)
    p.add_argument('--n-clusters', type=int, default=0, help="If 0, use number of unique labels in train+val")
    p.add_argument('--do-umap', action='store_true', help="Compute UMAP embeddings for visualization (optional)")
    p.add_argument('--use-hdbscan', action='store_true', help="Run HDBSCAN clustering (optional)")
    p.add_argument('--hdbscan-min-cluster-size', type=int, default=5)
    return p.parse_args()

if __name__ == "__main__":
    args = parse_args()
    main(args)
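# Example invocation (sketch; paths and the cluster count are placeholders,
# not the defaults baked in above):
#   python PCA.py --csv /data/tags.csv --image-root /data/images \
#       --out-dir ./results --pca-components 128 --n-clusters 6 --do-umap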