CompletableFuture enables non-blocking service orchestration in Java 8. It is especially useful when an endpoint needs data from multiple downstream services.


Why CompletableFuture Over Sequential Calls

Sequential pattern:

User user = userClient.fetch(userId);
Orders orders = orderClient.fetch(userId);
Recommendations recs = recommendationClient.fetch(userId);

Total latency is roughly the sum of all calls.

With parallel async calls, overall latency is often close to the slowest dependency (plus orchestration overhead).


Core Composition APIs

supplyAsync

CompletableFuture<User> userF = CompletableFuture.supplyAsync(
        () -> userClient.fetch(userId),
        ioExecutor
);

thenApply vs thenCompose

thenApply: synchronous mapping of result.

CompletableFuture<UserDto> dtoF = userF.thenApply(mapper::toDto);

thenCompose: chain async operation returning another future.

CompletableFuture<UserProfile> profileF = userF.thenCompose(
        user -> CompletableFuture.supplyAsync(() -> profileClient.fetch(user.getId()), ioExecutor)
);

thenCombine

CompletableFuture<Response> responseF = userF.thenCombine(
        CompletableFuture.supplyAsync(() -> orderClient.fetch(userId), ioExecutor),
        (user, orders) -> responseMapper.toResponse(user, orders)
);

Real Backend Aggregation Example

public Response aggregate(String userId) {
    CompletableFuture<User> userF = CompletableFuture.supplyAsync(
            () -> userClient.fetch(userId), ioExecutor);

    CompletableFuture<List<Order>> ordersF = CompletableFuture.supplyAsync(
            () -> orderClient.fetchRecentOrders(userId), ioExecutor);

    CompletableFuture<CreditScore> creditF = CompletableFuture.supplyAsync(
            () -> creditClient.fetch(userId), ioExecutor);

    return userF.thenCombine(ordersF, Pair::of)
            .thenCombine(creditF, (pair, credit) -> responseMapper.map(pair.getLeft(), pair.getRight(), credit))
            .join();
}

That shape is useful, but it still feels abstract until you can run it. The two examples below are complete Java 8 programs designed to feel like realistic backend flows.


Runnable Example 1: User Dashboard Aggregation

This example simulates a backend endpoint that needs:

  • user profile
  • recent orders
  • reward points

The important part is that all three remote-style calls start immediately and run in parallel. The endpoint only blocks once, at the very end, when building the final response.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DashboardAggregationDemo {

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(8);
        DashboardService dashboardService = new DashboardService(ioExecutor);

        long start = System.currentTimeMillis();
        DashboardResponse response = dashboardService.getDashboard("user-42");
        long elapsed = System.currentTimeMillis() - start;

        System.out.println(response);
        System.out.println("Completed in " + elapsed + " ms");

        ioExecutor.shutdown();
    }

    static final class DashboardService {
        private final ExecutorService ioExecutor;
        private final UserClient userClient = new UserClient();
        private final OrderClient orderClient = new OrderClient();
        private final RewardsClient rewardsClient = new RewardsClient();

        DashboardService(ExecutorService ioExecutor) {
            this.ioExecutor = ioExecutor;
        }

        DashboardResponse getDashboard(String userId) {
            CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
                    () -> userClient.fetchUser(userId), ioExecutor);

            CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
                    () -> orderClient.fetchRecentOrders(userId), ioExecutor);

            CompletableFuture<Integer> rewardPointsFuture = CompletableFuture.supplyAsync(
                    () -> rewardsClient.fetchRewardPoints(userId), ioExecutor);

            return userFuture
                    .thenCombine(ordersFuture, UserOrders::new)
                    .thenCombine(rewardPointsFuture,
                            (userOrders, rewardPoints) -> new DashboardResponse(
                                    userOrders.user,
                                    userOrders.orders,
                                    rewardPoints
                            ))
                    .join();
        }
    }

    static final class UserClient {
        User fetchUser(String userId) {
            sleep(700);
            return new User(userId, "Sandeep");
        }
    }

    static final class OrderClient {
        List<Order> fetchRecentOrders(String userId) {
            sleep(900);
            return Arrays.asList(
                    new Order("ORD-101", 1250),
                    new Order("ORD-102", 899)
            );
        }
    }

    static final class RewardsClient {
        Integer fetchRewardPoints(String userId) {
            sleep(600);
            return 480;
        }
    }

    static final class UserOrders {
        final User user;
        final List<Order> orders;

        UserOrders(User user, List<Order> orders) {
            this.user = user;
            this.orders = orders;
        }
    }

    static final class User {
        final String id;
        final String name;

        User(String id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{id='" + id + "', name='" + name + "'}";
        }
    }

    static final class Order {
        final String id;
        final int amount;

        Order(String id, int amount) {
            this.id = id;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{id='" + id + "', amount=" + amount + "}";
        }
    }

    static final class DashboardResponse {
        final User user;
        final List<Order> recentOrders;
        final int rewardPoints;

        DashboardResponse(User user, List<Order> recentOrders, int rewardPoints) {
            this.user = user;
            this.recentOrders = recentOrders;
            this.rewardPoints = rewardPoints;
        }

        @Override
        public String toString() {
            return "DashboardResponse{" +
                    "user=" + user +
                    ", recentOrders=" + recentOrders +
                    ", rewardPoints=" + rewardPoints +
                    '}';
        }
    }

    static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

