首页 > 编程语言 > 详细

RxJava----操作符:聚合Observable

时间:2016-04-29 16:28:24      阅读:395      评论:0      收藏:0      [点我收藏+]

count

  • count 函数和 Java 集合中的 size 或者 length 一样。用来统计源 Observable 完成的时候一共发射了多少个数据。
    技术分享
        Observable<Integer> values = Observable.range(0, 3);
        values.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                log("values:Complete!");
            }
            @Override
            public void onError(Throwable e) {
                log("values:"+e.getMessage().toString());
            }
            @Override
            public void onNext(Integer integer) {
                log("values:"+integer);
            }
        });
        values.count()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("count:Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log("count:"+e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log("count:"+integer);
                    }
                });

结果:

Values: 0
Values: 1
Values: 2
Values: Complete!
Count: 3
Count: Complete!
如果发射数据的个数超过了 int 最大值,则可以使用 countLong 函数。

Custom aggregators(自定义聚合)

本节前面介绍的几个函数,和之前看到的也没太大区别。下面会介绍两个非常强大的操作函数,可以很方便的来扩展源 Observable。
之前所介绍的所有操作函数都可以通过这两个函数来实现。

reduce

你可能从 [MapReduce](http://baike.baidu.com/view/2902.htm) 中了解过 reduce。该思想是使用源 Observable 中的所有数据两两组合来生成一个单一的 数据。在大部分重载函数中都需要一个函数用来定义如何组合两个数据变成一个。
public final Observable<T> reduce(Func2<T,T,T> accumulator)

技术分享

        Observable<Integer> values = Observable.range(0,5);
        values.reduce(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer+integer2;
            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Sum:Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log("Sum:"+e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log("Sum:"+integer);
                    }
                });

        values.reduce(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return (integer>integer2) ? integer2 : integer;
            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Min:Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log("Min:"+e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log("Min:"+integer);
                    }
                });

结果:

Sum: 10
Sum: Complete!
Min: 0
Min: Complete!
  • Rx 中的 reduce 和并行系统中的 reduce 不一样。
  • 在并行系统中的 reduce 是指,计算的取值是不相关的,这样多个机器可以独立并行工作。
  • 在 Rx 中是使用从数据流中第一个数据到最后一个数据(从左往右)中的数据来调用 参数 accumulator ,accumulator 用前一次返回的结果和下一个数据来再次调用 accumulator (累加器)。
  • 下面这个重载函数更加暴露了这个设计意图。
public final <R> Observable<R> reduce(R initialValue, Func2<R,? super T,R> accumulator)
        Observable<String> values = Observable.just("Rx", "is", "easy");
        values.reduce(0, new Func2<Integer, String, Integer>() {
            @Override
            public Integer call(Integer integer, String s) {
                return integer+1;
            }
        })
           //实现Last
//        values.reduce("", new Func2<String, String, String>() {
//            @Override
//            public String call(String s, String s2) {
//                return s2;
//            }
//        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Count:Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log("Count:"+e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log("Count:"+integer);
                    }
                });

结果:

Count:3
Count: Complete!

上面示例中的 accumulator 参数为
new Func2<String, String, String>(),其中call(integer,s)需要两个参数 integer和s, 当第一个数据从 源 Observable 发射出来的时候,由于 call(integer,s)还没有调用过,所以使用 初始值 0 来替代 integer,使用第一个字符串“Rx” 来调用 accumulator (也就是new Func2()),这样 call(integer,s)返回的值就是 integer+ 1 (而 integer为初始值 0 ,所以返回 1, 可以看到 这个 s参数 为源 Observable 的值在这里是没有用的);这样 源Observable 每次发射一个数据,accumulator 就把上一次的结果加1 返回。和 count 的功能一样。

对于前面只返回一个数据结果的操作函数,大部分都可以通过 reduce 来实现。对于那些 源 Observable 没有完成就返回的操作函数来说,是不能使用 reduce 来实现的。所以 可以用 reduce 来实现 last,但是用 reduce 实现的 all 函数和原来的 all 是不太一样的。

Aggregation to collections(把数据聚合到集合中)

