淘宝商品评论接口技术实现:从评论获取到情感分析全流程方案

电子说

1.4w人已加入

描述

 商品评论接口是电商数据分析的重要入口,通过评论数据可以挖掘用户需求、分析产品优缺点、监控舆情走向。本文将详细讲解淘宝商品评论接口的技术实现,重点解决评论分页机制、反爬策略应对、数据解析与情感分析等核心问题,提供一套合规、高效的技术方案,同时严格遵守平台规则与数据采集规范。


一、评论接口基础原理与合规要点

淘宝商品评论数据存储在商品详情页的评论模块,通过动态加载方式呈现。实现评论接口需理解其基本原理并遵守以下合规要点:

   数据范围:仅采集商品公开评论(不包含追评、问答等非评论内容)
   请求频率:单商品评论请求间隔不低于 10 秒,单 IP 日请求不超过 1000 次
   使用规范:数据仅用于个人学习、市场调研,不得用于商业竞争或恶意分析
   反爬尊重:不使用破解、绕过等方式获取数据,模拟正常用户浏览行为

评论接口的核心技术流程如下:

plaintext

商品ID解析 → 评论参数构造 → 分页请求发送 → 评论数据提取 → 数据清洗与分析

接口接口

点击获取key和secret


二、核心技术实现:从评论获取到数据解析

1. 商品 ID 解析工具

获取商品评论的前提是解析出正确的商品 ID(item_id),可从商品详情页 URL 或页面内容中提取:

python

运行

   import re
   import requests
   from lxml import etree
    
   class ProductIdParser:
       """商品ID解析器,从URL或页面中提取item_id"""
       
       def __init__(self):
           self.headers = {
               "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36",
               "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
               "Referer": "https://www.taobao.com/"
           }
       
       def parse_from_url(self, product_url):
           """从商品URL中提取item_id"""
           # 匹配常见的商品URL格式
           patterns = [
               r"item.taobao.com/item.htm?.*id=(d+)",
               r"detail.tmall.com/item.htm?.*id=(d+)",
               r"id=(d+)"  # 通用匹配
           ]
           
           for pattern in patterns:
               match = re.search(pattern, product_url)
               if match:
                   return match.group(1)
           return None
       
       def parse_from_page(self, product_url):
           """从商品页面内容中提取item_id(URL解析失败时使用)"""
           try:
               response = requests.get(
                   product_url,
                   headers=self.headers,
                   timeout=10,
                   allow_redirects=True
               )
               response.encoding = "utf-8"
               
               # 从页面HTML中提取item_id
               tree = etree.HTML(response.text)
               # 尝试从meta标签提取
               meta_content = tree.xpath('//meta[@name="mobile-agent"]/@content')
               if meta_content:
                   match = re.search(r"item_id=(d+)", meta_content[0])
                   if match:
                       return match.group(1)
               
               # 尝试从脚本标签提取
               script_content = tree.xpath('//script[contains(text(), "itemId")]/text()')
               for script in script_content:
                   match = re.search(r"itemIds*=s*'?(d+)'?", script)
                   if match:
                       return match.group(1)
               
               return None
           except Exception as e:
               print(f"从页面提取item_id失败: {str(e)}")
               return None
       
       def get_product_id(self, product_url):
           """获取商品ID,先尝试从URL提取,失败则从页面提取"""
           product_id = self.parse_from_url(product_url)
           if product_id:
               return product_id
           return self.parse_from_page(product_url)

2. 评论参数构造器

淘宝评论请求需要特定的参数组合,包括商品 ID、页码、排序方式等,其中部分参数需要动态生成:

python

