当前位置: 代码迷 >> 综合 >> RxJava 和 RxAndroid 五(线程调度)补充研究
  详细解决方案

RxJava 和 RxAndroid 五(线程调度)补充研究

热度:70   发布时间:2023-12-14 01:39:13.0

线程调度文章:https://blog.csdn.net/rnZuoZuo/article/details/83830391

研究结论:默认情况下,不指定生产者线程和消费者线程,运行在当前线程,

但是如果emitter在发射的时候放在一个新的线程里,那么加工线程和消费线程保持和这个新的线程一致

测试代码:

public class MainDemo {public static void main(String[] args){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(final ObservableEmitter<Integer> emitter) throws Exception {new Thread(new Runnable() {@Overridepublic void run() {emitter.onNext(1);emitter.onComplete();}}).start();System.out.println( "rx_call_1:" + Thread.currentThread().getName()  );}}). map(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer cmsResponse) throws Exception {System.out.println( "rx_call_map:" + Thread.currentThread().getName()  );return 2;}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {System.out.println( "rx_call_2:" + Thread.currentThread().getName()  );}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}
}

 

下面虽然指定里生产者线程 subscribeOn(Schedulers.io()),但是加工线程和消费线程还是跟随emiter.onnext执行的线程

 

public class MainDemo {public static void main(String[] args){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(final ObservableEmitter<Integer> emitter) throws Exception {new Thread(new Runnable() {@Overridepublic void run() {emitter.onNext(1);emitter.onComplete();}}).start();System.out.println( "rx_call_1:" + Thread.currentThread().getName()  );}}). map(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer cmsResponse) throws Exception {System.out.println( "rx_call_map:" + Thread.currentThread().getName()  );return 2;}}) .subscribeOn(Schedulers.io()).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {System.out.println( "rx_call_2:" + Thread.currentThread().getName()  );}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}
}