使用 reduce 可以把源Observable 发射的数据放到一个集合中:
        Observable<Integer> values = Observable.range(10,5);
        values
                .reduce(new ArrayList<Integer>(), new Func2<ArrayList<Integer>, Integer, ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call(ArrayList<Integer> integers, Integer integer) {
                        integers.add(integer);
                        return integers;
                    }
                })
                .subscribe(new Observer<ArrayList<Integer>>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log( e.getMessage().toString());
                    }
                    @Override
                    public void onNext(ArrayList<Integer> arrayList) {
                        log(arrayList.toString());
                    }
                });

reduce 的参数初始值为 new ArrayList(), reduce把源Observable 发射的数据添加到这个 List 中。当 源Observable 完成的时候,返回这个 List 对象。
结果:

[10, 11, 12, 13, 14]
Complete!

scan

scan 和 reduce 很像,不一样的地方在于 scan会发射所有中间的结算结果。

public final Observable<T> scan(Func2<T,T,T> accumulator)

技术分享

通过上图可以看到和 reduce 的区别, reduce 只是最后把计算结果发射出来,而 scan 把每次的计算结果都发射出来。

        values.scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        })
//              .takeLast()//实现reduce
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Sum:Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log("Sum:" + e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log("Sum:" + integer);
                    }
                });

结果:

Sum: 0
Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: Complete!
reduce 可以通过 scan 来实现: reduce(acc) = scan(acc).takeLast() 。所以 scan 比 reduce 更加通用。
  • 源 Observable 发射数据,经过 scan 处理后 scan 也发射一个处理后的数据,
  • 所以 scan 并不要求源 Observable 完成发射。
  • 下面示例实现了 查找已经发射数据中的最小值的功能:
        Subject<Integer, Integer> values = ReplaySubject.create();
        values
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Values:Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log("Values:" + e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log("Values:" + integer);
                    }
                });
        values
                .scan(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        return (integer<integer2) ? integer : integer2;
                    }
                })
                .distinctUntilChanged()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Min:Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log("Min:" + e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log("Min:" + integer);
                    }
                });
        values.onNext(2);
        values.onNext(3);
        values.onNext(1);
        values.onNext(4);
        values.onCompleted();

结果:


Values: 2
Min: 2
Values: 3
Values: 1
Min: 1
Values: 4
Values: Completed
Min: Completed

collect

  • 前面提到的,使用 reduce 可以把源Observable 发射的数据放到一个集合中,其实并不太符合 Rx 操作符的原则。
  • 操作符有个原则是不能修改其他对象的状态。
  • 所以符合原则的实现应该是在每次转换中都创建一个新的 ArrayList 对象。

下面是一个符合原则但是效率很低的实现:.

    .reduce(new ArrayList<Integer>(), new Func2<ArrayList<Integer>, Integer, ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call(ArrayList<Integer> integers, Integer integer) {
                        ArrayList<Integer> newAcc = (ArrayList<Integer>) integers.clone();
                        newAcc.add(integer);
                        return integers;
                    }
                })

上面每一个值都创建一个新对象的性能是无法接受的,需要通过文档说明你没有遵守 Rx 的原则使用不可变对象,避免其他人误解。为此,

  • Rx 提供了一个 collect 函数来实现该功能,该函数使用了一个可变的 accumulator 。
        Observable<Integer> values = Observable.range(10,5);
        values
                .collect(new Func0<ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call() {
                        return new ArrayList<Integer>();
                    }
                }, new Action2<ArrayList<Integer>, Integer>() {
                    @Override
                    public void call(ArrayList<Integer> arrayList, Integer integer) {
                        arrayList.add(integer);

                    }
                })
                .subscribe(new Action1<ArrayList<Integer>>() {
                    @Override
                    public void call(ArrayList<Integer> integers) {
                        log(integers.toString());
                    }
                });

结果:

[10, 11, 12, 13, 14]

toList

        Observable<Integer> values = Observable.range(10,5);
        values
                .toList()
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        log(integers.toString());
                    }
                });.

结果:

[10, 11, 12, 13, 14]

toSortedList

toSortedList 和前面类似,返回一个排序后的 list,下面是该函数的定义:

