Commit 5e3c21f8 by 刘鑫

feat: kafka 组织机构变更处理

1 parent ca328e5f
...@@ -4,6 +4,7 @@ import com.dituhui.pea.order.common.jackson.JsonUtil; ...@@ -4,6 +4,7 @@ import com.dituhui.pea.order.common.jackson.JsonUtil;
import com.dituhui.pea.order.dao.BeanMqMessageDao; import com.dituhui.pea.order.dao.BeanMqMessageDao;
import com.dituhui.pea.order.dao.EngineerBusinessDao; import com.dituhui.pea.order.dao.EngineerBusinessDao;
import com.dituhui.pea.order.dao.EngineerInfoDao; import com.dituhui.pea.order.dao.EngineerInfoDao;
import com.dituhui.pea.order.dao.OrgGroupDao;
import com.dituhui.pea.order.dto.OrganizationTreeDTO; import com.dituhui.pea.order.dto.OrganizationTreeDTO;
import com.dituhui.pea.order.dto.param.EngineerParamDTO; import com.dituhui.pea.order.dto.param.EngineerParamDTO;
import com.dituhui.pea.order.dto.param.Location; import com.dituhui.pea.order.dto.param.Location;
...@@ -39,6 +40,7 @@ public class BeanKafkaConsumer { ...@@ -39,6 +40,7 @@ public class BeanKafkaConsumer {
private final EngineerBusinessDao engineerBusinessDao; private final EngineerBusinessDao engineerBusinessDao;
private final EngineerInfoDao engineerInfoDao; private final EngineerInfoDao engineerInfoDao;
private final BeanRemoteServiceImpl beanRemoteService; private final BeanRemoteServiceImpl beanRemoteService;
private final OrgGroupDao orgGroupDao;
/** /**
* 消费监听 组织架构同步事件通知- 只变动当级数据 * 消费监听 组织架构同步事件通知- 只变动当级数据
...@@ -56,11 +58,25 @@ public class BeanKafkaConsumer { ...@@ -56,11 +58,25 @@ public class BeanKafkaConsumer {
} }
try { try {
OrganizationTreeDTO.Request request = branchInfo.get(); OrganizationTreeDTO.Request request = branchInfo.get();
final String orgId = request.getLevelValue();
if (!"group".equalsIgnoreCase(request.getLevelType())) {
beanRemoteService.departmentDetail(beanRemoteService.getAccessToken(), orgId, null);
}
} finally {
if (beanRemoteService.isBeanSelf(orgId)) {
beanRemoteService.departmentDetail(beanRemoteService.getAccessToken(), orgId, null);
}
if (beanRemoteService.isWangDian(orgId)) {
beanRemoteService.serviceOrgDetail(beanRemoteService.getAccessToken(), orgId);
}
//手动提交 //手动提交
ack.acknowledge(); ack.acknowledge();
} catch (Exception exception) {
log.warn("消费失败,, 消息:topic名称: {},分区:{}, ,值:{}\n 失败原因:{} ", record.topic(), record.partition(),
jsonValue, exception.getMessage(), exception);
} }
} }
...@@ -92,7 +108,7 @@ public class BeanKafkaConsumer { ...@@ -92,7 +108,7 @@ public class BeanKafkaConsumer {
/** /**
* 将网点/小组内的工程师信息,推送给PEA系统;包括2种模式:全量/增量; * 将网点内的工程师信息,推送给PEA系统;包括2种模式:全量/增量;
* 全量数据或者增量数据 ---均为新增工程师记录 * 全量数据或者增量数据 ---均为新增工程师记录
*/ */
@KafkaListener(topics = "BEAN_PEA_ENGINEER_CHANGE", containerFactory = "kafkaListenerContainerFactory") @KafkaListener(topics = "BEAN_PEA_ENGINEER_CHANGE", containerFactory = "kafkaListenerContainerFactory")
......
...@@ -404,6 +404,22 @@ public class BeanRemoteServiceImpl { ...@@ -404,6 +404,22 @@ public class BeanRemoteServiceImpl {
/** /**
* 是否自有组织机构
*
* @param orgId 组织机构ID
* @return true or false
*/
public boolean isBeanSelf(String orgId) {
BeanR<Department> departmentBeanR = beanRemoteService.departmentDetail(getAccessToken(), orgId);
return departmentBeanR.getSuccess() && !ObjUtil.isNull(departmentBeanR.getData());
}
public boolean isWangDian(String serviceOrgId) {
BeanR<ServiceOrgDetail> beanR = beanRemoteService.serviceOrgDetail(getAccessToken(), serviceOrgId);
return beanR.getSuccess() && !ObjUtil.isNull(beanR.getData());
}
/**
* 处理单极机构,包含大区,分部,分站 * 处理单极机构,包含大区,分部,分站
* *
* @param token * @param token
...@@ -528,8 +544,12 @@ public class BeanRemoteServiceImpl { ...@@ -528,8 +544,12 @@ public class BeanRemoteServiceImpl {
return Result.success(); return Result.success();
} }
private Result serviceOrgDetail(String accessToken, ServiceOrg serviceOrg) { public Result serviceOrgDetail(String accessToken, ServiceOrg serviceOrg) {
BeanR<ServiceOrgDetail> beanR = beanRemoteService.serviceOrgDetail(accessToken, serviceOrg.getServiceOrgId()); return serviceOrgDetail(accessToken, serviceOrg.getServiceOrgId());
}
public Result serviceOrgDetail(String accessToken, String serviceOrgId) {
BeanR<ServiceOrgDetail> beanR = beanRemoteService.serviceOrgDetail(accessToken, serviceOrgId);
log.info("[查询网点/车队列表]【/api/openapi/department/queryServiceOrgList】返回值-------------------->{}", JsonUtil.toJson(beanR)); log.info("[查询网点/车队列表]【/api/openapi/department/queryServiceOrgList】返回值-------------------->{}", JsonUtil.toJson(beanR));
if (!beanR.getSuccess() || ObjUtil.isNull(beanR.getData())) { if (!beanR.getSuccess() || ObjUtil.isNull(beanR.getData())) {
return Result.failed(beanR.getMessage()); return Result.failed(beanR.getMessage());
...@@ -572,7 +592,7 @@ public class BeanRemoteServiceImpl { ...@@ -572,7 +592,7 @@ public class BeanRemoteServiceImpl {
* *
* @return * @return
*/ */
private String getAccessToken() { public String getAccessToken() {
if (redisService.hasKey(CACHE_ACCESS_TOKEN_KEY) && redisService.getExpireTime(CACHE_ACCESS_TOKEN_KEY) > 500) { if (redisService.hasKey(CACHE_ACCESS_TOKEN_KEY) && redisService.getExpireTime(CACHE_ACCESS_TOKEN_KEY) > 500) {
return redisService.get(CACHE_ACCESS_TOKEN_KEY); return redisService.get(CACHE_ACCESS_TOKEN_KEY);
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!