运行

   import time
   import hashlib
   import random
    
   class CommentParamsGenerator:
       """评论请求参数生成器"""
       
       def __init__(self):
           self.app_key = "12574478"  # 模拟应用标识
           self.sort_types = {
               "default": 0,    # 默认排序
               "latest": 1,     # 最新评论
               "good": 2,       # 好评
               "poor": 3        # 差评
           }
       
       def generate_params(self, item_id, page=1, sort="default", page_size=20):
           """
           生成评论请求参数
           
           :param item_id: 商品ID
           :param page: 页码
           :param sort: 排序方式
           :param page_size: 每页评论数
           :return: 评论请求参数字典
           """
           # 基础参数
           params = {
               "itemId": item_id,
               "pageNum": page,
               "pageSize": page_size,
               "sortType": self.sort_types.get(sort, 0),
               "auctionNumId": item_id,
               "userType": 0,
               "platform": "h5",
               "needFold": 0,
               "callback": f"jsonp_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
           }
           
           # 生成时间戳和签名
           t = str(int(time.time() * 1000))
           params["t"] = t
           params["sign"] = self._generate_sign(params)
           
           return params
       
       def _generate_sign(self, params):
           """生成签名,模拟平台参数验证机制"""
           # 按参数名排序并拼接
           sorted_params = sorted(params.items(), key=lambda x: x[0])
           sign_str = "&".join([f"{k}={v}" for k, v in sorted_params if k != "sign"])
           # 加入固定密钥(仅作示例)
           sign_str += "&secret=taobao_comment_demo_key"
           # 计算MD5签名
           return hashlib.md5(sign_str.encode()).hexdigest().upper()

3. 评论请求发送器

处理评论分页请求,包含反爬机制应对策略:

python

运行

   import time
   import random
   import requests
   from fake_useragent import UserAgent
    
   class CommentRequester:
       """评论请求发送器,负责发送请求并处理反爬"""
       
       def __init__(self, proxy_pool=None):
           self.comment_api = "https://h5api.m.taobao.com/h5/mtop.taobao.review.list.get/1.0/"
           self.proxy_pool = proxy_pool or []
           self.ua = UserAgent()
           self.session = requests.Session()
           self.last_request_time = 0
           self.min_interval = 10  # 评论请求最小间隔(秒)
           
       def _get_headers(self):
           """生成随机请求头"""
           return {
               "User-Agent": self.ua.random,
               "Accept": "*/*",
               "Accept-Language": "zh-CN,zh;q=0.9",
               "Referer": "https://detail.tmall.com/",
               "Origin": "https://detail.tmall.com",
               "X-Requested-With": "XMLHttpRequest",
               "Connection": "keep-alive"
           }
       
       def _get_proxy(self):
           """从代理池获取随机代理"""
           if not self.proxy_pool:
               return None
           return random.choice(self.proxy_pool)
       
       def _check_interval(self):
           """确保请求间隔,避免触发反爬"""
           current_time = time.time()
           elapsed = current_time - self.last_request_time
           if elapsed < self.min_interval:
               sleep_time = self.min_interval - elapsed + random.uniform(0, 2)
               print(f"请求间隔不足,休眠 {sleep_time:.1f} 秒")
               time.sleep(sleep_time)
           self.last_request_time = time.time()
       
       def fetch_comments(self, params):
           """
           发送评论请求
           
           :param params: 评论请求参数
           :return: 响应内容或None
           """
           self._check_interval()
           
           headers = self._get_headers()
           proxy = self._get_proxy()
           proxies = {"http": proxy, "https": proxy} if proxy else None
           
           try:
               response = self.session.get(
                   self.comment_api,
                   params=params,
                   headers=headers,
                   proxies=proxies,
                   timeout=15
               )
               
               # 检查响应状态
               if response.status_code != 200:
                   print(f"评论请求失败,状态码: {response.status_code}")
                   return None
               
               # 检查是否被反爬拦截
               if self._is_blocked(response.text):
                   print("评论请求被拦截,可能需要验证")
                   # 移除可能失效的代理
                   if proxy and proxy in self.proxy_pool:
                       self.proxy_pool.remove(proxy)
                   return None
                   
               return response.text
               
           except Exception as e:
               print(f"评论请求异常: {str(e)}")
               return None
       
       def _is_blocked(self, response_text):
           """判断是否被反爬机制拦截"""
           block_keywords = [
               "请输入验证码",
               "访问过于频繁",
               "系统繁忙",
               "验证"
           ]
           for keyword in block_keywords:
               if keyword in response_text:
                   return True
           return False

4. 评论数据解析器

解析评论响应内容,提取结构化的评论数据:

python

