在爬虫学习的过程中,当遇到爬取量较大的情况下,爬虫消耗的时间会比较多。除开使用 Python 爬虫框架之外,合理使用多进程与多线程来爬取数据是非常有效的。
在前两天的实例操作过程中,由于爬取内容较多,导致时间过长,因此我深入研究学习了多线程以及多进程的相关知识,将这两种方法与实例相结合,可以非常有效的缩短爬取时间。废话不多说,我们进入主题。未成年人请酌情阅读
本次实例是下载图片集,基本上可以分解为三个步骤:
- 首先从页面上获取所有的图片列表,提取每个图片集的详细页面地址。
- 接着爬取每个详细页面的图片信息,获取我们想要的图片地址。
- 最后根据获得到的图片地址,将图片下载到本地。
前两个步骤具体实现用的是多线程爬虫,最后一步,根据当时自己实际处理的情况,先是使用多线程爬虫下载图片;后又学习使用多进程多线程爬虫来下载图片。
图片地址的爬取
在我前面的多线程学习文章中,我比较喜欢使用 Queue 模块来实现线程同步,因此代码中也都是用的这种方式。若习惯用 Lock 方式来实现线程同步的朋友们,可以看下我上篇文章,修改下代码同样可以运行。
本文是以爱MM图为例,爬取相关图片。
首先是获取页面上所有的图片详情页面地址,然后存取队列中。
page_urlqueue = queue.Queue()
class Thread_Crawl(threading.Thread):
def __init__(self, que):
threading.Thread.__init__(self)
self.que = que
def run(self):
while True:
url = self.que.get()
self.que.task_done()
if url == None:
break
self.page_spider(url)
def page_spider(self, url):
html = get_text(url)
doc = pq(html)
items = doc('.excerpt-c5').items()
for item in items:
title = item.find('h2').text()#标题
photo_page_url = item.find('h2 > a').attr('href')#图片集详情地址
detail_urlqueue.put(photo_page_url)
接着再次使用多线程爬取队列中的每个地址,将所有图片地址存入到 mongodb 中。
mongo = MongoDB()
class Thread_Parser(threading.Thread):
def __init__(self, que):
threading.Thread.__init__(self)
self.que = que
def run(self):
while True:
url = self.que.get(False)
self.que.task_done()
if not url:
break
self.parse_data(url)
def parse_data(self, url):
global total
html = get_text(url)
doc = pq(html)
items = doc('.article-content .gallery-item').items()
title = doc('.article-title').text()
for item in items:
# 由于图片尺寸有限制,经测试,可以修改获取大图
# https://pic.bsbxjn.com/wp-content/uploads/2019/05/7ee81646f32f97d-240x320.jpg
# 改为:https://pic.bsbxjn.com/wp-content/uploads/2019/05/7ee81646f32f97d.jpg
photo_url = item.find('.gallery-icon > a > img').attr('src')
purl = str(photo_url)[:-12] + str(photo_url)[-4:]
photo = {
'title': title,
'_id': purl,
'status' : 1
}
mongo.insert(photo)
total += 1
图片下载
1.多线程图片下载
class PhotoThread(threading.Thread):
def __init__(self, que):
threading.Thread.__init__(self)
self.que = que
def run(self):
while True:
photo = self.que.get()
self.que.task_done()
if photo == None:
break
self.download_photo(photo)
def download_photo(self,photo):
title = photo['title']
url = photo['photo_url']
fpath = 'MM_photo/' + title
if not os.path.exists(fpath):
os.mkdir(fpath)
response = request.get(url, 3)
if response != None:
file_path = '{0}/{1}.{2}'.format(fpath, md5(response.content).hexdigest(), 'jpg')
if not os.path.exists(file_path):
with open(file_path, 'wb') as fw:
fw.write(response.content)
else:
print('Already Download', file_path)
else:
print('Failed to Save Image')
按照图片集标题将每一类的图片存入各自的文件目录下。
2.多进程多线程图片下载
在实现多线程爬虫爬取图片后,我发现依然会耗费不少的时间,因此查询资料发现可以将多进程与多线程配合着使用。
import time
import threading
from photos_download.download_text import request
import multiprocessing
from photos_download.mongodb_queue import MogoQueue
import os
from hashlib import md5
def mzitu_crawler(max_threads=10):
crawl_queue = MogoQueue()
def pageurl_crawler():
while True:
try:
photo = crawl_queue.pop()#取出的同时,设置url为正在下载状态
url = photo['_id']
print(url)
except KeyError:
print('队列没有数据')
break
else:
download_photo(photo)
crawl_queue.complete(url) ##设置为完成状态
def download_photo(photo):
title = photo['title']
url = photo['_id']
fpath = 'MM_photo/' + title
if not os.path.exists(fpath):
os.mkdir(fpath)
response = request.get(url, 3)
if response != None:
file_path = '{0}/{1}.{2}'.format(fpath, md5(response.content).hexdigest(), 'jpg')
if not os.path.exists(file_path):
with open(file_path, 'wb') as fw:
fw.write(response.content)
else:
print('Already Download', file_path)
else:
print('Failed to Save Image')
threads = []
while crawl_queue:
# print('Process----------{0}-------len(threads)---{1}'.format(multiprocessing.current_process().name,len(threads)))
"""
这儿crawl_queue用上了,就是我们__bool__函数的作用,为真则代表我们MongoDB队列里面还有数据
crawl_queue为真都代表我们还没下载完成,程序就会继续执行
"""
for thread in threads:
# print('Thread-------{0}-------{1}'.format(thread.name,thread.is_alive()))
if not thread.is_alive(): ##is_alive是判断是否为空,不是空则在队列中删掉
threads.remove(thread)
while len(threads) < max_threads or crawl_queue.peek(): ##线程池中的线程少于max_threads 或者 crawl_qeue时
# thread = PhotoThread(photos_queue) ##创建线程
thread = threading.Thread(target=pageurl_crawler) ##创建线程
thread.setDaemon(True) ##设置守护线程
thread.start() ##启动线程
threads.append(thread) ##添加进线程队列
time.sleep(1)
def process_crawler():
process = []
num_cpus = multiprocessing.cpu_count()
print('将会启动进程数为:', num_cpus)
for i in range(num_cpus):
p = multiprocessing.Process(target=mzitu_crawler) ##创建进程
p.start() ##启动进程
process.append(p) ##添加进进程队列
for p in process:
p.join() ##等待进程队列里面的进程结束
if __name__ == '__main__':
start_time = time.time()
process_crawler()
end_time = time.time()
print('耗时{}s'.format((end_time - start_time)))
开启多进程,然后进程中构建线程池,利用多线程来下载图片。
每个进程需要知道那些 URL 爬取过了、哪些 URL 需要爬取!我们来给每个 URL 设置三种状态:
- OUTSTANDING = 1 ##初始状态
- PROCESSING = 2 ##正在下载状态
- COMPLETE = 3 ##下载完成状态
这里涉及到一个重要的类 MogoQueue,待会我们根据这个类分析一下爬取思路。
from datetime import datetime, timedelta
from pymongo import MongoClient, errors
from photos_download.setting import *
class MogoQueue():
OUTSTANDING = 1 ##初始状态
PROCESSING = 2 ##正在下载状态
COMPLETE = 3 ##下载完成状态
def __init__(self, timeout=10):##初始mongodb连接
self.client = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
self.db = self.client[MONGO_DB]
self.collection = self.db[MONGO_COL]
self.timeout = timeout
def __bool__(self):
"""
ne的意思是不匹配,如果所有图片都是下载完成状态,则返回False
"""
record = self.collection.find_one(
{'status': {'ne': self.COMPLETE}}
)
return True if record else False
def push(self, url, title): ##这个函数用来添加新的URL进队列
try:
self.collection.insert({'_id': url, 'status': self.OUTSTANDING, 'title': title})
print(url, '插入队列成功')
except errors.DuplicateKeyError as e: ##报错则代表已经存在于队列之中了
print(url, '已经存在于队列中了')
pass
def pop(self):
"""
这个函数会查询队列中的所有状态为OUTSTANDING的值,
更改状态,(query后面是查询)(update后面是更新)
并返回_id(就是我们的URL),MongDB好使吧,^_^
如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING,
set是设置的意思,和MySQL的set语法一个意思
"""
record = self.collection.find_one_and_update(
{'status': self.OUTSTANDING},
{'set': {'status': self.PROCESSING,'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}
)
if record:
return record
else:
self.repair()
raise KeyError
def peek(self):
"""这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)"""
record = self.collection.find_one({'status': self.OUTSTANDING})
if record:
return record['_id']
def complete(self, url):
"""这个函数是更新已完成的URL完成"""
self.collection.update({'_id': url}, {'set': {'status': self.COMPLETE}})
def repair(self):
"""这个函数是重置状态lt是小于,该方法主要是用于判断2状态的可能下载失败"""
dl = datetime.now() - timedelta(seconds=self.timeout)
record = self.collection.update_many(
{
'timestamp': {'lt': dl.strftime("%Y-%m-%d %H:%M:%S")},
'status': self.PROCESSING
},
{'set': {'status': self.OUTSTANDING}}
)
def clear(self):
"""这个函数只有第一次才调用、后续不要调用、因为这是删库啊!"""
self.collection.drop()
进程刚开始时,所有图片的 URL 状态都为 OUTSTANDING 状态,然后线程获取一个状态为 OUTSTANDING 状态的 URL 对象,在取到这个对象时,同时设置该对象为 PROCESSING 状态,通知别的线程不需要再处理该对象;然后根据图片 url 进行图片下载,下载完成后,设置该对象为 COMPLETE 状态。当然,在图片下载的过程中,可能存在网站未反应情况,这时我们便不能将 URL 对象设置为 COMPLETE 状态。当所有初始状态的图片都爬取结束(图片状态不一定都成功置为 COMPLETE 状态,也可能是 PROCESSING 状态),此时进程因为发现仍然有图片状态不是下载完成状态,所以不会结束进程。在进程继续的过程中,MogoQueue 队列已经获取不到 OUTSTANDING 状态的对象,我们设置一个计时参数,将距离上次进行图片下载时超过 10s 的 URL 对象从 PROCESSING 状态置为 OUTSTANDING 状态,这样保证了所有图片都能够下载。最后,当所有图片状态为 COMPLETE 状态时,进程结束,程序结束。
结果如下:
对于图片下载的实现方式,多进程多线程爬虫在爬取 454 张图片时比多线程爬虫节省了 100s。
详情代码前往:https://github.com/Acorn2/SpiderCrack/tree/master/photos_download
本文作者为hresh,转载请注明。