import json
import os
import re
import time
import urllib.request
from urllib.error import URLError
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuration
DATA_FILE = os.path.join('data', 'prompts.json')
IMAGE_DIR = os.path.join('public', 'images', 'prompts')
MAX_WORKERS = 20  # Number of parallel downloads

# Maps raw source category/title tokens to the standardized Vietnamese labels.
CATEGORY_MAP = {
    'Man': 'NAM',
    'Beauty': 'NỮ',
    'Girl': 'NỮ',
    'Female': 'NỮ',
    'Woman': 'NỮ',
    'Kid': 'TRẺ EM',
    'Baby': 'TRẺ EM',
    'Couple': 'COUPLE',
    'Birthday': 'SINH NHẬT',
    'Sinh nhật': 'SINH NHẬT',
    'Halloween': 'HALLOWEEN',
    'Noel': 'NOEL',
    'Christmas': 'NOEL',
    'Tet': 'NEW YEAR',
    'New Year': 'NEW YEAR',
    'Trung thu': 'TRUNG THU',
    'Mom': 'CHA - MẸ',
    'Family': 'CHA - MẸ',
    'Maternity': 'MẸ BẦU',
    'Bau': 'MẸ BẦU',
    'Other': 'ĐẶC BIỆT',
}
DEFAULT_CATEGORY = 'ĐẶC BIỆT'


def ensure_dir(directory):
    """Create *directory* (including parents) if it does not already exist."""
    # exist_ok avoids the check-then-create race of the previous LBYL version.
    os.makedirs(directory, exist_ok=True)


def download_image(args):
    """Download one prompt's image into IMAGE_DIR if it is not local yet.

    args is a tuple: (index, prompt_data)
    Returns: (index, local web path '/images/prompts/<file>' or None)
    """
    idx, p = args

    # Prefer the first entry of `images`; fall back to the legacy `imageUrl`.
    original_url = None
    if p.get('images') and len(p['images']) > 0:
        original_url = p['images'][0]
    elif p.get('imageUrl'):
        original_url = p['imageUrl']

    # Already stored locally — nothing to do.
    if original_url and original_url.startswith('/images/prompts/'):
        return idx, original_url
    # Not a downloadable remote URL (empty, relative, data: URI, ...).
    if not original_url or not original_url.startswith('http'):
        return idx, None

    # Pick a file extension from the URL; default to .png.
    # (.jpeg is checked after .jpg so a '.jpeg' URL ends up with '.jpeg'.)
    ext = '.png'
    if '.jpg' in original_url.lower():
        ext = '.jpg'
    if '.jpeg' in original_url.lower():
        ext = '.jpeg'
    if '.webp' in original_url.lower():
        ext = '.webp'

    # Build a filesystem-safe filename from the prompt id (index as fallback).
    safe_id = str(p.get('id', idx)).replace('/', '_').replace('\\', '_')
    filename = f"{safe_id}{ext}"
    target_path = os.path.join(IMAGE_DIR, filename)

    # Already downloaded on a previous run — reuse it.
    # FIX: was a corrupted literal; must mirror `filename` used in target_path.
    if os.path.exists(target_path):
        return idx, f'/images/prompts/{filename}'

    try:
        # Some hosts reject requests without a browser-like User-Agent.
        req = urllib.request.Request(
            original_url,
            data=None,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
        )
        with urllib.request.urlopen(req, timeout=10) as response, open(target_path, 'wb') as out_file:
            data = response.read()
            out_file.write(data)
        # FIX: same corrupted literal as above — return the real local path.
        return idx, f'/images/prompts/{filename}'
    except Exception as e:
        # Best-effort: log and continue; the caller keeps the old URL.
        print(f"Failed to download {original_url}: {e}")
        return idx, None


def normalize_category(p):
    """Return the standardized category label for prompt dict *p*.

    Tries, in order: the title, the stored category, and the name — via exact
    match, case-insensitive match against CATEGORY_MAP, then keyword
    heuristics. Falls back to the uppercased stored category, else
    DEFAULT_CATEGORY.
    """
    cat = p.get('category', '')
    title = p.get('title', '')

    # A Vietnamese label on a non-Habu record may be a stale bad mapping:
    # drop it so it gets re-derived (unless a candidate below matches).
    if p.get('source') != 'habu' and cat in CATEGORY_MAP.values():
        cat = ''

    # Priority order of fields to inspect: Title, Category, Name.
    candidates = [title, cat, p.get('name', '')]

    for candidate in candidates:
        if not candidate:
            continue
        # Exact match
        if candidate in CATEGORY_MAP:
            return CATEGORY_MAP[candidate]
        # Case-insensitive match
        for key, val in CATEGORY_MAP.items():
            if key.lower() == candidate.lower():
                return val

        # Substring heuristics on the candidate text.
        candidate_lower = candidate.lower()
        # 'man' but not 'woman' => male.
        if 'man' in candidate_lower and 'wo' not in candidate_lower:
            return 'NAM'
        if 'girl' in candidate_lower or 'woman' in candidate_lower or 'lady' in candidate_lower or 'beauty' in candidate_lower:
            return 'NỮ'
        if 'kid' in candidate_lower or 'baby' in candidate_lower or 'boy' in candidate_lower:
            return 'TRẺ EM'
        if 'couple' in candidate_lower or 'wedding' in candidate_lower or 'marriage' in candidate_lower:
            return 'COUPLE'
        if 'halloween' in candidate_lower:
            return 'HALLOWEEN'
        if 'christmas' in candidate_lower or 'noel' in candidate_lower:
            return 'NOEL'
        if 'trung thu' in candidate_lower:
            return 'TRUNG THU'
        if 'tet' in candidate_lower or 'new year' in candidate_lower:
            return 'NEW YEAR'
        if 'mom' in candidate_lower or 'family' in candidate_lower:
            return 'CHA - MẸ'
        if 'maternity' in candidate_lower or 'bau' in candidate_lower:
            return 'MẸ BẦU'
        if 'birthday' in candidate_lower or 'sinh nhật' in candidate_lower:
            return 'SINH NHẬT'

    return cat.upper() if cat else DEFAULT_CATEGORY


def main():
    """Normalize prompt categories, mirror remote images locally, save JSON."""
    print("Starting optimized processing...")
    ensure_dir(IMAGE_DIR)

    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"File not found: {DATA_FILE}")
        return

    prompts = data.get('prompts', [])
    print(f"Total prompts to process: {len(prompts)}")

    # 1. Update categories in-place (fast, single pass).
    updated_categories = 0
    for p in prompts:
        # Promote `name` to `title` when the title is missing or generic.
        if p.get('name') and (not p.get('title') or 'Untitled' in p.get('title', '')):
            p['title'] = p['name']

        old_cat = p.get('category', '')
        new_cat = normalize_category(p)

        # If normalization did not yield a standardized label, fall back to
        # whole-word keyword matching against the prompt text itself.
        # NOTE(review): 'TRUNG THU' is absent here, so TRUNG THU matches get
        # re-evaluated by the regex fallback — confirm whether intentional.
        standard_cats = {'NAM', 'NỮ', 'SINH NHẬT', 'HALLOWEEN', 'NOEL', 'NEW YEAR', 'TRẺ EM', 'COUPLE', 'CHA - MẸ', 'MẸ BẦU'}
        if new_cat not in standard_cats:
            prompt_text = p.get('prompt', '').lower()
            if re.search(r'\b(man|boy|male)\b', prompt_text):
                new_cat = 'NAM'
            elif re.search(r'\b(woman|girl|female|lady)\b', prompt_text):
                new_cat = 'NỮ'
            elif re.search(r'\b(kid|child|children|baby|toddler)\b', prompt_text):
                new_cat = 'TRẺ EM'
            elif re.search(r'\b(couple|wedding|marriage|proposal)\b', prompt_text):
                new_cat = 'COUPLE'
            elif re.search(r'\b(family)\b', prompt_text):
                new_cat = 'CHA - MẸ'

        if old_cat != new_cat:
            p['category'] = new_cat
            updated_categories += 1

    print(f"Updated categories for {updated_categories} items.")

    # 2. Parallel image download (I/O-bound, so threads overlap the waits).
    download_tasks = [(idx, p) for idx, p in enumerate(prompts)]
    downloaded_count = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_idx = {executor.submit(download_image, task): task[0] for task in download_tasks}
        for future in as_completed(future_to_idx):
            idx, local_path = future.result()
            if local_path:
                prompts[idx]['images'] = [local_path]
                # prompts[idx]['imageUrl'] = local_path  # Optional: sync both
                downloaded_count += 1
                if downloaded_count % 100 == 0:
                    print(f"Images verified/downloaded: {downloaded_count}...")

    # 3. Save: rebuild the category list (priority labels first) and timestamp.
    categories_set = set(p['category'] for p in prompts)
    priority = ['NAM', 'NỮ', 'SINH NHẬT', 'HALLOWEEN', 'NOEL', 'NEW YEAR', 'TRẺ EM', 'COUPLE', 'CHA - MẸ', 'MẸ BẦU', 'ĐẶC BIỆT']
    sorted_cats = sorted(categories_set, key=lambda x: priority.index(x) if x in priority else 999)
    data['categories'] = {'style': sorted_cats}
    data['last_updated'] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())

    with open(DATA_FILE, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"Done! Saved {len(prompts)} prompts.")


if __name__ == "__main__":
    main()