What this demonstrates:

  • fan-out to multiple downstream dependencies
  • fan-in into a single response
  • much lower total latency than sequential service calls
  • a clear boundary where blocking happens only once with join()

If you run this example, total time should be close to the slowest call, not the sum of 700 + 900 + 600.


Runnable Example 2: Product Enrichment with Timeout and Fallback

This example models a more realistic backend problem:

  • fetch base products
  • enrich each product with inventory and pricing
  • tolerate one slow dependency with timeout and fallback
  • collect all results before returning

This is the kind of flow where CompletableFuture becomes much more valuable than a simple thread-per-step design.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ProductEnrichmentDemo {

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(12);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        ProductCatalogService service = new ProductCatalogService(ioExecutor, scheduler);

        long start = System.currentTimeMillis();
        List<ProductView> productViews = service.fetchCatalogPage();
        long elapsed = System.currentTimeMillis() - start;

        for (ProductView productView : productViews) {
            System.out.println(productView);
        }
        System.out.println("Completed in " + elapsed + " ms");

        ioExecutor.shutdown();
        scheduler.shutdown();
    }

    static final class ProductCatalogService {
        private final ExecutorService ioExecutor;
        private final ScheduledExecutorService scheduler;
        private final ProductClient productClient = new ProductClient();
        private final InventoryClient inventoryClient = new InventoryClient();
        private final PricingClient pricingClient = new PricingClient();

        ProductCatalogService(ExecutorService ioExecutor, ScheduledExecutorService scheduler) {
            this.ioExecutor = ioExecutor;
            this.scheduler = scheduler;
        }

        List<ProductView> fetchCatalogPage() {
            List<Product> products = productClient.fetchProducts();

            List<CompletableFuture<ProductView>> futures = products.stream()
                    .map(this::enrichProduct)
                    .collect(Collectors.toList());

            CompletableFuture<Void> allDone = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0])
            );

            return allDone.thenApply(ignored -> futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()))
                    .join();
        }

        CompletableFuture<ProductView> enrichProduct(Product product) {
            CompletableFuture<Integer> inventoryFuture = CompletableFuture.supplyAsync(
                    () -> inventoryClient.fetchInventory(product.id), ioExecutor);

            CompletableFuture<Price> priceFuture = withTimeout(
                    CompletableFuture.supplyAsync(
                            () -> pricingClient.fetchPrice(product.id), ioExecutor),
                    1000,
                    scheduler
            ).exceptionally(ex -> {
                System.out.println("Pricing fallback for " + product.id + ": " + rootCause(ex).getMessage());
                return new Price("INR", -1);
            });

            return inventoryFuture.thenCombine(priceFuture,
                    (inventory, price) -> new ProductView(product.id, product.name, inventory, price));
        }
    }

    static final class ProductClient {
        List<Product> fetchProducts() {
            sleep(300);
            return Arrays.asList(
                    new Product("P-100", "Mechanical Keyboard"),
                    new Product("P-200", "USB-C Dock"),
                    new Product("P-300", "27-inch Monitor")
            );
        }
    }

    static final class InventoryClient {
        Integer fetchInventory(String productId) {
            sleep(500);
            return 12;
        }
    }

    static final class PricingClient {
        Price fetchPrice(String productId) {
            if ("P-200".equals(productId)) {
                sleep(1500);
            } else {
                sleep(700);
            }
            return new Price("INR", 4999);
        }
    }

    static final class Product {
        final String id;
        final String name;

        Product(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static final class Price {
        final String currency;
        final int amount;

        Price(String currency, int amount) {
            this.currency = currency;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return amount < 0 ? "PRICE_UNAVAILABLE" : currency + " " + amount;
        }
    }

    static final class ProductView {
        final String id;
        final String name;
        final int inventory;
        final Price price;

        ProductView(String id, String name, int inventory, Price price) {
            this.id = id;
            this.name = name;
            this.inventory = inventory;
            this.price = price;
        }

        @Override
        public String toString() {
            return "ProductView{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", inventory=" + inventory +
                    ", price=" + price +
                    '}';
        }
    }

    static <T> CompletableFuture<T> withTimeout(
            CompletableFuture<T> original,
            long timeoutMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> timeout = new CompletableFuture<T>();
        scheduler.schedule(
                new Runnable() {
                    @Override
                    public void run() {
                        timeout.completeExceptionally(new TimeoutException("Timed out after " + timeoutMs + " ms"));
                    }
                },
                timeoutMs,
                TimeUnit.MILLISECONDS
        );
        return original.applyToEither(timeout, Function.identity());
    }

    static Throwable rootCause(Throwable ex) {
        Throwable current = ex;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

What this demonstrates:

  • allOf for batch orchestration
  • per-item async enrichment
  • explicit Java 8 timeout handling
  • fallback behavior for partial failure instead of failing the whole page

This is much closer to a production catalog, search, or marketplace endpoint than a toy hello future example.


allOf for Fan-out/Fan-in

List<CompletableFuture<Item>> futures = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> service.fetchItem(id), ioExecutor))
        .collect(Collectors.toList());

CompletableFuture<Void> all = CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[0])
);

