首页 > 其他 > 详细

Zookeeper3.4.6 源码解读

时间:2016-07-09 02:08:20      阅读:283      评论:0      收藏:0      [点我收藏+]

一、入口类是Main方法,读取zoo.cfg文件

org.apache.zookeeper.server.quorum.QuorumPeerMain

?

public static void main(String[] args) {

? ? ? ? QuorumPeerMain main = new QuorumPeerMain();

? ? ? ? main.initializeAndRun(args);?

}

?

?

二、读取zoo.cfg配置文件,初始化参数

protected void initializeAndRun(String[] args){

? ? ? ? QuorumPeerConfig config = new QuorumPeerConfig();

? ? ? ? if (args.length == 1) {

? ? ? ? ? ? //读取zoo.cfg文件,默认通过args传递进来的

? ? ? ? ? ? config.parse(args[0]);

? ? ? ? }

? ? ? ??

? ? ? ? // 启动定时任务,清理ZK的数据目录,防止文件过大

? ? ? ? // Start and schedule the the purge task

? ? ? ? DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config

? ? ? ? ? ? ? ? .getDataDir(), config.getDataLogDir(), config

? ? ? ? ? ? ? ? .getSnapRetainCount(), config.getPurgeInterval());

? ? ? ? purgeMgr.start();

?

? ? ? ? if (args.length == 1 && config.servers.size() > 0) {

? ? ? ? ? ? runFromConfig(config);

? ? ? ? } else {

? ? ? ? ? ? LOG.warn("Either no config or no quorum defined in config, running "

? ? ? ? ? ? ? ? ? ? + " in standalone mode");

? ? ? ? ? ? // there is only server in the quorum -- run as standalone

? ? ? ? ? ? ZooKeeperServerMain.main(args);

? ? ? ? }

? }

?

?

三、调用Properties?方法load配置文件

?/**

? ? ?* Parse a ZooKeeper configuration file

? ? ?* @param path the patch of the configuration file

? ? ?* @throws ConfigException error processing configuration

? ? ?*/

? ? public void parse(String path) throws ConfigException {

? ? ? ? File configFile = new File(path);

?

? ? ? ? LOG.info("Reading configuration from: " + configFile);

?

? ? ? ? try {

? ? ? ? ? ? if (!configFile.exists()) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException(configFile.toString()

? ? ? ? ? ? ? ? ? ? ? ? + " file is missing");

? ? ? ? ? ? }

?

? ? ? ? ? ? Properties cfg = new Properties();

? ? ? ? ? ? FileInputStream in = new FileInputStream(configFile);

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? cfg.load(in);

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? in.close();

? ? ? ? ? ? }

?

? ? ? ? ? ? parseProperties(cfg);

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? throw new ConfigException("Error processing " + path, e);

? ? ? ? } catch (IllegalArgumentException e) {

? ? ? ? ? ? throw new ConfigException("Error processing " + path, e);

? ? ? ? }

? ? }

?

四、遍历Properties 获取属性值

?/**

? ? ?* Parse config from a Properties.

? ? ?* @param zkProp Properties to parse from.

? ? ?* @throws IOException

? ? ?* @throws ConfigException

? ? ?*/

? ? public void parseProperties(Properties zkProp)

