当前位置: 代码迷 >> 综合 >> vert.x中future的简单使用
  详细解决方案

vert.x中future的简单使用

热度:26   发布时间:2023-12-17 00:09:53.0

文章目录

    • 需要知道vert.x的基础知识,和vertx单侧方式
    • 初始化:
    • future完成(成功或者失败(异常))
    • 设置回调
    • 举例
    • tryComplete方法
    • map的使用
    • compose方法,最后讲
    • 源码

需要知道vert.x的基础知识,和vertx单侧方式

初始化:

package io.vertx.core;
直接看源码:
在这里插入图片描述
这几个方法是静态的方法,一个是传handler,一个成功返回,失败返回,用的比较多的就是直接初始化 Future.future();这时候返回的future是没有完成的,也没有处理器,比较灵活.
在这里插入图片描述

future完成(成功或者失败(异常))

在这里插入图片描述
就是对初始化的future进行complete或者fail操作,这时候future就是完成的.完成之后就要对返回的数据进行处理了,这时候就要设置回调方法了

设置回调

  @FluentFuture<T> setHandler(Handler<AsyncResult<T>> handler);

泛型是封装了返回的泛型的AsyncResult是异步结果,因为可能成功,可能失败,所以回调要进行判断一下,然后通过succeeded方法来判断成功怎么办,失败怎么办;
在这里插入图片描述

举例

写一个简单的例子:

@Testpublic void testsetHandler(TestContext context) {
    Async async = context.async();Future<String> future = Future.future();future.setHandler(r -> {
    if (r.succeeded()) {
    //成功执行String result = r.result();System.out.println("result is:" + result);}else {
    //失败执行Throwable cause = r.cause();System.out.println("异常:"+cause);}async.complete();});//异步执行其他操作,操作完成执行future.complete方法.asynchronousMethod(future);System.out.println("handler设置完成...");}private void asynchronousMethod(Future<String> f) {
    CompletableFuture.runAsync(() -> {
    System.out.println("asynchronousMethod...");try {
    TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {
    e.printStackTrace();}f.complete("haha");});}

一般是先执行其他操作,然后在设置Handler,都是一样的…
回调的AsyncResult有成功还是失败,如果失败则应该fail(exception),这样在返回的时候我们也可以获取的到:
在这里插入图片描述
执行的结果:

在这里插入图片描述

tryComplete方法

tryComplete方法顾名思义,就是尝试成功,如果回调还没有执行完,然后被tryComplete执行了,那就返回true,如果,回调已经执行完了,那就返回false.这个比较简单,感觉使用的也少;

  @Testpublic void testTryComplete(TestContext context) {
    Async async = context.async();Future<String> future = Future.future();future.setHandler(r -> {
    if (r.succeeded()) {
    //成功执行String result = r.result();System.out.println("result is:" + result);} else {
    //失败执行Throwable cause = r.cause();System.out.println("异常:" + cause);}async.complete();});//异步执行其他操作,操作完成执行future.complete方法.asynchronousMethod(future);System.out.println("handler设置完成...");//不进行sleep,返回的就是ccc
// try {
    
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
    
// e.printStackTrace();
// }boolean b = future.tryComplete("ccc");System.out.println(b);}

执行结果:
在这里插入图片描述

map的使用

map方法比较简单,就是返回新的数据类型, 举个例子,好比一开始返回的是haha,然后要返回haha + “aaa”;你懂我的意思,就是要对原先返回的进行处理一下在返回:可以是其他数据类型,为了测试简单,我们就使用String,返回也是:

 @Testpublic void testMap(TestContext context) {
    Async async = context.async();Future<String> future = Future.future();future.setHandler(r -> {
    if (r.succeeded()) {
    //成功执行String result = r.result();System.out.println("result is:" + result);} else {
    //失败执行Throwable cause = r.cause();System.out.println("异常:" + cause);}async.complete();});//异步执行其他操作,操作完成执行future.complete方法.asynchronousMethod(future);System.out.println("handler设置完成...");try {
    TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {
    e.printStackTrace();}Future<String> map = future.map(x -> {
    System.out.println(x);return "fuction...return" + x;});map.setHandler(x -> {
    System.out.println("map result :" + x.result());async.complete();});}

执行的结果是:
在这里插入图片描述
这时候看到第一个handler是没有执行的,如果,我们使用map,尽量就不要设置handler了,因为我实在找不出理由,你想对返回结果进行转换还要进行处理,这样就会出现问题,如果,我们在执行map之前时间很长,那么两个handler都会执行了:
在这里插入图片描述
所以,使用map,感觉就可以直接在map里面进行统一处理即可;
map还要其他两个方法,也比较简单,就是直接返回一个新的数据类型,对原来的数据还没有进行处理;
在这里插入图片描述
mapEmpty返回的是null,mapV是返回的v,
源码:就是新建一个Future然后进行处理一下返回,比较简单;
在这里插入图片描述
在这里插入图片描述

compose方法,最后讲

compose方法应该相当于链式调用,我想在操作1执行完成之后执行操作2
第一种方式:
compose源码:

default <U> Future<U> compose(Handler<T> handler, Future<U> next) {
    setHandler(ar -> {
    if (ar.succeeded()) {
    try {
    handler.handle(ar.result());} catch (Throwable err) {
    if (next.isComplete()) {
    throw err;}next.fail(err);}} else {
    next.fail(ar.cause());}});return next;}

传了两个参数,第一个是执行compose的handler,第二个参数是下一个future,然后将下一个future返回回去,写个demo:

 @Testpublic void testCompose(TestContext context) {
    Future<String> f1 = Future.future();Future<Integer> f2 = Future.future();f1.complete("f1's result");// f1.setHandler(x -> {
    
// System.out.println("f1 handler:" + x);
// f2.complete(123);
// });
// f2.setHandler(x -> {
    
// System.out.println("f2 handler:" + x);
// });f1.compose(r -> {
    System.out.println("f1 handler:" + r);f2.complete(123);}, f2).setHandler(r -> {
    System.out.println("f2 handler:" + r.result());});}

结果:
在这里插入图片描述
f1执行完,在f1的handler里面需要将f2给完成掉.
第二种componse源码:

default <U> Future<U> compose(Function<T, Future<U>> mapper) {
    if (mapper == null) {
    throw new NullPointerException();}Future<U> ret = Future.future();setHandler(ar -> {
    if (ar.succeeded()) {
    Future<U> apply;try {
    apply = mapper.apply(ar.result());} catch (Throwable e) {
    ret.fail(e);return;}apply.setHandler(ret);} else {
    ret.fail(ar.cause());}});return ret;}

写一个跟上面一样的测试:

@Testpublic void testCompose2(TestContext context) {
    Future<String> f1 = Future.future();f1.complete("f1's result");f1.compose(r -> {
    System.out.println("f1 hander :"+r);Future<String> f2 = Future.future();f2.complete("f2's result");//返回的f2,下一个componse的执行者return f2;}).compose(r -> {
    System.out.println("f2 hander :"+r);Future<String> f3 = Future.future();f3.complete("f3's result");//返回的f3,setHandlerreturn f3;}).setHandler(r -> {
    System.out.println("f3 hander :"+r);});}

执行结果:
在这里插入图片描述
compose实现方法太绕了,没怎么看得懂…个人理解,就是在compose方法里面相当于是执行回调方法,然后执行完成返回下一个要执行的future…可以看一下别哥们写的:
https://zhuanlan.zhihu.com/p/35980063

源码

package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.file.FileSystem;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.test.core.TestVerticle;import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;/*** @author* @Version 2020-04-19 14:09* @Version 1.0* @Description VertxTest*/
@RunWith(VertxUnitRunner.class)
public class VertxTest {
    private Vertx vertx;private FileSystem fs;@Beforepublic void setUp(TestContext context) {
    vertx = Vertx.vertx();fs = vertx.fileSystem();vertx.deployVerticle(new TestVerticle());}@Afterpublic void after(TestContext context) {
    System.out.println("after ...");vertx.close(context.asyncAssertSuccess());}private void synchronousMethod(Future<String> f) {
    CompletableFuture.runAsync(() -> {
    System.out.println("asynchronousMethod...");try {
    TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {
    e.printStackTrace();}f.complete("haha");});}private void asynchronousMethod(Future<String> f) {
    CompletableFuture.runAsync(() -> {
    System.out.println("asynchronousMethod...");try {
    TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {
    e.printStackTrace();}f.complete("haha");
// f.fail(new NullPointerException("aa"));});}@Testpublic void testsetHandler(TestContext context) {
    Async async = context.async();Future<String> future = Future.future();future.setHandler(r -> {
    if (r.succeeded()) {
    //成功执行String result = r.result();System.out.println("result is:" + result);} else {
    //失败执行Throwable cause = r.cause();System.out.println("异常:" + cause);}async.complete();});//异步执行其他操作,操作完成执行future.complete方法.asynchronousMethod(future);System.out.println("handler设置完成...");}@Testpublic void testTryComplete(TestContext context) {
    Async async = context.async();Future<String> future = Future.future();future.setHandler(r -> {
    if (r.succeeded()) {
    //成功执行String result = r.result();System.out.println("result is:" + result);} else {
    //失败执行Throwable cause = r.cause();System.out.println("异常:" + cause);}async.complete();});//异步执行其他操作,操作完成执行future.complete方法.asynchronousMethod(future);System.out.println("handler设置完成...");//不进行sleep,返回的就是ccc
// try {
    
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
    
// e.printStackTrace();
// }boolean b = future.tryComplete("ccc");System.out.println(b);}@Testpublic void testMap(TestContext context) {
    Async async = context.async();Future<String> future = Future.future();future.setHandler(r -> {
    if (r.succeeded()) {
    //成功执行String result = r.result();System.out.println("result is:" + result);} else {
    //失败执行Throwable cause = r.cause();System.out.println("异常:" + cause);}async.complete();});//异步执行其他操作,操作完成执行future.complete方法.asynchronousMethod(future);System.out.println("handler设置完成...");try {
    TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {
    e.printStackTrace();}Future<Object> objectFuture = future.mapEmpty();System.out.println(objectFuture);Future<String> map = future.map(x -> {
    System.out.println(x);return "fuction...return" + x;});map.setHandler(x -> {
    System.out.println("map result :" + x.result());async.complete();});}// public static void handleAddCollection(RoutingContext routingContext){
    
// Future<Void> futn=Future.future();
// futn.setHandler(a->{
    
// System.out.println("4最后一步:");
// sendData(routingContext,"ok");
// return;
// });
// Future<UpdateResult> fut1 = Future.future();
// JsonArray params=new JsonArray().add("123").add("测试标题").add("http://baidu.com").add("");
// String sql="insert into mytable (userid,title,url,pic) values (?,?,?,?)";
// myDBConnecton.updateWithParams(sql,params,fut1.completer());
// fut1.compose(v-> {
    
// if(v.getUpdated()>0)System.out.println("2 插入成功了");
// else System.out.println("2 插入失败了");
// Future<ResultSet> fut2 = Future.future();
// myDBConnecton.query("select * from mytable", fut2.completer());
// return fut2;
// }).compose(v -> {
    
// List<JsonObject> data=v.getRows();
// System.out.println("3查询结果为:"+data.toString());
// Future<Void> fut3 = Future.future();
// myDBConnecton.updateWithParams(sql,params,fut1.completer());
// futn.complete();
// },futn);
// }@Testpublic void testCompose(TestContext context) {
    Future<String> f1 = Future.future();Future<Integer> f2 = Future.future();f1.complete("f1's result");// f1.setHandler(x -> {
    
// System.out.println("f1 handler:" + x);
// f2.complete(123);
// });
// f2.setHandler(x -> {
    
// System.out.println("f2 handler:" + x);
// });f1.compose(r -> {
    System.out.println("f1 handler:" + r);f2.complete(123);}, f2).setHandler(r -> {
    System.out.println("f2 handler:" + r.result());});}@Testpublic void testCompose2(TestContext context) {
    Future<String> f1 = Future.future();f1.complete("f1's result");f1.compose(r -> {
    System.out.println("f1 hander :"+r);Future<String> f2 = Future.future();f2.complete("f2's result");//返回的f2,下一个componse的执行者return f2;}).compose(r -> {
    System.out.println("f2 hander :"+r);Future<String> f3 = Future.future();f3.complete("f3's result");//返回的f3,setHandlerreturn f3;}).setHandler(r -> {
    System.out.println("f3 hander :"+r);});}@Testpublic void test2(TestContext context) {
    Async async = context.async();vertx.executeBlocking(future -> {
    // 调用一些需要耗费显著执行时间返回结果的阻塞式APItry {
    TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {
    e.printStackTrace();}future.complete("complete...over...");}, res -> {
    System.out.println("The result is: " + res.result());async.complete();});}}
  相关解决方案