? ? ? ?Observer观察者设计模式是行为模式的一种,它的作用是当一个对象的状态发生变化时,能够自动通知其他关联对象,自动刷新对象状态
? ? ? ?Observer模式提供给关联对象一种同步通信的手段,使某个对象与依赖它的其他对象之间保持状态同步。如下用代码的形式来展现被观察者(新闻出版社)和观察者(与之关联的订户观察者对象)是如何保持信息同步的。
/**
?* NewsPublisher: 新闻出版社
?*/
class NewsPublisher extends Observable {
??? public void publishNews(String newsTitle, String newsBody) {
??????? News news = new News(newsTitle, newsBody);
??????? setChanged();?? //通过setChanged()方法标明对象的状态已发生变化
??????? this.notifyObservers(news);?? //通知各Observer,并发送一个名为news对象的消息
?
??????? // ... ...
??? }
}
/**
?* 订户观察者。
?* Created by myuser on 2014/11/29.
?*/
public class SubscriberObserver implements Observer {
??? // 新闻出版社调用notifyObservers(news)方法,自动调用如下方法以保持信息同步。
??? public void update(Observable observee, Object param) {
??????? if (param instanceof News) {
??????????? mail2Subscriber((News)param);
??????? }
??? }
?
??? private void mail2Subscriber(News news) {
??????? System.out.println("Mail to subscriber. A news published with title:" + news.getTitle());
??? }
}
??? 被观察者调用notifyObservers()方法后,为什么观察者就能接收到呢?那是因为有构造者这个角色,它将观察者添加到被观察者的依赖对象里面,代码如下:
public class Client {
??? /**
???? * Test Observer Pattern
???? */
??? public static void main(String[] args) {
??????? NewsPublisher publisher = new NewsPublisher();
??????? // 添加订户观察者依赖对象
??????? publisher.addObserver(new SubscriberObserver());
??????? //发布新闻,触发通知事件
??????? publisher.publishNews("Hello news", "news body");
??? }
}
上面是一个简单的Observer观察者设计模式的实例。接下来看看大数据框架hadoop是如何应用该设计模式的。
应用场景如下:JobTracker收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业进行初始化。
??? JobTracker进行作业添加(执行addJob()方法)时,会同步该消息到对应的观察者(JobInProgressListener )那里,代码如下:
publicclass JobTracker {
? privatefinal List<JobInProgressListener> jobInProgressListeners =
??? new CopyOnWriteArrayList<JobInProgressListener>();
? private synchronized JobStatus addJob(JobID jobId, JobInProgress
job) {
??? ??... ...
????? for (JobInProgressListener listener : jobInProgressListeners) {
??????? listener.jobAdded(job);? //通知各Observer,并发送job消息
????? }
????? ... ...
? }
}
class JobQueueJobInProgressListener extends JobInProgressListener {
? private Map<JobSchedulingInfo, JobInProgress> jobQueue;
? @Override
? publicvoid jobAdded(JobInProgress job) {
??? // 将作业添加到作业队列里。
??? jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
? }
}
??? 接下来看一下JobTracker和JobQueueJobInProgressListener的依赖关系是如何建立起来的。
??? 先看一下JobTracker类的一些信息,代码如下所示:
publicclass JobTracker implements MRConstants, InterTrackerProtocol,
??? JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
??? RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
JobTrackerMXBean {
? // 该类实例化的时候,会从配置文件的属性mapred.jobtracker.taskScheduler获取调度器的类名,然后实例化一个调度器作为JobTracker的一个属性。代码如下所示:
? JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) {
... ...
??? Class<? extends TaskScheduler> schedulerClass
????? = conf.getClass("mapred.jobtracker.taskScheduler",
????????? JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
... ...
? }
? // JobTracker类被运行的时候会去调用startTracker()静态方法和offerService(),代码如下所示:
? publicstaticvoid main(String argv[]
????????????????????????? ) throws IOException, InterruptedException {
... ...
??? JobTracker tracker = startTracker(new JobConf());
tracker.offerService();
... ...
? }
// startTracker()静态方法首先实例一个JobTracker类,然后将当前实例赋给调度器的taskTrackerManager属性。
publicstatic JobTracker startTracker(JobConf conf, String identifier)
? throws IOException, InterruptedException {
... ...
??? result = new JobTracker(conf, identifier);
result.taskScheduler.setTaskTrackerManager(result);
... ...
returnresult;
? }
? // offerService()方法调用调度器的start()方法。
? publicvoid offerService() throws InterruptedException, IOException {
? ??... ...
taskScheduler.start();
... ...
? }
}
// 如下是调度器的start()方法,该方法将JobQueueJobInProgressListener类添加到JobTracker的依赖属性里,也即构造了JobTracker和JobQueueJobInProgressListener的被观察者与观察者关系。
class JobQueueTaskScheduler extends TaskScheduler {
? public JobQueueTaskScheduler() {
??? this.jobQueueJobInProgressListener=new JobQueueJobInProgressListener();
? }
? publicsynchronizedvoid start() throws IOException {
??? ... ...
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
... ...
? }
?
}
原文:http://seandeng888.iteye.com/blog/2161824