215 lines
8 KiB
Python
215 lines
8 KiB
Python
import json
|
|
import os
|
|
import time
|
|
import re
|
|
import urllib.request
|
|
from urllib.error import URLError
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# Configuration
# JSON file holding all prompt records (read and rewritten by main()).
DATA_FILE = os.path.join('data', 'prompts.json')
# Local directory where remote prompt images are mirrored.
IMAGE_DIR = os.path.join('public', 'images', 'prompts')
MAX_WORKERS = 20 # Number of parallel downloads

# Maps known source keywords (titles/categories/names) to the
# standardized Vietnamese category labels used by the site.
# Several keys may map to the same label.
CATEGORY_MAP = {
    'Man': 'NAM',
    'Beauty': 'NỮ',
    'Girl': 'NỮ',
    'Female': 'NỮ',
    'Woman': 'NỮ',
    'Kid': 'TRẺ EM',
    'Baby': 'TRẺ EM',
    'Couple': 'COUPLE',
    'Birthday': 'SINH NHẬT',
    'Sinh nhật': 'SINH NHẬT',
    'Halloween': 'HALLOWEEN',
    'Noel': 'NOEL',
    'Christmas': 'NOEL',
    'Tet': 'NEW YEAR',
    'New Year': 'NEW YEAR',
    'Trung thu': 'TRUNG THU',
    'Mom': 'CHA - MẸ',
    'Family': 'CHA - MẸ',
    'Maternity': 'MẸ BẦU',
    'Bau': 'MẸ BẦU',
    'Other': 'ĐẶC BIỆT'
}
# Label assigned when no mapping or keyword heuristic matches.
DEFAULT_CATEGORY = 'ĐẶC BIỆT'
|
|
|
|
def ensure_dir(directory):
    """Create *directory* (and any missing parents) if it does not exist.

    Uses ``exist_ok=True`` instead of the original check-then-create,
    which is simpler and immune to the race where the directory appears
    between ``os.path.exists`` and ``os.makedirs``.
    """
    os.makedirs(directory, exist_ok=True)
|
|
|
|
def download_image(args):
    """Download a prompt's first image into IMAGE_DIR.

    Args:
        args: tuple ``(index, prompt_data)`` where ``prompt_data`` is a
            prompt dict with optional 'images' (list) / 'imageUrl' keys.

    Returns:
        ``(index, local_path)`` when the image is already local, already
        cached on disk, or was downloaded successfully;
        ``(index, None)`` when there is no fetchable URL or the
        download fails.
    """
    idx, p = args

    # Prefer the first entry of 'images'; fall back to 'imageUrl'.
    original_url = None
    if p.get('images'):
        original_url = p['images'][0]
    elif p.get('imageUrl'):
        original_url = p['imageUrl']

    # Already mirrored locally — nothing to do.
    if original_url and original_url.startswith('/images/prompts/'):
        return idx, original_url

    # No URL, or not an http(s) URL we can fetch.
    if not original_url or not original_url.startswith('http'):
        return idx, None

    # Pick a file extension from the URL; later matches win (.webp > .jpeg > .jpg),
    # matching the original chain of assignments. Default is .png.
    url_lower = original_url.lower()
    ext = '.png'
    for candidate in ('.jpg', '.jpeg', '.webp'):
        if candidate in url_lower:
            ext = candidate

    safe_id = str(p.get('id', idx)).replace('/', '_').replace('\\', '_')
    filename = f"{safe_id}{ext}"
    target_path = os.path.join(IMAGE_DIR, filename)
    # BUG FIX: the original returned the literal string
    # '/images/prompts/(unknown)' here and below — the {filename}
    # placeholder had been destroyed, so every record pointed at the
    # same nonexistent path.
    local_path = f'/images/prompts/{filename}'

    # Downloaded on a previous run — reuse the cached file.
    if os.path.exists(target_path):
        return idx, local_path

    try:
        # A browser User-Agent avoids 403s from hosts that block
        # the default urllib agent.
        req = urllib.request.Request(
            original_url,
            data=None,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
        )
        with urllib.request.urlopen(req, timeout=10) as response, open(target_path, 'wb') as out_file:
            out_file.write(response.read())
        return idx, local_path
    except Exception as e:
        # Best-effort: log and continue so one bad URL doesn't abort the batch.
        print(f"Failed to download {original_url}: {e}")
        return idx, None
