批量爬取系统原理与流程说明
📖 相关文档: 关于单个文件下载的详细实现原理,请参考 JS逆向与haowallpaper网站爬虫攻略
📋 目录
🎯 系统概述
批量爬取系统用于从 haowallpaper.com 网站批量下载壁纸图片和视频。系统采用 Selenium 进行网页自动化操作,结合 requests 进行文件下载,实现了从页面浏览到文件下载的完整自动化流程。
主要功能
✅ 自动分页爬取多个页面
✅ 智能提取页面中的图片/视频链接
✅ 自动触发悬浮事件获取隐藏链接
✅ 绕过网站的反爬虫验证机制
✅ 自动下载并保存文件
✅ 支持断点续传和错误重试
🔧 核心原理
1. 网页自动化(Selenium)
使用 Selenium WebDriver 模拟真实浏览器行为:
为什么需要 Selenium?
目标网站使用Nuxt的服务端渲染技术动态加载内容
图片链接隐藏在悬浮事件触发的元素中
需要执行 JavaScript 和 DOM 操作
工作方式:
启动真实的 Chrome 浏览器实例
执行页面加载、滚动、悬浮等操作
等待动态内容加载完成
提取页面中的链接信息
2. 反爬虫绕过机制
网站使用了 ALTCHA 验证系统,需要以下步骤:
获取挑战数据:从服务器获取加密挑战
生成 Payload:通过暴力破解生成验证 payload
验证 Payload:提交 payload 获取验证令牌
获取文件 URL:使用令牌获取实际文件下载链接
💡 详细说明: 关于 ALTCHA 验证系统的详细原理和实现,请参考 JS逆向与haowallpaper网站爬虫攻略
3. 延迟控制机制
为避免触发服务器限流,实现了多层延迟:
页面级延迟:每页之间等待 3 秒
下载级延迟:每次下载之间等待 2 秒
重试延迟:失败重试时递增延迟(2 秒 → 4 秒 → 6 秒)
📊 详细流程
整体流程图
graph TD
A[开始] --> B[初始化 Selenium WebDriver]
B --> C{是否指定页码范围?}
C -->|是| D[循环处理每页]
C -->|否| E[处理单页]
D --> F[访问页面 URL]
E --> F
F --> G[等待页面加载]
G --> H[提取所有 Card 元素]
H --> I[遍历每个 Card]
I --> J[滚动到元素可见]
J --> K[触发悬浮事件]
K --> L[提取链接 href]
L --> M[提取文件 ID]
M --> N[调用下载函数]
N --> O[获取挑战数据]
O --> P[生成 Payload]
P --> Q[验证 Payload]
Q --> R[获取完整文件 URL]
R --> S[下载文件]
S --> T{还有 Card?}
T -->|是| I
T -->|否| U{还有页面?}
U -->|是| D
U -->|否| V[汇总结果]
V --> W[结束]
style A fill:#90EE90
style W fill:#FFB6C1
style N fill:#87CEEB
style S fill:#FFD700
单页处理流程图
sequenceDiagram
participant Main as 主程序
participant Driver as Selenium WebDriver
participant Page as 目标网页
Main->>Driver: 初始化 Chrome WebDriver
Main->>Driver: 访问页面 URL
Driver->>Page: GET /homeView?page=N
Page-->>Driver: 返回 HTML 内容
Main->>Driver: 等待 .homeContainer .card 元素
Page-->>Driver: 元素加载完成
Main->>Driver: 查找所有 Card 元素
Driver-->>Main: 返回 Card 列表
loop 遍历每个 Card
Main->>Driver: 滚动到 Card 可见
Main->>Driver: 触发悬浮事件
Driver->>Page: 触发 hover 事件
Page-->>Driver: 显示隐藏链接
Main->>Driver: 提取 .card-content.hert-col a
Driver-->>Main: 返回 href 链接
Main->>Main: 从 URL 提取文件 ID
Main->>Main: 执行下载流程
Main->>Main: 1. 获取挑战数据
Main->>Main: 2. 生成 Payload
Main->>Main: 3. 验证 Payload
Main->>Main: 4. 获取文件 URL
Main->>Main: 5. 下载文件
Main->>Main: 等待 2 秒(延迟控制)
end
Main->>Driver: 关闭浏览器
文件下载流程图
flowchart TD
A[开始下载] --> B[获取挑战数据]
B --> C{挑战数据有效?}
C -->|否| D[重试 最多3次]
D --> B
C -->|是| E[生成 Payload]
E --> F[Base64 编码 Payload]
F --> G[验证 Payload]
G --> H{验证成功?}
H -->|否| I[抛出异常]
H -->|是| J[获取文件完整 URL]
J --> K{URL 有效?}
K -->|否| I
K -->|是| L[下载文件]
L --> M{下载成功?}
M -->|否| I
M -->|是| N[保存到本地]
N --> O[返回保存路径]
style A fill:#90EE90
style O fill:#FFB6C1
style I fill:#FF6B6B
style D fill:#FFD700
🏗️ 技术架构
模块结构
batch_crawl.py (主模块)
├── extract_file_id_from_url() # URL 解析
├── get_page_content() # 单页爬取
└── crawl_pages() # 分页爬取
系统通过调用下载接口完成文件下载,下载流程包括:
获取挑战数据(带重试机制)
生成验证 Payload
验证 Payload 获取令牌
使用令牌获取文件完整 URL
下载文件并保存到本地
数据流
graph LR
A[页面 URL] --> B[Selenium 获取 HTML]
B --> C[提取 Card 元素]
C --> D[悬浮获取链接]
D --> E[提取文件 ID]
E --> F[挑战验证流程]
F --> G[获取文件 URL]
G --> H[下载文件]
H --> I[保存到本地]
style A fill:#E3F2FD
style I fill:#C8E6C9
⚠️ 注意事项
1. 请求频率控制
问题: 请求过快会导致服务器返回错误或限流
解决方案:
✅ 每次下载间隔 2 秒(
download_delay)✅ 每页之间间隔 3 秒(
page_delay)✅ 失败重试时递增延迟(2 秒 → 4 秒 → 6 秒)
建议:
大批量爬取时适当增加延迟时间
监控失败率,如果失败率高则增加延迟
2. 页面加载等待
问题: 动态内容加载需要时间,过早提取会失败
解决方案:
✅ 使用
WebDriverWait等待关键元素出现✅ 等待
.homeContainer .card元素加载✅ 额外等待 2 秒确保内容完全加载
建议:
网络较慢时可增加
wait_time参数使用
--no-headless模式观察页面加载情况
3. 元素定位
问题: 页面结构可能变化,导致元素定位失败
关键选择器:
.homeContainer- 主容器.card- 卡片元素.card-content.hert-col a- 隐藏的链接元素
建议:
定期检查页面结构是否变化
如果定位失败,检查选择器是否需要更新
4. 浏览器资源管理
问题: WebDriver 未正确关闭会导致资源泄漏
解决方案:
✅ 使用
try-finally确保浏览器关闭✅ 异常时也会执行清理操作
5. 文件去重
问题: 可能下载重复文件
当前实现:
使用 MD5 哈希值进行去重
文件名包含文件 ID,避免重复下载相同文件
建议:
可以添加数据库记录已下载的文件 ID
下载前检查文件是否已存在
6. 错误恢复
问题: 某个文件下载失败不应影响其他文件
解决方案:
✅ 每个文件下载使用独立的 try-except
✅ 失败后继续处理下一个文件
✅ 记录失败信息便于后续重试
🔄 错误处理
常见错误及处理
重试机制
graph TD
A[请求挑战接口] --> B{响应有效?}
B -->|是| C[返回挑战数据]
B -->|否| D{重试次数 < 3?}
D -->|是| E[等待递增延迟]
E --> A
D -->|否| F[抛出异常]
style C fill:#C8E6C9
style F fill:#FF6B6B
style E fill:#FFD700
📈 性能优化建议
1. 并发控制
当前实现: 串行下载(一个接一个)
优化方向:
可以考虑使用线程池并发下载
但需要注意服务器限流,控制并发数
2. 断点续传
当前实现: 每次从头开始
优化方向:
记录已下载的文件 ID
跳过已下载的文件
支持从指定页码继续
3. 资源复用
当前实现: 每页都重新初始化浏览器
优化方向:
复用同一个浏览器实例
减少初始化开销
🎓 使用示例
基本用法
from src.utils.haowallpaper_image_crawl.batch_crawl import crawl_pages
# 爬取第 1 页到第 10 页
result = crawl_pages(
max_page=10,
start_page=1,
headless=True,
download_delay=2.0,
page_delay=3.0,
)
print(f"成功下载: {result['total_download_count']} 个文件")
高级配置
# 自定义延迟和等待时间
result = crawl_pages(
max_page=100,
start_page=1,
headless=False, # 显示浏览器窗口(调试用)
wait_time=15, # 增加页面等待时间
download_delay=3.0, # 增加下载延迟
page_delay=5.0, # 增加页面延迟
driver_path="C:/path/to/chromedriver.exe" # 自定义驱动路径
)
📝 总结
批量爬取系统通过以下关键技术实现了自动化下载:
Selenium 自动化:模拟浏览器操作,处理动态内容
智能等待:确保页面和元素完全加载
延迟控制:避免触发服务器限流
错误重试:提高下载成功率
异常处理:确保单个失败不影响整体流程
通过合理的参数配置和错误处理,可以实现稳定的大批量爬取任务。
源码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量爬取 haowallpaper.com 网页内容
使用 Selenium 抓取指定页面的完整 HTML 内容。
Args:
page: 页码参数,默认为 1
headless: 是否使用无头模式,默认为 True
wait_time: 页面加载等待时间(秒),默认为 10
Returns:
html_content: 页面的 HTML 内容字符串
"""
import sys
import time
from pathlib import Path
from typing import Any, Optional
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
# 添加项目根目录到 Python 路径
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root))
from src.utils.haowallpaper_image_crawl.single_image_crawler import crawl_single_file
BASE_URL = "https://haowallpaper.com/homeView"
# 默认 ChromeDriver 路径
DEFAULT_CHROMEDRIVER_DIR = Path(__file__).parent / "chromedriver-win64"
DEFAULT_CHROMEDRIVER_PATH = DEFAULT_CHROMEDRIVER_DIR / "chromedriver.exe"
def extract_file_id_from_url(url: str) -> Optional[str]:
"""
从 URL 中提取文件 ID(取最后一个 / 后面的字符串)
Args:
url: 如 "https://haowallpaper.com/homeViewLook/18040194890714496"
Returns:
文件 ID,如 "18040194890714496",如果提取失败返回 None
"""
if not url:
return None
# 取最后一个 / 后面的字符串
file_id = url.rstrip('/').split('/')[-1]
return file_id if file_id else None
def get_page_content(
page: int = 1,
headless: bool = True,
wait_time: int = 10,
driver_path: Optional[str] = None,
download_delay: float = 2.0,
) -> dict[str, Any]:
"""
使用 Selenium 抓取指定页面的完整 HTML 内容。
Args:
page: 页码参数
headless: 是否使用无头模式(不显示浏览器窗口)
wait_time: 页面加载等待时间(秒)
driver_path: ChromeDriver 路径(可选,默认使用项目目录下的 chromedriver.exe)
download_delay: 每次下载之间的延迟(秒),默认 2.0 秒
Returns:
html_content: 页面的完整 HTML 内容
Raises:
WebDriverException: 当浏览器驱动初始化失败时
TimeoutException: 当页面加载超时时
"""
# 如果没有指定 driver_path,使用默认路径
if driver_path is None:
if DEFAULT_CHROMEDRIVER_PATH.exists():
driver_path = str(DEFAULT_CHROMEDRIVER_PATH)
print(f"使用默认 ChromeDriver: {driver_path}")
else:
print(f"⚠️ 警告: 默认 ChromeDriver 不存在 ({DEFAULT_CHROMEDRIVER_PATH})")
print("将尝试使用系统 PATH 中的 ChromeDriver")
# 构建完整 URL
url = f"{BASE_URL}?page={page}"
# 配置 Chrome 选项
chrome_options = Options()
if headless:
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
# 初始化 WebDriver
print("正在初始化 Chrome WebDriver...")
try:
if driver_path:
driver_path_obj = Path(driver_path)
if not driver_path_obj.exists():
raise FileNotFoundError(f"ChromeDriver 文件不存在: {driver_path}")
print(f"使用指定的 ChromeDriver 路径: {driver_path}")
service = Service(str(driver_path_obj))
driver = webdriver.Chrome(service=service, options=chrome_options)
else:
print("使用系统 PATH 中的 ChromeDriver...")
driver = webdriver.Chrome(options=chrome_options)
print("✅ WebDriver 初始化成功")
except Exception as e:
print(f"❌ WebDriver 初始化失败: {e}")
print("提示: 请确保已安装 Chrome 浏览器和 ChromeDriver")
print("ChromeDriver 下载地址: https://chromedriver.chromium.org/")
raise WebDriverException(f"浏览器驱动初始化失败: {e}") from e
try:
# 访问页面
print(f"正在访问: {url}")
driver.get(url)
print("✅ 页面访问成功")
# 等待页面加载完成(等待 homeContainer 和 card 元素加载)
print(f"等待页面元素加载(最多 {wait_time} 秒)...")
try:
WebDriverWait(driver, wait_time).until(
EC.presence_of_element_located((By.CSS_SELECTOR, ".homeContainer .card"))
)
print("✅ 找到 .homeContainer .card 元素")
# 额外等待一下,确保所有内容加载完成
print("等待内容加载完成...")
time.sleep(2)
except TimeoutException:
print(f"⚠️ 警告: 在 {wait_time} 秒内未找到 .homeContainer .card 元素,继续处理")
# 尝试查找其他元素确认页面已加载
try:
body = driver.find_element(By.TAG_NAME, "body")
print(f"✅ 页面 body 已加载,继续处理")
except Exception:
print("⚠️ 警告: 无法找到页面 body 元素")
# 提取 homeContainer 中的所有 card 元素
print("正在提取 card 元素...")
home_container = driver.find_element(By.CSS_SELECTOR, ".homeContainer")
cards = home_container.find_elements(By.CSS_SELECTOR, ".card")
print(f"✅ 找到 {len(cards)} 个 card 元素")
# 存储提取的链接和文件 ID
card_links = []
file_ids = []
downloaded_files = []
# 依次触发每个 card 的悬浮事件并获取链接
print("开始处理每个 card 元素...")
for index, card in enumerate(cards, 1):
try:
print(f"处理第 {index}/{len(cards)} 个 card...")
# 滚动到元素可见区域
driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", card)
time.sleep(0.5) # 等待滚动完成
# 触发悬浮事件
actions = ActionChains(driver)
actions.move_to_element(card).perform()
time.sleep(0.5) # 等待悬浮效果显示
# 查找 card-content hert-col 下的 a 标签
try:
card_content = card.find_element(By.CSS_SELECTOR, ".card-content.hert-col")
link_element = card_content.find_element(By.TAG_NAME, "a")
href = link_element.get_attribute("href")
if href:
card_links.append(href)
# 从 URL 中提取文件 ID
file_id = extract_file_id_from_url(href)
if file_id:
file_ids.append(file_id)
print(f" ✅ 获取到链接: {href}")
print(f" ✅ 提取到文件 ID: {file_id}")
# 调用下载方法
try:
print(f" 📥 开始下载文件 ID: {file_id}...")
# 添加延迟,避免请求频率过高
if index > 1:
print(f" ⏳ 等待 {download_delay} 秒以避免请求过快...")
time.sleep(download_delay)
saved_path = crawl_single_file(file_id, silent=True)
downloaded_files.append({
"file_id": file_id,
"url": href,
"saved_path": saved_path
})
print(f" ✅ 下载完成: {saved_path}")
except Exception as download_error:
error_msg = str(download_error)
print(f" ❌ 下载失败: {error_msg}")
downloaded_files.append({
"file_id": file_id,
"url": href,
"saved_path": None,
"error": error_msg
})
# 下载失败后也等待一下,避免连续失败
time.sleep(1.0)
else:
print(f" ⚠️ 无法从链接中提取文件 ID: {href}")
else:
print(f" ⚠️ 第 {index} 个 card 的链接为空")
except NoSuchElementException:
print(f" ⚠️ 第 {index} 个 card 未找到 .card-content.hert-col a 元素")
except Exception as e:
print(f" ❌ 第 {index} 个 card 处理出错: {e}")
except Exception as e:
print(f" ❌ 第 {index} 个 card 处理失败: {e}")
continue
print(f"\n✅ 成功提取 {len(card_links)} 个链接")
print(f"✅ 成功提取 {len(file_ids)} 个文件 ID")
print(f"✅ 成功下载 {len([f for f in downloaded_files if f.get('saved_path')])} 个文件")
# 返回结果
result = {
"html_content": driver.page_source,
"card_links": card_links,
"file_ids": file_ids,
"downloaded_files": downloaded_files,
"card_count": len(cards),
"link_count": len(card_links),
"download_count": len([f for f in downloaded_files if f.get('saved_path')]),
"failed_count": len([f for f in downloaded_files if not f.get('saved_path')])
}
return result
except WebDriverException as e:
print(f"❌ 浏览器操作错误: {e}")
raise WebDriverException(f"浏览器驱动错误: {e}") from e
except Exception as e:
print(f"❌ 未知错误: {e}")
raise
finally:
print("正在关闭浏览器...")
driver.quit()
print("✅ 浏览器已关闭")
def crawl_pages(
max_page: int = 1,
start_page: int = 1,
headless: bool = True,
wait_time: int = 10,
driver_path: Optional[str] = None,
download_delay: float = 2.0,
page_delay: float = 3.0,
) -> dict[str, Any]:
"""
分页爬取多个页面的内容并下载图片
Args:
max_page: 最大页码(包含)
start_page: 起始页码,默认从第 1 页开始
headless: 是否使用无头模式(不显示浏览器窗口)
wait_time: 页面加载等待时间(秒)
driver_path: ChromeDriver 路径(可选,默认使用项目目录下的 chromedriver.exe)
download_delay: 每次下载之间的延迟(秒)
page_delay: 每页之间的延迟(秒)
Returns:
包含所有页面结果的字典
"""
all_results = {
"pages": [],
"total_card_count": 0,
"total_link_count": 0,
"total_file_ids": 0,
"total_download_count": 0,
"total_failed_count": 0,
"all_downloaded_files": []
}
print(f"开始爬取第 {start_page} 页到第 {max_page} 页...")
for page in range(start_page, max_page + 1):
print(f"\n{'='*60}")
print(f"正在处理第 {page}/{max_page} 页")
print(f"{'='*60}")
try:
# 如果不是第一页,添加页面之间的延迟
if page > start_page:
print(f"⏳ 等待 {page_delay} 秒后处理下一页...")
time.sleep(page_delay)
# 爬取当前页
page_result = get_page_content(
page=page,
headless=headless,
wait_time=wait_time,
driver_path=driver_path,
download_delay=download_delay,
)
# 合并结果
all_results["pages"].append({
"page": page,
"result": page_result
})
all_results["total_card_count"] += page_result["card_count"]
all_results["total_link_count"] += page_result["link_count"]
all_results["total_file_ids"] += len(page_result["file_ids"])
all_results["total_download_count"] += page_result["download_count"]
all_results["total_failed_count"] += page_result["failed_count"]
all_results["all_downloaded_files"].extend(page_result["downloaded_files"])
print(f"\n✅ 第 {page} 页完成:")
print(f" - Card 数量: {page_result['card_count']}")
print(f" - 成功下载: {page_result['download_count']} 个文件")
print(f" - 下载失败: {page_result['failed_count']} 个文件")
except KeyboardInterrupt:
print(f"\n⚠️ 用户中断操作(在第 {page} 页)")
break
except Exception as e:
print(f"\n❌ 第 {page} 页处理失败: {e}")
import traceback
traceback.print_exc()
# 继续处理下一页
continue
# 打印汇总信息
print(f"\n{'='*60}")
print(f"✅ 所有页面爬取完成!")
print(f"{'='*60}")
print(f"总统计:")
print(f" - 处理页面数: {len(all_results['pages'])}")
print(f" - 总 Card 数量: {all_results['total_card_count']}")
print(f" - 总链接数量: {all_results['total_link_count']}")
print(f" - 总文件 ID 数量: {all_results['total_file_ids']}")
print(f" - 总成功下载: {all_results['total_download_count']} 个文件")
print(f" - 总下载失败: {all_results['total_failed_count']} 个文件")
return all_results
if __name__ == "__main__":
# 示例用法:爬取第 1 页到第 3 页
try:
max_page = 1000 # 最大页码 2720
start_page = 1 # 起始页码
result = crawl_pages(
max_page=max_page,
start_page=start_page,
headless=True, # 设置为 False 可以看到浏览器窗口
wait_time=10,
download_delay=2.0, # 每次下载间隔 2 秒
page_delay=3.0, # 每页之间间隔 3 秒
)
print(f"\n✅ 批量爬取完成!")
except KeyboardInterrupt:
print("\n⚠️ 用户中断操作")
sys.exit(1)
except Exception as e:
print(f"\n❌ 错误: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)