? ? throws IOException, ConfigException {

? ? ? ? int clientPort = 0;

? ? ? ? String clientPortAddress = null;

? ? ? ? for (Entry<Object, Object> entry : zkProp.entrySet()) {

? ? ? ? ? ? String key = entry.getKey().toString().trim();

? ? ? ? ? ? String value = entry.getValue().toString().trim();

? ? ? ? ? ? if (key.equals("dataDir")) {

? ? ? ? ? ? ? ? dataDir = value;

? ? ? ? ? ? } else if (key.equals("dataLogDir")) {

? ? ? ? ? ? ? ? dataLogDir = value;

? ? ? ? ? ? } else if (key.equals("clientPort")) {

? ? ? ? ? ? ? ? clientPort = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("clientPortAddress")) {

? ? ? ? ? ? ? ? clientPortAddress = value.trim();

? ? ? ? ? ? } else if (key.equals("tickTime")) {

? ? ? ? ? ? ? ? tickTime = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("maxClientCnxns")) {

? ? ? ? ? ? ? ? maxClientCnxns = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("minSessionTimeout")) {

? ? ? ? ? ? ? ? minSessionTimeout = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("maxSessionTimeout")) {

? ? ? ? ? ? ? ? maxSessionTimeout = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("initLimit")) {

? ? ? ? ? ? ? ? initLimit = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("syncLimit")) {

? ? ? ? ? ? ? ? syncLimit = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("electionAlg")) {

? ? ? ? ? ? ? ? electionAlg = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("quorumListenOnAllIPs")) {

? ? ? ? ? ? ? ? quorumListenOnAllIPs = Boolean.parseBoolean(value);

? ? ? ? ? ? } else if (key.equals("peerType")) {

? ? ? ? ? ? ? ? if (value.toLowerCase().equals("observer")) {

? ? ? ? ? ? ? ? ? ? peerType = LearnerType.OBSERVER;

? ? ? ? ? ? ? ? } else if (value.toLowerCase().equals("participant")) {

? ? ? ? ? ? ? ? ? ? peerType = LearnerType.PARTICIPANT;

? ? ? ? ? ? ? ? } else

? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? throw new ConfigException("Unrecognised peertype: " + value);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } else if (key.equals( "syncEnabled" )) {

? ? ? ? ? ? ? ? syncEnabled = Boolean.parseBoolean(value);

? ? ? ? ? ? } else if (key.equals("autopurge.snapRetainCount")) {

? ? ? ? ? ? ? ? snapRetainCount = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.equals("autopurge.purgeInterval")) {

? ? ? ? ? ? ? ? purgeInterval = Integer.parseInt(value);

? ? ? ? ? ? } else if (key.startsWith("server.")) {

? ? ? ? ? ? ? ? int dot = key.indexOf(‘.‘);

? ? ? ? ? ? ? ? long sid = Long.parseLong(key.substring(dot + 1));

? ? ? ? ? ? ? ? String parts[] = value.split(":");

? ? ? ? ? ? ? ? if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {

? ? ? ? ? ? ? ? ? ? ? //Zookeeper 配置集群的三种方法

? ? ? ? ? ? ? ? ? ? LOG.error(value

? ? ? ? ? ? ? ? ? ? ? ?+ " does not have the form host:port or host:port:port " +

? ? ? ? ? ? ? ? ? ? ? ?" or host:port:port:type");

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? InetSocketAddress addr = new InetSocketAddress(parts[0],

? ? ? ? ? ? ? ? ? ? ? ? Integer.parseInt(parts[1]));

? ? ? ? ? ? ? ? if (parts.length == 2) {

? ? ? ? ? ? ? ? ? ? servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));

? ? ? ? ? ? ? ? } else if (parts.length == 3) {

? ? ? ? ? ? ? ? ? ? InetSocketAddress electionAddr = new InetSocketAddress(

? ? ? ? ? ? ? ? ? ? ? ? ? ? parts[0], Integer.parseInt(parts[2]));

? ? ? ? ? ? ? ? ? ? servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,

? ? ? ? ? ? ? ? ? ? ? ? ? ? electionAddr));

? ? ? ? ? ? ? ? } else if (parts.length == 4) {

? ? ? ? ? ? ? ? ? ? InetSocketAddress electionAddr = new InetSocketAddress(

? ? ? ? ? ? ? ? ? ? ? ? ? ? parts[0], Integer.parseInt(parts[2]));

? ? ? ? ? ? ? ? ? ? LearnerType type = LearnerType.PARTICIPANT;

? ? ? ? ? ? ? ? ? ? if (parts[3].toLowerCase().equals("observer")) {

? ? ? ? ? ? ? ? ? ? ? ? type = LearnerType.OBSERVER;

? ? ? ? ? ? ? ? ? ? ? ? observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? electionAddr,type));

