본문 바로가기

IT Book Summary/스프링 마이크로 서비스 코딩공작소

Chapter 08 스프링 클라우드 스트림을 사용한 이벤트 기반 아키텍처

이번에는 비동기 메시지를 사용해 다른 마이크로서비스와 통신하는 스프링기반 마이크로서비스를 설계하고 구현한다.

새로운 점은 메시지를 사용해 상태변화를 표현하는 이벤트로 통신한다는 개념.

-> 이벤트 기반 아키텍처 EDA, 메시지 기반 아키텍처 MDA

 


8.1 메시지와 EDA, 마이크로서비스의 사례

두가지 서비스 라이선싱 서비스와 조직서비스중

운영환경에서 조직 정보를 조회하는 라이선싱 서비스 호출이 오래걸린다.

데이터베이스 액세스 하지않고 조직 데이터 읽기를 캐싱할 수 있으면 응답시간을 줄일 수 있음.

 

캐싱솔루션 구현 핵심 요구사항

  1. 캐싱된 데이터는 라이선싱 서비스의 모든 인스턴스에 일관성이 있어야 함
    - 어떤 인스턴스에 접근하더라도 동일한 조직데이터 읽기가 보장되어야 한다
  2. 라이선싱 서비스를 호스팅하는 컨테이너 메모리에 조직데이터를 캐싱하면 안됨.
    - 로컬 캐시는 클러드터 내 다른 모든 서비스와 동기화를 보장해야 한다.
  3. 업데이트나 삭제연산으로 조직레코드가 변경될 때 라이선싱 서비스는 조직서비스의 상태 변화를 인식해야 함.
    - 라이선싱 서비스가 캐싱된 조직데이터를 무효화하고 삭제가능하게 해야한다.

구현하는 두가지 방법

  1. 동기식 요청-응답 모델을 사용해 요구사항을 구현하는것. 조직 상태가 변경되면 REST 엔드포인트를 이용해 통신
  2. 조직서비스가 자신이 변경되었음을 알리는 비동기 이벤트를 발송하는 것. 메시지를 메시지 큐에 발행.

동기식 모델 방식보다 비동기 이벤트 방식이 더 적절할 것이다. 

 

 

1 - 동기식 요청 - 응답 방식으로 상태 변화 전달

 

조직데이터 캐시를 위해 분산 키-값 저장소 데이터베이스인 레디스를 사용.

 

전통적인 동기식-요청 응답 모델. 강하게 결합된 서비스

 

서비스 간 강한 결합

조직서비스와 라이선싱 서비스가 강하게 결합되어있다.

조직레코드가 변경될 때마다 직접 통신하면서 조직서비스에서 라이선싱 서비스로 결합이 발생.

그렇다고해서 조직서비스가 직접 레디스를 통신하는것은 다른서비스가 소유한 데이터에 직접 접근하는것이 되므로 금기사항이다.

 

쉽게 깨지는 서비스 관계

두 서비스간 강한결합은 하나의 서비스가 다운되면 서로 영향을 받게 되므로 문제.

 

조직 서비스 변경에 관심 있는 새 소비자를 추가할 때 경직성

의존성으로 인해 라이선싱 서비스 변경시 조직서비스 코드도 변경해야 하므로 유연성이 떨어짐.

 

2 - 메시징을 사용해 서비스 간 상태 변화 전달

 

메시지 큐를 이용한 비동기 이벤트 방식

조직서비스가 관리하는 조직데이터의 상태가 변할 때 조직 서비스가 메시지를 발행하는데 사용됨.

 

느슨한 결합

서비스가 소유한 데이터를 직접 관리하는 엔드포인트만 노출해서 의존성을 최소화.

메시징 방식에서는 상태변화를 전달하는 과정에서 두 서비스가 서로 알지 못한다.

 

내구성

메시지 큐로 인해 메시지 전달을 보장받고, 라이선싱 서비스가 가용하지 않아도 조직서비스가 메시지를 계속 발행함.

조직서비스가 다운되어도 데이터 일부가 캐시에 유지되므로 서비스가 크게 영향을 받지 않는다.

 

확장성

메시지 큐에 저장되므로 메시지 발신자는 응답을 기다릴 필요가 없다.

큐에서 메지시를 읽어오는 소비자를 더 많이 가동시켜 더 신속하게 큐의 메시지를 처리하는것도 가능하다.

 

유연성

메시지 발신자는 누가 메시지를 소비할지 모른다.

기존 발신서비스와 별개로 새로운 메시지 소비자를 쉽게 추가 가능하다.

 

3 - 메시지 아키텍처의 단점

 

메시징 기반 아키텍처는 복잡해 질 수 있으므로 주의를 기울여야 한다.

 

메시지 처리의 의미론

애플리케이션이 메시지 소비순서를 기반으로 동작하는 방식과 순서대로 처리되지 않을때 발생하는것을 이해해야 한다.

예외가 발생해서 순서대로 처리되지 않는다면 어떻게 할지 고려해야한다.

 

메시지 가시성

웹서비스 호출과 메시징을 경유하는 사용자 트랜잭션을 추적하는데 상관관계 ID를 사용하는것도 
발생하는 일을 파악하고 디버깅하는데 중요하다.

 

메시지 코레오그래피

메시징 기반은 단순 요청-응답에서 벗어나기 때문에 비지니스 로직을 추론하는것이 어려워질수 있다.

그래서 메시징 기반 애플리케이션 디버깅과 로그를 살펴보는일이 중요하다.


8.2 스프링 클라우드 스트림 소개

스프링 클라우드를 사용해 메시징을 쉽게 통합 할 수 있다.

스프링 클라우드 스트림 프로젝트는 애플리케이션에 메시지 발행자와 소비자를 쉽게 구축할 수 있는 애너테이션 기반 프레임워크이다.

이것을 사용해 메시징 플랫폼 (카프카, RabbitMQ 등)의 구현 세부사항을 추상화 한다.

 

1 - 스프링 클라우드 스트림 아키텍처

 

스프링 클라우드에서 메시지를 발생하고 소비하는데 4개의 컴포넌트가 관련되어 있다.

 

메시지가 발행되고 소비될때 하부메시징 플랫폼을 추상화하는 스프링 클라우드 스트림 컴포넌트들을 통과한다.

 

소스 SOURCE

소스를 사용해 메시지를 발행.

발행될 메시지를 표현하는 POJO를 전달받는 스프링 애너테이션 인터페이스.

소스는 메시지를 받아 직력화 하고 메시지를 채널로 발행.

 

채널 CHANNEL

메시지 생산자와 소비자가 메시지를 발행하거나 소비한후 메시지를 보관할 큐를 추상화 한것.

채널 이름은 항상 대상 큐의 이름과 관련하므로 채널이 읽거나 쓰는 큐를 전환하려면 코드가 아닌 구성정보를 변경.

 

바인더 BINDER

스프링 클라우드 스트림의 일부인 스프링 코드로 특정 메시지 플랫폼과 통신.

바인더를 사용해 메시지를 발생하고 소비하기위해 별도의 라이브러리를 제공하지 않아도 메시징을 사용할 수 있다.

 

싱크 SINK

서비스는 싱크를 사용해 큐에서 메시지를 받음.

들어오는 메시지를위해 채널을 수신 대기 하고, 메시지를 다시 POJO로 역직렬화 한다.


8.3 간단한 메시지 생산자와 소비자 작성

처음예제는 조직서비스가 라이선싱 서비스로 메시지를 전달한다.

라이선싱 서비스는 메시지를 받아 로그를 출력. 

소스와 싱크만 사용해 구현.

 

조직테이터가 변경되면 카프카에 메시지 발행

 

1 - 조직 서비스의 메시지 생산자 작성

 

의존성 추가

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

 

애플리케이션이 스프링 클라우드 스트림의 메시지 브로커와 바인딩하도록 지정.

@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class) //애플리케이션을 메시지 브로커로 바인딩하라고 알림.
public class Application {
    @Bean
    public Filter userContextFilter() {
        UserContextFilter userContextFilter = new UserContextFilter();
        return userContextFilter;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

채널은 메시지 큐 위에 있음.

스프링 클라우드 스트림은 메시지 브로커와 통신할 수 있는 기본 채널이 있다.

 

메시지를 발행하는 코드 

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){ // 소스 인터페이스 구현 주입
        this.source = source;
    }

    public void publishOrgChange(String action,String orgId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, orgId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                orgId,
                UserContext.getCorrelationId()); // 발행될 메시지

        source.output().send(MessageBuilder.withPayload(change).build());
        //Source클래스에서 정의된 채널에서 send()메서드 사용.
    }
}

 

OrganizationChangeModel은 다음 세가지 데이터 요서를 담음.

  • 액션
    이벤트를 발생시킨 액션. 메시지 소비자가 이벤트를 처리하는데 더 많은 컨텍스트를 제공.
  • 조직 ID
    이벤트와 연관된 조직 ID
  • 상관관계 ID
    이벤트를 발생시킨 서비스 호출에 대한 상관관계 ID. 이벤트 추적에 필요.

