Commit 43744d0e by guos

Member Tag 4.0

parent c679cffa
......@@ -21,5 +21,6 @@ public class Test {
// String s=str.replaceAll("-","");
String s=StringUtils.substringAfter(str, "-").replaceAll("-", "");
System.out.println(s);
+ StringBuilder st=new StringBuilder();
}
}
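For reference, the kept line first drops everything up to and including the first hyphen and then strips the remaining hyphens, unlike the commented-out replaceAll-only variant, which would only strip hyphens. A minimal sketch with a hypothetical input (the test class does not show a value for str):

import org.apache.commons.lang3.StringUtils;

public class SubstringDemo {
    public static void main(String[] args) {
        String str = "a1b2-c3d4-e5f6";  // hypothetical input, not in the diff
        String s = StringUtils.substringAfter(str, "-").replaceAll("-", "");
        System.out.println(s);          // prints c3d4e5f6
    }
}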
......@@ -42,7 +42,7 @@ public class DataSourceManager {
public void addSourceEntity(DataSourceEntity sourceEntity, Integer enterpriseId) {
- if (!isProduction) {
+ /*if (!isProduction) {
switch (sourceEntity.getSourceType()) {
case MYSQL:
DataSourceMysql mysqlDataSource = (DataSourceMysql) sourceEntity;
......@@ -66,7 +66,7 @@ public class DataSourceManager {
break;
}
- }
+ }*/
String key = sourceEntity.getSourceKey();
......
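With the guard commented out, addSourceEntity no longer branches on isProduction and the per-type switch is dead code; registration proceeds directly from the source key. A minimal sketch of that reading (the registry field below is an assumption; the diff only shows the key lookup):

// Hypothetical registry field (requires java.util.Map / java.util.HashMap);
// the actual storage is not visible in the diff.
private final Map<String, DataSourceEntity> sourceEntityMap = new HashMap<>();

public void addSourceEntity(DataSourceEntity sourceEntity, Integer enterpriseId) {
    String key = sourceEntity.getSourceKey();
    // One entry per source key, regardless of environment.
    sourceEntityMap.putIfAbsent(key, sourceEntity);
}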
package com.gic.spark.datasource.entity;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.tag.TagProcess;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
......@@ -12,10 +13,7 @@ import java.util.List;
* TODO: class description
*/
public abstract class DataSourceEntity {
- // public static final int SOURCE_TYPE_MYSQL = 0;
- // public static final int SOURCE_TYPE_MYSQL_SHARDING = 1;
- // public static final int SOURCE_TYPE_MONGODB = 2;
- // public static final int SOURCE_TYPE_HIVE = 3;
protected DataSourceType sourceType;
public DataSourceType getSourceType() {
......@@ -26,6 +24,10 @@ public abstract class DataSourceEntity {
public abstract String getHiveTableName();
+ protected String getSchema(){
+ return TagProcess.getInstance().isProduction()?"tag4_prod":"tag4_test";
+ }
+ public Dataset<Row> getDatasetByEnterpriseId(Integer enterpriseId) {
+ return SparkEnvManager.getInstance().getSparkSession().sql(String.format("select * from %s where enterprise_id='%s'", getHiveTableName(), enterpriseId));
+ }
......
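Both methods are new: getSchema() resolves the Hive database from the production flag, and getDatasetByEnterpriseId() is a convenience query against whatever table the subclass names. A hedged usage sketch (the enterprise id is invented; note the id is interpolated as a quoted string, which leans on Hive's implicit cast for an integer column):

DataSourceEntity source = ...;              // any concrete subclass, e.g. DataSourceHive
String table = source.getHiveTableName();   // "tag4_prod...." in production, "tag4_test...." otherwise
Dataset<Row> rows = source.getDatasetByEnterpriseId(1234);
// executes: select * from <table> where enterprise_id='1234'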
package com.gic.spark.datasource.entity;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.tag.TagProcess;
import com.gic.spark.util.HivePartitionUtil;
import com.gic.spark.util.SparkHiveUtil;
import com.google.common.base.Joiner;
......@@ -40,7 +41,7 @@ public class DataSourceHive extends DataSourceEntity {
@Override
public String getHiveTableName() {
return "tag." + schema.replaceAll("\\.", "_") + "_" + table;
return getSchema()+"." + schema.replaceAll("\\.", "_") + "_" + table;
}
......@@ -51,12 +52,12 @@ public class DataSourceHive extends DataSourceEntity {
.sql(String.format("select * from %s where enterprise_id in (%s)", sourceTable, Joiner.on("','").join(enterpriseList)))
.repartition(new Column("enterprise_id"));
if (table.equals("new_t_fact_order_ddd_cloud")) {
if (table.equals("ads_gic_trd_ecu_sales_label_d")) {
sourceDataset = sparkSession
.sql(String.format("select * from %s where enterprise_id in (%s) distribute by enterprise_id, is_member_order", sourceTable, Joiner.on("','").join(enterpriseList)));
.sql(String.format("select * from %s where enterprise_id in (%s) distribute by enterprise_id ", sourceTable, Joiner.on("','").join(enterpriseList)));
}
SparkHiveUtil.createHivePartitionTable(sourceDataset, "enterprise_id", "tag", schema.replaceAll("\\.", "_") + "_" + table);
SparkHiveUtil.createHivePartitionTable(sourceDataset, "enterprise_id", getSchema(), schema.replaceAll("\\.", "_") + "_" + table,sparkSession);
HivePartitionUtil.saveDatasetToPartitionTable(sparkSession, sourceDataset, getHiveTableName());
}
......
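Two changes land here: the special-cased table becomes ads_gic_trd_ecu_sales_label_d with its distribution key reduced to enterprise_id alone, and createHivePartitionTable gains the environment schema plus an explicit SparkSession argument. For orientation, distribute by enterprise_id shuffles rows by that column, the same effect the default path gets from the DataFrame API:

// Two spellings of the same shuffle (illustrative table name):
Dataset<Row> viaSql = sparkSession.sql(
        "select * from src_table distribute by enterprise_id");
Dataset<Row> viaApi = sparkSession.sql("select * from src_table")
        .repartition(new Column("enterprise_id"));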
......@@ -42,18 +42,18 @@ public class DataSourceMongodb extends DataSourceEntity {
@Override
public String getHiveTableName() {
return "tag." + database + "_" + collection;
return getSchema()+"." + database + "_" + collection;
}
@Override
public void extractDataToPartitionedHiveTable(List<Integer> enterpriseList) {
SparkSession sparkSession = SparkEnvManager.getInstance().getSparkSession();
boolean isTableCreated = SparkHiveUtil.isTableExist("tag", database + "_" + collection, sparkSession);
boolean isTableCreated = SparkHiveUtil.isTableExist(getSchema(), database + "_" + collection, sparkSession);
String[] enterpriseIds = new String[enterpriseList.size()];
enterpriseList.toArray(enterpriseIds);
Dataset<Row> dataset = MongodbRddManager.getInstance().getDocumentsByColumn(database, collection, partitionColumn, enterpriseIds).toDF();
if (!isTableCreated) {
SparkHiveUtil.createHivePartitionTable(dataset, partitionColumn, "tag", database + "_" + collection);
SparkHiveUtil.createHivePartitionTable(dataset, partitionColumn, getSchema(), database + "_" + collection);
isTableCreated = true;
}
HivePartitionUtil.saveDatasetToPartitionTable(sparkSession, dataset, getHiveTableName());
......
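Unlike the other sources, the Mongo path guards creation with isTableExist, so repeated extracts only append partitions to an existing table. The flow, restated compactly using only calls visible in the diff:

String schema = getSchema();                  // tag4_prod or tag4_test
String table = database + "_" + collection;
if (!SparkHiveUtil.isTableExist(schema, table, sparkSession)) {
    SparkHiveUtil.createHivePartitionTable(dataset, partitionColumn, schema, table);
}
HivePartitionUtil.saveDatasetToPartitionTable(sparkSession, dataset, getHiveTableName());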
......@@ -43,7 +43,7 @@ public class DataSourceMysql extends DataSourceEntity {
@Override
public String getHiveTableName() {
return "tag." + datasource.getSchema().replaceAll("\\.", "_") + "_" + targetTable;
return getSchema()+"." + datasource.getSchema().replaceAll("\\.", "_") + "_" + targetTable;
}
@Override
......@@ -51,7 +51,7 @@ public class DataSourceMysql extends DataSourceEntity {
SparkSession sparkSession = SparkEnvManager.getInstance().getSparkSession();
Dataset<Row> sourceDataset = datasource.buildRddManager().getDatasetByEnterpriseIds(targetTable, enterpriseList, null, null).repartition(new Column("enterprise_id"));
SparkHiveUtil.createHivePartitionTable(sourceDataset, "enterprise_id", "tag", datasource.getSchema().replaceAll("\\.", "_") + "_" + targetTable);
SparkHiveUtil.createHivePartitionTable(sourceDataset, "enterprise_id", getSchema(), datasource.getSchema().replaceAll("\\.", "_") + "_" + targetTable);
HivePartitionUtil.saveDatasetToPartitionTable(sparkSession, sourceDataset, getHiveTableName());
......
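The MySQL extract mirrors the Hive path: repartition on enterprise_id, create the partitioned table under the environment schema, then write. A rough DataFrame-API equivalent of the final write (SaveMode and saveAsTable here are assumptions; HivePartitionUtil's internals are not shown in the diff):

// Assumes import org.apache.spark.sql.SaveMode;
sourceDataset.repartition(new Column("enterprise_id"))
        .write()
        .mode(SaveMode.Overwrite)
        .partitionBy("enterprise_id")
        .saveAsTable(getHiveTableName());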
......@@ -44,7 +44,7 @@ public class DataSourceSharding extends DataSourceEntity {
@Override
public String getHiveTableName() {
return "tag." + shardingConfig.get_id() + "_" + targetTable;
return getSchema()+"." + shardingConfig.get_id() + "_" + targetTable;
}
@Override
......@@ -52,7 +52,7 @@ public class DataSourceSharding extends DataSourceEntity {
SparkSession sparkSession = SparkEnvManager.getInstance().getSparkSession();
ShardingRddManager shardingRddManager = new ShardingRddManager(shardingConfig,sparkSession);
Dataset<Row> sourceDataset = shardingRddManager.getDatasetByEnterpriseIds(targetTable, enterpriseList, null, null).repartition(new Column("enterprise_id"));
SparkHiveUtil.createHivePartitionTable(sourceDataset, "enterprise_id", "tag", shardingConfig.get_id().replaceAll("\\.", "_") + "_" + targetTable,sparkSession);
SparkHiveUtil.createHivePartitionTable(sourceDataset, "enterprise_id", getSchema(), shardingConfig.get_id().replaceAll("\\.", "_") + "_" + targetTable,sparkSession);
HivePartitionUtil.saveDatasetToPartitionTable(sparkSession, sourceDataset, getHiveTableName());
......
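Across the four subclasses, getHiveTableName() now shares one shape: the environment schema, a dot, then a source-specific prefix joined to the table name (the Hive, MySQL, and Sharding variants also flatten dots in the prefix to underscores). A hypothetical helper capturing the repeated pattern:

static String hiveTableName(String schema, String prefix, String table) {
    // e.g. hiveTableName("tag4_prod", "gic.ads", "t_order")
    //   -> "tag4_prod.gic_ads_t_order"
    return schema + "." + prefix.replaceAll("\\.", "_") + "_" + table;
}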