"""Maintenance script for the prompt catalogue.

Downloads remote prompt images into the local public image cache and
normalizes free-form category labels in data/prompts.json onto the
site's canonical (Vietnamese) category names.
"""
# Standard library only -- no third-party dependencies.
import json
import os
import time
import urllib.request
import urllib.parse
from urllib.error import HTTPError, URLError
|
# Configuration

# Input/output JSON data file and the directory that caches downloaded images.
DATA_FILE = os.path.join('data', 'prompts.json')
IMAGE_DIR = os.path.join('public', 'images', 'prompts')

# Maps raw source category labels (mostly English) onto the site's canonical
# category names. Several source labels intentionally collapse onto the same
# canonical category (e.g. 'Girl'/'Female'/'Woman' -> 'NỮ').
CATEGORY_MAP = {
    'Man': 'NAM',
    'Beauty': 'NỮ',
    'Girl': 'NỮ',
    'Female': 'NỮ',
    'Woman': 'NỮ',
    'Kid': 'TRẺ EM',
    'Baby': 'TRẺ EM',
    'Couple': 'COUPLE',
    'Birthday': 'SINH NHẬT',
    'Sinh nhật': 'SINH NHẬT',
    'Halloween': 'HALLOWEEN',
    'Noel': 'NOEL',
    'Christmas': 'NOEL',
    'Tet': 'NEW YEAR',
    'New Year': 'NEW YEAR',
    'Trung thu': 'TRUNG THU',
    'Mom': 'CHA - MẸ',
    'Family': 'CHA - MẸ',
    'Maternity': 'MẸ BẦU',
    'Bau': 'MẸ BẦU',
    'Other': 'ĐẶC BIỆT'
}
# Fallback category used when a record has no usable label at all.
DEFAULT_CATEGORY = 'ĐẶC BIỆT'
|
def ensure_dir(directory):
    """Create *directory* (including parents) if it does not already exist.

    Uses ``exist_ok=True`` instead of a separate existence check so there is
    no race (TOCTOU) between checking and creating, and intermediate path
    components are created as well.
    """
    os.makedirs(directory, exist_ok=True)
|
def download_image(url, filename):
    """Download *url* into IMAGE_DIR as *filename* and return its public path.

    Returns the site-relative path ``/images/prompts/<filename>`` on success,
    or immediately if a cached copy already exists on disk. Returns ``None``
    for non-http(s) URLs and on any download/write failure (best-effort:
    never raises, so one bad image cannot abort the whole run).
    """
    if not url or not url.startswith('http'):
        return None

    target_path = os.path.join(IMAGE_DIR, filename)
    # BUG FIX: previously both success paths returned the literal string
    # '/images/prompts/(unknown)' instead of interpolating the filename,
    # so every prompt ended up pointing at the same non-existent image.
    public_path = f'/images/prompts/{filename}'

    # Already cached from a previous run -- skip the network round-trip.
    if os.path.exists(target_path):
        return public_path

    try:
        print(f"Downloading {url}...")
        # Browser-like User-Agent: some image hosts answer plain urllib
        # requests with HTTP 403.
        req = urllib.request.Request(
            url,
            data=None,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
        )
        with urllib.request.urlopen(req) as response, open(target_path, 'wb') as out_file:
            out_file.write(response.read())
        return public_path
    except (HTTPError, URLError, OSError, ValueError) as e:
        # Narrowed from a bare `except Exception`: these cover HTTP errors,
        # network failures, local file-write failures and malformed URLs.
        # Best-effort -- log and let the caller keep the remote URL.
        print(f"Failed to download {url}: {e}")
        return None
|
def normalize_category(cat):
    """Map a raw category label onto one of the site's canonical names.

    Lookup order: exact match in CATEGORY_MAP, then case-insensitive match,
    then keyword heuristics for free-form labels. Empty/missing input falls
    back to DEFAULT_CATEGORY; anything still unmatched is passed through
    uppercased (effectively "Other" or a custom category).
    """
    if not cat:
        return DEFAULT_CATEGORY

    # Exact lookup first.
    canonical = CATEGORY_MAP.get(cat)
    if canonical is not None:
        return canonical

    # Case-insensitive lookup against the known labels.
    folded = cat.lower()
    for label, target in CATEGORY_MAP.items():
        if label.lower() == folded:
            return target

    # Keyword heuristics ('wo' guard keeps 'woman' out of the 'man' bucket).
    if 'man' in folded and 'wo' not in folded:
        return 'NAM'
    if any(token in folded for token in ('girl', 'woman', 'lady')):
        return 'NỮ'
    if any(token in folded for token in ('kid', 'baby', 'boy')):
        return 'TRẺ EM'
    if 'couple' in folded:
        return 'COUPLE'
    if 'halloween' in folded:
        return 'HALLOWEEN'
    if 'christmas' in folded or 'noel' in folded:
        return 'NOEL'

    # Unknown label: keep it, uppercased.
    return cat.upper()
|
def main():
    """Entry point: cache remote prompt images locally and canonicalize categories.

    Reads DATA_FILE, downloads each prompt's first remote image into
    IMAGE_DIR (rewriting the record to the local path), normalizes every
    category label, rebuilds the category index, stamps the update time,
    and writes the result back to DATA_FILE.
    """
    print("Starting processing...")
    ensure_dir(IMAGE_DIR)

    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"File not found: {DATA_FILE}")
        return

    prompts = data.get('prompts', [])
    processed_count = 0
    updated_categories = 0
    downloaded_images = 0

    for prompt in prompts:
        # 1. Image caching: prefer the images[] list, fall back to the
        #    legacy single imageUrl field.
        images = prompt.get('images')
        remote_url = images[0] if images else prompt.get('imageUrl')

        if remote_url and remote_url.startswith('http'):
            # Derive the extension from the URL, defaulting to .png.
            # Later candidates win, matching the original check order.
            lowered = remote_url.lower()
            ext = '.png'
            for candidate in ('.jpg', '.jpeg', '.webp'):
                if candidate in lowered:
                    ext = candidate

            # The prompt id doubles as the cache filename; strip path
            # separators so a malformed id cannot escape IMAGE_DIR.
            safe_id = str(prompt.get('id', 'unknown')).replace('/', '_').replace('\\', '_')
            local_path = download_image(remote_url, f"{safe_id}{ext}")
            if local_path:
                prompt['images'] = [local_path]
                if 'imageUrl' in prompt:
                    prompt['imageUrl'] = local_path  # keep legacy field in sync
                downloaded_images += 1

        # 2. Category normalization.
        current = prompt.get('category', '')
        canonical = normalize_category(current)
        if current != canonical:
            prompt['category'] = canonical
            updated_categories += 1

        processed_count += 1
        if processed_count % 10 == 0:
            print(f"Processed {processed_count} prompts...")

    # Rebuild the category index from scratch and stamp the update time
    # (UTC, ISO-8601 with a fixed .000Z millisecond suffix).
    data['categories'] = {'style': list({prompt['category'] for prompt in prompts})}
    data['last_updated'] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())

    with open(DATA_FILE, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"Done! Processed {processed_count} prompts.")
    print(f"Updated categories for {updated_categories} items.")
    print(f"Downloaded/Verified {downloaded_images} images.")
|
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()