| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328 |
- """Anim Studio 网站后端(仅用 Python 标准库,无 web 框架依赖)。
- 启动:
- pip install Pillow # 唯一必需第三方库
- python server.py
- 浏览器打开 http://127.0.0.1:7861
- 路由:
- GET / 可视化网站
- GET /api/manifest 默认 manifest
- GET /api/games 已生成的 game 列表
- GET /api/library?game=.. 某 game 的资源/动画库
- GET /assets/<game>/<path> 资源文件
- POST /api/generate 运行生成管线
- POST /api/export 把某 game 打包成 Cocos 整合包
- POST /api/open-folder 打开某 game 的本地素材目录
- POST /api/delete 删除某 game 的资源库
- """
- import json
- import mimetypes
- import os
- import posixpath
- import shutil
- import subprocess
- import sys
- import threading
- import time
- import traceback
- import uuid
- import base64
- from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
- from urllib.parse import urlparse, parse_qs, unquote
- import pipeline
- import exporter
- import slot_workflow
- import spine_builder
- import providers
- import config
- import baidu_segment
- import asset_quality
- HERE = os.path.dirname(os.path.abspath(__file__))
- OUT_ROOT = os.path.join(HERE, "out")
- WEB_DIR = os.path.join(HERE, "web")
- DEFAULT_MANIFEST = os.path.join(HERE, "animation_manifest.json")
- PORT = 7861
- DEFAULT_BASE_URL = config.get("ANIM_STUDIO_BASE_URL", "https://x.long.bid/v1")
- DEFAULT_API_KEY = config.get("ANIM_STUDIO_API_KEY", "")
- DEFAULT_IMAGE_MODEL = config.get("ANIM_STUDIO_IMAGE_MODEL", "gpt-image-2")
- DEFAULT_TEXT_MODEL = config.get("ANIM_STUDIO_TEXT_MODEL", "gpt-5.4-mini")
- DEFAULT_IMAGE_TIMEOUT = int(config.get("ANIM_STUDIO_IMAGE_TIMEOUT", "600") or "600")
- JOBS = {}
- JOBS_LOCK = threading.Lock()
- def list_games():
- if not os.path.isdir(OUT_ROOT):
- return []
- return [n for n in sorted(os.listdir(OUT_ROOT))
- if os.path.isfile(os.path.join(OUT_ROOT, n, "library.json"))]
- def safe_join(root, *parts):
- p = posixpath.normpath("/".join(parts)).lstrip("/")
- full = os.path.join(root, *p.split("/"))
- if not os.path.abspath(full).startswith(os.path.abspath(root)):
- raise ValueError("path traversal")
- return full
- def _job_snapshot(job_id):
- with JOBS_LOCK:
- job = JOBS.get(job_id)
- return dict(job) if job else None
- def _set_job(job_id, **fields):
- with JOBS_LOCK:
- job = JOBS.setdefault(job_id, {})
- job.update(fields)
- def _append_job_log(job_id, msg):
- with JOBS_LOCK:
- job = JOBS.setdefault(job_id, {})
- job.setdefault("logs", []).append(msg)
- job["updatedAt"] = time.time()
- def _run_generate_job(job_id, manifest, creds):
- _set_job(job_id, status="running", logs=[], game=manifest.get("game"), startedAt=time.time(), updatedAt=time.time())
- try:
- lib, _ = pipeline.run(manifest, OUT_ROOT, creds=creds, log=lambda m: _append_job_log(job_id, m))
- _set_job(job_id, status="done", ok=True, game=lib["game"], updatedAt=time.time())
- except Exception as e:
- traceback.print_exc()
- _append_job_log(job_id, f"❌ 生成任务失败: {e}")
- _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time())
- def _load_library(game):
- library_path = os.path.join(OUT_ROOT, game, "library.json")
- if not os.path.isfile(library_path):
- return {"game": game, "characters": [], "vfx": [], "ui": [], "ui_art": []}
- with open(library_path, encoding="utf-8") as f:
- lib = json.load(f)
- if _repair_library_index(game, lib):
- with open(library_path, "w", encoding="utf-8") as f:
- json.dump(lib, f, ensure_ascii=False, indent=2)
- return lib
- def _save_library(game, lib):
- library_path = os.path.join(OUT_ROOT, game, "library.json")
- os.makedirs(os.path.dirname(library_path), exist_ok=True)
- with open(library_path, "w", encoding="utf-8") as f:
- json.dump(lib, f, ensure_ascii=False, indent=2)
- def _spine_size_from_json(path):
- try:
- data = json.load(open(path, encoding="utf-8"))
- sk = data.get("skeleton", {})
- return int(round(sk.get("width") or 1000)), int(round(sk.get("height") or 1000))
- except Exception:
- return 1000, 1000
- def _image_size(path):
- try:
- from PIL import Image
- with Image.open(path) as img:
- return img.size
- except Exception:
- return 0, 0
- def _repair_library_index(game, lib):
- """Index files that exist on disk but were not written to library.json.
- This lets partial retry jobs become visible immediately even if a later
- required task failed before the old library index was updated.
- """
- changed = False
- base = os.path.join(OUT_ROOT, game)
- manifest = _manifest_from_library(lib)
- by_section = {
- "characters": {x.get("id"): x for x in lib.get("characters", [])},
- "ui_art": {x.get("id"): x for x in lib.get("ui_art", [])},
- "vfx": {x.get("id"): x for x in lib.get("vfx", [])},
- "ui": {x.get("id"): x for x in lib.get("ui", [])},
- }
- for c in manifest.get("characters", []):
- cid = c.get("id")
- if not cid:
- continue
- paths = [os.path.join(base, "characters", f"{cid}.{ext}") for ext in ("json", "atlas", "png")]
- existing_item = by_section["characters"].get(cid)
- if existing_item:
- preview_path = os.path.join(base, "characters", f"{cid}_preview.png")
- if c.get("type") == "spine_parts" and not os.path.isfile(preview_path):
- spine_builder.write_parts_preview_from_files(cid, os.path.join(base, "characters"))
- if c.get("type") == "spine_parts":
- parts = spine_builder.write_part_pngs_from_files(cid, os.path.join(base, "characters"))
- if parts and existing_item.get("parts") != parts:
- existing_item["parts"] = parts
- files = existing_item.setdefault("files", [])
- for part in parts:
- if part["file"] not in files:
- files.append(part["file"])
- changed = True
- if os.path.isfile(preview_path) and not existing_item.get("preview"):
- existing_item["preview"] = f"characters/{cid}_preview.png"
- if existing_item["preview"] not in existing_item.setdefault("files", []):
- existing_item["files"].append(existing_item["preview"])
- changed = True
- continue
- if all(os.path.isfile(p) for p in paths):
- preview_path = os.path.join(base, "characters", f"{cid}_preview.png")
- if c.get("type") == "spine_parts" and not os.path.isfile(preview_path):
- spine_builder.write_parts_preview_from_files(cid, os.path.join(base, "characters"))
- w, h = _spine_size_from_json(paths[0])
- item = {
- "id": cid,
- "png": f"characters/{cid}.png",
- "w": w, "h": h,
- "type": c.get("type", "spine"),
- "role": c.get("role", ""),
- "preview": f"characters/{cid}_preview.png" if os.path.isfile(preview_path) else "",
- "animations": spine_builder.anim_data(c.get("animations", ["idle"])),
- "files": [f"characters/{cid}.json", f"characters/{cid}.atlas", f"characters/{cid}.png"],
- }
- if c.get("type") == "spine_parts":
- item["parts"] = spine_builder.write_part_pngs_from_files(cid, os.path.join(base, "characters"))
- item["files"].extend([p["file"] for p in item["parts"]])
- if item["preview"]:
- item["files"].append(item["preview"])
- lib.setdefault("characters", []).append(item)
- by_section["characters"][cid] = item
- changed = True
- for a in manifest.get("ui_art", []):
- aid = a.get("id")
- if not aid or aid in by_section["ui_art"]:
- continue
- path = os.path.join(base, "ui_art", f"{aid}.png")
- if os.path.isfile(path):
- w, h = _image_size(path)
- item = {"id": aid, "file": f"ui_art/{aid}.png", "w": w, "h": h,
- "transparent": a.get("transparent", True)}
- lib.setdefault("ui_art", []).append(item)
- by_section["ui_art"][aid] = item
- changed = True
- for v in manifest.get("vfx", []):
- vid = v.get("id")
- if not vid or vid in by_section["vfx"]:
- continue
- path = os.path.join(base, "vfx", f"{vid}.particle.json")
- if os.path.isfile(path):
- cfg = json.load(open(path, encoding="utf-8"))
- item = {"id": vid, "template": v.get("template"),
- "file": f"vfx/{vid}.particle.json", "config": cfg}
- lib.setdefault("vfx", []).append(item)
- by_section["vfx"][vid] = item
- changed = True
- ui_path = os.path.join(base, "ui", "TweenPresets.ts")
- if os.path.isfile(ui_path):
- for u in manifest.get("ui", []):
- uid = u.get("id")
- if uid and uid not in by_section["ui"]:
- item = {"id": uid, "preset": u.get("preset"), "params": u.get("params", {})}
- lib.setdefault("ui", []).append(item)
- by_section["ui"][uid] = item
- changed = True
- return changed
- def _manifest_from_library(lib):
- if lib.get("slot_config"):
- return slot_workflow.complete_manifest({"slot_config": lib["slot_config"]})
- return {
- "game": lib.get("game") or "game",
- "style": "",
- "characters": [],
- "ui_art": [],
- "vfx": [],
- "ui": [],
- }
- TASK_KIND_MAP = {
- "characters": "characters",
- "character": "characters",
- "ui_art": "ui_art",
- "art": "ui_art",
- "vfx": "vfx",
- "ui": "ui",
- }
- def _zh_name(kind, item, slot_config=None):
- item_id = item.get("id", "")
- role = item.get("role", "")
- known = {
- "wild": "百搭符号",
- "scatter": "免费旋转触发符号",
- "coin_cash": "现金金币符号",
- "collect": "收集符号",
- "bg_main": "主背景",
- "cover": "封面图",
- "logo": "游戏 Logo",
- "reel_frame": "卷轴框",
- "btn_spin": "旋转按钮",
- "btn_round": "通用圆按钮",
- "hud_pill": "HUD 信息条",
- "boss_explosion": "大魔王裂开爆炸特效",
- }
- if role == "boss" or item_id == (slot_config or {}).get("boss", {}).get("id"):
- return (slot_config or {}).get("boss", {}).get("title") or "大魔王关主"
- if item_id in known:
- return known[item_id]
- if item_id.startswith("jelly_"):
- return "果冻角色 " + item_id.replace("jelly_", "")
- if kind == "vfx":
- return "粒子特效 " + item_id
- if kind == "ui":
- return "界面动效 " + item_id
- return "素材 " + item_id
- def _task_use(kind, item):
- item_id = item.get("id", "")
- role = item.get("role", "")
- if role == "boss":
- return "关主角色。待机时 watch/charge,玩家赢时撒币并裂开爆炸,玩家输时举剑踩踏耀武扬威。"
- if kind == "characters":
- return "卷轴符号或可动画角色,用于 slot 盘面和中奖反馈。"
- if kind == "ui_art":
- if item_id == "bg_main":
- return "游戏主场景背景。"
- if item_id == "cover":
- return "封面、分享图或入口展示。"
- return "界面美术元素,用于 Cocos 原型的背景、按钮、框体或 HUD。"
- if kind == "vfx":
- return "粒子特效配置,用于中奖、金币雨、爆炸或强调反馈。"
- return "UI tween 动效预设,用于按钮、弹窗、数字滚动等界面反馈。"
- def _task_prompt(kind, item):
- if kind == "characters" and item.get("type") == "spine_parts":
- parts = item.get("parts") or []
- part_text = "\n".join([f"- {p.get('id')}: {p.get('prompt', '')}" for p in parts])
- sheet = item.get("spriteSheet") or {}
- mode = ""
- if item.get("partGeneration") == "sprite_sheet" or sheet.get("enabled"):
- mode = f"\n生成方式:先生成 {sheet.get('cols', 4)}×{sheet.get('rows', 4)} 透明拆件表,再按格子自动切图。"
- return (item.get("prompt", "") + mode + ("\n拆件:\n" + part_text if part_text else "")).strip()
- return item.get("prompt") or item.get("template") or item.get("preset") or ""
- def _asset_quality_for_task(game, kind, item, asset):
- if not asset:
- return {"ok": False, "errors": [], "warnings": []}
- if kind != "characters" or asset.get("type") != "spine_parts":
- return {"ok": True, "errors": [], "warnings": []}
- base = os.path.join(OUT_ROOT, game)
- errors = []
- preview = asset.get("preview")
- if preview:
- ppath = os.path.join(base, preview)
- if os.path.isfile(ppath):
- ok, reason, _ = asset_quality.boss_preview_quality(ppath)
- if not ok:
- errors.append(f"完整预览不可用:{reason}")
- else:
- errors.append("完整预览文件缺失")
- else:
- errors.append("完整预览缺失")
- preview_version = asset.get("previewVersion", "")
- part_versions = asset.get("partVersions") or {}
- if preview_version:
- stale = [
- part.get("id", "")
- for part in asset.get("parts") or []
- if part_versions.get(part.get("id", "")) != preview_version
- ]
- if stale:
- errors.append(f"拆件与当前主图版本不一致,请按主图重生全部拆件:{', '.join(stale[:6])}")
- for part in asset.get("parts") or []:
- pfile = part.get("file", "")
- ppath = os.path.join(base, pfile)
- if not pfile or not os.path.isfile(ppath):
- errors.append(f"拆件缺失:{part.get('id', '')}")
- continue
- ok, reason, _ = asset_quality.boss_part_quality(part.get("id", ""), ppath)
- if not ok:
- errors.append(f"{part.get('id', '')}:{reason}")
- return {"ok": not errors, "errors": errors, "warnings": []}
- def _build_tasks(lib):
- manifest = _manifest_from_library(lib)
- slot_config = manifest.get("slot_config", {})
- existing = {
- "characters": {x.get("id"): x for x in lib.get("characters", [])},
- "ui_art": {x.get("id"): x for x in lib.get("ui_art", [])},
- "vfx": {x.get("id"): x for x in lib.get("vfx", [])},
- "ui": {x.get("id"): x for x in lib.get("ui", [])},
- }
- tasks = {k: [] for k in ("characters", "ui_art", "vfx", "ui")}
- for kind in tasks:
- for item in manifest.get(kind, []):
- item_id = item.get("id")
- asset = existing[kind].get(item_id)
- quality = _asset_quality_for_task(lib.get("game", ""), kind, item, asset)
- status = "done" if asset else "missing"
- if asset and not quality.get("ok", True):
- status = "invalid"
- tasks[kind].append({
- "kind": kind,
- "id": item_id,
- "englishName": item_id,
- "chineseName": _zh_name(kind, item, slot_config),
- "use": _task_use(kind, item),
- "prompt": _task_prompt(kind, item),
- "status": status,
- "asset": asset,
- "quality": quality,
- "assetType": item.get("type") or kind,
- "animations": item.get("animations", []),
- "transparent": item.get("transparent"),
- "size": item.get("size"),
- })
- return tasks
- def _filter_manifest_tasks(manifest, kind, task_ids):
- wanted = set(task_ids)
- filtered = json.loads(json.dumps(manifest, ensure_ascii=False))
- for key in ("characters", "ui_art", "vfx", "ui"):
- filtered[key] = [x for x in manifest.get(key, []) if key == kind and x.get("id") in wanted]
- return filtered
- def _filter_manifest_groups(manifest, groups):
- filtered = json.loads(json.dumps(manifest, ensure_ascii=False))
- for key in ("characters", "ui_art", "vfx", "ui"):
- wanted = set(groups.get(key, []))
- filtered[key] = [x for x in manifest.get(key, []) if x.get("id") in wanted]
- return filtered
- def _run_retry_job(job_id, game, kind, task_ids, creds):
- _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time())
- try:
- lib = _load_library(game)
- manifest = _manifest_from_library(lib)
- partial = _filter_manifest_tasks(manifest, kind, task_ids)
- labels = ", ".join(task_ids)
- _append_job_log(job_id, f"补生成任务:{kind} / {labels}")
- new_lib, _ = pipeline.run(partial, OUT_ROOT, creds=creds,
- log=lambda m: _append_job_log(job_id, m),
- merge_existing=True)
- _set_job(job_id, status="done", ok=True, game=new_lib["game"], updatedAt=time.time())
- except Exception as e:
- traceback.print_exc()
- _append_job_log(job_id, f"❌ 补生成失败: {e}")
- _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time())
- def _run_retry_missing_job(job_id, game, groups, creds):
- _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time())
- try:
- lib = _load_library(game)
- manifest = _manifest_from_library(lib)
- partial = _filter_manifest_groups(manifest, groups)
- labels = []
- for kind, ids in groups.items():
- if ids:
- labels.append(f"{kind}: {', '.join(ids)}")
- _append_job_log(job_id, "批量补生成缺失项:" + ";".join(labels))
- new_lib, _ = pipeline.run(partial, OUT_ROOT, creds=creds,
- log=lambda m: _append_job_log(job_id, m),
- merge_existing=True)
- _set_job(job_id, status="done", ok=True, game=new_lib["game"], updatedAt=time.time())
- except Exception as e:
- traceback.print_exc()
- _append_job_log(job_id, f"❌ 批量补生成失败: {e}")
- _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time())
- def _transparent_prompt(extra):
- return ", ".join([
- extra,
- "生成纯透明背景 PNG,真实 Alpha 通道,不要棋盘格,不要白底,不要阴影。",
- ])
- def _has_alpha(img):
- alpha = img.convert("RGBA").getchannel("A")
- return alpha.getextrema()[0] == 0
- def _generate_alpha_image(creds, prompt, size, label, log):
- img = providers.generate(creds["provider"], prompt, creds["api_key"],
- creds.get("base_url", "https://api.openai.com/v1"),
- creds.get("model", "gpt-image-2"), size,
- timeout=int(creds.get("timeout") or DEFAULT_IMAGE_TIMEOUT))
- if _has_alpha(img):
- return img
- log(f"🧠 [{label}] 模型没有真实 Alpha,改用百度智能抠图兜底…")
- fixed = baidu_segment.remove_background(img, label=label, log=log)
- if _has_alpha(fixed):
- return fixed
- raise RuntimeError("图片没有真实 Alpha,抠图后仍不合格")
- def _creds_from_request(data):
- return {
- "provider": data.get("provider", "OpenAI 兼容接口"),
- "api_key": (data.get("api_key") or DEFAULT_API_KEY).strip(),
- "base_url": (data.get("base_url") or DEFAULT_BASE_URL).strip(),
- "model": (data.get("model") or DEFAULT_IMAGE_MODEL).strip(),
- "size": data.get("size", "1024x1024"),
- "timeout": int(data.get("timeout") or DEFAULT_IMAGE_TIMEOUT),
- }
- def _new_asset_version():
- return f"v{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
- def _image_data_url(path):
- with open(path, "rb") as f:
- raw = f.read()
- return "data:image/png;base64," + base64.b64encode(raw).decode("ascii")
- def _boss_preview_analysis(preview_path, creds, log):
- try:
- notes = providers.analyze_reference_images(
- image_data_urls=[_image_data_url(preview_path)],
- api_key=creds.get("api_key", ""),
- base_url=creds.get("base_url", DEFAULT_BASE_URL),
- model=DEFAULT_TEXT_MODEL,
- )
- text = json.dumps(notes, ensure_ascii=False)
- log(f"🔎 主图风格解析完成,用于约束拆件一致性")
- return text[:1800]
- except Exception as e:
- log(f"⚠️ 主图风格解析失败,改用文字 prompt 约束一致性:{e}")
- return ""
- def _part_consistency_prompt(row, preview_analysis):
- version = row.get("previewVersion", "")
- return (
- f"Match the current boss preview exactly. previewVersion={version}. "
- "Keep the same character identity, jelly material, color palette, crown/armor shapes, proportions, lighting, and candy-land art style. "
- "This must be one isolated rigging part cropped from that same character design, not a redesigned character. "
- + (f"Reference preview analysis: {preview_analysis}" if preview_analysis else "")
- )
- def _run_retry_boss_part_job(job_id, game, boss_id, part_id, creds):
- _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time())
- try:
- lib = _load_library(game)
- manifest = _manifest_from_library(lib)
- base = os.path.join(OUT_ROOT, game)
- chars_out = os.path.join(base, "characters")
- boss = next((c for c in manifest.get("characters", [])
- if c.get("id") == boss_id or c.get("role") == "boss"), None)
- if not boss or boss.get("type") != "spine_parts":
- raise RuntimeError(f"找不到可拆件关主:{boss_id}")
- parts = boss.get("parts") or []
- target = next((p for p in parts if p.get("id") == part_id), None)
- if not target:
- raise RuntimeError(f"找不到关主拆件:{part_id}")
- row = next((c for c in lib.get("characters", []) if c.get("id") == boss_id), None) or {}
- preview_version = row.get("previewVersion", "")
- consistency = _part_consistency_prompt(row, row.get("previewAnalysis", ""))
- _append_job_log(job_id, f"重生关主拆件:{boss_id}/{part_id}")
- part_images = {}
- for part in parts:
- pid = part["id"]
- ppath = os.path.join(chars_out, f"{boss_id}_parts", f"{pid}.png")
- if pid != part_id:
- if not os.path.isfile(ppath):
- raise RuntimeError(f"缺少现有拆件,无法局部重建:{pid}")
- from PIL import Image
- part_images[pid] = Image.open(ppath).convert("RGBA")
- style = manifest.get("style", "")
- base_prompt = ", ".join(x for x in [
- target.get("prompt", ""),
- style,
- consistency,
- _transparent_prompt(
- f"only the {part_id} rigging part from the boss character, isolated single part, "
- "not a full character, no complete body, no other body parts, no sprite sheet, "
- "fill most of the image with this one clean part, centered"
- ),
- ] if x)
- correction = ""
- last_reason = ""
- for attempt in range(1, 4):
- if attempt > 1:
- _append_job_log(job_id, f"🔁 [{part_id}] 拆件不合格,重新生成(第 {attempt}/3 次)…")
- prompt = base_prompt if not correction else ", ".join([base_prompt, correction])
- try:
- img = _generate_alpha_image(creds, prompt, target.get("size", boss.get("size", creds.get("size", "1024x1024"))),
- f"{boss_id}/{part_id}", lambda m: _append_job_log(job_id, m))
- except Exception as e:
- last_reason = str(e)
- _append_job_log(job_id, f"⚠️ [{part_id}] 图像接口失败/超时:{last_reason}")
- correction = "接口刚才失败或超时。请继续只输出这一件干净拆件,不要完整角色,不要其他部件。"
- continue
- ok, reason, detail = asset_quality.boss_part_quality(part_id, img)
- if ok:
- _append_job_log(job_id, f"✅ [{part_id}] 拆件质量通过:最大主体 {detail.get('largestShare', 0):.0%}")
- part_images[part_id] = img
- break
- last_reason = reason
- _append_job_log(job_id, f"⚠️ [{part_id}] 拆件质量失败:{reason}")
- correction = (
- "这不是干净的单个拆件。请只生成这一件,不要包含任何其他身体部位、碎片、武器边缘或相邻格内容。"
- )
- if part_id not in part_images:
- raise RuntimeError(f"拆件重生失败:{last_reason}")
- spine_builder.build_parts_character(boss_id, part_images, chars_out, boss.get("animations", ["idle"]), parts,
- write_preview=False)
- part_files = spine_builder.write_part_pngs_from_files(boss_id, chars_out)
- if row:
- row["parts"] = part_files
- row["preview"] = f"characters/{boss_id}_preview.png"
- versions = row.setdefault("partVersions", {})
- if preview_version:
- versions[part_id] = preview_version
- files = [f"characters/{boss_id}.json", f"characters/{boss_id}.atlas", f"characters/{boss_id}.png",
- f"characters/{boss_id}_preview.png"]
- files.extend([p["file"] for p in part_files])
- row["files"] = files
- _save_library(game, lib)
- _set_job(job_id, status="done", ok=True, game=game, updatedAt=time.time())
- except Exception as e:
- traceback.print_exc()
- _append_job_log(job_id, f"❌ 拆件重生失败: {e}")
- _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time())
- def _run_retry_boss_preview_job(job_id, game, boss_id, creds):
- _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time())
- try:
- lib = _load_library(game)
- manifest = _manifest_from_library(lib)
- base = os.path.join(OUT_ROOT, game)
- chars_out = os.path.join(base, "characters")
- boss = next((c for c in manifest.get("characters", [])
- if c.get("id") == boss_id or c.get("role") == "boss"), None)
- if not boss:
- raise RuntimeError(f"找不到关主:{boss_id}")
- style = manifest.get("style", "")
- base_prompt = ", ".join(x for x in [
- boss.get("prompt", ""),
- style,
- _transparent_prompt(
- "single complete assembled boss character preview, full body, centered, readable silhouette, "
- "one complete character only, no separated parts, no sprite sheet, no atlas, no cropped body"
- ),
- ] if x)
- correction = ""
- last_reason = ""
- _append_job_log(job_id, f"重生关主完整预览:{boss_id}")
- for attempt in range(1, 4):
- if attempt > 1:
- _append_job_log(job_id, f"🔁 [{boss_id}/preview] 主图不合格,重新生成(第 {attempt}/3 次)…")
- prompt = base_prompt if not correction else ", ".join([base_prompt, correction])
- img = _generate_alpha_image(creds, prompt, boss.get("size", creds.get("size", "1024x1024")),
- f"{boss_id}/preview", lambda m: _append_job_log(job_id, m))
- ok, reason, detail = asset_quality.boss_preview_quality(img)
- if ok:
- os.makedirs(chars_out, exist_ok=True)
- spine_builder.trim_to_content(img, pad=16).save(os.path.join(chars_out, f"{boss_id}_preview.png"))
- _append_job_log(job_id, f"✅ [{boss_id}/preview] 主图质量通过:最大主体 {detail.get('largestShare', 0):.0%}")
- break
- last_reason = reason
- _append_job_log(job_id, f"⚠️ [{boss_id}/preview] 主图质量失败:{reason}")
- correction = (
- "这不是完整主图。请只生成一个完整、站立、全身、主体连贯的关主角色;"
- "不要拆件、不要把部件分开、不要 atlas、不要只画头或半身。"
- )
- else:
- raise RuntimeError(f"主图重生失败:{last_reason}")
- row = next((c for c in lib.get("characters", []) if c.get("id") == boss_id), None)
- if row:
- preview_version = _new_asset_version()
- preview_path = os.path.join(chars_out, f"{boss_id}_preview.png")
- row["preview"] = f"characters/{boss_id}_preview.png"
- row["previewVersion"] = preview_version
- row["partsSourcePreviewVersion"] = row.get("partsSourcePreviewVersion", "")
- row["partsStale"] = True
- row["previewAnalysis"] = _boss_preview_analysis(preview_path, creds, lambda m: _append_job_log(job_id, m))
- files = row.setdefault("files", [])
- if row["preview"] not in files:
- files.append(row["preview"])
- _save_library(game, lib)
- _set_job(job_id, status="done", ok=True, game=game, updatedAt=time.time())
- except Exception as e:
- traceback.print_exc()
- _append_job_log(job_id, f"❌ 主图重生失败: {e}")
- _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time())
- def _run_retry_boss_parts_from_preview_job(job_id, game, boss_id, creds):
- _set_job(job_id, status="running", logs=[], game=game, startedAt=time.time(), updatedAt=time.time())
- try:
- lib = _load_library(game)
- manifest = _manifest_from_library(lib)
- base = os.path.join(OUT_ROOT, game)
- chars_out = os.path.join(base, "characters")
- row = next((c for c in lib.get("characters", []) if c.get("id") == boss_id), None)
- boss = next((c for c in manifest.get("characters", [])
- if c.get("id") == boss_id or c.get("role") == "boss"), None)
- if not row or not boss or boss.get("type") != "spine_parts":
- raise RuntimeError(f"找不到可按主图重生的关主:{boss_id}")
- preview_path = os.path.join(base, row.get("preview") or f"characters/{boss_id}_preview.png")
- if not os.path.isfile(preview_path):
- raise RuntimeError("请先重生主图,再按主图重生拆件")
- preview_version = row.get("previewVersion") or _new_asset_version()
- row["previewVersion"] = preview_version
- if not row.get("previewAnalysis"):
- row["previewAnalysis"] = _boss_preview_analysis(preview_path, creds, lambda m: _append_job_log(job_id, m))
- parts = boss.get("parts") or []
- style = manifest.get("style", "")
- consistency = _part_consistency_prompt(row, row.get("previewAnalysis", ""))
- part_images = {}
- _append_job_log(job_id, f"按当前主图重生全部拆件:{boss_id},共 {len(parts)} 个")
- for idx, part in enumerate(parts, start=1):
- pid = part["id"]
- base_prompt = ", ".join(x for x in [
- part.get("prompt", ""),
- style,
- consistency,
- _transparent_prompt(
- f"only the {pid} rigging part from the current boss preview, isolated single part, "
- "not a full character, no complete body, no other body parts, no sprite sheet, "
- "fill most of the image with this one clean part, centered"
- ),
- ] if x)
- correction = ""
- last_reason = ""
- for attempt in range(1, 4):
- if attempt > 1:
- _append_job_log(job_id, f"🔁 [{pid}] 拆件不合格,重新生成(第 {attempt}/3 次)…")
- prompt = base_prompt if not correction else ", ".join([base_prompt, correction])
- try:
- img = _generate_alpha_image(creds, prompt, part.get("size", boss.get("size", creds.get("size", "1024x1024"))),
- f"{boss_id}/{pid}", lambda m: _append_job_log(job_id, m))
- except Exception as e:
- last_reason = str(e)
- _append_job_log(job_id, f"⚠️ [{pid}] 图像接口失败/超时:{last_reason}")
- correction = "接口刚才失败或超时。请继续只输出这一件干净拆件,不要完整角色,不要其他部件。"
- continue
- ok, reason, detail = asset_quality.boss_part_quality(pid, img)
- if ok:
- _append_job_log(job_id, f"✅ [{idx}/{len(parts)}] {pid} 通过:最大主体 {detail.get('largestShare', 0):.0%}")
- part_images[pid] = img
- break
- last_reason = reason
- _append_job_log(job_id, f"⚠️ [{pid}] 拆件质量失败:{reason}")
- correction = "请严格参考主图,只输出这一件拆件,不要完整角色,不要其他部件,不要相邻碎片。"
- if pid not in part_images:
- raise RuntimeError(f"{pid} 重生失败:{last_reason}")
- spine_builder.build_parts_character(boss_id, part_images, chars_out, boss.get("animations", ["idle"]), parts,
- write_preview=False)
- part_files = spine_builder.write_part_pngs_from_files(boss_id, chars_out)
- row["parts"] = part_files
- row["partVersions"] = {p["id"]: preview_version for p in part_files}
- row["partsSourcePreviewVersion"] = preview_version
- row["partsStale"] = False
- files = [f"characters/{boss_id}.json", f"characters/{boss_id}.atlas", f"characters/{boss_id}.png",
- f"characters/{boss_id}_preview.png"]
- files.extend([p["file"] for p in part_files])
- row["files"] = files
- _save_library(game, lib)
- _set_job(job_id, status="done", ok=True, game=game, updatedAt=time.time())
- except Exception as e:
- traceback.print_exc()
- _append_job_log(job_id, f"❌ 按主图重生拆件失败: {e}")
- _set_job(job_id, status="error", ok=False, error=str(e), updatedAt=time.time())
- def _split_lines(value):
- if isinstance(value, list):
- return [str(x).strip() for x in value if str(x).strip()]
- return [x.strip() for x in str(value or "").splitlines() if x.strip()]
- def _creative_fallback(data):
- brief = (data.get("brief") or "").lower()
- text = " ".join([brief, data.get("styleNotes", "").lower(), data.get("avoidNotes", "").lower()])
- allowed_themes = {"jelly", "fruit", "egypt", "pirate", "pirate_jelly", "cyber"}
- requested_theme = data.get("theme") if data.get("theme") in allowed_themes else ""
- theme = requested_theme or "jelly"
- pirate_hint = any(k in text for k in ("pirate", "海盗", "treasure", "宝藏", "金币", "船长"))
- jelly_hint = any(k in text for k in ("jelly", "果冻", "candy", "糖果", "gummy", "软糖"))
- if not requested_theme or requested_theme == "jelly":
- if pirate_hint and jelly_hint:
- theme = "pirate_jelly"
- elif any(k in text for k in ("egypt", "埃及", "金字塔", "法老")):
- theme = "egypt"
- elif pirate_hint:
- theme = "pirate"
- elif any(k in text for k in ("cyber", "赛博", "neon", "霓虹")):
- theme = "cyber"
- elif any(k in text for k in ("fruit", "水果", "cherry", "樱桃")):
- theme = "fruit"
- reel_mode = data.get("reelMode") or ("cluster" if any(k in text for k in ("消除", "cluster", "连通", "match")) else "ways")
- volatility = data.get("volatility") or ("high" if any(k in text for k in ("刺激", "大奖", "高波动", "big win")) else "medium")
- features = list(data.get("features") or ["cascades", "free_spins", "wilds"])
- if any(k in text for k in ("金币", "jackpot", "hold", "respin", "大奖池")) and "hold_win" not in features:
- features.append("hold_win")
- if any(k in text for k in ("倍率", "multiplier", "连锁")) and "multipliers" not in features:
- features.append("multipliers")
- if theme == "pirate_jelly":
- for feature in ("hold_win", "multipliers"):
- if feature not in features:
- features.append(feature)
- title = data.get("title") or "AI Custom Slot"
- style = data.get("styleNotes") or data.get("brief") or ""
- return {
- "gameId": data.get("gameId") or title,
- "title": title,
- "theme": theme,
- "style": style,
- "reelMode": reel_mode,
- "volatility": volatility,
- "targetRtp": data.get("targetRtp", 96),
- "characterCount": data.get("characterCount", 10),
- "uiCompleteness": data.get("uiCompleteness", "full"),
- "feedbackIntensity": data.get("feedbackIntensity", "standard"),
- "enableBoss": bool(data.get("enableBoss", True)),
- "bossPresence": data.get("bossPresence", "full"),
- "enableMathModel": bool(data.get("enableMathModel", True)),
- "features": features,
- }
- def build_game_plan(slot_request, creative_payload, source):
- features = slot_request.get("features") or []
- hook_parts = []
- if "hold_win" in features:
- hook_parts.append("金币锁格 respin 大奖目标")
- if "cascades" in features:
- hook_parts.append("连锁下落爽感")
- if "free_spins" in features:
- hook_parts.append("Scatter 免费旋转期待")
- if "multipliers" in features:
- hook_parts.append("递增倍率爆点")
- core_hook = " + ".join(hook_parts[:2]) or "轻量现代老虎机循环"
- vision = creative_payload.get("visionStyleAnalysis") or {}
- art_direction = {
- "theme": slot_request.get("theme"),
- "style": slot_request.get("style", ""),
- "reference_style_analysis": vision,
- "avoid": creative_payload.get("avoidNotes", ""),
- }
- return {
- "source": source,
- "creative": {
- "brief": creative_payload.get("brief", ""),
- "references": creative_payload.get("references", []),
- "uploadedReferenceImages": creative_payload.get("uploadedReferenceImages", 0),
- "visionStyleAnalysis": vision,
- "visionStyleAnalysisError": creative_payload.get("visionStyleAnalysisError", ""),
- },
- "gameDesign": {
- "title": slot_request.get("title"),
- "coreHook": core_hook,
- "artDirection": art_direction,
- "reelExperience": slot_request.get("reelMode"),
- "volatility": slot_request.get("volatility"),
- "differentiators": hook_parts,
- "bossDesign": {
- "enabled": bool(slot_request.get("enableBoss", True)),
- "presence": slot_request.get("bossPresence", "full"),
- "idle": "静静等待时会呼吸、环顾、蓄力",
- "playerWin": "玩家赢钱时撒币,然后受击裂开爆炸",
- "playerLose": "玩家输钱时举剑嘲讽、踩踏并攻击",
- },
- "assetStrategy": {
- "symbolCount": slot_request.get("characterCount"),
- "uiCompleteness": slot_request.get("uiCompleteness"),
- "feedbackIntensity": slot_request.get("feedbackIntensity"),
- },
- },
- }
- def creative_to_slot_request(data):
- api_key = (data.get("api_key") or DEFAULT_API_KEY).strip()
- base_url = (data.get("base_url") or DEFAULT_BASE_URL).strip()
- text_model = (data.get("text_model") or DEFAULT_TEXT_MODEL).strip()
- references = _split_lines(data.get("references"))
- reference_images = data.get("reference_images") or []
- if not isinstance(reference_images, list):
- reference_images = []
- payload = {
- "title": data.get("title") or "AI Custom Slot",
- "gameId": data.get("gameId") or "",
- "brief": data.get("brief") or "",
- "references": references,
- "uploadedReferenceImages": len(reference_images),
- "styleNotes": data.get("styleNotes") or "",
- "avoidNotes": data.get("avoidNotes") or "",
- "theme": data.get("theme") or "",
- "reelMode": data.get("reelMode") or "",
- "volatility": data.get("volatility") or "",
- "targetRtp": data.get("targetRtp", 96),
- "characterCount": data.get("characterCount", 10),
- "uiCompleteness": data.get("uiCompleteness", "full"),
- "feedbackIntensity": data.get("feedbackIntensity", "standard"),
- "enableBoss": bool(data.get("enableBoss", True)),
- "bossPresence": data.get("bossPresence", "full"),
- "features": data.get("features") or [],
- "enableMathModel": bool(data.get("enableMathModel", True)),
- }
- if not api_key:
- fallback = _creative_fallback(payload)
- return fallback, "fallback_no_api_key", build_game_plan(fallback, payload, "fallback_no_api_key")
- vision_notes = {}
- direct_image_urls = [
- x for x in references
- if x.lower().startswith(("http://", "https://")) and x.lower().split("?")[0].endswith((".png", ".jpg", ".jpeg", ".webp"))
- ]
- if direct_image_urls or reference_images:
- try:
- vision_notes = providers.analyze_reference_images(
- reference_urls=direct_image_urls,
- image_data_urls=reference_images,
- api_key=api_key,
- base_url=base_url,
- model=text_model,
- )
- payload["visionStyleAnalysis"] = vision_notes
- if vision_notes.get("image_prompt_style"):
- payload["styleNotes"] = ";".join([
- str(payload.get("styleNotes") or ""),
- "视觉参考分析:" + str(vision_notes.get("image_prompt_style")),
- ]).strip(";")
- elif vision_notes:
- payload["styleNotes"] = ";".join([
- str(payload.get("styleNotes") or ""),
- "视觉参考分析:" + json.dumps(vision_notes, ensure_ascii=False),
- ]).strip(";")
- except Exception as e:
- payload["visionStyleAnalysisError"] = str(e)[:500]
- messages = [
- {
- "role": "system",
- "content": (
- "你是资深移动老虎机游戏策划和美术总监。根据用户的基础描述、参考图或网址、风格要求,"
- "生成一份可执行的 slot 工作流输入 JSON。必须原创,不能复刻参考图里的具体 IP、logo、角色。"
- "只输出 JSON,不要解释。"
- ),
- },
- {
- "role": "user",
- "content": json.dumps({
- "task": "把创意简报转换为 slot_workflow.build_workflow 可接受的输入",
- "allowedFields": {
- "gameId": "string slug or title",
- "title": "string",
- "theme": "jelly|fruit|egypt|pirate|pirate_jelly|cyber",
- "style": "English image-generation style prompt, include reference-derived art direction",
- "reelMode": "ways|paylines|megaways|cluster",
- "volatility": "low|medium|high",
- "targetRtp": "number percent, e.g. 96",
- "characterCount": "6|8|10|12",
- "uiCompleteness": "basic|full",
- "feedbackIntensity": "quiet|standard|loud",
- "enableBoss": "boolean",
- "bossPresence": "light|standard|full",
- "enableMathModel": "boolean",
- "features": "array of cascades, free_spins, wilds, hold_win, multipliers"
- },
- "creativeBrief": payload,
- "outputRules": [
- "Return JSON object with exactly these workflow fields.",
- "Pick one clear core hook, not every feature.",
- "Use references as style inspiration only; do not copy protected characters, logos, or exact layout.",
- "Style must be specific enough for image generation."
- ],
- }, ensure_ascii=False),
- },
- ]
- try:
- obj = providers.chat_json_openai(messages, api_key=api_key, base_url=base_url, model=text_model)
- except Exception:
- fallback = _creative_fallback(payload)
- return fallback, "fallback_text_model_failed", build_game_plan(fallback, payload, "fallback_text_model_failed")
- fallback = _creative_fallback(payload)
- out = dict(fallback)
- for key in ("gameId", "title", "theme", "style", "reelMode", "volatility", "targetRtp",
- "characterCount", "uiCompleteness", "feedbackIntensity", "enableBoss",
- "bossPresence", "enableMathModel"):
- if key in obj and obj[key] not in (None, ""):
- out[key] = obj[key]
- if isinstance(obj.get("features"), list) and obj["features"]:
- allowed = {"cascades", "free_spins", "wilds", "hold_win", "multipliers"}
- out["features"] = [x for x in obj["features"] if x in allowed]
- source = "vision_text_model" if vision_notes else "text_model"
- return out, source, build_game_plan(out, payload, source)
- class Handler(BaseHTTPRequestHandler):
- def _send(self, code, body, ctype="application/json; charset=utf-8"):
- if isinstance(body, (dict, list)):
- body = json.dumps(body, ensure_ascii=False).encode("utf-8")
- elif isinstance(body, str):
- body = body.encode("utf-8")
- self.send_response(code)
- self.send_header("Content-Type", ctype)
- self.send_header("Content-Length", str(len(body)))
- self.end_headers()
- self.wfile.write(body)
- def _file(self, path):
- if not os.path.isfile(path):
- return self._send(404, {"error": "not found"})
- ctype = mimetypes.guess_type(path)[0] or "application/octet-stream"
- with open(path, "rb") as f:
- data = f.read()
- self.send_response(200)
- self.send_header("Content-Type", ctype)
- self.send_header("Content-Length", str(len(data)))
- self.end_headers()
- self.wfile.write(data)
- def log_message(self, *a):
- pass # 静音
- def do_GET(self):
- u = urlparse(self.path)
- path, qs = u.path, parse_qs(u.query)
- if path == "/" or path == "/index.html":
- return self._file(os.path.join(WEB_DIR, "index.html"))
- if path == "/api/manifest":
- with open(DEFAULT_MANIFEST, encoding="utf-8") as f:
- return self._send(200, f.read())
- if path == "/api/games":
- return self._send(200, {"games": list_games()})
- if path == "/api/job":
- job_id = qs.get("id", [""])[0]
- job = _job_snapshot(job_id)
- if not job:
- return self._send(404, {"ok": False, "error": "job not found"})
- job["id"] = job_id
- return self._send(200, job)
- if path == "/api/library":
- games = list_games()
- game = (qs.get("game", [None])[0]) or (games[-1] if games else None)
- if not game:
- return self._send(200, {"game": None, "games": games,
- "characters": [], "vfx": [], "ui": [], "tasks": {}})
- library_path = os.path.join(OUT_ROOT, game, "library.json")
- if not os.path.isfile(library_path):
- return self._send(200, {"game": game, "games": games, "assetBase": f"/assets/{game}/",
- "characters": [], "vfx": [], "ui": [], "tasks": {}})
- with open(library_path, encoding="utf-8") as f:
- lib = json.load(f)
- lib["games"] = games
- lib["assetBase"] = f"/assets/{game}/"
- lib["tasks"] = _build_tasks(lib)
- all_tasks = [t for rows in lib["tasks"].values() for t in rows]
- lib["taskSummary"] = {
- "total": len(all_tasks),
- "done": sum(1 for t in all_tasks if t.get("status") == "done"),
- "missing": sum(1 for t in all_tasks if t.get("status") != "done"),
- }
- return self._send(200, lib)
- if path.startswith("/assets/"):
- rel = unquote(path[len("/assets/"):])
- try:
- return self._file(safe_join(OUT_ROOT, rel))
- except ValueError:
- return self._send(400, {"error": "bad path"})
- return self._send(404, {"error": "not found"})
- def do_POST(self):
- try:
- return self._do_POST()
- except Exception as e:
- traceback.print_exc()
- return self._send(500, {"ok": False, "error": str(e)})
- def _read_json_body(self):
- length = int(self.headers.get("Content-Length", 0))
- return json.loads(self.rfile.read(length) or b"{}")
- def _do_POST(self):
- route = urlparse(self.path).path
- if route == "/api/export":
- return self._post_export()
- if route == "/api/open-folder":
- return self._post_open_folder()
- if route == "/api/retry-task":
- return self._post_retry_task()
- if route == "/api/retry-boss-part":
- return self._post_retry_boss_part()
- if route == "/api/retry-boss-preview":
- return self._post_retry_boss_preview()
- if route == "/api/retry-boss-parts-from-preview":
- return self._post_retry_boss_parts_from_preview()
- if route == "/api/retry-missing":
- return self._post_retry_missing()
- if route == "/api/delete":
- return self._post_delete()
- if route == "/api/slot-workflow":
- return self._post_slot_workflow()
- if route == "/api/creative-manifest":
- return self._post_creative_manifest()
- if route != "/api/generate":
- return self._send(404, {"error": "not found"})
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- try:
- manifest = json.loads(data["manifest"]) if isinstance(data.get("manifest"), str) \
- else data.get("manifest")
- manifest = slot_workflow.complete_manifest(manifest)
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"manifest 非法 JSON: {e}"})
- creds = _creds_from_request(data)
- logs = []
- if data.get("async", True):
- job_id = uuid.uuid4().hex
- with JOBS_LOCK:
- JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["任务已创建,等待开始…"],
- "game": manifest.get("game"), "createdAt": time.time(), "updatedAt": time.time()}
- threading.Thread(target=_run_generate_job, args=(job_id, manifest, creds), daemon=True).start()
- return self._send(200, {"ok": True, "jobId": job_id, "game": manifest.get("game")})
- try:
- lib, _ = pipeline.run(manifest, OUT_ROOT, creds=creds, log=lambda m: logs.append(m))
- except Exception as e:
- return self._send(500, {"ok": False, "error": str(e), "logs": logs})
- return self._send(200, {"ok": True, "logs": logs, "game": lib["game"]})
- def _post_slot_workflow(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- try:
- result = slot_workflow.build_workflow(data)
- except Exception as e:
- traceback.print_exc()
- return self._send(500, {"ok": False, "error": str(e)})
- return self._send(200, {"ok": True, **result})
- def _post_creative_manifest(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- try:
- normalized, source, game_plan = creative_to_slot_request(data)
- normalized["creative"] = game_plan.get("creative", {})
- normalized["gameDesign"] = game_plan.get("gameDesign", {})
- result = slot_workflow.build_workflow(normalized)
- return self._send(200, {"ok": True, "source": source, "game_plan": game_plan,
- "creative_request": normalized, **result})
- except Exception as e:
- traceback.print_exc()
- return self._send(500, {"ok": False, "error": str(e)})
- def _post_export(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- logs = []
- try:
- pack = exporter.export(game, OUT_ROOT, log=lambda m: logs.append(m))
- except Exception as e:
- traceback.print_exc()
- return self._send(500, {"ok": False, "error": str(e), "logs": logs})
- return self._send(200, {"ok": True, "logs": logs, "game": game,
- "pack": os.path.abspath(pack)})
- def _post_open_folder(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- target = os.path.abspath(os.path.join(OUT_ROOT, game))
- if not target.startswith(os.path.abspath(OUT_ROOT) + os.sep) or not os.path.isdir(target):
- return self._send(400, {"ok": False, "error": "素材目录不存在或路径非法"})
- try:
- if sys.platform == "darwin":
- subprocess.Popen(["open", target])
- elif os.name == "nt":
- os.startfile(target) # type: ignore[attr-defined]
- else:
- subprocess.Popen(["xdg-open", target])
- except Exception as e:
- return self._send(500, {"ok": False, "error": f"打开素材目录失败: {e}"})
- return self._send(200, {"ok": True, "game": game, "path": target})
- def _post_retry_task(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- kind = TASK_KIND_MAP.get((data.get("kind") or "").strip())
- if not kind:
- return self._send(400, {"ok": False, "error": "无效的任务类型"})
- task_ids = data.get("ids") or data.get("taskIds") or [data.get("id")]
- if isinstance(task_ids, str):
- task_ids = [task_ids]
- task_ids = [str(x).strip() for x in task_ids if str(x or "").strip()]
- if not task_ids:
- return self._send(400, {"ok": False, "error": "没有选择要重试的任务"})
- lib = _load_library(game)
- tasks = _build_tasks(lib).get(kind, [])
- valid_ids = {t["id"] for t in tasks}
- bad = [x for x in task_ids if x not in valid_ids]
- if bad:
- return self._send(400, {"ok": False, "error": f"任务不存在: {', '.join(bad)}"})
- creds = _creds_from_request(data)
- job_id = uuid.uuid4().hex
- with JOBS_LOCK:
- JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["补生成任务已创建,等待开始…"],
- "game": game, "createdAt": time.time(), "updatedAt": time.time()}
- threading.Thread(target=_run_retry_job, args=(job_id, game, kind, task_ids, creds), daemon=True).start()
- return self._send(200, {"ok": True, "jobId": job_id, "game": game})
- def _post_retry_boss_part(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- boss_id = (data.get("bossId") or data.get("id") or "").strip()
- part_id = (data.get("partId") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- if not boss_id or not part_id:
- return self._send(400, {"ok": False, "error": "缺少 bossId 或 partId"})
- creds = _creds_from_request(data)
- job_id = uuid.uuid4().hex
- with JOBS_LOCK:
- JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["拆件重生任务已创建,等待开始…"],
- "game": game, "createdAt": time.time(), "updatedAt": time.time()}
- threading.Thread(target=_run_retry_boss_part_job,
- args=(job_id, game, boss_id, part_id, creds), daemon=True).start()
- return self._send(200, {"ok": True, "jobId": job_id, "game": game})
- def _post_retry_boss_preview(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- boss_id = (data.get("bossId") or data.get("id") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- if not boss_id:
- return self._send(400, {"ok": False, "error": "缺少 bossId"})
- creds = _creds_from_request(data)
- job_id = uuid.uuid4().hex
- with JOBS_LOCK:
- JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["主图重生任务已创建,等待开始…"],
- "game": game, "createdAt": time.time(), "updatedAt": time.time()}
- threading.Thread(target=_run_retry_boss_preview_job,
- args=(job_id, game, boss_id, creds), daemon=True).start()
- return self._send(200, {"ok": True, "jobId": job_id, "game": game})
- def _post_retry_boss_parts_from_preview(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- boss_id = (data.get("bossId") or data.get("id") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- if not boss_id:
- return self._send(400, {"ok": False, "error": "缺少 bossId"})
- creds = _creds_from_request(data)
- job_id = uuid.uuid4().hex
- with JOBS_LOCK:
- JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["按主图重生拆件任务已创建,等待开始…"],
- "game": game, "createdAt": time.time(), "updatedAt": time.time()}
- threading.Thread(target=_run_retry_boss_parts_from_preview_job,
- args=(job_id, game, boss_id, creds), daemon=True).start()
- return self._send(200, {"ok": True, "jobId": job_id, "game": game})
- def _post_retry_missing(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- lib = _load_library(game)
- tasks = _build_tasks(lib)
- groups = {k: [t["id"] for t in rows if t.get("status") != "done"]
- for k, rows in tasks.items()}
- groups = {k: v for k, v in groups.items() if v}
- if not groups:
- return self._send(400, {"ok": False, "error": "当前资源库没有缺失任务"})
- creds = _creds_from_request(data)
- job_id = uuid.uuid4().hex
- with JOBS_LOCK:
- JOBS[job_id] = {"status": "queued", "ok": None, "logs": ["批量补生成任务已创建,等待开始…"],
- "game": game, "createdAt": time.time(), "updatedAt": time.time()}
- threading.Thread(target=_run_retry_missing_job, args=(job_id, game, groups, creds), daemon=True).start()
- return self._send(200, {"ok": True, "jobId": job_id, "game": game, "groups": groups})
- def _post_delete(self):
- try:
- data = self._read_json_body()
- except Exception as e:
- return self._send(400, {"ok": False, "error": f"请求体不是合法 JSON: {e}"})
- game = (data.get("game") or "").strip()
- if not game or game not in list_games():
- return self._send(400, {"ok": False, "error": f"无效的 game: {game!r}"})
- target = os.path.join(OUT_ROOT, game)
- # 安全校验:必须落在 OUT_ROOT 内
- if not os.path.abspath(target).startswith(os.path.abspath(OUT_ROOT) + os.sep):
- return self._send(400, {"ok": False, "error": "非法路径"})
- shutil.rmtree(target)
- return self._send(200, {"ok": True, "deleted": game, "games": list_games()})
- if __name__ == "__main__":
- os.makedirs(OUT_ROOT, exist_ok=True)
- print(f"Anim Studio 网站: http://127.0.0.1:{PORT}")
- ThreadingHTTPServer(("127.0.0.1", PORT), Handler).serve_forever()
|