Commit ad3a1107 by fudahua

feat: new version of the download flow

parent 8330f895
......@@ -8,6 +8,9 @@ public class DownloadTaskStatus {
/** Queued */
public static final String WAITING = "waiting";
/** HDFS output finished, waiting for local download */
public static final String DOWNLOAD_HDFS = "downloading";
/** Generating */
public static final String BUILDING = "building";
......
......@@ -146,6 +146,16 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoopCommonVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoopCommonVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoopCommonVersion}</version>
<exclusions>
......@@ -199,8 +209,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<compilerArguments>
<verbose/>
......
package com.gic.cloud.data.hook.service;
import com.gic.cloud.data.hook.service.entity.ColumnInfo;
import java.util.List;
/** Per-row callback used when converting downloaded CSV parts */
public interface DownloadFunc {
    /**
     * @param cells     one CSV row
     * @param titles    column metadata in CSV column order
     * @param fileFirst true when the row is the first line (header) of a CSV file
     */
    void deal(String[] cells, List<ColumnInfo> titles, boolean fileFirst);
}
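For illustration, a minimal sketch of a handler (hypothetical, it just echoes the first column; the service's readCsvFile below drives this callback):

    DownloadFunc printer = (cells, titles, fileFirst) -> {
        if (fileFirst) {
            return; // the first line of each CSV part is its header row
        }
        System.out.println(titles.get(0).getTitle() + " = " + cells[0]);
    };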
package com.gic.cloud.data.hook.service;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class HDFSUtil {
    private static final Logger logger = LoggerFactory.getLogger(HDFSUtil.class);
    /** volatile so double-checked locking publishes the instance safely */
    private static volatile HDFSUtil hdfsUtil = null;
    private static FileSystem fileSystem = null;

    public static HDFSUtil getInstance() {
        if (hdfsUtil == null) {
            synchronized (HDFSUtil.class) {
                if (hdfsUtil == null) {
                    hdfsUtil = new HDFSUtil();
                }
            }
        }
        return hdfsUtil;
    }

    private HDFSUtil() {
        Config appConfig = ConfigService.getAppConfig();
        String hdfsUrl = appConfig.getProperty("hdfs.url", null);
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsUrl);
        try {
            fileSystem = FileSystem.get(configuration);
        } catch (IOException e) {
            logger.info("HDFS initialization failed", e);
        }
    }

    /**
     * Copy a file or directory from HDFS to the local filesystem.
     * Note: delSrc is true, so the HDFS source is deleted after a successful copy.
     * @param srcPath HDFS source path
     * @param toPath  local target path
     * @return true on success, false if the copy failed (already logged)
     */
    public boolean downloadFile(String srcPath, String toPath) {
        try {
            fileSystem.copyToLocalFile(true, new Path(srcPath), new Path(toPath));
            return true;
        } catch (IOException e) {
            logger.info("download failed: {}", srcPath, e);
            return false;
        }
    }
}
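A minimal usage sketch, assuming the Apollo property hdfs.url points at the NameNode and the paths below are hypothetical; note the delSrc=true flag inside downloadFile, which removes the HDFS source after a successful copy:

    // pull the whole task directory from HDFS into the local save folder
    boolean ok = HDFSUtil.getInstance().downloadFile("/data/hook/hdfs123", "/usr/local/data-hook-file");
    if (!ok) {
        // the failure is already logged; mark the task for retry here
    }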
......@@ -62,6 +62,11 @@ public interface DownloadTaskDao {
*/
public List<DownloadTask> getDownloadTaskOfWaiting(@Param("queryDataSource") String queryDataSource);
/** Get tasks whose HDFS output is ready to download (status "downloading")
 * @return
 */
public List<DownloadTask> getDownloadTaskOfHasDownload(@Param("queryDataSource") String queryDataSource);
DownloadProcessDTO getDownloadProcess();
int updateTaskStatusError(@Param("idList") List<String> idList);
......
package com.gic.cloud.data.hook.service.entity;
import java.io.Serializable;
/** Column metadata: field type plus display title */
public class ColumnInfo implements Serializable {
private String type;
private String title;
public ColumnInfo() {
}
public ColumnInfo(String type, String title) {
this.type = type;
this.title = title;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
}
......@@ -228,6 +228,13 @@ public class DownloadTaskServiceImpl implements IDownloadTaskService {
return downloadTaskDao.getDownloadTaskOfWaiting(queryDataSource);
}
/** Get tasks whose HDFS output is ready to download
 * @return
 */
public List<DownloadTask> getDownloadTaskOfHasDownload(String queryDataSource) {
    return downloadTaskDao.getDownloadTaskOfHasDownload(queryDataSource);
}
/** Get the risk-mode record for the given apply id
 * @param applyId
......
......@@ -11,20 +11,21 @@ import com.gic.cloud.data.hook.api.service.DecryptKeyService;
import com.gic.cloud.data.hook.api.service.IDownloadTaskService;
import com.gic.cloud.data.hook.api.service.IFlatQueryResultService;
import com.gic.cloud.data.hook.service.*;
import com.gic.cloud.data.hook.service.dao.FlatQueryFieldDao;
import com.gic.cloud.data.hook.service.dao.FlatQueryTableDao;
import com.gic.cloud.data.hook.service.entity.ColumnInfo;
import com.gic.cloud.data.hook.service.entity.CsvDataFilterMode;
import com.gic.cloud.data.hook.service.entity.CsvResultSetHelper;
import com.gic.dubbo.entity.ProviderLocalTag;
import com.gic.enterprise.api.dto.EnterpriseDTO;
import com.gic.enterprise.api.service.EnterpriseService;
import com.gic.qcloud.BucketNameEnum;
import com.gic.qcloud.FileRecordLogUtil;
import com.gic.qcloud.FileUploadUtil;
import com.gic.thirdparty.cloudfile.CloudFileUtil;
import com.gic.thirdparty.cloudfile.enums.CloudFileBusinessOptEnum;
import com.gic.thirdparty.cloudfile.enums.CloudFileTypeEnum;
import com.gic.thirdparty.cloudfile.pojo.CloudFileInfo;
import com.google.common.collect.Lists;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import com.opencsv.ResultSetHelper;
import org.apache.commons.collections.CollectionUtils;
......@@ -32,11 +33,6 @@ import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.Decimal;
import org.apache.poi.hssf.usermodel.HSSFCell;
import org.apache.poi.hssf.usermodel.HSSFCellStyle;
import org.apache.poi.hssf.usermodel.HSSFDataFormat;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.slf4j.Logger;
......@@ -44,13 +40,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.*;
import java.nio.charset.Charset;
import java.sql.*;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
......@@ -59,6 +51,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/** FlatQuery (self-service metrics) result service implementation
 * @author Sam
......@@ -73,6 +68,9 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
/** csv / xls download directory */
public static final String SAVE_FOLDER = "/usr/local/data-hook-file";
public static final String HDFS_URL = "/data/hook";
public static final List<String> PHONE = Arrays.asList("mobile", "phone", "phone_number", "receive_phone_number");
public static final List<String> CARD = Arrays.asList("card_num", "receive_card_num");
......@@ -92,6 +90,10 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
@Autowired
DecryptKeyService decryptKeyService;
/** FlatQuery field DAO */
@Autowired
private FlatQueryFieldDao flatQueryFieldDao;
@Autowired
private EnterpriseService enterpriseService;
......@@ -102,7 +104,8 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
private FlatQueryResultServiceImpl() {
log.debug("construct", "准备初始化 FlatQuery 查询服务");
runDistTask(3);
runDealHiveFile(3);
// runDistTask(3);
runDownloadTask(3);
runBalaDownloadTask(3);
runBigDataDownloadTask(3);
......@@ -520,7 +523,7 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
curTask.setApplyPermitted(Global.NO); // full mode is not permitted by default
curTask.setApplyStatus(DownloadApplyStatus.WAITING);
} // IF ELSE OVER
curTask.setRealSql(""); // 设置查询为空
// curTask.setRealSql(""); // 设置查询为空
curTask.setEnterpriseId(enterpriseIds.get(0));
Integer preReportCount = DownloadTaskServiceImpl.getInstance().getDownloadTaskCountByEnterpriseId(curTask.getEnterpriseId());
curTask.setReportId(ServiceUtils.buildReportId(curTask.getQueryDataType(), preReportCount));
......@@ -544,9 +547,23 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
condition.setAuthStoreIdList(authStoreIdList);
// save the download task
curTask.setDownloadCondition(JSON.toJSONString(condition));
// build the full download SQL up front and store it on the task
String fullQuery = buildFlatQuerySQL(
false, // for download use
condition.getTableId(),
condition.getEnterpriseIds(),
condition.getConditions(),
condition.getOrderField(),
condition.getOrderDir(),
condition.getExecDistinct(),
0,
condition.getAuthStoreIdList());
curTask.setRealSql(fullQuery);
curTask.setFieldSize(condition.getAllFields().size());
DownloadTaskServiceImpl.getInstance().createDownloadTask(curTask);
// save to the task-condition list
this.taskConditions.add(condition);
// this.taskConditions.add(condition);
return curTask;
}
......@@ -640,7 +657,32 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
}, interval*1000, interval*1000, TimeUnit.MILLISECONDS);
}
/**
 * Poll tasks whose HDFS output is ready and run the local download
 * @param interval polling interval in seconds
 */
private void runDealHiveFile(Integer interval) {
    distService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            ProviderLocalTag providerLocalTag = ProviderLocalTag.tag.get();
            providerLocalTag.traceId = UUID.randomUUID().toString();
            try {
                List<DownloadTask> downloadTasks = DownloadTaskServiceImpl.getInstance().getDownloadTaskOfHasDownload(QueryDataSource.FLAT_QUERY);
                downloadTasks.parallelStream().forEach(mid -> {
                    // download and convert each ready task
                    takeFileNew(mid);
                });
            } catch (Exception e) {
                logger.info("[ FlatQuery download error ]: {}", e.getMessage(), e);
            }
        }
    }, interval * 1000, interval * 1000, TimeUnit.MILLISECONDS);
}
/** Download task execution timer */
......@@ -800,6 +842,120 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
}, interval*1000, interval*1000, TimeUnit.MILLISECONDS);
}
/**
 * Pull a finished HDFS result to local disk, convert it to xlsx, and upload it
 * @param task the download task to process
 */
public void takeFileNew(DownloadTask task) {
    if (task == null) {
        logger.info("task does not exist");
        return;
    }
    FlatQueryTaskCondition condition = JSON.parseObject(task.getDownloadCondition(), FlatQueryTaskCondition.class);
    if (!task.getStatus().equals(DownloadTaskStatus.DOWNLOAD_HDFS)) {
        logger.info("HDFS output is not ready yet: {}", JSONObject.toJSONString(task));
        return;
    }
    task.setStatus(DownloadTaskStatus.DOWNLOAD_HDFS);
    task.setDownloadWay(-1);
    task.setDownloadTime(new Date());
    // pull the generated result directory from HDFS to the local save folder
    String dirName = "hdfs" + task.getId();
    String path = HDFS_URL + "/" + dirName;
    boolean downloadFlag = HDFSUtil.getInstance().downloadFile(path, SAVE_FOLDER);
    if (!downloadFlag) {
        logger.info("download failed: {} - {}", path, JSONObject.toJSONString(task));
        return;
    }
    List<String> xlsxFiles = new ArrayList<>();
    AtomicInteger count = new AtomicInteger(0);
    // must be initialized; calling set() on a null reference would NPE
    AtomicReference<XlsxFileInfo> currentFile = new AtomicReference<>();
    readCsvFile(condition.getTableId(), dirName, (cells, titles, firstFlag) -> {
        if (count.get() == 0) {
            // start a new split file; the workbook itself is created lazily in saveXlsSplitNew
            XlsxFileInfo xlsxFileInfo = new XlsxFileInfo();
            xlsxFileInfo.filepath = SAVE_FOLDER + "/" + task.getId() + xlsxFiles.size() + ".xlsx";
            currentFile.set(xlsxFileInfo);
            xlsxFiles.add(xlsxFileInfo.filepath);
        }
        if (firstFlag) {
            return; // the first line of every CSV part is its header, not data
        }
        saveXlsSplitNew(currentFile.get().filepath, cells, titles, currentFile.get(), count);
    });
    // compress when the export was split across multiple files
    boolean zipFlag = xlsxFiles.size() > 1;
    String cloudFileUrl = null;
    try {
        // flush the final, partially filled workbook to disk before upload
        XlsxFileInfo lastFile = currentFile.get();
        if (lastFile != null && lastFile.workbook != null && count.get() > 0) {
            try (FileOutputStream fileOut = new FileOutputStream(lastFile.filepath)) {
                lastFile.workbook.write(fileOut);
            }
            lastFile.workbook.dispose();
        }
        String taskFileExt = (task.getUseCompress().equals(Global.YES) || zipFlag) ? ".zip"
                : (task.getFormat().equals(DownloadFileFormat.CSV) ? ".csv" : ".xlsx");
        if (zipFlag || task.getUseCompress().equals(Global.YES)) {
            logger.info("[ runDownloadTask.run ]: {}", "generating compressed file " + task.getId() + ".zip");
            String zipFilePath = SAVE_FOLDER + "/" + task.getId() + ".zip";
            File zipFile = new File(zipFilePath);
            List<File> files = xlsxFiles.stream().map(File::new).collect(Collectors.toList());
            try (OutputStream os = new FileOutputStream(zipFilePath)) {
                FileUtil.toZip(files, os);
            }
            logger.info("[ uploading compressed file to Tencent Cloud ]: {}", task.getId());
            cloudFileUrl = cloudFileUpload(task.getEnterpriseId(), zipFile, taskFileExt.substring(1));
        } else {
            logger.info("[ uploading file to Tencent Cloud ]: {}", task.getId());
            cloudFileUrl = cloudFileUpload(task.getEnterpriseId(), new File(currentFile.get().filepath), taskFileExt.substring(1));
        }
        logger.info("[ uploaded to Tencent Cloud ]: {}", "url: " + cloudFileUrl);
        task.setStatus(DownloadTaskStatus.COMPLISHED);
        task.setOverTime(new Date());
        task.setFilePath(cloudFileUrl);
        DownloadTaskServiceImpl.getInstance().updateDownloadTask(task);
    } catch (Exception e) {
        logger.info("takeFileNew error: {}", e.getMessage(), e);
    }
}
/** Per-split-file state: target path, streaming workbook, and shared cell styles */
private static class XlsxFileInfo {
    String filepath;
    SXSSFWorkbook workbook;
    CellStyle dateTimeStyle; // yyyy-MM-dd HH:mm:ss
    CellStyle dateStyle;     // yyyy-MM-dd
}
/**
 * Read the downloaded CSV parts and feed each row to the handler
 * @param tableId table whose field metadata supplies column titles and types
 * @param dirName local directory holding the CSV parts
 * @param func    per-row callback
 */
private void readCsvFile(String tableId, String dirName, DownloadFunc func) {
    File dir = new File(SAVE_FOLDER + "/" + dirName);
    File[] files = dir.listFiles();
    if (files == null) {
        logger.info("directory does not exist: {}", dir.getPath());
        return;
    }
    List<ColumnInfo> titles = null;
    for (File midFile : files) {
        if (!midFile.getName().endsWith(".csv")) {
            continue;
        }
        try (CSVReader csvReader = new CSVReader(new FileReader(midFile))) {
            String[] cells = csvReader.readNext();
            if (cells == null) {
                continue; // empty file
            }
            boolean first = true;
            do {
                if (titles == null) {
                    // build the title list once, in CSV column order, from the table's field metadata
                    List<String> cellsList = Arrays.asList(cells);
                    List<FlatQueryField> fields = flatQueryFieldDao.getFlatQueryFieldByTableId(tableId);
                    Map<String, ColumnInfo> columnInfoMap = fields.stream()
                            .filter(mid -> cellsList.contains(mid.getFieldName()))
                            .collect(Collectors.toMap(mid -> mid.getFieldName(),
                                    mid -> new ColumnInfo(mid.getFieldType(), mid.getFieldMark())));
                    titles = new ArrayList<>(); // was left null; populating it without this would NPE
                    for (String cell : cells) {
                        titles.add(columnInfoMap.get(cell));
                    }
                }
                func.deal(cells, titles, first);
                first = false;
            } while ((cells = csvReader.readNext()) != null);
        } catch (Exception e) {
            logger.info("CSV read error: {}", midFile.getName(), e);
        }
    }
}
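For illustration, a hypothetical header row and field metadata (names and titles invented here, not taken from the real tables) show how the mapping above keeps titles aligned with CSV column order:

    String[] header = {"member_id", "create_time"};
    Map<String, ColumnInfo> byName = new HashMap<>();
    byName.put("member_id", new ColumnInfo("text", "Member ID"));
    byName.put("create_time", new ColumnInfo("date", "Created At"));
    List<ColumnInfo> titles = new ArrayList<>();
    for (String name : header) {
        titles.add(byName.get(name)); // titles.get(j) lines up with cells[j]
    }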
private void takeFile(FlatQueryTaskCondition condition, Connection conn, int downloadType){
if (condition != null) {
// 更新任务状态
......@@ -983,6 +1139,86 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
} // IF OVER
}
/**
 * Write one CSV row into the current xlsx split file
 * @param originalFilePath target path of the current split file
 * @param cells            row values, in CSV column order
 * @param titles           column metadata; the type drives the cell format
 * @param xlsxFileInfo     per-file workbook state
 * @param count            next row index; reset to 0 once a split file is flushed
 */
private void saveXlsSplitNew(String originalFilePath, String[] cells, List<ColumnInfo> titles, XlsxFileInfo xlsxFileInfo, AtomicInteger count) {
    try {
        if (xlsxFileInfo.workbook == null) {
            // first row for this split file: create the streaming workbook,
            // the shared cell styles, and the title row
            xlsxFileInfo.workbook = new SXSSFWorkbook(100); // keep 100 rows in memory, flush the rest to temp files
            DataFormat dataFormat = xlsxFileInfo.workbook.createDataFormat();
            xlsxFileInfo.dateTimeStyle = xlsxFileInfo.workbook.createCellStyle();
            xlsxFileInfo.dateTimeStyle.setDataFormat(dataFormat.getFormat("yyyy-MM-dd HH:mm:ss"));
            xlsxFileInfo.dateStyle = xlsxFileInfo.workbook.createCellStyle();
            xlsxFileInfo.dateStyle.setDataFormat(dataFormat.getFormat("yyyy-MM-dd"));
            Sheet titleSheet = xlsxFileInfo.workbook.createSheet();
            Row titleRow = titleSheet.createRow(0);
            logger.info("column types: {}", JSONObject.toJSONString(titles));
            for (int j = 0; j < titles.size(); j++) {
                titleRow.createCell(j).setCellValue(titles.get(j).getTitle());
            }
            count.set(1); // data rows start below the title row
        }
        SXSSFWorkbook wb = xlsxFileInfo.workbook;
        Sheet sheet = wb.getSheetAt(0);
        // styles are created once per workbook (above) and reused on every row
        CellStyle yyyyMMddhhmmss = xlsxFileInfo.dateTimeStyle;
        CellStyle yyyyMMdd = xlsxFileInfo.dateStyle;
        Row row = sheet.createRow(count.get());
        for (int j = 0; j < cells.length; j++) {
            String type = titles.get(j).getType();
            Cell midCell = row.createCell(j);
            String columnValue = cells[j];
            if (StringUtils.isBlank(columnValue)) {
                midCell.setCellValue(columnValue);
                continue;
            }
            switch (type) {
                case FlatQueryFieldType.DATE:
                    if (columnValue.length() == 10) {
                        midCell.setCellStyle(yyyyMMdd);
                        midCell.setCellValue(DateUtils.parseDate(columnValue, new String[]{"yyyy-MM-dd"}));
                    } else {
                        midCell.setCellStyle(yyyyMMddhhmmss);
                        midCell.setCellValue(DateUtils.parseDate(columnValue, new String[]{"yyyy-MM-dd HH:mm:ss"}));
                    }
                    break;
                case FlatQueryFieldType.NUMBER:
                    if (columnValue.contains(".")) {
                        midCell.setCellValue(Double.valueOf(columnValue));
                    } else {
                        midCell.setCellValue(Long.valueOf(columnValue));
                    }
                    break;
                default:
                    midCell.setCellValue(columnValue);
            }
        }
        int limitSize = FileUtil.getLimitSize();
        int c = count.incrementAndGet();
        if (c >= limitSize) {
            // split file is full: flush it to disk and release the streaming temp files
            try (FileOutputStream fileOut = new FileOutputStream(originalFilePath)) {
                wb.write(fileOut);
            }
            wb.dispose(); // deletes the temporary files backing the SXSSFWorkbook
            logger.info("[ runDownloadTask.run ]: {}", "generated download file " + originalFilePath);
            count.set(0); // signals the caller to start a new split file
        }
    } catch (Exception e) {
        logger.info("saveXlsSplitNew error: {}", e.getMessage(), e);
    }
}
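The cell styles are created once per workbook above because every createCellStyle() call adds a style record, and the xlsx format caps styles at roughly 64,000 per workbook; creating them per row would eventually fail on large exports. A minimal sketch of the reuse pattern:

    SXSSFWorkbook wb = new SXSSFWorkbook(100);          // sliding window of 100 rows
    CellStyle dateStyle = wb.createCellStyle();         // create once per workbook...
    dateStyle.setDataFormat(wb.createDataFormat().getFormat("yyyy-MM-dd"));
    Sheet sheet = wb.createSheet();
    for (int r = 1; r <= 3; r++) {
        Cell cell = sheet.createRow(r).createCell(0);
        cell.setCellStyle(dateStyle);                   // ...reuse on every row
        cell.setCellValue(new java.util.Date());
    }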
private void saveXlsSplit(String originalFilePath,ResultSetHelper helper,ResultSet rs,FlatQueryTaskCondition condition){
try {
Integer limitSize = FileUtil.getLimitSize();
......
......@@ -187,6 +187,17 @@
AND q.del_flag = '0'
</select>
<select id="getDownloadTaskOfHasDownload" resultType="DownloadTask">
SELECT *
FROM
<include refid="queryTables"/>
<include refid="queryJoins"/>
WHERE
q.query_data_source = #{queryDataSource}
AND q.status = "downloading"
AND q.del_flag = '0'
</select>
<select id="getDownloadProcess" resultType="DownloadProcessDTO">
SELECT sum(amount) dataAmount,count(1) taskAmount
FROM
......
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
/** Local smoke test: copies one CSV from the EMR HDFS to a Windows path */
public class hdfs {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://10.0.1.13:4007");
        try (FileSystem fileSystem = FileSystem.get(configuration)) {
            // delSrc=false: keep the HDFS source after copying
            fileSystem.copyToLocalFile(false, new Path("/data/emr/order-1.csv"), new Path("D:\\testorder"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}