# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 1.利用getPage创建socket # 2.将socket添加到事件循环 # 3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 1.利用getPage创建socket def response(content): print(content) # 2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): url = "http://www.baidu.com" # 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象 d = getPage(url.encode(‘utf-8‘)) # 利用socket发请求,请求完成拿到值,执行response函数 d.addCallback(response) yield d # 执行task函数 task() # 3.开始事件循环 reactor.run()
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 版本1.0 # 1.1.利用getPage创建socket # 1.2.将socket添加到事件循环 # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 版本2.0 # 2.1.解决不能自动终止的问题 # 1.1. 利用getPage创建socket def response(content): print(content) # 1.2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): url = "http://www.baidu.com" # 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象 d = getPage(url.encode(‘utf-8‘)) # 利用socket发请求,请求完成拿到值,执行response函数 d.addCallback(response) yield d def done(*args,**kwargs): # 终止事件循环 reactor.stop() # 执行task函数 d = task() # 监听d是否完成,需要用列表[d,]加入 dd = defer.DeferredList([d,]) # 监听d是否完成,如果完成就会调用addBoth的回调函数 # 2.1:利用回调函数done终止事件循环 dd.addBoth(done) # 1.3.开始事件循环 reactor.run()
版本3.0
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 版本1.0 # 1.1.利用getPage创建socket # 1.2.将socket添加到事件循环 # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 版本2.0 # 2.1.解决不能自动终止的问题 # 版本3.0 # 3.1.1.解决并发,异步IO的问题--利用多个socket # 1.1. 利用getPage创建socket def response(content): print(content) # 1.2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): url = "http://www.baidu.com" # 根据域名找到你的ip,并创建socket,返回值d和defer.Deferred类似,不会自动终止的特殊socket对象 d = getPage(url.encode(‘utf-8‘)) # 利用socket发请求,请求完成拿到值,执行response函数 d.addCallback(response) yield d def done(*args, **kwargs): # 终止事件循环 reactor.stop() # 执行task函数 # d = task() # 3.1.1.同时监听多个d,利用多个socket,解决并发的问题,异步IO的问题,全部发出去了,等请求回来 li = [] for i in range(10): d = task() li.append(d) dd = defer.DeferredList(li) # 监听d是否完成,需要用列表[d,]加入 # dd = defer.DeferredList([d,]) # 监听d是否完成,如果完成就会调用addBoth的回调函数 # 2.1:利用回调函数done终止事件循环 dd.addBoth(done) # 1.3.开始事件循环 reactor.run()
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer # 版本1.0 # 1.1.利用getPage创建socket # 1.2.将socket添加到事件循环 # 1.3.开始事件循环(内部发送请求,并接受响应,当所有socket请求完成,终止事件循环) # 版本2.0 # 2.1.解决不能自动终止的问题 # 版本3.1 # 3.1.解决并发,异步IO的问题--利用task中加入特殊socket对象 # 3.2 加入多个爬虫同时运行的功能--类似scrapy crawl all _close = None count = 0 # 1.1. 利用getPage创建socket def response(content): print(content) global count count += 1 if count == 3: # 使特殊socket对象终止 _close.callback(None) # 1.2.将socket添加到事件循环 # 这个装饰器和yield d表示将socket已经添加到事件循环 @defer.inlineCallbacks def task(): # 3.1:创建多个socket,因为defer.Deferred()特殊对象,不会自动停止 # 设定_close 全局变量,以便能请求全部返还能够手动终止 # 利用全局变量count的计数,去控制特殊对象的终止,只有全部终止才会结束 global _close # 这个相当于scrapy中的start_url url = "http://www.baidu.com" d1 = getPage(url.encode(‘utf-8‘)) d1.addCallback(response) url = "http://www.cnblogs.com" d2 = getPage(url.encode(‘utf-8‘)) d2.addCallback(response) url = "http://www.bing.com" d3 = getPage(url.encode(‘utf-8‘)) d3.addCallback(response) _close = defer.Deferred() yield _close def done(*args, **kwargs): # 终止事件循环 reactor.stop() # 3.2:同时创建多个task即可实现,scrapy爬虫同时执行,2个爬虫有各自的start_url是并发的 # 执行task函数 spider1 = task() spider2 = task() # 监听d是否完成,需要用列表[d,]加入 dd = defer.DeferredList([spider1,spider2]) # 监听d是否完成,如果完成就会调用addBoth的回调函数 # 2.1:利用回调函数done终止事件循环 dd.addBoth(done) # 1.3.开始事件循环 reactor.run()
# 事件循环,监听socket变化(终止条件,所有socket都已经移除) from twisted.internet import reactor # 创建socket对象,如果下载完成,自动从事件循环中移除 from twisted.web.client import getPage # defer.Deferred 特殊的socket的对象,不给任何地址发请求,不会自动移除,需要手动移除 from twisted.internet import defer import queue Q = queue.Queue() class Request(object): # 这里的callback = parse def __init__(self, url, callback): self.url = url self.callback = callback class HttpResponse(object): def __init__(self, content, request): self.content = content self.request = request self.url = request.url self.text = str(content, encoding=‘utf-8‘) class ChoutiSpider(object): name = ‘chouti‘ def start_requests(self): start_url = [‘http://www.baidu.com‘, ‘http://www.bing.com‘, ] for url in start_url: # 执行Request函数 yield Request(url, self.parse) def parse(self, response): # 1.crawling移除 # 2.获取parse yield返回值 # 3.再次去队列中获取 print(response.text) # 执行HttpResponse()中的方法 yield Request(‘http://www.cnblogs.com‘, callback=self.parse) class Engine(object): def __init__(self): self._close = None self.spider = None self.max = 5 # 最大并发数 self.crawling = [] # 表示正在爬取的爬虫 def get_response_callback(self, content, request): # getPage的返回值response,传入的req ---url 和 callback self.crawling.remove(request) # 删除已经下载完成的url callback rep = HttpResponse(content, request) # 将返回值传递过去 # 生成器或空 result = request.callback(rep) # 调用spider中的parse方法 = parse(rep) import types # 判断返回值是不是生成器 if isinstance(result, types.GeneratorType): for req in result: Q.put(req) # 将新请求加入队列 def _next_request(self): # 判断终止条件 if Q.qsize() == 0 and len(self.crawling) == 0: self._close.callback(None) # 手动停止 return # 发送过程中,会有最大并发数的限制,循环取url并下载 if len(self.crawling) >= self.max: # 超过最大并发数,直接返回 return while len(self.crawling) < self.max: # 低于最大并发数 try: req = Q.get(block=False) # 取数据,如果为空会报错,加入block不会等队列中的数据 self.crawling.append(req) # 将取到的url加入记录正在爬取数量的列表crawling d = getPage(req.url.encode(‘utf-8‘)) # getPage创建socket对象,发送请求进行下载 # 5.等页面下载完成执行用户自己定义的回调函数,处理response d.addCallback(self.get_response_callback, req) # d为请求的结果 # 未达到最大并发数,可以再去调度器中获取Request # d.addCallback(self._next_request) # 上一个方法执行玩,进行递归调用,继续取url d.addCallback(lambda _: reactor.callLater(0, self._next_request)) # 多久后调用 except Exception as e: # 如果队列为空,直接返回,不再循环取 return # 这个装饰器和yield self._close表示将socket已经添加到事件循环 @defer.inlineCallbacks def crawl(self, spider): # 3.将初始Request对象添加到调度器---将初始urL加入队列 start_requests = iter(spider.start_requests()) # 迭代器---执行spider中start_request函数 while True: try: request = next(start_requests) # 取迭代器中的下一个值 url和callback Q.put(request) # 将取到的值放入队列 except StopIteration as e: # 如果队列取完,就跳出循环 break # 4.反复去调度器中取request并发送请求进行下载,下载完成后执行回调函数 # self._next_request() reactor.callLater(0, self._next_request) # scrapy内部的写法 self._close = defer.Deferred() # 特殊socket不会自动结束,只能手动结束 yield self._close # 爬虫对象 spider = ChoutiSpider() _active = set() # 1.创建引擎 engine = Engine() # 2.将爬虫放入引擎进行处理,执行引擎中crawl函数 d = engine.crawl(spider) _active.add(d) # 监听爬虫d是否完成,如果完成执行addBoth终止socket dd = defer.DeferredList(_active) # 终止socket dd.addBoth(lambda _: reactor.stop()) reactor.run()
原文:https://www.cnblogs.com/shuimohei/p/13363462.html