Commit ae9205c2 by guos

会员标签4.0

parent f9b3208d
......@@ -38,8 +38,14 @@ public class DataSourceSharding extends DataSourceEntity {
this.shardingConfig = shardingConfig;
}
private void checkShardingConfigInit(){
if(null==getShardingConfig()){
throw new RuntimeException("configName ["+shardingConfig+"] create ShardingConfig fail !");
}
}
@Override
public String getSourceKey() {
this.checkShardingConfigInit();
System.out.println("shardingConfig==>"+shardingConfig);
System.out.println("name==>"+sourceType.getName()+" ,id==>"+shardingConfig.get_id()+" ,targetTable==>"+targetTable);
return sourceType.getName() + "_" + shardingConfig.get_id() + "_" + targetTable;
......@@ -47,11 +53,13 @@ public class DataSourceSharding extends DataSourceEntity {
@Override
public String getHiveTableName() {
this.checkShardingConfigInit();
return getSchema()+"." + shardingConfig.get_id() + "_" + targetTable;
}
@Override
public void extractDataToPartitionedHiveTable(List<Integer> enterpriseList) {
this.checkShardingConfigInit();
SparkSession sparkSession = SparkEnvManager.getInstance().getSparkSession();
ShardingRddManager shardingRddManager = new ShardingRddManager(shardingConfig,sparkSession);
Dataset<Row> sourceDataset = shardingRddManager.getDatasetByEnterpriseIds(targetTable, enterpriseList, null, null).repartition(new Column("enterprise_id"));
......
......@@ -37,6 +37,7 @@ public class TagCurrentCouponNumFilter implements TagFilter {
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
System.out.println("dataSourceSharding==>"+dataSourceSharding);
result.add(dataSourceSharding);
return result;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment