""" MobileNetV2 Transfer Learning para Clasificación de Fases Fenológicas - Nocciola Adaptado para Visual Studio Code Dataset: Nocciola GBIF Objetivo: Predecir fase R (fenológica reproductive) """ import os import shutil import random import argparse from pathlib import Path import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import tensorflow as tf from tensorflow.keras import layers, models from tensorflow.keras.applications import MobileNetV2 from tensorflow.keras.preprocessing.image import ImageDataGenerator from sklearn.metrics import classification_report, confusion_matrix from sklearn.utils import class_weight import json # ----------------- CONFIG ----------------- PROJECT_PATH = r'C:\Users\sof12\Desktop\ML\Datasets\Nocciola\GBIF' IMAGES_DIR = PROJECT_PATH # Las imágenes están en el directorio principal CSV_PATH = os.path.join(PROJECT_PATH, 'assignments.csv') # CSV principal OUTPUT_DIR = os.path.join(PROJECT_PATH, 'results_mobilenet_faseV_V1') os.makedirs(OUTPUT_DIR, exist_ok=True) IMG_SIZE = (224, 224) # Recomendado para MobileNetV2 BATCH_SIZE = 16 # Reducido para mejor estabilidad SEED = 42 SPLIT = {'train': 0.7, 'val': 0.15, 'test': 0.15} FORCE_SPLIT = False # ----------------- Utilities ----------------- def set_seed(seed=42): """Establecer semilla para reproducibilidad""" random.seed(seed) np.random.seed(seed) tf.random.set_seed(seed) def analyze_class_distribution(df, column_name='fase V'): """Analizar distribución de clases y detectar desbalances""" print(f"\n📊 === Análisis de Distribución de Clases ===") # Contar por clase counts = df[column_name].value_counts() total = len(df) print(f"📊 Total de muestras: {total}") print(f"📊 Número de clases: {len(counts)}") print(f"📊 Distribución por clase:") # Mostrar estadísticas detalladas for clase, count in counts.items(): percentage = (count / total) * 100 print(f" - {clase}: {count} muestras ({percentage:.1f}%)") # Detectar clases problemáticas min_samples = 5 # Umbral mínimo recomendado small_classes = counts[counts < min_samples] if len(small_classes) > 0: print(f"\n⚠️ Clases con menos de {min_samples} muestras:") for clase, count in small_classes.items(): print(f" - {clase}: {count} muestras") print(f"\n💡 Recomendaciones:") print(f" 1. Considera recolectar más datos para estas clases") print(f" 2. O fusionar clases similares") print(f" 3. 

def safe_read_csv(path):
    """Read a CSV file, handling common encodings."""
    if not os.path.exists(path):
        raise FileNotFoundError(f'CSV not found: {path}')
    try:
        df = pd.read_csv(path, encoding='utf-8')
    except UnicodeDecodeError:
        try:
            df = pd.read_csv(path, encoding='latin-1')
        except Exception:
            df = pd.read_csv(path, encoding='iso-8859-1')
    return df


def resolve_image_path(images_dir, img_id):
    """Resolve the full path of an image given its id."""
    if pd.isna(img_id) or str(img_id).strip() == '':
        return None

    img_id = str(img_id).strip()

    # Check whether the id already includes an extension and the file exists
    direct_path = os.path.join(images_dir, img_id)
    if os.path.exists(direct_path):
        return direct_path

    # Try common extensions
    stem = os.path.splitext(img_id)[0]
    for ext in ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']:
        img_path = os.path.join(images_dir, stem + ext)
        if os.path.exists(img_path):
            return img_path

    return None


def prepare_image_folders(df, images_dir, out_dir, split=SPLIT, seed=SEED):
    """Create the folder structure expected by flow_from_directory."""
    set_seed(seed)

    # Keep only rows with a valid 'fase V' label and an existing image
    print(f"📊 Initial data: {len(df)} rows")

    df_valid = df.dropna(subset=['fase V']).copy()
    df_valid = df_valid[df_valid['fase V'].str.strip() != '']
    print(f"📊 With a valid 'fase V': {len(df_valid)} rows")

    # Check that the images actually exist
    valid_rows = []
    for _, row in df_valid.iterrows():
        img_path = resolve_image_path(images_dir, row['id_img'])
        if img_path:
            valid_rows.append(row)
        else:
            print(f"⚠️ Image not found: {row['id_img']}")

    if not valid_rows:
        raise ValueError("❌ No valid images were found")

    df_final = pd.DataFrame(valid_rows)
    print(f"📊 With existing images: {len(df_final)} rows")

    # Show the class distribution
    fase_counts = df_final['fase V'].value_counts()
    print("\n📊 Distribution of 'fase V' phases:")
    for fase, count in fase_counts.items():
        print(f"  - {fase}: {count} images")

    # Drop classes with very few samples (fewer than 3)
    min_samples = 3
    valid_phases = fase_counts[fase_counts >= min_samples].index.tolist()

    if len(valid_phases) < len(fase_counts):
        excluded = fase_counts[fase_counts < min_samples].index.tolist()
        print(f"⚠️ Excluding phases with fewer than {min_samples} samples: {excluded}")
        df_final = df_final[df_final['fase V'].isin(valid_phases)]
        print(f"📊 After filtering: {len(df_final)} rows, {len(valid_phases)} classes")

    labels = df_final['fase V'].unique().tolist()
    print(f"📊 Final classes: {labels}")

    # Shuffle and split the data
    df_shuffled = df_final.sample(frac=1, random_state=seed).reset_index(drop=True)
    n = len(df_shuffled)
    n_train = int(n * split['train'])
    n_val = int(n * split['val'])

    train_df = df_shuffled.iloc[:n_train]
    val_df = df_shuffled.iloc[n_train:n_train + n_val]
    test_df = df_shuffled.iloc[n_train + n_val:]

    print("📊 Final split:")
    print(f"  - Training: {len(train_df)} images")
    print(f"  - Validation: {len(val_df)} images")
    print(f"  - Test: {len(test_df)} images")

    # Create the folder structure
    for part in ['train', 'val', 'test']:
        for label in labels:
            label_dir = os.path.join(out_dir, part, str(label))
            os.makedirs(label_dir, exist_ok=True)

    # Helper to copy images into the split folders
    def copy_subset(subdf, subset_name):
        copied, missing = 0, 0
        for _, row in subdf.iterrows():
            src = resolve_image_path(images_dir, row['id_img'])
            if src:
                fase = str(row['fase V'])
                # Keep the original filename and extension of the resolved image
                dst = os.path.join(out_dir, subset_name, fase, os.path.basename(src))
                try:
                    shutil.copy2(src, dst)
                    copied += 1
                except Exception as e:
                    print(f"⚠️ Error copying {src}: {e}")
                    missing += 1
            else:
                missing += 1
        print(f"✅ {subset_name}: {copied} images copied, {missing} failed")
        return copied

    # Copy the images into the corresponding folders
    copy_subset(train_df, 'train')
    copy_subset(val_df, 'val')
    copy_subset(test_df, 'test')

    return train_df, val_df, test_df
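
# Optional alternative (a sketch, not called by the pipeline): the random shuffle split
# above can leave rare phases with no samples in 'val' or 'test'. A stratified split keeps
# the class proportions in every subset. The helper name and signature are illustrative,
# and it needs at least a couple of samples per class at each stage.
def stratified_split_df(df, label_col='fase V', split=SPLIT, seed=SEED):
    """Split a dataframe into train/val/test while preserving class proportions (sketch)."""
    from sklearn.model_selection import train_test_split

    # Carve out the training portion, stratifying on the label column
    train_df, rest_df = train_test_split(
        df, train_size=split['train'], stratify=df[label_col], random_state=seed
    )

    # Split the remainder into validation and test
    val_frac = split['val'] / (split['val'] + split['test'])
    val_df, test_df = train_test_split(
        rest_df, train_size=val_frac, stratify=rest_df[label_col], random_state=seed
    )
    return train_df, val_df, test_df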

def main():
    """Main pipeline function."""
    parser = argparse.ArgumentParser(description='MobileNetV2 transfer learning for Nocciola')
    parser.add_argument('--csv_path', type=str, default=CSV_PATH,
                        help='Path to the CSV file with metadata')
    parser.add_argument('--images_dir', type=str, default=IMAGES_DIR,
                        help='Directory containing the images')
    parser.add_argument('--output_dir', type=str, default=OUTPUT_DIR,
                        help='Output directory for results')
    parser.add_argument('--epochs', type=int, default=30,
                        help='Number of training epochs')
    parser.add_argument('--force_split', action='store_true',
                        help='Force recreation of the data split')
    args = parser.parse_args()

    print('\n🚀 === Starting the MobileNetV2 pipeline for Nocciola ===')
    print(f"📁 Image directory: {args.images_dir}")
    print(f"📄 CSV file: {args.csv_path}")
    print(f"📂 Output directory: {args.output_dir}")

    # Set the random seed
    set_seed(SEED)

    # Create the output directory
    os.makedirs(args.output_dir, exist_ok=True)

    # Load data
    print('\n📊 === Loading data ===')
    df = safe_read_csv(args.csv_path)
    print(f'📊 Total records in CSV: {len(df)}')
    print(f'📊 Available columns: {list(df.columns)}')

    # Check required columns
    required_cols = {'id_img', 'fase V'}
    if not required_cols.issubset(set(df.columns)):
        missing = required_cols - set(df.columns)
        raise ValueError(f'❌ The CSV must contain the columns: {missing}')

    # Analyze the class distribution before any processing
    analyze_class_distribution(df, 'fase V')

    # Prepare the folder structure
    SPLIT_DIR = os.path.join(args.output_dir, 'data_split')

    if args.force_split and os.path.exists(SPLIT_DIR):
        print("🗑️ Removing existing split...")
        shutil.rmtree(SPLIT_DIR)

    if not os.path.exists(SPLIT_DIR):
        print("\n📁 === Creating a new data split ===")
        train_df, val_df, test_df = prepare_image_folders(df, args.images_dir, SPLIT_DIR)

        # Save the split information
        train_df.to_csv(os.path.join(args.output_dir, 'train_split.csv'), index=False)
        val_df.to_csv(os.path.join(args.output_dir, 'val_split.csv'), index=False)
        test_df.to_csv(os.path.join(args.output_dir, 'test_split.csv'), index=False)
    else:
        print("\n♻️ === Reusing existing split ===")
        # Load the split information if it exists
        try:
            train_df = pd.read_csv(os.path.join(args.output_dir, 'train_split.csv'))
            val_df = pd.read_csv(os.path.join(args.output_dir, 'val_split.csv'))
            test_df = pd.read_csv(os.path.join(args.output_dir, 'test_split.csv'))
        except Exception:
            print("⚠️ Could not load the split files, recreating them...")
            train_df, val_df, test_df = prepare_image_folders(df, args.images_dir, SPLIT_DIR)

    # Create data generators
    print("\n🔄 === Creating data generators ===")

    # Data augmentation for training
    train_datagen = ImageDataGenerator(
        rescale=1./255,
        rotation_range=20,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='nearest'
    )

    # Only rescaling for validation and test
    val_test_datagen = ImageDataGenerator(rescale=1./255)
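
    # Note (a sketch, optional): MobileNetV2 was pre-trained on inputs scaled to [-1, 1].
    # To match that preprocessing exactly instead of rescale=1./255, the generators could
    # use the model's own preprocessing function, e.g.:
    #
    #   from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
    #   train_datagen = ImageDataGenerator(preprocessing_function=preprocess_input, ...)
    #   val_test_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
    #
    # Whichever option is chosen, use the same preprocessing for training, validation,
    # test and any later inference.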

    # Create the generators
    train_gen = train_datagen.flow_from_directory(
        os.path.join(SPLIT_DIR, 'train'),
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        class_mode='categorical',
        seed=SEED
    )

    val_gen = val_test_datagen.flow_from_directory(
        os.path.join(SPLIT_DIR, 'val'),
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        class_mode='categorical',
        shuffle=False
    )

    test_gen = val_test_datagen.flow_from_directory(
        os.path.join(SPLIT_DIR, 'test'),
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        class_mode='categorical',
        shuffle=False
    )

    # Save the class-to-index mapping
    class_indices = train_gen.class_indices
    print(f'🏷️ Class mapping: {class_indices}')
    with open(os.path.join(args.output_dir, 'class_indices.json'), 'w') as f:
        json.dump(class_indices, f, indent=2)

    print("📊 Samples per subset:")
    print(f"  - Training: {train_gen.samples}")
    print(f"  - Validation: {val_gen.samples}")
    print(f"  - Test: {test_gen.samples}")
    print(f"  - Number of classes: {train_gen.num_classes}")

    # Build and train the model
    print("\n🤖 === Building the model ===")

    # MobileNetV2 base model
    base_model = MobileNetV2(
        weights='imagenet',
        include_top=False,
        input_shape=(*IMG_SIZE, 3)
    )
    base_model.trainable = False  # Frozen initially

    # Sequential classification head on top of the frozen base
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.3),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(train_gen.num_classes, activation='softmax')
    ])

    # Compile the model
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    print("📋 Model summary:")
    model.summary()

    # Compute class weights
    print("\n⚖️ === Computing class weights ===")
    try:
        # Collect the training labels batch by batch
        train_labels = []
        for i in range(len(train_gen)):
            _, batch_labels = train_gen[i]
            train_labels.extend(np.argmax(batch_labels, axis=1))
            if len(train_labels) >= train_gen.samples:
                break

        # Compute balanced weights
        class_weights = class_weight.compute_class_weight(
            'balanced', classes=np.unique(train_labels), y=train_labels
        )
        class_weight_dict = dict(zip(np.unique(train_labels), class_weights))
        print(f"⚖️ Class weights: {class_weight_dict}")
    except Exception as e:
        print(f"⚠️ Error computing class weights: {e}")
        class_weight_dict = None
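
    # Simpler alternative (a sketch): flow_from_directory iterators already expose the
    # integer label of every training image via train_gen.classes, so the batch loop
    # above could be replaced with:
    #
    #   train_labels = train_gen.classes
    #   class_weights = class_weight.compute_class_weight(
    #       'balanced', classes=np.unique(train_labels), y=train_labels)
    #   class_weight_dict = dict(zip(np.unique(train_labels), class_weights))
    #
    # This avoids loading and augmenting a full epoch of images just to read the labels.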

    # Training callbacks
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=7,
        restore_best_weights=True,
        verbose=1
    )

    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        os.path.join(args.output_dir, 'best_model.keras'),
        save_best_only=True,
        monitor='val_loss',
        verbose=1
    )

    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=3,
        min_lr=1e-7,
        verbose=1
    )

    callbacks = [early_stopping, model_checkpoint, reduce_lr]

    # Initial training
    print(f"\n🏋️ === Initial training ({args.epochs} epochs) ===")
    try:
        history = model.fit(
            train_gen,
            validation_data=val_gen,
            epochs=args.epochs,
            callbacks=callbacks,
            class_weight=class_weight_dict,
            verbose=1
        )
        print("✅ Initial training completed")
    except Exception as e:
        print(f"❌ Error during training: {e}")
        # Retry without class weights if something went wrong
        print("🔄 Retrying training without class weights...")
        history = model.fit(
            train_gen,
            validation_data=val_gen,
            epochs=args.epochs,
            callbacks=callbacks,
            verbose=1
        )

    # Fine-tuning
    print("\n🔧 === Fine-tuning ===")

    # Unfreeze part of the base model
    base_model.trainable = True
    fine_tune_at = 100  # Keep the first 100 layers frozen, train the rest

    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False

    # Recompile with a lower learning rate
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Continue training
    fine_tune_epochs = 10
    total_epochs = len(history.history['loss']) + fine_tune_epochs

    try:
        history_fine = model.fit(
            train_gen,
            validation_data=val_gen,
            epochs=total_epochs,
            initial_epoch=len(history.history['loss']),
            callbacks=callbacks,
            verbose=1
        )
        print("✅ Fine-tuning completed")

        # Merge the two training histories
        for key in history.history:
            if key in history_fine.history:
                history.history[key].extend(history_fine.history[key])
    except Exception as e:
        print(f"⚠️ Error during fine-tuning: {e}")
        print("Continuing with the model from the initial training...")

    # Final evaluation
    print("\n📊 === Evaluation on the test set ===")

    # Load the best saved model
    try:
        model = tf.keras.models.load_model(os.path.join(args.output_dir, 'best_model.keras'))
        print("✅ Loaded the best saved model")
    except Exception:
        print("⚠️ Using the current model")

    # Save the final model
    model.save(os.path.join(args.output_dir, 'final_model.keras'))
    print("💾 Final model saved")

    # Predictions on the test set
    test_gen.reset()
    y_pred_prob = model.predict(test_gen, verbose=1)
    y_pred = np.argmax(y_pred_prob, axis=1)
    y_true = test_gen.classes

    # Map indices back to class names
    index_to_class = {v: k for k, v in class_indices.items()}

    # Only the classes that actually appear in the test set
    unique_test_classes = np.unique(np.concatenate([y_true, y_pred]))
    test_class_names = [index_to_class[i] for i in unique_test_classes]

    print(f"📊 Classes in the test set: {len(unique_test_classes)}")
    print(f"📊 All trained classes: {len(class_indices)}")
    print(f"📊 Classes present in test: {test_class_names}")

    # Check for missing classes
    all_classes = set(range(len(class_indices)))
    test_classes = set(unique_test_classes)
    missing_classes = all_classes - test_classes
    if missing_classes:
        missing_names = [index_to_class[i] for i in missing_classes]
        print(f"⚠️ Classes with no samples in test: {missing_names}")

    # Classification report restricted to the classes present in test
    print("\n📋 === Classification Report ===")
    try:
        report = classification_report(
            y_true, y_pred,
            labels=unique_test_classes,   # Restrict to the classes actually present
            target_names=test_class_names,
            output_dict=False,
            zero_division=0               # Handle divisions by zero
        )
        print(report)

        # Save the report
        with open(os.path.join(args.output_dir, 'classification_report.txt'), 'w') as f:
            f.write(f"Evaluated classes: {test_class_names}\n")
            f.write(f"Classes missing from test: "
                    f"{[index_to_class[i] for i in missing_classes] if missing_classes else 'None'}\n\n")
            f.write(report)
    except Exception as e:
        print(f"❌ Error in classification_report: {e}")
        print("📊 Generating a fallback report...")

        # Manual report if the automatic one fails
        from collections import Counter
        true_counts = Counter(y_true)
        pred_counts = Counter(y_pred)

        print("\n📊 Manual distribution:")
        print("Class | True | Predicted")
        print("-" * 35)
        for class_idx in unique_test_classes:
            class_name = index_to_class[class_idx]
            true_count = true_counts.get(class_idx, 0)
            pred_count = pred_counts.get(class_idx, 0)
            print(f"{class_name[:15]:15} | {true_count:10} | {pred_count:9}")

        # Basic accuracy
        accuracy = np.mean(y_true == y_pred)
        print(f"\n📊 Overall accuracy: {accuracy:.4f}")

        # Save the manual report
        with open(os.path.join(args.output_dir, 'classification_report.txt'), 'w') as f:
            f.write("MANUAL CLASSIFICATION REPORT\n")
            f.write("=" * 40 + "\n\n")
            f.write(f"Evaluated classes: {test_class_names}\n")
            f.write(f"Classes missing from test: "
                    f"{[index_to_class[i] for i in missing_classes] if missing_classes else 'None'}\n\n")
            f.write("Distribution per class:\n")
            f.write("Class | True | Predicted\n")
            f.write("-" * 35 + "\n")
            for class_idx in unique_test_classes:
                class_name = index_to_class[class_idx]
                true_count = true_counts.get(class_idx, 0)
                pred_count = pred_counts.get(class_idx, 0)
                f.write(f"{class_name[:15]:15} | {true_count:10} | {pred_count:9}\n")
            f.write(f"\nOverall accuracy: {accuracy:.4f}\n")

    # Confusion matrix restricted to the classes present in test
    cm = confusion_matrix(y_true, y_pred, labels=unique_test_classes)
    print(f"\n🔢 Confusion matrix ({len(unique_test_classes)} classes):")
    print(cm)
    np.savetxt(os.path.join(args.output_dir, 'confusion_matrix.csv'), cm, delimiter=',', fmt='%d')

    # Visualizations restricted to the classes present in test
    print("\n📈 === Generating visualizations ===")

    # Training curves
    plot_training_history(history, args.output_dir)

    # Confusion matrix heatmap
    plot_confusion_matrix(cm, test_class_names, args.output_dir)

    # Example predictions
    plot_prediction_examples(test_gen, y_true, y_pred, test_class_names,
                             args.output_dir, unique_test_classes)

    print("\n🎉 === Pipeline completed ===")
    print(f"📁 Results saved to: {args.output_dir}")
    print(f"📊 Final test accuracy: {np.mean(y_true == y_pred):.4f}")
    print(f"📊 Evaluated classes: {len(unique_test_classes)}/{len(class_indices)}")

    # Extra information about imbalanced classes
    if missing_classes:
        print("\n⚠️ === Imbalanced Class Information ===")
        print(f"❌ Classes with no samples in test: {len(missing_classes)}")
        for missing_idx in missing_classes:
            missing_name = index_to_class[missing_idx]
            print(f"  - {missing_name} (index {missing_idx})")
        print("💡 Suggestion: consider enlarging the dataset or merging similar classes")


def plot_training_history(history, output_dir):
    """Plot the training history."""
    try:
        plt.figure(figsize=(12, 4))

        # Accuracy
        plt.subplot(1, 2, 1)
        plt.plot(history.history['accuracy'], label='Training')
        if 'val_accuracy' in history.history:
            plt.plot(history.history['val_accuracy'], label='Validation')
        plt.title('Model Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        # Loss
        plt.subplot(1, 2, 2)
        plt.plot(history.history['loss'], label='Training')
        if 'val_loss' in history.history:
            plt.plot(history.history['val_loss'], label='Validation')
        plt.title('Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'training_history.png'), dpi=300, bbox_inches='tight')
        plt.close()
        print("✅ Training plot saved")
    except Exception as e:
        print(f"⚠️ Error creating the training plot: {e}")


def plot_confusion_matrix(cm, class_names, output_dir):
    """Plot the confusion matrix."""
    try:
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names)
        plt.title('Confusion Matrix')
        plt.ylabel('True label')
        plt.xlabel('Predicted label')
        plt.xticks(rotation=45, ha='right')
        plt.yticks(rotation=0)
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'confusion_matrix.png'), dpi=300, bbox_inches='tight')
        plt.close()
        print("✅ Confusion matrix saved")
    except Exception as e:
        print(f"⚠️ Error creating the confusion matrix: {e}")
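
# Optional helper (a sketch, not called anywhere in the pipeline): load the saved model
# and the class mapping to classify a single image. The function name and arguments are
# illustrative, not part of the original script.
def predict_single_image(img_path, model_path, class_indices_path, img_size=IMG_SIZE):
    """Predict the 'fase V' class of one image with a saved model (sketch)."""
    # Recover the class-name <-> index mapping written by main()
    with open(class_indices_path, 'r') as f:
        class_indices = json.load(f)
    index_to_class = {v: k for k, v in class_indices.items()}

    # Load the trained model (e.g. final_model.keras or best_model.keras)
    model = tf.keras.models.load_model(model_path)

    # Same preprocessing as the evaluation generators: resize + rescale to [0, 1]
    img = tf.keras.utils.load_img(img_path, target_size=img_size)
    x = tf.keras.utils.img_to_array(img) / 255.0
    x = np.expand_dims(x, axis=0)

    probs = model.predict(x, verbose=0)[0]
    pred_idx = int(np.argmax(probs))
    return index_to_class[pred_idx], float(probs[pred_idx])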

def plot_prediction_examples(test_gen, y_true, y_pred, class_names, output_dir,
                             unique_classes=None, n_examples=12):
    """Show examples of correct and incorrect predictions."""
    try:
        # Indices of correct and incorrect predictions
        correct_idx = np.where(y_true == y_pred)[0]
        incorrect_idx = np.where(y_true != y_pred)[0]

        # Select examples
        n_correct = min(n_examples // 2, len(correct_idx))
        n_incorrect = min(n_examples // 2, len(incorrect_idx))

        selected_correct = (np.random.choice(correct_idx, n_correct, replace=False)
                            if len(correct_idx) > 0 else np.array([], dtype=int))
        selected_incorrect = (np.random.choice(incorrect_idx, n_incorrect, replace=False)
                              if len(incorrect_idx) > 0 else np.array([], dtype=int))
        selected_indices = np.concatenate([selected_correct, selected_incorrect]).astype(int)

        if len(selected_indices) == 0:
            print("⚠️ No examples to show")
            return

        # Map original class indices to names; when unique_classes is given, class_names
        # is aligned with it (only the classes present in the test set)
        if unique_classes is not None:
            idx_to_name = {int(c): class_names[i] for i, c in enumerate(unique_classes)}
        else:
            idx_to_name = dict(enumerate(class_names))

        # Build the figure
        n_show = len(selected_indices)
        cols = 4
        rows = (n_show + cols - 1) // cols
        plt.figure(figsize=(15, 4 * rows))

        for i, idx in enumerate(selected_indices):
            plt.subplot(rows, cols, i + 1)

            # Read the original image from disk (the generator only yields preprocessed batches)
            img_path = test_gen.filepaths[idx]
            img = plt.imread(img_path)
            plt.imshow(img)
            plt.axis('off')

            true_label = idx_to_name[y_true[idx]]
            pred_label = idx_to_name[y_pred[idx]]
            color = 'green' if y_true[idx] == y_pred[idx] else 'red'
            plt.title(f'True: {true_label}\nPredicted: {pred_label}', color=color, fontsize=10)

        plt.suptitle('Prediction Examples (Green = Correct, Red = Incorrect)', fontsize=14)
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'prediction_examples.png'), dpi=300, bbox_inches='tight')
        plt.close()
        print("✅ Prediction examples saved")
    except Exception as e:
        print(f"⚠️ Error creating prediction examples: {e}")


if __name__ == "__main__":
    main()
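
# Example invocation from a terminal (the filename below is illustrative; use whatever
# name this script is saved under):
#
#   python mobilenet_nocciola.py --epochs 30 --force_split
#   python mobilenet_nocciola.py --csv_path "C:\path\to\assignments.csv" --images_dir "C:\path\to\images"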