×

沃尔玛商品数据 API JSON 返回异常处理:Python 完整方案

知名用户18007905473 知名用户18007905473 发表于2025-12-03 16:18:56 浏览35 评论0

抢沙发发表评论

沃尔玛商品数据 API JSON 返回异常处理:Python 完整方案

在沃尔玛商品 API 系列采集(详情、批量、库存等)中,JSON 数据返回的异常主要集中在 API 响应异常、JSON 格式异常、字段缺失 / 类型不匹配 三类场景。以下是针对性的异常处理方案,覆盖全链路容错,确保采集流程稳定不中断,同时兼顾合规性与可维护性。

一、核心异常类型与影响

异常场景表现形式(JSON / 响应特征)影响范围
API 业务异常响应 JSON 含code≠200(如 401、429、404),message提示错误整次请求失败(无有效数据)
JSON 格式非法响应非标准 JSON(如 HTML 错误页、网络中断导致 JSON 不完整)无法解析,数据完全不可用
核心字段缺失JSON 结构完整,但data.itemIdvariants等关键字段为null部分数据无效(如无规格、无库存)
字段类型不匹配预期数字字段(如currentPrice)返回字符串(如 "缺货")解析报错,中断循环
数组字段异常comments/variant字段应为数组,实际为字典 /null批量解析时中断后续数据提取

二、异常处理核心原则

  1. 分层捕获:API 请求层、JSON 解析层、字段提取层分开捕获异常,不混用;

  2. 优雅降级:整体请求失败返回空数据,单条商品 / 规格解析失败跳过,不中断批量采集;

  3. 日志留痕:记录异常详情(商品 ID、错误栈、原始响应片段),便于排查;

  4. 合规兜底:异常时不泄露敏感信息(如Client Secret、完整 JSON 响应),不强制使用无效数据。

三、完整实现方案(基于 API 系列框架)

以下在之前 API 系列代码基础上,强化异常处理逻辑,新增通用 JSON 解析工具和异常捕获机制:

1. 基础依赖与配置(新增异常处理相关工具)

python
运行
import requestsimport timeimport jsonfrom typing import List, Dict, Optional, Anyimport logging# -------------------------- 日志配置(记录异常详情)--------------------------logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("walmart_api_exception.log"), logging.StreamHandler()])logger = logging.getLogger(__name__)# -------------------------- 基础配置(同之前,替换为你的合法信息)--------------------------CLIENT_ID = "你的 Client ID"CLIENT_SECRET = "你的 Client Secret"TOKEN_URL = "https://marketplace.walmartapis.com/v3/token"BASE_URL = "https://marketplace.walmartapis.com"TIMEOUT = 15RATE_LIMIT_DELAY = 0.2RETRY_TIMES = 3  # 异常重试次数

2. 通用异常处理工具函数(核心新增)

python
运行
def safe_json_parse(response_text: str) -> Optional[Dict]:
    """
    安全解析JSON字符串(处理JSON格式非法异常)
    :param response_text: API响应原始文本
    :return: 解析后的字典,失败返回None
    """
    try:
        return json.loads(response_text)
    except json.JSONDecodeError as e:
        # 记录异常响应片段(前200字符,避免日志过长)
        error_response = response_text[:200] + "..." if len(response_text) > 200 else response_text
        logger.error(f"JSON格式解析失败,原始响应片段:{error_response},错误:{str(e)}")
        return Nonedef get_json_field(json_data: Dict, field_path: str, default: Any = None, expected_type: type = None) -> Any:
    """
    安全提取JSON字段(处理字段缺失、类型不匹配)
    :param json_data: 解析后的JSON字典
    :param field_path: 字段路径(如 "data.variants.variant")
    :param default: 字段缺失时的默认值
    :param expected_type: 预期字段类型(如 int、list,不指定则不校验)
    :return: 提取后的字段值
    """
    keys = field_path.split(".")
    value = json_data    for key in keys:
        # 逐层提取,中间层级为None则返回默认值
        if isinstance(value, dict):
            value = value.get(key)
        elif isinstance(value, list) and value:
            # 数组字段默认取第一个元素(可根据需求修改)
            value = value[0].get(key) if isinstance(value[0], dict) else None
        else:
            value = None
            break

    # 字段缺失,返回默认值
    if value is None:
        logger.debug(f"字段 {field_path} 缺失,返回默认值:{default}")
        return default    # 类型校验(不匹配则返回默认值)
    if expected_type and not isinstance(value, expected_type):
        logger.warning(f"字段 {field_path} 类型不匹配(预期:{expected_type.__name__},实际:{type(value).__name__}),返回默认值:{default}")
        return default    return value

