본문 바로가기

Java/공부공부

[JAVA/JPA] CompletableFuture 자바 비동기 처리 (멀티스레드)

 

 

db에서 가져온 정보(쿼리) + 외부api 서버 연결여부(비동기)

                serverInfo            +         connectionStatus

위의 두가지 정보를 한번에 리턴하기 위해  CompletableFuture 클래스로 비동기 작업 진행.

 


 

(서버 연결여부 확인 로직) _ http 요청 (GET)

    public String checkConnection(String port) {

        try {
            RestTemplate request = new RestTemplate();
            String url = "http://" + host + port;
            String response = request.getForObject(url, String.class);

            if(response.equals("result message")) {
                return "connect";
            }
        } catch (IOException e) {
            throw e;
        }
        return "disconnected";
    }

 

 

(서버 연결여부 확인 로직) _ Socket 연결

    public String checkConnection(String port) {

        try {
            Socket socket = new Socket(host, port);
            socket.close();
            return "connect";
        } catch (IOException e) {
            throw e;
        }
    }

 

연결이 되어있는 경우, "connect" 문자열 반환.  => 메인로직에서 serverInfo와 connectionStatus를 함께 return.

연결이 되어있지 않은 경우, 에러 반환. => 이때도 메인 로직에서 serverInfo는 return 해줘야함.

 


 

* RestTemplate ( HTTP GET 요청 )  =>  Socket 방식으로 변경

 

RestTemplate 은 스프링에서 제공하는 HTTP 클라이언트로 서버에 HTTP 요청을 보내고 응답을 받음.

 => 상위레벨(7계층) 프로토콜로 비교적 무거운 요청. 

=> 서버 연결 여부(4계층_TCP/IP)뿐만 아니라,

     서버가 제공하는 서비스에 대한 세부적인 정보(요청에 대한 응답)를 얻을 수 있음.

 

Socket 객체는 TCP/IP (4계층) 프로토콜을 이용해 서버와 직접 연결.

 => 단순히 포트 연결 여부만(서버 실행 여부) 빠르게 확인. (오버헤드가 적음) 

 


 

(메인로직)

public Map<String, Object> getServerInfo(Map<String, Object> data) throws ExecutionException, InterruptedException {

    String serverCode = String.valueOf(data.get("serverCode"));

    // 비동기 처리 (db정보조회 & 서버연결상태확인)
    CompletableFuture<Map<String, Object>> future = CompletableFuture.supplyAsync(() -> {

        Map<String, Object> serverInfo = serverMapper.getServerInfo(data);
        String first_info = String.valueOf(serverInfo.get("first_info"));
        String second_info = String.valueOf(serverInfo.get("second_info"));

        // db데이터만 리턴 (서버연결확인X)
        if (first_info.equals("")) {
            return serverInfo;

        // db데이터 + 서버연결여부 리턴
        } else {
            // db데이터 + disconnected 리턴 (서버연결확인X)
            if (second_info.equals("")) {
                serverInfo.put("connectionStatus", "disconnected");
                return serverInfo;
            }
            return CompletableFuture.supplyAsync(() -> checkConnection(serverCode))
                    .thenApply(status -> {
                        serverInfo.put("connectionStatus", status);
                        return serverInfo;
                    }).exceptionally(e -> {
                        serverInfo.put("connectionStatus", "disconnected");
                        return serverInfo;
                    }).join();
        }
    });

    // 최종 결과 반환
    Map<String, Object> result = future.get();
    log.info("Server Info Result: {}", result);

    return result;
}

 

db에서 조회해온 serverInfo에서 

first_info 가 없는 경우   =>  serverInfo 리턴 & 서버연결여부 확인X    ( second_info 가 없음)

second_info 가 없는 경우   =>  serverInfo 리턴 & 서버연결여부 확인X 

둘 다 있는 경우 =>  checkConnection 메소드 실행해서 서버 연결여부 확인. (비동기)

                        서버연결   =>  "connect" 리턴

                       서버연결X  =>  "disconnected" 리턴

                     비동기 결과를 serverInfo에 담아 최종적으로 serverInfo만 리턴

 

** thenCompose() 메소드를 사용할 수도 있지만, 

  db조회와 서버확인 두 작업 사이에 의존성이 강하지 않아서 중첩으로 처리. 

 


 

* CompletableFuture

자바의 비동기 프로그래밍을 지원하는 클래스.

 - 별도의 스레트 풀에서 작업이 비동기로 실행. 결과를 기다리지 않고 다음 작업을 이어나갈 수 있음. (논블로킹 방식)

 - 작업들을 순차적/병렬적으로 연결할 수 있음. _ thenApply(), thenCompose()

 - 작업을 수동으로 완료할 수 있음. _ complete() 

 

 

 runAsyns() 메소드 (결과 반환X)

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 비동기 작업 수행
    System.out.println("비동기 작업 실행 중...");
});

 

 

 supplyAsyns() 메소드

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    // 비동기 작업 수행 후 결과 반환
    return 42;
});

Integer result = future.get();  // 결과를 기다림 (블로킹)
System.out.println("Result: " + result);

=> 비동기 작업 결과 반환.

 

 

thenApply() 메소드

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 42;
}).thenApply(result -> result * 2);

Integer finalResult = future.join(); // 결과를 기다리지 않고 완료되기를 기다림 (블로킹)
System.out.println("Final result: " + finalResult);  // 결과는 84

=> 비동기 작업 결과로 다른 작업 실행. 

 

 

thenCombine() 메소드 

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    return 42;
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    return 10;
});

CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);

Integer combinedResult = combinedFuture.join();
System.out.println("Combined result: " + combinedResult);  // 결과는 52

=> 두 개의 비동기 작업을 병렬로 실행. 

 

 

thenCompose() 메소드 

// 첫 번째 비동기 작업: 사용자 ID를 비동기적으로 가져옴
CompletableFuture<String> userIdFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching user ID...");
    // 가상의 비동기 작업 (예: 데이터베이스에서 사용자 ID 조회)
    return "user123";
});

// 두 번째 비동기 작업: 가져온 사용자 ID로 사용자 세부 정보를 가져옴
CompletableFuture<String> userDetailFuture = userIdFuture.thenCompose(userId -> {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching user details for ID: " + userId);
        // 가상의 비동기 작업 (예: API 호출)
        return "User details for " + userId;
    });
});

// 최종 결과 출력
String result = userDetailFuture.get();  // 비동기 작업의 완료를 기다림
System.out.println("Result: " + result);

=> 의존적인 두개의 비동기 작업을 연속으로 실행. (두 작업이 강한 의존성을 가질 때)

 

 

exceptionally() 메소드

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("에러 발생!");
    }
    return 42;
}).exceptionally(ex -> {
    System.out.println("예외 처리: " + ex.getMessage());
    return 0;  // 예외 발생 시 기본값 반환
});

Integer result = future.join();
System.out.println("Result: " + result);  // 결과는 0

=> 비동기 작업 중 발생한 예외 처리.