运行

   import re
   import json
   from datetime import datetime
    
   class CommentParser:
       """评论数据解析器,提取结构化评论信息"""
       
       def __init__(self):
           # 处理JSONP格式的正则
           self.jsonp_pattern = re.compile(r'jsonp_d+_d+((.*?))')
           # 表情符号清理正则
           self.emoji_pattern = re.compile(r'[U00010000-U0010ffff]', flags=re.UNICODE)
       
       def parse_jsonp(self, jsonp_text):
           """解析JSONP格式响应为JSON"""
           match = self.jsonp_pattern.search(jsonp_text)
           if not match:
               return None
           try:
               return json.loads(match.group(1))
           except json.JSONDecodeError:
               print("JSON解析失败")
               return None
       
       def clean_comment_text(self, text):
           """清理评论文本,去除多余符号和表情"""
           if not text:
               return ""
           # 去除HTML标签
           text = re.sub(r'<[^>]+>', '', text)
           # 去除表情符号
           text = self.emoji_pattern.sub('', text)
           # 去除多余空格和换行
           text = re.sub(r's+', ' ', text).strip()
           return text
       
       def parse_comment_item(self, comment_item):
           """解析单个评论项"""
           try:
               # 解析评论时间
               comment_time = comment_item.get("commentTime", "")
               if comment_time:
                   try:
                       comment_time = datetime.strptime(comment_time, "%Y-%m-%d %H:%M:%S")
                   except ValueError:
                       comment_time = None
               
               # 提取商品属性(如颜色、尺寸等)
               auction_params = comment_item.get("auctionParam", "")
               product_attrs = {}
               if auction_params:
                   for param in auction_params.split(";"):
                       if ":" in param:
                           key, value = param.split(":", 1)
                           product_attrs[key.strip()] = value.strip()
               
               return {
                   "comment_id": comment_item.get("id", ""),
                   "user_nick": comment_item.get("userNick", ""),
                   "user_level": comment_item.get("userVipLevel", 0),
                   "comment_text": self.clean_comment_text(comment_item.get("content", "")),
                   "comment_time": comment_time,
                   "star_rating": comment_item.get("star", 0),  # 星级评分
                   "product_attrs": product_attrs,  # 购买的商品属性
                   "praise_count": comment_item.get("useful", 0),  # 有用数
                   "reply_count": comment_item.get("replyCount", 0),  # 回复数
                   "has_image": len(comment_item.get("images", [])) > 0  # 是否有图
               }
           except Exception as e:
               print(f"解析单个评论失败: {str(e)}")
               return None
       
       def parse_comments(self, jsonp_text):
           """
           解析评论列表
           
           :param jsonp_text: JSONP格式的评论响应
           :return: 包含评论列表和分页信息的字典
           """
           json_data = self.parse_jsonp(jsonp_text)
           if not json_data or json_data.get("ret", [""])[0] != "SUCCESS::调用成功":
               return None
           
           result = {
               "total_comments": 0,
               "total_pages": 0,
               "current_page": 0,
               "comments": []
           }
           
           data = json_data.get("data", {})
           comments = data.get("comments", [])
           
           # 提取分页信息
           result["total_comments"] = data.get("total", 0)
           result["current_page"] = data.get("pageNum", 1)
           page_size = data.get("pageSize", 20)
           result["total_pages"] = (result["total_comments"] + page_size - 1) // page_size
           
           # 解析评论列表
           for item in comments:
               comment = self.parse_comment_item(item)
               if comment:
                   result["comments"].append(comment)
           
           return result

5. 评论情感分析工具

对评论内容进行情感倾向分析,判断好评、中评、差评:

python

