"""
|
|
Script to upload images to S3/R2 cloud storage for the Phenology project.
|
|
Handles directory structures and provides upload statistics.
|
|
"""

import os
from pathlib import Path
from typing import List

import boto3
from botocore.exceptions import ClientError
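
# Typical usage (a sketch; the endpoint placeholder and local path below are
# illustrative, not the project's real values):
#   uploader = CloudUploader("https://<account_id>.r2.cloudflarestorage.com",
#                            access_key, secret_key, "phenology")
#   stats = uploader.upload_directory("./Nocciola/GBIF", "nocciola", "GBIF")
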
class CloudUploader:
    """Manages image uploads to S3/R2."""

    def __init__(self, endpoint_url: str, access_key: str, secret_key: str, bucket_name: str):
        """
        Initializes the S3 client.

        Args:
            endpoint_url: S3 endpoint URL
            access_key: Access key
            secret_key: Secret key
            bucket_name: Bucket name
        """
        self.bucket_name = bucket_name
        self.s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='auto'  # Cloudflare R2 expects 'auto' as the region
        )

    def create_folder(self, folder_path: str) -> bool:
        """
        Creates a "folder" in S3: object stores have no real directories, so
        this writes a zero-byte object whose key ends with a slash.

        Args:
            folder_path: Folder path (a trailing / is appended if missing)

        Returns:
            True if created successfully, False otherwise
        """
        try:
            if not folder_path.endswith('/'):
                folder_path += '/'

            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=folder_path,
                Body=b''
            )
            print(f"Folder created: {folder_path}")
            return True
        except ClientError as e:
            print(f"Error creating folder {folder_path}: {e}")
            return False

    def upload_file(self, local_path: str, s3_path: str) -> bool:
        """
        Uploads a file to S3.

        Args:
            local_path: Local file path
            s3_path: Destination key in S3

        Returns:
            True if uploaded successfully, False otherwise
        """
        try:
            self.s3_client.upload_file(local_path, self.bucket_name, s3_path)
            print(f"File uploaded: {s3_path}")
            return True
        except ClientError as e:
            print(f"Error uploading {local_path}: {e}")
            return False
        except FileNotFoundError:
            print(f"File not found: {local_path}")
            return False
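
    # If these images will be served directly from the bucket, a Content-Type
    # can be attached at upload time through boto3's ExtraArgs (an optional
    # variant, sketched here; 'image/jpeg' is just an example value):
    #   self.s3_client.upload_file(local_path, self.bucket_name, s3_path,
    #                              ExtraArgs={'ContentType': 'image/jpeg'})
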
    def upload_directory(self, local_dir: str, dataset_name: str, source_type: str) -> dict:
        """
        Uploads an entire directory to S3, mirroring its nested structure.

        Args:
            local_dir: Local directory to upload
            dataset_name: Dataset name (nocciola, artichoke, etc.)
            source_type: Source type (GBIF, SanDam, AV, Combi)

        Returns:
            Dictionary with upload statistics
        """
        stats = {
            'total_files': 0,
            'uploaded': 0,
            'failed': 0,
            'folders_created': 0
        }

        local_path = Path(local_dir)
        if not local_path.exists():
            print(f"Directory does not exist: {local_dir}")
            return stats

        # Create the base folder: datasets/{dataset_name}/{source_type}
        base_path = f"datasets/{dataset_name}/{source_type}"
        if self.create_folder(base_path):
            stats['folders_created'] += 1

        # Track folders already created so each one is created only once
        folders_created = set()

        for root, dirs, files in os.walk(local_dir):
            # Path of the current directory relative to the upload root
            rel_path = Path(root).relative_to(local_path)

            # Create subfolders in S3 if necessary
            if str(rel_path) != '.':
                s3_folder = f"{base_path}/{rel_path}".replace('\\', '/')
                if s3_folder not in folders_created:
                    if self.create_folder(s3_folder):
                        stats['folders_created'] += 1
                    folders_created.add(s3_folder)

            # Upload files in the current directory
            for file in files:
                stats['total_files'] += 1
                local_file = os.path.join(root, file)

                if str(rel_path) != '.':
                    s3_file = f"{base_path}/{rel_path}/{file}".replace('\\', '/')
                else:
                    s3_file = f"{base_path}/{file}".replace('\\', '/')

                if self.upload_file(local_file, s3_file):
                    stats['uploaded'] += 1
                else:
                    stats['failed'] += 1

        return stats

    def list_objects(self, prefix: str = "", max_keys: int = 1000) -> List[str]:
        """
        Lists objects in the bucket with a given prefix.

        Note: S3 returns at most 1000 keys per response, so values of max_keys
        above 1000 are capped, and this method does not paginate.

        Args:
            prefix: Prefix to filter objects
            max_keys: Maximum number of objects to list (capped at 1000)

        Returns:
            List of object keys
        """
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=prefix,
                MaxKeys=max_keys
            )

            if 'Contents' in response:
                return [obj['Key'] for obj in response['Contents']]
            return []
        except ClientError as e:
            print(f"Error listing objects: {e}")
            return []
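
    # Since a single list_objects_v2 call returns at most 1000 keys, a
    # paginated variant is sketched here (not used by main() below); it relies
    # on boto3's built-in paginator to walk every page.
    def list_all_objects(self, prefix: str = "") -> List[str]:
        """Lists all object keys under a prefix, following pagination."""
        keys: List[str] = []
        paginator = self.s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            keys.extend(obj['Key'] for obj in page.get('Contents', []))
        return keys
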
    def delete_folder(self, folder_path: str) -> dict:
        """
        Deletes a complete folder and all its contents from S3.

        Args:
            folder_path: Path of the folder to delete (e.g., "datasets/nocciola/GBIF")

        Returns:
            Dictionary with deletion statistics
        """
        stats = {
            'total_objects': 0,
            'deleted': 0,
            'failed': 0
        }

        # Ensure the path ends with / so only keys inside the folder match
        if not folder_path.endswith('/'):
            folder_path += '/'

        print(f"Searching for objects in: {folder_path}")

        try:
            # List all objects with the given prefix, one page at a time
            continuation_token = None

            while True:
                list_kwargs = {
                    'Bucket': self.bucket_name,
                    'Prefix': folder_path
                }
                if continuation_token:
                    list_kwargs['ContinuationToken'] = continuation_token

                response = self.s3_client.list_objects_v2(**list_kwargs)

                if 'Contents' not in response:
                    print(f"No objects found in {folder_path}")
                    break

                # Prepare this page of objects for deletion
                objects_to_delete = [{'Key': obj['Key']} for obj in response['Contents']]
                stats['total_objects'] += len(objects_to_delete)

                # Delete in batches; delete_objects accepts at most 1000 keys
                # per request, and each listed page holds at most 1000
                if objects_to_delete:
                    delete_response = self.s3_client.delete_objects(
                        Bucket=self.bucket_name,
                        Delete={'Objects': objects_to_delete}
                    )

                    # Count successful deletions
                    if 'Deleted' in delete_response:
                        deleted_count = len(delete_response['Deleted'])
                        stats['deleted'] += deleted_count
                        print(f"Deleted {deleted_count} objects")

                    # Count failed deletions
                    if 'Errors' in delete_response:
                        failed_count = len(delete_response['Errors'])
                        stats['failed'] += failed_count
                        for error in delete_response['Errors']:
                            print(f"Error deleting {error['Key']}: {error['Message']}")

                # Check if there are more pages of objects to list
                if response.get('IsTruncated', False):
                    continuation_token = response.get('NextContinuationToken')
                else:
                    break

            print(f"\nDeletion completed for: {folder_path}")
            print(f" - Total objects found: {stats['total_objects']}")
            print(f" - Successfully deleted: {stats['deleted']}")
            print(f" - Failed: {stats['failed']}")

        except ClientError as e:
            print(f"Error deleting folder {folder_path}: {e}")

        return stats


def main():
    """Main function to run the script."""

    # Connection configuration
    ENDPOINT_URL = "https://98202a9866f06230112dc6a966150a5a.r2.cloudflarestorage.com"
    ACCESS_KEY = "ece94c6f3a4d9caac8a0eaeae13c12b9"
    SECRET_KEY = "eb01630b93e383ff33aee3b4c3c027370ea3f869f5a6505fa2ffcdb880d7c291"
    BUCKET_NAME = "phenology"
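
    # NOTE: hardcoded credentials are convenient for a one-off script, but in
    # practice they would be loaded from the environment, e.g. (the variable
    # names below are assumptions, not project conventions):
    #   ENDPOINT_URL = os.environ["R2_ENDPOINT_URL"]
    #   ACCESS_KEY = os.environ["R2_ACCESS_KEY_ID"]
    #   SECRET_KEY = os.environ["R2_SECRET_ACCESS_KEY"]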

    # Initialize uploader
    uploader = CloudUploader(ENDPOINT_URL, ACCESS_KEY, SECRET_KEY, BUCKET_NAME)

    print("=" * 60)
    print("UPLOAD OF IMAGES TO THE CLOUD - PHENOLOGY PROJECT")
    print("=" * 60)
    print()

    # Upload each Nocciola source directory (AV, combi, GBIF, SanDam)
    nocciola_sources = [
        ("AV", r"c:\Users\sof12\Desktop\ML\Datasets\Nocciola\AV"),
        ("combi", r"c:\Users\sof12\Desktop\ML\Datasets\Nocciola\combi"),
        ("GBIF", r"c:\Users\sof12\Desktop\ML\Datasets\Nocciola\GBIF"),
        ("SanDam", r"c:\Users\sof12\Desktop\ML\Datasets\Nocciola\SanDam"),
    ]

    for source_type, local_dir in nocciola_sources:
        print(f"Upload: nocciola/{source_type}")
        print("-" * 60)
        stats = uploader.upload_directory(local_dir, "nocciola", source_type)
        print("\nStatistics:")
        print(f" - Folders created: {stats['folders_created']}")
        print(f" - Total files: {stats['total_files']}")
        print(f" - Successfully uploaded: {stats['uploaded']}")
        print(f" - Failed: {stats['failed']}")
        print()

    # Example: delete a folder (uncomment to use)
    # print("Example: Delete a folder")
    # print("-" * 60)
    # folder_to_delete = "datasets/artichoke/Robo_ImageCV"
    # stats = uploader.delete_folder(folder_to_delete)
    # print()

    # List what is currently stored under the artichoke prefix
    # (note: list_objects returns at most 1000 keys per call)
    objects = uploader.list_objects(prefix="datasets/artichoke", max_keys=3000)
    for obj in objects:
        print(obj)


if __name__ == "__main__":
    main()