当前位置: 代码迷 >> 综合 >> RxJava 和 RxAndroid 五(线程调度)
  详细解决方案

RxJava 和 RxAndroid 五(线程调度)

热度:18   发布时间:2023-12-14 01:50:25.0

转自:http://www.cnblogs.com/zhaoyanjun/p/5624395.html

对rxJava不了解的同学可以先看

RxJava 和 RxAndroid 一 (基础)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和内存优化)

RxJava 和 RxAndroid 四(RxBinding的使用)

 

本文将有几个例子说明,rxjava线程调度的正确使用姿势。

例1

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

Observable

               .create(new Observable.OnSubscribe<String>() {

                   @Override

                   public void call(Subscriber<? super String> subscriber) {

                       Logger.v( "rx_call" , Thread.currentThread().getName()  );

 

                       subscriber.onNext( "dd");

                       subscriber.onCompleted();

                   }

               })

               .map(new Func1<String, String >() {

                   @Override

                   public String call(String s) {

                       Logger.v( "rx_map" , Thread.currentThread().getName()  );

                       return s + "88";

                   }

               })

               .subscribe(new Action1<String>() {

                   @Override

                   public void call(String s) {

                       Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );

                   }

               }) ;

  结果

/rx_call: main           -- 主线程
/rx_map: main        --  主线程
/rx_subscribe: main   -- 主线程

例2

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

  new Thread(new Runnable() {

           @Override

           public void run() {

               Logger.v( "rx_newThread" , Thread.currentThread().getName()  );

               rx();

           }

       }).start();

 

void rx(){

       Observable

               .create(new Observable.OnSubscribe<String>() {

                   @Override

                   public void call(Subscriber<? super String> subscriber) {

                       Logger.v( "rx_call" , Thread.currentThread().getName()  );

 

                       subscriber.onNext( "dd");

                       subscriber.onCompleted();

                   }

               })

               .map(new Func1<String, String >() {

                   @Override

                   public String call(String s) {

                       Logger.v( "rx_map" , Thread.currentThread().getName()  );

                       return s + "88";

                   }

               })

               .subscribe(new Action1<String>() {

                   @Override

                   public void call(String s) {

                       Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );

                   }

               }) ;

 

   }

 

      结果

/rx_newThread: Thread-564   -- 子线程
/rx_call: Thread-564              -- 子线程
/rx_map: Thread-564            -- 子线程 
/rx_subscribe: Thread-564    -- 子线程

 

  • 通过例1和例2,说明,Rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程(上述说法不完全准确,补充研究:https://blog.csdn.net/rnZuoZuo/article/details/83830391

 

例3

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

Observable

               .create(new Observable.OnSubscribe<String>() {

                   @Override

                   public void call(Subscriber<? super String> subscriber) {

                       Logger.v( "rx_call" , Thread.currentThread().getName()  );

 

                       subscriber.onNext( "dd");

                       subscriber.onCompleted();

                   }

               })

 

               .subscribeOn(Schedulers.io())

               .observeOn(AndroidSchedulers.mainThread())

 

               .map(new Func1<String, String >() {

                   @Override

                   public String call(String s) {

                       Logger.v( "rx_map" , Thread.currentThread().getName()  );

                       return s + "88";

                   }

               })

               .subscribe(new Action1<String>() {

                   @Override

                   public void call(String s) {

                       Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );

                   }

               }) ;

  结果

/rx_call: RxCachedThreadScheduler-1    --io线程
/rx_map: main                                     --主线程
/rx_subscribe: main                              --主线程

 

例4

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

Observable

               .create(new Observable.OnSubscribe<String>() {

                   @Override

                   public void call(Subscriber<? super String> subscriber) {

                       Logger.v( "rx_call" , Thread.currentThread().getName()  );

 

                       subscriber.onNext( "dd");

                       subscriber.onCompleted();

                   }

               })

               .map(new Func1<String, String >() {

                   @Override

                   public String call(String s) {

                       Logger.v( "rx_map" , Thread.currentThread().getName()  );

                       return s + "88";

                   }

               })

 

               .subscribeOn(Schedulers.io())

               .observeOn(AndroidSchedulers.mainThread())

 

               .subscribe(new Action1<String>() {

                   @Override

                   public void call(String s) {

                       Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );

                   }

               }) ; 

      结果

/rx_call: RxCachedThreadScheduler-1     --io线程
/rx_map: RxCachedThreadScheduler-1   --io线程
/rx_subscribe: main                              --主线程

   

  • 通过例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出 map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。
  • 对于 create() , just() , from()   等                 --- 事件产生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消费

  •   事件产生:默认运行在当前线程,可以由 subscribeOn()  自定义线程

         事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程

       事件消费:默认运行在当前线程,可以有observeOn() 自定义

 

例5  多次切换线程

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

Observable

                .create(new Observable.OnSubscribe<String>() {

                    @Override

                    public void call(Subscriber<? super String> subscriber) {

                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

 

                        subscriber.onNext( "dd");

                        subscriber.onCompleted();

                    }

                })

 

                .observeOn( Schedulers.newThread() )    //新线程

 

                .map(new Func1<String, String >() {

                    @Override

                    public String call(String s) {

                        Logger.v( "rx_map" , Thread.currentThread().getName()  );

                        return s + "88";

                    }

                })

 

                .observeOn( Schedulers.io() )      //io线程

 

                .filter(new Func1<String, Boolean>() {

                    @Override

                    public Boolean call(String s) {

                        Logger.v( "rx_filter" , Thread.currentThread().getName()  );

                        return s != null ;

                    }

                })

 

                .subscribeOn(Schedulers.io())     //定义事件产生线程:io线程

                .observeOn(AndroidSchedulers.mainThread())     //事件消费线程:主线程

 

                .subscribe(new Action1<String>() {

                    @Override

                    public void call(String s) {

                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );

                    }

                }) ;

  结果

/rx_call: RxCachedThreadScheduler-1           -- io 线程
/rx_map: RxNewThreadScheduler-1             -- new出来的线程
/rx_filter: RxCachedThreadScheduler-2        -- io线程
/rx_subscribe: main                                   -- 主线程

 

例6:只规定了事件产生的线程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

Observable

         .create(new Observable.OnSubscribe<String>() {

             @Override

             public void call(Subscriber<? super String> subscriber) {

                 Log.v( "rx--create " , Thread.currentThread().getName() ) ;

                 subscriber.onNext( "dd" ) ;

             }

         })

         .subscribeOn(Schedulers.io())

         .subscribe(new Action1<String>() {

             @Override

             public void call(String s) {

                 Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;

             }

         }) ;

  结果

/rx--create: RxCachedThreadScheduler-4                      // io 线程
/rx--subscribe: RxCachedThreadScheduler-4                 // io 线程

     

例:7:只规定事件消费线程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

Observable

               .create(new Observable.OnSubscribe<String>() {

                   @Override

                   public void call(Subscriber<? super String> subscriber) {

                       Log.v( "rx--create " , Thread.currentThread().getName() ) ;

                       subscriber.onNext( "dd" ) ;

                   }

               })

               .observeOn( Schedulers.newThread() )

               .subscribe(new Action1<String>() {

                   @Override

                   public void call(String s) {

                       Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;

                   }

               }) ;

  结果

/rx--create: main                                           -- 主线程
/rx--subscribe: RxNewThreadScheduler-1        --  new 出来的子线程 

      

    从例6可以看出,如果只规定了事件产生的线程,那么事件消费线程将跟随事件产生线程。

    从例7可以看出,如果只规定了事件消费的线程,那么事件产生的线程和 当前线程保持一致。

 

例8:线程调度封装

 在Android 常常有这样的场景,后台处理处理数据,前台展示数据。

一般的用法:

1

2

3

4

5

6

7

8

9

Observable

             .just( "123" )

             .subscribeOn( Schedulers.io())

             .observeOn( AndroidSchedulers.mainThread() )

             .subscribe(new Action1() {

                 @Override

                 public void call(Object o) {

                 }

             }) ;

  但是项目中这种场景有很多,所以我们就想能不能把这种场景的调度方式封装起来,方便调用。

简单的封装

1

2

3

4

public Observable apply( Observable observable ){

   return observable.subscribeOn( Schedulers.io() )

            .observeOn( AndroidSchedulers.mainThread() ) ;

}

使用

1

2

3

4

5

6

7

apply( Observable.just( "123" ) )

              .subscribe(new Action1() {

                  @Override

                  public void call(Object o) {

 

                  }

              }) ;

弊端:虽然上面的这种封装可以做到线程调度的目的,但是它破坏了链式编程的结构,是编程风格变得不优雅。

改进:Transformers 的使用(就是转化器的意思,把一种类型的Observable转换成另一种类型的Observable )

改进后的封装

1

2

3

4

5

6

Observable.Transformer schedulersTransformer = new  Observable.Transformer() {

    @Override public Object call(Object observable) {

        return ((Observable)  observable).subscribeOn(Schedulers.newThread())

                .observeOn(AndroidSchedulers.mainThread());

    }

};

  使用

1

2

3

4

5

6

7

8

Observable

          .just( "123" )

          .compose( schedulersTransformer )

          .subscribe(new Action1() {

              @Override

              public void call(Object o) {

              }

          }) ;

  弊端:虽然保持了链式编程结构的完整,但是每次调用 .compose( schedulersTransformer ) 都是 new 了一个对象的。所以我们需要再次封装,尽量保证单例的模式。

改进后的封装

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

package lib.app.com.myapplication;

 

import rx.Observable;

import rx.android.schedulers.AndroidSchedulers;

import rx.schedulers.Schedulers;

 

/**

 * Created by ${zyj} on 2016/7/1.

 */

public class RxUtil {

 

    private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {

        @Override public Object call(Object observable) {

            return ((Observable)  observable).subscribeOn(Schedulers.newThread())

                    .observeOn(AndroidSchedulers.mainThread());

        }

    };

 

   public static  <T> Observable.Transformer<T, T> applySchedulers() {

        return (Observable.Transformer<T, T>) schedulersTransformer;

    }

 

}

  使用

1

2

3

4

5

6

7

8

Observable

            .just( "123" )

            .compose( RxUtil.<String>applySchedulers() )

            .subscribe(new Action1() {

                @Override

                public void call(Object o) {

                }

            }) ;