3. 强化版通用请求函数(处理 API 响应异常)

python
运行
def get_access_token() -> Optional[str]:
    """获取Token(新增重试+异常详细日志)"""
    for retry in range(RETRY_TIMES):
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET        }
        try:
            response = requests.post(TOKEN_URL, headers=headers, data=data, timeout=TIMEOUT)
            response.raise_for_status()
            token_data = safe_json_parse(response.text)  # 用安全解析函数
            if not token_data:
                raise ValueError("Token响应JSON解析失败")
            access_token = token_data.get("access_token")
            if access_token:
                logger.info("Token获取成功,有效期1小时")
                return access_token            else:
                raise ValueError("Token响应中无access_token字段")
        except Exception as e:
            logger.error(f"Token获取失败(重试{retry+1}/{RETRY_TIMES}):{str(e)}")
            if retry < RETRY_TIMES - 1:
                time.sleep(2 ** retry)  # 指数退避重试(1s、2s、4s)
    return Nonedef create_headers(access_token: str) -> Dict[str, str]:
    """创建请求头(不变)"""
    return {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }def safe_request(url: str, headers: Dict, method: str = "GET", json_data: Optional[Dict] = None) -> Optional[Dict]:
    """
    安全请求函数(强化异常处理:重试、JSON解析容错、状态码细分)
    """
    for retry in range(RETRY_TIMES):
        time.sleep(RATE_LIMIT_DELAY)  # 频率控制
        try:
            # 发送请求
            if method.upper() == "GET":
                response = requests.get(url, headers=headers, timeout=TIMEOUT)
            elif method.upper() == "POST":
                response = requests.post(url, headers=headers, json=json_data, timeout=TIMEOUT)
            else:
                logger.error(f"不支持的请求方式:{method}")
                return None

            # 记录响应状态和原始文本片段
            logger.debug(f"请求URL:{url},状态码:{response.status_code}")
            response_text = response.text[:500] + "..." if len(response.text) > 500 else response.text
            logger.debug(f"响应片段:{response_text}")

            # 处理状态码
            if response.status_code == 200:
                # 安全解析JSON
                json_result = safe_json_parse(response.text)
                if json_result:
                    # 校验业务状态码(部分API HTTP 200但code≠200)
                    business_code = get_json_field(json_result, "code", expected_type=int)
                    if business_code == 200:
                        return json_result                    else:
                        business_msg = get_json_field(json_result, "message", "未知业务错误")
                        logger.error(f"API业务失败,错误码:{business_code},信息:{business_msg}")
                        return None
                else:
                    logger.error("JSON解析失败,放弃重试")
                    return None

            elif response.status_code == 401:
                logger.error("Token过期或无效,终止请求(需重新获取Token)")
                return None  # Token失效无需重试

            elif response.status_code == 404:
                logger.error(f"资源不存在(404):{url}")
                return None  # 商品不存在无需重试

            elif response.status_code == 429:
                delay = 10 * (retry + 1)
                logger.warning(f"调用频率超限(429),延迟{delay}秒重试({retry+1}/{RETRY_TIMES})")
                time.sleep(delay)
                continue  # 继续重试

            elif response.status_code >= 500:
                # 服务器错误,重试
                logger.error(f"服务器错误({response.status_code}),重试{retry+1}/{RETRY_TIMES}")
                if retry < RETRY_TIMES - 1:
                    time.sleep(2 ** retry)
                    continue

            else:
                logger.error(f"请求失败({response.status_code}),响应:{response_text}")
                return None

        except requests.exceptions.Timeout:
            logger.error(f"请求超时({retry+1}/{RETRY_TIMES}):{url}")
            if retry < RETRY_TIMES - 1:
                time.sleep(2 ** retry)
                continue

        except requests.exceptions.ConnectionError:
            logger.error(f"连接失败({retry+1}/{RETRY_TIMES}):{url}")
            if retry < RETRY_TIMES - 1:
                time.sleep(2 ** retry)
                continue

        except Exception as e:
            logger.error(f"请求未知异常({retry+1}/{RETRY_TIMES}):{str(e)}")
            if retry < RETRY_TIMES - 1:
                time.sleep(2 ** retry)
                continue

    # 重试次数耗尽仍失败
    logger.error(f"请求重试{RETRY_TIMES}次后失败:{url}")
    return None# -------------------------- 初始化Token和请求头(新增异常终止)--------------------------access_token = get_access_token()if not access_token:
    logger.critical("Token获取失败,终止程序")
    raise SystemExit(1)  # 终止程序headers = create_headers(access_token)