List<Item> items = all.thenApply(v -> futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList())
).join();

Use this pattern for batch enrichment endpoints.


Timeout Pattern in Java 8

Java 8 does not have orTimeout/completeOnTimeout convenience APIs. Use scheduler-assisted timeout completion:

static <T> CompletableFuture<T> withTimeout(
        CompletableFuture<T> original,
        long timeoutMs,
        ScheduledExecutorService scheduler) {
    CompletableFuture<T> timeout = new CompletableFuture<>();
    scheduler.schedule(
            () -> timeout.completeExceptionally(new TimeoutException("timeout")),
            timeoutMs, TimeUnit.MILLISECONDS
    );
    return original.applyToEither(timeout, Function.identity());
}

This keeps timeout behavior explicit and reusable.


Exception Boundary Pattern

Prefer handling exceptions near API boundary:

return responseF.handle((value, ex) -> {
    if (ex != null) return fallbackResponse(userId);
    return value;
}).join();

Avoid sprinkling ad-hoc exceptionally blocks at many intermediate steps unless each step has distinct recovery semantics.


Cancellation Guidance

CompletableFuture#cancel(true) marks the future cancelled, but underlying task interruption depends on task/executor behavior.

Practical advice:

  • design tasks to check interruption where possible
  • keep remote call timeouts at HTTP/database client level too
  • avoid assuming cancellation always stops external I/O immediately

Thread Pool Design Rules

  • do not rely on ForkJoinPool.commonPool() for backend IO calls
  • use explicit executors for IO-heavy workloads
  • separate CPU-heavy and IO-heavy executors
  • size pools based on dependency latency and throughput targets
ExecutorService ioExecutor = Executors.newFixedThreadPool(32);

Common Mistakes

  • calling join() too early and accidentally serializing work
  • blocking network/database calls inside default common pool
  • mixing side effects throughout chain without clear boundaries
  • not handling exception propagation consistently

Best Practices Checklist

  • start independent calls immediately
  • combine futures only after all are started
  • pass dedicated executor for remote IO
  • keep async chain readable; extract methods
  • define timeout and fallback policy (covered in next post)

Related Posts

Comments