? ? ? ? ? ? ? ? ? ? } else if (parts[3].toLowerCase().equals("participant")) {

? ? ? ? ? ? ? ? ? ? ? ? type = LearnerType.PARTICIPANT;

? ? ? ? ? ? ? ? ? ? ? ? servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? electionAddr,type));

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? throw new ConfigException("Unrecognised peertype: " + value);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } else if (key.startsWith("group")) {

? ? ? ? ? ? ? ? int dot = key.indexOf(‘.‘);

? ? ? ? ? ? ? ? long gid = Long.parseLong(key.substring(dot + 1));

?

? ? ? ? ? ? ? ? numGroups++;

?

? ? ? ? ? ? ? ? String parts[] = value.split(":");

? ? ? ? ? ? ? ? for(String s : parts){

? ? ? ? ? ? ? ? ? ? long sid = Long.parseLong(s);

? ? ? ? ? ? ? ? ? ? if(serverGroup.containsKey(sid))

? ? ? ? ? ? ? ? ? ? ? ? throw new ConfigException("Server " + sid + "is in multiple groups");

? ? ? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? ? ? serverGroup.put(sid, gid);

? ? ? ? ? ? ? ? }

?

? ? ? ? ? ? } else if(key.startsWith("weight")) {

? ? ? ? ? ? ? ? int dot = key.indexOf(‘.‘);

? ? ? ? ? ? ? ? long sid = Long.parseLong(key.substring(dot + 1));

? ? ? ? ? ? ? ? serverWeight.put(sid, Long.parseLong(value));

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? System.setProperty("zookeeper." + key, value);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ??

? ? ? ? // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)

? ? ? ? // PurgeTxnLog.purge(File, File, int) will not allow to purge less

? ? ? ? // than 3.

? ? ? ? if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {

? ? ? ? ? ? LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount

? ? ? ? ? ? ? ? ? ? + ". Defaulting to " + MIN_SNAP_RETAIN_COUNT);

? ? ? ? ? ? snapRetainCount = MIN_SNAP_RETAIN_COUNT;

? ? ? ? }

?

? ? ? ? if (dataDir == null) {

? ? ? ? ? ? throw new IllegalArgumentException("dataDir is not set");

? ? ? ? }

? ? ? ? if (dataLogDir == null) {

? ? ? ? ? ? dataLogDir = dataDir;

? ? ? ? } else {

? ? ? ? ? ? if (!new File(dataLogDir).isDirectory()) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException("dataLogDir " + dataLogDir

? ? ? ? ? ? ? ? ? ? ? ? + " is missing.");

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? if (clientPort == 0) {

? ? ? ? ? ? throw new IllegalArgumentException("clientPort is not set");

? ? ? ? }

? ? ? ? if (clientPortAddress != null) {

? ? ? ? ? ? this.clientPortAddress = new InetSocketAddress(

? ? ? ? ? ? ? ? ? ? InetAddress.getByName(clientPortAddress), clientPort);

? ? ? ? } else {

? ? ? ? ? ? this.clientPortAddress = new InetSocketAddress(clientPort);

? ? ? ? }

?

? ? ? ? if (tickTime == 0) {

? ? ? ? ? ? throw new IllegalArgumentException("tickTime is not set");

? ? ? ? }

? ? ? ? if (minSessionTimeout > maxSessionTimeout) {

? ? ? ? ? ? throw new IllegalArgumentException(

? ? ? ? ? ? ? ? ? ? "minSessionTimeout must not be larger than maxSessionTimeout");

? ? ? ? }

? ? ? ? if (servers.size() == 0) {

? ? ? ? ? ? if (observers.size() > 0) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");

? ? ? ? ? ? }

? ? ? ? ? ? // Not a quorum configuration so return immediately - not an error

? ? ? ? ? ? // case (for b/w compatibility), server will default to standalone

? ? ? ? ? ? // mode.

? ? ? ? ? ? return;

? ? ? ? } else if (servers.size() == 1) {

? ? ? ? ? ? if (observers.size() > 0) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");

? ? ? ? ? ? }

