问题描述
我正在将AsyncTaskLoader
迁移到RxJava的过程中,试图了解有关RxJava并发方法的所有详细信息。
简单的事情运行正常,但是我在下面的代码中苦苦挣扎:
这是执行的顶级方法:
mCompositeDisposable.add(mDataRepository
.getStuff()
.subscribeOn(mSchedulerProvider.io())
.subscribeWith(...)
mDataRepository.getStuff()看起来像这样:
public Observable<StuffResult> getStuff() {
return mDataManager
.listStuff()
.flatMap(stuff -> Observable.just(new StuffResult(stuff)))
.onErrorReturn(throwable -> new StuffResult(null));
最后一层:
public Observable<Stuff> listStuff() {
Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName());
String sql = <...>;
return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql).mapToList(mStuffMapper);
}
因此,使用上面的代码, log
将打印出.listStuff() - main
,这与我要查找的不完全相同。
我不太确定为什么。
我的印象是,通过设置subscribeOn
,将从链中拉出的每个事件都将在subscribeOn方法中指定的线程上进行处理。
我认为正在发生的事情是,到达mBriteDatabase
之前的源mBriteDatabase
最终层代码不是来自RxJava世界,因此在调用createQuery
之前不是事件。
所以我可能需要某种包装纸?
我尝试应用.fromCallable
,但是这是非Rx代码的包装,我的数据库层返回了一个可观察的...
1楼
您的Log.d
通话发生
-
立即调用
listStuff
- 在getStuff被调用之后
- 这是您向我们展示的顶级代码片段中发生的第一件事。
如果需要在订阅发生时执行此操作,则需要明确:
public Observable<Stuff> listStuff() {
String sql = <...>;
return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql)
.mapToList(mStuffMapper)
.doOnsubscribe(() -> Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName()));
}