首页 > 其他 > 详细

<scrapy>scrapy源码剖析

时间:2020-07-22 23:35:47      阅读:63      评论:0      收藏:0      [点我收藏+]
  • 前戏Twisted使用
    • 1.简单实现版本1.0
      • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
        from twisted.internet import reactor
        # 创建socket对象,如果下载完成,自动从事件循环中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
        from twisted.internet import defer
        
        
        # 1.利用getPage创建socket
        # 2.将socket添加到事件循环
        # 3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
        
        # 1.利用getPage创建socket
        def response(content):
        	print(content)
        
        # 2.将socket添加到事件循环
        # 这个装饰器和yield d表示将socket已经添加到事件循环
        @defer.inlineCallbacks
        def task():
        	url = "http://www.baidu.com"
        	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
        	d = getPage(url.encode(‘utf-8‘))
        	# 利用socket发请求,请求完成拿到值,执行response函数
        	d.addCallback(response)
        
        	yield d
        
        # 执行task函数
        task()
        # 3.开始事件循环
        reactor.run()
    • 版本2.0
      • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
        from twisted.internet import reactor
        # 创建socket对象,如果下载完成,自动从事件循环中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
        from twisted.internet import defer
        
        # 版本1.0
        # 1.1.利用getPage创建socket
        # 1.2.将socket添加到事件循环
        # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
        
        # 版本2.0
        # 2.1.解决不能自动终止的问题
        
        # 1.1. 利用getPage创建socket
        def response(content):
        	print(content)
        
        # 1.2.将socket添加到事件循环
        # 这个装饰器和yield d表示将socket已经添加到事件循环
        @defer.inlineCallbacks
        def task():
        	url = "http://www.baidu.com"
        	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
        	d = getPage(url.encode(‘utf-8‘))
        	# 利用socket发请求,请求完成拿到值,执行response函数
        	d.addCallback(response)
        
        	yield d
        
        def done(*args,**kwargs):
        	# 终止事件循环
        	reactor.stop()
        
        # 执行task函数
        d = task()
        # 监听d是否完成,需要用列表[d,]加入
        dd = defer.DeferredList([d,])
        # 监听d是否完成,如果完成就会调用addBoth的回调函数
        # 2.1:利用回调函数done终止事件循环
        dd.addBoth(done)
        
        
        # 1.3.开始事件循环
        reactor.run()
    •  

      版本3.0
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          
          
          # 版本1.0
          # 1.1.利用getPage创建socket
          # 1.2.将socket添加到事件循环
          # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
          
          # 版本2.0
          # 2.1.解决不能自动终止的问题
          
          # 版本3.0
          # 3.1.1.解决并发,异步IO的问题--利用多个socket
          
          # 1.1. 利用getPage创建socket
          def response(content):
          	print(content)
          
          
          # 1.2.将socket添加到事件循环
          # 这个装饰器和yield d表示将socket已经添加到事件循环
          @defer.inlineCallbacks
          def task():
          	url = "http://www.baidu.com"
          	# 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象
          	d = getPage(url.encode(‘utf-8‘))
          	# 利用socket发请求,请求完成拿到值,执行response函数
          	d.addCallback(response)
          
          	yield d
          
          
          def done(*args, **kwargs):
          	# 终止事件循环
          	reactor.stop()
          
          
          # 执行task函数
          # d = task()
          
          # 3.1.1.同时监听多个d,利用多个socket,解决并发的问题,异步IO的问题,全部发出去了,等请求回来
          li = []
          for i in range(10):
          	d = task()
          	li.append(d)
          
          dd = defer.DeferredList(li)
          
          # 监听d是否完成,需要用列表[d,]加入
          # dd = defer.DeferredList([d,])
          
          # 监听d是否完成,如果完成就会调用addBoth的回调函数
          # 2.1:利用回调函数done终止事件循环
          dd.addBoth(done)
          
          # 1.3.开始事件循环
          reactor.run()

       

        版本3.1-另一种方法解决并发问题-以及多爬虫同时爬取的并发问题
        • 有bug是因为_close只有一个,后面会进行封装,不用多关注
        • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
          from twisted.internet import reactor
          # 创建socket对象,如果下载完成,自动从事件循环中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
          from twisted.internet import defer
          
          # 版本1.0
          # 1.1.利用getPage创建socket
          # 1.2.将socket添加到事件循环
          # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环)
          
          # 版本2.0
          # 2.1.解决不能自动终止的问题
          
          # 版本3.1
          # 3.1.解决并发,异步IO的问题--利用task中加入特殊socket对象
          # 3.2 加入多个爬虫同时运行的功能--类似scrapy crawl all
          
          
          _close = None
          count = 0
          
          
          # 1.1. 利用getPage创建socket
          def response(content):
          	print(content)
          	global count
          	count += 1
          	if count == 3:
          		# 使特殊socket对象终止
          		_close.callback(None)
          
          
          # 1.2.将socket添加到事件循环
          # 这个装饰器和yield d表示将socket已经添加到事件循环
          @defer.inlineCallbacks
          def task():
          	# 3.1:创建多个socket,因为defer.Deferred()特殊对象,不会自动停止
          	# 设定_close 全局变量,以便能请求全部返还能够手动终止
          	# 利用全局变量count的计数,去控制特殊对象的终止,只有全部终止才会结束
          	global _close
          
          	# 这个相当于scrapy中的start_url
          	url = "http://www.baidu.com"
          	d1 = getPage(url.encode(‘utf-8‘))
          	d1.addCallback(response)
          
          	url = "http://www.cnblogs.com"
          	d2 = getPage(url.encode(‘utf-8‘))
          	d2.addCallback(response)
          
          	url = "http://www.bing.com"
          	d3 = getPage(url.encode(‘utf-8‘))
          	d3.addCallback(response)
          
          	_close = defer.Deferred()
          
          	yield _close
          
          
          def done(*args, **kwargs):
          	# 终止事件循环
          	reactor.stop()
          
          
          # 3.2:同时创建多个task即可实现,scrapy爬虫同时执行,2个爬虫有各自的start_url是并发的
          # 执行task函数
          spider1 = task()
          spider2 = task()
          
          # 监听d是否完成,需要用列表[d,]加入
          dd = defer.DeferredList([spider1,spider2])
          
          # 监听d是否完成,如果完成就会调用addBoth的回调函数
          # 2.1:利用回调函数done终止事件循环
          dd.addBoth(done)
          
          # 1.3.开始事件循环
          reactor.run()
                        
  • scrapy经验 + Twisted功能
    • Low
      • # 事件循环,监听socket变化(终止条件,所有socket都已经移除)
        from twisted.internet import reactor
        # 创建socket对象,如果下载完成,自动从事件循环中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除
        from twisted.internet import defer
        import queue
        
        Q = queue.Queue()
        
        
        class Request(object):
        	# 这里的callback = parse
        	def __init__(self, url, callback):
        		self.url = url
        		self.callback = callback
        
        
        class HttpResponse(object):
        
        	def __init__(self, content, request):
        		self.content = content
        		self.request = request
        		self.url = request.url
        		self.text = str(content, encoding=‘utf-8‘)
        
        
        class ChoutiSpider(object):
        	name = ‘chouti‘
        
        	def start_requests(self):
        		start_url = [‘http://www.baidu.com‘, ‘http://www.bing.com‘, ]
        		for url in start_url:
        			# 执行Request函数
        			yield Request(url, self.parse)
        
        	def parse(self, response):
        		# 1.crawling移除
        		# 2.获取parse yield返回值
        		# 3.再次去队列中获取
        		print(response.text)  # 执行HttpResponse()中的方法
        		yield Request(‘http://www.cnblogs.com‘, callback=self.parse)
        
        
        class Engine(object):
        	def __init__(self):
        		self._close = None
        		self.spider = None
        		self.max = 5  # 最大并发数
        		self.crawling = []  # 表示正在爬取的爬虫
        
        	def get_response_callback(self, content, request):
        		# getPage的返回值response,传入的req ---url 和 callback
        		self.crawling.remove(request)  # 删除已经下载完成的url callback
        		rep = HttpResponse(content, request)  # 将返回值传递过去
        		# 生成器或空
        		result = request.callback(rep)  # 调用spider中的parse方法 = parse(rep)
        		import types
        		# 判断返回值是不是生成器
        		if isinstance(result, types.GeneratorType):
        			for req in result:
        				Q.put(req)  # 将新请求加入队列
        
        	def _next_request(self):
        		# 判断终止条件
        		if Q.qsize() == 0 and len(self.crawling) == 0:
        			self._close.callback(None)  # 手动停止
        			return
        
        		# 发送过程中,会有最大并发数的限制,循环取url并下载
        		if len(self.crawling) >= self.max:  # 超过最大并发数,直接返回
        			return
        		while len(self.crawling) < self.max:  # 低于最大并发数
        			try:
        				req = Q.get(block=False)  # 取数据,如果为空会报错,加入block不会等队列中的数据
        				self.crawling.append(req)  # 将取到的url加入记录正在爬取数量的列表crawling
        				d = getPage(req.url.encode(‘utf-8‘))  # getPage创建socket对象,发送请求进行下载
        
        				# 5.等页面下载完成执行用户自己定义的回调函数,处理response
        				d.addCallback(self.get_response_callback, req)  # d为请求的结果
        				# 未达到最大并发数,可以再去调度器中获取Request
        				# d.addCallback(self._next_request)  # 上一个方法执行玩,进行递归调用,继续取url
        				d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 多久后调用
        			except Exception as e:  # 如果队列为空,直接返回,不再循环取
        				return
        
        	# 这个装饰器和yield self._close表示将socket已经添加到事件循环
        	@defer.inlineCallbacks
        	def crawl(self, spider):
        		# 3.将初始Request对象添加到调度器---将初始urL加入队列
        		start_requests = iter(spider.start_requests())  # 迭代器---执行spider中start_request函数
        		while True:
        			try:
        				request = next(start_requests)  # 取迭代器中的下一个值 url和callback
        				Q.put(request)  # 将取到的值放入队列
        			except StopIteration as e:  # 如果队列取完,就跳出循环
        				break
        
        		# 4.反复去调度器中取request并发送请求进行下载,下载完成后执行回调函数
        		# self._next_request()
        		reactor.callLater(0, self._next_request)  # scrapy内部的写法
        
        		self._close = defer.Deferred()  # 特殊socket不会自动结束,只能手动结束
        		yield self._close
        
        
        # 爬虫对象
        spider = ChoutiSpider()
        
        _active = set()
        # 1.创建引擎
        engine = Engine()
        # 2.将爬虫放入引擎进行处理,执行引擎中crawl函数
        d = engine.crawl(spider)
        
        _active.add(d)
        # 监听爬虫d是否完成,如果完成执行addBoth终止socket
        dd = defer.DeferredList(_active)
        # 终止socket
        dd.addBoth(lambda _: reactor.stop())
        
        reactor.run()
          
    • High
  • Scrapy源码剖析            

<scrapy>scrapy源码剖析

原文:https://www.cnblogs.com/shuimohei/p/13363462.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!