?

? ? ? ? ? ? // HBase currently adds a single server line to the config, for

? ? ? ? ? ? // b/w compatibility reasons we need to keep this here.

? ? ? ? ? ? LOG.error("Invalid configuration, only one server specified (ignoring)");

? ? ? ? ? ? servers.clear();

? ? ? ? } else if (servers.size() > 1) {

? ? ? ? ? ? if (servers.size() == 2) {

? ? ? ? ? ? ? ? LOG.warn("No server failure will be tolerated. " +

? ? ? ? ? ? ? ? ? ? "You need at least 3 servers.");

? ? ? ? ? ? } else if (servers.size() % 2 == 0) {

? ? ? ? ? ? ? ? LOG.warn("Non-optimial configuration, consider an odd number of servers.");

? ? ? ? ? ? }

? ? ? ? ? ? if (initLimit == 0) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException("initLimit is not set");

? ? ? ? ? ? }

? ? ? ? ? ? if (syncLimit == 0) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException("syncLimit is not set");

? ? ? ? ? ? }

? ? ? ? ? ? /*

? ? ? ? ? ? ?* If using FLE, then every server requires a separate election

? ? ? ? ? ? ?* port.

? ? ? ? ? ? ?*/

? ? ? ? ? ? if (electionAlg != 0) {

? ? ? ? ? ? ? ? for (QuorumServer s : servers.values()) {

? ? ? ? ? ? ? ? ? ? if (s.electionAddr == null)

? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalArgumentException(

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "Missing election port for server: " + s.id);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

?

? ? ? ? ? ? /*

? ? ? ? ? ? ?* Default of quorum config is majority

? ? ? ? ? ? ?*/

? ? ? ? ? ? if(serverGroup.size() > 0){

? ? ? ? ? ? ? ? if(servers.size() != serverGroup.size())

? ? ? ? ? ? ? ? ? ? throw new ConfigException("Every server must be in exactly one group");

? ? ? ? ? ? ? ? /*

? ? ? ? ? ? ? ? ?* The deafult weight of a server is 1

? ? ? ? ? ? ? ? ?*/

? ? ? ? ? ? ? ? for(QuorumServer s : servers.values()){

? ? ? ? ? ? ? ? ? ? if(!serverWeight.containsKey(s.id))

? ? ? ? ? ? ? ? ? ? ? ? serverWeight.put(s.id, (long) 1);

? ? ? ? ? ? ? ? }

?

? ? ? ? ? ? ? ? /*

? ? ? ? ? ? ? ? ?* Set the quorumVerifier to be QuorumHierarchical

? ? ? ? ? ? ? ? ?*/

? ? ? ? ? ? ? ? quorumVerifier = new QuorumHierarchical(numGroups,

? ? ? ? ? ? ? ? ? ? ? ? serverWeight, serverGroup);

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? /*

? ? ? ? ? ? ? ? ?* The default QuorumVerifier is QuorumMaj

? ? ? ? ? ? ? ? ?*/

?

? ? ? ? ? ? ? ? LOG.info("Defaulting to majority quorums");

? ? ? ? ? ? ? ? quorumVerifier = new QuorumMaj(servers.size());

? ? ? ? ? ? }

?

? ? ? ? ? ? // Now add observers to servers, once the quorums have been

? ? ? ? ? ? // figured out

? ? ? ? ? ? servers.putAll(observers);

? ??

? ? ? ? ? ? File myIdFile = new File(dataDir, "myid");

? ? ? ? ? ? if (!myIdFile.exists()) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException(myIdFile.toString()

? ? ? ? ? ? ? ? ? ? ? ? + " file is missing");

? ? ? ? ? ? }

? ? ? ? ? ? BufferedReader br = new BufferedReader(new FileReader(myIdFile));

? ? ? ? ? ? String myIdString;

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? myIdString = br.readLine();

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? br.close();

? ? ? ? ? ? }

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? serverId = Long.parseLong(myIdString);

