Observable<Integer> values = Observable.range(0, 3);
values.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("values:Complete!");
}
@Override
public void onError(Throwable e) {
log("values:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("values:"+integer);
}
});
values.count()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("count:Complete!");
}
@Override
public void onError(Throwable e) {
log("count:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("count:"+integer);
}
});
结果:
Values: 0
Values: 1
Values: 2
Values: Complete!
Count: 3
Count: Complete!
如果发射数据的个数超过了 int 最大值,则可以使用 countLong 函数。
Custom aggregators(自定义聚合)
本节前面介绍的几个函数,和之前看到的也没太大区别。下面会介绍两个非常强大的操作函数,可以很方便的来扩展源 Observable。
之前所介绍的所有操作函数都可以通过这两个函数来实现。
你可能从 [MapReduce](http://baike.baidu.com/view/2902.htm) 中了解过 reduce。该思想是使用源 Observable 中的所有数据两两组合来生成一个单一的 数据。在大部分重载函数中都需要一个函数用来定义如何组合两个数据变成一个。
public final Observable<T> reduce(Func2<T,T,T> accumulator)
Observable<Integer> values = Observable.range(0,5);
values.reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer+integer2;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Sum:Complete!");
}
@Override
public void onError(Throwable e) {
log("Sum:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Sum:"+integer);
}
});
values.reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return (integer>integer2) ? integer2 : integer;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Min:Complete!");
}
@Override
public void onError(Throwable e) {
log("Min:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Min:"+integer);
}
});
结果:
Sum: 10
Sum: Complete!
Min: 0
Min: Complete!
public final <R> Observable<R> reduce(R initialValue, Func2<R,? super T,R> accumulator)
Observable<String> values = Observable.just("Rx", "is", "easy");
values.reduce(0, new Func2<Integer, String, Integer>() {
@Override
public Integer call(Integer integer, String s) {
return integer+1;
}
})
//实现Last
// values.reduce("", new Func2<String, String, String>() {
// @Override
// public String call(String s, String s2) {
// return s2;
// }
// })
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Count:Complete!");
}
@Override
public void onError(Throwable e) {
log("Count:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Count:"+integer);
}
});
结果:
Count:3
Count: Complete!
上面示例中的 accumulator 参数为
new Func2<String, String, String>()
,其中call(integer,s)
需要两个参数 integer和s, 当第一个数据从 源 Observable 发射出来的时候,由于 call(integer,s)
还没有调用过,所以使用 初始值 0 来替代 integer,使用第一个字符串“Rx” 来调用 accumulator (也就是new Func2()
),这样 call(integer,s)
返回的值就是 integer+ 1 (而 integer为初始值 0 ,所以返回 1, 可以看到 这个 s参数 为源 Observable 的值在这里是没有用的);这样 源Observable 每次发射一个数据,accumulator 就把上一次的结果加1 返回。和 count 的功能一样。
对于前面只返回一个数据结果的操作函数,大部分都可以通过 reduce 来实现。对于那些 源 Observable 没有完成就返回的操作函数来说,是不能使用 reduce 来实现的。所以 可以用 reduce 来实现 last,但是用 reduce 实现的 all 函数和原来的 all 是不太一样的。
Aggregation to collections(把数据聚合到集合中)
使用 reduce 可以把源Observable 发射的数据放到一个集合中:
Observable<Integer> values = Observable.range(10,5);
values
.reduce(new ArrayList<Integer>(), new Func2<ArrayList<Integer>, Integer, ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call(ArrayList<Integer> integers, Integer integer) {
integers.add(integer);
return integers;
}
})
.subscribe(new Observer<ArrayList<Integer>>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log( e.getMessage().toString());
}
@Override
public void onNext(ArrayList<Integer> arrayList) {
log(arrayList.toString());
}
});
reduce 的参数初始值为 new ArrayList(), reduce把源Observable 发射的数据添加到这个 List 中。当 源Observable 完成的时候,返回这个 List 对象。
结果:
[10, 11, 12, 13, 14]
Complete!
scan 和 reduce 很像,不一样的地方在于 scan会发射所有中间的结算结果。
public final Observable<T> scan(Func2<T,T,T> accumulator)
通过上图可以看到和 reduce 的区别, reduce 只是最后把计算结果发射出来,而 scan 把每次的计算结果都发射出来。
values.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
})
// .takeLast()//实现reduce
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Sum:Complete!");
}
@Override
public void onError(Throwable e) {
log("Sum:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Sum:" + integer);
}
});
结果:
Sum: 0
Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: Complete!
reduce 可以通过 scan 来实现: reduce(acc) = scan(acc).takeLast() 。所以 scan 比 reduce 更加通用。
Subject<Integer, Integer> values = ReplaySubject.create();
values
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Values:Complete!");
}
@Override
public void onError(Throwable e) {
log("Values:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Values:" + integer);
}
});
values
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return (integer<integer2) ? integer : integer2;
}
})
.distinctUntilChanged()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Min:Complete!");
}
@Override
public void onError(Throwable e) {
log("Min:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Min:" + integer);
}
});
values.onNext(2);
values.onNext(3);
values.onNext(1);
values.onNext(4);
values.onCompleted();
结果:
Values: 2
Min: 2
Values: 3
Values: 1
Min: 1
Values: 4
Values: Completed
Min: Completed
下面是一个符合原则但是效率很低的实现:.
.reduce(new ArrayList<Integer>(), new Func2<ArrayList<Integer>, Integer, ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call(ArrayList<Integer> integers, Integer integer) {
ArrayList<Integer> newAcc = (ArrayList<Integer>) integers.clone();
newAcc.add(integer);
return integers;
}
})
上面每一个值都创建一个新对象的性能是无法接受的,需要通过文档说明你没有遵守 Rx 的原则使用不可变对象,避免其他人误解。为此,
Observable<Integer> values = Observable.range(10,5);
values
.collect(new Func0<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() {
return new ArrayList<Integer>();
}
}, new Action2<ArrayList<Integer>, Integer>() {
@Override
public void call(ArrayList<Integer> arrayList, Integer integer) {
arrayList.add(integer);
}
})
.subscribe(new Action1<ArrayList<Integer>>() {
@Override
public void call(ArrayList<Integer> integers) {
log(integers.toString());
}
});
结果:
[10, 11, 12, 13, 14]
Observable<Integer> values = Observable.range(10,5);
values
.toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
log(integers.toString());
}
});.
结果:
[10, 11, 12, 13, 14]
toSortedList 和前面类似,返回一个排序后的 list,下面是该函数的定义:
public final Observable<java.util.List<T>> toSortedList()
public final Observable<java.util.List<T>> toSortedList(
Func2<? super T,? super T,java.lang.Integer> sortFunction)
可以使用默认的比较方式来比较对象,也可以提供一个比较参数。该比较参数和 Comparator 接口语义一致。
下面通过一个自定义的比较参数来返回一个倒序排列的整数集合:
Observable<Integer> values = Observable.range(10,5);
values
.toSortedList(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer2 - integer;
}
})
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
log(integers.toString());
}
});
结果:
[14, 13, 12, 11, 10]
toMap 把数据流 T 变为一个 Map<TKey,T>。 该函数有三个重载形式:
public final <K> Observable<java.util.Map<K,T>> toMap(
Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,V>> mapFactory)
来看看一个示例:
有这么一个 Person 对象:
class Person {
public final String name;
public final Integer age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
下面的代码使用 Person 的 name 作为 key, Person 作为 map 的value:
Observable<Person> values = Observable.just(
new Person("Will", 25),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMap(new Func1<Person, String>() {
@Override
public String call(Person person) {
return person.name;
}
})
.subscribe(new Action1<Map>() {
@Override
public void call(Map mMap) {
log(mMap.toString());
}
});.
结果:
Saul=Person@5280919c, Will=$Person@5280914c, Nick=$Person@52809174}
还可以用 Person 的 age 作为map 的value:
values
.toMap(new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
})
结果:
{Saul=35, Will=25, Nick=40}
还可以自定义如何生成这个 map 对象:
values
.toMap(new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
}, new Func0<Map<Object, Object>>() {
@Override
public Map<Object, Object> call() {
return new HashMap();
}
})
最后一个参数为工厂函数,每次一个新的 Subscriber 订阅的时候, 都会返回一个新的 map 对象。
通常情况下多个 value 的 key 可能是一样的。 一个 key 可以映射多个 value 的数据结构为
multimap,multimap 的 value 为一个集合。该过程被称之为 “grouping” (分组)。
public final <K> Observable<java.util.Map<K,java.util.Collection<T>>> toMultimap(
Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory,
Func1<? super K,? extends java.util.Collection<V>> collectionFactory)
下面是通过 age 来分组 Person 的实现:
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}
})
.subscribe(new Action1<Map>() {
@Override
public void call(Map mMap) {
log(mMap.toString());
}
});
结果:
{35=[Will, Saul], 40=[Nick]}
toMultimap 的参数和 toMap 类似,最后一个 collectionFactory 参数是用来创建 value 的集合对象的,collectionFactory 使用 key 作为参数,这样你可以根据 key 来做不同的处理。下面示例代码中没有使用这个 key 参数:
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}, new Func0<Map<Object, Collection<Object>>>() {
@Override
public Map<Object, Collection<Object>> call() {
return new HashMap();
}
}, new Func1<Object, Collection<Object>>() {
@Override
public Collection<Object> call(Object o) {
return new ArrayList();
}
}) // 没有使用这个 key 参数
注意事项
这些操作函数都有非常有限的用法。这些函数只是用来给初学者把数据收集到集合中使用的,并且内部使用传统的方式来处理数据。这些方式不应该在实际项目中实现,因为他们和使用Rx 的理念并不相符。
groupBy 是 toMultimap 函数的 Rx 方式的实现。groupBy 根据每个源Observable 发射的值来计算一个
key, 然后为每个 key 创建一个新的 Observable并把key 一样的值发射到对应的新 Observable 中。
public final <K> Observable<GroupedObservable<K,T>> groupBy(Func1<? super T,? extends K> keySelector)
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.subscribe(new Action1<GroupedObservable<Object, String>>() {
@Override
public void call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
objectStringGroupedObservable.last().subscribe(new Action1<String>() {
@Override
public void call(String s) {
log( objectStringGroupedObservable.getKey() +":" + s);
}
});
}
});
t:third
f:fifth
s:sixth
上面的代码使用了嵌套的 Subscriber,但Rx 功能之一就是为了避免嵌套回调函数,所以下面演示了如何避免嵌套:
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.flatMap(new Func1<GroupedObservable<Object, String>, Observable<?>>() {
@Override
public Observable<?> call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
return objectStringGroupedObservable.last().map(new Func1<String, Object>() {
@Override
public Object call(String s) {
return objectStringGroupedObservable.getKey() +":" + s;
}
});
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
s: sixth
t: third
f: fifth
项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第二部分:事件流基础之 聚合 - 云在千峰
Android RxJava使用介绍(三) RxJava的操作符 - 呼啸而过的专栏 - 博客频道 - CSDN.NET
原文:http://blog.csdn.net/qq_20198405/article/details/51260739