조직서비스를 바인딩하는 방법은 구성설정으로 이루어진다.

Source가 카프카 메시지 브로커와 메시지 토픽에 매핑되는 구성을 application.yml에서 보여준다.

spring:
  cloud:
    stream: --구성의 시작점.
      bindings:
        output: -- 채널이름. Source.output()채널에 매핑
            destination:  orgChangeTopic --메시지를 넣을 메시지 큐 이름.
            content-type: application/json -- 송수신할 메시지 타입의 정보를 제공
      kafka: --해당서비스가 메시지 버스로 카프카를 사용한다고 스프링에 전달
        binder:
          zkNodes: localhost -- 카프카와 주키퍼의 네트워크 위치를 전달
          brokers: localhost

 

조직서비스에서 실제로 메시지 발행

@Service
public class OrganizationService {
    @Autowired
    private OrganizationRepository orgRepository;

    @Autowired 
    SimpleSourceBean simpleSourceBean; //소스빈 주입

    public Organization getOrg(String organizationId) {
        return orgRepository.findById(organizationId);
    }

    public void saveOrg(Organization org){
        org.setId( UUID.randomUUID().toString());

        orgRepository.save(org);
        simpleSourceBean.publishOrgChange("SAVE", org.getId());
        //조직 데이터 변경하는 메서드 모두 simpleSourceBean.publishOrgChange() 호출
    }

    public void updateOrg(Organization org){
        orgRepository.save(org);
        simpleSourceBean.publishOrgChange("UPDATE", org.getId());

    }

    public void deleteOrg(String  orgId){
        orgRepository.delete( orgId );
        simpleSourceBean.publishOrgChange("DELETE", orgId);
    }
}

 

2 - 라이선싱 서비스에서 메시지 소비자 작성

 

스프링 클라우드 스트림을 사용한 서비스가 메시지를 소비하는 방식

 

메시지가 카프카의 orgChangeTopic으로 들어오면 라이선싱 서비스가 응답

라이선싱 서비스에 의존성 추가

<!--Spring Cloud Stream Dependencies-->
    <dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream</artifactId>
   </dependency>

   <dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-stream-kafka</artifactId>
   </dependency>

 

메시지 브로커와 바인딩하도록 라이선싱 서비스 설정

@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Sink.class) //Sink 인터페이스에 정의된 채널을 사용하도록
public class Application {

...

    @StreamListener(Sink.INPUT) //메시지가 입력채널에서 수신될 때마다 이 메서드 실행
    public void loggerSink(OrganizationChangeModel orgChange) {
        logger.debug("Received an event for organization id {}", 
                      orgChange.getOrganizationId());
    }

}

 

메시지 브로커의 토픽을 input 채널에 대한 실제 매핑하는 설정 작업. application.yml

spring:
  cloud:
    stream:
      bindings:
        input: --input채널을 orgChangeTopic큐에 매핑
          destination: orgChangeTopic
          content-type: application/json
          group: licensingGroup --group 프로퍼티는 한번만 처리하는 의미를 보장하는데 사용.
        binder:
          zkNodes: localhost
          brokers: localhost

소비되는 메시지에 대해 한번만 소비한다는의미.

 

소비자 그룹은 서비스 인스턴스 그룹이 메시지를 한번만 처리할 것을 보장

 

3 - 실제 메시지 서비스 보기

 

레코드에 변경이 있을때마다 메시지를 orgChangeTopic 에 발행하는 조직서비스와 그 토픽의 메시지를 받는 라이선싱 서비스.

PUT 메소드로 업데이트하고 콘솔창에 로그메시지가 출력된 것을 볼 수 있다.

 


8.4 스프링 클라우드 스트림 사용 사례: 분산 캐싱

분산캐싱 예제를 만들어보자.

조직데이터가 캐시에 있다면 캐시데이터를 반환.

캐시에 없다면 조직서비스를 호출하고 호출결과를 레디스 해시에 캐싱.

 

1 - 레디스로 조회 캐싱

 

스프링 데이터 레디스 의존성 추가해 라이선싱 서비스 구성

 

<!--Spring Data Redis dependencies-->
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-redis</artifactId>
      <version>1.7.4.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.0</version>
    </dependency>

 

레디스 서버에 데이터베이스 커넥션 설정

 

@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Sink.class)
public class Application {

    @Autowired
    private ServiceConfig serviceConfig;

    ...