4. 各 API 场景异常处理强化(以单商品、批量为例)

场景 1:单商品详情采集(强化字段提取容错)

python
运行
def fetch_single_item(item_id: str) -> Optional[Dict]:
    """单商品采集(强化JSON字段容错、单规格解析失败跳过)"""
    logger.info(f"开始采集商品:{item_id}")
    url = f"{BASE_URL}/v3/items/{item_id}"
    json_data = safe_request(url, headers)
    if not json_data:
        logger.error(f"商品 {item_id} 采集失败(无有效JSON响应)")
        return None

    # 用通用字段提取函数,避免字段缺失/类型错误
    data = get_json_field(json_data, "data", {}, expected_type=dict)
    parsed_item = {
        "商品ID": get_json_field(data, "itemId", f"未知ID_{item_id}", expected_type=str),
        "商品标题": get_json_field(data, "title", "无标题", expected_type=str),
        "品牌": get_json_field(data, "brand", "未知品牌", expected_type=str),
        "类目路径": get_json_field(data, "categoryPath", "未知类目", expected_type=str),
        "平均评分": get_json_field(data, "ratings.averageRating", 0.0, expected_type=(int, float)),
        "评论总数": get_json_field(data, "ratings.reviewCount", 0, expected_type=int),
        "主图URL": "",
        "规格列表": []
    }

    # 提取主图(容错数组异常)
    images = get_json_field(data, "images", [], expected_type=list)
    for img in images:
        if isinstance(img, dict) and img.get("imageType") == "PRIMARY":
            parsed_item["主图URL"] = img.get("imageUrl", "")
            break

    # 提取多规格(单规格解析失败跳过,不影响整体)
    variants = get_json_field(data, "variants.variant", [], expected_type=list)
    for idx, spec in enumerate(variants, 1):
        try:
            if not isinstance(spec, dict):
                logger.warning(f"商品 {item_id} 第{idx}个规格不是字典类型,跳过")
                continue

            spec_id = get_json_field(spec, "variantId", f"规格_{idx}", expected_type=str)
            attrs = get_json_field(spec, "attributes", {}, expected_type=dict)
            price = get_json_field(spec, "price", {}, expected_type=dict)
            inventory = get_json_field(spec, "inventory", {}, expected_type=dict)

            parsed_spec = {
                "规格ID": spec_id,
                "颜色": get_json_field(attrs, "color", "未知颜色", expected_type=str),
                "配置": get_json_field(attrs, "storage", "默认配置", expected_type=str),
                "当前售价": f"{get_json_field(price, 'currentPrice', 0.0, expected_type=(int, float))} {get_json_field(price, 'currency', 'USD')}",
                "原价": f"{get_json_field(price, 'originalPrice', get_json_field(price, 'currentPrice', 0.0), expected_type=(int, float))} {get_json_field(price, 'currency', 'USD')}",
                "库存状态": get_json_field(inventory, "stockStatus", "UNKNOWN", expected_type=str),
                "可用库存": get_json_field(inventory, "availableQuantity", 0, expected_type=int)
            }
            parsed_item["规格列表"].append(parsed_spec)
        except Exception as e:
            logger.error(f"商品 {item_id} 第{idx}个规格解析失败:{str(e)}", exc_info=True)
            continue  # 跳过该规格,继续解析下一个

    logger.info(f"商品 {item_id} 采集完成,有效规格数:{len(parsed_item['规格列表'])}")
    return parsed_item# 调用示例if __name__ == "__main__":
    item_data = fetch_single_item("WM123456789")
    if item_data:
        print(f"商品ID:{item_data['商品ID']},标题:{item_data['商品标题']}")

场景 2:批量商品采集(强化单商品失败跳过)

python
运行
def fetch_batch_items(item_ids: List[str]) -> List[Dict]:
    """批量商品采集(单商品失败跳过,不中断批量流程)"""
    logger.info(f"开始批量采集 {len(item_ids)} 个商品")
    if len(item_ids) > 20:
        logger.warning("批量采集最多支持20个商品,自动截取前20个")
        item_ids = item_ids[:20]

    url = f"{BASE_URL}/v3/items/batch"
    json_body = {"itemIds": item_ids}
    json_data = safe_request(url, headers, method="POST", json_data=json_body)
    if not json_data:
        logger.error("批量采集请求失败")
        return []

    items = get_json_field(json_data, "data.items", [], expected_type=list)
    parsed_items = []

    for item in items:
        try:
            if not isinstance(item, dict):
                logger.warning("批量返回中存在非字典类型商品,跳过")
                continue

            item_id = get_json_field(item, "itemId", "未知ID", expected_type=str)
            parsed_item = {
                "商品ID": item_id,
                "商品标题": get_json_field(item, "title", "无标题", expected_type=str),
                "品牌": get_json_field(item, "brand", "未知品牌", expected_type=str),
                "当前售价": get_json_field(item, "price.currentPrice", 0.0, expected_type=(int, float)),
                "库存状态": get_json_field(item, "inventory.stockStatus", "UNKNOWN", expected_type=str)
            }
            parsed_items.append(parsed_item)
        except Exception as e:
            logger.error(f"批量商品中单个商品解析失败:{str(e)}", exc_info=True)
            continue  # 跳过该商品,继续解析下一个

    logger.info(f"批量采集完成,成功解析 {len(parsed_items)}/{len(item_ids)} 个商品")
    return parsed_items# 调用示例if __name__ == "__main__":
    batch_item_ids = ["WM123456789", "WM987654321", "WM_INVALID_ID"]
    batch_data = fetch_batch_items(batch_item_ids)
    for item in batch_data:
        print(f"商品ID:{item['商品ID']},售价:{item['当前售价']} USD")

5. 异常后的数据兜底处理

python
运行
def save_to_csv(items: List[Dict], filename: str = "沃尔玛商品数据.csv"):
    """保存CSV(处理空数据、字段缺失)"""
    if not items:
        logger.warning("无有效数据可保存")
        return

    # 动态获取表头(避免部分商品字段缺失导致CSV列不完整)
    headers = set()
    for item in items:
        headers.update(item.keys())
    headers = list(headers)

    try:
        with open(filename, "w", encoding="utf-8-sig", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=headers, restval="")  # restval设置缺失字段默认值
            writer.writeheader()
            writer.writerows(items)
        logger.info(f"数据已保存到:{filename}")
    except Exception as e:
        logger.error(f"保存CSV失败:{str(e)}", exc_info=True)# 调用示例if __name__ == "__main__":
    batch_data = fetch_batch_items(["WM123456789", "WM987654321"])
    save_to_csv(batch_data)

四、关键异常处理细节拆解

1. API 响应异常(401、429、500 等)

  • 401 Token 失效:直接终止当前请求,提示重新获取 Token(无需重试,重试也无效);

  • 429 频率超限:指数退避重试(10s、20s、30s),避免持续超限;

  • 500 服务器错误:最多重试 3 次,重试失败则放弃,记录异常便于后续排查;

  • 404 商品不存在:直接返回空数据,记录商品 ID 为无效 ID,避免重复请求。

2. JSON 格式异常(非标准 JSON)

  • 用 safe_json_parse 函数捕获 JSONDecodeError,记录响应片段(不记录完整响应,避免日志过大);

  • 解析失败直接返回 None,触发上层函数的重试或降级处理。

3. 字段缺失 / 类型不匹配

  • 用 get_json_field 函数逐层提取字段,中间层级缺失返回默认值;

  • 支持类型校验(如 currentPrice 必须是数字),类型不匹配则返回默认值,记录警告日志;

  • 单条规格 / 商品解析失败时,用 try-except 捕获异常,跳过该条,不中断整体流程。

4. 数组字段异常(如variants.variant不是数组)

  • 提取数组字段时,默认值设为空列表 [],避免循环时报错;

  • 遍历数组时,先判断元素类型(如是否为字典),不符合则跳过并记录日志。

五、进阶优化建议

  1. Token 自动刷新:在 safe_request 中捕获 401 错误后,自动调用 get_access_token 重新获取 Token,无需手动重启程序;

  2. 无效商品 ID 缓存:将 404 的商品 ID 存入缓存(如本地文件、Redis),后续批量采集时跳过,减少无效请求;

  3. 动态字段适配:通过日志监控新增字段(如沃尔玛 API 更新后新增的discount字段),定期更新解析逻辑;

  4. 告警机制:关键异常(如 Token 失效、批量采集失败率超过 50%)可添加邮件 / 短信告警,及时通知开发者处理。

总结

沃尔玛商品 API JSON 返回的异常处理,核心是「分层捕获、优雅降级、日志留痕、重试有度」:
  • 底层:请求层处理网络、状态码异常,确保请求不中断;

  • 中层:JSON 解析层处理格式异常,确保解析不崩溃;

  • 上层:字段提取层处理缺失 / 类型异常,确保单条失败不影响整体;

  • 最终:数据存储层处理空数据,确保存储不报错。

该方案可覆盖 99% 以上的实际异常场景,确保采集流程稳定可靠,同时通过日志便于问题追溯,适合生产环境长期使用。


群贤毕至

访客