Commit 335e8760 by 徐高华

同步好友优化

parent 7bee41d5
......@@ -209,4 +209,6 @@ public interface DealSyncOperationApiService {
* @param taskId
*/
void dealTagTask(String wxEnterpriseId, String enterpriseId, List<String> tagIds, String taskId);
void taskStatusTimer(String params) ;
}
......@@ -92,8 +92,6 @@ public interface PreDealService {
*/
public int countByTaskId(String taskId, int dataType, int status);
public int countByTaskId(String taskId);
/**
* 获取任务数量
*
......@@ -201,5 +199,6 @@ public interface PreDealService {
*/
public List<String> listReDataIdByPDataId(String taskId, String pDataId, Integer dataType);
public void addTaskStatusCache(String taskId) ;
}
......@@ -14,6 +14,7 @@ import com.gic.redis.data.util.RedisUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.redisson.api.RSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -30,6 +31,8 @@ import java.util.List;
@Service
public class PreDealServiceImpl implements PreDealService {
public static final String HAOBAN_TASK_ID_SET_CACHE = "haobanTaskSyncIdSetCache" ;
@Autowired
private PreDealLogMapper preDealLogMapper;
private static final Logger logger = LoggerFactory.getLogger(PreDealServiceImpl.class);
......@@ -93,16 +96,8 @@ public class PreDealServiceImpl implements PreDealService {
@Override
public boolean checkTask(String taskId, int dataType) {
String key = taskId+"_checkTask"+dataType ;
Object o = null ; //RedisUtil.getCache(key) ;
if(null == o) {
int i = preDealLogMapper.checkTask(taskId, dataType, PreDealStatusEnum.pre.getVal());
boolean flag = (i==0) ;
RedisUtil.setCache(key,flag,60*3l);
return flag ;
}else {
return (boolean) o ;
}
int i = preDealLogMapper.checkTask(taskId, dataType, PreDealStatusEnum.pre.getVal());
return (i==0) ;
}
@Override
......@@ -116,19 +111,6 @@ public class PreDealServiceImpl implements PreDealService {
}
@Override
public int countByTaskId(String taskId) {
String key = taskId+"countByTaskId" ;
Object o = null ; //RedisUtil.getCache(key) ;
if(null == o) {
int count = this.preDealLogMapper.countByTaskId(taskId,-1,0) ;
RedisUtil.setCache(key,count,60*3l);
return count ;
}else {
return (int) o ;
}
}
@Override
public int countExcepAndPreByTaskId(String taskId, int dataType) {
return preDealLogMapper.countExcepAndPreByTaskId(taskId, dataType);
}
......@@ -192,34 +174,23 @@ public class PreDealServiceImpl implements PreDealService {
@Override
public boolean checkFriendTask(String taskId, int dataType) {
String key = taskId+"_checkFriendTask"+dataType ;
Object o = null ; //RedisUtil.getCache(key) ;
if(null == o) {
int i = preDealLogMapper.checkTaskFriend(taskId, null, dataType);
boolean flag = (i==0) ;
RedisUtil.setCache(key,flag,60*3l);
return flag ;
}else {
return (boolean) o ;
}
int i = preDealLogMapper.checkTaskFriend(taskId, null, dataType);
return (i==0) ;
}
@Override
public boolean checkFriendTaskByPDataId(String taskId, String pDataId, int dataType) {
String key = taskId+"_checkFriendTaskByPDataId"+dataType ;
Object o = null ; //RedisUtil.getCache(key) ;
if(null == o) {
int i = preDealLogMapper.checkTaskFriend(taskId, pDataId, dataType);
boolean flag = (i==0) ;
RedisUtil.setCache(key,flag,60*3l);
return flag ;
}else {
return (boolean) o ;
}
int i = preDealLogMapper.checkTaskFriend(taskId, pDataId, dataType);
return (i==0) ;
}
@Override
public List<String> listReDataIdByPDataId(String taskId, String pDataId, Integer dataType) {
return preDealLogMapper.listReDataIdByPDataId(taskId, pDataId, dataType);
}
public void addTaskStatusCache(String taskId){
RSet<String> set = RedisUtil.getRedisClient().getSet(HAOBAN_TASK_ID_SET_CACHE);
set.add(taskId) ;
}
}
......@@ -19,6 +19,7 @@ import com.gic.haoban.manage.api.service.DealSyncOperationApiService;
import com.gic.haoban.manage.service.dao.mapper.StaffMapper;
import com.gic.haoban.manage.service.entity.*;
import com.gic.haoban.manage.service.service.*;
import com.gic.haoban.manage.service.service.impl.PreDealServiceImpl;
import com.gic.haoban.manage.service.task.BaseSyncOperation;
import com.gic.mq.sdk.GicMQClient;
import com.gic.redis.data.util.RedisUtil;
......@@ -26,6 +27,8 @@ import com.gic.wechat.api.service.qywx.QywxDepartmentApiService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -90,7 +93,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
private static final String LOCK_SUCCESS_KEY = "sync_wx_success_lock_";
private static final String FRIEND_LOCK = "haoban_fresh_wx_friend_lock_";
public static final String FRIEND_LOCK = "haoban_fresh_wx_friend_lock_";
private static final String FRIEND_TAG_LOCK = "haoban_sync_tag_lock_";
......@@ -135,41 +138,7 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
@Override
public void departmentDealMq2(String params) {
DealParamMqDTO dealParamMqDTO = JSONObject.parseObject(params, DealParamMqDTO.class);
TabHaobanPreDealLog dataPre = preDealService.getByDataId(dealParamMqDTO.getTaskId(), dealParamMqDTO.getData(), dealParamMqDTO.getType() - 1, PreDealStatusEnum.all.getVal());
logger.info("好办同步处理mq处理开始:{},dataPre={}", params,JSON.toJSONString(dataPre));
if (dataPre == null) {
return;
}
Integer dataType = dataPre.getDataType();
logger.info("同步类型,dataType:{}",dataType);
if (dataType.equals(PreDealTypeEnum.dept.getVal())) {
logger.info("0 部门处理:{}", params);
operationMap.get("qywxDeptSyncOperation").dealSingleByMq(dealParamMqDTO, dataPre);
} else if (dataType.equals(PreDealTypeEnum.clerk.getVal())) {
logger.info("2 成员处理:{}", params);
operationMap.get("qywxStaffSyncOperation").dealSingleByMq(dealParamMqDTO, dataPre);
} else if (dataType.equals(PreDealTypeEnum.friend_clerk.getVal())) {
logger.info("6 好友导购处理:{}", params);
operationMap.get("friendClerkSyncNewOperation").dealSingleByMq(dealParamMqDTO, dataPre);
} else if (dataType.equals(PreDealTypeEnum.friend.getVal())) {
logger.info("8 第三方好友处理:{}", params);
operationMap.get("friendSyncNewOperation").dealSingleByMq(dealParamMqDTO, dataPre);
} else if (dataType.equals(PreDealTypeEnum.self_friend.getVal())) {
logger.info("7 自建好友处理:{}", params);
operationMap.get("selfFriendSyncNewOperation").dealSingleByMq(dealParamMqDTO, dataPre);
} else if (dataType.equals(PreDealTypeEnum.tag.getVal())) {
logger.info("10 会员标签处理:{}", params);
operationMap.get("friendTagSyncOperation").dealSingleByMq(dealParamMqDTO, dataPre);
} else if (dataType.equals(PreDealTypeEnum.tag_member.getVal())) {
logger.info("11 会员标签处理标签项:{}", params);
operationMap.get("friendMemberTagSyncOperation").dealSingleByMq(dealParamMqDTO, dataPre);
} else if(dataType.equals(PreDealTypeEnum.modify_hm.getVal())) {
logger.info("20 活码批量修改");
operationMap.get("qwHmOperation").dealSingleByMq(dealParamMqDTO, dataPre);
}else {
logger.info("不是正常数据类型:{}", params);
}
this.departmentDealMq(params) ;
}
......@@ -836,4 +805,58 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
Set<String> hashSet = dealLogList.stream().map(TabHaobanPreDealLog::getDataId).collect(Collectors.toSet());
dealDepartmentToMq(taskId, hashSet, SyncTaskStatusEnum.tag);
}
/**
* 定时修改任务状态
* @param params
*/
@Override
public void taskStatusTimer(String params) {
try {
RSet<String> set = RedisUtil.getRedisClient().getSet(PreDealServiceImpl.HAOBAN_TASK_ID_SET_CACHE);
logger.info("处理好办任务状态={}",set);
Iterator<String> it = set.iterator() ;
while(it.hasNext()) {
String taskId = it.next();
TabHaobanSyncTask task = syncTaskService.getSyncTask(taskId);
if(null == task) {
logger.info("任务不存在taskId={}",taskId);
it.remove();
continue;
}
// 刷新整个企业好友
if(task.getTaskType()==SyncTaskTypeEnum.FRIEND_WX_ENTEPRISE.getType()) {
boolean flag = this.updateTaskStatus(taskId);
if(flag) {
it.remove();
}
}
}
logger.info("处理好办任务状态定时完成");
}catch(Exception e) {
logger.warn(e.toString(),e);
}
}
private boolean updateTaskStatus(String taskId) {
boolean b = preDealService.checkFriendTask(taskId, PreDealTypeEnum.friend_clerk.getVal());
if (!b) {
return false;
}
logger.info("好友同步成功:{}", taskId);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId);
if (syncTask.getStatusFlag() == SyncTaskStatusEnum.compute.getVal()
|| syncTask.getStatusFlag() == SyncTaskStatusEnum.exception_compute.getVal()) {
logger.info("已经处理成功:{}", taskId);
return true;
}
int errCount = preDealService.countByTaskId(taskId, -1, PreDealStatusEnum.exception.getVal());
if (errCount > 0) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.exception_compute.getVal());
} else {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.compute.getVal());
}
return true ;
}
}
......@@ -50,8 +50,6 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation {
@Autowired
private WxEnterpriseService wxEnterpriseService ;
private static final String FRIEND_LOCK = "haoban_fresh_wx_friend_lock_";
private static final String TRY_AGAIN = "haoban_fresh_wx_friend_again_";
@Override
......@@ -62,6 +60,7 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation {
try {
String wxEnterpriseId = dataPre.getWxEnterpriseId();
String staffId = dataPre.getDataId() ;
this.preDealService.addTaskStatusCache(dataPre.getTaskId());
TabHaobanStaff staff = this.staffService.selectById(staffId) ;
if(null == staff) {
logger.info("staff不存在,staffId={}",staffId);
......@@ -186,7 +185,6 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation {
@Override
public void dealException(String taskId, String dataId, String enterpriseId, String reason) {
preDealService.updateStatusByDataId(taskId, dataId, PreDealStatusEnum.exception.getVal(), reason);
checkDepartmentTask(taskId);
}
@Override
......@@ -195,33 +193,10 @@ public class FriendClerkSyncNewOperation implements BaseSyncOperation {
List<String> dataIds = preDealService.listReDataIdByPDataId(taskId, dataId, PreDealTypeEnum.self_friend.getVal());
logger.info("下一步处理的数据:{},dataId={}", JSONObject.toJSONString(dataIds) , dataId);
dealDepartmentToMq(taskId, new HashSet<>(dataIds), SyncTaskStatusEnum.self_friend_sync);
checkDepartmentTask(taskId);
}
@Override
public void checkDepartmentTask(String taskId) {
boolean b = preDealService.checkFriendTask(taskId, PreDealTypeEnum.friend_clerk.getVal());
if (!b) {
return;
}
logger.info("好友同步成功:{}", taskId);
String key = "haoban_sync_friend_task_" + taskId;
RedisUtil.lock(key, 3L);
TabHaobanSyncTask syncTask = syncTaskService.getSyncTask(taskId);
if (syncTask.getStatusFlag() == SyncTaskStatusEnum.compute.getVal()
|| syncTask.getStatusFlag() == SyncTaskStatusEnum.exception_compute.getVal()) {
logger.info("已经处理成功:{}", taskId);
return;
}
int errCount = preDealService.countByTaskId(taskId, -1, PreDealStatusEnum.exception.getVal());
if (errCount > 0) {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.exception_compute.getVal());
} else {
syncTaskService.updateTaskStatus(taskId, SyncTaskStatusEnum.compute.getVal());
}
String staffLockKey = FRIEND_LOCK + syncTask.getAddUser();
RedisUtil.delCache(staffLockKey);
RedisUtil.unlock(key);
}
......
......@@ -12,6 +12,7 @@ import com.gic.haoban.manage.api.service.MemberUnionidRelatedApiService;
import com.gic.haoban.manage.service.entity.TabHaobanPreDealLog;
import com.gic.haoban.manage.service.exception.WxApiLimitException;
import com.gic.haoban.manage.service.service.*;
import com.gic.haoban.manage.service.service.out.impl.DealSyncOperationApiServiceImpl;
import com.gic.haoban.manage.service.task.BaseSyncOperation;
import com.gic.mq.sdk.GicMQClient;
import com.gic.redis.data.util.RedisUtil;
......@@ -42,8 +43,6 @@ public class FriendSyncNewOperation implements BaseSyncOperation {
private MemberUnionidRelatedApiService memberUnionidRelatedApiService;
@Autowired
private SyncTaskService syncTaskService;
@Autowired
private FriendClerkSyncNewOperation friendClerkSyncNewOperation;
private static final String TRY_AGAIN = "haoban_fresh_wx_friend_again_";
......@@ -171,12 +170,13 @@ public class FriendSyncNewOperation implements BaseSyncOperation {
* @param pDataId
*/
public void checkDepartmentTask(String taskId, String pDataId) {
//校验是否都已经处理完成
boolean b = preDealService.checkFriendTaskByPDataId(taskId, pDataId, PreDealTypeEnum.friend.getVal());
if (!b) {
return;
}
logger.info("同步第三方服务商好友结束:{},{}", taskId, pDataId);
String staffLockKey = DealSyncOperationApiServiceImpl.FRIEND_LOCK + pDataId;
RedisUtil.delCache(staffLockKey);
logger.info("同步第三方好友结束:{},{},{}", taskId, pDataId,staffLockKey);
String key = "haoban_sync_third_friend_task_" + taskId + ":" + pDataId;
String lockKey = "haoban_sync_third_friend_task_lock_" + taskId + ":" + pDataId;
RedisUtil.lock(lockKey, 3L);
......@@ -186,18 +186,17 @@ public class FriendSyncNewOperation implements BaseSyncOperation {
RedisUtil.unlock(lockKey);
return;
}
RedisUtil.setCache(key, pDataId, 30L, TimeUnit.SECONDS);
TabHaobanPreDealLog preDealLog = preDealService.getByDataId(taskId, pDataId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.dealing.getVal());
if (preDealLog != null) {
preDealService.updateStatusByDataId(taskId, pDataId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
this.checkDepartmentTask(taskId);
}
RedisUtil.unlock(lockKey);
}
@Override
public void checkDepartmentTask(String taskId) {
friendClerkSyncNewOperation.checkDepartmentTask(taskId);
}
/**
......
......@@ -48,8 +48,6 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
@Autowired
private SyncTaskService syncTaskService;
@Autowired
private FriendClerkSyncNewOperation friendClerkSyncNewOperation;
@Autowired
private WxEnterpriseService wxEnterpriseService ;
private static final String TRY_AGAIN = "haoban_fresh_wx_friend_again_";
......@@ -95,7 +93,7 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
}
//更新状态
this.preDealService.updateFriendStatusByDataId(taskId, dataPre.getDataId(), PreDealStatusEnum.computed.getVal(), reason, json.toJSONString(), relationKey , PreDealTypeEnum.self_friend.getVal());
dealSuccess(taskId, dataPre.getDataId(), null, wxEnterpriseId);
dealSuccess(taskId, dataPre.getDataId(), dataPre.getpDataId(), wxEnterpriseId);
} catch (WxApiLimitException e) {
logger.info("接口次数限制:{}", JSONObject.toJSONString(dataPre));
dealFlag = tryAgainToMq(dataPre);
......@@ -120,10 +118,8 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
}
@Override
public void dealSuccess(String taskId, String dataId, String dataContent, String wxEnterpriseId) {
TabHaobanPreDealLog data = preDealService.getByDataId(taskId, dataId, PreDealTypeEnum.self_friend.getVal(), -1);
//校验成功是否都
checkDepartmentTask(taskId, data.getpDataId());
public void dealSuccess(String taskId, String dataId, String pDataId, String wxEnterpriseId) {
checkDepartmentTask(taskId, pDataId);
}
/**
......@@ -164,7 +160,7 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
logger.info("校验直接返回");
return;
}
logger.info("同步第三方服务商好友开始:{},{}", taskId, pDataId);
logger.info("同步第三方好友开始:{},{}", taskId, pDataId);
String key = "haoban_sync_self_friend_task_" + taskId + ":" + pDataId;
String lockKey = "haoban_sync_self_friend_task_lock_" + taskId + ":" + pDataId;
RedisUtil.lock(lockKey, 3L);
......@@ -180,8 +176,6 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
logger.info("第三方的数据为空");
//更新父级别
preDealService.updateStatusByDataId(taskId, pDataId, PreDealTypeEnum.friend_clerk.getVal(), PreDealStatusEnum.computed.getVal(), "成功");
//校验异常
this.checkDepartmentTask(taskId);
} else {
dealDepartmentToMq(taskId, new HashSet<>(dataIds), SyncTaskStatusEnum.friend_sync);
}
......@@ -190,7 +184,6 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
@Override
public void checkDepartmentTask(String taskId) {
friendClerkSyncNewOperation.checkDepartmentTask(taskId);
}
/**
......@@ -213,7 +206,7 @@ public class SelfFriendSyncNewOperation implements BaseSyncOperation {
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
try {
Log.info("发送队列SelfFriendSyncNewOperation={}",JSON.toJSONString(ret));
clientInstance.sendBatchMessages("departmentSyncDealMq2", ret);
clientInstance.sendBatchMessages("departmentSyncDealMq", ret);
} catch (Exception e) {
logger.info("发送失败:{},{}", taskId);
e.printStackTrace();
......
......@@ -257,6 +257,18 @@
and status_flag in(0,3)
</select>
<select id="checkTaskFriend" resultType="Integer">
select
count(*)
from tab_haoban_pre_deal_log
where task_id = #{taskId}
<if test="pDataId!=null">
and p_data_id=#{pDataId}
</if>
and data_type=#{dataType}
and status_flag in(0,1)
</select>
<select id="listRebuildDepartByTaskId" resultMap="BaseResultMap">
SELECT
<include refid="Base_Column_List"/>
......@@ -365,18 +377,6 @@
and data_type =#{dataType}
</select>
<select id="checkTaskFriend" resultType="Integer">
select
count(*)
from tab_haoban_pre_deal_log
where task_id = #{taskId}
<if test="pDataId!=null">
and p_data_id=#{pDataId}
</if>
and data_type=#{dataType}
and status_flag in(0,1)
</select>
<select id="listReDataIdByPDataId" resultType="String">
select
data_id
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment