Commit b763b989 by 刘鑫

feat(kafka): 消费网点工程师新增(全量, 增量)

1 parent d6197e58
...@@ -4,11 +4,13 @@ import com.dituhui.pea.order.common.jackson.JsonUtil; ...@@ -4,11 +4,13 @@ import com.dituhui.pea.order.common.jackson.JsonUtil;
import com.dituhui.pea.order.dao.BeanMqMessageDao; import com.dituhui.pea.order.dao.BeanMqMessageDao;
import com.dituhui.pea.order.dao.EngineerBusinessDao; import com.dituhui.pea.order.dao.EngineerBusinessDao;
import com.dituhui.pea.order.dao.EngineerInfoDao; import com.dituhui.pea.order.dao.EngineerInfoDao;
import com.dituhui.pea.order.dto.OrganizationTreeDTO;
import com.dituhui.pea.order.dto.param.EngineerParamDTO; import com.dituhui.pea.order.dto.param.EngineerParamDTO;
import com.dituhui.pea.order.dto.param.Location; import com.dituhui.pea.order.dto.param.Location;
import com.dituhui.pea.order.entity.BeanMqMessageEntity; import com.dituhui.pea.order.entity.BeanMqMessageEntity;
import com.dituhui.pea.order.entity.EngineerBusinessEntity; import com.dituhui.pea.order.entity.EngineerBusinessEntity;
import com.dituhui.pea.order.entity.EngineerInfoEntity; import com.dituhui.pea.order.entity.EngineerInfoEntity;
import com.dituhui.pea.order.service.impl.BeanRemoteServiceImpl;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
...@@ -35,16 +37,26 @@ public class BeanKafkaConsumer { ...@@ -35,16 +37,26 @@ public class BeanKafkaConsumer {
private final BeanMqMessageDao beanMqMessageDao; private final BeanMqMessageDao beanMqMessageDao;
private final EngineerBusinessDao engineerBusinessDao; private final EngineerBusinessDao engineerBusinessDao;
private final EngineerInfoDao engineerInfoDao; private final EngineerInfoDao engineerInfoDao;
private final BeanRemoteServiceImpl beanRemoteService;
/** /**
* 消费监听 组织架构同步事件通知 * 消费监听 组织架构同步事件通知- 只变动当级数据
*/ */
@KafkaListener(topics = "BEAN_PEA_TAG_CHANGE", containerFactory = "kafkaListenerContainerFactory") @KafkaListener(topics = "BEAN_PEA_TAG_CHANGE", containerFactory = "kafkaListenerContainerFactory")
public void onMessageTAG(ConsumerRecord<?, ?> record, Acknowledgment ack) { public void onMessageTAG(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Object value = record.value(); Object value = record.value();
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), value); String jsonValue = JsonUtil.toJson(value);
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), jsonValue);
Optional<OrganizationTreeDTO.Request> branchInfo = JsonUtil.parse(jsonValue, OrganizationTreeDTO.Request.class);
//转换失败不提交消费确认
if (branchInfo.isEmpty()) {
log.warn("消费失败,无法转换为需要的数据格式, 消息:topic名称: {},分区:{}, ,值:{} ", record.topic(), record.partition(), jsonValue);
return;
}
try { try {
//TODO 消费 OrganizationTreeDTO.Request request = branchInfo.get();
} finally { } finally {
//手动提交 //手动提交
...@@ -54,18 +66,27 @@ public class BeanKafkaConsumer { ...@@ -54,18 +66,27 @@ public class BeanKafkaConsumer {
/** /**
* 消费监听 zu'zh的工程师信息同步事件通知 * 消费监听 组织的工程师信息同步事件通知 -只变动当级数据
*/ */
@KafkaListener(topics = "BEAN_PEA_USER_CHANGE", containerFactory = "kafkaListenerContainerFactory") @KafkaListener(topics = "BEAN_PEA_USER_CHANGE", containerFactory = "kafkaListenerContainerFactory")
public void onMessageUser(ConsumerRecord<?, ?> record, Acknowledgment ack) { public void onMessageUser(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Object value = record.value(); Object value = record.value();
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), value); String jsonValue = JsonUtil.toJson(value);
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), jsonValue);
Optional<OrganizationTreeDTO.Request> branchInfo = JsonUtil.parse(jsonValue, OrganizationTreeDTO.Request.class);
//转换失败不提交消费确认
if (branchInfo.isEmpty()) {
log.warn("消费失败,无法转换为需要的数据格式, 消息:topic名称: {},分区:{}, ,值:{} ", record.topic(), record.partition(), jsonValue);
return;
}
try { try {
//TODO 消费 OrganizationTreeDTO.Request request = branchInfo.get();
String levelValue = request.getLevelValue();
} finally { beanRemoteService.listDepartUserAndSaveUserDetailInfo(levelValue);
//手动提交
ack.acknowledge(); ack.acknowledge();
} catch (Exception exception) {
log.warn("消费失败,, 消息:topic名称: {},分区:{}, ,值:{}\n 失败原因:{} ", record.topic(), record.partition(),
jsonValue, exception.getMessage(), exception);
} }
} }
...@@ -104,7 +125,7 @@ public class BeanKafkaConsumer { ...@@ -104,7 +125,7 @@ public class BeanKafkaConsumer {
.collect(Collectors.toList()); .collect(Collectors.toList());
engineerBusinessDao.saveAll(engineerBusiness); engineerBusinessDao.saveAll(engineerBusiness);
//消费成功保存原始数据信息 并手动提交消费确认 //消费成功 手动提交消费确认
ack.acknowledge(); ack.acknowledge();
} catch (Exception exception) { } catch (Exception exception) {
log.warn("消费失败,, 消息:topic名称: {},分区:{}, ,值:{}\n 失败原因:{} ", record.topic(), record.partition(), log.warn("消费失败,, 消息:topic名称: {},分区:{}, ,值:{}\n 失败原因:{} ", record.topic(), record.partition(),
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!