一、入口类是Main方法,读取zoo.cfg文件
org.apache.zookeeper.server.quorum.QuorumPeerMain
?
public static void main(String[] args) {
? ? ? ? QuorumPeerMain main = new QuorumPeerMain();
? ? ? ? main.initializeAndRun(args);?
}
?
?
二、读取zoo.cfg配置文件,初始化参数
protected void initializeAndRun(String[] args){
? ? ? ? QuorumPeerConfig config = new QuorumPeerConfig();
? ? ? ? if (args.length == 1) {
? ? ? ? ? ? //读取zoo.cfg文件,默认通过args传递进来的
? ? ? ? ? ? config.parse(args[0]);
? ? ? ? }
? ? ? ??
? ? ? ? // 启动定时任务,清理ZK的数据目录,防止文件过大
? ? ? ? // Start and schedule the the purge task
? ? ? ? DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
? ? ? ? ? ? ? ? .getDataDir(), config.getDataLogDir(), config
? ? ? ? ? ? ? ? .getSnapRetainCount(), config.getPurgeInterval());
? ? ? ? purgeMgr.start();
?
? ? ? ? if (args.length == 1 && config.servers.size() > 0) {
? ? ? ? ? ? runFromConfig(config);
? ? ? ? } else {
? ? ? ? ? ? LOG.warn("Either no config or no quorum defined in config, running "
? ? ? ? ? ? ? ? ? ? + " in standalone mode");
? ? ? ? ? ? // there is only server in the quorum -- run as standalone
? ? ? ? ? ? ZooKeeperServerMain.main(args);
? ? ? ? }
? }
?
?
三、调用Properties?方法load配置文件
?/**
? ? ?* Parse a ZooKeeper configuration file
? ? ?* @param path the patch of the configuration file
? ? ?* @throws ConfigException error processing configuration
? ? ?*/
? ? public void parse(String path) throws ConfigException {
? ? ? ? File configFile = new File(path);
?
? ? ? ? LOG.info("Reading configuration from: " + configFile);
?
? ? ? ? try {
? ? ? ? ? ? if (!configFile.exists()) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException(configFile.toString()
? ? ? ? ? ? ? ? ? ? ? ? + " file is missing");
? ? ? ? ? ? }
?
? ? ? ? ? ? Properties cfg = new Properties();
? ? ? ? ? ? FileInputStream in = new FileInputStream(configFile);
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? cfg.load(in);
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? in.close();
? ? ? ? ? ? }
?
? ? ? ? ? ? parseProperties(cfg);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? throw new ConfigException("Error processing " + path, e);
? ? ? ? } catch (IllegalArgumentException e) {
? ? ? ? ? ? throw new ConfigException("Error processing " + path, e);
? ? ? ? }
? ? }
?
四、遍历Properties 获取属性值
?/**
? ? ?* Parse config from a Properties.
? ? ?* @param zkProp Properties to parse from.
? ? ?* @throws IOException
? ? ?* @throws ConfigException
? ? ?*/
? ? public void parseProperties(Properties zkProp)
? ? throws IOException, ConfigException {
? ? ? ? int clientPort = 0;
? ? ? ? String clientPortAddress = null;
? ? ? ? for (Entry<Object, Object> entry : zkProp.entrySet()) {
? ? ? ? ? ? String key = entry.getKey().toString().trim();
? ? ? ? ? ? String value = entry.getValue().toString().trim();
? ? ? ? ? ? if (key.equals("dataDir")) {
? ? ? ? ? ? ? ? dataDir = value;
? ? ? ? ? ? } else if (key.equals("dataLogDir")) {
? ? ? ? ? ? ? ? dataLogDir = value;
? ? ? ? ? ? } else if (key.equals("clientPort")) {
? ? ? ? ? ? ? ? clientPort = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("clientPortAddress")) {
? ? ? ? ? ? ? ? clientPortAddress = value.trim();
? ? ? ? ? ? } else if (key.equals("tickTime")) {
? ? ? ? ? ? ? ? tickTime = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("maxClientCnxns")) {
? ? ? ? ? ? ? ? maxClientCnxns = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("minSessionTimeout")) {
? ? ? ? ? ? ? ? minSessionTimeout = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("maxSessionTimeout")) {
? ? ? ? ? ? ? ? maxSessionTimeout = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("initLimit")) {
? ? ? ? ? ? ? ? initLimit = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("syncLimit")) {
? ? ? ? ? ? ? ? syncLimit = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("electionAlg")) {
? ? ? ? ? ? ? ? electionAlg = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("quorumListenOnAllIPs")) {
? ? ? ? ? ? ? ? quorumListenOnAllIPs = Boolean.parseBoolean(value);
? ? ? ? ? ? } else if (key.equals("peerType")) {
? ? ? ? ? ? ? ? if (value.toLowerCase().equals("observer")) {
? ? ? ? ? ? ? ? ? ? peerType = LearnerType.OBSERVER;
? ? ? ? ? ? ? ? } else if (value.toLowerCase().equals("participant")) {
? ? ? ? ? ? ? ? ? ? peerType = LearnerType.PARTICIPANT;
? ? ? ? ? ? ? ? } else
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? throw new ConfigException("Unrecognised peertype: " + value);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } else if (key.equals( "syncEnabled" )) {
? ? ? ? ? ? ? ? syncEnabled = Boolean.parseBoolean(value);
? ? ? ? ? ? } else if (key.equals("autopurge.snapRetainCount")) {
? ? ? ? ? ? ? ? snapRetainCount = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.equals("autopurge.purgeInterval")) {
? ? ? ? ? ? ? ? purgeInterval = Integer.parseInt(value);
? ? ? ? ? ? } else if (key.startsWith("server.")) {
? ? ? ? ? ? ? ? int dot = key.indexOf(‘.‘);
? ? ? ? ? ? ? ? long sid = Long.parseLong(key.substring(dot + 1));
? ? ? ? ? ? ? ? String parts[] = value.split(":");
? ? ? ? ? ? ? ? if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {
? ? ? ? ? ? ? ? ? ? ? //Zookeeper 配置集群的三种方法
? ? ? ? ? ? ? ? ? ? LOG.error(value
? ? ? ? ? ? ? ? ? ? ? ?+ " does not have the form host:port or host:port:port " +
? ? ? ? ? ? ? ? ? ? ? ?" or host:port:port:type");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? InetSocketAddress addr = new InetSocketAddress(parts[0],
? ? ? ? ? ? ? ? ? ? ? ? Integer.parseInt(parts[1]));
? ? ? ? ? ? ? ? if (parts.length == 2) {
? ? ? ? ? ? ? ? ? ? servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));
? ? ? ? ? ? ? ? } else if (parts.length == 3) {
? ? ? ? ? ? ? ? ? ? InetSocketAddress electionAddr = new InetSocketAddress(
? ? ? ? ? ? ? ? ? ? ? ? ? ? parts[0], Integer.parseInt(parts[2]));
? ? ? ? ? ? ? ? ? ? servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
? ? ? ? ? ? ? ? ? ? ? ? ? ? electionAddr));
? ? ? ? ? ? ? ? } else if (parts.length == 4) {
? ? ? ? ? ? ? ? ? ? InetSocketAddress electionAddr = new InetSocketAddress(
? ? ? ? ? ? ? ? ? ? ? ? ? ? parts[0], Integer.parseInt(parts[2]));
? ? ? ? ? ? ? ? ? ? LearnerType type = LearnerType.PARTICIPANT;
? ? ? ? ? ? ? ? ? ? if (parts[3].toLowerCase().equals("observer")) {
? ? ? ? ? ? ? ? ? ? ? ? type = LearnerType.OBSERVER;
? ? ? ? ? ? ? ? ? ? ? ? observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? electionAddr,type));
? ? ? ? ? ? ? ? ? ? } else if (parts[3].toLowerCase().equals("participant")) {
? ? ? ? ? ? ? ? ? ? ? ? type = LearnerType.PARTICIPANT;
? ? ? ? ? ? ? ? ? ? ? ? servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? electionAddr,type));
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? throw new ConfigException("Unrecognised peertype: " + value);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } else if (key.startsWith("group")) {
? ? ? ? ? ? ? ? int dot = key.indexOf(‘.‘);
? ? ? ? ? ? ? ? long gid = Long.parseLong(key.substring(dot + 1));
?
? ? ? ? ? ? ? ? numGroups++;
?
? ? ? ? ? ? ? ? String parts[] = value.split(":");
? ? ? ? ? ? ? ? for(String s : parts){
? ? ? ? ? ? ? ? ? ? long sid = Long.parseLong(s);
? ? ? ? ? ? ? ? ? ? if(serverGroup.containsKey(sid))
? ? ? ? ? ? ? ? ? ? ? ? throw new ConfigException("Server " + sid + "is in multiple groups");
? ? ? ? ? ? ? ? ? ? else
? ? ? ? ? ? ? ? ? ? ? ? serverGroup.put(sid, gid);
? ? ? ? ? ? ? ? }
?
? ? ? ? ? ? } else if(key.startsWith("weight")) {
? ? ? ? ? ? ? ? int dot = key.indexOf(‘.‘);
? ? ? ? ? ? ? ? long sid = Long.parseLong(key.substring(dot + 1));
? ? ? ? ? ? ? ? serverWeight.put(sid, Long.parseLong(value));
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? System.setProperty("zookeeper." + key, value);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ??
? ? ? ? // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
? ? ? ? // PurgeTxnLog.purge(File, File, int) will not allow to purge less
? ? ? ? // than 3.
? ? ? ? if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {
? ? ? ? ? ? LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount
? ? ? ? ? ? ? ? ? ? + ". Defaulting to " + MIN_SNAP_RETAIN_COUNT);
? ? ? ? ? ? snapRetainCount = MIN_SNAP_RETAIN_COUNT;
? ? ? ? }
?
? ? ? ? if (dataDir == null) {
? ? ? ? ? ? throw new IllegalArgumentException("dataDir is not set");
? ? ? ? }
? ? ? ? if (dataLogDir == null) {
? ? ? ? ? ? dataLogDir = dataDir;
? ? ? ? } else {
? ? ? ? ? ? if (!new File(dataLogDir).isDirectory()) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("dataLogDir " + dataLogDir
? ? ? ? ? ? ? ? ? ? ? ? + " is missing.");
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? if (clientPort == 0) {
? ? ? ? ? ? throw new IllegalArgumentException("clientPort is not set");
? ? ? ? }
? ? ? ? if (clientPortAddress != null) {
? ? ? ? ? ? this.clientPortAddress = new InetSocketAddress(
? ? ? ? ? ? ? ? ? ? InetAddress.getByName(clientPortAddress), clientPort);
? ? ? ? } else {
? ? ? ? ? ? this.clientPortAddress = new InetSocketAddress(clientPort);
? ? ? ? }
?
? ? ? ? if (tickTime == 0) {
? ? ? ? ? ? throw new IllegalArgumentException("tickTime is not set");
? ? ? ? }
? ? ? ? if (minSessionTimeout > maxSessionTimeout) {
? ? ? ? ? ? throw new IllegalArgumentException(
? ? ? ? ? ? ? ? ? ? "minSessionTimeout must not be larger than maxSessionTimeout");
? ? ? ? }
? ? ? ? if (servers.size() == 0) {
? ? ? ? ? ? if (observers.size() > 0) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
? ? ? ? ? ? }
? ? ? ? ? ? // Not a quorum configuration so return immediately - not an error
? ? ? ? ? ? // case (for b/w compatibility), server will default to standalone
? ? ? ? ? ? // mode.
? ? ? ? ? ? return;
? ? ? ? } else if (servers.size() == 1) {
? ? ? ? ? ? if (observers.size() > 0) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
? ? ? ? ? ? }
?
? ? ? ? ? ? // HBase currently adds a single server line to the config, for
? ? ? ? ? ? // b/w compatibility reasons we need to keep this here.
? ? ? ? ? ? LOG.error("Invalid configuration, only one server specified (ignoring)");
? ? ? ? ? ? servers.clear();
? ? ? ? } else if (servers.size() > 1) {
? ? ? ? ? ? if (servers.size() == 2) {
? ? ? ? ? ? ? ? LOG.warn("No server failure will be tolerated. " +
? ? ? ? ? ? ? ? ? ? "You need at least 3 servers.");
? ? ? ? ? ? } else if (servers.size() % 2 == 0) {
? ? ? ? ? ? ? ? LOG.warn("Non-optimial configuration, consider an odd number of servers.");
? ? ? ? ? ? }
? ? ? ? ? ? if (initLimit == 0) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("initLimit is not set");
? ? ? ? ? ? }
? ? ? ? ? ? if (syncLimit == 0) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("syncLimit is not set");
? ? ? ? ? ? }
? ? ? ? ? ? /*
? ? ? ? ? ? ?* If using FLE, then every server requires a separate election
? ? ? ? ? ? ?* port.
? ? ? ? ? ? ?*/
? ? ? ? ? ? if (electionAlg != 0) {
? ? ? ? ? ? ? ? for (QuorumServer s : servers.values()) {
? ? ? ? ? ? ? ? ? ? if (s.electionAddr == null)
? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalArgumentException(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "Missing election port for server: " + s.id);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
?
? ? ? ? ? ? /*
? ? ? ? ? ? ?* Default of quorum config is majority
? ? ? ? ? ? ?*/
? ? ? ? ? ? if(serverGroup.size() > 0){
? ? ? ? ? ? ? ? if(servers.size() != serverGroup.size())
? ? ? ? ? ? ? ? ? ? throw new ConfigException("Every server must be in exactly one group");
? ? ? ? ? ? ? ? /*
? ? ? ? ? ? ? ? ?* The deafult weight of a server is 1
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? for(QuorumServer s : servers.values()){
? ? ? ? ? ? ? ? ? ? if(!serverWeight.containsKey(s.id))
? ? ? ? ? ? ? ? ? ? ? ? serverWeight.put(s.id, (long) 1);
? ? ? ? ? ? ? ? }
?
? ? ? ? ? ? ? ? /*
? ? ? ? ? ? ? ? ?* Set the quorumVerifier to be QuorumHierarchical
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? quorumVerifier = new QuorumHierarchical(numGroups,
? ? ? ? ? ? ? ? ? ? ? ? serverWeight, serverGroup);
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? /*
? ? ? ? ? ? ? ? ?* The default QuorumVerifier is QuorumMaj
? ? ? ? ? ? ? ? ?*/
?
? ? ? ? ? ? ? ? LOG.info("Defaulting to majority quorums");
? ? ? ? ? ? ? ? quorumVerifier = new QuorumMaj(servers.size());
? ? ? ? ? ? }
?
? ? ? ? ? ? // Now add observers to servers, once the quorums have been
? ? ? ? ? ? // figured out
? ? ? ? ? ? servers.putAll(observers);
? ??
? ? ? ? ? ? File myIdFile = new File(dataDir, "myid");
? ? ? ? ? ? if (!myIdFile.exists()) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException(myIdFile.toString()
? ? ? ? ? ? ? ? ? ? ? ? + " file is missing");
? ? ? ? ? ? }
? ? ? ? ? ? BufferedReader br = new BufferedReader(new FileReader(myIdFile));
? ? ? ? ? ? String myIdString;
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? myIdString = br.readLine();
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? br.close();
? ? ? ? ? ? }
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? serverId = Long.parseLong(myIdString);
? ? ? ? ? ? ? ? MDC.put("myid", myIdString);
? ? ? ? ? ? } catch (NumberFormatException e) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("serverid " + myIdString
? ? ? ? ? ? ? ? ? ? ? ? + " is not a number");
? ? ? ? ? ? }
? ? ? ? ? ??
? ? ? ? ? ? // Warn about inconsistent peer type
? ? ? ? ? ? LearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER
? ? ? ? ? ? ? ? ? ? : LearnerType.PARTICIPANT;
? ? ? ? ? ? if (roleByServersList != peerType) {
? ? ? ? ? ? ? ? LOG.warn("Peer type from servers list (" + roleByServersList
? ? ? ? ? ? ? ? ? ? ? ? + ") doesn‘t match peerType (" + peerType
? ? ? ? ? ? ? ? ? ? ? ? + "). Defaulting to servers list.");
? ??
? ? ? ? ? ? ? ? peerType = roleByServersList;
? ? ? ? ? ? }
? ? ? ? }
? ? }
?
?
?
?
五、启动清理数据文件的进程
?
public void start() {
? ? ? ? if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
? ? ? ? ? ? LOG.warn("Purge task is already running.");
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? // Don‘t schedule the purge task with zero or negative purge interval.
? ? ? ? if (purgeInterval <= 0) {
? ? ? ? ? ? LOG.info("Purge task is not scheduled.");
? ? ? ? ? ? return;
? ? ? ? }
?
? ? ? ? timer = new Timer("PurgeTask", true);
? ? ? ? TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
? ? ? ? timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
?
? ? ? ? purgeTaskStatus = PurgeTaskStatus.STARTED;
? ? }
?
?
?
六、启动线程进行清理
?static class PurgeTask extends TimerTask {
? ? ? ? private String logsDir;
? ? ? ? private String snapsDir;
? ? ? ? private int snapRetainCount;
?
? ? ? ? public PurgeTask(String dataDir, String snapDir, int count) {
? ? ? ? ? ? logsDir = dataDir;
? ? ? ? ? ? snapsDir = snapDir;
? ? ? ? ? ? snapRetainCount = count;
? ? ? ? }
?
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? LOG.info("Purge task started.");
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? PurgeTxnLog.purge(new File(logsDir), new File(snapsDir), snapRetainCount);
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? LOG.error("Error occured while purging.", e);
? ? ? ? ? ? }
? ? ? ? ? ? LOG.info("Purge task completed.");
? ? ? ? }
? ? }
?
?
?
七、真正的过滤删除文件
/**
? ? ?* purges the snapshot and logs keeping the last num snapshots?
? ? ?* and the corresponding logs.
? ? ?* @param dataDir the dir that has the logs
? ? ?* @param snapDir the dir that has the snapshots
? ? ?* @param num the number of snapshots to keep
? ? ?* @throws IOException
? ? ?*/
? ? public static void purge(File dataDir, File snapDir, int num) throws IOException {
? ? ? ? if (num < 3) {
? ? ? ? ? ? throw new IllegalArgumentException("count should be greater than 3");
? ? ? ? }
?
? ? ? ? FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
? ? ? ??
? ? ? ? // found any valid recent snapshots?
? ? ? ??
? ? ? ? // files to exclude from deletion
? ? ? ? Set<File> exc=new HashSet<File>();
? ? ? ? List<File> snaps = txnLog.findNRecentSnapshots(num);
? ? ? ? if (snaps.size() == 0)?
? ? ? ? ? ? return;
? ? ? ? File snapShot = snaps.get(snaps.size() -1);
? ? ? ? for (File f: snaps) {
? ? ? ? ? ? exc.add(f);
? ? ? ? }
? ? ? ? long zxid = Util.getZxidFromName(snapShot.getName(),"snapshot");
? ? ? ? exc.addAll(Arrays.asList(txnLog.getSnapshotLogs(zxid)));
?
? ? ? ? final Set<File> exclude=exc;
? ? ? ? class MyFileFilter implements FileFilter{
? ? ? ? ? ? private final String prefix;
? ? ? ? ? ? MyFileFilter(String prefix){
? ? ? ? ? ? ? ? this.prefix=prefix;
? ? ? ? ? ? }
? ? ? ? ? ? public boolean accept(File f){
? ? ? ? ? ? ? ? if(!f.getName().startsWith(prefix) || exclude.contains(f))
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? // add all non-excluded log files
? ? ? ? List<File> files=new ArrayList<File>(
? ? ? ? ? ? ? ? Arrays.asList(txnLog.getDataDir().listFiles(new MyFileFilter("log."))));
? ? ? ? // add all non-excluded snapshot files to the deletion list
? ? ? ? files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(new MyFileFilter("snapshot."))));
? ? ? ? // remove the old files
? ? ? ? for(File f: files)
? ? ? ? {
? ? ? ? ? ? System.out.println("Removing file: "+
? ? ? ? ? ? ? ? DateFormat.getDateTimeInstance().format(f.lastModified())+
? ? ? ? ? ? ? ? "\t"+f.getPath());
? ? ? ? ? ? if(!f.delete()){
? ? ? ? ? ? ? ? System.err.println("Failed to remove "+f.getPath());
? ? ? ? ? ? }
? ? ? ? }
?
? ? }
?
?
?八、这个函数功能自己研究
?public void runFromConfig(QuorumPeerConfig config) throws IOException {
? ? ? try {
? ? ? ? ? ManagedUtil.registerLog4jMBeans();
? ? ? } catch (JMException e) {
? ? ? ? ? LOG.warn("Unable to register log4j JMX control", e);
? ? ? }
??
? ? ? LOG.info("Starting quorum peer");
? ? ? try {
? ? ? ? ? ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
? ? ? ? ? cnxnFactory.configure(config.getClientPortAddress(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? config.getMaxClientCnxns());
??
? ? ? ? ? quorumPeer = new QuorumPeer();
? ? ? ? ? quorumPeer.setClientPortAddress(config.getClientPortAddress());
? ? ? ? ? quorumPeer.setTxnFactory(new FileTxnSnapLog(
? ? ? ? ? ? ? ? ? ? ? new File(config.getDataLogDir()),
? ? ? ? ? ? ? ? ? ? ? new File(config.getDataDir())));
? ? ? ? ? quorumPeer.setQuorumPeers(config.getServers());
? ? ? ? ? quorumPeer.setElectionType(config.getElectionAlg());
? ? ? ? ? quorumPeer.setMyid(config.getServerId());
? ? ? ? ? quorumPeer.setTickTime(config.getTickTime());
? ? ? ? ? quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
? ? ? ? ? quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
? ? ? ? ? quorumPeer.setInitLimit(config.getInitLimit());
? ? ? ? ? quorumPeer.setSyncLimit(config.getSyncLimit());
? ? ? ? ? quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
? ? ? ? ? quorumPeer.setCnxnFactory(cnxnFactory);
? ? ? ? ? quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
? ? ? ? ? quorumPeer.setLearnerType(config.getPeerType());
? ? ? ? ? quorumPeer.setSyncEnabled(config.getSyncEnabled());
? ? ? ? ? quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
??
? ? ? ? ? quorumPeer.start();
? ? ? ? ? quorumPeer.join();
? ? ? } catch (InterruptedException e) {
? ? ? ? ? // warn, but generally this is ok
? ? ? ? ? LOG.warn("Quorum Peer interrupted", e);
? ? ? }
? ? }
?
原文:http://gaojingsong.iteye.com/blog/2309923