Commit 8fb1c0ea by fudahua

Merge branch 'feature-2023-04' into 'developer'

feat: 正在处理的

See merge request !88
parents 2c7d9597 a2ccc65c
......@@ -76,4 +76,9 @@ public interface DownloadTaskDao {
*/
public int getCountDownloadTaskOfBuilding(@Param("queryDataSource") String queryDataSource);
/** 获取等待申请通过状态的任务
* @return
*/
public List<DownloadTask> getDownloadTaskOfBuilding(@Param("queryDataSource") String queryDataSource);
}
......@@ -242,6 +242,13 @@ public class DownloadTaskServiceImpl implements IDownloadTaskService {
return downloadTaskDao.getCountDownloadTaskOfBuilding(queryDataSource);
}
/** 获取在审核申请等待状态中的任务
* @return
*/
public List<DownloadTask> getDownloadTaskOfBuilding(String queryDataSource) {
return downloadTaskDao.getDownloadTaskOfBuilding(queryDataSource);
}
/** 获取指定申请编号的风险模式记录
* @param applyId
......
......@@ -75,6 +75,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
public static final String HDFS_URL = "/data/hook";
public static final String LOCK_KEY="data:hook:hive";
public static final List<String> PHONE = Arrays.asList("mobile", "phone", "phone_number", "receive_phone_number");
......@@ -111,9 +113,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
log.debug("construct", "准备初始化 FlatQuery 查询服务");
runDealHiveFile(3);
// runDistTask(3);
runDownloadTask(3);
runBalaDownloadTask(3);
runBigDataDownloadTask(3);
// runDownloadTask(3);
// runBalaDownloadTask(3);
// runBigDataDownloadTask(3);
runApplyTask(5); // 每5秒钟进行任务状态检测
}
......@@ -128,6 +130,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
public void dealFileMq(String params) {
logger.info("处理文件:{}",params);
DownloadTask downloadTask = JSON.parseObject(params, DownloadTask.class);
String key=LOCK_KEY+":"+downloadTask.getId();
RedisUtil.delCache(key);
takeFileNew(downloadTask);
}
......@@ -582,28 +586,48 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
@Override
@PostConstruct
public void initTask(){
RedisUtil.lock(LOCK_KEY,3L);
try {
List<DownloadRecord> list = this.downloadTaskService.listUnDownloadTask(QueryDataSource.FLAT_QUERY);
list = DataInitUtils.listByHost(list);
log.debug("flatInitList", JSON.toJSONString(list));
if(CollectionUtils.isNotEmpty(list)){
for(DownloadRecord record : list){
if(StringUtils.isNotBlank(record.getDownloadCondition())){
FlatQueryTaskCondition condition = JSON.parseObject(record.getDownloadCondition(), FlatQueryTaskCondition.class);
if(record.getApplyStatus().equals(DownloadApplyStatus.TIMEOUT)){
// List<DownloadRecord> list = this.downloadTaskService.listUnDownloadTask(QueryDataSource.FLAT_QUERY);
// list = DataInitUtils.listByHost(list);
// log.debug("flatInitList", JSON.toJSONString(list));
// if(CollectionUtils.isNotEmpty(list)){
// for(DownloadRecord record : list){
// if(StringUtils.isNotBlank(record.getDownloadCondition())){
// FlatQueryTaskCondition condition = JSON.parseObject(record.getDownloadCondition(), FlatQueryTaskCondition.class);
// if(record.getApplyStatus().equals(DownloadApplyStatus.TIMEOUT)){
// continue;
// }
// if(condition.getBuildPermitted().equals(Global.NO)){
// if(record.getApplyPermitted().equals(Global.YES)){
// condition.setBuildPermitted(Global.YES);
// }
// }
// this.taskConditions.add(condition);
//
// }
// }
// }
//之前正在处理的队列需要重新处理
List<DownloadTask> tasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfBuilding(QueryDataSource.FLAT_QUERY);
logger.info("初始化任务:{}",JSONObject.toJSONString(tasks));
for (DownloadTask task : tasks) {
String key=LOCK_KEY+":"+task.getId();
Object cache = RedisUtil.getCache(key);
if (cache!=null) {
continue;
}
if(condition.getBuildPermitted().equals(Global.NO)){
if(record.getApplyPermitted().equals(Global.YES)){
condition.setBuildPermitted(Global.YES);
}
}
this.taskConditions.add(condition);
}
}
delFileOrDirByTaskId(task.getId());
//重新处理
task.setStatus(DownloadTaskStatus.WAITING);
task.setDownloadTime(new Date());
DownloadTaskServiceImpl.getInstance().updateDownloadTask(task);
}
}catch (Exception e){
e.printStackTrace();
}finally {
RedisUtil.unlock(LOCK_KEY);
}
}
......@@ -683,17 +707,17 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
logger.info("执行hivefile");
try{
String lockKey="data:hook:hive";
RedisUtil.lock(lockKey,3L);
RedisUtil.lock(LOCK_KEY,3L);
int curBuildingCount = DownloadTaskServiceImpl.getInstance().getCountDownloadTaskOfBuilding(QueryDataSource.FLAT_QUERY);
Config appConfig = ConfigService.getAppConfig();
Integer maxBuildingCount = appConfig.getIntProperty("buildingCount", 5);
if (maxBuildingCount<=curBuildingCount) {
RedisUtil.unlock(lockKey);
logger.info("超出限制:{}>{}",curBuildingCount,maxBuildingCount);
RedisUtil.unlock(LOCK_KEY);
return;
}
List<DownloadTask> downloadTasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfHasDownload(QueryDataSource.FLAT_QUERY,10);
List<DownloadTask> downloadTasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfHasDownload(QueryDataSource.FLAT_QUERY,5);
if (CollectionUtils.isNotEmpty(downloadTasks)) {
for (DownloadTask downloadTask : downloadTasks) {
downloadTask.setStatus(DownloadTaskStatus.BUILDING);
......@@ -701,15 +725,11 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
DownloadTaskServiceImpl.getInstance().updateDownloadTask(downloadTask);
GicMQClient clientInstance = GICMQClientUtil.getClientInstance();
clientInstance.sendMessage("dataHookFileDeal",JSONObject.toJSONString(downloadTask));
String key=LOCK_KEY+":"+downloadTask.getId();
RedisUtil.setCache(key,1,12*60*60L);
}
}
RedisUtil.unlock(lockKey);
// downloadTasks.parallelStream().forEach(mid->{
// ProviderLocalTag localTag = ProviderLocalTag.tag.get();
// localTag.traceId = traceId;
// //下载处理
// takeFileNew(mid);
// });
RedisUtil.unlock(LOCK_KEY);
}catch (Exception e){
logger.info("异常:{}",e);
logger.info("[ 自助指标下载异常 ]: {}", e.getMessage());
......@@ -905,7 +925,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
boolean downloadFlag = HDFSUtil.getInstance().downloadFile(path, SAVE_FOLDER);
if (!downloadFlag) {
logger.info("下载失败:{}-{}",path,JSONObject.toJSONString(task));
return;
throw new RuntimeException("下载失败:"+condition.getTaskId());
}
stopWatch.stop();
logger.info("下载耗时:{}",stopWatch.getLastTaskTimeMillis());
......@@ -1312,6 +1332,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
}
}catch (Exception e) {
logger.info("异常:{}",e);
throw new RuntimeException(e);
}
}
......
......@@ -196,6 +196,7 @@
WHERE
q.query_data_source = #{queryDataSource}
AND q.status = "downloading"
AND q.apply_permitted = 1
AND q.del_flag = '0'
order by start_time asc limit ${num}
</select>
......@@ -231,4 +232,16 @@
AND q.del_flag = '0'
</select>
<select id="getDownloadTaskOfBuilding" resultType="DownloadTask">
SELECT *
FROM
<include refid="queryTables"/>
<include refid="queryJoins"/>
WHERE
q.query_data_source = #{queryDataSource}
AND q.status = "building"
AND q.del_flag = '0'
order by start_time asc
</select>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment