Commit e798a6a5 by 陶光胜

报表中心

parent 23327ce8
......@@ -25,6 +25,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.redisson.api.RMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -46,7 +48,7 @@ import java.util.concurrent.TimeUnit;
*/
@Service("downloadReportApiService")
public class DownloadReportApiServiceImpl implements DownloadReportApiService {
private static final Logger log = LogManager.getLogger(DownloadReportApiServiceImpl.class);
@Autowired
private DownloadReportService downloadReportService;
@Autowired
......@@ -71,21 +73,25 @@ public class DownloadReportApiServiceImpl implements DownloadReportApiService {
checkReportHeart.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
RMap<Object, Object> map = RedisUtil.getRedisClient().getMap(reportKey);
if(MapUtils.isNotEmpty(map)){
Iterator<Object> iterator = map.keySet().iterator();
while (iterator.hasNext()){
Object next = iterator.next();
long putTime = (long) map.get(next);
if((System.currentTimeMillis() - putTime) > 10*60*1000){
//超过十分钟 如果报告还在生成中则设置为失败
TabDownloadReport report = downloadReportService.getById((Integer) next);
if(report != null && report.getStatus().equals(DownloadReportStatusEnum.BUILD.getCode())){
downloadReportService.updateDownloadStatus(next.toString(), DownloadReportStatusEnum.ERROR);
try {
RMap<Object, Object> map = RedisUtil.getRedisClient().getMap(reportKey);
if(MapUtils.isNotEmpty(map)){
Iterator<Object> iterator = map.keySet().iterator();
while (iterator.hasNext()){
Object next = iterator.next();
long putTime = (long) map.get(next);
if((System.currentTimeMillis() - putTime) > 10*60*1000){
//超过十分钟 如果报告还在生成中则设置为失败
TabDownloadReport report = downloadReportService.getById((Integer) next);
if(report != null && report.getStatus().equals(DownloadReportStatusEnum.BUILD.getCode())){
downloadReportService.updateDownloadStatus(next.toString(), DownloadReportStatusEnum.ERROR);
}
map.remove(next);
}
map.remove(next);
}
}
}catch (Throwable e){
log.warn(e);
}
}
}, 3, 3, TimeUnit.SECONDS);
......
......@@ -95,40 +95,44 @@ public class HiveDataDownloadApiServiceImpl implements HiveDataDownloadApiServic
downloadService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Iterator<DownloadTaskPOJO> iterator = taskList.iterator();
while (iterator.hasNext()){
DownloadTaskPOJO task = iterator.next();
if(task.isCanDownload()){
iterator.remove();
// 修改任务状态为生成中
downloadReportService.updateDownloadStatus(task.getReportId(), DownloadReportStatusEnum.BUILD);
// 获取hive连接
Connection conn = HiveConnUtils.getConnection();
Statement stat = null;
try {
stat = conn.createStatement();
stat.execute("REFRESH TABLE " + task.getTableName()); // 强制刷新表结构 防止数据出错
ResultSet rs = stat.executeQuery(task.getHiveSql());
String url = FileUtils.createFile(rs, task.getDataType(), task.getDesenField(), task.getFileName(), task.getExcelExtension());
downloadReportService.updateDownloadUrl(Integer.valueOf(task.getReportId()), url, rs.getRow());
} catch (Exception e) {
log.warn("下载数据出现异常", e);
// 任务状态修改
downloadReportService.updateDownloadStatus(task.getReportId(), DownloadReportStatusEnum.ERROR);
//todo 线程重启
} finally {
try{
Iterator<DownloadTaskPOJO> iterator = taskList.iterator();
log.info("定时执行任务下载:{}", taskList.size());
while (iterator.hasNext()){
DownloadTaskPOJO task = iterator.next();
if(task.isCanDownload()){
iterator.remove();
// 修改任务状态为生成中
downloadReportService.updateDownloadStatus(task.getReportId(), DownloadReportStatusEnum.BUILD);
// 获取hive连接
Connection conn = HiveConnUtils.getConnection();
Statement stat = null;
try {
if(stat != null) {
stat.close();
}
if(conn != null) {
conn.close();
stat = conn.createStatement();
stat.execute("REFRESH TABLE " + task.getTableName()); // 强制刷新表结构 防止数据出错
ResultSet rs = stat.executeQuery(task.getHiveSql());
String url = FileUtils.createFile(rs, task.getDataType(), task.getDesenField(), task.getFileName(), task.getExcelExtension());
downloadReportService.updateDownloadUrl(Integer.valueOf(task.getReportId()), url, rs.getRow());
} catch (Exception e) {
log.warn("下载数据出现异常", e);
// 任务状态修改
downloadReportService.updateDownloadStatus(task.getReportId(), DownloadReportStatusEnum.ERROR);
} finally {
try {
if(stat != null) {
stat.close();
}
if(conn != null) {
conn.close();
}
} catch (SQLException e) {
log.warn("stat 关闭异常", e);
}
} catch (SQLException e) {
log.warn("stat 关闭异常", e);
}
}
}
}catch (Throwable e){
log.warn(e);
}
}
}, interval*1000, interval*1000, TimeUnit.MILLISECONDS);
......@@ -138,27 +142,30 @@ public class HiveDataDownloadApiServiceImpl implements HiveDataDownloadApiServic
applyService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//todo 处理因为异常线程退出问题
Iterator<DownloadTaskPOJO> iterator = taskList.iterator();
while (iterator.hasNext()){
DownloadTaskPOJO task = iterator.next();
if(!task.isCanDownload()){
TabDownloadReport report = downloadReportService.getById(Integer.valueOf(task.getReportId()));
Integer auditResult = report.getAuditResult();
/**没有审批通过则直接跳过*/
if(DownloadReportStatusEnum.WAIT.getCode().equals(auditResult)){
continue;
}
/**审批通过 设置任务为可下载状态***/
if(DownloadReportStatusEnum.PASS.getCode().equals(auditResult)){
task.setCanDownload(true);
}
/**超时移除任务*/
if(DownloadReportStatusEnum.OVER_TIME.getCode().equals(auditResult)){
iterator.remove();
try{
Iterator<DownloadTaskPOJO> iterator = taskList.iterator();
while (iterator.hasNext()){
DownloadTaskPOJO task = iterator.next();
if(!task.isCanDownload()){
TabDownloadReport report = downloadReportService.getById(Integer.valueOf(task.getReportId()));
Integer auditResult = report.getAuditResult();
/**没有审批通过则直接跳过*/
if(DownloadReportStatusEnum.WAIT.getCode().equals(auditResult)){
continue;
}
/**审批通过 设置任务为可下载状态***/
if(DownloadReportStatusEnum.PASS.getCode().equals(auditResult)){
task.setCanDownload(true);
}
/**超时移除任务*/
if(DownloadReportStatusEnum.OVER_TIME.getCode().equals(auditResult)){
iterator.remove();
}
}
}
}
}catch (Throwable e){
log.warn(e);
}
}
}, interval*1000, interval*1000, TimeUnit.MILLISECONDS);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment