Commit ba3bab87 by 刘鑫

feat(kafka): 消费网点工程师新增(全量, 增量)

1 parent a7f9ec8e
......@@ -109,11 +109,6 @@ public class EngineerParamDTO {
private String status;
/**
* 地址
*/
private String address;
/**
* 摩托车、汽车的车牌号,便于计算限行
*/
private String vehicleNo;
......
package com.dituhui.pea.order.kafka;
import com.dituhui.pea.order.common.jackson.JsonUtil;
import com.dituhui.pea.order.dao.BeanMqMessageDao;
import com.dituhui.pea.order.dao.EngineerBusinessDao;
import com.dituhui.pea.order.dao.EngineerInfoDao;
import com.dituhui.pea.order.dto.param.EngineerParamDTO;
import com.dituhui.pea.order.dto.param.Location;
import com.dituhui.pea.order.entity.BeanMqMessageEntity;
import com.dituhui.pea.order.entity.EngineerBusinessEntity;
import com.dituhui.pea.order.entity.EngineerInfoEntity;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 消费BEAN kafka消息
*/
@Component
@Slf4j
@AllArgsConstructor
public class BeanKafkaConsumer {
private final BeanMqMessageDao beanMqMessageDao;
private final EngineerBusinessDao engineerBusinessDao;
private final EngineerInfoDao engineerInfoDao;
/**
* 消费监听 组织架构同步事件通知
......@@ -31,13 +54,12 @@ public class BeanKafkaConsumer {
/**
* 消费监听 网点/小组内的工程师信息同步事件通知
* 消费监听 zu'zh的工程师信息同步事件通知
*/
@KafkaListener(topics = "BEAN_PEA_USER_CHANGE", containerFactory = "kafkaListenerContainerFactory")
public void onMessageUser(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Object value = record.value();
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), value);
try {
//TODO 消费
......@@ -49,19 +71,128 @@ public class BeanKafkaConsumer {
/**
* 消费监听 网点/小组内的工程师信息同步事件通知
* 将网点/小组内的工程师信息,推送给PEA系统;包括2种模式:全量/增量;
* 全量数据或者增量数据 ---均为新增工程师记录
*/
@KafkaListener(topics = "BEAN_PEA_ENGINEER_CHANGE", containerFactory = "kafkaListenerContainerFactory")
public void onMessageEngineer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Object value = record.value();
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), value);
String jsonValue = JsonUtil.toJson(value);
log.info("收到消息:topic名称: {},分区:{}, ,值:{}", record.topic(), record.partition(), jsonValue);
Optional<EngineerParamDTO.Request> nodeEngineerInfo = JsonUtil.parse(jsonValue, EngineerParamDTO.Request.class);
//转换失败不提交消费确认
if (nodeEngineerInfo.isEmpty()) {
log.warn("消费失败,无法转换为需要的数据格式, 消息:topic名称: {},分区:{}, ,值:{} ", record.topic(), record.partition(), jsonValue);
return;
}
try {
//TODO 消费
EngineerParamDTO.Request request = nodeEngineerInfo.get();
//网点ID
final String targetGroupId = request.getGroupId();
//fixme 是否需要校验网点id是否存在存疑
//存在则获取工程师信息 最后新增
} finally {
//手动提交
List<EngineerParamDTO.Engineer> engineers = request.getEgineers();
//组装工程师信息
List<EngineerInfoEntity> needSaveEngineer = engineers.stream()
.map(t -> getEngineerInfoEntity(targetGroupId, t)).collect(Collectors.toList());
engineerInfoDao.saveAll(needSaveEngineer);
engineerInfoDao.flush();
List<EngineerBusinessEntity> engineerBusiness = engineers.stream()
.map(t -> getEngineerBusiness(t.getEngineerCode(), t.getLocation(), String.valueOf(t.getVehicle()), t.getVehicleNo()))
.collect(Collectors.toList());
engineerBusinessDao.saveAll(engineerBusiness);
//消费成功保存原始数据信息 并手动提交消费确认
ack.acknowledge();
} catch (Exception exception) {
log.warn("消费失败,, 消息:topic名称: {},分区:{}, ,值:{}\n 失败原因:{} ", record.topic(), record.partition(),
jsonValue, exception.getMessage(), exception);
}
}
private EngineerBusinessEntity getEngineerBusiness(String engineerCode, Location address, String vehicle, String vehicleNo) {
EngineerBusinessEntity engineerBusiness = engineerBusinessDao.getByEngineerCode(engineerCode);
if (Objects.isNull(engineerBusiness)) {
engineerBusiness = new EngineerBusinessEntity();
engineerBusiness.setEngineerCode(engineerCode);
//以下为数据库默认值
engineerBusiness.setWorkOn("08:00");
engineerBusiness.setWorkOff("18:00");
engineerBusiness.setMaxMinute(600);
engineerBusiness.setMaxNum(0);
engineerBusiness.setMaxDistance(0);
engineerBusiness.setPriority(1);
engineerBusiness.setDeparture(3);
engineerBusiness.setDispatchStrategy("CENTER");
engineerBusiness.setCreateTime(LocalDateTime.now());
}
//fixme 工作地址存贮
if (Objects.nonNull(address)) {
engineerBusiness.setAddress(Objects.isNull(address.getAddress()) ? "" : address.getAddress());
engineerBusiness.setX(Objects.isNull(address.getLongitude()) ? "" : String.valueOf(address.getLongitude()));
engineerBusiness.setY(Objects.isNull(address.getLatitude()) ? "" : String.valueOf(address.getLatitude()));
} else {
engineerBusiness.setAddress("");
engineerBusiness.setX("");
engineerBusiness.setY("");
}
engineerBusiness.setVehicle(vehicle);
engineerBusiness.setVehicleNo(vehicleNo);
engineerBusiness.setUpdateTime(LocalDateTime.now());
return engineerBusiness;
}
private EngineerInfoEntity getEngineerInfoEntity(String departId, EngineerParamDTO.Engineer sourceEngineer) {
final String engineerCode = sourceEngineer.getEngineerCode();
//查询是否有当前工程师工号
EngineerInfoEntity engineerInfo = engineerInfoDao.getByEngineerCode(engineerCode);
//获取有用信息保存工程师信息
if (Objects.isNull(engineerInfo)) {
engineerInfo = new EngineerInfoEntity();
}
engineerInfo.setEngineerCode(engineerCode);
engineerInfo.setName(sourceEngineer.getName());
engineerInfo.setGroupId(departId);
engineerInfo.setCosmosId("");
engineerInfo.setGender(sourceEngineer.getGender());
final Date birth = sourceEngineer.getBirth();
engineerInfo.setBirth(Objects.isNull(birth) ? "" : String.valueOf(birth.toInstant().atZone(ZoneId.systemDefault()).getYear()));
engineerInfo.setPhone(sourceEngineer.getPhone());
engineerInfo.setKind(sourceEngineer.getKind());
engineerInfo.setGrade(sourceEngineer.getGrade());
List<String> credentials = sourceEngineer.getCredentials();
engineerInfo.setCredentials(CollectionUtils.isNotEmpty(credentials) ? String.join(",", credentials) : "");
//默认汽车
engineerInfo.setVehicle(0 == sourceEngineer.getVehicle() ? 1 : sourceEngineer.getVehicle());
engineerInfo.setBeanStatus(Integer.valueOf(sourceEngineer.getStatus()));
final Location location = sourceEngineer.getLocation();
//fixme 工作地址存贮
if (Objects.nonNull(location)) {
engineerInfo.setAddress(location.getAddress());
engineerInfo.setWorkAddress(location.getAddress());
} else {
engineerInfo.setWorkAddress("");
engineerInfo.setAddress("");
}
engineerInfo.setCreateTime(LocalDateTime.now());
engineerInfo.setUpdateTime(LocalDateTime.now());
return engineerInfo;
}
private BeanMqMessageEntity buildBeanMqMsg(String msg) {
BeanMqMessageEntity mqMessage = new BeanMqMessageEntity();
mqMessage.setMsg(msg);
mqMessage.setCreateTime(LocalDateTime.now());
return mqMessage;
}
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!