"""Aliyun background-removal integration. Aliyun Marketplace product cmapi00069648 exposes: POST https://bgrem.market.alicloudapi.com/api/v1/bg-remove/submit POST https://bgrem.market.alicloudapi.com/api/v1/bg-remove/query The remote service requires a public image URL. Generated images from the current gpt-image-2 gateway are usually b64-only, so the pipeline uses remote matting only when a public URL is available. There is intentionally no local fallback because local matting was not clean enough for production assets. """ import base64 import http.client import io import json import time import uuid from urllib.parse import urlparse from PIL import Image, ImageFilter import config ALIYUN_APP_CODE = config.get("ALIYUN_BGREM_APP_CODE", "") HOST = "bgrem.market.alicloudapi.com" SUBMIT_PATH = "/api/v1/bg-remove/submit" QUERY_PATH = "/api/v1/bg-remove/query" def _json_post(path, payload, timeout=60): if not ALIYUN_APP_CODE: raise RuntimeError("missing ALIYUN_BGREM_APP_CODE") body = json.dumps(payload, ensure_ascii=False).encode("utf-8") headers = { "Content-Type": "application/json; charset=utf-8", "Accept": "application/json", "Authorization": f"APPCODE {ALIYUN_APP_CODE}", "X-Ca-Nonce": uuid.uuid4().hex, } conn = http.client.HTTPSConnection(HOST, timeout=timeout) try: conn.request("POST", path, body=body, headers=headers) res = conn.getresponse() raw = res.read() res_headers = dict(res.getheaders()) finally: conn.close() if res.status < 200 or res.status >= 300: msg = raw.decode("utf-8", "ignore") or res_headers.get("X-Ca-Error-Message", "") raise RuntimeError(f"HTTP {res.status}: {msg}") if not raw: raise RuntimeError("empty response") return json.loads(raw.decode("utf-8")) def _download(url, timeout=120): parsed = urlparse(url) path = parsed.path or "/" if parsed.query: path += "?" + parsed.query conn_cls = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection conn = conn_cls(parsed.hostname, parsed.port, timeout=timeout) try: conn.request("GET", path) res = conn.getresponse() raw = res.read() finally: conn.close() if res.status < 200 or res.status >= 300: raise RuntimeError(f"download HTTP {res.status}") return raw def _extract_result_key(data): payload = data.get("data", data) if isinstance(payload, str): try: payload = json.loads(payload) except Exception: return payload if isinstance(payload, dict): for key in ("result_key", "resultKey", "task_key", "taskKey"): if payload.get(key): return payload[key] tasks = payload.get("task_list") or payload.get("taskList") if isinstance(tasks, list) and tasks: return _extract_result_key(tasks[0]) return None def _extract_result_url(data): payload = data.get("data", data) if isinstance(payload, str): try: payload = json.loads(payload) except Exception: return payload if payload.startswith(("http://", "https://")) else None if isinstance(payload, dict): results = payload.get("results") if isinstance(results, list) and results: return results[0] for key in ("url", "result", "result_url", "resultUrl", "image_url", "imageUrl", "alpha", "alpha_url"): val = payload.get(key) if isinstance(val, str) and val: return val tasks = payload.get("task_list") or payload.get("taskList") or payload.get("result_list") or payload.get("resultList") if isinstance(tasks, list) and tasks: return _extract_result_url(tasks[0]) return None def _ensure_success(data): if data.get("success") is False: raise RuntimeError(data.get("message") or json.dumps(data, ensure_ascii=False)[:500]) code = data.get("code") if code not in (None, 0, "0", 200, "200"): raise RuntimeError(data.get("message") or json.dumps(data, ensure_ascii=False)[:500]) def _remote_remove_by_url(image_url, model_type="general", log=None, label="image"): submit_body = {"data": {"task_list": [{"model_type": model_type, "image": image_url}]}} if log: log(f"🧼 [{label}] 去背景:提交阿里云任务…") submit = _json_post(SUBMIT_PATH, submit_body, timeout=60) _ensure_success(submit) result_key = _extract_result_key(submit) if not result_key: raise RuntimeError(f"提交接口未返回 result_key: {json.dumps(submit, ensure_ascii=False)[:500]}") if log: log(f"🧼 [{label}] 去背景:任务已提交 result_key={result_key}") query_body = {"data": {"result_key": result_key}} last = None for i in range(40): time.sleep(1.5 if i else 0.3) data = _json_post(QUERY_PATH, query_body, timeout=60) _ensure_success(data) last = data result_url = _extract_result_url(data) if result_url: if log: log(f"🧼 [{label}] 去背景:任务完成,下载透明 PNG…") return Image.open(io.BytesIO(_download(result_url))).convert("RGBA") if log and i in (0, 3, 10, 20): log(f"🧼 [{label}] 去背景:等待任务结果…") raise RuntimeError(f"查询超时,最后返回: {json.dumps(last, ensure_ascii=False)[:500]}") def remove_background(img, log=None, label="image", enabled=True, image_url=None, model_type="general"): if not enabled: return img.convert("RGBA") if image_url and image_url.startswith(("http://", "https://")): try: out = _remote_remove_by_url(image_url, model_type=model_type, log=log, label=label) if log: log(f"✅ [{label}] 远端去背景完成:透明 PNG {out.size[0]}×{out.size[1]}") return out except Exception as e: if log: log(f"⚠️ [{label}] 远端去背景失败,保留原图:{e}") else: if log: log(f"ℹ️ [{label}] 去背景:当前图片没有公网 URL,跳过远端去背景并保留原图") return img.convert("RGBA")