IT/PROGRAMMING

[Spring] ConcurrentKafkaListenerContainerFactoryConfigurer를 사용하고 싶다.

하마연구소장 2019. 3. 14. 09:05
728x90
반응형

스프링 어플리케이션에서 카프카 토픽에서 값을 쉽게 가져오기 위하여 @KafkaListener 어노테이션을 사용한다.
여러개의 Kafka 서버에 접근해야할 필요가 있어서, 즉 @KafkaListener 어노테이션을 여러개 사용해야 해서, KafkaListenerContainerFactory를 수동으로 만들어야했다.
기왕 만드는김에 최대한 Spring에서 기본적으로 동작하는 방식으로 처리하려고 하니, ConcurrentKafkaListenerContainerFactoryConfigurer 이 녀석을 사용해야했다.

spring-boot 버전 1.5.19를 사용하고 있으며, 그에따라 spring-boot-autoconfigure도 버전 1.5.19이다.
아래는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 파일이다.

/*
 * Copyright 2012-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.boot.autoconfigure.kafka;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;

/**
 * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
 *
 * @author Gary Russell
 * @since 1.5.0
 */
public class ConcurrentKafkaListenerContainerFactoryConfigurer {

	private KafkaProperties properties;

	/**
	 * Set the {@link KafkaProperties} to use.
	 * @param properties the properties
	 */
	void setKafkaProperties(KafkaProperties properties) {
		this.properties = properties;
	}

	/**
	 * Configure the specified Kafka listener container factory. The factory can be
	 * further tuned and default settings can be overridden.
	 * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
	 * instance to configure
	 * @param consumerFactory the {@link ConsumerFactory} to use
	 */
	public void configure(
			ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
			ConsumerFactory<Object, Object> consumerFactory) {
		listenerContainerFactory.setConsumerFactory(consumerFactory);
		Listener container = this.properties.getListener();
		ContainerProperties containerProperties = listenerContainerFactory
				.getContainerProperties();
		if (container.getAckMode() != null) {
			containerProperties.setAckMode(container.getAckMode());
		}
		if (container.getAckCount() != null) {
			containerProperties.setAckCount(container.getAckCount());
		}
		if (container.getAckTime() != null) {
			containerProperties.setAckTime(container.getAckTime());
		}
		if (container.getPollTimeout() != null) {
			containerProperties.setPollTimeout(container.getPollTimeout());
		}
		if (container.getConcurrency() != null) {
			listenerContainerFactory.setConcurrency(container.getConcurrency());
		}
	}

}

setKafkaProperties() 메서드의 접근제어자(Access Modifier)가 default 이다.
하~~~~ 한숨이 나온다.
ConcurrentKafkaListenerContainerFactoryConfigurer 인스턴스로 생성하고 setKafkaProperties() 메서드 호출하고 configure() 메서드 호출해야하는데...

왜, public으로 안해놨을까?

setKafkaProperties()을 사용할 수 있는 방법은 reflection을 이용하여 강제로 호출하는 방법, 또는 나의 프로젝트안에 org.springframework.boot.autoconfigure.kafka 패키지를 만들어서 이 패키지 안에서 호출하는 방법, 또는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 내의 코드를 복사하는 방법이 있겠다.

단순하게 문제해결만을 위해서는 이 방법들을 사용하면 되겠지만, 뭔가 아름답지는 않다.

참고로 스프링부트 버전 2.X 를 확인해봐도 setKafkaProperties() 이 메서드의 접근제어자는 동일하게 default이다.

 

우선 프로젝트는 진행해야하니 reflection을 이용하여 강제로 setKafkaProperties() 메서드를 호출하였다.

// setKafkaProperties()가 access modifier가 public이 아니라서 reflection으로 강제 수행
Method setKafkaPropertiesMethod = ReflectionUtils.findMethod(configurer.getClass(), "setKafkaProperties", KafkaProperties.class);
ReflectionUtils.makeAccessible(setKafkaPropertiesMethod);
ReflectionUtils.invokeMethod(setKafkaPropertiesMethod, configurer, kafkaProperties);

 

뭔가 더 깔끔한 방법이 있을까요?

반응형