| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- """Aliyun background-removal integration.
- Aliyun Marketplace product cmapi00069648 exposes:
- POST https://bgrem.market.alicloudapi.com/api/v1/bg-remove/submit
- POST https://bgrem.market.alicloudapi.com/api/v1/bg-remove/query
- The remote service requires a public image URL. Generated images from the
- current gpt-image-2 gateway are usually b64-only, so the pipeline uses remote
- matting only when a public URL is available. There is intentionally no local
- fallback because local matting was not clean enough for production assets.
- """
- import base64
- import http.client
- import io
- import json
- import time
- import uuid
- from urllib.parse import urlparse
- from PIL import Image, ImageFilter
- import config
- ALIYUN_APP_CODE = config.get("ALIYUN_BGREM_APP_CODE", "")
- HOST = "bgrem.market.alicloudapi.com"
- SUBMIT_PATH = "/api/v1/bg-remove/submit"
- QUERY_PATH = "/api/v1/bg-remove/query"
- def _json_post(path, payload, timeout=60):
- if not ALIYUN_APP_CODE:
- raise RuntimeError("missing ALIYUN_BGREM_APP_CODE")
- body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
- headers = {
- "Content-Type": "application/json; charset=utf-8",
- "Accept": "application/json",
- "Authorization": f"APPCODE {ALIYUN_APP_CODE}",
- "X-Ca-Nonce": uuid.uuid4().hex,
- }
- conn = http.client.HTTPSConnection(HOST, timeout=timeout)
- try:
- conn.request("POST", path, body=body, headers=headers)
- res = conn.getresponse()
- raw = res.read()
- res_headers = dict(res.getheaders())
- finally:
- conn.close()
- if res.status < 200 or res.status >= 300:
- msg = raw.decode("utf-8", "ignore") or res_headers.get("X-Ca-Error-Message", "")
- raise RuntimeError(f"HTTP {res.status}: {msg}")
- if not raw:
- raise RuntimeError("empty response")
- return json.loads(raw.decode("utf-8"))
- def _download(url, timeout=120):
- parsed = urlparse(url)
- path = parsed.path or "/"
- if parsed.query:
- path += "?" + parsed.query
- conn_cls = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection
- conn = conn_cls(parsed.hostname, parsed.port, timeout=timeout)
- try:
- conn.request("GET", path)
- res = conn.getresponse()
- raw = res.read()
- finally:
- conn.close()
- if res.status < 200 or res.status >= 300:
- raise RuntimeError(f"download HTTP {res.status}")
- return raw
- def _extract_result_key(data):
- payload = data.get("data", data)
- if isinstance(payload, str):
- try:
- payload = json.loads(payload)
- except Exception:
- return payload
- if isinstance(payload, dict):
- for key in ("result_key", "resultKey", "task_key", "taskKey"):
- if payload.get(key):
- return payload[key]
- tasks = payload.get("task_list") or payload.get("taskList")
- if isinstance(tasks, list) and tasks:
- return _extract_result_key(tasks[0])
- return None
- def _extract_result_url(data):
- payload = data.get("data", data)
- if isinstance(payload, str):
- try:
- payload = json.loads(payload)
- except Exception:
- return payload if payload.startswith(("http://", "https://")) else None
- if isinstance(payload, dict):
- results = payload.get("results")
- if isinstance(results, list) and results:
- return results[0]
- for key in ("url", "result", "result_url", "resultUrl", "image_url", "imageUrl", "alpha", "alpha_url"):
- val = payload.get(key)
- if isinstance(val, str) and val:
- return val
- tasks = payload.get("task_list") or payload.get("taskList") or payload.get("result_list") or payload.get("resultList")
- if isinstance(tasks, list) and tasks:
- return _extract_result_url(tasks[0])
- return None
- def _ensure_success(data):
- if data.get("success") is False:
- raise RuntimeError(data.get("message") or json.dumps(data, ensure_ascii=False)[:500])
- code = data.get("code")
- if code not in (None, 0, "0", 200, "200"):
- raise RuntimeError(data.get("message") or json.dumps(data, ensure_ascii=False)[:500])
- def _remote_remove_by_url(image_url, model_type="general", log=None, label="image"):
- submit_body = {"data": {"task_list": [{"model_type": model_type, "image": image_url}]}}
- if log:
- log(f"🧼 [{label}] 去背景:提交阿里云任务…")
- submit = _json_post(SUBMIT_PATH, submit_body, timeout=60)
- _ensure_success(submit)
- result_key = _extract_result_key(submit)
- if not result_key:
- raise RuntimeError(f"提交接口未返回 result_key: {json.dumps(submit, ensure_ascii=False)[:500]}")
- if log:
- log(f"🧼 [{label}] 去背景:任务已提交 result_key={result_key}")
- query_body = {"data": {"result_key": result_key}}
- last = None
- for i in range(40):
- time.sleep(1.5 if i else 0.3)
- data = _json_post(QUERY_PATH, query_body, timeout=60)
- _ensure_success(data)
- last = data
- result_url = _extract_result_url(data)
- if result_url:
- if log:
- log(f"🧼 [{label}] 去背景:任务完成,下载透明 PNG…")
- return Image.open(io.BytesIO(_download(result_url))).convert("RGBA")
- if log and i in (0, 3, 10, 20):
- log(f"🧼 [{label}] 去背景:等待任务结果…")
- raise RuntimeError(f"查询超时,最后返回: {json.dumps(last, ensure_ascii=False)[:500]}")
- def remove_background(img, log=None, label="image", enabled=True, image_url=None, model_type="general"):
- if not enabled:
- return img.convert("RGBA")
- if image_url and image_url.startswith(("http://", "https://")):
- try:
- out = _remote_remove_by_url(image_url, model_type=model_type, log=log, label=label)
- if log:
- log(f"✅ [{label}] 远端去背景完成:透明 PNG {out.size[0]}×{out.size[1]}")
- return out
- except Exception as e:
- if log:
- log(f"⚠️ [{label}] 远端去背景失败,保留原图:{e}")
- else:
- if log:
- log(f"ℹ️ [{label}] 去背景:当前图片没有公网 URL,跳过远端去背景并保留原图")
- return img.convert("RGBA")
|