当前位置: 代码迷 >> java >> 观察到多个Flowable在单个Scheduler上,事件未按与发出时相同的顺序使用
  详细解决方案

观察到多个Flowable在单个Scheduler上,事件未按与发出时相同的顺序使用

热度:56   发布时间:2023-07-26 13:57:47.0

我正在使用从不同的外部来源/订阅包装到不同的Flowable使用的事件。 来源无关紧要,因为我可以通过一个简单的循环重现该问题。 我有 :

  • 不同的FlowableEmitter(3个足以重现)
  • 发射器的单线程(下面是主线程)
  • 订户的单线程(newSingleThreadExecutor)

这是一个简单的代码重现

    final ExecutorService executor = Executors.newSingleThreadExecutor();
    final Scheduler scheduler = Schedulers.from(executor);

    List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
    for (int i=0; i<3; ++i) {
        final int finalI = i;
        Flowable.create( new FlowableOnSubscribe<Long>(){
            @Override
            public void subscribe(FlowableEmitter<Long> emitter) {
                emitterList.add(emitter);
            }
        },  BackpressureStrategy.MISSING)
                .observeOn(scheduler)
                .subscribe(
                        val -> System.out.println(
                            "[" +Thread.currentThread().getName()
                            + "] Flow:" + finalI 
                            + " > " + Long.toString(val)));
    }

    long state = 0;
    for (int i=0; i<5; ++i) {
        for (FlowableEmitter<Long> emitter: emitterList){
            emitter.onNext(++state);
        }
    }

    executor.shutdown();

我的问题是事件的消耗顺序与发出的顺序不同。 如果我删除了observeOn(scheduler),它可以正常工作,但是我需要在不同的线程上发射器和订阅器。 我还测试了不同的BackpressureStrategy,但无济于事。
任何提示都可以按顺序(1,2,3,4,5 ... 14,15)订阅/打印所有号码的线索,而不是下面的内容

[pool-1-thread-1] Flow:0 > 1
[pool-1-thread-1] Flow:0 > 4
[pool-1-thread-1] Flow:0 > 7
[pool-1-thread-1] Flow:0 > 10
[pool-1-thread-1] Flow:0 > 13
[pool-1-thread-1] Flow:1 > 2
[pool-1-thread-1] Flow:1 > 5
[pool-1-thread-1] Flow:1 > 8
[pool-1-thread-1] Flow:1 > 11
[pool-1-thread-1] Flow:1 > 14
[pool-1-thread-1] Flow:2 > 3
[pool-1-thread-1] Flow:2 > 6
[pool-1-thread-1] Flow:2 > 9
[pool-1-thread-1] Flow:2 > 12
[pool-1-thread-1] Flow:2 > 15

如果有的话,我正在使用rx-java 2.2.5和Java 8。

observeOn不公平,如果还有更多工作要做,可能会拥抱发射线程。 通过循环发射,一个信号源可能准备好发射更多信号,因此您无法获得完美的交错。 另请注意,通过使用MISSING策略,您很容易出现反压异常。 从扩展项目中尝试 :

final ExecutorService executor = Executors.newSingleThreadExecutor();
final Scheduler scheduler = Schedulers.from(executor);

List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
for (int i=0; i<3; ++i) {
    final int finalI = i;
    Flowable.create( new FlowableOnSubscribe<Long>(){
        @Override
        public void subscribe(FlowableEmitter<Long> emitter) {
            emitterList.add(emitter);
        }
    },  BackpressureStrategy.BUFFER)
            // ---------------------------------------------------------
            .compose(FlowableTransformers.requestObserveOn(scheduler))
            // ---------------------------------------------------------
            .subscribe(
                    val -> System.out.println(
                        "[" +Thread.currentThread().getName()
                        + "] Flow:" + finalI 
                        + " > " + Long.toString(val)));
}

long state = 0;
for (int i=0; i<5; ++i) {
    for (FlowableEmitter<Long> emitter: emitterList){
        emitter.onNext(++state);
    }
}

executor.shutdown();
  相关解决方案