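This is the third article in the series on SolrCloud's Recovery strategies. The previous two articles covered the overall Recovery flow and the PeerSync strategy. This article and the following ones focus on the Replication strategy. Replication is used in two ways. In SolrCloud, it synchronizes data from the leader to a replica. It can also provide master-slave synchronization across several standalone Solr instances. This article covers leader-to-replica synchronization in SolrCloud. The next one shows how to configure master-slave synchronization between Solr nodes through solrconfig.xml.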
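The purpose of Replication was explained in the previous article. When a large amount of data needs to be synchronized, Solr abandons document-level synchronization (PeerSync) and switches to a mode whose smallest unit is the file. Replication relies mainly on the SnapPuller class, which wraps the code that fetches a snapshot of the leader's data and synchronizes from that snapshot. The principle of the Replication flow is shown in the figure below. The rest of this article walks through each step using the source code.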
Before replicating, the replica asks the leader to commit (RecoveryStrategy.commitOnLeader). This puts the leader's latest updates into a commit point that can be replicated. Because OPEN_SEARCHER is false, this commit does not open a new searcher:

```java
private void commitOnLeader(String leaderUrl) throws SolrServerException,
    IOException {
  HttpSolrServer server = new HttpSolrServer(leaderUrl);
  try {
    server.setConnectionTimeout(30000);
    UpdateRequest ureq = new UpdateRequest();
    ureq.setParams(new ModifiableSolrParams());
    ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
    ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
        server);
  } finally {
    server.shutdown();
  }
}
```
SnapPuller.fetchLatestIndex then asks the leader for the version and generation of its latest replicable commit. SnapPuller refers to the leader as the "master":

```java
// get the current 'replicateable' index version in the master
NamedList response = null;
try {
  response = getLatestVersion();
} catch (Exception e) {
  LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
  return false;
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
```
Next, it gets the replica's own latest commit point. If the deletion policy has no commit point yet, it falls back to the commit of the newest searcher:

```java
// TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
if (commit == null) {
  // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't been updated with commit points
  RefCounted<SolrIndexSearcher> searcherRefCounted = null;
  try {
    searcherRefCounted = core.getNewestSearcher(false);
    if (searcherRefCounted == null) {
      LOG.warn("No open searcher found - fetch aborted");
      return false;
    }
    commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
  } finally {
    if (searcherRefCounted != null)
      searcherRefCounted.decref();
  }
}
```
A leader index version of 0 means the leader's index is empty, so there is nothing to download. If replication is forced and the local index is not empty, the replica deletes all of its documents and commits:

```java
if (latestVersion == 0L) {
  if (forceReplication && commit.getGeneration() != 0) {
    // since we won't get the files for an empty index,
    // we just clear ours and commit
    RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
    try {
      iw.get().deleteAll();
    } finally {
      iw.decref();
    }
    SolrQueryRequest req = new LocalSolrQueryRequest(core,
        new ModifiableSolrParams());
    core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
  }

  //there is nothing to be replicated
  successfulInstall = true;
  return true;
}
```
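If replication is not forced and the local commit's timestamp equals the leader's index version, the replica is already in sync:

```java
if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
  //master and slave are already in sync just return
  LOG.info("Slave in sync with master.");
  successfulInstall = true;
  return true;
}
```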
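The two early exits above are an empty leader index and a local commit that already matches the leader. They can be condensed into a single decision function. This is a simplified sketch, not SnapPuller's code. The enum `FetchDecision` and the class `PreFetchCheck` are hypothetical names, and the local commit is reduced to its timestamp and generation:

```java
/** Hypothetical outcome of the pre-download checks in a simplified SnapPuller-like flow. */
enum FetchDecision { CLEAR_LOCAL_INDEX, NOTHING_TO_DO, ALREADY_IN_SYNC, FETCH_FILES }

final class PreFetchCheck {
  private PreFetchCheck() {}

  /**
   * @param latestVersion   leader's index version (0 means the leader's index is empty)
   * @param localTimestamp  commit timestamp of the replica's latest commit point
   * @param localGeneration generation of the replica's latest commit point
   * @param force           whether replication is forced
   */
  static FetchDecision decide(long latestVersion, long localTimestamp,
                              long localGeneration, boolean force) {
    if (latestVersion == 0L) {
      // Leader index is empty: nothing to download. A forced replication
      // over a non-empty local index wipes the local index instead.
      return (force && localGeneration != 0) ? FetchDecision.CLEAR_LOCAL_INDEX
                                             : FetchDecision.NOTHING_TO_DO;
    }
    if (!force && localTimestamp == latestVersion) {
      // Same commit timestamp as the leader: already in sync.
      return FetchDecision.ALREADY_IN_SYNC;
    }
    // Anything else proceeds to fetching the file list.
    return FetchDecision.FETCH_FILES;
  }
}
```

For example, `decide(7L, 7L, 3L, false)` returns `ALREADY_IN_SYNC`. With `force` set to true, the same arguments return `FETCH_FILES`.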
Otherwise, it fetches the list of files in the leader's latest commit generation. It also fetches the list of configuration files to replicate:

```java
// get the list of files first
fetchFileList(latestGeneration);
// this can happen if the commit point is deleted before we fetch the file list.
if(filesToDownload.isEmpty()) return false;

private void fetchFileList(long gen) throws IOException {
  ModifiableSolrParams params = new ModifiableSolrParams();
  params.set(COMMAND, CMD_GET_FILE_LIST);
  params.set(GENERATION, String.valueOf(gen));
  params.set(CommonParams.WT, "javabin");
  params.set(CommonParams.QT, "/replication");
  QueryRequest req = new QueryRequest(params);
  HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler
  try {
    server.setSoTimeout(60000);
    server.setConnectionTimeout(15000);
    NamedList response = server.request(req);

    List<Map<String, Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
    if (files != null)
      filesToDownload = Collections.synchronizedList(files);
    else {
      filesToDownload = Collections.emptyList();
      LOG.error("No files to download for index generation: "+ gen);
    }

    files = (List<Map<String,Object>>) response.get(CONF_FILES);
    if (files != null)
      confFilesToDownload = Collections.synchronizedList(files);

  } catch (SolrServerException e) {
    throw new IOException(e);
  } finally {
    server.shutdown();
  }
}
```
A temporary index directory named index.<timestamp> is created, and the current index directory is opened:

```java
String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
tmpIndex = createTempindexDir(core, tmpIdxDirName);

tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);

// current index dir...
indexDirPath = core.getIndexDir();
indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
```
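It then decides whether a full copy is needed. A full copy is needed in any of these cases:

- the local commit is as new as or newer than the leader's, by timestamp or by generation;
- replication is forced;
- isIndexStale finds a local file with the same name as a leader file but a different size.

In that case all files are copied into the new directory. Otherwise the local IndexWriter is closed with rollback before any download starts:

```java
// if the generation of master is older than that of the slave , it means they are not compatible to be copied
// then a new index directory to be created and all the files need to be copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
    .getCommitTimestamp(commit) >= latestVersion
    || commit.getGeneration() >= latestGeneration || forceReplication;

if (isIndexStale(indexDir)) {
  isFullCopyNeeded = true;
}

if (!isFullCopyNeeded) {
  // rollback - and do it before we download any files
  // so we don't remove files we thought we didn't need
  // to download later
  solrCore.getUpdateHandler().getSolrCoreState()
      .closeIndexWriter(core, true);
}
```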
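The full-copy decision can be written as a pure function, which makes the conditions easy to test. This is a sketch under stated assumptions. `FullCopyCheck` is a hypothetical class. File lists are reduced to name→size maps. Modeling isIndexStale as "same name, different size" is my reading of the 4.x code and should be treated as an assumption:

```java
import java.util.Map;

final class FullCopyCheck {
  private FullCopyCheck() {}

  /**
   * Same condition as the snippet above: the replica's commit is not older than
   * the leader's (by timestamp or generation), replication is forced, or the
   * local index is stale.
   */
  static boolean isFullCopyNeeded(long localTimestamp, long localGeneration,
                                  long latestVersion, long latestGeneration,
                                  boolean force,
                                  Map<String, Long> leaderSizes,
                                  Map<String, Long> localSizes) {
    boolean notCompatible = localTimestamp >= latestVersion
        || localGeneration >= latestGeneration;
    return notCompatible || force || isIndexStale(leaderSizes, localSizes);
  }

  /** Stale: a local file has the same name as a leader file but a different size. */
  static boolean isIndexStale(Map<String, Long> leaderSizes, Map<String, Long> localSizes) {
    for (Map.Entry<String, Long> file : leaderSizes.entrySet()) {
      Long localSize = localSizes.get(file.getKey());
      if (localSize != null && !localSize.equals(file.getValue())) {
        return true; // same name, different content length: treat the local index as corrupt
      }
    }
    return false;
  }
}
```

If the function returns false, only the missing files are fetched into the temporary directory and later moved into the live index directory. If it returns true, everything goes into the fresh index.<timestamp> directory.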
The index files are then downloaded into the temporary directory. Files that already exist locally are skipped unless the complete index must be downloaded. DirectoryFileFetcher.fetchFile pulls each file in packets. If the transfer breaks, it reopens the stream and continues from the point where it stopped. After each file is finished, it submits an asynchronous fsync:

```java
private void downloadIndexFiles(boolean downloadCompleteIndex,
    Directory indexDir, Directory tmpIndexDir, long latestGeneration)
    throws Exception {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
  }
  for (Map<String,Object> file : filesToDownload) {
    if (!slowFileExists(indexDir, (String) file.get(NAME))
        || downloadCompleteIndex) {
      dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
          (String) file.get(NAME), false, latestGeneration);
      currentFile = file;
      dirFileFetcher.fetchFile();
      filesDownloaded.add(new HashMap<>(file));
    } else {
      LOG.info("Skipping download for " + file.get(NAME)
          + " because it already exists");
    }
  }
}

/**
 * The main method which downloads file
 */
void fetchFile() throws Exception {
  try {
    while (true) {
      final FastInputStream is = getStream();
      int result;
      try {
        //fetch packets one by one in a single request
        result = fetchPackets(is);
        if (result == 0 || result == NO_CONTENT) {
          return;
        }
        //if there is an error continue. But continue from the point where it got broken
      } finally {
        IOUtils.closeQuietly(is);
      }
    }
  } finally {
    cleanup();
    // if cleanup succeeds, the file is downloaded fully; do an fsync
    fsyncService.submit(new Runnable(){
      @Override
      public void run() {
        try {
          copy2Dir.sync(Collections.singleton(saveAs));
        } catch (IOException e) {
          fsyncException = e;
        }
      }
    });
  }
}
```
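The "continue from the point where it got broken" behavior of fetchFile can be shown with plain java.io. In this minimal sketch, each retry reopens the source at the number of bytes already saved. `ResumableFetcher` and its `StreamOpener` interface are hypothetical stand-ins for Solr's HTTP stream. The sketch omits Solr's per-packet checksums and the asynchronous fsync, and the retry limit is simply a parameter:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class ResumableFetcher {
  private ResumableFetcher() {}

  /** Hypothetical: opens a stream positioned at the given byte offset of the remote file. */
  interface StreamOpener {
    InputStream open(long offset) throws IOException;
  }

  static byte[] fetchFile(StreamOpener opener, int packetSize, int maxRetries)
      throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] packet = new byte[packetSize];
    int errors = 0;
    while (true) {
      // Resume at the number of bytes already saved (0 on the first attempt).
      try (InputStream in = opener.open(out.size())) {
        int n;
        while ((n = in.read(packet)) != -1) {
          out.write(packet, 0, n); // keep every packet that arrived intact
        }
        return out.toByteArray(); // end of stream: file fully downloaded
      } catch (IOException e) {
        if (++errors > maxRetries) {
          throw e; // give up after too many broken transfers
        }
        // otherwise loop and reopen the stream at the current offset
      }
    }
  }
}
```

Because bytes that were already written are never discarded, a dropped connection costs only the packet that was in flight, not the whole file.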
If the leader also returned configuration files, they are compared with the local copies by checksum. A file whose checksum matches is removed from the download list:

```java
//get the details of the local conf files with the same alias/name
List<Map<String, Object>> localFilesInfo = replicationHandler.getConfFileInfoFromCache(names, confFileInfoCache);
//compare their size/checksum to see if
for (Map<String, Object> fileInfo : localFilesInfo) {
  String name = (String) fileInfo.get(NAME);
  Map<String, Object> m = nameVsFile.get(name);
  if (m == null) continue; // the file is not even present locally (so must be downloaded)
  if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) {
    nameVsFile.remove(name); //checksums are same so the file need not be downloaded
  }
}
```
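The checksum comparison amounts to a set difference: a file is downloaded if it is missing locally or if its checksum differs. `ConfFileDiff` in this sketch is a hypothetical helper. It uses `java.util.zip.Adler32` as the checksum, but the code above does not show which algorithm Solr uses, so treat that choice as an assumption:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.Adler32;

final class ConfFileDiff {
  private ConfFileDiff() {}

  /** Checksum of a file's content (Adler32 is this sketch's assumption). */
  static long checksum(byte[] content) {
    Adler32 adler = new Adler32();
    adler.update(content, 0, content.length);
    return adler.getValue();
  }

  /**
   * Names of leader conf files that must be downloaded: files missing locally,
   * or files whose local checksum differs from the leader's.
   * The result follows the iteration order of leaderChecksums.
   */
  static List<String> filesToDownload(Map<String, Long> leaderChecksums,
                                      Map<String, byte[]> localContents) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Long> e : leaderChecksums.entrySet()) {
      byte[] local = localContents.get(e.getKey());
      if (local == null || checksum(local) != e.getValue()) {
        result.add(e.getKey());
      }
    }
    return result;
  }
}
```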
The remaining configuration files are downloaded into a temporary conf.<timestamp> directory under the config directory and then copied over the original files. The temporary directory is always deleted:

```java
private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
  LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
  confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
  File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
  try {
    boolean status = tmpconfDir.mkdirs();
    if (!status) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Failed to create temporary config folder: " + tmpconfDir.getName());
    }
    for (Map<String, Object> file : confFilesToDownload) {
      String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
      localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
      currentFile = file;
      localFileFetcher.fetchFile();
      confFilesDownloaded.add(new HashMap<>(file));
    }
    // this is called before copying the files to the original conf dir
    // so that if there is an exception avoid corrupting the original files.
    terminateAndWaitFsyncService();
    copyTmpConfFiles2Conf(tmpconfDir);
  } finally {
    delTree(tmpconfDir);
  }
}
```
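The stage-then-copy pattern in downloadConfFiles can be reproduced with java.nio.file. Everything is written to a staging directory first. The live files are overwritten only after every file has arrived, and the staging directory is removed in a `finally` block. `StagedConfInstall` is a hypothetical class. The in-memory map stands in for the network download:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StagedConfInstall {
  private StagedConfInstall() {}

  static void install(Path confDir, Map<String, byte[]> downloaded, String stamp)
      throws IOException {
    // Staging directory inside the conf directory, e.g. conf/conf.20141206103000123
    Path tmp = confDir.resolve("conf." + stamp);
    Files.createDirectories(tmp);
    try {
      // 1) "Download": stage every file first; a failure here leaves confDir untouched.
      for (Map.Entry<String, byte[]> f : downloaded.entrySet()) {
        Files.write(tmp.resolve(f.getKey()), f.getValue());
      }
      // 2) Only after all files are staged, copy them over the live files.
      for (String name : downloaded.keySet()) {
        Files.copy(tmp.resolve(name), confDir.resolve(name),
            StandardCopyOption.REPLACE_EXISTING);
      }
    } finally {
      // 3) Always remove the staging directory, as delTree(tmpconfDir) does above.
      deleteTree(tmp);
    }
  }

  static void deleteTree(Path root) throws IOException {
    if (!Files.exists(root)) {
      return;
    }
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(root)) {
      // Reverse order puts children before their parent directories.
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path p : paths) {
      Files.delete(p);
    }
  }
}
```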
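Finally, the new index is installed. After a full copy, modifyIndexProps switches the core to the temporary index directory. Otherwise, moveIndexFiles moves the downloaded files into the current index directory:

```java
if (isFullCopyNeeded) {
  successfulInstall = modifyIndexProps(tmpIdxDirName);
  deleteTmpIdxDir = false;
} else {
  successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
```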
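Next, let's look at how modifyIndexProps and moveIndexFiles work. As mentioned earlier, downloaded index files are stored under the data directory in a directory named "index." followed by the timestamp at which synchronization started. When the download finishes, Replication creates an index.properties file in the same data directory. The file holds a single entry, index=index.2014XXXXXXXXXX, which redirects the index directory to index.2014XXXXXXXXXX. From then on, index.2014XXXXXXXXXX effectively becomes the index directory, and the old index directory can be deleted.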
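The redirection through index.properties can be sketched with `java.util.Properties`. `IndexPropsRedirect` is a hypothetical class, and it is not Solr's modifyIndexProps. Writing to a temporary file and then renaming it is this sketch's own design choice, made to avoid leaving a half-written file. Note that `Properties.store` also writes a date comment line above the `index=...` entry:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

final class IndexPropsRedirect {
  static final String PROPS_FILE = "index.properties";

  private IndexPropsRedirect() {}

  /** Point the core at newIndexDirName by (re)writing dataDir/index.properties. */
  static void modifyIndexProps(Path dataDir, String newIndexDirName) throws IOException {
    Properties p = new Properties();
    p.setProperty("index", newIndexDirName);
    // Write to a temp file, then rename over the old one.
    Path tmp = dataDir.resolve(PROPS_FILE + ".tmp");
    try (OutputStream out = Files.newOutputStream(tmp)) {
      p.store(out, null);
    }
    Files.move(tmp, dataDir.resolve(PROPS_FILE), StandardCopyOption.REPLACE_EXISTING);
  }

  /** Active index directory: the redirect target if present, else dataDir/index. */
  static Path activeIndexDir(Path dataDir) throws IOException {
    Path props = dataDir.resolve(PROPS_FILE);
    if (Files.exists(props)) {
      Properties p = new Properties();
      try (InputStream in = Files.newInputStream(props)) {
        p.load(in);
      }
      String dir = p.getProperty("index");
      if (dir != null && !dir.trim().isEmpty()) {
        return dataDir.resolve(dir.trim());
      }
    }
    return dataDir.resolve("index");
  }
}
```

The old index directory is never touched during the switch. Only the small properties file changes, which is why the full-copy path does not need the live IndexWriter to stop writing into the new directory.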
moveIndexFiles is simpler. It copies the index files from the temporary directory into the index directory currently in use, and the segments_N file is copied last.
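Lucene recognizes a commit through its segments_N file. Deferring that file means that if a move fails midway, the live index directory still holds the old commit point rather than a segments_N that references files that never arrived. This is my understanding of why the order matters. The minimal sketch below uses java.nio.file. `SegmentsLastMover` is a hypothetical class, whereas Solr's moveIndexFiles works on Lucene `Directory` objects:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

final class SegmentsLastMover {
  private SegmentsLastMover() {}

  /** Order file names so that segments_N files come after all other files. */
  static List<String> installOrder(List<String> names) {
    List<String> ordered = new ArrayList<>();
    List<String> segmentsFiles = new ArrayList<>();
    for (String name : names) {
      if (name.startsWith("segments_")) {
        segmentsFiles.add(name); // commit point file: defer to the end
      } else {
        ordered.add(name);
      }
    }
    ordered.addAll(segmentsFiles);
    return ordered;
  }

  /** Move files from tmpDir into indexDir, installing the segments_N file last. */
  static void moveIndexFiles(Path tmpDir, Path indexDir, List<String> names)
      throws IOException {
    for (String name : installOrder(names)) {
      Files.move(tmpDir.resolve(name), indexDir.resolve(name),
          StandardCopyOption.REPLACE_EXISTING);
    }
  }
}
```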
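That completes one round of Replication. One question still needs an answer, though. While Replication is in progress, the shard is in the recovering state. In that state it accepts index updates but cannot serve queries. During synchronization, incoming data goes into the ulog (update log), but does it also go into the index files? I have not yet found anything in the source code proving that incoming data goes only into the ulog and never into the index files.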
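My current view is that while a shard is recovering, new requests go only into the ulog and not into the index files, for three reasons:
1. If new data were written into the old index files, those files would change, and the downloaded index files could no longer cleanly replace them.
2. During synchronization, if isFullCopyNeeded is false, the IndexWriter is closed, and a closed IndexWriter cannot write new data. If isFullCopyNeeded is true, the entire index directory is replaced, so whether the IndexWriter is closed hardly matters.
3. During recovery, once replication has finished, a replay step runs that re-applies the requests stored in the ulog.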
This is only my guess for now. I will revise this part once I understand it better, and pointers from readers are welcome.
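The hypothesis behind the three reasons above can be stated precisely as a toy model. While recovering, updates are appended to the log only. After replication installs the leader's index, the updates buffered since recovery began are replayed on top of it. `BufferingReplica` is hypothetical and is not Solr's UpdateLog. For what it is worth, RecoveryStrategy in the 4.x source appears to call UpdateLog.bufferUpdates() before attempting PeerSync or replication, which is consistent with this reading. The model itself does not depend on that detail:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "buffer in the log while recovering, replay afterwards". */
final class BufferingReplica {
  enum State { ACTIVE, RECOVERING }

  private State state = State.ACTIVE;
  private final List<String> updateLog = new ArrayList<>(); // stands in for the ulog
  private List<String> index = new ArrayList<>();           // stands in for the index files
  private int replayFrom = 0;                               // log position where buffering began

  void startRecovery() {
    state = State.RECOVERING;
    replayFrom = updateLog.size(); // loosely analogous to recoveryInfo.positionOfStart
  }

  void add(String doc) {
    updateLog.add(doc);          // every update is logged
    if (state == State.ACTIVE) {
      index.add(doc);            // applied to the index only when active
    }
  }

  /** Replication finished: install the leader's index, then replay buffered updates. */
  void finishRecovery(List<String> replicatedIndex) {
    index = new ArrayList<>(replicatedIndex);
    for (int i = replayFrom; i < updateLog.size(); i++) {
      index.add(updateLog.get(i));
    }
    state = State.ACTIVE;
  }

  List<String> index() { return index; }
  State state() { return state; }
}
```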
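Within the overall recovery process, once replication has finished, replay is called to apply the requests in the ulog to the index files again. In essence, replay runs the ulog's LogReplayer thread.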
LogReplayer.run takes the transaction logs one at a time and replays each one:

```java
for (;;) {
  TransactionLog translog = translogs.pollFirst();
  if (translog == null) break;
  doReplay(translog);
}
```
doReplay opens a reader on the tlog starting at recoveryInfo.positionOfStart. It then sends each entry through the core's default update processor chain:

```java
tlogReader = translog.getReader(recoveryInfo.positionOfStart);

// NOTE: we don't currently handle a core reload during recovery. This would cause the core
// to change underneath us.

UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
```
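In DistributedUpdateProcessor, updates flagged REPLAY or PEER_SYNC bypass the leader logic and are not forwarded to the leader, so replayed updates are applied locally only:

```java
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
  isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
  forwardToLeader = false;
  return nodes;
}
```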
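The flag test and the replay loop fit together in a few lines. In this sketch, `ReplayRouting` and its flag values are hypothetical, since Solr's UpdateCommand defines its own constants. Only the bitmask test mirrors the snippet above. `replayAll` drains a `Deque` oldest-first in the same way as the LogReplayer loop, with each tlog simplified to a list of strings:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ReplayRouting {
  // Hypothetical flag values for illustration only.
  static final int REPLAY = 1 << 1;
  static final int PEER_SYNC = 1 << 2;

  private ReplayRouting() {}

  /** False when REPLAY or PEER_SYNC is set: such updates skip leader logic and forwarding. */
  static boolean usesLeaderLogic(int flags) {
    return (flags & (REPLAY | PEER_SYNC)) == 0;
  }

  /** Drains the tlogs oldest-first, like the LogReplayer loop, applying updates locally. */
  static List<String> replayAll(Deque<List<String>> translogs) {
    List<String> applied = new ArrayList<>();
    for (;;) {
      List<String> translog = translogs.pollFirst();
      if (translog == null) break;
      // Every replayed update carries REPLAY, so it is applied here and never distributed.
      applied.addAll(translog);
    }
    return applied;
  }
}
```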
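This article covered how Replication works in SolrCloud and walked through the process step by step. It also briefly described how doReplay in the LogReplayer thread replays the ulog during recovery. The next article focuses on configuring and using Replication in master-slave mode.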
Solr 4.8.0 Source Code Analysis (22): SolrCloud's Recovery Strategy (Part 3)
Original article: http://www.cnblogs.com/rcfeng/p/4148733.html