|
|
|
|
def normalize_category(p):
    """Resolve a prompt record to a standardized category label.

    Tries the record's title, category, and name (in that order)
    against CATEGORY_MAP — first an exact key match, then a
    case-insensitive one, then keyword heuristics. Falls back to the
    upper-cased raw category, or DEFAULT_CATEGORY when none exists.
    """
    raw_cat = p.get('category', '')

    # A Vietnamese category on a non-Habu record may be a stale bad
    # mapping; clear it so re-evaluation can find a better match.
    if p.get('source') != 'habu' and raw_cat in CATEGORY_MAP.values():
        raw_cat = ''

    # Candidate strings in priority order: Title, Category, Name.
    for candidate in (p.get('title', ''), raw_cat, p.get('name', '')):
        if not candidate:
            continue

        # 1. Exact key match.
        if candidate in CATEGORY_MAP:
            return CATEGORY_MAP[candidate]

        # 2. Case-insensitive key match.
        lowered = candidate.lower()
        for key, mapped in CATEGORY_MAP.items():
            if key.lower() == lowered:
                return mapped

        # 3. Keyword heuristics, in a fixed priority order.
        #    'wo' guard keeps 'woman' from matching the 'man' rule.
        if 'man' in lowered and 'wo' not in lowered:
            return 'NAM'
        if any(w in lowered for w in ('girl', 'woman', 'lady', 'beauty')):
            return 'NỮ'
        if any(w in lowered for w in ('kid', 'baby', 'boy')):
            return 'TRẺ EM'
        if any(w in lowered for w in ('couple', 'wedding', 'marriage')):
            return 'COUPLE'
        if 'halloween' in lowered:
            return 'HALLOWEEN'
        if 'christmas' in lowered or 'noel' in lowered:
            return 'NOEL'
        if 'trung thu' in lowered:
            return 'TRUNG THU'
        if 'tet' in lowered or 'new year' in lowered:
            return 'NEW YEAR'
        if 'mom' in lowered or 'family' in lowered:
            return 'CHA - MẸ'
        if 'maternity' in lowered or 'bau' in lowered:
            return 'MẸ BẦU'
        if 'birthday' in lowered or 'sinh nhật' in lowered:
            return 'SINH NHẬT'

    return raw_cat.upper() if raw_cat else DEFAULT_CATEGORY
|
|
|
|
def main():
    """Normalize prompt categories, mirror remote images locally, and save.

    Reads DATA_FILE, rewrites each prompt's category and image path
    in place, then writes the file back with a refreshed category list
    and timestamp.
    """
    print("Starting optimized processing...")
    ensure_dir(IMAGE_DIR)

    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"File not found: {DATA_FILE}")
        return

    prompts = data.get('prompts', [])
    print(f"Total prompts to process: {len(prompts)}")

    # 1. Update Categories In-Place (Fast)
    # (The original duplicated this comment and the counter reset;
    # the duplicate has been removed.)
    standard_cats = {'NAM', 'NỮ', 'SINH NHẬT', 'HALLOWEEN', 'NOEL', 'NEW YEAR',
                     'TRẺ EM', 'COUPLE', 'CHA - MẸ', 'MẸ BẦU'}
    updated_categories = 0
    for p in prompts:
        # Promote 'name' to 'title' when the title is missing or generic.
        if p.get('name') and (not p.get('title') or 'Untitled' in p.get('title', '')):
            p['title'] = p['name']

        old_cat = p.get('category', '')
        new_cat = normalize_category(p)

        # Not one of the standardized Vietnamese categories: fall back to
        # whole-word keyword matching against the prompt text itself.
        if new_cat not in standard_cats:
            prompt_text = p.get('prompt', '').lower()
            if re.search(r'\b(man|boy|male)\b', prompt_text):
                new_cat = 'NAM'
            elif re.search(r'\b(woman|girl|female|lady)\b', prompt_text):
                new_cat = 'NỮ'
            elif re.search(r'\b(kid|child|children|baby|toddler)\b', prompt_text):
                new_cat = 'TRẺ EM'
            elif re.search(r'\b(couple|wedding|marriage|proposal)\b', prompt_text):
                new_cat = 'COUPLE'
            elif re.search(r'\b(family)\b', prompt_text):
                new_cat = 'CHA - MẸ'

        if old_cat != new_cat:
            p['category'] = new_cat
            updated_categories += 1

    print(f"Updated categories for {updated_categories} items.")

    # 2. Parallel Image Download
    downloaded_count = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_idx = {executor.submit(download_image, (idx, p)): idx
                         for idx, p in enumerate(prompts)}
        for future in as_completed(future_to_idx):
            idx, local_path = future.result()
            if local_path:
                prompts[idx]['images'] = [local_path]
                # prompts[idx]['imageUrl'] = local_path # Optional: sync both
                downloaded_count += 1
                # Progress only when the counter actually advanced
                # (the original also printed when nothing was downloaded,
                # since 0 % 100 == 0).
                if downloaded_count % 100 == 0:
                    print(f"Images verified/downloaded: {downloaded_count}...")

    # 3. Save, with categories sorted so prioritized ones come first.
    # (The original assigned data['categories']['style'] twice; the
    # unsorted first assignment was dead and has been removed.)
    priority = ['NAM', 'NỮ', 'SINH NHẬT', 'HALLOWEEN', 'NOEL', 'NEW YEAR',
                'TRẺ EM', 'COUPLE', 'CHA - MẸ', 'MẸ BẦU', 'ĐẶC BIỆT']
    categories_set = set(p['category'] for p in prompts)
    sorted_cats = sorted(categories_set,
                         key=lambda c: priority.index(c) if c in priority else 999)
    data['categories'] = {'style': sorted_cats}
    data['last_updated'] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())

    with open(DATA_FILE, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"Done! Saved {len(prompts)} prompts.")
|
|
|
|
# Script entry point: run the full normalize-and-download pipeline.
if __name__ == "__main__":
    main()
|