public final Observable<java.util.List<T>> toSortedList()
public final Observable<java.util.List<T>> toSortedList(
    Func2<? super T,? super T,java.lang.Integer> sortFunction)

可以使用默认的比较方式来比较对象,也可以提供一个比较参数。该比较参数和 Comparator 接口语义一致。
下面通过一个自定义的比较参数来返回一个倒序排列的整数集合:

        Observable<Integer> values = Observable.range(10,5);
        values
                .toSortedList(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        return integer2 - integer;
                    }
                })
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        log(integers.toString());
                    }
                });

结果:

[14, 13, 12, 11, 10]

toMap

toMap 把数据流 T 变为一个 Map<TKey,T>。 该函数有三个重载形式:
public final <K> Observable<java.util.Map<K,T>> toMap(
    Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
    Func1<? super T,? extends K> keySelector,
    Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
    Func1<? super T,? extends K> keySelector,
    Func1<? super T,? extends V> valueSelector,
    Func0<? extends java.util.Map<K,V>> mapFactory)
  • keySelector 功能是从一个值 T 中获取他对应的 key。
  • valueSelector 功能是从一个值 T 中获取需要保存 map 中的值。
  • mapFactory 功能是创建该 map 对象。

来看看一个示例:
有这么一个 Person 对象:

class Person {
    public final String name;
    public final Integer age;
    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

下面的代码使用 Person 的 name 作为 key, Person 作为 map 的value:

        Observable<Person> values = Observable.just(
                new Person("Will", 25),
                new Person("Nick", 40),
                new Person("Saul", 35)
        );
        values
                .toMap(new Func1<Person, String>() {
                    @Override
                    public String call(Person person) {
                        return person.name;
                    }
                })
                .subscribe(new Action1<Map>() {
                    @Override
                    public void call(Map mMap) {
                        log(mMap.toString());
                    }
                });.

结果:

Saul=Person@5280919c, Will=$Person@5280914c, Nick=$Person@52809174}

还可以用 Person 的 age 作为map 的value:

        values
            .toMap(new Func1<Person, Object>() {
                    @Override
                    public Object call(Person person) {
                        return person.name;
                    }
                }, new Func1<Person, Object>() {
                    @Override
                    public Object call(Person person) {
                        return person.age;
                    }
                })

结果:

{Saul=35, Will=25, Nick=40}

还可以自定义如何生成这个 map 对象:

        values
            .toMap(new Func1<Person, Object>() {
                    @Override
                    public Object call(Person person) {
                        return person.name;
                    }
                }, new Func1<Person, Object>() {
                    @Override
                    public Object call(Person person) {
                        return person.age;
                    }
                }, new Func0<Map<Object, Object>>() {
                    @Override
                    public Map<Object, Object> call() {
                        return new HashMap();
                    }
                })

最后一个参数为工厂函数,每次一个新的 Subscriber 订阅的时候, 都会返回一个新的 map 对象。

toMultimap

通常情况下多个 value 的 key 可能是一样的。 一个 key 可以映射多个 value 的数据结构为
multimap,multimap 的 value 为一个集合。该过程被称之为 “grouping” (分组)。

public final <K> Observable<java.util.Map<K,java.util.Collection<T>>> toMultimap(
    Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
    Func1<? super T,? extends K> keySelector,
    Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
    Func1<? super T,? extends K> keySelector,
    Func1<? super T,? extends V> valueSelector,
    Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
    Func1<? super T,? extends K> keySelector,
    Func1<? super T,? extends V> valueSelector,
    Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory,
    Func1<? super K,? extends java.util.Collection<V>> collectionFactory)

下面是通过 age 来分组 Person 的实现:

        Observable<Person> values = Observable.just(
                new Person("Will", 35),
                new Person("Nick", 40),
                new Person("Saul", 35)
        );
        values
                .toMultimap(
                        new Func1<Person, Object>() {
                            @Override
                            public Object call(Person person) {
                                return person.age;
                            }
                        }, new Func1<Person, Object>() {
                            @Override
                            public Object call(Person person) {
                                return person.name;
                            }
                        })
                .subscribe(new Action1<Map>() {
                    @Override
                    public void call(Map mMap) {
                        log(mMap.toString());
                    }
                });

结果:

