Commit 3f4774e9 by fudahua

同步优化初步3

parent a59e2e2b
package com.gic.haoban.manage.service.task;
import com.gic.haoban.manage.api.dto.DealParamMqDTO;
import com.gic.haoban.manage.service.entity.TabHaobanPreDealLog;
/**
* Created 2020/4/13.
*
* @author hua
*/
public abstract class BaseSyncOperation {
public interface BaseSyncOperation {
/**
* 处理单条数据
*
* @param dealParamMqDTO
*/
public abstract void dealSingleByMq(DealParamMqDTO dealParamMqDTO, TabHaobanPreDealLog dataPre);
/**
* 单条处理异常
*
* @param taskId
* @param dataId
* @param enterpriseId
* @param reason
*/
public abstract void dealException(String taskId, String dataId, String enterpriseId, String reason);
/**
* 单条处理成功
*
* @param taskId
* @param dataId
* @param wxEnterpriseId
*/
public abstract void dealSuccess(String taskId, String dataId, String enterpriseId, String wxEnterpriseId);
public void checkDepartmentTask(String taskId);
}
package com.gic.haoban.manage.service.task;
import com.alibaba.fastjson.JSONObject;
import com.gic.clerk.api.dto.ClerkListDTO;
import com.gic.clerk.api.service.ClerkService;
import com.gic.commons.util.GICMQClientUtil;
import com.gic.enterprise.api.dto.StoreDTO;
import com.gic.enterprise.api.service.StoreGroupService;
import com.gic.enterprise.api.service.StoreService;
import com.gic.haoban.base.api.common.Constant;
import com.gic.haoban.base.api.common.ServiceResponse;
import com.gic.haoban.manage.api.dto.DealParamMqDTO;
import com.gic.haoban.manage.api.dto.DepartmentDTO;
import com.gic.haoban.manage.api.enums.PreDealStatusEnum;
import com.gic.haoban.manage.api.enums.PreDealTypeEnum;
import com.gic.haoban.manage.api.enums.SyncTaskStatusEnum;
import com.gic.haoban.manage.api.service.DepartmentApiService;
import com.gic.haoban.manage.service.entity.TabHaobanDepartment;
import com.gic.haoban.manage.service.entity.TabHaobanPreDealLog;
import com.gic.haoban.manage.service.entity.TabHaobanSyncTask;
import com.gic.haoban.manage.service.service.DepartmentService;
import com.gic.haoban.manage.service.service.PreDealService;
import com.gic.haoban.manage.service.service.SyncTaskService;
import com.gic.mq.sdk.GicMQClient;
import com.gic.redis.data.util.RedisUtil;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Created 2020/4/13.
*
* @author hua
*/
@Service
public class ClerkSyncOperation implements BaseSyncOperation {
private static final Logger logger = LoggerFactory.getLogger(ClerkSyncOperation.class);
@Autowired
private DepartmentService departmentService;
@Autowired
private StoreGroupService storeGroupService;
@Autowired
private PreDealService preDealService;
@Autowired
private DepartmentApiService departmentApiService;
@Autowired
private SyncTaskService syncTaskService;
@Autowired
private StoreService storeService;
@Autowired
private ClerkService clerkService;
@Override
public void dealSingleByMq(DealParamMqDTO dealParamMqDTO, TabHaobanPreDealLog dataPre) {
logger.info("成员处理:{}", JSONObject.toJSONString(dealParamMqDTO));
}
@Override
public void dealException(String taskId, String dataId, String enterpriseId, String reason) {
}
@Override
public void dealSuccess(String taskId, String dataId, String enterpriseId, String wxEnterpriseId) {
}
@Override
public void checkDepartmentTask(String taskId) {
}
/**
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
*/
private void dealDepartmentToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
}
}
package com.gic.haoban.manage.service.task;
import com.alibaba.fastjson.JSONObject;
import com.gic.commons.util.GICMQClientUtil;
import com.gic.enterprise.api.dto.StoreDTO;
import com.gic.enterprise.api.dto.StoreGroupDTO;
import com.gic.enterprise.api.service.StoreGroupService;
import com.gic.enterprise.api.service.StoreService;
import com.gic.haoban.base.api.common.Constant;
import com.gic.haoban.base.api.common.ServiceResponse;
import com.gic.haoban.manage.api.dto.DealParamMqDTO;
import com.gic.haoban.manage.api.dto.DepartmentDTO;
import com.gic.haoban.manage.api.enums.PreDealStatusEnum;
import com.gic.haoban.manage.api.enums.PreDealTypeEnum;
import com.gic.haoban.manage.api.enums.SyncTaskStatusEnum;
import com.gic.haoban.manage.api.service.DepartmentApiService;
import com.gic.haoban.manage.service.entity.TabHaobanDepartment;
import com.gic.haoban.manage.service.entity.TabHaobanPreDealLog;
import com.gic.haoban.manage.service.entity.TabHaobanSyncTask;
import com.gic.haoban.manage.service.service.DepartmentService;
import com.gic.haoban.manage.service.service.PreDealService;
import com.gic.haoban.manage.service.service.SyncTaskService;
import com.gic.mq.sdk.GicMQClient;
import com.gic.redis.data.util.RedisUtil;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Created 2020/4/13.
*
* @author hua
*/
public class GroupSyncOperation extends BaseSyncOperation {
@Service
public class GroupSyncOperation implements BaseSyncOperation {
private static final Logger logger = LoggerFactory.getLogger(GroupSyncOperation.class);
@Autowired
private DepartmentService departmentService;
@Autowired
private StoreGroupService storeGroupService;
@Autowired
private PreDealService preDealService;
@Autowired
private DepartmentApiService departmentApiService;
@Autowired
private SyncTaskService syncTaskService;
@Autowired
private StoreService storeService;
@Override
public void dealSingleByMq(DealParamMqDTO dealParamMqDTO, TabHaobanPreDealLog dataPre) {
DepartmentDTO departmentDTO = departmentApiService.selectByRelatedId(dataPre.getDataId());
if (departmentDTO != null) {
logger.info("已经同步过了:{}", dataPre.getDataId());
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "该部门不存在");
return;
}
TabHaobanDepartment pParentDepartment = departmentService.selectByRelatedId(dataPre.getpDataId());
if (null == pParentDepartment) {
logger.info("不存在该父部门:c:{},p:{}", dataPre.getDataId(), dataPre.getpDataId());
return;
}
StoreGroupDTO groupDTO = storeGroupService.getStoreGroupById(dealParamMqDTO.getData());
if (null == groupDTO) {
logger.info("该部门不存在");
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "该部门不存在");
return;
}
DepartmentDTO department = new DepartmentDTO();
department.setParentDepartmentId(pParentDepartment.getDepartmentId());
department.setWxEnterpriseId(pParentDepartment.getWxEnterpriseId());
department.setDepartmentName(groupDTO.getStoreGroupName());
department.setChainId(pParentDepartment.getChainId() + Constant.ID_SEPARATOR + dealParamMqDTO.getData());
department.setChainName(pParentDepartment.getChainName() + Constant.NAME_SEPARATOR + department.getDepartmentName());
department.setIsStore(0);
department.setLevel(pParentDepartment.getLevel() + 1);
department.setRelatedId(dealParamMqDTO.getData());
department.setSort(groupDTO.getStoreGroupSort());
//状态 处理状态
boolean resultFlag = true;
try {
ServiceResponse add = departmentApiService.add(department);
if (add.getCode() != 1) {
resultFlag = false;
logger.info("处理失败");
return;
}
} catch (Exception e) {
resultFlag = false;
e.printStackTrace();
logger.info("处理异常");
} finally {
if (!resultFlag) {
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "处理异常或失败");
} else {
dealSuccess(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), dataPre.getWxEnterpriseId());
}
}
}
@Override
public void dealException(String taskId, String dataId, String enterpriseId, String reason) {
List<String> ids = storeGroupService.getStoreGroupIdsByParentId(enterpriseId, dataId);
if (CollectionUtils.isEmpty(ids)) {
logger.info("下面层级不存在");
//todo 是否需要加锁判断
return;
}
//todo 需要优化 测试
for (String id : ids) {
preDealService.updateStatusByDataId(taskId, id, PreDealStatusEnum.exception.getVal());
}
checkDepartmentTask(taskId);
}
@Override
public void dealSuccess(String taskId, String dataId, String enterpriseId, String wxEnterpriseId) {
boolean b = preDealService.updateStatusByDataId(taskId, dataId, PreDealStatusEnum.computed.getVal());
if (!b) {
logger.info("成功修改错误!{}", dataId);
return;
}
List<StoreDTO> storeDTOS = storeService.listStoreByStoreGroupId(dataId);
//预处理门店 写入预处理表
if (CollectionUtils.isNotEmpty(storeDTOS)) {
List<TabHaobanPreDealLog> preDealLogList = storeDTOS.stream().map(storeDTO -> {
TabHaobanPreDealLog dealLog = new TabHaobanPreDealLog();
dealLog.setDataId(storeDTO.getStoreId());
dealLog.setpDataId(dataId);
dealLog.setDataType(PreDealTypeEnum.store.getVal());
dealLog.setStatusFlag(0);
dealLog.setTaskId(taskId);
dealLog.setEnterpriseId(storeDTO.getEnterpriseId());
dealLog.setWxEnterpriseId(wxEnterpriseId);
return dealLog;
}).collect(Collectors.toList());
preDealService.insert(preDealLogList);
}
List<TabHaobanPreDealLog> list = preDealService.listByPDataId(taskId, dataId, PreDealStatusEnum.pre.getVal());
if (CollectionUtils.isNotEmpty(list)) {
Set<String> mid = list.stream().map(tab -> tab.getDataId()).collect(Collectors.toSet());
dealDepartmentToMq(taskId, mid, SyncTaskStatusEnum.group_sync);
} else {
checkDepartmentTask(taskId);
}
}
@Override
public void checkDepartmentTask(String taskId) {
boolean b = preDealService.checkTask(taskId, PreDealTypeEnum.group.getVal());
if (!b) {
return;
}
logger.info("部门继续成功,开始门店同步:{}", taskId);
String key = "haoban_sync_department_task_" + taskId;
RedisUtil.lock(key, 3L);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId);
//同步成功 进入门店处理
if (syncTask.getStatusFlag().equals(SyncTaskStatusEnum.init.getVal())) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.store_sync.getVal());
List<TabHaobanPreDealLog> dealLogs = preDealService.listByTaskId(taskId, PreDealTypeEnum.store.getVal(), PreDealStatusEnum.pre.getVal());
if (CollectionUtils.isEmpty(dealLogs)) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.compute.getVal());
} else {
logger.info("处理门店开始");
Set<String> dataIds = dealLogs.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealDepartmentToMq(taskId, dataIds, SyncTaskStatusEnum.store_sync);
}
}
RedisUtil.unlock(key);
}
/**
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
*/
private void dealDepartmentToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
//预处理分组任务
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
dealList.forEach(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setTaskId(taskId);
dealParamMqDTO.setData(relationId);
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
clientInstance.sendMessage("departmentSyncDealMq", JSONObject.toJSONString(dealParamMqDTO));
} catch (Exception e) {
logger.info("发送失败:{},{}", taskId, relationId);
e.printStackTrace();
}
});
}
}
package com.gic.haoban.manage.service.task;
import com.alibaba.fastjson.JSONObject;
import com.gic.clerk.api.dto.ClerkListDTO;
import com.gic.clerk.api.service.ClerkService;
import com.gic.commons.util.GICMQClientUtil;
import com.gic.enterprise.api.dto.StoreDTO;
import com.gic.enterprise.api.dto.StoreGroupDTO;
import com.gic.enterprise.api.service.StoreGroupService;
import com.gic.enterprise.api.service.StoreService;
import com.gic.haoban.base.api.common.Constant;
import com.gic.haoban.base.api.common.ServiceResponse;
import com.gic.haoban.manage.api.dto.DealParamMqDTO;
import com.gic.haoban.manage.api.dto.DepartmentDTO;
import com.gic.haoban.manage.api.enums.PreDealStatusEnum;
import com.gic.haoban.manage.api.enums.PreDealTypeEnum;
import com.gic.haoban.manage.api.enums.SyncTaskStatusEnum;
import com.gic.haoban.manage.api.service.DepartmentApiService;
import com.gic.haoban.manage.service.entity.TabHaobanDepartment;
import com.gic.haoban.manage.service.entity.TabHaobanPreDealLog;
import com.gic.haoban.manage.service.entity.TabHaobanSyncTask;
import com.gic.haoban.manage.service.service.DepartmentService;
import com.gic.haoban.manage.service.service.PreDealService;
import com.gic.haoban.manage.service.service.SyncTaskService;
import com.gic.mq.sdk.GicMQClient;
import com.gic.redis.data.util.RedisUtil;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Created 2020/4/13.
*
* @author hua
*/
@Service
public class StoreSyncOperation implements BaseSyncOperation {
private static final Logger logger = LoggerFactory.getLogger(StoreSyncOperation.class);
@Autowired
private DepartmentService departmentService;
@Autowired
private StoreGroupService storeGroupService;
@Autowired
private PreDealService preDealService;
@Autowired
private DepartmentApiService departmentApiService;
@Autowired
private SyncTaskService syncTaskService;
@Autowired
private StoreService storeService;
@Autowired
private ClerkService clerkService;
@Override
public void dealSingleByMq(DealParamMqDTO dealParamMqDTO, TabHaobanPreDealLog dataPre) {
DepartmentDTO departmentDTO = departmentApiService.selectByRelatedId(dataPre.getDataId());
if (departmentDTO != null) {
logger.info("已经同步过了:{}", dataPre.getDataId());
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "该部门不存在");
return;
}
TabHaobanDepartment pParentDepartment = departmentService.selectByRelatedId(dataPre.getpDataId());
if (null == pParentDepartment) {
logger.info("不存在该父部门:c:{},p:{}", dataPre.getDataId(), dataPre.getpDataId());
return;
}
StoreDTO storeDTO = storeService.getStore(dealParamMqDTO.getData());
if (null == storeDTO) {
logger.info("该部门不存在");
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "该部门不存在");
return;
}
DepartmentDTO department = new DepartmentDTO();
department.setParentDepartmentId(pParentDepartment.getDepartmentId());
department.setWxEnterpriseId(pParentDepartment.getWxEnterpriseId());
department.setDepartmentName(storeDTO.getStoreName());
department.setChainId(pParentDepartment.getChainId() + Constant.ID_SEPARATOR + dealParamMqDTO.getData());
department.setChainName(pParentDepartment.getChainName() + Constant.NAME_SEPARATOR + department.getDepartmentName());
department.setIsStore(0);
department.setLevel(pParentDepartment.getLevel() + 1);
department.setRelatedId(dealParamMqDTO.getData());
//状态 处理状态
boolean resultFlag = true;
try {
ServiceResponse add = departmentApiService.add(department);
if (add.getCode() != 1) {
resultFlag = false;
logger.info("处理失败");
return;
}
} catch (Exception e) {
resultFlag = false;
e.printStackTrace();
logger.info("处理异常");
} finally {
if (!resultFlag) {
dealException(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), "处理异常或失败");
} else {
dealSuccess(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dataPre.getEnterpriseId(), dataPre.getWxEnterpriseId());
}
}
}
@Override
public void dealException(String taskId, String dataId, String enterpriseId, String reason) {
//处理异常
preDealService.updateStatusByDataId(taskId, dataId, PreDealStatusEnum.exception.getVal());
checkDepartmentTask(taskId);
}
@Override
public void dealSuccess(String taskId, String dataId, String enterpriseId, String wxEnterpriseId) {
boolean b = preDealService.updateStatusByDataId(taskId, dataId, PreDealStatusEnum.computed.getVal());
if (!b) {
logger.info("成功修改错误!{}", dataId);
return;
}
List<ClerkListDTO> clerks = clerkService.getClerkByStoreId(enterpriseId, dataId);
//预处理门店 写入预处理表
if (CollectionUtils.isNotEmpty(clerks)) {
List<TabHaobanPreDealLog> preDealLogList = clerks.stream().map(clerkMid -> {
TabHaobanPreDealLog dealLog = new TabHaobanPreDealLog();
dealLog.setDataId(clerkMid.getClerkId());
dealLog.setpDataId(dataId);
dealLog.setDataType(PreDealTypeEnum.clerk.getVal());
dealLog.setStatusFlag(0);
dealLog.setTaskId(taskId);
dealLog.setEnterpriseId(clerkMid.getEnterpriseId());
dealLog.setWxEnterpriseId(wxEnterpriseId);
return dealLog;
}).collect(Collectors.toList());
preDealService.insert(preDealLogList);
}
checkDepartmentTask(taskId);
}
@Override
public void checkDepartmentTask(String taskId) {
boolean b = preDealService.checkTask(taskId, PreDealTypeEnum.store.getVal());
if (!b) {
return;
}
logger.info("部门继续成功,开始门店同步:{}", taskId);
String key = "haoban_sync_store_task_" + taskId;
RedisUtil.lock(key, 3L);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId);
//同步成功 进入门店处理
if (syncTask.getStatusFlag().equals(SyncTaskStatusEnum.store_sync.getVal())) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.clerk_sync.getVal());
//是否有成员 需要处理
List<TabHaobanPreDealLog> dealLogs = preDealService.listByTaskId(taskId, PreDealTypeEnum.clerk.getVal(), PreDealStatusEnum.pre.getVal());
if (CollectionUtils.isEmpty(dealLogs)) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.compute.getVal());
} else {
logger.info("处理成员开始");
Set<String> dataIds = dealLogs.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealDepartmentToMq(taskId, dataIds, SyncTaskStatusEnum.clerk_sync);
}
}
RedisUtil.unlock(key);
}
/**
* 放入mq处理部门数据
*
* @param taskId
* @param dealList
*/
private void dealDepartmentToMq(String taskId, Set<String> dealList, SyncTaskStatusEnum syncTaskStatusEnum) {
//预处理分组任务
syncTaskService.updateTaskStatus(taskId, syncTaskStatusEnum.getVal());
dealList.forEach(relationId -> {
DealParamMqDTO dealParamMqDTO = new DealParamMqDTO();
dealParamMqDTO.setTaskId(taskId);
dealParamMqDTO.setData(relationId);
dealParamMqDTO.setType(syncTaskStatusEnum.getVal());
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
clientInstance.sendMessage("departmentSyncDealMq", JSONObject.toJSONString(dealParamMqDTO));
} catch (Exception e) {
logger.info("发送失败:{},{}", taskId, relationId);
e.printStackTrace();
}
});
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment