首页 > 其他 > 详细

ObservableTest

时间:2018-12-13 14:05:29      阅读:220      评论:0      收藏:0      [点我收藏+]
package com.test.rxjava;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.lang3.time.DurationFormatUtils;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.schedulers.Schedulers;

/**
 * Small RxJava experiment: a producer that emits consecutive integers as fast as it can,
 * observed by a deliberately slow consumer on a different scheduler.
 *
 * <p>The source is a plain {@code Observable}, so nothing throttles the producer; items the
 * consumer has not handled yet pile up between the {@code subscribeOn} thread and the
 * {@code observeOn} thread. The demo is expected to run until it is killed or fails.
 */
public class ObservableTest {

  /** How long the consumer pretends to work on each item, in milliseconds. */
  private static final long CONSUMER_DELAY_MILLIS = 100000;

  /** Released when the stream reaches a terminal event so that {@link #main} can return. */
  private final CountDownLatch finished = new CountDownLatch(1);

  /**
   * Starts the pipeline and parks the main thread until the stream terminates.
   *
   * <p>The pipeline runs on scheduler threads, so the main thread has to wait on the latch
   * instead of returning immediately.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    ObservableTest test = new ObservableTest();
    test.run();
    try {
      test.finished.await();
    } catch (InterruptedException e) {
      // Restore the flag so whoever owns this thread can see the interruption.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Builds and subscribes the producer/consumer pipeline.
   *
   * <p>The producer counts upwards from 1, printing and emitting every value, and stops as
   * soon as the downstream has been disposed. Emission happens on a new thread
   * ({@code subscribeOn}); {@link #next(Integer)} is invoked on an IO scheduler thread
   * ({@code observeOn}). On error the stack trace is printed; on completion the elapsed time
   * since subscription is printed. Either terminal event releases {@link #finished}.
   */
  private void run() {
    Instant start = Instant.now();
    Observable
        .create((ObservableOnSubscribe<Integer>) observableEmitter -> {
          // Primitive counter: avoids re-boxing an Integer on every increment.
          int i = 0;
          // Stop producing once nobody is listening instead of spinning forever.
          while (!observableEmitter.isDisposed()) {
            i++;
            System.out.println(i);
            observableEmitter.onNext(i);
          }
        })
        .observeOn(Schedulers.io())
        .subscribeOn(Schedulers.newThread())
        .subscribe(
            this::next,
            throwable -> {
              throwable.printStackTrace();
              finished.countDown();
            },
            () -> {
              long elapsedMillis = Duration.between(start, Instant.now()).toMillis();
              System.out.println(
                  DurationFormatUtils.formatDurationWords(elapsedMillis, true, true));
              finished.countDown();
            });
  }

  /**
   * Slow consumer: sleeps for {@link #CONSUMER_DELAY_MILLIS} and then prints the current
   * thread name and the received value.
   *
   * @param i the value emitted by the producer
   */
  private void next(Integer i) {
    try {
      Thread.sleep(CONSUMER_DELAY_MILLIS);
    } catch (InterruptedException e) {
      // Keep the interrupt visible to the scheduler that owns this worker thread.
      Thread.currentThread().interrupt();
    }
    System.out.println(Thread.currentThread().getName() + ":" + i);
  }
}

 

ObservableTest

原文:https://www.cnblogs.com/tonggc1668/p/10113641.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!