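In "Quartz and Spring Integration: Initialization Analysis of SchedulerFactoryBean" I described the initialization process when Spring integrates Quartz, and briefly mentioned createScheduler, the method that creates the scheduler. This article focuses on how the scheduler is created during Quartz initialization.
We start from the implementation of createScheduler (see Listing 1). Its processing steps are as follows:
Listing 1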
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
        throws SchedulerException {
    // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
    Thread currentThread = Thread.currentThread();
    ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
    boolean overrideClassLoader = (this.resourceLoader != null &&
            !this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
    if (overrideClassLoader) {
        currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
    }
    try {
        SchedulerRepository repository = SchedulerRepository.getInstance();
        synchronized (repository) {
            Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
            Scheduler newScheduler = schedulerFactory.getScheduler();
            if (newScheduler == existingScheduler) {
                throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                        "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
            }
            if (!this.exposeSchedulerInRepository) {
                // Need to remove it in this case, since Quartz shares the Scheduler instance by default!
                SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
            }
            return newScheduler;
        }
    } finally {
        if (overrideClassLoader) {
            // Reset original thread context ClassLoader.
            currentThread.setContextClassLoader(threadContextClassLoader);
        }
    }
}
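The class loader handling in createScheduler follows a common override-and-restore pattern. The following is a minimal sketch of that pattern using only the JDK; ContextClassLoaderSwap and callWith are hypothetical names chosen for illustration and are not part of Spring or Quartz.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of Spring or Quartz.
final class ContextClassLoaderSwap {

    /** Runs the action with the given loader as thread context ClassLoader, then restores the old one. */
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        boolean override = (loader != null && !loader.equals(original));
        if (override) {
            current.setContextClassLoader(loader);
        }
        try {
            return action.get();
        } finally {
            if (override) {
                // Always restore the original loader, even if the action throws.
                current.setContextClassLoader(original);
            }
        }
    }

    public static void main(String[] args) {
        ClassLoader custom = new ClassLoader(ContextClassLoaderSwap.class.getClassLoader()) { };
        ClassLoader seen = callWith(custom, () -> Thread.currentThread().getContextClassLoader());
        System.out.println("overridden inside action: " + (seen == custom));
        System.out.println("restored afterwards: " + (Thread.currentThread().getContextClassLoader() != custom));
    }
}
```

The finally block is what makes the pattern safe: the calling thread gets its own loader back whether or not scheduler creation succeeds.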
Listing 2
public static synchronized SchedulerRepository getInstance() {
    if (inst == null) {
        inst = new SchedulerRepository();
    }
    return inst;
}
Listing 3
public Scheduler getScheduler() throws SchedulerException {
    if (cfg == null) {
        initialize();
    }
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    Scheduler sched = schedRep.lookup(getSchedulerName());
    if (sched != null) {
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            return sched;
        }
    }
    sched = instantiate();
    return sched;
}
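The lookup-or-create behaviour of getScheduler can be modelled in a few lines. This is a simplified sketch under stated assumptions: ToyScheduler, ToyRepository and ToyFactory are hypothetical stand-ins and not the Quartz classes, and the creation step is reduced to a constructor call.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration; these are NOT the Quartz classes.
final class ToyScheduler {
    final String name;
    boolean shutdown;

    ToyScheduler(String name) {
        this.name = name;
    }
}

final class ToyRepository {
    private static ToyRepository inst;
    private final Map<String, ToyScheduler> schedulers = new HashMap<>();

    // Lazily created singleton, guarded by the class lock.
    static synchronized ToyRepository getInstance() {
        if (inst == null) {
            inst = new ToyRepository();
        }
        return inst;
    }

    synchronized ToyScheduler lookup(String name) { return schedulers.get(name); }
    synchronized void bind(ToyScheduler s) { schedulers.put(s.name, s); }
    synchronized void remove(String name) { schedulers.remove(name); }
}

final class ToyFactory {
    // Reuse a live scheduler of the same name; replace one that has been shut down.
    static ToyScheduler getScheduler(String name) {
        ToyRepository rep = ToyRepository.getInstance();
        ToyScheduler sched = rep.lookup(name);
        if (sched != null) {
            if (sched.shutdown) {
                rep.remove(name);
            } else {
                return sched;
            }
        }
        sched = new ToyScheduler(name);   // stands in for instantiate()
        rep.bind(sched);
        return sched;
    }
}
```

Calling ToyFactory.getScheduler twice with the same name returns the same object until that object is marked as shut down, which mirrors why createScheduler compares newScheduler with existingScheduler.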
The instantiate method contains many statements that read properties from PropertiesParser (introduced in "Quartz and Spring Integration: Initialization Analysis of SchedulerFactoryBean"); they are not shown here. Let us focus on the essential parts.
If the current scheduler is in fact a proxy for a remote RMI scheduler, a RemoteScheduler is created, registered in the SchedulerRepository, and returned as the scheduler (see Listing 4).
Listing 4
if (rmiProxy) {
    if (autoId) {
        schedInstId = DEFAULT_INSTANCE_ID;
    }
    String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
            schedName, schedInstId) : rmiBindName;
    RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);
    schedRep.bind(remoteScheduler);
    return remoteScheduler;
}
If the current scheduler is in fact a proxy for a remote JMX scheduler, a RemoteMBeanScheduler is created, registered in the SchedulerRepository, and returned as the scheduler (see Listing 5).
Listing 5
if (jmxProxy) {
    if (autoId) {
        schedInstId = DEFAULT_INSTANCE_ID;
    }
    if (jmxProxyClass == null) {
        throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
    }
    RemoteMBeanScheduler jmxScheduler = null;
    try {
        jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)
                .newInstance();
    } catch (Exception e) {
        throw new SchedulerConfigException(
                "Unable to instantiate RemoteMBeanScheduler class.", e);
    }
    if (jmxObjectName == null) {
        jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
    }
    jmxScheduler.setSchedulerObjectName(jmxObjectName);
    tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
    try {
        setBeanProps(jmxScheduler, tProps);
    } catch (Exception e) {
        initException = new SchedulerException("RemoteMBeanScheduler class '"
                + jmxProxyClass + "' props could not be configured.", e);
        throw initException;
    }
    jmxScheduler.initialize();
    schedRep.bind(jmxScheduler);
    return jmxScheduler;
}
If the jobFactoryClass property is specified, the job factory is instantiated (see Listing 6).
Listing 6
JobFactory jobFactory = null;
if(jobFactoryClass != null) {
    try {
        jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                .newInstance();
    } catch (Exception e) {
        throw new SchedulerConfigException(
                "Unable to instantiate JobFactory class: "
                        + e.getMessage(), e);
    }
    tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
    try {
        setBeanProps(jobFactory, tProps);
    } catch (Exception e) {
        initException = new SchedulerException("JobFactory class '"
                + jobFactoryClass + "' props could not be configured.", e);
        throw initException;
    }
}
If the instanceIdGeneratorClass property is specified, the instance ID generator is instantiated (see Listing 7).
Listing 7
InstanceIdGenerator instanceIdGenerator = null;
if(instanceIdGeneratorClass != null) {
    try {
        instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass)
                .newInstance();
    } catch (Exception e) {
        throw new SchedulerConfigException(
                "Unable to instantiate InstanceIdGenerator class: "
                        + e.getMessage(), e);
    }
    tProps = cfg.getPropertyGroup(PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX, true);
    try {
        setBeanProps(instanceIdGenerator, tProps);
    } catch (Exception e) {
        initException = new SchedulerException("InstanceIdGenerator class '"
                + instanceIdGeneratorClass + "' props could not be configured.", e);
        throw initException;
    }
}
The org.quartz.threadPool.class property specifies the thread pool class. If it is not set, the default is org.quartz.simpl.SimpleThreadPool (see Listing 8).
Listing 8
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {
    initException = new SchedulerException(
            "ThreadPool class not specified. ");
    throw initException;
}
try {
    tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' could not be instantiated.", e);
    throw initException;
}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
    setBeanProps(tp, tProps);
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' props could not be configured.", e);
    throw initException;
}
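Listings 6 to 8 share one idea: read a class name from configuration, fall back to a default, and instantiate the class reflectively. Below is a minimal JDK-only sketch of that idea. PluggableLoader and the demo.list.class key are hypothetical, JDK collection classes stand in for the Quartz types, and the sketch uses Class.forName rather than Quartz's ClassLoadHelper.

```java
import java.util.List;
import java.util.Properties;

// Hypothetical helper showing the "class name from a property, with a default" pattern.
final class PluggableLoader {

    static <T> T load(Properties cfg, String key, String defaultClass, Class<T> type) {
        String className = cfg.getProperty(key, defaultClass);
        try {
            Class<?> clazz = Class.forName(className);
            // getDeclaredConstructor().newInstance() is the non-deprecated form of Class.newInstance().
            return type.cast(clazz.getDeclaredConstructor().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Class '" + className + "' could not be instantiated.", e);
        }
    }

    public static void main(String[] args) {
        Properties cfg = new Properties();
        // No property set: the default class is used.
        List<?> byDefault = load(cfg, "demo.list.class", "java.util.ArrayList", List.class);
        System.out.println(byDefault.getClass().getName());

        // Property set: the configured class wins.
        cfg.setProperty("demo.list.class", "java.util.LinkedList");
        List<?> configured = load(cfg, "demo.list.class", "java.util.ArrayList", List.class);
        System.out.println(configured.getClass().getName());
    }
}
```

Quartz additionally applies the remaining properties of the group to the new object through setBeanProps, which this sketch leaves out.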
The org.quartz.jobStore.class property specifies the concrete JobStore type. In my setup I explicitly specified org.springframework.scheduling.quartz.LocalDataSourceJobStore; if the property is not set, the default is RAMJobStore (see Listing 9).
Listing 9
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
        RAMJobStore.class.getName());
if (jsClass == null) {
    initException = new SchedulerException(
            "JobStore class not specified. ");
    throw initException;
}
try {
    js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {
    initException = new SchedulerException("JobStore class '" + jsClass
            + "' could not be instantiated.", e);
    throw initException;
}
SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
    setBeanProps(js, tProps);
} catch (Exception e) {
    initException = new SchedulerException("JobStore class '" + jsClass
            + "' props could not be configured.", e);
    throw initException;
}
This step contains a lot of logic, but on closer inspection the database connection manager (DBConnectionManager) is the same in every case; only the provider of the connection pool differs (see Listing 10). There are three variants:
Variant 1: the connection provider is specified by the connectionProvider.class property;
Variant 2: the connection provider is specified by the jndiURL property;
Variant 3: the connection provider is PoolingConnectionProvider, which uses the C3P0 connection pool.
Listing 10
String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
for (int i = 0; i < dsNames.length; i++) {
    PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
            PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));
    String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);
    // custom connectionProvider...
    if(cpClass != null) {
        ConnectionProvider cp = null;
        try {
            cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ConnectionProvider class '" + cpClass
                    + "' could not be instantiated.", e);
            throw initException;
        }
        try {
            // remove the class name, so it isn't attempted to be set
            pp.getUnderlyingProperties().remove(
                    PROP_CONNECTION_PROVIDER_CLASS);
            if (cp instanceof PoolingConnectionProvider) {
                populateProviderWithExtraProps((PoolingConnectionProvider)cp, pp.getUnderlyingProperties());
            } else {
                setBeanProps(cp, pp.getUnderlyingProperties());
            }
            cp.initialize();
        } catch (Exception e) {
            initException = new SchedulerException("ConnectionProvider class '" + cpClass
                    + "' props could not be configured.", e);
            throw initException;
        }
        dbMgr = DBConnectionManager.getInstance();
        dbMgr.addConnectionProvider(dsNames[i], cp);
    } else {
        String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);
        if (dsJndi != null) {
            boolean dsAlwaysLookup = pp.getBooleanProperty(
                    PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
            String dsJndiInitial = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_INITIAL);
            String dsJndiProvider = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_PROVDER);
            String dsJndiPrincipal = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_PRINCIPAL);
            String dsJndiCredentials = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_CREDENTIALS);
            Properties props = null;
            if (null != dsJndiInitial || null != dsJndiProvider ||
                    null != dsJndiPrincipal || null != dsJndiCredentials) {
                props = new Properties();
                if (dsJndiInitial != null) {
                    props.put(PROP_DATASOURCE_JNDI_INITIAL,
                            dsJndiInitial);
                }
                if (dsJndiProvider != null) {
                    props.put(PROP_DATASOURCE_JNDI_PROVDER,
                            dsJndiProvider);
                }
                if (dsJndiPrincipal != null) {
                    props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
                            dsJndiPrincipal);
                }
                if (dsJndiCredentials != null) {
                    props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
                            dsJndiCredentials);
                }
            }
            JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
                    props, dsAlwaysLookup);
            dbMgr = DBConnectionManager.getInstance();
            dbMgr.addConnectionProvider(dsNames[i], cp);
        } else {
            String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
            String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);
            if (dsDriver == null) {
                initException = new SchedulerException(
                        "Driver not specified for DataSource: "
                                + dsNames[i]);
                throw initException;
            }
            if (dsURL == null) {
                initException = new SchedulerException(
                        "DB URL not specified for DataSource: "
                                + dsNames[i]);
                throw initException;
            }
            try {
                PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
                dbMgr = DBConnectionManager.getInstance();
                dbMgr.addConnectionProvider(dsNames[i], cp);
                // Populate the underlying C3P0 data source pool properties
                populateProviderWithExtraProps(cp, pp.getUnderlyingProperties());
            } catch (Exception sqle) {
                initException = new SchedulerException(
                        "Could not initialize DataSource: " + dsNames[i],
                        sqle);
                throw initException;
            }
        }
    }
}
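The three-way choice of connection provider reduces to a small decision function. The sketch below models only the decision, not the providers themselves. ProviderChoice is a hypothetical name, and the "driver" and "URL" keys are my assumption about the values behind PoolingConnectionProvider.DB_DRIVER and DB_URL.

```java
import java.util.Properties;

// Hypothetical model of the provider selection order; it creates no real connections.
final class ProviderChoice {

    enum Kind { CUSTOM_CLASS, JNDI, POOLING }

    // Keys are relative to one data source group, e.g. org.quartz.dataSource.<name>.
    static Kind choose(Properties ds) {
        if (ds.getProperty("connectionProvider.class") != null) {
            return Kind.CUSTOM_CLASS;          // variant 1: user-supplied ConnectionProvider
        }
        if (ds.getProperty("jndiURL") != null) {
            return Kind.JNDI;                  // variant 2: DataSource looked up via JNDI
        }
        // Variant 3 needs at least a driver and a URL (key names assumed, see lead-in).
        if (ds.getProperty("driver") == null) {
            throw new IllegalArgumentException("Driver not specified for DataSource");
        }
        if (ds.getProperty("URL") == null) {
            throw new IllegalArgumentException("DB URL not specified for DataSource");
        }
        return Kind.POOLING;
    }

    public static void main(String[] args) {
        Properties ds = new Properties();
        ds.setProperty("driver", "org.h2.Driver");      // made-up sample values
        ds.setProperty("URL", "jdbc:h2:mem:demo");
        System.out.println(choose(ds));
        ds.setProperty("jndiURL", "java:comp/env/jdbc/demo");
        System.out.println(choose(ds));
    }
}
```

The order of the checks matters: a custom provider class takes precedence over a JNDI URL, and the pooling provider is only the fallback.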
This section sets up the scheduler plugins (see Listing 11).
Listing 11
String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
for (int i = 0; i < pluginNames.length; i++) {
    Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
            + pluginNames[i], true);
    String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);
    if (plugInClass == null) {
        initException = new SchedulerException(
                "SchedulerPlugin class not specified for plugin '"
                        + pluginNames[i] + "'");
        throw initException;
    }
    SchedulerPlugin plugin = null;
    try {
        plugin = (SchedulerPlugin)
                loadHelper.loadClass(plugInClass).newInstance();
    } catch (Exception e) {
        initException = new SchedulerException(
                "SchedulerPlugin class '" + plugInClass
                        + "' could not be instantiated.", e);
        throw initException;
    }
    try {
        setBeanProps(plugin, pp);
    } catch (Exception e) {
        initException = new SchedulerException(
                "JobStore SchedulerPlugin '" + plugInClass
                        + "' props could not be configured.", e);
        throw initException;
    }
    plugins[i] = plugin;
}
This step sets up the job listeners, which I think could be used for job-monitoring extensions (see Listing 12).
Listing 12
Class<?>[] strArg = new Class[] { String.class };
String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
JobListener[] jobListeners = new JobListener[jobListenerNames.length];
for (int i = 0; i < jobListenerNames.length; i++) {
    Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
            + jobListenerNames[i], true);
    String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
    if (listenerClass == null) {
        initException = new SchedulerException(
                "JobListener class not specified for listener '"
                        + jobListenerNames[i] + "'");
        throw initException;
    }
    JobListener listener = null;
    try {
        listener = (JobListener)
                loadHelper.loadClass(listenerClass).newInstance();
    } catch (Exception e) {
        initException = new SchedulerException(
                "JobListener class '" + listenerClass
                        + "' could not be instantiated.", e);
        throw initException;
    }
    try {
        Method nameSetter = null;
        try {
            nameSetter = listener.getClass().getMethod("setName", strArg);
        }
        catch(NoSuchMethodException ignore) {
            /* do nothing */
        }
        if(nameSetter != null) {
            nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
        }
        setBeanProps(listener, lp);
    } catch (Exception e) {
        initException = new SchedulerException(
                "JobListener '" + listenerClass
                        + "' props could not be configured.", e);
        throw initException;
    }
    jobListeners[i] = listener;
}
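One detail in the listener setup is easy to miss: setName is invoked only if the listener class happens to declare a public setName(String) method. The following sketch isolates that reflective check with JDK types only. OptionalNameSetter is a hypothetical name, and java.lang.Thread is used as the target simply because it has such a method.

```java
import java.lang.reflect.Method;

// Hypothetical helper isolating the optional setName(String) call.
final class OptionalNameSetter {

    /** Returns true if a public setName(String) existed and was invoked on the target. */
    static boolean applyName(Object target, String name) {
        Method nameSetter = null;
        try {
            nameSetter = target.getClass().getMethod("setName", String.class);
        } catch (NoSuchMethodException ignore) {
            // The class has no setName(String); that is allowed, so do nothing.
        }
        if (nameSetter == null) {
            return false;
        }
        try {
            nameSetter.invoke(target, name);
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("setName could not be invoked", e);
        }
    }

    public static void main(String[] args) {
        Thread withSetter = new Thread();
        System.out.println(applyName(withSetter, "listenerA") + " " + withSetter.getName());
        System.out.println(applyName(new Object(), "listenerB"));
    }
}
```

The benefit of this design is that listener implementations are not forced to extend a base class or implement a naming interface just to receive their configured name.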
This step sets up the trigger listeners (see Listing 13).
Listing 13
String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
for (int i = 0; i < triggerListenerNames.length; i++) {
    Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
            + triggerListenerNames[i], true);
    String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
    if (listenerClass == null) {
        initException = new SchedulerException(
                "TriggerListener class not specified for listener '"
                        + triggerListenerNames[i] + "'");
        throw initException;
    }
    TriggerListener listener = null;
    try {
        listener = (TriggerListener)
                loadHelper.loadClass(listenerClass).newInstance();
    } catch (Exception e) {
        initException = new SchedulerException(
                "TriggerListener class '" + listenerClass
                        + "' could not be instantiated.", e);
        throw initException;
    }
    try {
        Method nameSetter = null;
        try {
            nameSetter = listener.getClass().getMethod("setName", strArg);
        }
        catch(NoSuchMethodException ignore) { /* do nothing */ }
        if(nameSetter != null) {
            nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } );
        }
        setBeanProps(listener, lp);
    } catch (Exception e) {
        initException = new SchedulerException(
                "TriggerListener '" + listenerClass
                        + "' props could not be configured.", e);
        throw initException;
    }
    triggerListeners[i] = listener;
}
The thread executor can be set through the org.quartz.threadExecutor.class property. If it is not specified, DefaultThreadExecutor is used (see Listing 13).
Listing 13
String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
if (threadExecutorClass != null) {
    tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
    try {
        threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
        log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);
        setBeanProps(threadExecutor, tProps);
    } catch (Exception e) {
        initException = new SchedulerException(
                "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
        throw initException;
    }
} else {
    log.info("Using default implementation for ThreadExecutor");
    threadExecutor = new DefaultThreadExecutor();
}
If jobs should run inside a transaction (controlled by the org.quartz.scheduler.wrapJobExecutionInUserTransaction property), a JTAJobRunShellFactory is created; otherwise a JTAAnnotationAwareJobRunShellFactory is created (see Listing 14).
Listing 14
JobRunShellFactory jrsf = null; // Create correct run-shell factory...
if (userTXLocation != null) {
    UserTransactionHelper.setUserTxLocation(userTXLocation);
}
if (wrapJobInTx) {
    jrsf = new JTAJobRunShellFactory();
} else {
    jrsf = new JTAAnnotationAwareJobRunShellFactory();
}
If the scheduler instance ID is to be generated automatically (that is, org.quartz.scheduler.instanceId is set to AUTO or SYS_PROP), the instance ID first defaults to NON_CLUSTERED. If the JobStore is clustered, the ID is instead produced by the instance ID generator instanceIdGenerator (see Listing 15). With AUTO, instanceIdGeneratorClass is taken from the org.quartz.scheduler.instanceIdGenerator.class property and defaults to org.quartz.simpl.SimpleInstanceIdGenerator; with SYS_PROP, instanceIdGeneratorClass is org.quartz.simpl.SystemPropertyInstanceIdGenerator. (Note: when automatic generation is not wanted, the ID can be given directly through the org.quartz.scheduler.instanceId property.)
Listing 15
if (autoId) {
    try {
        schedInstId = DEFAULT_INSTANCE_ID;
        if (js.isClustered()) {
            schedInstId = instanceIdGenerator.generateInstanceId();
        }
    } catch (Exception e) {
        getLog().error("Couldn't generate instance Id!", e);
        throw new IllegalStateException("Cannot run without an instance id.");
    }
}
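The instance ID rules can be condensed into one function. This is a sketch under stated assumptions: InstanceIdResolver is a hypothetical name, the generator is passed in as a plain Supplier instead of an InstanceIdGenerator, and the sample IDs are made up.

```java
import java.util.function.Supplier;

// Hypothetical condensation of the instance ID rules; not the Quartz implementation.
final class InstanceIdResolver {

    static final String DEFAULT_INSTANCE_ID = "NON_CLUSTERED";

    static String resolve(String configuredId, boolean clustered, Supplier<String> generator) {
        boolean autoId = "AUTO".equals(configuredId) || "SYS_PROP".equals(configuredId);
        if (!autoId) {
            return configuredId;               // an explicit ID is used as-is
        }
        // Auto-generation only takes effect when the JobStore is clustered.
        return clustered ? generator.get() : DEFAULT_INSTANCE_ID;
    }

    public static void main(String[] args) {
        Supplier<String> generator = () -> "host-1700000000000";   // made-up generated ID
        System.out.println(resolve("node-1", true, generator));
        System.out.println(resolve("AUTO", false, generator));
        System.out.println(resolve("AUTO", true, generator));
    }
}
```

Note the consequence for non-clustered setups: asking for AUTO still yields NON_CLUSTERED, so the generator is never consulted.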
JobStoreSupport is an abstract implementation of JobStore. Only concrete classes that extend JobStoreSupport (for example org.springframework.scheduling.quartz.LocalDataSourceJobStore) can have their database failure retry interval set through setDbRetryInterval. The dbFailureRetry value defaults to 15000 and can also be set through the org.quartz.scheduler.dbFailureRetryInterval property. The setThreadExecutor method sets the thread executor of the JobStoreSupport (see Listing 16).
Listing 16
if (js instanceof JobStoreSupport) {
    JobStoreSupport jjs = (JobStoreSupport)js;
    jjs.setDbRetryInterval(dbFailureRetry);
    if(threadsInheritInitalizersClassLoader)
        jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
    jjs.setThreadExecutor(threadExecutor);
}
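Many properties are set while QuartzSchedulerResources is constructed (see Listing 17). They are listed below:
Property | Meaning | Notes |
name | scheduler name | set via the org.quartz.scheduler.instanceName property |
threadName | scheduler thread name | set via the org.quartz.scheduler.threadName property; defaults to the scheduler name plus the suffix _QuartzSchedulerThread |
instanceId | scheduler instance ID | set via the org.quartz.scheduler.instanceId property; see the generation rules described above |
jobRunShellFactory | job run shell factory | chosen via the org.quartz.scheduler.wrapJobExecutionInUserTransaction property; the implementations are JTAJobRunShellFactory and JTAAnnotationAwareJobRunShellFactory |
makeSchedulerThreadDaemon | whether the scheduler thread is a daemon thread | set via the org.quartz.scheduler.makeSchedulerThreadDaemon property |
threadsInheritInitalizersClassLoader | whether threads inherit the class loader of the initializing thread | set via the org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer property |
runUpdateCheck | whether to check at run time for a newer Quartz version | set via the org.quartz.scheduler.skipUpdateCheck property; runUpdateCheck is the negation of the configured value |
batchTimeWindow | how far ahead of the fire time triggers may be acquired in a batch | set via the org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow property |
maxBatchSize | maximum batch size | set via the org.quartz.scheduler.batchTriggerAcquisitionMaxCount property |
interruptJobsOnShutdown | interrupt job threads when the scheduler shuts down | set via the org.quartz.scheduler.interruptJobsOnShutdown property |
interruptJobsOnShutdownWithWait | interrupt job threads when the scheduler shuts down with wait | set via the org.quartz.scheduler.interruptJobsOnShutdownWithWait property |
threadExecutor | thread executor | set via the org.quartz.threadExecutor.class property; defaults to DefaultThreadExecutor |
threadPool | thread pool | set via the org.quartz.threadPool.class property; defaults to SimpleThreadPool |
jobStore | job store | set via the org.quartz.jobStore.class property; defaults to RAMJobStore |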
Listing 17
QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
rsrcs.setName(schedName);
rsrcs.setThreadName(threadName);
rsrcs.setInstanceId(schedInstId);
rsrcs.setJobRunShellFactory(jrsf);
rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
rsrcs.setRunUpdateCheck(!skipUpdateCheck);
rsrcs.setBatchTimeWindow(batchTimeWindow);
rsrcs.setMaxBatchSize(maxBatchSize);
rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
rsrcs.setJMXExport(jmxExport);
rsrcs.setJMXObjectName(jmxObjectName);
if (managementRESTServiceEnabled) {
    ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
    managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
    managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
    rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
}
if (rmiExport) {
    rsrcs.setRMIRegistryHost(rmiHost);
    rsrcs.setRMIRegistryPort(rmiPort);
    rsrcs.setRMIServerPort(rmiServerPort);
    rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
    rsrcs.setRMIBindName(rmiBindName);
}
SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);
rsrcs.setThreadExecutor(threadExecutor);
threadExecutor.initialize();
rsrcs.setThreadPool(tp);
if(tp instanceof SimpleThreadPool) {
    if(threadsInheritInitalizersClassLoader)
        ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
}
tp.initialize();
tpInited = true;
rsrcs.setJobStore(js);
// add plugins
for (int i = 0; i < plugins.length; i++) {
    rsrcs.addSchedulerPlugin(plugins[i]);
}
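To make the property names from the table above more concrete, the sketch below puts a few of them into a java.util.Properties object and derives two of the defaults described there. SchedulerPropsSketch is a hypothetical name, the values are made-up samples, and the "false" default for skipUpdateCheck is my assumption; the sketch only mimics the derivation rules and does not call Quartz.

```java
import java.util.Properties;

// Illustrative only: values such as "DemoScheduler" are made up.
final class SchedulerPropsSketch {

    static Properties sample() {
        Properties p = new Properties();
        p.setProperty("org.quartz.scheduler.instanceName", "DemoScheduler");
        p.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        p.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
        p.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        p.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
        return p;
    }

    // threadName falls back to "<instanceName>_QuartzSchedulerThread" when not configured.
    static String threadName(Properties p) {
        String name = p.getProperty("org.quartz.scheduler.instanceName");
        return p.getProperty("org.quartz.scheduler.threadName", name + "_QuartzSchedulerThread");
    }

    // runUpdateCheck is the negation of skipUpdateCheck (default assumed to be false).
    static boolean runUpdateCheck(Properties p) {
        return !Boolean.parseBoolean(p.getProperty("org.quartz.scheduler.skipUpdateCheck", "false"));
    }

    public static void main(String[] args) {
        Properties p = sample();
        System.out.println(threadName(p));
        System.out.println(runUpdateCheck(p));
    }
}
```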
The code that constructs the QuartzScheduler is as follows:
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
qsInited = true;
The QuartzScheduler constructor is shown in Listing 18. Its processing steps are as follows:
Listing 18
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
    this.resources = resources;
    if (resources.getJobStore() instanceof JobListener) {
        addInternalJobListener((JobListener)resources.getJobStore());
    }
    this.schedThread = new QuartzSchedulerThread(this, resources);
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    schedThreadExecutor.execute(this.schedThread);
    if (idleWaitTime > 0) {
        this.schedThread.setIdleWaitTime(idleWaitTime);
    }
    jobMgr = new ExecutingJobsManager();
    addInternalJobListener(jobMgr);
    errLogger = new ErrorLogger();
    addInternalSchedulerListener(errLogger);
    signaler = new SchedulerSignalerImpl(this, this.schedThread);
    if(shouldRunUpdateCheck())
        updateTimer = scheduleUpdateCheck();
    else
        updateTimer = null;
    getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
Listing 19
private Timer scheduleUpdateCheck() {
    Timer rval = new Timer(true);
    rval.scheduleAtFixedRate(new UpdateChecker(), 1000, 7 * 24 * 60 * 60 * 1000L);
    return rval;
}
Let us look more closely at how QuartzSchedulerThread is constructed. Its constructor is shown in Listing 20.
Listing 20
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs) {
    this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
}
This constructor delegates to another constructor, shown in Listing 21.
Listing 21
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
    super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
    this.qs = qs;
    this.qsRsrcs = qsRsrcs;
    this.setDaemon(setDaemon);
    if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
        log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
        this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
    }
    this.setPriority(threadPrio);
    // start the underlying thread, but put this object into the 'paused'
    // state
    // so processing doesn't start yet...
    paused = true;
    halted = new AtomicBoolean(false);
}
Listing 21 is straightforward. QuartzScheduler's getSchedulerThreadGroup method creates the thread group. QuartzSchedulerResources' isThreadsInheritInitializersClassLoadContext method returns the threadsInheritInitializersClassLoadContext property; if it is true, the context class loader of the QuartzSchedulerThread is set to the context class loader of the current thread. The paused flag is set to true so that the QuartzSchedulerThread does not begin processing yet. The halted flag indicates whether the thread has been told to stop.
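Because QuartzSchedulerThread is already started while QuartzScheduler is being constructed, the thread inevitably begins to run. Part of its run method is shown in Listing 22.
Listing 22
public void run() {
    boolean lastAcquireFailed = false;
    while (!halted.get()) {
        try {
            // check if we're supposed to pause...
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // wait until togglePause(false) is called...
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }
                if (halted.get()) {
                    break;
                }
            }
            // ... (remainder of run() not shown)
We have not halted the scheduler thread, so halted is false. The paused flag is shared between threads, which is why it is accessed inside a synchronized block. Declaring paused as volatile would be enough for visibility, but the block cannot simply be removed, because sigLock.wait() must be called while holding the monitor of sigLock. Because paused is true when the thread first starts running (the constructor in Listing 21 sets it), the inner while loop keeps polling, and the thread waits for up to one second on each pass. QuartzSchedulerThread has been started and yet cannot do any work; is that not contradictory? The thread is running, but QuartzScheduler is not ready yet, so the thread has to wait until QuartzScheduler sets paused to false. This works, but until QuartzScheduler is ready the thread repeatedly waits and wakes up, which costs some context switches. Why not start QuartzSchedulerThread only once QuartzScheduler is ready?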
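The start-paused protocol discussed above can be reproduced in a small, self-contained model. This is a sketch, not Quartz code: PausableWorker and its helper methods are hypothetical, the scheduling work is replaced by a latch countdown, and togglePause and halt are written the way I assume such methods would typically notify the waiting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a thread that starts in the paused state; not Quartz code.
final class PausableWorker extends Thread {
    private final Object sigLock = new Object();
    private boolean paused = true;                       // guarded by sigLock
    private final AtomicBoolean halted = new AtomicBoolean(false);
    private final CountDownLatch firstWork = new CountDownLatch(1);

    PausableWorker() {
        setDaemon(true);
    }

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();                         // wake the waiting worker at once
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }

    boolean hasWorked() {
        return firstWork.getCount() == 0;
    }

    boolean awaitWork(long millis) {
        try {
            return firstWork.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void run() {
        while (!halted.get()) {
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        sigLock.wait(1000L);             // legal only while holding sigLock
                    } catch (InterruptedException ignore) {
                    }
                }
                if (halted.get()) {
                    break;
                }
            }
            firstWork.countDown();                       // stands in for real scheduling work
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        PausableWorker worker = new PausableWorker();
        worker.start();
        System.out.println("worked while paused: " + worker.hasWorked());
        worker.togglePause(false);
        System.out.println("worked after resume: " + worker.awaitWork(5000L));
        worker.halt();
    }
}
```

Because togglePause calls notifyAll, the worker wakes up as soon as it is released rather than after the one-second timeout; the timeout only bounds how long a missed notification could delay it.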
The code that creates the scheduler is as follows:
// Create Scheduler ref...
Scheduler scheduler = instantiate(rsrcs, qs);
The remaining work includes: setting the job factory, initializing the plugins, registering the job listeners and trigger listeners with the QuartzScheduler's listener manager, setting the scheduler context properties, starting up the JobStore and the job run shell factory, and registering the scheduler in the SchedulerRepository (see Listing 23).
Listing 23
// set job factory if specified
if(jobFactory != null) {
    qs.setJobFactory(jobFactory);
}
// Initialize plugins now that we have a Scheduler instance.
for (int i = 0; i < plugins.length; i++) {
    plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
}
// add listeners
for (int i = 0; i < jobListeners.length; i++) {
    qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
}
for (int i = 0; i < triggerListeners.length; i++) {
    qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
}
// set scheduler context data...
for(Object key: schedCtxtProps.keySet()) {
    String val = schedCtxtProps.getProperty((String) key);
    scheduler.getContext().put((String)key, val);
}
// fire up job store, and runshell factory
js.setInstanceId(schedInstId);
js.setInstanceName(schedName);
js.setThreadPoolSize(tp.getPoolSize());
js.initialize(loadHelper, qs.getSchedulerSignaler());
jrsf.initialize(scheduler);
qs.initialize();
getLog().info(
        "Quartz scheduler '" + scheduler.getSchedulerName()
                + "' initialized from " + propSrc);
getLog().info("Quartz scheduler version: " + qs.getVersion());
// prevents the repository from being garbage collected
qs.addNoGCObject(schedRep);
// prevents the db manager from being garbage collected
if (dbMgr != null) {
    qs.addNoGCObject(dbMgr);
}
schedRep.bind(scheduler);
return scheduler;
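As we have seen, creating the scheduler is almost entirely sequential code, and the steps are clear. There is still considerable room for optimization, though, and the code organization is somewhat unreasonable: the instantiate method alone is 1355 - 579 = 776 lines long.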
Original article: http://blog.csdn.net/beliefer/article/details/52094198