"""Anim Studio 网站后端(仅用 Python 标准库,无 web 框架依赖)。 启动: pip install Pillow # 唯一必需第三方库 python server.py 浏览器打开 http://127.0.0.1:7861 路由: GET / 可视化网站 GET /api/manifest 默认 manifest GET /api/games 已生成的 game 列表 GET /api/library?game=.. 某 game 的资源/动画库 GET /assets// 资源文件 POST /api/generate 运行生成管线 POST /api/export 把某 game 打包成 Cocos 整合包 POST /api/open-folder 打开某 game 的本地素材目录 POST /api/delete 删除某 game 的资源库 """ import json import mimetypes import os import posixpath import shutil import subprocess import sys import threading import time import traceback import uuid import base64 from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.parse import urlparse, parse_qs, unquote import pipeline import exporter import slot_workflow import spine_builder import providers import config import baidu_segment import asset_quality HERE = os.path.dirname(os.path.abspath(__file__)) OUT_ROOT = os.path.join(HERE, "out") WEB_DIR = os.path.join(HERE, "web") DEFAULT_MANIFEST = os.path.join(HERE, "animation_manifest.json") PORT = 7861 DEFAULT_BASE_URL = config.get("ANIM_STUDIO_BASE_URL", "https://x.long.bid/v1") DEFAULT_API_KEY = config.get("ANIM_STUDIO_API_KEY", "") DEFAULT_IMAGE_MODEL = config.get("ANIM_STUDIO_IMAGE_MODEL", "gpt-image-2") DEFAULT_TEXT_MODEL = config.get("ANIM_STUDIO_TEXT_MODEL", "gpt-5.4-mini") JOBS = {} JOBS_LOCK = threading.Lock() def list_games(): if not os.path.isdir(OUT_ROOT): return [] return [n for n in sorted(os.listdir(OUT_ROOT)) if os.path.isfile(os.path.join(OUT_ROOT, n, "library.json"))] def safe_join(root, *parts): p = posixpath.normpath("/".join(parts)).lstrip("/") full = os.path.join(root, *p.split("/")) if not os.path.abspath(full).startswith(os.path.abspath(root)): raise ValueError("path traversal") return full def _job_snapshot(job_id): with JOBS_LOCK: job = JOBS.get(job_id) return dict(job) if job else None def _set_job(job_id, **fields): with JOBS_LOCK: job = JOBS.setdefault(job_id, {}) job.update(fields) def _append_job_log(job_id, msg): with JOBS_LOCK: job = JOBS.setdefault(job_id, {}) job.setdefault("logs", []).append(msg) job["updatedAt"] = time.time() def _run_generate_job(job_id, manifest, creds): _set_job(job_id, status="running", logs=[], game=manifest.get("game"), startedAt=time.time(), updatedAt=time.time()) try: lib, _ = pipeline.run(manifest, OUT_ROOT, creds=creds, log=lambda m: _append_job_log(job_id, m)) _set_job(job_id, status="done", ok=True, game=lib["game"], updatedAt=time.time()) except Exception as e: traceback.print_exc() _append_job_log(job_id, f"❌ 生成任务失败: {e}") _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time()) def _load_library(game): library_path = os.path.join(OUT_ROOT, game, "library.json") if not os.path.isfile(library_path): return {"game": game, "characters": [], "vfx": [], "ui": [], "ui_art": []} with open(library_path, encoding="utf-8") as f: lib = json.load(f) if _repair_library_index(game, lib): with open(library_path, "w", encoding="utf-8") as f: json.dump(lib, f, ensure_ascii=False, indent=2) return lib def _save_library(game, lib): library_path = os.path.join(OUT_ROOT, game, "library.json") os.makedirs(os.path.dirname(library_path), exist_ok=True) with open(library_path, "w", encoding="utf-8") as f: json.dump(lib, f, ensure_ascii=False, indent=2) def _spine_size_from_json(path): try: data = json.load(open(path, encoding="utf-8")) sk = data.get("skeleton", {}) return int(round(sk.get("width") or 1000)), int(round(sk.get("height") or 1000)) except Exception: return 1000, 1000 def _image_size(path): try: from PIL import Image with Image.open(path) as img: return img.size except Exception: return 0, 0 def _repair_library_index(game, lib): """Index files that exist on disk but were not written to library.json. This lets partial retry jobs become visible immediately even if a later required task failed before the old library index was updated. """ changed = False base = os.path.join(OUT_ROOT, game) manifest = _manifest_from_library(lib) by_section = { "characters": {x.get("id"): x for x in lib.get("characters", [])}, "ui_art": {x.get("id"): x for x in lib.get("ui_art", [])}, "vfx": {x.get("id"): x for x in lib.get("vfx", [])}, "ui": {x.get("id"): x for x in lib.get("ui", [])}, } for c in manifest.get("characters", []): cid = c.get("id") if not cid: continue paths = [os.path.join(base, "characters", f"{cid}.{ext}") for ext in ("json", "atlas", "png")] existing_item = by_section["characters"].get(cid) if existing_item: preview_path = os.path.join(base, "characters", f"{cid}_preview.png") if c.get("type") == "spine_parts" and not os.path.isfile(preview_path): spine_builder.write_parts_preview_from_files(cid, os.path.join(base, "characters")) if c.get("type") == "spine_parts": parts = spine_builder.write_part_pngs_from_files(cid, os.path.join(base, "characters")) if parts and existing_item.get("parts") != parts: existing_item["parts"] = parts files = existing_item.setdefault("files", []) for part in parts: if part["file"] not in files: files.append(part["file"]) changed = True if os.path.isfile(preview_path) and not existing_item.get("preview"): existing_item["preview"] = f"characters/{cid}_preview.png" if existing_item["preview"] not in existing_item.setdefault("files", []): existing_item["files"].append(existing_item["preview"]) changed = True continue if all(os.path.isfile(p) for p in paths): preview_path = os.path.join(base, "characters", f"{cid}_preview.png") if c.get("type") == "spine_parts" and not os.path.isfile(preview_path): spine_builder.write_parts_preview_from_files(cid, os.path.join(base, "characters")) w, h = _spine_size_from_json(paths[0]) item = { "id": cid, "png": f"characters/{cid}.png", "w": w, "h": h, "type": c.get("type", "spine"), "role": c.get("role", ""), "preview": f"characters/{cid}_preview.png" if os.path.isfile(preview_path) else "", "animations": spine_builder.anim_data(c.get("animations", ["idle"])), "files": [f"characters/{cid}.json", f"characters/{cid}.atlas", f"characters/{cid}.png"], } if c.get("type") == "spine_parts": item["parts"] = spine_builder.write_part_pngs_from_files(cid, os.path.join(base, "characters")) item["files"].extend([p["file"] for p in item["parts"]]) if item["preview"]: item["files"].append(item["preview"]) lib.setdefault("characters", []).append(item) by_section["characters"][cid] = item changed = True for a in manifest.get("ui_art", []): aid = a.get("id") if not aid or aid in by_section["ui_art"]: continue path = os.path.join(base, "ui_art", f"{aid}.png") if os.path.isfile(path): w, h = _image_size(path) item = {"id": aid, "file": f"ui_art/{aid}.png", "w": w, "h": h, "transparent": a.get("transparent", True)} lib.setdefault("ui_art", []).append(item) by_section["ui_art"][aid] = item changed = True for v in manifest.get("vfx", []): vid = v.get("id") if not vid or vid in by_section["vfx"]: continue path = os.path.join(base, "vfx", f"{vid}.particle.json") if os.path.isfile(path): cfg = json.load(open(path, encoding="utf-8")) item = {"id": vid, "template": v.get("template"), "file": f"vfx/{vid}.particle.json", "config": cfg} lib.setdefault("vfx", []).append(item) by_section["vfx"][vid] = item changed = True ui_path = os.path.join(base, "ui", "TweenPresets.ts") if os.path.isfile(ui_path): for u in manifest.get("ui", []): uid = u.get("id") if uid and uid not in by_section["ui"]: item = {"id": uid, "preset": u.get("preset"), "params": u.get("params", {})} lib.setdefault("ui", []).append(item) by_section["ui"][uid] = item changed = True return changed def _manifest_from_library(lib): if lib.get("slot_config"): return slot_workflow.complete_manifest({"slot_config": lib["slot_config"]}) return { "game": lib.get("game") or "game", "style": "", "characters": [], "ui_art": [], "vfx": [], "ui": [], } TASK_KIND_MAP = { "characters": "characters", "character": "characters", "ui_art": "ui_art", "art": "ui_art", "vfx": "vfx", "ui": "ui", } def _zh_name(kind, item, slot_config=None): item_id = item.get("id", "") role = item.get("role", "") known = { "wild": "百搭符号", "scatter": "免费旋转触发符号", "coin_cash": "现金金币符号", "collect": "收集符号", "bg_main": "主背景", "cover": "封面图", "logo": "游戏 Logo", "reel_frame": "卷轴框", "btn_spin": "旋转按钮", "btn_round": "通用圆按钮", "hud_pill": "HUD 信息条", "boss_explosion": "大魔王裂开爆炸特效", } if role == "boss" or item_id == (slot_config or {}).get("boss", {}).get("id"): return (slot_config or {}).get("boss", {}).get("title") or "大魔王关主" if item_id in known: return known[item_id] if item_id.startswith("jelly_"): return "果冻角色 " + item_id.replace("jelly_", "") if kind == "vfx": return "粒子特效 " + item_id if kind == "ui": return "界面动效 " + item_id return "素材 " + item_id def _task_use(kind, item): item_id = item.get("id", "") role = item.get("role", "") if role == "boss": return "关主角色。待机时 watch/charge,玩家赢时撒币并裂开爆炸,玩家输时举剑踩踏耀武扬威。" if kind == "characters": return "卷轴符号或可动画角色,用于 slot 盘面和中奖反馈。" if kind == "ui_art": if item_id == "bg_main": return "游戏主场景背景。" if item_id == "cover": return "封面、分享图或入口展示。" return "界面美术元素,用于 Cocos 原型的背景、按钮、框体或 HUD。" if kind == "vfx": return "粒子特效配置,用于中奖、金币雨、爆炸或强调反馈。" return "UI tween 动效预设,用于按钮、弹窗、数字滚动等界面反馈。" def _task_prompt(kind, item): if kind == "characters" and item.get("type") == "spine_parts": parts = item.get("parts") or [] part_text = "\n".join([f"- {p.get('id')}: {p.get('prompt', '')}" for p in parts]) sheet = item.get("spriteSheet") or {} mode = "" if item.get("partGeneration") == "sprite_sheet" or sheet.get("enabled"): mode = f"\n生成方式:先生成 {sheet.get('cols', 4)}×{sheet.get('rows', 4)} 透明拆件表,再按格子自动切图。" return (item.get("prompt", "") + mode + ("\n拆件:\n" + part_text if part_text else "")).strip() return item.get("prompt") or item.get("template") or item.get("preset") or "" def _asset_quality_for_task(game, kind, item, asset): if not asset: return {"ok": False, "errors": [], "warnings": []} if kind != "characters" or asset.get("type") != "spine_parts": return {"ok": True, "errors": [], "warnings": []} base = os.path.join(OUT_ROOT, game) errors = [] preview = asset.get("preview") if preview: ppath = os.path.join(base, preview) if os.path.isfile(ppath): ok, reason, _ = asset_quality.boss_preview_quality(ppath) if not ok: errors.append(f"完整预览不可用:{reason}") else: errors.append("完整预览文件缺失") else: errors.append("完整预览缺失") preview_version = asset.get("previewVersion", "") part_versions = asset.get("partVersions") or {} if preview_version: stale = [ part.get("id", "") for part in asset.get("parts") or [] if part_versions.get(part.get("id", "")) != preview_version ] if stale: errors.append(f"拆件与当前主图版本不一致,请按主图重生全部拆件:{', '.join(stale[:6])}") for part in asset.get("parts") or []: pfile = part.get("file", "") ppath = os.path.join(base, pfile) if not pfile or not os.path.isfile(ppath): errors.append(f"拆件缺失:{part.get('id', '')}") continue ok, reason, _ = asset_quality.boss_part_quality(part.get("id", ""), ppath) if not ok: errors.append(f"{part.get('id', '')}:{reason}") return {"ok": not errors, "errors": errors, "warnings": []} def _build_tasks(lib): manifest = _manifest_from_library(lib) slot_config = manifest.get("slot_config", {}) existing = { "characters": {x.get("id"): x for x in lib.get("characters", [])}, "ui_art": {x.get("id"): x for x in lib.get("ui_art", [])}, "vfx": {x.get("id"): x for x in lib.get("vfx", [])}, "ui": {x.get("id"): x for x in lib.get("ui", [])}, } tasks = {k: [] for k in ("characters", "ui_art", "vfx", "ui")} for kind in tasks: for item in manifest.get(kind, []): item_id = item.get("id") asset = existing[kind].get(item_id) quality = _asset_quality_for_task(lib.get("game", ""), kind, item, asset) status = "done" if asset else "missing" if asset and not quality.get("ok", True): status = "invalid" tasks[kind].append({ "kind": kind, "id": item_id, "englishName": item_id, "chineseName": _zh_name(kind, item, slot_config), "use": _task_use(kind, item), "prompt": _task_prompt(kind, item), "status": status, "asset": asset, "quality": quality, "assetType": item.get("type") or kind, "animations": item.get("animations", []), "transparent": item.get("transparent"), "size": item.get("size"), }) return tasks def _filter_manifest_tasks(manifest, kind, task_ids): wanted = set(task_ids) filtered = json.loads(json.dumps(manifest, ensure_ascii=False)) for key in ("characters", "ui_art", "vfx", "ui"): filtered[key] = [x for x in manifest.get(key, []) if key == kind and x.get("id") in wanted] return filtered def _filter_manifest_groups(manifest, groups): filtered = json.loads(json.dumps(manifest, ensure_ascii=False)) for key in ("characters", "ui_art", "vfx", "ui"): wanted = set(groups.get(key, [])) filtered[key] = [x for x in manifest.get(key, []) if x.get("id") in wanted] return filtered def _run_retry_job(job_id, game, kind, task_ids, creds): _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time()) try: lib = _load_library(game) manifest = _manifest_from_library(lib) partial = _filter_manifest_tasks(manifest, kind, task_ids) labels = ", ".join(task_ids) _append_job_log(job_id, f"补生成任务:{kind} / {labels}") new_lib, _ = pipeline.run(partial, OUT_ROOT, creds=creds, log=lambda m: _append_job_log(job_id, m), merge_existing=True) _set_job(job_id, status="done", ok=True, game=new_lib["game"], updatedAt=time.time()) except Exception as e: traceback.print_exc() _append_job_log(job_id, f"❌ 补生成失败: {e}") _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time()) def _run_retry_missing_job(job_id, game, groups, creds): _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time()) try: lib = _load_library(game) manifest = _manifest_from_library(lib) partial = _filter_manifest_groups(manifest, groups) labels = [] for kind, ids in groups.items(): if ids: labels.append(f"{kind}: {', '.join(ids)}") _append_job_log(job_id, "批量补生成缺失项:" + ";".join(labels)) new_lib, _ = pipeline.run(partial, OUT_ROOT, creds=creds, log=lambda m: _append_job_log(job_id, m), merge_existing=True) _set_job(job_id, status="done", ok=True, game=new_lib["game"], updatedAt=time.time()) except Exception as e: traceback.print_exc() _append_job_log(job_id, f"❌ 批量补生成失败: {e}") _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time()) def _transparent_prompt(extra): return ", ".join([ extra, "生成纯透明背景 PNG,真实 Alpha 通道,不要棋盘格,不要白底,不要阴影。", ]) def _has_alpha(img): alpha = img.convert("RGBA").getchannel("A") return alpha.getextrema()[0] == 0 def _generate_alpha_image(creds, prompt, size, label, log): img = providers.generate(creds["provider"], prompt, creds["api_key"], creds.get("base_url", "https://api.openai.com/v1"), creds.get("model", "gpt-image-2"), size) if _has_alpha(img): return img log(f"🧠 [{label}] 模型没有真实 Alpha,改用百度智能抠图兜底…") fixed = baidu_segment.remove_background(img, label=label, log=log) if _has_alpha(fixed): return fixed raise RuntimeError("图片没有真实 Alpha,抠图后仍不合格") def _new_asset_version(): return f"v{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}" def _image_data_url(path): with open(path, "rb") as f: raw = f.read() return "data:image/png;base64," + base64.b64encode(raw).decode("ascii") def _boss_preview_analysis(preview_path, creds, log): try: notes = providers.analyze_reference_images( image_data_urls=[_image_data_url(preview_path)], api_key=creds.get("api_key", ""), base_url=creds.get("base_url", DEFAULT_BASE_URL), model=DEFAULT_TEXT_MODEL, ) text = json.dumps(notes, ensure_ascii=False) log(f"🔎 主图风格解析完成,用于约束拆件一致性") return text[:1800] except Exception as e: log(f"⚠️ 主图风格解析失败,改用文字 prompt 约束一致性:{e}") return "" def _part_consistency_prompt(row, preview_analysis): version = row.get("previewVersion", "") return ( f"Match the current boss preview exactly. previewVersion={version}. " "Keep the same character identity, jelly material, color palette, crown/armor shapes, proportions, lighting, and candy-land art style. " "This must be one isolated rigging part cropped from that same character design, not a redesigned character. " + (f"Reference preview analysis: {preview_analysis}" if preview_analysis else "") ) def _run_retry_boss_part_job(job_id, game, boss_id, part_id, creds): _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time()) try: lib = _load_library(game) manifest = _manifest_from_library(lib) base = os.path.join(OUT_ROOT, game) chars_out = os.path.join(base, "characters") boss = next((c for c in manifest.get("characters", []) if c.get("id") == boss_id or c.get("role") == "boss"), None) if not boss or boss.get("type") != "spine_parts": raise RuntimeError(f"找不到可拆件关主:{boss_id}") parts = boss.get("parts") or [] target = next((p for p in parts if p.get("id") == part_id), None) if not target: raise RuntimeError(f"找不到关主拆件:{part_id}") row = next((c for c in lib.get("characters", []) if c.get("id") == boss_id), None) or {} preview_version = row.get("previewVersion", "") consistency = _part_consistency_prompt(row, row.get("previewAnalysis", "")) _append_job_log(job_id, f"重生关主拆件:{boss_id}/{part_id}") part_images = {} for part in parts: pid = part["id"] ppath = os.path.join(chars_out, f"{boss_id}_parts", f"{pid}.png") if pid != part_id: if not os.path.isfile(ppath): raise RuntimeError(f"缺少现有拆件,无法局部重建:{pid}") from PIL import Image part_images[pid] = Image.open(ppath).convert("RGBA") style = manifest.get("style", "") base_prompt = ", ".join(x for x in [ target.get("prompt", ""), style, consistency, _transparent_prompt( f"only the {part_id} rigging part from the boss character, isolated single part, " "not a full character, no complete body, no other body parts, centered" ), ] if x) correction = "" last_reason = "" for attempt in range(1, 4): if attempt > 1: _append_job_log(job_id, f"🔁 [{part_id}] 拆件不合格,重新生成(第 {attempt}/3 次)…") prompt = base_prompt if not correction else ", ".join([base_prompt, correction]) img = _generate_alpha_image(creds, prompt, target.get("size", boss.get("size", creds.get("size", "1024x1024"))), f"{boss_id}/{part_id}", lambda m: _append_job_log(job_id, m)) ok, reason, detail = asset_quality.boss_part_quality(part_id, img) if ok: _append_job_log(job_id, f"✅ [{part_id}] 拆件质量通过:最大主体 {detail.get('largestShare', 0):.0%}") part_images[part_id] = img break last_reason = reason _append_job_log(job_id, f"⚠️ [{part_id}] 拆件质量失败:{reason}") correction = ( "这不是干净的单个拆件。请只生成这一件,不要包含任何其他身体部位、碎片、武器边缘或相邻格内容。" ) if part_id not in part_images: raise RuntimeError(f"拆件重生失败:{last_reason}") spine_builder.build_parts_character(boss_id, part_images, chars_out, boss.get("animations", ["idle"]), parts, write_preview=False) part_files = spine_builder.write_part_pngs_from_files(boss_id, chars_out) if row: row["parts"] = part_files row["preview"] = f"characters/{boss_id}_preview.png" versions = row.setdefault("partVersions", {}) if preview_version: versions[part_id] = preview_version files = [f"characters/{boss_id}.json", f"characters/{boss_id}.atlas", f"characters/{boss_id}.png", f"characters/{boss_id}_preview.png"] files.extend([p["file"] for p in part_files]) row["files"] = files _save_library(game, lib) _set_job(job_id, status="done", ok=True, game=game, updatedAt=time.time()) except Exception as e: traceback.print_exc() _append_job_log(job_id, f"❌ 拆件重生失败: {e}") _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time()) def _run_retry_boss_preview_job(job_id, game, boss_id, creds): _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time()) try: lib = _load_library(game) manifest = _manifest_from_library(lib) base = os.path.join(OUT_ROOT, game) chars_out = os.path.join(base, "characters") boss = next((c for c in manifest.get("characters", []) if c.get("id") == boss_id or c.get("role") == "boss"), None) if not boss: raise RuntimeError(f"找不到关主:{boss_id}") style = manifest.get("style", "") base_prompt = ", ".join(x for x in [ boss.get("prompt", ""), style, _transparent_prompt( "single complete assembled boss character preview, full body, centered, readable silhouette, " "one complete character only, no separated parts, no sprite sheet, no atlas, no cropped body" ), ] if x) correction = "" last_reason = "" _append_job_log(job_id, f"重生关主完整预览:{boss_id}") for attempt in range(1, 4): if attempt > 1: _append_job_log(job_id, f"🔁 [{boss_id}/preview] 主图不合格,重新生成(第 {attempt}/3 次)…") prompt = base_prompt if not correction else ", ".join([base_prompt, correction]) img = _generate_alpha_image(creds, prompt, boss.get("size", creds.get("size", "1024x1024")), f"{boss_id}/preview", lambda m: _append_job_log(job_id, m)) ok, reason, detail = asset_quality.boss_preview_quality(img) if ok: os.makedirs(chars_out, exist_ok=True) spine_builder.trim_to_content(img, pad=16).save(os.path.join(chars_out, f"{boss_id}_preview.png")) _append_job_log(job_id, f"✅ [{boss_id}/preview] 主图质量通过:最大主体 {detail.get('largestShare', 0):.0%}") break last_reason = reason _append_job_log(job_id, f"⚠️ [{boss_id}/preview] 主图质量失败:{reason}") correction = ( "这不是完整主图。请只生成一个完整、站立、全身、主体连贯的关主角色;" "不要拆件、不要把部件分开、不要 atlas、不要只画头或半身。" ) else: raise RuntimeError(f"主图重生失败:{last_reason}") row = next((c for c in lib.get("characters", []) if c.get("id") == boss_id), None) if row: preview_version = _new_asset_version() preview_path = os.path.join(chars_out, f"{boss_id}_preview.png") row["preview"] = f"characters/{boss_id}_preview.png" row["previewVersion"] = preview_version row["partsSourcePreviewVersion"] = row.get("partsSourcePreviewVersion", "") row["partsStale"] = True row["previewAnalysis"] = _boss_preview_analysis(preview_path, creds, lambda m: _append_job_log(job_id, m)) files = row.setdefault("files", []) if row["preview"] not in files: files.append(row["preview"]) _save_library(game, lib) _set_job(job_id, status="done", ok=True, game=game, updatedAt=time.time()) except Exception as e: traceback.print_exc() _append_job_log(job_id, f"❌ 主图重生失败: {e}") _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time()) def _run_retry_boss_parts_from_preview_job(job_id, game, boss_id, creds): _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time()) try: lib = _load_library(game) manifest = _manifest_from_library(lib) base = os.path.join(OUT_ROOT, game) chars_out = os.path.join(base, "characters") row = next((c for c in lib.get("characters", []) if c.get("id") == boss_id), None) boss = next((c for c in manifest.get("characters", []) if c.get("id") == boss_id or c.get("role") == "boss"), None) if not row or not boss or boss.get("type") != "spine_parts": raise RuntimeError(f"找不到可按主图重生的关主:{boss_id}") preview_path = os.path.join(base, row.get("preview") or f"characters/{boss_id}_preview.png") if not os.path.isfile(preview_path): raise RuntimeError("请先重生主图,再按主图重生拆件") preview_version = row.get("previewVersion") or _new_asset_version() row["previewVersion"] = preview_version if not row.get("previewAnalysis"): row["previewAnalysis"] = _boss_preview_analysis(preview_path, creds, lambda m: _append_job_log(job_id, m)) parts = boss.get("parts") or [] style = manifest.get("style", "") consistency = _part_consistency_prompt(row, row.get("previewAnalysis", "")) part_images = {} _append_job_log(job_id, f"按当前主图重生全部拆件:{boss_id},共 {len(parts)} 个") for idx, part in enumerate(parts, start=1): pid = part["id"] base_prompt = ", ".join(x for x in [ part.get("prompt", ""), style, consistency, _transparent_prompt( f"only the {pid} rigging part from the current boss preview, isolated single part, " "not a full character, no complete body, no other body parts, centered" ), ] if x) correction = "" last_reason = "" for attempt in range(1, 4): if attempt > 1: _append_job_log(job_id, f"🔁 [{pid}] 拆件不合格,重新生成(第 {attempt}/3 次)…") prompt = base_prompt if not correction else ", ".join([base_prompt, correction]) img = _generate_alpha_image(creds, prompt, part.get("size", boss.get("size", creds.get("size", "1024x1024"))), f"{boss_id}/{pid}", lambda m: _append_job_log(job_id, m)) ok, reason, detail = asset_quality.boss_part_quality(pid, img) if ok: _append_job_log(job_id, f"✅ [{idx}/{len(parts)}] {pid} 通过:最大主体 {detail.get('largestShare', 0):.0%}") part_images[pid] = img break last_reason = reason _append_job_log(job_id, f"⚠️ [{pid}] 拆件质量失败:{reason}") correction = "请严格参考主图,只输出这一件拆件,不要完整角色,不要其他部件,不要相邻碎片。" if pid not in part_images: raise RuntimeError(f"{pid} 重生失败:{last_reason}") spine_builder.build_parts_character(boss_id, part_images, chars_out, boss.get("animations", ["idle"]), parts, write_preview=False) part_files = spine_builder.write_part_pngs_from_files(boss_id, chars_out) row["parts"] = part_files row["partVersions"] = {p["id"]: preview_version for p in part_files} row["partsSourcePreviewVersion"] = preview_version row["partsStale"] = False files = [f"characters/{boss_id}.json", f"characters/{boss_id}.atlas", f"characters/{boss_id}.png", f"characters/{boss_id}_preview.png"] files.extend([p["file"] for p in part_files]) row["files"] = files _save_library(game, lib) _set_job(job_id, status="done", ok=True, game=game, updatedAt=time.time()) except Exception as e: traceback.print_exc() _append_job_log(job_id, f"❌ 按主图重生拆件失败: {e}") _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time()) def _split_lines(value): if isinstance(value, list): return [str(x).strip() for x in value if str(x).strip()] return [x.strip() for x in str(value or "").splitlines() if x.strip()] def _creative_fallback(data): brief = (data.get("brief") or "").lower() text = " ".join([brief, data.get("styleNotes", "").lower(), data.get("avoidNotes", "").lower()]) allowed_themes = {"jelly", "fruit", "egypt", "pirate", "pirate_jelly", "cyber"} requested_theme = data.get("theme") if data.get("theme") in allowed_themes else "" theme = requested_theme or "jelly" pirate_hint = any(k in text for k in ("pirate", "海盗", "treasure", "宝藏", "金币", "船长")) jelly_hint = any(k in text for k in ("jelly", "果冻", "candy", "糖果", "gummy", "软糖")) if not requested_theme or requested_theme == "jelly": if pirate_hint and jelly_hint: theme = "pirate_jelly" elif any(k in text for k in ("egypt", "埃及", "金字塔", "法老")): theme = "egypt" elif pirate_hint: theme = "pirate" elif any(k in text for k in ("cyber", "赛博", "neon", "霓虹")): theme = "cyber" elif any(k in text for k in ("fruit", "水果", "cherry", "樱桃")): theme = "fruit" reel_mode = data.get("reelMode") or ("cluster" if any(k in text for k in ("消除", "cluster", "连通", "match")) else "ways") volatility = data.get("volatility") or ("high" if any(k in text for k in ("刺激", "大奖", "高波动", "big win")) else "medium") features = list(data.get("features") or ["cascades", "free_spins", "wilds"]) if any(k in text for k in ("金币", "jackpot", "hold", "respin", "大奖池")) and "hold_win" not in features: features.append("hold_win") if any(k in text for k in ("倍率", "multiplier", "连锁")) and "multipliers" not in features: features.append("multipliers") if theme == "pirate_jelly": for feature in ("hold_win", "multipliers"): if feature not in features: features.append(feature) title = data.get("title") or "AI Custom Slot" style = data.get("styleNotes") or data.get("brief") or "" return { "gameId": data.get("gameId") or title, "title": title, "theme": theme, "style": style, "reelMode": reel_mode, "volatility": volatility, "targetRtp": data.get("targetRtp", 96), "characterCount": data.get("characterCount", 10), "uiCompleteness": data.get("uiCompleteness", "full"), "feedbackIntensity": data.get("feedbackIntensity", "standard"), "enableBoss": bool(data.get("enableBoss", True)), "bossPresence": data.get("bossPresence", "full"), "enableMathModel": bool(data.get("enableMathModel", True)), "features": features, } def build_game_plan(slot_request, creative_payload, source): features = slot_request.get("features") or [] hook_parts = [] if "hold_win" in features: hook_parts.append("金币锁格 respin 大奖目标") if "cascades" in features: hook_parts.append("连锁下落爽感") if "free_spins" in features: hook_parts.append("Scatter 免费旋转期待") if "multipliers" in features: hook_parts.append("递增倍率爆点") core_hook = " + ".join(hook_parts[:2]) or "轻量现代老虎机循环" vision = creative_payload.get("visionStyleAnalysis") or {} art_direction = { "theme": slot_request.get("theme"), "style": slot_request.get("style", ""), "reference_style_analysis": vision, "avoid": creative_payload.get("avoidNotes", ""), } return { "source": source, "creative": { "brief": creative_payload.get("brief", ""), "references": creative_payload.get("references", []), "uploadedReferenceImages": creative_payload.get("uploadedReferenceImages", 0), "visionStyleAnalysis": vision, "visionStyleAnalysisError": creative_payload.get("visionStyleAnalysisError", ""), }, "gameDesign": { "title": slot_request.get("title"), "coreHook": core_hook, "artDirection": art_direction, "reelExperience": slot_request.get("reelMode"), "volatility": slot_request.get("volatility"), "differentiators": hook_parts, "bossDesign": { "enabled": bool(slot_request.get("enableBoss", True)), "presence": slot_request.get("bossPresence", "full"), "idle": "静静等待时会呼吸、环顾、蓄力", "playerWin": "玩家赢钱时撒币,然后受击裂开爆炸", "playerLose": "玩家输钱时举剑嘲讽、踩踏并攻击", }, "assetStrategy": { "symbolCount": slot_request.get("characterCount"), "uiCompleteness": slot_request.get("uiCompleteness"), "feedbackIntensity": slot_request.get("feedbackIntensity"), }, }, } def creative_to_slot_request(data): api_key = (data.get("api_key") or DEFAULT_API_KEY).strip() base_url = (data.get("base_url") or DEFAULT_BASE_URL).strip() text_model = (data.get("text_model") or DEFAULT_TEXT_MODEL).strip() references = _split_lines(data.get("references")) reference_images = data.get("reference_images") or [] if not isinstance(reference_images, list): reference_images = [] payload = { "title": data.get("title") or "AI Custom Slot", "gameId": data.get("gameId") or "", "brief": data.get("brief") or "", "references": references, "uploadedReferenceImages": len(reference_images), "styleNotes": data.get("styleNotes") or "", "avoidNotes": data.get("avoidNotes") or "", "theme": data.get("theme") or "", "reelMode": data.get("reelMode") or "", "volatility": data.get("volatility") or "", "targetRtp": data.get("targetRtp", 96), "characterCount": data.get("characterCount", 10), "uiCompleteness": data.get("uiCompleteness", "full"), "feedbackIntensity": data.get("feedbackIntensity", "standard"), "enableBoss": bool(data.get("enableBoss", True)), "bossPresence": data.get("bossPresence", "full"), "features": data.get("features") or [], "enableMathModel": bool(data.get("enableMathModel", True)), } if not api_key: fallback = _creative_fallback(payload) return fallback, "fallback_no_api_key", build_game_plan(fallback, payload, "fallback_no_api_key") vision_notes = {} direct_image_urls = [ x for x in references if x.lower().startswith(("http://", "https://")) and x.lower().split("?")[0].endswith((".png", ".jpg", ".jpeg", ".webp")) ] if direct_image_urls or reference_images: try: vision_notes = providers.analyze_reference_images( reference_urls=direct_image_urls, image_data_urls=reference_images, api_key=api_key, base_url=base_url, model=text_model, ) payload["visionStyleAnalysis"] = vision_notes if vision_notes.get("image_prompt_style"): payload["styleNotes"] = ";".join([ str(payload.get("styleNotes") or ""), "视觉参考分析:" + str(vision_notes.get("image_prompt_style")), ]).strip(";") elif vision_notes: payload["styleNotes"] = ";".join([ str(payload.get("styleNotes") or ""), "视觉参考分析:" + json.dumps(vision_notes, ensure_ascii=False), ]).strip(";") except Exception as e: payload["visionStyleAnalysisError"] = str(e)[:500] messages = [ { "role": "system", "content": ( "你是资深移动老虎机游戏策划和美术总监。根据用户的基础描述、参考图或网址、风格要求," "生成一份可执行的 slot 工作流输入 JSON。必须原创,不能复刻参考图里的具体 IP、logo、角色。" "只输出 JSON,不要解释。" ), }, { "role": "user", "content": json.dumps({ "task": "把创意简报转换为 slot_workflow.build_workflow 可接受的输入", "allowedFields": { "gameId": "string slug or title", "title": "string", "theme": "jelly|fruit|egypt|pirate|pirate_jelly|cyber", "style": "English image-generation style prompt, include reference-derived art direction", "reelMode": "ways|paylines|megaways|cluster", "volatility": "low|medium|high", "targetRtp": "number percent, e.g. 96", "characterCount": "6|8|10|12", "uiCompleteness": "basic|full", "feedbackIntensity": "quiet|standard|loud", "enableBoss": "boolean", "bossPresence": "light|standard|full", "enableMathModel": "boolean", "features": "array of cascades, free_spins, wilds, hold_win, multipliers" }, "creativeBrief": payload, "outputRules": [ "Return JSON object with exactly these workflow fields.", "Pick one clear core hook, not every feature.", "Use references as style inspiration only; do not copy protected characters, logos, or exact layout.", "Style must be specific enough for image generation." ], }, ensure_ascii=False), }, ] try: obj = providers.chat_json_openai(messages, api_key=api_key, base_url=base_url, model=text_model) except Exception: fallback = _creative_fallback(payload) return fallback, "fallback_text_model_failed", build_game_plan(fallback, payload, "fallback_text_model_failed") fallback = _creative_fallback(payload) out = dict(fallback) for key in ("gameId", "title", "theme", "style", "reelMode", "volatility", "targetRtp", "characterCount", "uiCompleteness", "feedbackIntensity", "enableBoss", "bossPresence", "enableMathModel"): if key in obj and obj[key] not in (None, ""): out[key] = obj[key] if isinstance(obj.get("features"), list) and obj["features"]: allowed = {"cascades", "free_spins", "wilds", "hold_win", "multipliers"} out["features"] = [x for x in obj["features"] if x in allowed] source = "vision_text_model" if vision_notes else "text_model" return out, source, build_game_plan(out, payload, source) class Handler(BaseHTTPRequestHandler): def _send(self, code, body, ctype="application/json; charset=utf-8"): if isinstance(body, (dict, list)): body = json.dumps(body, ensure_ascii=False).encode("utf-8") elif isinstance(body, str): body = body.encode("utf-8") self.send_response(code) self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _file(self, path): if not os.path.isfile(path): return self._send(404, {"error": "not found"}) ctype = mimetypes.guess_type(path)[0] or "application/octet-stream" with open(path, "rb") as f: data = f.read() self.send_response(200) self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def log_message(self, *a): pass # 静音 def do_GET(self): u = urlparse(self.path) path, qs = u.path, parse_qs(u.query) if path == "/" or path == "/index.html": return self._file(os.path.join(WEB_DIR, "index.html")) if path == "/api/manifest": with open(DEFAULT_MANIFEST, encoding="utf-8") as f: return self._send(200, f.read()) if path == "/api/games": return self._send(200, {"games": list_games()}) if path == "/api/job": job_id = qs.get("id", [""])[0] job = _job_snapshot(job_id) if not job: return self._send(404, {"ok": False, "error": "job not found"}) job["id"] = job_id return self._send(200, job) if path == "/api/library": games = list_games() game = (qs.get("game", [None])[0]) or (games[-1] if games else None) if not game: return self._send(200, {"game": None, "games": games, "characters": [], "vfx": [], "ui": [], "tasks": {}}) library_path = os.path.join(OUT_ROOT, game, "library.json") if not os.path.isfile(library_path): return self._send(200, {"game": game, "games": games, "assetBase": f"/assets/{game}/", "characters": [], "vfx": [], "ui": [], "tasks": {}}) with open(library_path, encoding="utf-8") as f: lib = json.load(f) lib["games"] = games lib["assetBase"] = f"/assets/{game}/" lib["tasks"] = _build_tasks(lib) all_tasks = [t for rows in lib["tasks"].values() for t in rows] lib["taskSummary"] = { "total": len(all_tasks), "done": sum(1 for t in all_tasks if t.get("status") == "done"), "missing": sum(1 for t in all_tasks if t.get("status") != "done"), } return self._send(200, lib) if path.startswith("/assets/"): rel = unquote(path[len("/assets/"):]) try: return self._file(safe_join(OUT_ROOT, rel)) except ValueError: return self._send(400, {"error": "bad path"}) return self._send(404, {"error": "not found"}) def do_POST(self): try: return self._do_POST() except Exception as e: traceback.print_exc() return self._send(500, {"ok": False, "error": str(e)}) def _read_json_body(self): length = int(self.headers.get("Content-Length", 0)) return json.loads(self.rfile.read(length) or b"{}") def _do_POST(self): route = urlparse(self.path).path if route == "/api/export": return self._post_export() if route == "/api/open-folder": return self._post_open_folder() if route == "/api/retry-task": return self._post_retry_task() if route == "/api/retry-boss-part": return self._post_retry_boss_part() if route == "/api/retry-boss-preview": return self._post_retry_boss_preview() if route == "/api/retry-boss-parts-from-preview": return self._post_retry_boss_parts_from_preview() if route == "/api/retry-missing": return self._post_retry_missing() if route == "/api/delete": return self._post_delete() if route == "/api/slot-workflow": return self._post_slot_workflow() if route == "/api/creative-manifest": return self._post_creative_manifest() if route != "/api/generate": return self._send(404, {"error": "not found"}) try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) try: manifest = json.loads(data["manifest"]) if isinstance(data.get("manifest"), str) \ else data.get("manifest") manifest = slot_workflow.complete_manifest(manifest) except Exception as e: return self._send(400, {"ok": False, "error": f"manifest 非法 JSON: {e}"}) creds = { "provider": data.get("provider", "OpenAI 兼容接口"), "api_key": (data.get("api_key") or DEFAULT_API_KEY).strip(), "base_url": (data.get("base_url") or DEFAULT_BASE_URL).strip(), "model": (data.get("model") or DEFAULT_IMAGE_MODEL).strip(), "size": data.get("size", "1024x1024"), } logs = [] if data.get("async", True): job_id = uuid.uuid4().hex with JOBS_LOCK: JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["任务已创建,等待开始…"], "game": manifest.get("game"), "createdAt": time.time(), "updatedAt": time.time()} threading.Thread(target=_run_generate_job, args=(job_id, manifest, creds), daemon=True).start() return self._send(200, {"ok": True, "jobId": job_id, "game": manifest.get("game")}) try: lib, _ = pipeline.run(manifest, OUT_ROOT, creds=creds, log=lambda m: logs.append(m)) except Exception as e: return self._send(500, {"ok": False, "error": str(e), "logs": logs}) return self._send(200, {"ok": True, "logs": logs, "game": lib["game"]}) def _post_slot_workflow(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) try: result = slot_workflow.build_workflow(data) except Exception as e: traceback.print_exc() return self._send(500, {"ok": False, "error": str(e)}) return self._send(200, {"ok": True, **result}) def _post_creative_manifest(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) try: normalized, source, game_plan = creative_to_slot_request(data) normalized["creative"] = game_plan.get("creative", {}) normalized["gameDesign"] = game_plan.get("gameDesign", {}) result = slot_workflow.build_workflow(normalized) return self._send(200, {"ok": True, "source": source, "game_plan": game_plan, "creative_request": normalized, **result}) except Exception as e: traceback.print_exc() return self._send(500, {"ok": False, "error": str(e)}) def _post_export(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) logs = [] try: pack = exporter.export(game, OUT_ROOT, log=lambda m: logs.append(m)) except Exception as e: traceback.print_exc() return self._send(500, {"ok": False, "error": str(e), "logs": logs}) return self._send(200, {"ok": True, "logs": logs, "game": game, "pack": os.path.abspath(pack)}) def _post_open_folder(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) target = os.path.abspath(os.path.join(OUT_ROOT, game)) if not target.startswith(os.path.abspath(OUT_ROOT) + os.sep) or not os.path.isdir(target): return self._send(400, {"ok": False, "error": "素材目录不存在或路径非法"}) try: if sys.platform == "darwin": subprocess.Popen(["open", target]) elif os.name == "nt": os.startfile(target) # type: ignore[attr-defined] else: subprocess.Popen(["xdg-open", target]) except Exception as e: return self._send(500, {"ok": False, "error": f"打开素材目录失败: {e}"}) return self._send(200, {"ok": True, "game": game, "path": target}) def _post_retry_task(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) kind = TASK_KIND_MAP.get((data.get("kind") or "").strip()) if not kind: return self._send(400, {"ok": False, "error": "无效的任务类型"}) task_ids = data.get("ids") or data.get("taskIds") or [data.get("id")] if isinstance(task_ids, str): task_ids = [task_ids] task_ids = [str(x).strip() for x in task_ids if str(x or "").strip()] if not task_ids: return self._send(400, {"ok": False, "error": "没有选择要重试的任务"}) lib = _load_library(game) tasks = _build_tasks(lib).get(kind, []) valid_ids = {t["id"] for t in tasks} bad = [x for x in task_ids if x not in valid_ids] if bad: return self._send(400, {"ok": False, "error": f"任务不存在: {', '.join(bad)}"}) creds = { "provider": data.get("provider", "OpenAI 兼容接口"), "api_key": (data.get("api_key") or DEFAULT_API_KEY).strip(), "base_url": (data.get("base_url") or DEFAULT_BASE_URL).strip(), "model": (data.get("model") or DEFAULT_IMAGE_MODEL).strip(), "size": data.get("size", "1024x1024"), } job_id = uuid.uuid4().hex with JOBS_LOCK: JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["补生成任务已创建,等待开始…"], "game": game, "createdAt": time.time(), "updatedAt": time.time()} threading.Thread(target=_run_retry_job, args=(job_id, game, kind, task_ids, creds), daemon=True).start() return self._send(200, {"ok": True, "jobId": job_id, "game": game}) def _post_retry_boss_part(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() boss_id = (data.get("bossId") or data.get("id") or "").strip() part_id = (data.get("partId") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) if not boss_id or not part_id: return self._send(400, {"ok": False, "error": "缺少 bossId 或 partId"}) creds = { "provider": data.get("provider", "OpenAI 兼容接口"), "api_key": (data.get("api_key") or DEFAULT_API_KEY).strip(), "base_url": (data.get("base_url") or DEFAULT_BASE_URL).strip(), "model": (data.get("model") or DEFAULT_IMAGE_MODEL).strip(), "size": data.get("size", "1024x1024"), } job_id = uuid.uuid4().hex with JOBS_LOCK: JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["拆件重生任务已创建,等待开始…"], "game": game, "createdAt": time.time(), "updatedAt": time.time()} threading.Thread(target=_run_retry_boss_part_job, args=(job_id, game, boss_id, part_id, creds), daemon=True).start() return self._send(200, {"ok": True, "jobId": job_id, "game": game}) def _post_retry_boss_preview(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() boss_id = (data.get("bossId") or data.get("id") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) if not boss_id: return self._send(400, {"ok": False, "error": "缺少 bossId"}) creds = { "provider": data.get("provider", "OpenAI 兼容接口"), "api_key": (data.get("api_key") or DEFAULT_API_KEY).strip(), "base_url": (data.get("base_url") or DEFAULT_BASE_URL).strip(), "model": (data.get("model") or DEFAULT_IMAGE_MODEL).strip(), "size": data.get("size", "1024x1024"), } job_id = uuid.uuid4().hex with JOBS_LOCK: JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["主图重生任务已创建,等待开始…"], "game": game, "createdAt": time.time(), "updatedAt": time.time()} threading.Thread(target=_run_retry_boss_preview_job, args=(job_id, game, boss_id, creds), daemon=True).start() return self._send(200, {"ok": True, "jobId": job_id, "game": game}) def _post_retry_boss_parts_from_preview(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() boss_id = (data.get("bossId") or data.get("id") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) if not boss_id: return self._send(400, {"ok": False, "error": "缺少 bossId"}) creds = { "provider": data.get("provider", "OpenAI 兼容接口"), "api_key": (data.get("api_key") or DEFAULT_API_KEY).strip(), "base_url": (data.get("base_url") or DEFAULT_BASE_URL).strip(), "model": (data.get("model") or DEFAULT_IMAGE_MODEL).strip(), "size": data.get("size", "1024x1024"), } job_id = uuid.uuid4().hex with JOBS_LOCK: JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["按主图重生拆件任务已创建,等待开始…"], "game": game, "createdAt": time.time(), "updatedAt": time.time()} threading.Thread(target=_run_retry_boss_parts_from_preview_job, args=(job_id, game, boss_id, creds), daemon=True).start() return self._send(200, {"ok": True, "jobId": job_id, "game": game}) def _post_retry_missing(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) lib = _load_library(game) tasks = _build_tasks(lib) groups = {k: [t["id"] for t in rows if t.get("status") != "done"] for k, rows in tasks.items()} groups = {k: v for k, v in groups.items() if v} if not groups: return self._send(400, {"ok": False, "error": "当前资源库没有缺失任务"}) creds = { "provider": data.get("provider", "OpenAI 兼容接口"), "api_key": (data.get("api_key") or DEFAULT_API_KEY).strip(), "base_url": (data.get("base_url") or DEFAULT_BASE_URL).strip(), "model": (data.get("model") or DEFAULT_IMAGE_MODEL).strip(), "size": data.get("size", "1024x1024"), } job_id = uuid.uuid4().hex with JOBS_LOCK: JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["批量补生成任务已创建,等待开始…"], "game": game, "createdAt": time.time(), "updatedAt": time.time()} threading.Thread(target=_run_retry_missing_job, args=(job_id, game, groups, creds), daemon=True).start() return self._send(200, {"ok": True, "jobId": job_id, "game": game, "groups": groups}) def _post_delete(self): try: data = self._read_json_body() except Exception as e: return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"}) game = (data.get("game") or "").strip() if not game or game not in list_games(): return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"}) target = os.path.join(OUT_ROOT, game) # 安全校验:必须落在 OUT_ROOT 内 if not os.path.abspath(target).startswith(os.path.abspath(OUT_ROOT) + os.sep): return self._send(400, {"ok": False, "error": "非法路径"}) shutil.rmtree(target) return self._send(200, {"ok": True, "deleted": game, "games": list_games()}) if __name__ == "__main__": os.makedirs(OUT_ROOT, exist_ok=True) print(f"Anim Studio 网站: http://127.0.0.1:{PORT}") ThreadingHTTPServer(("127.0.0.1", PORT), Handler).serve_forever()