Commit 0682cfd8 by 墨竹

fix:改为分批

parent 17ce1146
...@@ -152,11 +152,13 @@ public interface StaffApiService { ...@@ -152,11 +152,13 @@ public interface StaffApiService {
* 用户id wx企业id列表 * 用户id wx企业id列表
* *
* @param wxEnterpriseId wx企业标识 * @param wxEnterpriseId wx企业标识
* @return {@link List<String> } * @param pageNum 页面num
* @param pageSize 页面大小
* @return {@link List<StaffDTO> }
* @author mozhu * @author mozhu
* @date 2021-12-21 17:05:33 * @date 2022-02-25 09:40:15
*/ */
List<StaffDTO> listUserIdByWxEnterpriseId(String wxEnterpriseId); Page<StaffDTO> listUserIdByWxEnterpriseId(String wxEnterpriseId,Integer pageNum, Integer pageSize);
/** /**
......
...@@ -23,7 +23,6 @@ import com.gic.haoban.manage.service.service.*; ...@@ -23,7 +23,6 @@ import com.gic.haoban.manage.service.service.*;
import com.gic.haoban.manage.service.task.BaseSyncOperation; import com.gic.haoban.manage.service.task.BaseSyncOperation;
import com.gic.mq.sdk.GicMQClient; import com.gic.mq.sdk.GicMQClient;
import com.gic.redis.data.util.RedisUtil; import com.gic.redis.data.util.RedisUtil;
import com.gic.wechat.api.dto.qywx.QywxNewUseridDTO;
import com.gic.wechat.api.service.qywx.QywxDepartmentApiService; import com.gic.wechat.api.service.qywx.QywxDepartmentApiService;
import com.gic.wechat.api.service.qywx.QywxUserApiService; import com.gic.wechat.api.service.qywx.QywxUserApiService;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
...@@ -484,21 +483,6 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ ...@@ -484,21 +483,6 @@ public class DealSyncOperationApiServiceImpl implements DealSyncOperationApiServ
syncCheckDTO.setStoreErrCount(storeErrCount); syncCheckDTO.setStoreErrCount(storeErrCount);
syncCheckDTO.setStaffErrCount(clerkErrCount); syncCheckDTO.setStaffErrCount(clerkErrCount);
syncCheckDTO.setHasDeal(totalCount - preCount); syncCheckDTO.setHasDeal(totalCount - preCount);
//拉取结束后
if (syncCheckDTO.getSyncStatus() == 4) {
String corpid = config.getCorpid();
if (corpid.length() > 20) {
//批量处理wx_user_id to wx_open_user_id
List<StaffDTO> staffDTOS = staffApiService.listUserIdByWxEnterpriseId(wxEnterpriseId);
logger.info("useridToOpenuserid请求:corpid:{}", corpid);
List<QywxNewUseridDTO> qywxNewUseridDTOS = qywxUserApiService.useridToOpenuserid(corpid, config.getWxSuiteid(), staffDTOS.stream().map(StaffDTO::getWxUserId).collect(Collectors.toList()));
if (CollectionUtils.isNotEmpty(qywxNewUseridDTOS)) {
for (QywxNewUseridDTO qywxNewUseridDTO : qywxNewUseridDTOS) {
staffService.updateOpenUserIdsByUserId(wxEnterpriseId, qywxNewUseridDTO.getUserid(), qywxNewUseridDTO.getOpen_userid());
}
}
}
}
return syncCheckDTO; return syncCheckDTO;
} }
......
...@@ -40,6 +40,7 @@ import com.gic.thirdparty.api.service.QQCloudPicService; ...@@ -40,6 +40,7 @@ import com.gic.thirdparty.api.service.QQCloudPicService;
import com.gic.wechat.api.dto.qywx.UserDTO; import com.gic.wechat.api.dto.qywx.UserDTO;
import com.gic.wechat.api.service.qywx.QywxUserApiService; import com.gic.wechat.api.service.qywx.QywxUserApiService;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
...@@ -389,7 +390,7 @@ public class StaffApiServiceImpl implements StaffApiService { ...@@ -389,7 +390,7 @@ public class StaffApiServiceImpl implements StaffApiService {
String[] arr = getNationCodeAndPhoneNumber(mobile); String[] arr = getNationCodeAndPhoneNumber(mobile);
String nationCode = arr[0]; String nationCode = arr[0];
String phoneNumber = arr[1]; String phoneNumber = arr[1];
TabHaobanStaff staff = staffService.selectByPhoneNumberOrUserIdAndEnterpriseId(phoneNumber,userId, wxEnterpriseId); TabHaobanStaff staff = staffService.selectByPhoneNumberOrUserIdAndEnterpriseId(phoneNumber, userId, wxEnterpriseId);
if (staff != null) { if (staff != null) {
staffService.delOtherStaffByWxUserId(staff.getWxUserId(), staff.getStaffId(), wxEnterpriseId); staffService.delOtherStaffByWxUserId(staff.getWxUserId(), staff.getStaffId(), wxEnterpriseId);
} }
...@@ -899,18 +900,18 @@ public class StaffApiServiceImpl implements StaffApiService { ...@@ -899,18 +900,18 @@ public class StaffApiServiceImpl implements StaffApiService {
return null; return null;
} }
if (StringUtils.isBlank(staffDTO.getPhoneNumber())) { if (StringUtils.isBlank(staffDTO.getPhoneNumber())) {
this.wxGetAdd(userId,wxEnterpriseId); this.wxGetAdd(userId, wxEnterpriseId);
return EntityUtil.changeEntityByJSON(StaffDTO.class, staffService.selectByUserIdAndEnterpriseId(userId, wxEnterpriseId)); return EntityUtil.changeEntityByJSON(StaffDTO.class, staffService.selectByUserIdAndEnterpriseId(userId, wxEnterpriseId));
} }
//todo 历史数据 相同的id需要删除 后续这个逻辑可以删除 //todo 历史数据 相同的id需要删除 后续这个逻辑可以删除
//存在多条 //存在多条
List<String> userIds=new ArrayList<>(); List<String> userIds = new ArrayList<>();
userIds.add(userId); userIds.add(userId);
List<TabHaobanStaff> staffs = staffMapper.listByUserIdsAndWxEnterpriseId(userIds, wxEnterpriseId); List<TabHaobanStaff> staffs = staffMapper.listByUserIdsAndWxEnterpriseId(userIds, wxEnterpriseId);
//需要去除多余的 //需要去除多余的
if (CollectionUtils.isNotEmpty(staffs)&&staffs.size()>1) { if (CollectionUtils.isNotEmpty(staffs) && staffs.size() > 1) {
Map<String, TabHaobanStaff> staffMap = staffs.stream().collect(Collectors.toMap(dto -> dto.getStaffId(), dto -> dto,(o,n)->n)); Map<String, TabHaobanStaff> staffMap = staffs.stream().collect(Collectors.toMap(dto -> dto.getStaffId(), dto -> dto, (o, n) -> n));
List<String> relationStaffIds = staffClerkRelationService.listRelationsStaffId(staffMap.keySet()); List<String> relationStaffIds = staffClerkRelationService.listRelationsStaffId(staffMap.keySet());
if (CollectionUtils.isNotEmpty(relationStaffIds)) { if (CollectionUtils.isNotEmpty(relationStaffIds)) {
TabHaobanStaff staff = staffMap.get(relationStaffIds.get(0)); TabHaobanStaff staff = staffMap.get(relationStaffIds.get(0));
...@@ -1469,8 +1470,10 @@ public class StaffApiServiceImpl implements StaffApiService { ...@@ -1469,8 +1470,10 @@ public class StaffApiServiceImpl implements StaffApiService {
} }
@Override @Override
public List<StaffDTO> listUserIdByWxEnterpriseId(String wxEnterpriseId) { public Page<StaffDTO> listUserIdByWxEnterpriseId(String wxEnterpriseId, Integer pageNum, Integer pageSize) {
return EntityUtil.changeEntityListNew(StaffDTO.class, staffMapper.listUserIdByWxEnterpriseId(wxEnterpriseId)); PageHelper.startPage(pageNum, pageSize);
List<TabHaobanStaff> tabHaobanStaffs = staffMapper.listUserIdByWxEnterpriseId(wxEnterpriseId);
return PageUtil.changePageHelperToCurrentPage(new PageInfo<>(tabHaobanStaffs), StaffDTO.class);
} }
@Override @Override
......
package com.gic.haoban.manage.service.task; package com.gic.haoban.manage.service.task;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gic.api.base.commons.Page;
import com.gic.haoban.base.api.common.ServiceResponse; import com.gic.haoban.base.api.common.ServiceResponse;
import com.gic.haoban.manage.api.dto.DealParamMqDTO; import com.gic.haoban.manage.api.dto.DealParamMqDTO;
import com.gic.haoban.manage.api.dto.StaffDTO; import com.gic.haoban.manage.api.dto.StaffDTO;
...@@ -21,6 +22,9 @@ import com.gic.redis.data.util.RedisUtil; ...@@ -21,6 +22,9 @@ import com.gic.redis.data.util.RedisUtil;
import com.gic.wechat.api.dto.qywx.QywxNewUseridDTO; import com.gic.wechat.api.dto.qywx.QywxNewUseridDTO;
import com.gic.wechat.api.service.qywx.QywxUserApiService; import com.gic.wechat.api.service.qywx.QywxUserApiService;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -129,14 +133,30 @@ public class QywxClerkSyncOperation implements BaseSyncOperation { ...@@ -129,14 +133,30 @@ public class QywxClerkSyncOperation implements BaseSyncOperation {
TabHaobanWxApplication tabHaobanWxApplication = wxApplicationMapper.selectByCorpId(corpid, null); TabHaobanWxApplication tabHaobanWxApplication = wxApplicationMapper.selectByCorpId(corpid, null);
logger.info("同步通讯录完成,批量处理wx_user_id,corpid:{}", corpid); logger.info("同步通讯录完成,批量处理wx_user_id,corpid:{}", corpid);
if (corpid.length() > 20 || externalFlag == 1) { if (corpid.length() > 20 || externalFlag == 1) {
//批量处理wx_user_id to wx_open_user_id // 限流器
List<StaffDTO> staffDTOS = staffApiService.listUserIdByWxEnterpriseId(wxEnterpriseId); RRateLimiter rateLimiter = RedisUtil.getRedisClient().getRateLimiter("haoban:manage3:updateWxOpenUserId:limit");
logger.info("useridToOpenuserid请求:corpid:{}", corpid); // 每1秒产生5个令牌
List<QywxNewUseridDTO> qywxNewUseridDTOS = qywxUserApiService.useridToOpenuserid(corpid, tabHaobanWxApplication.getSiteId(), staffDTOS.stream().map(StaffDTO::getWxUserId).collect(Collectors.toList())); rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
if (CollectionUtils.isNotEmpty(qywxNewUseridDTOS)) { int pageNum = 1;
for (QywxNewUseridDTO qywxNewUseridDTO : qywxNewUseridDTOS) { while (true){
staffApiService.updateOpenUserIdsByUserId(wxEnterpriseId, qywxNewUseridDTO.getUserid(), qywxNewUseridDTO.getOpen_userid()); // 获取令牌
rateLimiter.acquire();
//企微转换最多支持1000个
//批量处理wx_user_id to wx_open_user_id
Page<StaffDTO> staffDTOPageInfo = pageStaffListByWxEnterpriseId(wxEnterpriseId,pageNum,1000);
List<StaffDTO> staffDTOS = staffDTOPageInfo.getResult();
if (CollectionUtils.isEmpty(staffDTOS)) {
logger.info("转换完成,为最后一个");
break;
} }
logger.info("useridToOpenuserid请求:corpid:{}", corpid);
List<QywxNewUseridDTO> qywxNewUseridDTOS = qywxUserApiService.useridToOpenuserid(corpid, tabHaobanWxApplication.getSiteId(), staffDTOS.stream().map(StaffDTO::getWxUserId).collect(Collectors.toList()));
if (CollectionUtils.isNotEmpty(qywxNewUseridDTOS)) {
for (QywxNewUseridDTO qywxNewUseridDTO : qywxNewUseridDTOS) {
staffApiService.updateOpenUserIdsByUserId(wxEnterpriseId, qywxNewUseridDTO.getUserid(), qywxNewUseridDTO.getOpen_userid());
}
}
pageNum++;
} }
} }
} }
...@@ -145,4 +165,9 @@ public class QywxClerkSyncOperation implements BaseSyncOperation { ...@@ -145,4 +165,9 @@ public class QywxClerkSyncOperation implements BaseSyncOperation {
} }
private Page<StaffDTO> pageStaffListByWxEnterpriseId(String wxEnterpriseId, Integer pageNum, Integer pageSize) {
return staffApiService.listUserIdByWxEnterpriseId(wxEnterpriseId,pageNum,pageSize);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment