Low Latency Parallel Processing Trading System

A simple use case of trade processing and booking

In this tutorial, we will develop two microservices to demonstrate a microservices-based, low-latency trading application. The two microservices are:

  • FXRate API Service - makes FX rates available over HTTP so that the trade processing service can calculate the notional amount. Rather than calling the FX rate API from the trade processing service for every trade, we will cache rates locally and prefetch them in bulk.

  • Trade Processing Service - a REST API based service that receives trades in bulk and processes them in chunks.
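The idea of bulk prefetch plus local caching can be sketched in plain Java before any Spring code is involved. This is only an illustration: the class name `PrefetchingRateCache` and the `bulkFetcher` function are hypothetical stand-ins for the HTTP call, not part of the project.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical local cache that is filled by one bulk call instead of one call per trade. */
class PrefetchingRateCache {
    private final Map<String, BigDecimal> local = new ConcurrentHashMap<>();
    private final Function<List<String>, Map<String, BigDecimal>> bulkFetcher;

    PrefetchingRateCache(Function<List<String>, Map<String, BigDecimal>> bulkFetcher) {
        this.bulkFetcher = bulkFetcher;
    }

    /** One remote round trip loads every requested pair into local memory. */
    void prefetch(List<String> pairs) {
        local.putAll(bulkFetcher.apply(pairs));
    }

    /** Hot-path lookup: served from memory, returns null for a pair that was never prefetched. */
    BigDecimal rateFor(String pair) {
        return local.get(pair);
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();
        // Stand-in for the HTTP call to the FX rate service.
        PrefetchingRateCache cache = new PrefetchingRateCache(pairs -> {
            remoteCalls.incrementAndGet();
            return Map.of("EURUSD", new BigDecimal("1.0875"), "GBPUSD", new BigDecimal("1.2650"));
        });
        cache.prefetch(List.of("EURUSD", "GBPUSD"));
        for (int i = 0; i < 1_000; i++) {
            cache.rateFor("EURUSD"); // 1,000 lookups, still a single remote call
        }
        System.out.println("remote calls: " + remoteCalls.get()); // remote calls: 1
    }
}
```

The trade-off is freshness: a lookup can return a rate that is as old as the last prefetch, which is why the real services pair this with a TTL and a refresh interval.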

Let's dive into the FXRate service implementation in detail:

Setup FxRateAPI Service project

Visit Spring Initializr to set up a Spring Boot project with the required dependencies. Refer to the screenshot below to create the project with the same versions.

Click Generate, extract the project, and open it in IntelliJ. Once the project is open, add the dependencies below to pom.xml:

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-cache</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<dependency>
			<groupId>com.github.ben-manes.caffeine</groupId>
			<artifactId>caffeine</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>

Modify the application.yaml configuration file:

server:
  port: 9091

spring:
  application:
    name: fx-rate-service
  threads:
    virtual:
      enabled: true
  cache:
    type: caffeine
    caffeine:
      spec: maximumSize=1000,expireAfterWrite=30s

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,caches,info
  endpoint:
    health:
      show-details: always

FxRateAPIService exposes its APIs on port 9091. We have also enabled the Caffeine cache, which holds at most 1000 entries and expires each entry 30 seconds after it was written.
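To make the expire-after-write behaviour concrete, here is a minimal plain-Java sketch. It is a hypothetical stand-in, not Caffeine itself: the class `ExpireAfterWriteCache` and its injectable clock are assumptions made so that expiry can be shown without sleeping for 30 seconds.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a cache with expire-after-write semantics (not Caffeine). */
class ExpireAfterWriteCache<K, V> {
    private record Entry<T>(T value, long writtenAtMillis) {}

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so the demo does not need to sleep

    ExpireAfterWriteCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    /** Returns the value, or null once ttlMillis have passed since the entry was written. */
    V get(K key) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() - entry.writtenAtMillis() >= ttlMillis) {
            entries.remove(key, entry); // reads never extend the lifetime of an entry
            return null;
        }
        return entry.value();
    }

    public static void main(String[] args) {
        long[] now = {0};
        ExpireAfterWriteCache<String, String> cache = new ExpireAfterWriteCache<>(30_000, () -> now[0]);
        cache.put("USDHKD", "7.8215");
        now[0] = 29_000;
        System.out.println(cache.get("USDHKD")); // 7.8215 (still within 30 s of the write)
        now[0] = 30_000;
        System.out.println(cache.get("USDHKD")); // null (expired)
    }
}
```

Note that the read at 29 seconds does not reset the timer; only a new write does.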

Application setup - model, service, controller

Create the packages and classes in the structure below:

Model - FxRateResponse

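Create the record FxRateResponse under the model package. A record automatically provides a constructor, accessor methods (named after the components, for example rate() rather than getRate()), toString, equals and hashCode, so there is no need to write them by hand as we would in a regular model class.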

package com.example.fxrate_api_service.model;

import java.math.BigDecimal;
import java.time.Instant;

public record FxRateResponse(

        String currencyPair,
        BigDecimal rate,
        Instant timestamp,
        String source
        
) {
}
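The generated members can be checked with a tiny standalone program. The two-field record `Quote` below is a hypothetical, cut-down version of the response record, used only to keep the output short.

```java
import java.math.BigDecimal;

class RecordDemo {
    // Hypothetical two-field record, kept small for the demo.
    record Quote(String currencyPair, BigDecimal rate) {}

    public static void main(String[] args) {
        Quote a = new Quote("EURUSD", new BigDecimal("1.0875"));
        Quote b = new Quote("EURUSD", new BigDecimal("1.0875"));

        System.out.println(a.currencyPair());             // EURUSD (accessor has no "get" prefix)
        System.out.println(a.equals(b));                  // true (value-based equality)
        System.out.println(a.hashCode() == b.hashCode()); // true
        System.out.println(a);                            // Quote[currencyPair=EURUSD, rate=1.0875]

        // Record equality delegates to BigDecimal.equals, which also compares scale.
        Quote c = new Quote("EURUSD", new BigDecimal("1.08750"));
        System.out.println(a.equals(c));                  // false
    }
}
```

The last line is worth remembering when records holding BigDecimal values are compared in tests: 1.0875 and 1.08750 are numerically equal but not equal according to equals.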

Service - FxRateService

FxRateService is designed to support two APIs.

  • Provide the rate for a single currency pair - caching is enabled for this service-layer method. When we send a request for, say, USDINR, the getSingleFxRate method is executed. When a second request for the same pair arrives within 30 seconds, the cached result is returned instead of executing the method again.

  • Provide rates for multiple currency pairs - the same caching concept is used here as well, with the whole list of requested pairs acting as the cache key.
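The "execute on a miss, return the stored value on a hit" behaviour of both methods can be mimicked without Spring. The sketch below is not how Spring implements @Cacheable internally; `KeyedMemoizer` and its `loads` counter are hypothetical names used to show when the underlying method body would run, and what it means to use a List as the cache key.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical memoizer: runs the loader on a miss, returns the stored value on a hit. */
class KeyedMemoizer<K, V> {
    private final Map<K, V> store = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    final AtomicInteger loads = new AtomicInteger(); // how often the real method body ran

    KeyedMemoizer(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        return store.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    public static void main(String[] args) {
        // Single pair: the placeholder loader returns BigDecimal.ONE for any pair.
        KeyedMemoizer<String, BigDecimal> single = new KeyedMemoizer<>(pair -> BigDecimal.ONE);
        single.get("USDINR");
        single.get("USDINR");
        System.out.println(single.loads.get()); // 1 (the second call was a cache hit)

        // Bulk: the whole list is the key, and List.equals is order-sensitive.
        KeyedMemoizer<List<String>, Integer> bulk = new KeyedMemoizer<>(List::size);
        bulk.get(List.of("EURUSD", "GBPUSD"));
        bulk.get(List.of("EURUSD", "GBPUSD")); // equal list: hit
        bulk.get(List.of("GBPUSD", "EURUSD")); // same pairs, different order: miss
        System.out.println(bulk.loads.get());   // 2
    }
}
```

In practice this means a bulk caller benefits from the cache only if it always sends the pairs in the same order.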

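Note - caching only works if @EnableCaching is present on a Spring-managed class. It is conventionally placed on a @Configuration class or on the main application class; in this example it is placed on the service class itself.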

package com.example.fxrate_api_service.service;

import com.example.fxrate_api_service.model.FxRateResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service
@EnableCaching
public class FxRateService {
    public final Map<String, BigDecimal> rateSource = new ConcurrentHashMap<>(Map.of(
            "EURUSD", new BigDecimal("1.0875"),
            "GBPUSD", new BigDecimal("1.2650"),
            "USDJPY", new BigDecimal("149.85"),
            "USDCHF", new BigDecimal("0.9012"),
            "AUDUSD", new BigDecimal("0.6530"),
            "USDCAD", new BigDecimal("1.3620"),
            "NZDUSD", new BigDecimal("0.6015"),
            "EURGBP", new BigDecimal("0.8596"),
            "USDSGD", new BigDecimal("1.3450"),
            "USDHKD", new BigDecimal("7.8215")
    ));
    // The key is normalised to upper case so that "usdinr" and "USDINR" share one cache entry.
    @Cacheable(value = "FxRates", key = "#currencyPair.toUpperCase()")
    public FxRateResponse getSingleFxRate(String currencyPair) {
        log.info("fetching rate for {} from source", currencyPair);
        String pair = currencyPair.toUpperCase();
        BigDecimal rate = rateSource.computeIfAbsent(pair, this::fetchFromProvider);
        return new FxRateResponse(pair, rate, Instant.now(), "RateStore");
    }

    // The whole list is the cache key, so the same pairs in a different order are cached separately.
    @Cacheable(value = "FxRatesBulk", key = "#currencyPairs")
    public Map<String, BigDecimal> getBulkRates(List<String> currencyPairs){
        log.info("bulk fetching rates for {} pairs", currencyPairs.size());
        Map<String,BigDecimal> result = new HashMap<>();
        for (String ccypair : currencyPairs){
            String pair = ccypair.toUpperCase();
            result.put(pair, rateSource.computeIfAbsent(pair, this::fetchFromProvider));
        }
        return result;
    }

    // Placeholder for a real rate provider: unknown pairs get a default rate of 1.
    private BigDecimal fetchFromProvider(String currencyPair) {
        log.warn("Unknown currency pair {} using default rate", currencyPair);
        return BigDecimal.ONE;
    }
}

Controller - FxRateController

We define two controller methods: one handles a request for a single currency pair and the other handles a bulk request for several pairs.

package com.example.fxrate_api_service.controller;

import com.example.fxrate_api_service.model.FxRateResponse;
import com.example.fxrate_api_service.service.FxRateService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.math.BigDecimal;
import java.util.Map;

@Slf4j
@RestController
@RequestMapping("/fx/api")
@RequiredArgsConstructor
public class FxRateController {

    private final FxRateService fxRateService;

    @GetMapping("/rate/{currencyPair}")
    public ResponseEntity<FxRateResponse> getRateForSingleCcypair(@PathVariable String currencyPair) {
        FxRateResponse response = fxRateService.getSingleFxRate(currencyPair);
        return ResponseEntity.ok(response);
    }
    @PostMapping("/rates/bulk")
    public ResponseEntity<Map<String, BigDecimal>> loadAllRates(@RequestBody List<String> ccypairs){
        return ResponseEntity.ok(fxRateService.getBulkRates(ccypairs));
    }
}

Single Currency Pair API Test

Start the service in IntelliJ and test the APIs using Postman.

Launch Postman, select the GET method and enter the URL http://localhost:9091/fx/api/rate/USDHKD

Cache testing

When you execute the request for the first time, the application logs the info message "fetching rate for USDHKD from source" on the console before the API returns its response. If you repeat the request within 30 seconds, the message is not logged again, which shows that the response was served from the Caffeine cache.

Bulk Currency Pairs Test

Select the POST method in Postman and enter the URL http://localhost:9091/fx/api/rates/bulk. Click Body -> JSON, add the list of currency pairs, and click Send.

["EURUSD","GBPUSD","USDJPY","USDCHF","AUDUSD","USDCAD","NZDUSD"]
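If you prefer to script the bulk call instead of using Postman, the JDK's built-in HttpClient is enough. The following is a minimal sketch: the class name `BulkRateRequest` and the hand-rolled JSON helper are assumptions made for illustration, and the helper assumes currency pair codes never need JSON escaping.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

class BulkRateRequest {
    /** Renders the pairs as a JSON array; assumes pair codes need no JSON escaping. */
    static String toJsonArray(List<String> pairs) {
        return pairs.stream()
                .map(pair -> "\"" + pair + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    /** Builds the POST request for the bulk endpoint without sending it. */
    static HttpRequest build(String baseUrl, List<String> pairs) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/fx/api/rates/bulk"))
                .header("Content-Type", "application/json")
                .timeout(Duration.ofSeconds(5))
                .POST(HttpRequest.BodyPublishers.ofString(toJsonArray(pairs)))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = build("http://localhost:9091", List.of("EURUSD", "GBPUSD", "USDJPY"));
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Raised, for example, when the FX rate service is not running.
            System.out.println("FX rate service not reachable: " + e.getMessage());
        }
    }
}
```

Running it twice within 30 seconds with the same list should log the bulk fetch message only once on the service side, for the same reason as in the single-pair test.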

Fork and run the code provided on GitHub for FXRateAPIService: project code

Trade Processing Service

This service is responsible for trade processing. It is also a REST API based service; it receives trades in bulk and processes them in chunks. Generate another project from Spring Initializr with the same versions and dependencies.
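Processing a bulk request in chunks can be sketched with the JDK alone. This is an illustration under assumptions, not the service's actual implementation: `ChunkedProcessor`, the chunk size, and the fixed thread pool are hypothetical choices. On Java 21, a virtual-thread executor could be passed in instead of the fixed pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class ChunkedProcessor {
    /** Splits items into consecutive chunks of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += chunkSize) {
            chunks.add(items.subList(i, Math.min(i + chunkSize, items.size())));
        }
        return chunks;
    }

    /** Submits each chunk as one task and collects the results in the original order. */
    static <T, R> List<R> processInChunks(List<T> items, int chunkSize,
                                          Function<T, R> worker, ExecutorService executor)
            throws InterruptedException, ExecutionException {
        List<Future<List<R>>> futures = new ArrayList<>();
        for (List<T> chunk : partition(items, chunkSize)) {
            Callable<List<R>> task = () -> chunk.stream().map(worker).toList();
            futures.add(executor.submit(task));
        }
        List<R> results = new ArrayList<>();
        for (Future<List<R>> future : futures) {
            results.addAll(future.get()); // waits for the chunk to finish
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> tradeIds = List.of("T1", "T2", "T3", "T4", "T5");
            // Hypothetical "processing" step: tag each trade id.
            List<String> processed = processInChunks(tradeIds, 2, id -> id + ":done", executor);
            System.out.println(processed); // [T1:done, T2:done, T3:done, T4:done, T5:done]
        } finally {
            executor.shutdown();
        }
    }
}
```

Chunks run in parallel, but because the futures are read back in submission order, the output order matches the input order.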

Create an application.properties file under src/main/resources with the properties below:

# Server
server.port=9092

# Spring
spring.application.name=trade-processing-service
spring.threads.virtual.enabled=true

# Datasource - not used in current project
spring.datasource.url=jdbc:postgresql://localhost:5432/trades
spring.datasource.username=postgres
spring.datasource.password=root
spring.datasource.hikari.maximum-pool-size=50

# JPA  - not used
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.jdbc.batch_size=50

# FX Service
fx.service.url=http://localhost:9091
fx.cache.refresh-interval-ms=30000
fx.cache.ttl-seconds=60

# Currency pairs - this is the key fix
fx.cache.currency-pairs[0]=EURUSD
fx.cache.currency-pairs[1]=GBPUSD
fx.cache.currency-pairs[2]=USDJPY
fx.cache.currency-pairs[3]=USDCHF
fx.cache.currency-pairs[4]=AUDUSD
fx.cache.currency-pairs[5]=USDCAD
fx.cache.currency-pairs[6]=NZDUSD
fx.cache.currency-pairs[7]=EURGBP
fx.cache.currency-pairs[8]=USDSGD
fx.cache.currency-pairs[9]=USDHKD

# Resilience4j
resilience4j.circuitbreaker.instances.fxRateService.sliding-window-size=20
resilience4j.circuitbreaker.instances.fxRateService.failure-rate-threshold=50
resilience4j.circuitbreaker.instances.fxRateService.wait-duration-in-open-state=10s
resilience4j.circuitbreaker.instances.fxRateService.permitted-number-of-calls-in-half-open-state=3
resilience4j.circuitbreaker.instances.fxRateService.slow-call-duration-threshold=3s
resilience4j.circuitbreaker.instances.fxRateService.slow-call-rate-threshold=60
resilience4j.circuitbreaker.instances.fxRateService.register-health-indicator=true

resilience4j.bulkhead.instances.fxRateService.max-concurrent-calls=25
resilience4j.bulkhead.instances.fxRateService.max-wait-duration=100ms

resilience4j.timelimiter.instances.fxRateService.timeout-duration=3s
resilience4j.timelimiter.instances.fxRateService.cancel-running-future=true

# Actuator
management.endpoints.web.exposure.include=health,metrics,prometheus,circuitbreakers,caches
management.endpoint.health.show-details=always

Model - Trade

Represents an incoming trade to the trade processing service.

package com.example.trade_processing_service.model;

import java.math.BigDecimal;
import java.time.Instant;

public record Trade(
        String tradeId,
        String currencyPair,
        BigDecimal notionalAmount,
        TradeType tradeType,
        Instant receivedAt
) {
    public enum TradeType { BUY, SELL }
}

Model - ProcessedTrade

Represents a trade after the FX rate has been applied and processing is complete.

package com.example.trade_processing_service.model;

import java.math.BigDecimal;
import java.time.Instant;

public record ProcessedTrade(
        String tradeId,
        String currencyPair,
        BigDecimal notionalAmount,
        BigDecimal fxRate,
        BigDecimal convertedAmount,
        Trade.TradeType tradeType,
        RateSource rateSource,
        Instant processedAt
) {
    public enum RateSource {
        LIVE,           // fetched from FX service in real time
        CACHE,          // served from Caffeine local cache
        STALE_CACHE,    // circuit open — served stale value
        FALLBACK        // hardcoded fallback — flag for reprocessing
    }

    public boolean requiresReprocessing() {
        return rateSource == RateSource.FALLBACK;
    }
}
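The convertedAmount field is the result of applying the rate to the notional. The article does not spell out the formula at this point, so the sketch below assumes the simplest one, notional multiplied by rate, rounded to two decimals with banker's rounding. The class name `NotionalConverter`, the scale and the rounding mode are all assumptions for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class NotionalConverter {
    /** Assumed formula: converted = notional * rate, rounded to 2 decimals (HALF_EVEN). */
    static BigDecimal convert(BigDecimal notional, BigDecimal fxRate) {
        return notional.multiply(fxRate).setScale(2, RoundingMode.HALF_EVEN);
    }

    public static void main(String[] args) {
        BigDecimal converted = convert(new BigDecimal("1000000"), new BigDecimal("1.0875"));
        System.out.println(converted); // 1087500.00
    }
}
```

BigDecimal is used instead of double so that amounts are not affected by binary floating-point rounding. A real system would likely choose the scale per currency.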

Client - FxRateClient

We will use a Feign client to fetch FX rates from FxRateAPIService over HTTP. The service URL is defined in the application.properties file (fx.service.url), and FxFeignConfig supplies the custom timeouts and retry logic.

Create the FxRateClient interface under the client package as below:

package com.example.trade_processing_service.client;

import com.example.trade_processing_service.config.FxFeignConfig;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
import java.math.BigDecimal;
import java.util.Map;

@FeignClient(
        name = "fx-rate-service",
        url = "${fx.service.url}",
        configuration = FxFeignConfig.class
)
public interface FxRateClient {

    record FxRateResponse(String currencyPair, BigDecimal rate, String timestamp, String source) {}

    @GetMapping("/fx/api/rate/{currencyPair}")
    FxRateResponse getRate(@PathVariable("currencyPair") String currencyPair);

    @PostMapping("/fx/api/rates/bulk")
    Map<String, BigDecimal> getBulkRates(@RequestBody List<String> currencyPairs);
}

Config - FxFeignConfig

We need an FxFeignConfig class under the config package. It defines the Feign client configuration for the FX rate service: timeouts, retry behaviour and error handling.

package com.example.trade_processing_service.config;

import feign.Request;
import feign.Retryer;
import feign.codec.ErrorDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class FxFeignConfig {

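    /*
      #1 - connect timeout - 2 sec (fail fast if the FX service is unreachable)
      #2 - read timeout - 5 sec (leave enough time for bulk fetches)
      #3 - follow redirects - enabled
     */
    @Bean
    public Request.Options fxOptions(){
        return new Request.Options(2000,5000,true);
    }

    /*
        Retry - initial interval 100 ms, max interval 500 ms,
        at most 2 attempts in total (the first call plus one retry)
     */
    @Bean
    public Retryer fxRetryer(){
        return new Retryer.Default(100,500,2);
    }

    /*
       Custom error decoder - logs FX service errors with context,
       then delegates to Feign's default decoder
     */
    @Bean
    public ErrorDecoder fxErrorDecoder(){
        ErrorDecoder defaultDecoder = new ErrorDecoder.Default();
        return (methodKey,response) -> {
            log.error("FX service call {} failed with HTTP status {}", methodKey, response.status());
            return defaultDecoder.decode(methodKey,response);
        };
    }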
}
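The three arguments of Retryer.Default are a starting period, a maximum period and a maximum number of attempts. To get a feel for how a starting period of 100 ms and a cap of 500 ms shape the waiting time, here is a sketch of capped exponential backoff. The growth factor of 1.5 and the class `BackoffSchedule` are assumptions for illustration; the sketch is not copied from Feign's source and may differ from its exact schedule.

```java
class BackoffSchedule {
    /** Delay before retry number n (1-based): period * 1.5^(n-1), capped at maxPeriod. */
    static long delayMillis(long periodMillis, long maxPeriodMillis, int retryNumber) {
        long interval = (long) (periodMillis * Math.pow(1.5, retryNumber - 1));
        return Math.min(interval, maxPeriodMillis);
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 5; retry++) {
            System.out.println("retry " + retry + ": " + delayMillis(100, 500, retry) + " ms");
        }
        // retry 1: 100 ms, retry 2: 150 ms, retry 3: 225 ms, retry 4: 337 ms, retry 5: 500 ms
    }
}
```

With only two attempts configured, the cap is never reached; it starts to matter only if the maximum number of attempts is raised.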

Caffeine Cache Configuration - CacheConfig

The FX rate cache sits on the hot path: every trade reads a rate from it, so lookups must be served from local memory rather than through a network call.

package com.example.trade_processing_service.config;

import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
public class CacheConfig {

    @Value("${fx.cache.ttl-seconds:60}")
    private long ttlseconds;

    @Bean
    public CacheManager cacheManager(){
        CaffeineCacheManager manager = new CaffeineCacheManager("fxRates");
        manager.setCaffeine(
                Caffeine.newBuilder()
                        .expireAfterWrite(Duration.ofSeconds(ttlseconds))
                        .maximumSize(1000)
                        .recordStats() //enables cache hit/miss metrics via actuator
        );
        return manager;
    }
}

FxCacheProperties

package com.example.trade_processing_service.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@ConfigurationProperties(prefix = "fx.cache")
@Component
@Getter
@Setter
public class FxCacheProperties {

    private List<String> currencyPairs = new ArrayList<>();
    private long ttlseconds = 60;
    private long refreshIntervalMs = 30000;

}

RawCacheConfig

package com.example.trade_processing_service.config;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.math.BigDecimal;
import java.time.Duration;

@Configuration
public class RawCacheConfig {

    @Value("${fx.cache.ttl-seconds:60}")
    private long ttlSeconds;

    @Bean
    public Cache<String, BigDecimal> rawFxCache() {
        return Caffeine.newBuilder()
                .expireAfterWrite(Duration.ofSeconds(ttlSeconds))
                .maximumSize(1000)
                .recordStats()
                .build();
    }
}

FxRateCacheService