当前位置: 代码迷 >> Android >> RxJava在Android中使用札记
  详细解决方案

RxJava在Android中使用札记

热度:107   发布时间:2016-04-24 11:21:54.0
RxJava在Android中使用笔记

使用RxJava可以方便实现观察者模式,数据转换和线程之间通信

https://github.com/ReactiveX/RxJava

在Android中使用RxAndroid增加安卓主线程支持:

https://github.com/ReactiveX/RxAndroid

实际开发中在gradle中注册对应的依赖即可:

compile 'io.reactivex:rxandroid:1.1.0'

?观察者模式一般要有两个对象:

1.被观察者:Obserable

2.观察者:Observer

观察者和被观察者绑定后,被观察者在某种事件发生时会向所有观察它的观察者发送事件,即调用观察者的回调函数。

?

使用java.util里的观察者应该是这样的:

?

import android.util.Log;import java.util.Observable;import java.util.Observer;public class ObserverTest {    void test0() {        //创建被观察者        DemoObserable obserable = new DemoObserable();        //创建一个观察者        Observer observer1 = new Observer() {            //观察者接收到事件时的操作            @Override            public void update(Observable observable, Object data) {                Log.e("TAG", "ToObserver1: " + data);            }        };        //添加绑定        obserable.addObserver(observer1);        //被观察者发送事件        obserable.subscribe("Hello World!!");        //被观察者将观察者移除        obserable.deleteObserver(observer1);    }    //被监听者    public class DemoObserable extends Observable {        //被监听者发送数据给所有观察者        void subscribe(String data) {            setChanged();            notifyObservers(data);        }    }}

?假设观察者有多个,可以依次注册,删除:

?

//创建被观察者        DemoObserable obserable = new DemoObserable();        //创建一个观察者        Observer observer1 = new Observer() {            //观察者接收到事件时的操作            @Override            public void update(Observable observable, Object data) {                Log.e("TAG", "ToObserver1: " + data);            }        };        //创建一个观察者        Observer observer2 = new Observer() {            //观察者接收到事件时的操作            @Override            public void update(Observable observable, Object data) {                Log.e("TAG", "ToObserver2: " + data);            }        };        //添加绑定        obserable.addObserver(observer1);        obserable.addObserver(observer2);
然而,我们接下来使用 rx.Observer,?rx.Observable, 思想也是类似的:
1.观察者与被观察者
void test0() {        Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe<String>() {            @Override            public void call(Subscriber<? super String> subscriber) {                subscriber.onNext("hello world!!");                subscriber.onCompleted();            }        };        Observable<String> observable = Observable.create(onSubscribe);        Observer<String> observer = new Observer<String>() {            @Override            public void onNext(String s) {                Log.e("TAG", s);            }            @Override            public void onCompleted() {                Log.e("TAG", "completed");            }            @Override            public void onError(Throwable e) {                Log.e("TAG", "error" + e);            }        };        observable.subscribe(observer);    }
?完成了类似的功能。
Observable构造方法是保护的,只能通过提供的静态方法创建对象,如Observable.create()
RxJava可以使用链式调用简化代码,所以也可以写成:
Observable.create(new Observable.OnSubscribe<String>() {            @Override            public void call(Subscriber<? super String> subscriber) {                subscriber.onNext("hello world!!");                subscriber.onCompleted();            }        }).subscribe(new Observer<String>() {            @Override            public void onNext(String s) {                Log.e("TAG", s);            }            @Override            public void onCompleted() {                Log.e("TAG", "completed");            }            @Override            public void onError(Throwable e) {                Log.e("TAG", "error" + e);            }        });
?如果直接知道被监听者发送事件onNext()时的参数,可以使用just()来创建Observable,效果和上例一样
Observable.just("Hello World!")                .subscribe(new Observer<String>() {                    @Override                    public void onCompleted() {                        Log.e("TAG", "complete");                    }                    @Override                    public void onError(Throwable e) {                    }                    @Override                    public void onNext(String s) {                        Log.e("TAG", s);                    }                });
?just函数可以传入多个参数,事件依次调用后最后执行onComplete()
Observable.just("Hello World!", "RxJava Demo")                .subscribe(new Observer<String>() {                    @Override                    public void onCompleted() {                        Log.e("TAG", "complete");                    }                    @Override                    public void onError(Throwable e) {                    }                    @Override                    public void onNext(String s) {                        Log.e("TAG", s);                    }                });
?如果观察者只需要重写onNext()方法,可以直接用ActionX来节省掉onError和onComplete的代码:
Observable.just("Hello World!", "RxJava Demo")                .subscribe(new Action1<String>() {                    @Override                    public void call(String s) {                        Log.e("TAG", s);                    }                });
?现在观察者也变的好简单,使用多个参数just()来创建也可以用from(T[])来创建,与上例等价:
String[] arr = {"Hello World!", "RxJava Demo"};        Observable.from(arr)                .subscribe(new Action1<String>() {                    @Override                    public void call(String s) {                        Log.e("TAG", s);                    }                });
?如果不想用Observer,又想处理onError和onComplete,就需要在subscribe()函数传多个ActionX:
String[] arr = {"Hello World!", "RxJava Demo"};        Action1 nextAction, errorAction;        Action0 completeAction;        nextAction = new Action1<String>() {            @Override            public void call(String s) {                Log.e("TAG", s);            }        };        errorAction = new Action1<Exception>() {            @Override            public void call(Exception e) {                e.printStackTrace();            }        };        completeAction = new Action0() {            @Override            public void call() {                Log.e("TAG", "complete");            }        };        Observable.from(arr)                .subscribe(nextAction, errorAction, completeAction);
?2.观察者与被观察者在不同线程
加上线程控制语句subscribeOn,observeOn,就可以将被观察者和观察者指定在不同线程中:
Observable.create(new Observable.OnSubscribe<String>() {            @Override            public void call(Subscriber<? super String> subscriber) {                Log.e("TAG", "I am in computation");                subscriber.onNext("Hello World!!");            }        })                .subscribeOn(Schedulers.computation())                .observeOn(AndroidSchedulers.mainThread())                .subscribe(new Action1<String>() {                    @Override                    public void call(String s) {                        Log.e("TAG", s);                    }                });
?两个操作发生在不同线程中,通过log可以看出来:
03-29 21:00:39.333 13153-13226/com.hzy.rxjavademo E/TAG: I am in computation03-29 21:00:39.334 13153-13153/com.hzy.rxjavademo E/TAG: Hello World!!03-29 21:00:39.914 13153-13227/com.hzy.rxjavademo E/TAG: I am in computation03-29 21:00:39.915 13153-13153/com.hzy.rxjavademo E/TAG: Hello World!!03-29 21:00:40.607 13153-13228/com.hzy.rxjavademo E/TAG: I am in computation03-29 21:00:40.608 13153-13153/com.hzy.rxjavademo E/TAG: Hello World!!
?通过日志pid可以看到,Observable执行会开启新的线程,而Observer观察者收到消息后执行的操作在UI线程执行,所以可以方便的替代之前Android的new Thread加Handler发送消息的机制。
如下代码就可以完成一个完整的异步网络请求,并把结果和出错信息通知到UI线程的观察者:
Observable.create(new Observable.OnSubscribe<String>() {            @Override            public void call(Subscriber<? super String> subscriber) {                try {                    URL url = new URL("https://m.baidu.com/");                    InputStream is = url.openStream();                    InputStreamReader isr = new InputStreamReader(is);                    BufferedReader br = new BufferedReader(isr);                    String line;                    StringBuilder builder = new StringBuilder();                    while ((line = br.readLine()) != null) {                        builder.append(line);                    }                    subscriber.onNext(builder.toString());                } catch (Exception e) {                    subscriber.onError(e);                }            }        })                .subscribeOn(Schedulers.io())                .observeOn(AndroidSchedulers.mainThread())                .subscribe(new Action1<String>() {                    @Override                    public void call(String s) {                        Log.e("TAG", s);                    }                }, new Action1<Throwable>() {                    @Override                    public void call(Throwable throwable) {                        throwable.printStackTrace();                    }                });
?异步操作变得好简单,而且很容易理解
3.数据的过滤
数据过滤使用filter方法:
String[] arr = {"Hello World!", "RxJava Demo", "RxAndroid"};        Observable.from(arr)                .filter(new Func1<String, Boolean>() {                    @Override                    public Boolean call(String s) {                        return s.startsWith("Rx");                    }                })                .subscribe(new Action1<String>() {                    @Override                    public void call(String s) {                        Log.e("TAG", s);                    }                });
?这时,如例子中所有返回以Rx开头字符串的事件,才会去通知观察者。
4.数据的转换
消息发送者返回的数据用map()进行转化后再发送给观察者
Observable.just(123)                .map(new Func1<Integer, String>() {                    @Override                    public String call(Integer integer) {                        String ret = integer + ":" + integer * 2;                        return ret;                    }                }).subscribe(new Action1<String>() {            @Override            public void call(String s) {                Log.e("TAG", s);            }        });
?把消息发送者发来的 Integer类型转成String类型再传给观察者去处理。
使用lift()把一个观察者转成另一个:
Observable.just(123)                .lift(new Observable.Operator<String, Integer>() {                    @Override                    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {                        return new Subscriber<Integer>() {                            @Override                            public void onCompleted() {                                subscriber.onCompleted();                            }                            @Override                            public void onError(Throwable e) {                                subscriber.onError(e);                            }                            @Override                            public void onNext(Integer integer) {                                subscriber.onNext("hello" + integer);                            }                        };                    }                }).subscribe(new Action1<String>() {            @Override            public void call(String s) {                Log.e("TAG", s);            }        });
?把一个Integer类型的Subscriber转成String类型,Subscriber实现Observer接口,所以Subscriber也是观察者。

使用compose直接把被监听者转换掉,Transformer规定了转换的规则,Obserable怎么转换呢,还是用个map来转吧,Transformer在此显得多此一举了:

Observable.just(123)                .compose(new Observable.Transformer<Integer, String>() {                    @Override                    public Observable<String> call(Observable<Integer> integerObservable) {                        return integerObservable.map(new Func1<Integer, String>() {                            @Override                            public String call(Integer integer) {                                return integer + "hello";                            }                        });                    }                }).subscribe(new Action1<String>() {            @Override            public void call(String s) {                Log.e("TAG", s);            }        });

?把Integer类型转换成String类型

?

5.条件判断

条件判断的Observable范型都会被转换成Boolean类型

判断是否存在某种类型用exists()函数

Observable.just(123, 20, 34, 4, 0, 2)                .exists(new Func1<Integer, Boolean>() {                    @Override                    public Boolean call(Integer integer) {                        return integer == 0;                    }                })                .subscribe(new Action1<Boolean>() {                    @Override                    public void call(Boolean aBoolean) {                        Log.e("TAG", "" + aBoolean);                    }                });

判断那些值中是否存在为0的Integer,显然存在,返回true

用all()判断是否全都是某种情况:

Observable.just(123, 20, 34, 4, 0, 2)                .all(new Func1<Integer, Boolean>() {                    @Override                    public Boolean call(Integer integer) {                        return integer > 0;                    }                })                .subscribe(new Action1<Boolean>() {                    @Override                    public void call(Boolean aBoolean) {                        Log.e("TAG", "" + aBoolean);                    }                });

上面那些值全都大于0么,显然不是,返回false。

?

6.观察者嵌套

有一种场景,观察者收到消息后,不能完全处理掉消息,自身又作为消息发送者给其他观察者发消息。

Observable.just(123)                .subscribe(new Action1<Integer>() {                    @Override                    public void call(Integer integer) {                        Observable.just(integer)                                .subscribe(new Action1<Integer>() {                                    @Override                                    public void call(Integer integer) {                                        Log.e("TAG", "" + integer);                                    }                                });                    }                });

?两层嵌套之后缩进已经很长了,为解决这个问题,可以用flatMap()来减少缩进:

Observable.just(123)                .flatMap(new Func1<Integer, Observable<Integer>>() {                    @Override                    public Observable<Integer> call(Integer integer) {                        return Observable.just(integer);                    }                }).subscribe(new Action1<Integer>() {            @Override            public void call(Integer integer) {                Log.e("TAG", "" + integer);            }        });

?这样,缩进好多了。

最后是一个综合实例:
//使用IO线程读取SD卡上所有png文件并在主线程上打印    private void testy() {        Observable.create(new Observable.OnSubscribe<File>() {            @Override            public void call(Subscriber<? super File> subscriber) {                //获取外部存储上的所有文件                File exDir = Environment.getExternalStorageDirectory();                traceFiles(exDir, subscriber);            }        }).subscribeOn(Schedulers.io())                //过滤出png文件                .filter(new Func1<File, Boolean>() {                    @Override                    public Boolean call(File file) {                        return file.isFile() && file.getName().endsWith(".png");                    }                    //使用buffer防止消息发送过快                }).onBackpressureBuffer()                .observeOn(AndroidSchedulers.mainThread())                .subscribe(new Action1<File>() {                    @Override                    public void call(File file) {                        Log.e("TAG", file.getAbsolutePath());                    }                }, new Action1<Throwable>() {                    @Override                    public void call(Throwable throwable) {                        Log.e("TAG", "Error:" + throwable);                        throwable.printStackTrace();                    }                });    }    private void traceFiles(File file, Subscriber<? super File> subscriber) {        subscriber.onNext(file);        if (file.isDirectory()) {            for (File f : file.listFiles()) {                traceFiles(f, subscriber);            }        }    }
??
结束
  相关解决方案