问题描述
在我的Android项目中,我严重依赖 , (带有RxJavaInterop)和 。
我得到了一个应该无限期运行的rx流(直到我的服务停止),在它上面我有一个.flatMap Function<String, ObservableSource<Action>> 。
意思是,这个flatMap包含一个Subject<Action> ,将接收String actionId ,对那些actionId进行一些处理(与问题无关),并根据条件向数据库查询Action对象并将其分派给subject
我的第一种方法是直接执行查询:
Cursor c = db.query(...);
if(c.moveFirst()) {
Action a = Action.SELECT_ALL_MAPPER.map(c);
subject.onNext(selectAll);
}
但这阻塞了正在运行的线程,我宁愿在自己的流上触发它,该流应执行以下操作:
- 查询(应返回0或1个项目)
-
如果有值:映射到
Action对象,然后将值推送到subject - 如果没有价值:终止/处置。
-
subject无法接收终止或错误。 它必须为将来的事件而活着。
我当前的方法是以下代码:
RxJavaInterop.toV2Observable(db.createQuery(
Action.TABLE_NAME,
Action.FACTORY.Select_by_id(actionId).statement)
.mapToOne(new Func1<Cursor, Action>() {
@Override public Action call(Cursor cursor) {
return Action.SELECT_ALL_MAPPER.map(cursor);
}
}))
.take(1)
.subscribe(new Consumer<Action>() {
@Override public void accept(Action action) throws Exception {
subject.onNext(action);
}
});
尽管在第一印象中这似乎可以解决问题,但我看到了一些错误:
-
我不能处理
即使获得对Disposable对象的引用,也无法从
Consumer<Action>内部调用它,因为它“可能尚未初始化”(据我所知,可以)。 - 如果没有使用给定ID的操作,则可观察对象将永远挂在那里,直到VM被杀死为止。
所以问题是:
我怎样才能做到这一点?
1楼
我宁愿自己触发
看看 。 可能看起来像:
yourRxStream
.flatMap(*db request here*)
.subscribeOn(Schedulers.io())
.subcribe(subject);
subject无法接收终止或错误。 它必须为将来的事件而活着。
用切换主题:
主题对于弥合非Rx API之间的差距很有用。 但是,它们以破坏性的方式处于有状态:当它们接收到
onComplete或onError它们将不再可用于移动数据。 这是可观察的契约,有时是期望的行为。 大多数时候不是。中继只是没有上述属性的主题。 它们使您可以轻松地将非Rx API桥接到Rx中,而不必担心意外触发终端状态。
最后,对于可能输出0或1项的请求,请使用 。