All of the code below was generated by Cursor. Writing it by hand would probably have taken the better part of a day; Cursor produced the scripts in a few minutes.
Wallhaven search URL: https://wallhaven.cc/search?categories=010&purity=100&sorting=relevance&order=desc
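For reference, the query string can be inspected with the same urllib.parse helpers the scraper below uses. As far as I can tell, categories=010 restricts results to the Anime category and purity=100 keeps them SFW, but treat that reading as an assumption rather than official documentation; this is only a minimal inspection sketch.

from urllib.parse import urlparse, parse_qs

url = "https://wallhaven.cc/search?categories=010&purity=100&sorting=relevance&order=desc"
# parse_qs turns the query string into a dict of lists,
# e.g. {'categories': ['010'], 'purity': ['100'], 'sorting': ['relevance'], 'order': ['desc']}
print(parse_qs(urlparse(url).query))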
1. Scraping the images
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Wallhaven image scraper.
Scrapes every image from a given collection or search page.
"""

import os
import re
import time
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse, urlencode, parse_qs
from pathlib import Path


class WallhavenScraper:
    def __init__(self, base_url, download_dir="wallhaven_images"):
        """
        Initialize the scraper.

        Args:
            base_url: the page URL to scrape
            download_dir: directory where images are saved
        """
        self.base_url = base_url
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        # Request headers that mimic a regular browser
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def get_page(self, url):
        """Fetch a page and return its HTML."""
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            print(f"Failed to fetch page {url}: {e}")
            return None

    def extract_detail_urls(self, html):
        """Extract all detail-page links from the HTML."""
        soup = BeautifulSoup(html, 'html.parser')
        detail_urls = []
        # Every thumbnail links to its detail page via an <a class="preview"> tag
        preview_links = soup.find_all('a', class_='preview')
        for link in preview_links:
            href = link.get('href')
            if href:
                # Make sure the URL is absolute
                if href.startswith('/'):
                    href = urljoin('https://wallhaven.cc', href)
                detail_urls.append(href)
        return detail_urls

    def get_image_url(self, detail_url):
        """Get the full-resolution image URL from a detail page."""
        html = self.get_page(detail_url)
        if not html:
            return None
        soup = BeautifulSoup(html, 'html.parser')
        # Approach 1: the <img id="wallpaper"> tag
        img = soup.find('img', id='wallpaper')
        if img:
            img_url = img.get('src')
            if img_url:
                return img_url
        # Approach 2: an <img> inside the div with class="scrollbox"
        scrollbox = soup.find('div', class_='scrollbox')
        if scrollbox:
            img = scrollbox.find('img')
            if img:
                img_url = img.get('src')
                if img_url:
                    return img_url
        # Approach 3: any link that points at the full-size image.
        # Wallhaven originals usually live at URLs like
        # https://w.wallhaven.cc/full/xx/wallhaven-xxxxxx.jpg
        all_links = soup.find_all('a', href=True)
        for link in all_links:
            href = link.get('href')
            if href and 'wallhaven.cc/full/' in href:
                return href
        print(f"Could not find an image URL on {detail_url}")
        return None

    def download_image(self, img_url, filename):
        """Download a single image."""
        filepath = self.download_dir / filename
        # Skip the request entirely if the file already exists
        if filepath.exists():
            print(f"File already exists, skipping: {filename}")
            return True
        try:
            response = self.session.get(img_url, timeout=30, stream=True)
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            print(f"Downloaded: {filename}")
            return True
        except requests.RequestException as e:
            print(f"Download failed {img_url}: {e}")
            return False

    def get_filename_from_url(self, img_url):
        """Derive a file name from the image URL."""
        parsed = urlparse(img_url)
        filename = os.path.basename(parsed.path)
        # If there is no usable name or extension, fall back to an ID or timestamp
        if not filename or '.' not in filename:
            match = re.search(r'/w/([a-z0-9]+)', img_url)
            if match:
                filename = f"wallhaven-{match.group(1)}.jpg"
            else:
                filename = f"image_{int(time.time())}.jpg"
        return filename

    def build_page_url(self, page_num):
        """Build the URL for a given page number."""
        parsed = urlparse(self.base_url)
        query_params = parse_qs(parsed.query)
        # Add or overwrite the page parameter
        query_params['page'] = [str(page_num)]
        # Rebuild the URL
        new_query = urlencode(query_params, doseq=True)
        page_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{new_query}"
        return page_url

    def get_total_pages(self, html):
        """Detect the total number of pages from the HTML."""
        soup = BeautifulSoup(html, 'html.parser')
        # Approach 1: look for text of the form "Page X / Y"
        page_text = soup.find(string=re.compile(r'Page\s+\d+\s*/\s*\d+'))
        if page_text:
            match = re.search(r'Page\s+\d+\s*/\s*(\d+)', page_text)
            if match:
                total = int(match.group(1))
                print(f"Detected total pages from page text: {total}")
                return total
        # Approach 2: take the largest page number in the pagination nav
        pagination = soup.find('nav', class_='pagination')
        if pagination:
            page_links = pagination.find_all('a', href=True)
            max_page = 1
            for link in page_links:
                text = link.get_text().strip()
                try:
                    page_num = int(text)
                    max_page = max(max_page, page_num)
                except ValueError:
                    pass
            # A large maximum here is most likely the total page count
            if max_page > 1:
                print(f"Detected maximum page number from pagination: {max_page}")
                return max_page
            # Look for a "last page" link
            last_page_link = pagination.find('a', string=re.compile(r'^\d+$'))
            if last_page_link:
                try:
                    last_page = int(last_page_link.get_text().strip())
                    print(f"Detected total pages from last-page link: {last_page}")
                    return last_page
                except ValueError:
                    pass
        # Approach 3: scan the whole page text for "Page X / Y" patterns
        all_text = soup.get_text()
        page_matches = re.findall(r'Page\s+\d+\s*/\s*(\d+)', all_text)
        if page_matches:
            try:
                total = max(int(p) for p in page_matches)
                print(f"Detected total pages from page text: {total}")
                return total
            except (ValueError, TypeError):
                pass
        print("Warning: could not detect the total number of pages, falling back to 1")
        return 1

    def scrape(self, delay=1, max_pages=None, start_page=1, total_pages=None):
        """
        Run the scraper.

        Args:
            delay: delay between requests in seconds, to avoid getting banned
            max_pages: maximum number of pages to scrape; None means all pages
            start_page: page number to start from (1-based)
            total_pages: manually specified total page count; None means auto-detect
        """
        print(f"Starting scrape: {self.base_url}")
        print(f"Images will be saved to: {self.download_dir.absolute()}")
        # Fetch the first page to determine the total page count
        first_page_url = self.build_page_url(start_page)
        html = self.get_page(first_page_url)
        if not html:
            print("Could not fetch the page")
            return
        # Determine the total number of pages
        if total_pages is None:
            total_pages = self.get_total_pages(html)
        else:
            print(f"Using manually specified total pages: {total_pages}")
        if max_pages:
            total_pages = min(total_pages, max_pages)
        print(f"{total_pages} pages in total, starting from page {start_page}")
        # Counters
        total_success = 0
        total_fail = 0
        all_detail_urls = set()  # used for de-duplication
        # Scrape and download page by page (downloading as we go keeps memory usage low)
        print(f"\nLooping from page {start_page} to page {total_pages}")
        print(f"Loop range: range({start_page}, {total_pages + 1})")
        print(f"About to process {total_pages - start_page + 1} pages\n")
        for page in range(start_page, total_pages + 1):
            try:
                print(f"\n{'='*60}")
                print(f"Processing page {page}/{total_pages}")
                print(f"{'='*60}")
                # Build the URL of the current page
                page_url = self.build_page_url(page)
                print(f"Current page URL: {page_url}")
                html = self.get_page(page_url)
                if not html:
                    print(f"Failed to fetch page {page}, skipping")
                    time.sleep(delay)
                    continue
                # Extract the detail-page links on this page
                detail_urls = self.extract_detail_urls(html)
                print(f"Found {len(detail_urls)} image links on page {page}")
                if not detail_urls:
                    print(f"No image links found on page {page}, moving on")
                    time.sleep(delay)
                    continue
                # Download the images on this page
                page_success = 0
                page_fail = 0
                for i, detail_url in enumerate(detail_urls, 1):
                    try:
                        # Skip duplicates
                        if detail_url in all_detail_urls:
                            print(f"  [{i}/{len(detail_urls)}] Skipping duplicate link: {detail_url}")
                            continue
                        all_detail_urls.add(detail_url)
                        print(f"  [{i}/{len(detail_urls)}] Processing: {detail_url}")
                        # Resolve the full-resolution image URL
                        img_url = self.get_image_url(detail_url)
                        if not img_url:
                            page_fail += 1
                            total_fail += 1
                            time.sleep(delay)
                            continue
                        # Work out a file name
                        filename = self.get_filename_from_url(img_url)
                        # If the name does not contain the wallpaper ID, take it from the detail URL
                        if 'wallhaven-' not in filename:
                            match = re.search(r'/w/([a-z0-9]+)', detail_url)
                            if match:
                                filename = f"wallhaven-{match.group(1)}.jpg"
                        # Download the image
                        if self.download_image(img_url, filename):
                            page_success += 1
                            total_success += 1
                        else:
                            page_fail += 1
                            total_fail += 1
                        # Sleep between requests to avoid hammering the server
                        time.sleep(delay)
                    except Exception as e:
                        print(f"  [{i}/{len(detail_urls)}] Error while processing image: {e}")
                        page_fail += 1
                        total_fail += 1
                        time.sleep(delay)
                        continue
                print(f"Page {page} done: {page_success} succeeded, {page_fail} failed")
                print(f"Running total: {total_success} succeeded, {total_fail} failed")
                # Delay between pages
                if page < total_pages:
                    time.sleep(delay)
            except Exception as e:
                print(f"Error while processing page {page}: {e}")
                print(f"Error type: {type(e).__name__}")
                import traceback
                traceback.print_exc()
                print("Continuing with the next page...")
                time.sleep(delay)
                continue
        print(f"\n{'='*60}")
        print("Scraping finished!")
        print(f"Total image links processed: {len(all_detail_urls)}")
        print(f"Succeeded: {total_success}")
        print(f"Failed: {total_fail}")
        print(f"Images saved to: {self.download_dir.absolute()}")
        print(f"{'='*60}")


def main():
    """Entry point."""
    # Target URL - the search page (note: any page parameter in the URL is ignored;
    # scraping starts from start_page)
    url = "https://wallhaven.cc/search?categories=010&purity=100&sorting=relevance&order=desc"
    # Create the scraper; images are saved to D:\tmp\pics
    scraper = WallhavenScraper(url, download_dir=r"D:\tmp\pics")
    # Scrape all 6589 pages (1-second delay to avoid getting banned).
    # For a quick test, set max_pages=10 to limit the number of pages.
    # If auto-detection fails, the total can be given manually, e.g. total_pages=6589.
    scraper.scrape(delay=1, max_pages=None, start_page=1, total_pages=6589)


if __name__ == "__main__":
    main()
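To try the scraper without committing to all 6589 pages, a short smoke test can drive the class directly. This is a minimal sketch that assumes the script above is saved as wallhaven_scraper.py (the file name is my choice) and that requests and beautifulsoup4 are installed.

# test_scrape.py - assumes the code above lives in wallhaven_scraper.py
from wallhaven_scraper import WallhavenScraper

url = "https://wallhaven.cc/search?categories=010&purity=100&sorting=relevance&order=desc"
scraper = WallhavenScraper(url, download_dir="wallhaven_test")
# Only the first two pages, with a 2-second delay between requests
scraper.scrape(delay=2, max_pages=2, start_page=1)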
2. After picking out the images you like, sort them into categories
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Image classification script.
Sorts images into three groups based on their dimensions:
- height greater than width (portrait)
- height equal to width (square)
- height less than width (landscape)
"""

import os
import shutil
from pathlib import Path
from PIL import Image


class ImageClassifier:
    def __init__(self, source_dir, output_base_dir="classified_images"):
        """
        Initialize the classifier.

        Args:
            source_dir: folder containing the source images
            output_base_dir: base directory for the output folders
        """
        self.source_dir = Path(source_dir)
        self.output_base_dir = Path(output_base_dir)
        # The three category folders
        self.portrait_dir = self.output_base_dir / "portrait"    # portrait (height > width)
        self.square_dir = self.output_base_dir / "square"        # square (height == width)
        self.landscape_dir = self.output_base_dir / "landscape"  # landscape (height < width)
        # Create the output directories
        self.output_base_dir.mkdir(parents=True, exist_ok=True)
        self.portrait_dir.mkdir(parents=True, exist_ok=True)
        self.square_dir.mkdir(parents=True, exist_ok=True)
        self.landscape_dir.mkdir(parents=True, exist_ok=True)
        # Supported image formats
        self.supported_formats = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tiff', '.tif'}

    def get_image_size(self, image_path):
        """
        Read the dimensions of an image.

        Args:
            image_path: path to the image file
        Returns:
            a (width, height) tuple, or None if the image cannot be read
        """
        try:
            with Image.open(image_path) as img:
                return img.size  # (width, height)
        except Exception as e:
            print(f"Cannot read image {image_path}: {e}")
            return None

    def classify_image(self, image_path):
        """
        Pick a category folder based on the image dimensions.

        Args:
            image_path: path to the image file
        Returns:
            the target folder path, or None if classification failed
        """
        size = self.get_image_size(image_path)
        if size is None:
            return None
        width, height = size
        # Compare height and width to pick the category
        if height > width:
            return self.portrait_dir
        elif height == width:
            return self.square_dir
        else:  # height < width
            return self.landscape_dir

    def move_image(self, source_path, target_dir):
        """
        Move an image into the target folder.

        Args:
            source_path: source file path
            target_dir: target folder path
        """
        target_path = target_dir / source_path.name
        # If the target file already exists, append a counter to the name
        counter = 1
        original_name = source_path.stem
        extension = source_path.suffix
        while target_path.exists():
            new_name = f"{original_name}_{counter}{extension}"
            target_path = target_dir / new_name
            counter += 1
        try:
            shutil.move(str(source_path), str(target_path))
            return target_path
        except Exception as e:
            print(f"Failed to move {source_path} -> {target_path}: {e}")
            return None

    def copy_image(self, source_path, target_dir):
        """
        Copy an image into the target folder (keeping the original).

        Args:
            source_path: source file path
            target_dir: target folder path
        """
        target_path = target_dir / source_path.name
        # If the target file already exists, append a counter to the name
        counter = 1
        original_name = source_path.stem
        extension = source_path.suffix
        while target_path.exists():
            new_name = f"{original_name}_{counter}{extension}"
            target_path = target_dir / new_name
            counter += 1
        try:
            shutil.copy2(str(source_path), str(target_path))
            return target_path
        except Exception as e:
            print(f"Failed to copy {source_path} -> {target_path}: {e}")
            return None

    def classify_all_images(self, move_files=True):
        """
        Classify every image in the source folder.

        Args:
            move_files: True to move files, False to copy them (keeping the originals)
        """
        if not self.source_dir.exists():
            print(f"Error: source folder does not exist: {self.source_dir}")
            return
        if not self.source_dir.is_dir():
            print(f"Error: the given path is not a folder: {self.source_dir}")
            return
        # Counters
        stats = {
            'total': 0,
            'portrait': 0,
            'square': 0,
            'landscape': 0,
            'failed': 0
        }
        print("Starting classification...")
        print(f"Source folder: {self.source_dir.absolute()}")
        print(f"Output folder: {self.output_base_dir.absolute()}")
        print(f"Mode: {'move' if move_files else 'copy'}")
        print(f"{'='*60}\n")
        # Collect all supported image files in the source folder
        image_files = [f for f in self.source_dir.iterdir()
                       if f.is_file() and f.suffix.lower() in self.supported_formats]
        if not image_files:
            print(f"No supported image files found in {self.source_dir}")
            print(f"Supported formats: {', '.join(self.supported_formats)}")
            return
        print(f"Found {len(image_files)} image files\n")
        # Process each image
        for i, image_path in enumerate(image_files, 1):
            print(f"[{i}/{len(image_files)}] Processing: {image_path.name}")
            target_dir = self.classify_image(image_path)
            if target_dir is None:
                print("  ❌ Classification failed")
                stats['failed'] += 1
                continue
            # Move or copy the file
            if move_files:
                result = self.move_image(image_path, target_dir)
            else:
                result = self.copy_image(image_path, target_dir)
            if result:
                # Update the counters
                stats['total'] += 1
                if target_dir == self.portrait_dir:
                    stats['portrait'] += 1
                    print(f"  ✅ Portrait -> {target_dir.name}/")
                elif target_dir == self.square_dir:
                    stats['square'] += 1
                    print(f"  ✅ Square -> {target_dir.name}/")
                else:
                    stats['landscape'] += 1
                    print(f"  ✅ Landscape -> {target_dir.name}/")
            else:
                stats['failed'] += 1
                print("  ❌ Operation failed")
        # Print the summary
        print(f"\n{'='*60}")
        print("Classification finished!")
        print(f"Total processed: {stats['total']} images")
        print(f"  - Portrait  (height > width): {stats['portrait']} -> {self.portrait_dir}")
        print(f"  - Square    (height = width): {stats['square']} -> {self.square_dir}")
        print(f"  - Landscape (height < width): {stats['landscape']} -> {self.landscape_dir}")
        print(f"  - Failed: {stats['failed']}")
        print(f"{'='*60}")


def main():
    """Entry point."""
    # ========== Configuration ==========
    # Folder containing the images to classify
    source_directory = r"D:\temp\tt"  # change this to your own image folder
    # Folder that will receive the classified images
    output_directory = r"D:\tmp\classified_images"  # customize the output path as needed
    # Whether to move files (True = move, False = copy and keep the originals)
    move_files = True  # set to False to keep the original files
    # ========== Run the classification ==========
    classifier = ImageClassifier(source_directory, output_directory)
    classifier.classify_all_images(move_files=move_files)


if __name__ == "__main__":
    main()
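If you would rather keep the originals while sorting, the class can also be driven in copy mode without editing main(). A minimal sketch, assuming the code above is saved as image_classifier.py (that module name is my own):

from image_classifier import ImageClassifier

# Copy instead of move, so the source folder stays untouched
classifier = ImageClassifier(r"D:\temp\tt", r"D:\tmp\classified_images")
classifier.classify_all_images(move_files=False)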
3. Compressing the images: converting PNG and JPG to WebP
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Convert every non-WebP image in a directory to WebP.

By default the script walks all files in `ANIME_DIR`:
- files that are already .webp are skipped
- everything else is re-encoded to WebP with Pillow, default quality 80
- the output keeps the original file name but with a .webp extension
"""

from __future__ import annotations

import argparse
from pathlib import Path
from typing import Iterable, List

from PIL import Image

# Default directory: the anime_picture folder from the user's attachments
ANIME_DIR = Path(r"D:\study\code\test\20251116213718-backup-8ehv92nj\workdir\attachments\upload\anime_picture")
DEFAULT_QUALITY = 80


def find_source_images(folder: Path) -> Iterable[Path]:
    """Yield every non-WebP file in the folder."""
    for file in folder.iterdir():
        if file.is_file() and file.suffix.lower() != ".webp":
            yield file


def convert_image(source: Path, quality: int) -> Path:
    """Convert a single file to WebP and return the path of the generated file."""
    target = source.with_suffix(".webp")
    with Image.open(source) as img:
        if img.mode in {"RGBA", "LA", "P"}:
            img = img.convert("RGBA")
        else:
            img = img.convert("RGB")
        img.save(
            target,
            format="WEBP",
            quality=quality,
            method=6,  # slower encoding, better compression
        )
    return target


def convert_folder(folder: Path, quality: int, sources: List[Path]) -> List[Path]:
    """Convert all images in the folder and return the list of generated files."""
    created: List[Path] = []
    total = len(sources)
    for idx, image in enumerate(sources, 1):
        target = convert_image(image, quality)
        created.append(target)
        print(f"[{idx}/{total}] {image.name} -> {target.name}")
    return created


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Batch-convert images to WebP")
    parser.add_argument(
        "-d",
        "--dir",
        type=Path,
        default=ANIME_DIR,
        help="directory to convert (defaults to the attached anime_picture path)",
    )
    parser.add_argument(
        "-q",
        "--quality",
        type=int,
        default=DEFAULT_QUALITY,
        help=f"WebP output quality (1-100), default {DEFAULT_QUALITY}",
    )
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    folder: Path = args.dir
    quality: int = args.quality
    if not folder.exists() or not folder.is_dir():
        raise SystemExit(f"Directory does not exist or is not usable: {folder}")
    sources = list(find_source_images(folder))
    if not sources:
        print("No files need converting.")
        return
    total = len(sources)
    print(f"Starting conversion of {total} files in {folder}")
    results = convert_folder(folder, quality, sources)
    print(f"Done, created {len(results)} WebP files.")


if __name__ == "__main__":
    main()
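A quick way to sanity-check the compression is to compare file sizes before and after converting a single image. This sketch reuses convert_image and assumes the code above is saved as convert_to_webp.py; the test file path is purely hypothetical.

from pathlib import Path
from convert_to_webp import convert_image

source = Path(r"D:\tmp\pics\example.jpg")  # hypothetical test file
target = convert_image(source, quality=80)
# Path.stat().st_size returns the file size in bytes
print(f"{source.stat().st_size} bytes -> {target.stat().st_size} bytes")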