스프링 어플리케이션에서 카프카 토픽에서 값을 쉽게 가져오기 위하여 @KafkaListener 어노테이션을 사용한다.
여러개의 Kafka 서버에 접근해야할 필요가 있어서, 즉 @KafkaListener 어노테이션을 여러개 사용해야 해서, KafkaListenerContainerFactory를 수동으로 만들어야했다.
기왕 만드는김에 최대한 Spring에서 기본적으로 동작하는 방식으로 처리하려고 하니, ConcurrentKafkaListenerContainerFactoryConfigurer 이 녀석을 사용해야했다.
spring-boot 버전 1.5.19를 사용하고 있으며, 그에따라 spring-boot-autoconfigure도 버전 1.5.19이다.
아래는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 파일이다.
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
/**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
*
* @author Gary Russell
* @since 1.5.0
*/
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
*/
void setKafkaProperties(KafkaProperties properties) {
this.properties = properties;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
* @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
* instance to configure
* @param consumerFactory the {@link ConsumerFactory} to use
*/
public void configure(
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
listenerContainerFactory.setConsumerFactory(consumerFactory);
Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();
if (container.getAckMode() != null) {
containerProperties.setAckMode(container.getAckMode());
}
if (container.getAckCount() != null) {
containerProperties.setAckCount(container.getAckCount());
}
if (container.getAckTime() != null) {
containerProperties.setAckTime(container.getAckTime());
}
if (container.getPollTimeout() != null) {
containerProperties.setPollTimeout(container.getPollTimeout());
}
if (container.getConcurrency() != null) {
listenerContainerFactory.setConcurrency(container.getConcurrency());
}
}
}
setKafkaProperties() 메서드의 접근제어자(Access Modifier)가 default 이다.
하~~~~ 한숨이 나온다.
ConcurrentKafkaListenerContainerFactoryConfigurer 인스턴스로 생성하고 setKafkaProperties() 메서드 호출하고 configure() 메서드 호출해야하는데...
왜, public으로 안해놨을까?
setKafkaProperties()을 사용할 수 있는 방법은 reflection을 이용하여 강제로 호출하는 방법, 또는 나의 프로젝트안에 org.springframework.boot.autoconfigure.kafka 패키지를 만들어서 이 패키지 안에서 호출하는 방법, 또는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 내의 코드를 복사하는 방법이 있겠다.
단순하게 문제해결만을 위해서는 이 방법들을 사용하면 되겠지만, 뭔가 아름답지는 않다.
참고로 스프링부트 버전 2.X 를 확인해봐도 setKafkaProperties() 이 메서드의 접근제어자는 동일하게 default이다.
우선 프로젝트는 진행해야하니 reflection을 이용하여 강제로 setKafkaProperties() 메서드를 호출하였다.
// setKafkaProperties()가 access modifier가 public이 아니라서 reflection으로 강제 수행
Method setKafkaPropertiesMethod = ReflectionUtils.findMethod(configurer.getClass(), "setKafkaProperties", KafkaProperties.class);
ReflectionUtils.makeAccessible(setKafkaPropertiesMethod);
ReflectionUtils.invokeMethod(setKafkaPropertiesMethod, configurer, kafkaProperties);
뭔가 더 깔끔한 방법이 있을까요?
'IT > PROGRAMMING' 카테고리의 다른 글
YouTube(유튜브) 동영상의 썸네일 이미지 추출하는 방법 (0) | 2019.04.09 |
---|---|
[Java] Gson, Jackson(ObjectMapper)으로 JSON 문자열 출력할 때, pretty printing하는 방법 (0) | 2019.04.08 |
[Spring] Hibernate에서 "Could not build ClassFile" 발생하는 오류 해결 (0) | 2019.03.10 |
[Spring] spring-boot 1.5.X 부터 spring-kafka 사용할 수가 있네요. (0) | 2019.03.06 |
[Java] 일정시간이 지나면 값이 없어지는(expiring) 맵, 캐시로 사용가능한 ExpiringMap (0) | 2019.02.24 |