    @Bean //레디스 서버에 실제 데이터베이스 커넥션을 설정
    public JedisConnectionFactory jedisConnectionFactory() {
        JedisConnectionFactory jedisConnFactory = new JedisConnectionFactory();
        jedisConnFactory.setHostName( serviceConfig.getRedisServer());
        jedisConnFactory.setPort( serviceConfig.getRedisPort() );
        return jedisConnFactory;
    }

    @Bean // 레디스서버에 작업을 수행하는데 사용할 RedisTemplate 객체 생성
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(jedisConnectionFactory());
        return template;
    }
}

 

스프링 데이터 레디스 레포지토리 정의

 

레디스는 키-값을 저장하는 데이터저장소로, 마치 대규모로 분산된 메모리 상주형 해시맵처럼 동작.

데이터를 저장하고 키로 조회

 

라이선싱서비스는 레디스 레포지토리를 위한 두 파일 정의

첫번째 레디스에 액세스해야 하는 라이선싱 서비스 클래스에 주입될 자바 인터페이스.

public interface OrganizationRedisRepository {
    void saveOrganization(Organization org);
    void updateOrganization(Organization org);
    void deleteOrganization(String organizationId);
    Organization findOrganization(String organizationId);
}

 

두번째 인터페이스의 구현파일

RedisTemplate 빈을 사용해 레디스 서버와 통신하고 작업을 수행

@Repository // 스프링데이터에 사용되는 리포지토리
public class OrganizationRedisRepositoryImpl implements OrganizationRedisRepository {
    //조직데이터가 저장되는 레디스 서버의 해시 이름.
    private static final String HASH_NAME ="organization";
   

    private RedisTemplate<String, Organization> redisTemplate;
    
    //레디스 서버에 데이터 작업을 수행하는 스프링 헬러 메서드의 집합
    private HashOperations hashOperations;

    public OrganizationRedisRepositoryImpl(){
        super();
    }

    @Autowired
    private OrganizationRedisRepositoryImpl(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @PostConstruct
    private void init() {
        hashOperations = redisTemplate.opsForHash();
    }


    @Override
    public void saveOrganization(Organization org) {
        hashOperations.put(HASH_NAME, org.getId(), org);
        //레디스와 모든 통신은 키를 사용해 저장되는 Organization객체 단위로 이루어짐.
    }

    @Override
    public void updateOrganization(Organization org) {
        hashOperations.put(HASH_NAME, org.getId(), org);
    }

    @Override
    public void deleteOrganization(String organizationId) {
        hashOperations.delete(HASH_NAME, organizationId);
    }

    @Override
    public Organization findOrganization(String organizationId) {
       return (Organization) hashOperations.get(HASH_NAME, organizationId);
    }
}

 

레디스의 모든 데이터를 키를 사용해 저장하고 조회.

조직 서비스에서 조회한 데이터를 레디스에 저장하므로 조직 ID는 자연스럽게 조직레코드를 저장하는 키로 사용.

 

레디스와 라이선싱 서비스로 조직 데이터 저장하고 읽기

 

라이선싱 서비스가 조직데이터가 필요할때 레디스 캐시를 먼저 확인.

@Component
public class OrganizationRestTemplateClient {
    @Autowired
    RestTemplate restTemplate;

    @Autowired
    OrganizationRedisRepository orgRedisRepo;

    private static final Logger logger = LoggerFactory.getLogger(OrganizationRestTemplateClient.class);

    private Organization checkRedisCache(String organizationId) {
    //조직ID로 레디스에 저장된 Organization 클래스 조회 시도.
        try {
            return orgRedisRepo.findOrganization(organizationId);
        }
        catch (Exception ex){
            logger.error("Error encountered while trying to retrieve organization {} check Redis Cache.  Exception {}", organizationId, ex);
            return null;
        }
    }

    private void cacheOrganizationObject(Organization org) {
        try {
            orgRedisRepo.saveOrganization(org);
        }catch (Exception ex){
            logger.error("Unable to cache organization {} in Redis. Exception {}", org.getId(), ex);
        }
    }

    public Organization getOrganization(String organizationId){
        logger.debug("In Licensing Service.getOrganization: {}", UserContext.getCorrelationId());

        Organization org = checkRedisCache(organizationId);

        if (org!=null){ //레디스에 데이터가 없으면 원본데이터 조회하기위해 조직서비스 호출
            logger.debug("I have successfully retrieved an organization {} from the redis cache: {}", organizationId, org);
            return org;
        }

        logger.debug("Unable to locate organization from the redis cache: {}.", organizationId);

        ResponseEntity<Organization> restExchange =
                restTemplate.exchange(
                        "http://zuulservice/api/organization/v1/organizations/{organizationId}",
                        HttpMethod.GET,
                        null, Organization.class, organizationId);

        /*Save the record from cache*/
        org = restExchange.getBody();

        if (org!=null) { //조회하나 객체를 캐시에 저장
            cacheOrganizationObject(org);
        }

        return org;
    }


}

연속된 2개의 GET 요청을 하면 두가지 출력.

첫 메시지에서 캐시에 없다는 메시지와

두번째 메시지에서 캐시에서 가져왔다는 메시지.

 

2 - 사용자 정의 채널 정의

 

애플리케이션에 둘 이상의 채널을 정의하거나 고유한 채널 이름을 사용하려고 하면, 

사용자 고유 인터페이스를 정의하고 필요한만큼 input과 output 채널을 노출.

 

사용자 정의 채널을 생성하려면 라이선싱 서비스에서 inboundOrgChanges를 호출

 

라이선싱 서비스를 위한 사용자 정의 input채널 정의

public interface CustomChannels {
    @Input("inboundOrgChanges") //채널이름을 정의하는 메서드 레벨 애너테이션
    SubscribableChannel orgs(); //모든 노출된 input 채널은 SubscribableChannel 반환.
}

output 의 경우 

@OutputChannel("outboundOrg")

MessageChennal outboundOrg();

 

라이선싱서비스에서 인풋 사용자 채널 사용하려면 두가지 작업 필요.

input 채널 이름을 카프카 토픽에 매핑하기 위해 라이선싱 서비스 수정.

spring:
  cloud:
    stream:
      bindings:
        inboundOrgChanges: --채널이름을 inboundOrgChanges로 변경
          destination: orgChangeTopic
          content-type: application/json
          group: licensingGroup

사용자 정의 input 채널을 사용하려면

정의한 CustomChannels 인터페이스를 사용해 메시지를 처리할 클래스에 이 인터페이스 주입

 

분산캐싱예제에서는 유입된 메시지를 처리하는 코드를 OrganizationChangeHandler로 옮김.

//Application.java 에서 어노테이션 옯겨옴.
//이때는 CustomChannels 클래스를 사용하고 매개변수로 전달.
@EnableBinding(CustomChannels.class)
public class OrganizationChangeHandler {

    //레디스와 통신하는 OrganizationRedisRepository 클래스
    @Autowired
    private OrganizationRedisRepository organizationRedisRepository;

    private static final Logger logger = LoggerFactory.getLogger(OrganizationChangeHandler.class);

    //Sink.INPUT 대신 애너테이션 사용해 채널이름 전달.
    @StreamListener("inboundOrgChanges")
    public void loggerSink(OrganizationChangeModel orgChange) {
    //메시지를 받으면 데이터에 수행된 액션 조사, 대응.
        logger.debug("Received a message of type " + orgChange.getType());
        switch(orgChange.getAction()){
            case "GET":
                logger.debug("Received a GET event from the organization service for organization id {}",
                orgChange.getOrganizationId()); // 조직데이터가 수정,삭제되면, 레디스에서 조직데이터 삭제
                break;
            case "SAVE":
                logger.debug("Received a SAVE event from the organization service for organization id {}", orgChange.getOrganizationId());
                break;
            case "UPDATE":
                logger.debug("Received a UPDATE event from the organization service for organization id {}", orgChange.getOrganizationId());
                organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
                break;
            case "DELETE":
                logger.debug("Received a DELETE event from the organization service for organization id {}", orgChange.getOrganizationId());
                organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
                break;
            default:
                logger.error("Received an UNKNOWN event from the organization service of type {}", orgChange.getType());
                break;

        }
    }

}

 

3 - 종합: 메시지를 받으면 캐시 삭제

 

조직서비스는 조직데이터가 변경될때마다 메시지를 발행하도록 설정됨.

 

 


  • 메시징을 사용한 비동기식 통신은 마이크로서비스 아키텍처의 중요한 부분
  • 애플리케이션에서 메시징을 사용하면 서비스를 확장하고 결함에 더 잘 견디게 만듬.
  • 스프링 클라우드 스트림은 간단한 애너테이션을 사용하고 하부 메시지 플랫폼별 세부 정보를 추상화해 메시지 발행과 소비를 단순화
  • 스프링 클라우드 스트림의 메시지 소스는 애너테이션된 자바 메서드로 메시지 브로커 큐에 메시지를 발행
  • 스프링 클라우드 스트림의 메시지 싱크는 애너테이션된 자바 메서드로 메시지 브로커에서 메시지를 수신.
  • 레디스는 데이터베이스와 캐시로 사용될 수 있는 키-값 저장소.