Asynchronous Call

In certain situations, we hope Dubbo interfaces can be called asynchronously to avoid unnecessary waiting.

Dubbo asynchronous calls can be divided into Provider-side asynchronous calls and Consumer-side asynchronous calls.

  • Consumer-side asynchronous means that after initiating an RPC call, it returns immediately, allowing the calling thread to continue processing other business logic. When the response returns, a callback function is used to notify the consumer of the result.
  • Provider-side asynchronous execution moves blocking business from Dubbo’s internal thread pool to a user-defined thread, avoiding excessive occupation of the Dubbo thread pool, which helps prevent interference between different services.

Below is a working example diagram of consumer asynchronous calls:

/user-guide/images/future.jpg

Provider-side asynchronous execution and Consumer-side asynchronous calls are mutually independent, and you can combine the configurations of both sides in any orthogonal manner.

  • Consumer synchronous - Provider synchronous
  • Consumer asynchronous - Provider synchronous
  • Consumer synchronous - Provider asynchronous
  • Consumer asynchronous - Provider asynchronous

Please refer to the complete example source code demonstrated in this document:

Provider Asynchronous

1 Using CompletableFuture

Interface Definition:

public interface AsyncService {  
    /**  
     * Synchronous call method  
     */  
    String invoke(String param);  
    /**  
     * Asynchronous call method  
     */  
    CompletableFuture<String> asyncInvoke(String param);  
}  

Service Implementation:

@DubboService  
public class AsyncServiceImpl implements AsyncService {  

    @Override  
    public String invoke(String param) {  
        try {  
            long time = ThreadLocalRandom.current().nextLong(1000);  
            Thread.sleep(time);  
            StringBuilder s = new StringBuilder();  
            s.append("AsyncService invoke param:").append(param).append(",sleep:").append(time);  
            return s.toString();  
        }  
        catch (InterruptedException e) {  
            Thread.currentThread().interrupt();  
        }  
        return null;  
    }  

    @Override  
    public CompletableFuture<String> asyncInvoke(String param) {  
        // It is recommended to provide a custom thread pool for supplyAsync  
        return CompletableFuture.supplyAsync(() -> {  
            try {  
                // Do something  
                long time = ThreadLocalRandom.current().nextLong(1000);  
                Thread.sleep(time);  
                StringBuilder s = new StringBuilder();  
                s.append("AsyncService asyncInvoke param:").append(param).append(",sleep:").append(time);  
                return s.toString();  
            } catch (InterruptedException e) {  
                Thread.currentThread().interrupt();  
            }  
            return null;  
        });  
    }  
}  

By returning CompletableFuture.supplyAsync(), the business execution has switched from the Dubbo thread to the business thread, avoiding blocking the Dubbo thread pool.

2 Using AsyncContext

Dubbo provides an asynchronous interface AsyncContext similar to Servlet 3.0, which can also achieve Provider-side asynchronous execution without a CompletableFuture signature interface.

Interface Definition:

public interface AsyncService {  
    String sayHello(String name);  
}  

Service Implementation:

public class AsyncServiceImpl implements AsyncService {  
    public String sayHello(String name) {  
        final AsyncContext asyncContext = RpcContext.startAsync();  
        new Thread(() -> {  
            // If you want to use the context, it must be executed on the first line  
            asyncContext.signalContextSwitch();  
            try {  
                Thread.sleep(500);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
            // Write back the response  
            asyncContext.write("Hello " + name + ", response from provider.");  
        }).start();  
        return null;  
    }  
}  

Consumer Asynchronous

1 Using CompletableFuture

@DubboReference  
private AsyncService asyncService;  

@Override  
public void run(String... args) throws Exception {  
    // Call asynchronous interface  
    CompletableFuture<String> future1 = asyncService.asyncInvoke("async call request1");  
    future1.whenComplete((v, t) -> {  
        if (t != null) {  
            t.printStackTrace();  
        } else {  
            System.out.println("AsyncTask Response-1: " + v);  
        }  
    });  
    // Two calls do not return in order  
    CompletableFuture<String> future2 = asyncService.asyncInvoke("async call request2");  
    future2.whenComplete((v, t) -> {  
        if (t != null) {  
            t.printStackTrace();  
        } else {  
            System.out.println("AsyncTask Response-2: " + v);  
        }  
    });  
    // Consumer asynchronous call  
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {  
        return asyncService.invoke("invoke call request3");  
    });  
    future3.whenComplete((v, t) -> {  
        if (t != null) {  
            t.printStackTrace();  
        } else {  
            System.out.println("AsyncTask Response-3: " + v);  
        }  
    });  

    System.out.println("AsyncTask Executed before response return.");  
}  

2 Using RpcContext

Configure in the annotation:

@DubboReference(async="true")  
private AsyncService asyncService;  

You can also specify method-level asynchronous configurations:

@DubboReference(methods = {@Method(name = "sayHello", timeout = 5000)})  
private AsyncService asyncService;  

The subsequent calls will be asynchronous:

// This call will immediately return null  
asyncService.sayHello("world");  
// Get the Future reference of the call, which will be notified and set to this Future when the result returns  
CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();  
// Add a callback to the Future  
helloFuture.whenComplete((retValue, exception) -> {  
    if (exception == null) {  
        System.out.println(retValue);  
    } else {  
        exception.printStackTrace();  
    }  
});  

Or, you can also do asynchronous calls this way

CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall(  
    () -> {  
        asyncService.sayHello("oneway call request1");  
    }  
);  

future.get();  

Asynchronous calls never wait for a return, and you can also set whether to wait for the message to be sent

  • sent="true" waits for the message to be sent; an exception will be thrown if the message sending fails.
  • sent="false" does not wait for the message to be sent, puts the message into the IO queue, and returns immediately.
@DubboReference(methods = {@Method(name = "sayHello", timeout = 5000, sent = true)})  
private AsyncService asyncService;  

If you just want to be asynchronous and completely ignore the return value, you can configure return="false" to reduce the creation and management cost of Future objects

@DubboReference(methods = {@Method(name = "sayHello", timeout = 5000, return = false)})  
private AsyncService asyncService;  
Last modified September 30, 2024: Translate (22d3d83a3b)