Commit 2ea2ccb7 by guos

会员标签4.0

parent fd640b16
......@@ -38,6 +38,11 @@ import org.elasticsearch.spark.cfg.SparkSettingsManager;
import org.elasticsearch.spark.rdd.EsRDDWriter;
import scala.Tuple2;
import scala.collection.JavaConversions;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.*;
import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
......@@ -53,6 +58,8 @@ public class TagProcessManager {
private MysqlRddManager member4RddManager;
private MysqlRddManager enterprise4RddManager;
private DataSourceSharding memberSharding4Datasource;
private MysqlDatasource member4Datasource = null;
private MysqlDatasource enterprise4Datasource = null;
private boolean isProduction;
private static TagProcessManager instance;
public static TagProcessManager getInstance(){
......@@ -69,8 +76,6 @@ public class TagProcessManager {
public void init(boolean isProd){
this.isProduction=isProd;
MysqlDatasource member4Datasource = null;
MysqlDatasource enterprise4Datasource = null;
if(isProduction){
enterprise4Datasource=SparkConfigManager.getInstance().getDatasource("4.0-enterprise");
......@@ -193,6 +198,7 @@ public class TagProcessManager {
}
//处理标签数据
List<Long>sceneCrowdIdList=new ArrayList();
for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
Integer enterpriseId=enterpriseTagEntry.getKey();
List<JavaPairRDD<Long, String>> filterRddList = new ArrayList();
......@@ -206,6 +212,7 @@ public class TagProcessManager {
JavaPairRDD<Long, String> rdd = tagFilter.filterValidMember(enterpriseId,filterRequest).mapToPair(data-> Tuple2.apply(data,groupId));
filterRddList.add(rdd);
sceneCrowdIdList.add(entity.tagGroupId);
}
}
}
......@@ -247,6 +254,8 @@ public class TagProcessManager {
saveToHive(cacheMemberGroupRdd,enterpriseId);
cacheMemberGroupRdd.unpersist();
updateSceneCrowd(sceneCrowdIdList);
}
}
......@@ -303,4 +312,36 @@ public class TagProcessManager {
});
}
private void updateSceneCrowd(List<Long> sceneCrowdIdList){
Connection connection=member4Datasource.getConnection();
PreparedStatement ps=null;
try {
ps= DaoHelper.getPreparedStatement(connection,"update tab_scene_crowd set update_group_time=? where id=?");
for(Long id:sceneCrowdIdList){
System.out.println("sceneCrowdId==>"+id);
ps.setTimestamp(0,new Timestamp(System.currentTimeMillis()));
ps.setLong(1,id);
ps.addBatch();
}
ps.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
}finally {
if(null!=ps){
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(null!=connection){
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment