沃尔玛商品数据 API JSON 返回异常处理:Python 完整方案
一、核心异常类型与影响
| 异常场景 | 表现形式(JSON / 响应特征) | 影响范围 |
|---|---|---|
| API 业务异常 | 响应 JSON 含code≠200(如 401、429、404),message提示错误 | 整次请求失败(无有效数据) |
| JSON 格式非法 | 响应非标准 JSON(如 HTML 错误页、网络中断导致 JSON 不完整) | 无法解析,数据完全不可用 |
| 核心字段缺失 | JSON 结构完整,但data.itemId、variants等关键字段为null | 部分数据无效(如无规格、无库存) |
| 字段类型不匹配 | 预期数字字段(如currentPrice)返回字符串(如 "缺货") | 解析报错,中断循环 |
| 数组字段异常 | comments/variant字段应为数组,实际为字典 /null | 批量解析时中断后续数据提取 |
二、异常处理核心原则
分层捕获:API 请求层、JSON 解析层、字段提取层分开捕获异常,不混用;
优雅降级:整体请求失败返回空数据,单条商品 / 规格解析失败跳过,不中断批量采集;
日志留痕:记录异常详情(商品 ID、错误栈、原始响应片段),便于排查;
合规兜底:异常时不泄露敏感信息(如
Client Secret、完整 JSON 响应),不强制使用无效数据。
三、完整实现方案(基于 API 系列框架)
1. 基础依赖与配置(新增异常处理相关工具)
import requestsimport timeimport jsonfrom typing import List, Dict, Optional, Anyimport logging# -------------------------- 日志配置(记录异常详情)--------------------------logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("walmart_api_exception.log"), logging.StreamHandler()])logger = logging.getLogger(__name__)# -------------------------- 基础配置(同之前,替换为你的合法信息)--------------------------CLIENT_ID = "你的 Client ID"CLIENT_SECRET = "你的 Client Secret"TOKEN_URL = "https://marketplace.walmartapis.com/v3/token"BASE_URL = "https://marketplace.walmartapis.com"TIMEOUT = 15RATE_LIMIT_DELAY = 0.2RETRY_TIMES = 3 # 异常重试次数2. 通用异常处理工具函数(核心新增)
def safe_json_parse(response_text: str) -> Optional[Dict]:
"""
安全解析JSON字符串(处理JSON格式非法异常)
:param response_text: API响应原始文本
:return: 解析后的字典,失败返回None
"""
try:
return json.loads(response_text)
except json.JSONDecodeError as e:
# 记录异常响应片段(前200字符,避免日志过长)
error_response = response_text[:200] + "..." if len(response_text) > 200 else response_text
logger.error(f"JSON格式解析失败,原始响应片段:{error_response},错误:{str(e)}")
return Nonedef get_json_field(json_data: Dict, field_path: str, default: Any = None, expected_type: type = None) -> Any:
"""
安全提取JSON字段(处理字段缺失、类型不匹配)
:param json_data: 解析后的JSON字典
:param field_path: 字段路径(如 "data.variants.variant")
:param default: 字段缺失时的默认值
:param expected_type: 预期字段类型(如 int、list,不指定则不校验)
:return: 提取后的字段值
"""
keys = field_path.split(".")
value = json_data for key in keys:
# 逐层提取,中间层级为None则返回默认值
if isinstance(value, dict):
value = value.get(key)
elif isinstance(value, list) and value:
# 数组字段默认取第一个元素(可根据需求修改)
value = value[0].get(key) if isinstance(value[0], dict) else None
else:
value = None
break
# 字段缺失,返回默认值
if value is None:
logger.debug(f"字段 {field_path} 缺失,返回默认值:{default}")
return default # 类型校验(不匹配则返回默认值)
if expected_type and not isinstance(value, expected_type):
logger.warning(f"字段 {field_path} 类型不匹配(预期:{expected_type.__name__},实际:{type(value).__name__}),返回默认值:{default}")
return default return value3. 强化版通用请求函数(处理 API 响应异常)
def get_access_token() -> Optional[str]:
"""获取Token(新增重试+异常详细日志)"""
for retry in range(RETRY_TIMES):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET }
try:
response = requests.post(TOKEN_URL, headers=headers, data=data, timeout=TIMEOUT)
response.raise_for_status()
token_data = safe_json_parse(response.text) # 用安全解析函数
if not token_data:
raise ValueError("Token响应JSON解析失败")
access_token = token_data.get("access_token")
if access_token:
logger.info("Token获取成功,有效期1小时")
return access_token else:
raise ValueError("Token响应中无access_token字段")
except Exception as e:
logger.error(f"Token获取失败(重试{retry+1}/{RETRY_TIMES}):{str(e)}")
if retry < RETRY_TIMES - 1:
time.sleep(2 ** retry) # 指数退避重试(1s、2s、4s)
return Nonedef create_headers(access_token: str) -> Dict[str, str]:
"""创建请求头(不变)"""
return {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}def safe_request(url: str, headers: Dict, method: str = "GET", json_data: Optional[Dict] = None) -> Optional[Dict]:
"""
安全请求函数(强化异常处理:重试、JSON解析容错、状态码细分)
"""
for retry in range(RETRY_TIMES):
time.sleep(RATE_LIMIT_DELAY) # 频率控制
try:
# 发送请求
if method.upper() == "GET":
response = requests.get(url, headers=headers, timeout=TIMEOUT)
elif method.upper() == "POST":
response = requests.post(url, headers=headers, json=json_data, timeout=TIMEOUT)
else:
logger.error(f"不支持的请求方式:{method}")
return None
# 记录响应状态和原始文本片段
logger.debug(f"请求URL:{url},状态码:{response.status_code}")
response_text = response.text[:500] + "..." if len(response.text) > 500 else response.text
logger.debug(f"响应片段:{response_text}")
# 处理状态码
if response.status_code == 200:
# 安全解析JSON
json_result = safe_json_parse(response.text)
if json_result:
# 校验业务状态码(部分API HTTP 200但code≠200)
business_code = get_json_field(json_result, "code", expected_type=int)
if business_code == 200:
return json_result else:
business_msg = get_json_field(json_result, "message", "未知业务错误")
logger.error(f"API业务失败,错误码:{business_code},信息:{business_msg}")
return None
else:
logger.error("JSON解析失败,放弃重试")
return None
elif response.status_code == 401:
logger.error("Token过期或无效,终止请求(需重新获取Token)")
return None # Token失效无需重试
elif response.status_code == 404:
logger.error(f"资源不存在(404):{url}")
return None # 商品不存在无需重试
elif response.status_code == 429:
delay = 10 * (retry + 1)
logger.warning(f"调用频率超限(429),延迟{delay}秒重试({retry+1}/{RETRY_TIMES})")
time.sleep(delay)
continue # 继续重试
elif response.status_code >= 500:
# 服务器错误,重试
logger.error(f"服务器错误({response.status_code}),重试{retry+1}/{RETRY_TIMES}")
if retry < RETRY_TIMES - 1:
time.sleep(2 ** retry)
continue
else:
logger.error(f"请求失败({response.status_code}),响应:{response_text}")
return None
except requests.exceptions.Timeout:
logger.error(f"请求超时({retry+1}/{RETRY_TIMES}):{url}")
if retry < RETRY_TIMES - 1:
time.sleep(2 ** retry)
continue
except requests.exceptions.ConnectionError:
logger.error(f"连接失败({retry+1}/{RETRY_TIMES}):{url}")
if retry < RETRY_TIMES - 1:
time.sleep(2 ** retry)
continue
except Exception as e:
logger.error(f"请求未知异常({retry+1}/{RETRY_TIMES}):{str(e)}")
if retry < RETRY_TIMES - 1:
time.sleep(2 ** retry)
continue
# 重试次数耗尽仍失败
logger.error(f"请求重试{RETRY_TIMES}次后失败:{url}")
return None# -------------------------- 初始化Token和请求头(新增异常终止)--------------------------access_token = get_access_token()if not access_token:
logger.critical("Token获取失败,终止程序")
raise SystemExit(1) # 终止程序headers = create_headers(access_token)4. 各 API 场景异常处理强化(以单商品、批量为例)
场景 1:单商品详情采集(强化字段提取容错)
def fetch_single_item(item_id: str) -> Optional[Dict]:
"""单商品采集(强化JSON字段容错、单规格解析失败跳过)"""
logger.info(f"开始采集商品:{item_id}")
url = f"{BASE_URL}/v3/items/{item_id}"
json_data = safe_request(url, headers)
if not json_data:
logger.error(f"商品 {item_id} 采集失败(无有效JSON响应)")
return None
# 用通用字段提取函数,避免字段缺失/类型错误
data = get_json_field(json_data, "data", {}, expected_type=dict)
parsed_item = {
"商品ID": get_json_field(data, "itemId", f"未知ID_{item_id}", expected_type=str),
"商品标题": get_json_field(data, "title", "无标题", expected_type=str),
"品牌": get_json_field(data, "brand", "未知品牌", expected_type=str),
"类目路径": get_json_field(data, "categoryPath", "未知类目", expected_type=str),
"平均评分": get_json_field(data, "ratings.averageRating", 0.0, expected_type=(int, float)),
"评论总数": get_json_field(data, "ratings.reviewCount", 0, expected_type=int),
"主图URL": "",
"规格列表": []
}
# 提取主图(容错数组异常)
images = get_json_field(data, "images", [], expected_type=list)
for img in images:
if isinstance(img, dict) and img.get("imageType") == "PRIMARY":
parsed_item["主图URL"] = img.get("imageUrl", "")
break
# 提取多规格(单规格解析失败跳过,不影响整体)
variants = get_json_field(data, "variants.variant", [], expected_type=list)
for idx, spec in enumerate(variants, 1):
try:
if not isinstance(spec, dict):
logger.warning(f"商品 {item_id} 第{idx}个规格不是字典类型,跳过")
continue
spec_id = get_json_field(spec, "variantId", f"规格_{idx}", expected_type=str)
attrs = get_json_field(spec, "attributes", {}, expected_type=dict)
price = get_json_field(spec, "price", {}, expected_type=dict)
inventory = get_json_field(spec, "inventory", {}, expected_type=dict)
parsed_spec = {
"规格ID": spec_id,
"颜色": get_json_field(attrs, "color", "未知颜色", expected_type=str),
"配置": get_json_field(attrs, "storage", "默认配置", expected_type=str),
"当前售价": f"{get_json_field(price, 'currentPrice', 0.0, expected_type=(int, float))} {get_json_field(price, 'currency', 'USD')}",
"原价": f"{get_json_field(price, 'originalPrice', get_json_field(price, 'currentPrice', 0.0), expected_type=(int, float))} {get_json_field(price, 'currency', 'USD')}",
"库存状态": get_json_field(inventory, "stockStatus", "UNKNOWN", expected_type=str),
"可用库存": get_json_field(inventory, "availableQuantity", 0, expected_type=int)
}
parsed_item["规格列表"].append(parsed_spec)
except Exception as e:
logger.error(f"商品 {item_id} 第{idx}个规格解析失败:{str(e)}", exc_info=True)
continue # 跳过该规格,继续解析下一个
logger.info(f"商品 {item_id} 采集完成,有效规格数:{len(parsed_item['规格列表'])}")
return parsed_item# 调用示例if __name__ == "__main__":
item_data = fetch_single_item("WM123456789")
if item_data:
print(f"商品ID:{item_data['商品ID']},标题:{item_data['商品标题']}")场景 2:批量商品采集(强化单商品失败跳过)
def fetch_batch_items(item_ids: List[str]) -> List[Dict]:
"""批量商品采集(单商品失败跳过,不中断批量流程)"""
logger.info(f"开始批量采集 {len(item_ids)} 个商品")
if len(item_ids) > 20:
logger.warning("批量采集最多支持20个商品,自动截取前20个")
item_ids = item_ids[:20]
url = f"{BASE_URL}/v3/items/batch"
json_body = {"itemIds": item_ids}
json_data = safe_request(url, headers, method="POST", json_data=json_body)
if not json_data:
logger.error("批量采集请求失败")
return []
items = get_json_field(json_data, "data.items", [], expected_type=list)
parsed_items = []
for item in items:
try:
if not isinstance(item, dict):
logger.warning("批量返回中存在非字典类型商品,跳过")
continue
item_id = get_json_field(item, "itemId", "未知ID", expected_type=str)
parsed_item = {
"商品ID": item_id,
"商品标题": get_json_field(item, "title", "无标题", expected_type=str),
"品牌": get_json_field(item, "brand", "未知品牌", expected_type=str),
"当前售价": get_json_field(item, "price.currentPrice", 0.0, expected_type=(int, float)),
"库存状态": get_json_field(item, "inventory.stockStatus", "UNKNOWN", expected_type=str)
}
parsed_items.append(parsed_item)
except Exception as e:
logger.error(f"批量商品中单个商品解析失败:{str(e)}", exc_info=True)
continue # 跳过该商品,继续解析下一个
logger.info(f"批量采集完成,成功解析 {len(parsed_items)}/{len(item_ids)} 个商品")
return parsed_items# 调用示例if __name__ == "__main__":
batch_item_ids = ["WM123456789", "WM987654321", "WM_INVALID_ID"]
batch_data = fetch_batch_items(batch_item_ids)
for item in batch_data:
print(f"商品ID:{item['商品ID']},售价:{item['当前售价']} USD")5. 异常后的数据兜底处理
def save_to_csv(items: List[Dict], filename: str = "沃尔玛商品数据.csv"):
"""保存CSV(处理空数据、字段缺失)"""
if not items:
logger.warning("无有效数据可保存")
return
# 动态获取表头(避免部分商品字段缺失导致CSV列不完整)
headers = set()
for item in items:
headers.update(item.keys())
headers = list(headers)
try:
with open(filename, "w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=headers, restval="") # restval设置缺失字段默认值
writer.writeheader()
writer.writerows(items)
logger.info(f"数据已保存到:{filename}")
except Exception as e:
logger.error(f"保存CSV失败:{str(e)}", exc_info=True)# 调用示例if __name__ == "__main__":
batch_data = fetch_batch_items(["WM123456789", "WM987654321"])
save_to_csv(batch_data)四、关键异常处理细节拆解
1. API 响应异常(401、429、500 等)
401 Token 失效:直接终止当前请求,提示重新获取 Token(无需重试,重试也无效);
429 频率超限:指数退避重试(10s、20s、30s),避免持续超限;
500 服务器错误:最多重试 3 次,重试失败则放弃,记录异常便于后续排查;
404 商品不存在:直接返回空数据,记录商品 ID 为无效 ID,避免重复请求。
2. JSON 格式异常(非标准 JSON)
用
safe_json_parse函数捕获JSONDecodeError,记录响应片段(不记录完整响应,避免日志过大);解析失败直接返回 None,触发上层函数的重试或降级处理。
3. 字段缺失 / 类型不匹配
用
get_json_field函数逐层提取字段,中间层级缺失返回默认值;支持类型校验(如
currentPrice必须是数字),类型不匹配则返回默认值,记录警告日志;单条规格 / 商品解析失败时,用
try-except捕获异常,跳过该条,不中断整体流程。
4. 数组字段异常(如variants.variant不是数组)
提取数组字段时,默认值设为空列表
[],避免循环时报错;遍历数组时,先判断元素类型(如是否为字典),不符合则跳过并记录日志。
五、进阶优化建议
Token 自动刷新:在
safe_request中捕获 401 错误后,自动调用get_access_token重新获取 Token,无需手动重启程序;无效商品 ID 缓存:将 404 的商品 ID 存入缓存(如本地文件、Redis),后续批量采集时跳过,减少无效请求;
动态字段适配:通过日志监控新增字段(如沃尔玛 API 更新后新增的
discount字段),定期更新解析逻辑;告警机制:关键异常(如 Token 失效、批量采集失败率超过 50%)可添加邮件 / 短信告警,及时通知开发者处理。
总结
底层:请求层处理网络、状态码异常,确保请求不中断;
中层:JSON 解析层处理格式异常,确保解析不崩溃;
上层:字段提取层处理缺失 / 类型异常,确保单条失败不影响整体;
最终:数据存储层处理空数据,确保存储不报错。