Commit 9e4da8a7 by 刘鑫

FEAT: kafka 相关

1 parent 883a65a7
spring:
kafka:
bootstrap-servers: b-3.rgccrkafkauat.v3smv9.c2.kafka.cn-northwest-1.amazonaws.com.cn:9096,b-1.rgccrkafkauat.v3smv9.c2.kafka.cn-northwest-1.amazonaws.com.cn:9096,b-2.rgccrkafkauat.v3smv9.c2.kafka.cn-northwest-1.amazonaws.com.cn:9096
consumer:
#默认消费者组
#group-id: test-consumer-group
#最早未被消费的offset
auto-offset-reset: latest
#是否自动提交
enable-auto-commit: false
properties:
security:
protocol: SASL_SSL
sasl:
mechanism: SCRAM-SHA-512
jaas:
config: org.apache.kafka.common.security.scram.ScramLoginModule required username="rgccr-rice-bean-uat" password="czJavntCRSeLc";
\ No newline at end of file
......@@ -125,6 +125,11 @@
<version>9.38.0.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -10,6 +10,10 @@ import org.springframework.context.MessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
import javax.validation.Validator;
......@@ -32,6 +36,25 @@ public class OrderConfig {
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 配置kafka手动提交offset
*
* @param consumerFactory 消费者factory
* @return 监听factory
*/
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//消费者并发启动个数,最好跟kafka分区数量一致,不能超过分区数量
//factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(1500);
//设置手动提交ackMode
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder() {
Jackson2ObjectMapperBuilder builder = new Jackson2ObjectMapperBuilder();
......
package com.dituhui.pea.order.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* 消费BEAN kafka消息
*/
@Component
@Slf4j
public class BeanKafkaConsumer {
/**
* 消费监听 组织架构同步事件通知
*/
@KafkaListener(topics = "BEAN_PEA_TAG_CHANGE", containerFactory = "kafkaListenerContainerFactory")
public void onMessageTAG(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Object value = record.value();
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), value);
try {
//TODO 消费
} finally {
//手动提交
ack.acknowledge();
}
}
/**
* 消费监听 网点/小组内的工程师信息同步事件通知
*/
@KafkaListener(topics = "BEAN_PEA_USER_CHANGE", containerFactory = "kafkaListenerContainerFactory")
public void onMessageUser(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Object value = record.value();
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), value);
try {
//TODO 消费
} finally {
//手动提交
ack.acknowledge();
}
}
/**
* 消费监听 网点/小组内的工程师信息同步事件通知
*/
@KafkaListener(topics = "BEAN_PEA_ENGINEER_CHANGE", containerFactory = "kafkaListenerContainerFactory")
public void onMessageEngineer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Object value = record.value();
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), value);
try {
//TODO 消费
} finally {
//手动提交
ack.acknowledge();
}
}
}
......@@ -20,6 +20,10 @@ spring:
import-check:
# no config file
enabled: false
config:
import:
- optional:nacos:redis-config.yaml?group=project&refreshEnabled=true
- nacos:bean-kafka-config.yaml?group=project&refreshEnabled=true
#config:
# import:
# - optional:nacos:project-order.yaml
......@@ -59,6 +63,9 @@ sentry:
SaaS:
url: https://pea-test.bshg.com.cn
ak: 64e1cde3f9144bfb850b7d37c51af559
bean:
server: https://bean-test.bshg.com.cn
app-key: eDZEhTCxAcM9paRfwPjAM7RGkfmbf15S_PEA
scheduler:
init-engineer-capacity:
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!