你可以想象一个机器,不停的发射弹子出来,发射出来的弹子可以被其他模块再次加工(比如 上色、把不合格的弹子给回收了),加工完成后再次发射出来 … 弹子图就是对这个机器的抽象描述。在 Rx 中流行使用这种方式来描述操作符,毕竟图片看起来直观多了。 Marble diagrams(弹子图)基本元素如下:
时间从左往右流动,每个图形代表一个数据,竖线代表发射完成了,而 X 代表出现错误了。 操作函数把上面的 Observable 转换下面的新的 Observable , 里面的每个数据都被操作函数给处理了并返回一个新的数据。
filter 函数使用一个 predicate 函数接口来判断每个发射的值是否能通过这个判断。如果返回 true,则该数据继续往下一个(过滤后的) Observable 发射。
比如下面示例创建了一个发射 0 到 9 十个数字的 源Observable。在该 Observable 使用一个 filter 操作来过滤掉奇数,最后只保留偶数。
Observable<Integer> values = Observable.range(0,10);
Subscription oddNumbers = values
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%2==0;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});
结果:
0
2
4
6
8
Complete!
ofType操作符类似于filter操作符,区别在于ofType操作符是按照类型对结果进行过滤,其流程图如下:
Observable.just(0, "1", 2, "3").ofType(String.class)
.subscribe(new Action1<Object>() {
@Override
public void call(Object object) {
log(object.toString() + ":" + object.getClass());
}
});
结果:
1:class java.lang.String
3:class java.lang.String
Observable<Integer> values = Observable.range(0, 10);
Subscription subscription = values
.elementAt(3)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});
结果:
3
Complete!
Observable<Integer> values = Observable.range(0, 10);
Subscription subscription = values
.elementAtOrDefault(11, 5)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});
结果:
5
Complete!
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(10) // 获取前 10 个数据 的 Observable
.single(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong == 5L;
}
}) // 有且仅有一个 数据为 5L
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Single1:Complete!");
}
@Override
public void onError(Throwable e) {
log("Single1:" + e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log("Single1:" + aLong);
}
});
values
.single(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong == 5L;
}
}) // 由于源 Observable 为无限的,所以这个不会打印任何东西
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Single2:Complete!");
}
@Override
public void onError(Throwable e) {
log("Single2:" + e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log("Single2:" + aLong);
}
});
结果:
Single1: 5
Single1: Complete!
和前面的类似,使用 singleOrDefault 可以返回一个默认值。
Observable<Long> values = Observable.interval(200, TimeUnit.MILLISECONDS);
Subscription subscription = values
.timeout(300,TimeUnit.MILLISECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
结果:
0
1
2
...
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
//产生结果的间隔时间分别为100、200、300...900毫秒
for (int i = 1; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i * 100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.debounce(500, TimeUnit.MILLISECONDS) //超时时间为400毫秒
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("Next:" + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log("Error:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
log("Complete!");
}
});
结果:
Next:5
Next:6
Next:7
Next:8
Next:9
completed!
简单来说,就是过滤掉发送间隔小于debounce(long timeout, TimeUnit unit)
中timeout的数据。
ignoreElements 会忽略所有发射的数据,只让 onCompleted 和 onError 可以通过。
Observable<Integer> values = Observable.range(0, 10);
Subscription subscription = values
.ignoreElements()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});
结果:
Complete!
ignoreElements() 和使用 filter(v -> false) 是一样的效果。
当我们不需要整个序列时,而是只想取开头或结尾的几个元素,我们可以用take()或takeLast()。
Observable<T> take(int num)
Observable<Integer> values = Observable.range(0, 5);
Subscription first2 = values
.take(2)
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer+"");
}
}
);
结果:
0
1
只要第 N 个数据可用, take 操作就结束了,立即执行onCompleted()。 如果在 N 个数据发射之前发生了 error, error 信息会继续传递到下一个 Observable。 如果 第 N 个数据发射后, take 就不再关心源 Observable 的状态了。
---------------这里的额状态-------------
Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onError(new Exception("Oops"));
subscriber.onNext(2);
}
});
Subscription subscription = values
.take(1)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});
结果:
1
Complete!
如果将take(1)改成take(2):
结果:
1
Oops
skip 返回 take 操作忽略的另外一部分数据。也就是跳过前面 N 个数据。
Observable<T> skip(int num)
Observable<Integer> values = Observable.range(0, 5);
Subscription subscription = values
.skip(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});
结果:
2
3
4
Complete!
除了根据发射数据的索引来过滤数据以外,还可以使用数据流发射的时间来过滤。比如过滤掉前五秒发射的数据。
Observable<T> take(long time, java.util.concurrent.TimeUnit unit)
Observable<T> skip(long time, java.util.concurrent.TimeUnit unit)
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.take(250, TimeUnit.MILLISECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
结果:
0
1
Complete!
上面示例中只获取前 250 毫秒发射的数据。 第 300 毫秒才开始发射数据 2, 所以这里只获取 0 和1 两个数据。
skip 和 take 是从头开始索引数据,而 skipLast 和 takeLast 和他们相反,是从末尾开始索引数据。
Observable<Integer> values = Observable.range(0, 5);
Subscription subscription = values
.skipLast(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log(integer+"");
}
});
结果:
0
1
2
Complete!
同样这两个函数也有依时间为条件的重载函数。
这两个函数是使用一个 predicate 参数来当做判断条件。 如果判断条件返回为 ture, 则 takeWhile 保留该数据。
Observable<T> takeWhile(Func1<? super T,java.lang.Boolean> predicate)
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.takeWhile(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong<2;
}
})
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
结果:
0
1
Complete!
不出意料, skipWhile 跳过过滤条件为 true 的数据。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.skipWhile(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong<2;
}
})
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
结果:
2
3
4
...
// 获取源Observable的数据直到 other Observable 发射第一个数据时停止
public final <E> Observable<T> takeUntil(Observable<? extends E> other)
Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
Subscription subscription = values
.takeUntil(cutoff)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
结果:
0
1
Complete!
Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
Subscription subscription = values
.skipUntil(cutoff)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
结果:
2
3
4
...
first()方法和last()方法很容易弄明白。它们从Observable中只发射第一个元素或者最后一个元素。这两个都可以传Func1作为参数,:一个可以确定我们感兴趣的第一个或者最后一个的方法:
与first()和last()相似的变量有:firstOrDefault()和lastOrDefault().这两个函数当可观测序列完成时不再发射任何值时用得上。在这种场景下,如果Observable不再发射任何值时我们可以指定发射一个默认的值
distinct 函数用来过滤掉已经出现过的数据了。
public final Observable<T> distinct()
public final <U> Observable<T> distinct(Func1<? super T,? extends U> keySelector)
distinct 还有个变体是 distinctUntilChanged。区别是 distinctUntilChanged 只过滤相邻的 key 一样的数据。
public final Observable<T> distinctUntilChanged()
public final <U> Observable<T> distinctUntilChanged(Func1<? super T,? extends U> keySelector)
Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(2);
subscriber.onCompleted();
}
});
Subscription subscription = values
.distinct()
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer + "");
}
}
);
结果:
1
2
3
distinct 还有一个重载函数,该函数有个生成 key 的参数。每个发射的数据都使用该参数生成一个 key,然后使用该key 来判断数据是否一样。
Observable<String> values = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("First");
subscriber.onNext("Second");
subscriber.onNext("Third");
subscriber.onNext("Fourth");
subscriber.onNext("Fifth");
subscriber.onCompleted();
}
});
Subscription subscription = values
.distinct(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.subscribe(
new Action1<String>() {
@Override
public void call(String s) {
log(s);
}
}
);
结果:
First
Second
Third
“Fourth” 和 “Fifth” 字符串被过滤掉了,应为他们的 key (首字母)和 First 一样。已经发射过的数据将被过滤掉。
有经验的码农知道,该函数在内部维护一个 key 集合来保存所有已经发射数据的 key,当有新的数据发射的时候,在集合中查找该 数据的key 是否存在。 在使用 Rx 操作函数的时把内部细节给封装起来了,但是我们应该注意该问题来避免性能问题。(如果有大量的数据,维护一个内部的集合来保存 key 可能会占用很多内存。)
Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(2);
subscriber.onCompleted();
}
});
Subscription subscription = values
.distinctUntilChanged()
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer + "");
}
}
);
结果:
1
2
3
2
Observable<String> values = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("First");
subscriber.onNext("Second");
subscriber.onNext("Third");
subscriber.onNext("Fourth");
subscriber.onNext("Fifth");
subscriber.onCompleted();
}
});
Subscription subscription = values
.distinctUntilChanged(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.subscribe(
new Action1<String>() {
@Override
public void call(String s) {
log(s);
}
}
);
结果:
First
Second
Third
Fourth
项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第二部分:事件流基础之 过滤数据 - 云在千峰
过滤Observables | RxJava Essentials CN
Android RxJava使用介绍(三) RxJava的操作符 - 呼啸而过的专栏 - 博客频道 - CSDN.NET
原文:http://blog.csdn.net/qq_20198405/article/details/51248721