本文介绍一个简单的多线程并发爬虫,这里说的简单是指爬取的数据规模不大,单机运行,并且不使用数据库,但保证多线程下的数据的一致性,并且能让爬得正起劲的爬虫停下来,而且能保存爬取状态以备下次继续。
爬虫实现的步骤基本如下:
这次是为了给博客园的用户进行一次pagerank排名,爬取了博客园各个用户的粉丝与关注者。博客园的用户用17万多个。爬取的页面是http://home.cnblogs.com/u/+userId,每个用户的url只需要用用户的id表示就可以了,用户id按平均10B来计算,保存所有用户也只需1.7Mb内存。因此我把两个Buffer都放在内存中。
这个项目的就四个java文件,结构如下:
下面对整个爬虫的实现过程进行详细的介绍。
一、登录
要获取用户的粉丝与关注,必须先登录,博客园的模拟登陆算是比较简单,找到登录时要上传的参数,然后Pos发送即登录成功,可以使用Chrome的工具,打开登录页面,调好账号和密码后,按F12弹出工具,按登录就能看到要传的参数了,再POST一个这些参数就好了。
我之前是使用这样的方式,后来用使用Jsoup解析的参数,代码实现如下:
1 /** 2 * 使用Joup解析登录参数,然后POST发送参数实现登录 3 * 4 * @throws UnsupportedEncodingException 5 * @throws IOException 6 */ 7 private static void login() throws UnsupportedEncodingException, 8 IOException { 9 CookieHandler.setDefault(new CookieManager()); 10 // 获取登录页面 11 String page = getPage(LOGIN_URL); 12 // 从登录去取出参数,并填充账号和密码 13 Document doc = Jsoup.parse(page); 14 // 取登录表格 15 Element loginform = doc.getElementById("frmLogin"); 16 Elements inputElements = loginform.getElementsByTag("input"); 17 List<String> paramList = new ArrayList<String>(); 18 for (Element inputElement : inputElements) { 19 String key = inputElement.attr("name"); 20 String value = inputElement.attr("value"); 21 if (key.equals("tbUserName")) 22 value = Test.Name; 23 else if (key.equals("tbPassword")) 24 value = Test.passwd; 25 paramList.add(key + "=" + URLEncoder.encode(value, "UTF-8")); 26 } 27 // 封装请求参数 28 StringBuilder para = new StringBuilder(); 29 for (String param : paramList) { 30 if (para.length() == 0) { 31 para.append(param); 32 } else { 33 para.append("&" + param); 34 } 35 } 36 // POST发送登录 37 String result = sendPost(LOGIN_URL, para.toString()); 38 if (!result.contains("followees")) { 39 cookies = null; 40 System.out.println("登录失败"); 41 } else 42 System.out.println("登录成功"); 43 }
二、获取粉丝与关注
登录成功就可以爬取粉丝和关注了,关注在http://home.cnblogs.com/u/userid/followees/链接中,而粉丝在http://home.cnblogs.com/u/userid/followers/,两个网页结构基本相同,只需要把选择一下followees(被关注者)和(followers)关注者,用Jsoup解析avatar_list中avatar_name就好了,代码如下:
1 /** 2 * 获取一页中的关注or粉丝 3 * 4 * @param pageHtml 5 * @return 6 */ 7 8 private List<String> getOnePageFriends(Document doc) { 9 List<String> firends = new ArrayList<String>(); 10 Elements inputElements = doc.getElementsByClass("avatar_name"); 11 for (Element inputElement : inputElements) { 12 Elements links = inputElement.getElementsByTag("a"); 13 for (Element link : links) { 14 //从href中解析出用户id 15 String href = link.attr("href"); 16 firends.add(href.substring(3, href.length() - 1)); 17 } 18 } 19 return firends; 20 }
每一页显示50个粉丝or关注者,需要分页爬取,获取下一页跟获取用户粉丝差不多,找到元素就好。
三、爬取单个用户
爬取的一个用户的过程就是先分页爬取粉丝,再爬取关注者,然后把爬过的用户放入访问Buffer中,再把爬到的用户放到未访问队列中。
1 @Override 2 public void run() { 3 while (stop.get() == false) { 4 // 取出一个待访问 5 String userId = mUserBuffer.pickOne(); 6 try { 7 // 爬取粉丝 8 List<String> fans = crawUser(userId, "/followers"); 9 // 爬取关注者 10 List<String> heros = crawUser(userId, "/followees"); 11 // 只需要保持粉丝关系即可 12 StringBuilder sb = new StringBuilder(userId).append("\t"); 13 for (String friend : fans) { 14 sb.append(friend).append("\t"); 15 } 16 sb.deleteCharAt(sb.length() - 1).append("\n"); 17 saver.save(sb.toString()); 18 // 被关注者应该放进队列里面,以供下次爬取他的粉丝 19 fans.addAll(heros); 20 mUserBuffer.addUnCrawedUsers(fans); 21 } catch (Exception e) { 22 saver.log(e.getMessage()); 23 // 访问错误时,放入访问出错的队列中,以备以后重新访问。 24 mUserBuffer.addErrorUser(userId); 25 } 26 }
一页一页爬取单个用户如下:
1 /** 2 * 爬取用户,根据tag来决定是爬该用户关注的人,还是该用户的粉丝 3 * 4 * @param userId 5 * @return 6 * @throws IOException 7 */ 8 private List<String> crawUser(String userId, String tag) throws IOException { 9 //构造URL 10 StringBuilder urlBuilder = new StringBuilder(USER_HOME); 11 urlBuilder.append("/u/").append(userId).append(tag); 12 //请求页面 13 String page = getPage(urlBuilder.toString()); 14 Document doc = Jsoup.parse(page); 15 List<String> friends = new ArrayList<String>(); 16 //爬取第一页 17 friends.addAll(getOnePageFriends(doc)); 18 String nextUrl = null; 19 //不断地爬取下一页 20 while ((nextUrl = getNextUrl(doc)) != null) { 21 page = getPage(nextUrl); 22 doc = Jsoup.parse(page); 23 friends.addAll(getOnePageFriends(doc)); 24 } 25 return friends; 26 }
整个爬虫结构就是这样了:
1 public class UserCrawler implements Runnable { 2 // 停止任务标志 3 private static AtomicBoolean stop; 4 // 当前爬虫的id 5 private int id; 6 // 用户缓存 7 private UserBuffer mUserBuffer; 8 // 日志与粉丝保存工具 9 private Saver saver; 10 11 static { 12 stop = new AtomicBoolean(false); 13 try { 14 // 登录一次即可 15 login(); 16 // 保存数据线程先启动 17 Saver.getInstance().start(); 18 } catch (IOException e) { 19 e.printStackTrace(); 20 } 21 // new Thread(new CommandListener()).start(); 22 } 23 24 public UserCrawler(UserBuffer userBuffer) { 25 mUserBuffer = userBuffer; 26 mUserBuffer.crawlerCountIncrease(); 27 id = c++; 28 saver = Saver.getInstance(); 29 } 30 31 @Override 32 public void run() { 33 if (id > 0) { 34 // 等第一个线程启动一段时候再开始新的线程 35 try { 36 TimeUnit.SECONDS.sleep(20 + id); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 } 41 System.out.println("UserCrawler " + id + " start"); 42 int retry = 3;// 重置尝试次数 43 while (stop.get() == false) { 44 // 取出一个待访问 45 String userId = mUserBuffer.pickOne(); 46 if (userId == null) {// 队列元素已经为空 47 retry--;// 重试3次 48 if (retry <= 0) 49 break; 50 continue; 51 } 52 ...//爬取用户 53 } 54 System.out.println("UserCrawler " + id + " stop"); 55 // 当前线程停止了 56 mUserBuffer.crawlerCountDecrease(); 57 } 58 59 private List<String> crawUser(String userId, String tag) throws IOException { 60 61 } 62 63 /** 64 * 获取一页中的关注or粉丝 65 * 66 * @param pageHtml 67 * @return 68 */ 69 70 private List<String> getOnePageFriends(Document doc) { 71 ... 72 } 73 74 /** 75 * 获取下一页的地址 76 * 77 * @param doc 78 * @return 79 */ 80 private String getNextUrl(Document doc) { 81 82 } 83 84 private static String getPage(String pageUrl) throws IOException { 85 86 } 87 88 /*** 89 * 终止所有爬虫任务 90 */ 91 public static void stop() { 92 System.out.println("正在终止..."); 93 stop.compareAndSet(false, true); 94 UserBuffer.getInstance().prepareForStop(); 95 } 96 97 private static void login() throws UnsupportedEncodingException, 98 IOException { 99 ... 100 } 101 102 103 private static String sendPost(String url, String postParams) 104 throws IOException { 105 ... 106 107 }
四、Buffer并发控制
在用户Buffer设置一个已经访问的用户集合、一个访问出错的用户集合和一个待访问的队列。
1 private UserBuffer() { 2 crawedUsers = new HashSet<String>();// 已经访问的用户,包括访问成功和访问出错的用户 3 errorUsers = new HashSet<String>();// 访问出错的用户 4 unCrawedUsers = new LinkedList<String>();// 未访问的用户 5 }
UserBuffer全局唯一,因此采用单例模式,使用的集合和队列都不是线程安全的数据结构,我认为没有必要使用线程安全的ConcurrentSkipListSet与ConcurrentLinkedQueue,因为将一个用户插入unCrawedUsers队列时需要先判断是否已经存在于crawedUsers用户集合中了,这需要用锁来同步访问,如果不使用锁来控制,单个线程安全的set和queque不能保证多个变量之间的test-try有效性。而使用了锁后,再使用线程安全的数据结构只会增加加锁和解锁的次数,反而降低了性能。
1 /*** 2 * 添加未访问的用户 3 * 4 * @param users 5 * @return 6 */ 7 public synchronized void addUnCrawedUsers(List<String> users) { 8 // 添加未访问的用户 9 for (String user : users) { 10 if (!crawedUsers.contains(user)) 11 unCrawedUsers.add(user); 12 } 13 } 14 15 /** 16 * 从队列中取一个元素,并把这个元素添加到已经访问的集合中,以免重复访问。 17 * 18 * 19 * @return 20 */ 21 public synchronized String pickOne() { 22 String newId = unCrawedUsers.poll(); 23 // 队列中可能包含重复的id,因为插入队列时只检查是否在访问集合里, 24 // 没有检查是否已经出现在队列里 25 while (crawedUsers.contains(newId)) { 26 newId = unCrawedUsers.poll(); 27 } 28 //访问前先把添加到已经访问的集合中 29 crawedUsers.add(newId); 30 return newId; 31 } 32 33 /** 34 * 添加访问出错的用户 35 * 36 * @param userId 37 */ 38 public synchronized void addErrorUser(String userId) { 39 errorUsers.add(userId); 40 }
五、安全地终止爬虫
大丈夫能伸能屈,能走能停,爬虫也当如此。爬博客园的用户时,我真是提心吊胆,担心管理员封我的账号,我尽量挑凌晨的时间爬数据,另外就是爬了一会后,我就停下来,过一段时间再爬。要让一个多线程的程序平稳的停下来还真不简单,最担心的就是死锁,发送停止命令后,线程不动却也没有终止,爬了好久的Buffer空间没有保存,那个恨啊。
我的思路:
1、爬虫启动时,向Buffer注册一下,buffer记录启动的爬虫数量;
2、对爬虫设置一个全局的标志,爬虫在每次爬取一个用户前检查终止标志是否被设置;
3、当发生停止命令时,爬虫检查到停止标志被设置,于是通知Buffer自己将要停止,通知完后就结束运行;
4、Buffer收到爬虫停止通知后,将爬虫计数器减1,当计数器为0时,保存工作空间,同时通知关闭日志;
5、为了避免有些爬虫在运行异常时推出而没有通知Buffer,在发出停止命令时,同时通知Buffer准备停止,Buffer设置一个计时器,2分钟后,强制保存爬虫状态和日志。
完整的代码还是放在Github上,有兴趣的同学可以看看。
感谢阅读,转载请注明出处:http://www.cnblogs.com/fengfenggirl/
http://www.cnblogs.com/fengfenggirl/p/cnblogs-crawler.html
大数据之网络爬虫-一个简单的多线程爬虫,布布扣,bubuko.com
原文:http://www.cnblogs.com/pengkunfan/p/3746629.html