? ? ? ? ? ? ? ? MDC.put("myid", myIdString);

? ? ? ? ? ? } catch (NumberFormatException e) {

? ? ? ? ? ? ? ? throw new IllegalArgumentException("serverid " + myIdString

? ? ? ? ? ? ? ? ? ? ? ? + " is not a number");

? ? ? ? ? ? }

? ? ? ? ? ??

? ? ? ? ? ? // Warn about inconsistent peer type

? ? ? ? ? ? LearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER

? ? ? ? ? ? ? ? ? ? : LearnerType.PARTICIPANT;

? ? ? ? ? ? if (roleByServersList != peerType) {

? ? ? ? ? ? ? ? LOG.warn("Peer type from servers list (" + roleByServersList

? ? ? ? ? ? ? ? ? ? ? ? + ") doesn‘t match peerType (" + peerType

? ? ? ? ? ? ? ? ? ? ? ? + "). Defaulting to servers list.");

? ??

? ? ? ? ? ? ? ? peerType = roleByServersList;

? ? ? ? ? ? }

? ? ? ? }

? ? }

?

?

?

?

五、启动清理数据文件的进程

?

public void start() {

? ? ? ? if (PurgeTaskStatus.STARTED == purgeTaskStatus) {

? ? ? ? ? ? LOG.warn("Purge task is already running.");

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? // Don‘t schedule the purge task with zero or negative purge interval.

? ? ? ? if (purgeInterval <= 0) {

? ? ? ? ? ? LOG.info("Purge task is not scheduled.");

? ? ? ? ? ? return;

? ? ? ? }

?

? ? ? ? timer = new Timer("PurgeTask", true);

? ? ? ? TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);

? ? ? ? timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

?

? ? ? ? purgeTaskStatus = PurgeTaskStatus.STARTED;

? ? }

?

?

?

六、启动线程进行清理

?static class PurgeTask extends TimerTask {

? ? ? ? private String logsDir;

? ? ? ? private String snapsDir;

? ? ? ? private int snapRetainCount;

?

? ? ? ? public PurgeTask(String dataDir, String snapDir, int count) {

? ? ? ? ? ? logsDir = dataDir;

? ? ? ? ? ? snapsDir = snapDir;

? ? ? ? ? ? snapRetainCount = count;

? ? ? ? }

?

? ? ? ? @Override

? ? ? ? public void run() {

? ? ? ? ? ? LOG.info("Purge task started.");

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? PurgeTxnLog.purge(new File(logsDir), new File(snapsDir), snapRetainCount);

? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? LOG.error("Error occured while purging.", e);

? ? ? ? ? ? }

? ? ? ? ? ? LOG.info("Purge task completed.");

? ? ? ? }

? ? }

?

?

?

七、真正的过滤删除文件

/**

? ? ?* purges the snapshot and logs keeping the last num snapshots?

? ? ?* and the corresponding logs.

? ? ?* @param dataDir the dir that has the logs

? ? ?* @param snapDir the dir that has the snapshots

? ? ?* @param num the number of snapshots to keep

? ? ?* @throws IOException

? ? ?*/