运行

   import jieba
   import jieba.analyse
   from snownlp import SnowNLP
    
   class CommentSentimentAnalyzer:
       """评论情感分析工具"""
       
       def __init__(self):
           # 加载情感分析所需的词典
           jieba.initialize()
       
       def get_sentiment_score(self, comment_text):
           """获取评论情感得分(0-1,越高越正面)"""
           if not comment_text:
               return 0.5  # 无内容默认中性
           try:
               return SnowNLP(comment_text).sentiments
           except:
               return 0.5
       
       def analyze_sentiment(self, comment_text):
           """分析评论情感倾向"""
           score = self.get_sentiment_score(comment_text)
           if score >= 0.7:
               return "positive", score
           elif score <= 0.3:
               return "negative", score
           else:
               return "neutral", score
       
       def extract_keywords(self, comment_text, top_k=5):
           """提取评论关键词"""
           if not comment_text:
               return []
           try:
               return jieba.analyse.extract_tags(comment_text, topK=top_k)
           except:
               return []
       
       def process_comments(self, comments):
           """批量处理评论,添加情感分析结果"""
           processed = []
           for comment in comments:
               sentiment, score = self.analyze_sentiment(comment["comment_text"])
               keywords = self.extract_keywords(comment["comment_text"])
               
               processed_comment = comment.copy()
               processed_comment["sentiment"] = sentiment
               processed_comment["sentiment_score"] = round(score, 4)
               processed_comment["keywords"] = keywords
               
               processed.append(processed_comment)
           return processed

三、完整评论采集服务封装

将上述组件整合为完整的评论采集服务:

python

运行

   class TaobaoCommentService:
       """淘宝商品评论采集服务"""
       
       def __init__(self, proxy_pool=None):
           self.product_id_parser = ProductIdParser()
           self.params_generator = CommentParamsGenerator()
           self.requester = CommentRequester(proxy_pool=proxy_pool)
           self.parser = CommentParser()
           self.sentiment_analyzer = CommentSentimentAnalyzer()
       
       def get_comments(self, product_url, max_pages=5, sort="default", analyze_sentiment=True):
           """
           采集商品评论
           
           :param product_url: 商品详情页URL
           :param max_pages: 最大采集页数
           :param sort: 评论排序方式
           :param analyze_sentiment: 是否进行情感分析
           :return: 包含评论数据的字典
           """
           # 1. 获取商品ID
           print("正在解析商品ID...")
           item_id = self.product_id_parser.get_product_id(product_url)
           if not item_id:
               print("无法获取商品ID,采集失败")
               return None
           print(f"获取商品ID成功: {item_id}")
           
           all_comments = []
           current_page = 1
           total_pages = 1
           
           # 2. 分页采集评论
           while current_page <= max_pages and current_page <= total_pages:
               print(f"正在采集第 {current_page}/{max_pages} 页评论...")
               
               # 生成请求参数
               params = self.params_generator.generate_params(
                   item_id=item_id,
                   page=current_page,
                   sort=sort
               )
               
               # 发送请求
               response_text = self.requester.fetch_comments(params)
               if not response_text:
                   print(f"第 {current_page} 页评论获取失败,跳过该页")
                   current_page += 1
                   continue
               
               # 解析评论
               result = self.parser.parse_comments(response_text)
               if not result:
                   print(f"第 {current_page} 页评论解析失败,跳过该页")
                   current_page += 1
                   continue
               
               # 更新总页数
               total_pages = result["total_pages"]
               # 添加到结果列表
               all_comments.extend(result["comments"])
               
               print(f"第 {current_page} 页解析完成,获取 {len(result['comments'])} 条评论")
               
               # 检查是否已采集所有评论
               if len(all_comments) >= result["total_comments"]:
                   print("已获取全部评论,停止采集")
                   break
               
               current_page += 1
           
           # 3. 情感分析
           if analyze_sentiment and all_comments:
               print("正在进行评论情感分析...")
               all_comments = self.sentiment_analyzer.process_comments(all_comments)
           
           # 4. 返回结果
           return {
               "item_id": item_id,
               "product_url": product_url,
               "total_collected": len(all_comments),
               "total_available": result["total_comments"] if result else 0,
               "pages_collected": current_page - 1,
               "comments": all_comments
           }

四、使用示例与数据存储
1. 基本使用示例

python

运行

   def main():
       # 代理池(实际使用中替换为有效代理)
       proxy_pool = [
           # "http://123.123.123.123:8080",
           # "http://111.111.111.111:8888"
       ]
       
       # 初始化评论采集服务
       comment_service = TaobaoCommentService(proxy_pool=proxy_pool)
       
       # 商品详情页URL
       product_url = "https://item.taobao.com/item.htm?id=1234567890"  # 替换为实际商品URL
       
       # 采集评论(最多3页,按最新排序,进行情感分析)
       result = comment_service.get_comments(
           product_url=product_url,
           max_pages=3,
           sort="latest",
           analyze_sentiment=True
       )
       
       # 处理采集结果
       if result:
           print(f"n采集完成!共获取 {result['total_collected']} 条评论")
           
           # 打印部分结果
           if result["comments"]:
               print("n前3条评论摘要:")
               for i, comment in enumerate(result["comments"][:3], 1):
                   print(f"{i}. {comment['comment_text'][:50]}...")
                   print(f"   情感倾向:{comment['sentiment']}(得分:{comment['sentiment_score']})")
                   print(f"   关键词:{','.join(comment['keywords'])}")
                   print(f"   发布时间:{comment['comment_time']}n")
       else:
           print("评论采集失败")
    
   if __name__ == "__main__":
       main()

2. 评论数据存储工具

将采集的评论数据存储为 JSON 和 CSV 格式:

python

运行

   import json
   import csv
   from pathlib import Path
   from datetime import datetime
    
   class CommentStorage:
       """评论数据存储工具"""
       
       def __init__(self, storage_dir="./taobao_comments"):
           self.storage_dir = Path(storage_dir)
           self.storage_dir.mkdir(exist_ok=True, parents=True)
       
       def save_to_json(self, comment_data):
           """保存评论数据到JSON文件"""
           item_id = comment_data["item_id"]
           timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
           filename = f"comments_{item_id}_{timestamp}.json"
           file_path = self.storage_dir / filename
           
           with open(file_path, "w", encoding="utf-8") as f:
               json.dump(comment_data, f, ensure_ascii=False, indent=2, default=str)
           
           print(f"评论数据已保存至JSON文件:{file_path}")
           return file_path
       
       def save_to_csv(self, comment_data):
           """保存评论数据到CSV文件"""
           if not comment_data["comments"]:
               print("没有评论数据可保存到CSV")
               return None
               
           item_id = comment_data["item_id"]
           timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
           filename = f"comments_{item_id}_{timestamp}.csv"
           file_path = self.storage_dir / filename
           
           # 评论字段
           fields = [
               "comment_id", "user_nick", "user_level", "comment_text",
               "comment_time", "star_rating", "praise_count", "reply_count",
               "has_image", "sentiment", "sentiment_score", "keywords"
           ]
           
           with open(file_path, "w", encoding="utf-8-sig", newline="") as f:
               writer = csv.DictWriter(f, fieldnames=fields)
               writer.writeheader()
               
               for comment in comment_data["comments"]:
                   # 处理嵌套字段
                   row = {k: comment.get(k, "") for k in fields}
                   # 关键词列表转为字符串
                   if "keywords" in row and isinstance(row["keywords"], list):
                       row["keywords"] = ",".join(row["keywords"])
                   # 时间转为字符串
                   if isinstance(row["comment_time"], datetime):
                       row["comment_time"] = row["comment_time"].strftime("%Y-%m-%d %H:%M:%S")
                   
                   writer.writerow(row)
           
           print(f"评论数据已保存至CSV文件:{file_path}")
           return file_path

五、进阶优化与合规提示

1. 系统优化策略

   评论缓存机制:对已采集的商品评论进行缓存,避免重复请求

   python

运行

   def get_cached_comments(self, item_id, max_age=3600):
       """从缓存获取评论(实际实现需结合Redis或文件缓存)"""
       # 缓存逻辑实现...
       return None

   分布式采集:大规模采集时采用分布式架构,分散请求压力

   动态调整策略:根据反爬强度动态调整请求间隔和代理使用频率

2. 合规与风险提示

   商业应用前必须获得平台授权,遵守《电子商务法》相关规定
   评论数据不得用于生成与原平台竞争的产品或服务
   避免采集包含用户隐私的评论内容(如手机号、地址等)
   当平台明确限制评论采集时,应立即停止相关操作
   采集行为不得对平台正常运营造成影响,尊重 robots 协议限制

通过本文提供的技术方案,可构建一套功能完善的淘宝商品评论接口系统。该方案注重合规性和可扩展性,能够有效应对电商平台的反爬机制,为商品分析、用户反馈挖掘等场景提供数据支持。在实际应用中,需根据平台规则动态调整策略,确保系统的稳定性和合法性。

​审核编辑 黄宇

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分