当前位置: 代码迷 >> Android >> 将RxJava2 Db查询到另一个主题
  详细解决方案

将RxJava2 Db查询到另一个主题

热度:81   发布时间:2023-08-04 12:08:28.0

在我的Android项目中,我严重依赖 , (带有RxJavaInterop)和 。

我得到了一个应该无限期运行的rx流(直到我的服务停止),在它上面我有一个.flatMap Function<String, ObservableSource<Action>>

意思是,这个flatMap包含一个Subject<Action> ,将接收String actionId ,对那些actionId进行一些处理(与问题无关),并根据条件向数据库查询Action对象并将其分派给subject

我的第一种方法是直接执行查询:

Cursor c = db.query(...);
if(c.moveFirst()) {
    Action a = Action.SELECT_ALL_MAPPER.map(c);
    subject.onNext(selectAll);
}

但这阻塞了正在运行的线程,我宁愿在自己的流上触发它,该流应执行以下操作:

  • 查询(应返回0或1个项目)
  • 如果有值:映射到Action对象,然后将值推送到subject
  • 如果没有价值:终止/处置。
  • subject无法接收终止或错误。 它必须为将来的事件而活着。

我当前的方法是以下代码:

RxJavaInterop.toV2Observable(db.createQuery(
    Action.TABLE_NAME,
    Action.FACTORY.Select_by_id(actionId).statement)
    .mapToOne(new Func1<Cursor, Action>() {
        @Override public Action call(Cursor cursor) {
            return Action.SELECT_ALL_MAPPER.map(cursor);
        }
    }))
    .take(1)
    .subscribe(new Consumer<Action>() {
        @Override public void accept(Action action) throws Exception {
            subject.onNext(action);
        }
    });

尽管在第一印象中这似乎可以解决问题,但我看到了一些错误:

  • 我不能处理 即使获得对Disposable对象的引用,也无法从Consumer<Action>内部调用它,因为它“可能尚未初始化”(据我所知,可以)。
  • 如果没有使用给定ID的操作,则可观察对象将永远挂在那里,直到VM被杀死为止。

所以问题是:

我怎样才能做到这一点?

我宁愿自己触发

看看 。 可能看起来像:

yourRxStream
    .flatMap(*db request here*)
    .subscribeOn(Schedulers.io())
    .subcribe(subject);

subject无法接收终止或错误。 它必须为将来的事件而活着。

用切换主题:

主题对于弥合非Rx API之间的差距很有用。 但是,它们以破坏性的方式处于有状态:当它们接收到onCompleteonError它们将不再可用于移动数据。 这是可观察的契约,有时是期望的行为。 大多数时候不是。

中继只是没有上述属性的主题。 它们使您可以轻松地将非Rx API桥接到Rx中,而不必担心意外触发终端状态。


最后,对于可能输出0或1项的请求,请使用 。