? ? public static void purge(File dataDir, File snapDir, int num) throws IOException {

? ? ? ? if (num < 3) {

? ? ? ? ? ? throw new IllegalArgumentException("count should be greater than 3");

? ? ? ? }

?

? ? ? ? FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

? ? ? ??

? ? ? ? // found any valid recent snapshots?

? ? ? ??

? ? ? ? // files to exclude from deletion

? ? ? ? Set<File> exc=new HashSet<File>();

? ? ? ? List<File> snaps = txnLog.findNRecentSnapshots(num);

? ? ? ? if (snaps.size() == 0)?

? ? ? ? ? ? return;

? ? ? ? File snapShot = snaps.get(snaps.size() -1);

? ? ? ? for (File f: snaps) {

? ? ? ? ? ? exc.add(f);

? ? ? ? }

? ? ? ? long zxid = Util.getZxidFromName(snapShot.getName(),"snapshot");

? ? ? ? exc.addAll(Arrays.asList(txnLog.getSnapshotLogs(zxid)));

?

? ? ? ? final Set<File> exclude=exc;

? ? ? ? class MyFileFilter implements FileFilter{

? ? ? ? ? ? private final String prefix;

? ? ? ? ? ? MyFileFilter(String prefix){

? ? ? ? ? ? ? ? this.prefix=prefix;

? ? ? ? ? ? }

? ? ? ? ? ? public boolean accept(File f){

? ? ? ? ? ? ? ? if(!f.getName().startsWith(prefix) || exclude.contains(f))

? ? ? ? ? ? ? ? ? ? return false;

? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // add all non-excluded log files

? ? ? ? List<File> files=new ArrayList<File>(

? ? ? ? ? ? ? ? Arrays.asList(txnLog.getDataDir().listFiles(new MyFileFilter("log."))));

? ? ? ? // add all non-excluded snapshot files to the deletion list

? ? ? ? files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(new MyFileFilter("snapshot."))));

? ? ? ? // remove the old files

? ? ? ? for(File f: files)

? ? ? ? {

? ? ? ? ? ? System.out.println("Removing file: "+

? ? ? ? ? ? ? ? DateFormat.getDateTimeInstance().format(f.lastModified())+

? ? ? ? ? ? ? ? "\t"+f.getPath());

? ? ? ? ? ? if(!f.delete()){

? ? ? ? ? ? ? ? System.err.println("Failed to remove "+f.getPath());

? ? ? ? ? ? }

? ? ? ? }

?

? ? }

?

?

?八、这个函数功能自己研究

?public void runFromConfig(QuorumPeerConfig config) throws IOException {

? ? ? try {

? ? ? ? ? ManagedUtil.registerLog4jMBeans();

? ? ? } catch (JMException e) {

? ? ? ? ? LOG.warn("Unable to register log4j JMX control", e);

? ? ? }

??

? ? ? LOG.info("Starting quorum peer");

? ? ? try {

? ? ? ? ? ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();

? ? ? ? ? cnxnFactory.configure(config.getClientPortAddress(),

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? config.getMaxClientCnxns());

??

? ? ? ? ? quorumPeer = new QuorumPeer();

? ? ? ? ? quorumPeer.setClientPortAddress(config.getClientPortAddress());

? ? ? ? ? quorumPeer.setTxnFactory(new FileTxnSnapLog(

? ? ? ? ? ? ? ? ? ? ? new File(config.getDataLogDir()),

? ? ? ? ? ? ? ? ? ? ? new File(config.getDataDir())));

? ? ? ? ? quorumPeer.setQuorumPeers(config.getServers());

? ? ? ? ? quorumPeer.setElectionType(config.getElectionAlg());

? ? ? ? ? quorumPeer.setMyid(config.getServerId());

? ? ? ? ? quorumPeer.setTickTime(config.getTickTime());

? ? ? ? ? quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());

? ? ? ? ? quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());

? ? ? ? ? quorumPeer.setInitLimit(config.getInitLimit());

? ? ? ? ? quorumPeer.setSyncLimit(config.getSyncLimit());

? ? ? ? ? quorumPeer.setQuorumVerifier(config.getQuorumVerifier());

? ? ? ? ? quorumPeer.setCnxnFactory(cnxnFactory);

? ? ? ? ? quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));

? ? ? ? ? quorumPeer.setLearnerType(config.getPeerType());

? ? ? ? ? quorumPeer.setSyncEnabled(config.getSyncEnabled());

? ? ? ? ? quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

??

? ? ? ? ? quorumPeer.start();

? ? ? ? ? quorumPeer.join();

? ? ? } catch (InterruptedException e) {

? ? ? ? ? // warn, but generally this is ok

? ? ? ? ? LOG.warn("Quorum Peer interrupted", e);

? ? ? }

? ? }

?

Zookeeper3.4.6 源码解读

原文:http://gaojingsong.iteye.com/blog/2309923

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!