{35=[Will, Saul], 40=[Nick]}

toMultimap 的参数和 toMap 类似,最后一个 collectionFactory 参数是用来创建 value 的集合对象的,collectionFactory 使用 key 作为参数,这样你可以根据 key 来做不同的处理。下面示例代码中没有使用这个 key 参数:

        Observable<Person> values = Observable.just(
                new Person("Will", 35),
                new Person("Nick", 40),
                new Person("Saul", 35)
        );

        values
        .toMultimap(
                new Func1<Person, Object>() {
                    @Override
                    public Object call(Person person) {
                        return person.age;
                    }
                }, new Func1<Person, Object>() {
                    @Override
                    public Object call(Person person) {
                        return person.name;
                }, new Func0<Map<Object, Collection<Object>>>() {
                    @Override
                    public Map<Object, Collection<Object>> call() {
                        return new HashMap();
                    }
                }, new Func1<Object, Collection<Object>>() {
                    @Override
                    public Collection<Object> call(Object o) {
                        return new ArrayList();
                    }
                }) // 没有使用这个 key 参数

注意事项

这些操作函数都有非常有限的用法。这些函数只是用来给初学者把数据收集到集合中使用的,并且内部使用传统的方式来处理数据。这些方式不应该在实际项目中实现,因为他们和使用Rx 的理念并不相符。

groupBy

groupBy 是 toMultimap 函数的 Rx 方式的实现。groupBy 根据每个源Observable 发射的值来计算一个
key, 然后为每个 key 创建一个新的 Observable并把key 一样的值发射到对应的新 Observable 中。

public final <K> Observable<GroupedObservable<K,T>> groupBy(Func1<? super T,? extends K> keySelector)

技术分享

  • 返回的结果为 GroupedObservable。 每次发现一个新的key,内部就生成一个新的 GroupedObservable并发射出来。和普通的 Observable 相比 多了一个 getKey 函数来获取 分组的 key。来自于源Observable中的值会被发射到对应 key 的 GroupedObservable 中。
  • 嵌套的 Observable 导致方法的定义比较复杂,但是提供了随时发射数据的优势,没必要等源Observable 发射完成了才能返回数据。
  • 下面的示例中使用了一堆单词作为源Observable的数据,然后根据每个单词的首字母作为分组的 key,最后把每个分组的 最后一个单词打印出来:
         Observable<String> values = Observable.just(
                "first",
                "second",
                "third",
                "forth",
                "fifth",
                "sixth"
        );
        values.groupBy(new Func1<String, Object>() {
            @Override
            public Object call(String s) {
                return s.charAt(0);
            }
        })
                .subscribe(new Action1<GroupedObservable<Object, String>>() {
                    @Override
                    public void call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
                        objectStringGroupedObservable.last().subscribe(new Action1<String>() {
                            @Override
                            public void call(String s) {
                                log( objectStringGroupedObservable.getKey() +":" + s);
                            }
                        });
                    }
                });
t:third
f:fifth
s:sixth

上面的代码使用了嵌套的 Subscriber,但Rx 功能之一就是为了避免嵌套回调函数,所以下面演示了如何避免嵌套:

         Observable<String> values = Observable.just(
                "first",
                "second",
                "third",
                "forth",
                "fifth",
                "sixth"
        );
        values.groupBy(new Func1<String, Object>() {
            @Override
            public Object call(String s) {
                return s.charAt(0);
            }
        })
             .flatMap(new Func1<GroupedObservable<Object, String>, Observable<?>>() {
                    @Override
                    public Observable<?> call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
                        return objectStringGroupedObservable.last().map(new Func1<String, Object>() {
                            @Override
                            public Object call(String s) {
                                return objectStringGroupedObservable.getKey() +":" + s;
                            }
                        });
                    }
                })
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        log(o.toString());
                    }
                });

结果:

s: sixth
t: third
f: fifth

nest

项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第二部分:事件流基础之 聚合 - 云在千峰
Android RxJava使用介绍(三) RxJava的操作符 - 呼啸而过的专栏 - 博客频道 - CSDN.NET

RxJava----操作符:聚合Observable

原文:http://blog.csdn.net/qq_20198405/article/details/51260739

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!