Commit e13a9493 by guos

会员标签4.0

parent 58ac380f
package com.gic.spark.datasource.entity;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.util.DataUtil;
import com.gic.spark.util.DateUtil;
import com.gic.spark.util.HivePartitionUtil;
import com.gic.spark.util.SparkHiveUtil;
import com.google.common.base.Joiner;
......@@ -52,7 +54,7 @@ public class DataSourceHive extends DataSourceEntity {
// .repartition(new Column("ent_id"));
Dataset<Row> sourceDataset = sparkSession
.sql(String.format("select * from %s where ent_id in (%s) distribute by ent_id ", sourceTable, Joiner.on("','").join(enterpriseList)));
.sql(String.format("select * from %s where ds='%s' and ent_id in ('%s') distribute by ent_id ", sourceTable, DateUtil.dateToString(DateUtil.getStartTimeOfDayAgo(1),DateUtil.FORMAT_DATE_8), Joiner.on("','").join(enterpriseList)));
SparkHiveUtil.createHivePartitionTable(sourceDataset, "ent_id", getSchema(), schema.replaceAll("\\.", "_") + "_" + table,sparkSession);
......
......@@ -7,18 +7,18 @@ import java.io.Serializable;
* @author: wangxk
* @date: 2020/5/7
*/
public class TrdEcuSalesLabelBean implements Serializable {
public class TrdEcuBrandLabelBean implements Serializable {
private Integer ent_id; //企业id
private Long mbr_area_id;//卡域id
private Integer order_channel_code;//消费渠道
private Long ecu_id;//用户id
private Integer store_info_id;//门店id
private Long order_times; //消费次数(根据配置取的)
private Long seff_goods_num;//净销售件数
private Integer order_times; //消费次数(根据配置取的)
private Integer seff_goods_num;//净销售件数
private Double pay_amt;//实付额
private Double total_amt;//吊牌价总额
private Integer sleep_days;//休眠天数
private Long ent_brand_id;//商品品牌id
public Integer getEnt_id() {
return ent_id;
......@@ -36,14 +36,6 @@ public class TrdEcuSalesLabelBean implements Serializable {
this.mbr_area_id = mbr_area_id;
}
public Integer getOrder_channel_code() {
return order_channel_code;
}
public void setOrder_channel_code(Integer order_channel_code) {
this.order_channel_code = order_channel_code;
}
public Long getEcu_id() {
return ecu_id;
}
......@@ -60,19 +52,19 @@ public class TrdEcuSalesLabelBean implements Serializable {
this.store_info_id = store_info_id;
}
public Long getOrder_times() {
public Integer getOrder_times() {
return order_times;
}
public void setOrder_times(Long order_times) {
public void setOrder_times(Integer order_times) {
this.order_times = order_times;
}
public Long getSeff_goods_num() {
public Integer getSeff_goods_num() {
return seff_goods_num;
}
public void setSeff_goods_num(Long seff_goods_num) {
public void setSeff_goods_num(Integer seff_goods_num) {
this.seff_goods_num = seff_goods_num;
}
......@@ -99,4 +91,12 @@ public class TrdEcuSalesLabelBean implements Serializable {
public void setSleep_days(Integer sleep_days) {
this.sleep_days = sleep_days;
}
public Long getEnt_brand_id() {
return ent_brand_id;
}
public void setEnt_brand_id(Long ent_brand_id) {
this.ent_brand_id = ent_brand_id;
}
}
package com.gic.spark.filter;
import com.alibaba.fastjson.JSONObject;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
......@@ -16,26 +15,26 @@ import java.util.List;
* @date: 2020/5/7
*/
public abstract class AbstractTagConsumFilter implements BaseTagFilter {
DataSourceHive dataSourceHive = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_SALES_LABEL_D);
DataSourceHive dataSourceHive = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_BRAND_LABEL_D);
protected static JavaRDD<TrdEcuSalesLabelBean> statisticsTypeHandle(JavaRDD<TrdEcuSalesLabelBean> consumeRDD, TagConsumeRequest consumeRequest){
protected static JavaRDD<TrdEcuBrandLabelBean> statisticsTypeHandle(JavaRDD<TrdEcuBrandLabelBean> consumeRDD, TagConsumeRequest consumeRequest){
consumeRDD=consumeRDD.mapPartitions(data->{
List<TrdEcuSalesLabelBean> result=new ArrayList();
List<TrdEcuBrandLabelBean> result=new ArrayList();
while (data.hasNext()){
TrdEcuSalesLabelBean consumeBean=data.next();
TrdEcuBrandLabelBean consumeBean=data.next();
switch (consumeRequest.getStatisticsType()){
case COMMODITYBRAND:
if(consumeRequest.getStatisticsValList().contains(consumeBean.getEnt_id())){
if(consumeRequest.getStatisticsValList().contains(String.valueOf(consumeBean.getEnt_brand_id()))){
result.add(consumeBean);
}
break;
case CHANNEL:
if(consumeRequest.getStatisticsValList().contains(consumeBean.getEnt_id())){
if(consumeRequest.getStatisticsValList().contains(String.valueOf(consumeBean.getStore_info_id()))){
result.add(consumeBean);
}
break;
case MCUINFO:
if(consumeRequest.getStatisticsValList().contains(consumeBean.getEnt_id())){
if(consumeRequest.getStatisticsValList().contains(String.valueOf(consumeBean.getMbr_area_id()))){
result.add(consumeBean);
}
break;
......
......@@ -63,17 +63,17 @@ public abstract class AbstractTagConsumRecordFilter implements BaseTagFilter {
case LATELY:
switch (request.getYearMonthDayType()){
case DAY:
if(consumeTime> DateUtil.addNumForDay(new Date(),request.getTimeNum()).getTime()){
if(consumeTime> DateUtil.addNumForDay(new Date(),-request.getTimeNum()).getTime()){
result=true;
}
break;
case MONTH:
if(consumeTime>DateUtil.addNumForMonth(new Date(),request.getTimeNum()).getTime()){
if(consumeTime>DateUtil.addNumForMonth(new Date(),-request.getTimeNum()).getTime()){
result=true;
}
break;
case YEAR:
if(consumeTime>DateUtil.addNumForYear(new Date(),request.getTimeNum()).getTime()){
if(consumeTime>DateUtil.addNumForYear(new Date(),-request.getTimeNum()).getTime()){
result=true;
}
break;
......
......@@ -89,7 +89,7 @@ public class TagAboutExpireIntegralFilter implements BaseTagFilter {
switch (integralRequest.getTimeRangeType()){
case LATELY:
if(integralBean.getLimit_time().getTime()>
DateUtil.addNumForDay(DateUtil.getDate(),integralRequest.getTimeNum()).getTime()){
DateUtil.addNumForDay(DateUtil.getDate(),-integralRequest.getTimeNum()).getTime()){
result.add(integralBean);
}
break;
......
......@@ -87,7 +87,7 @@ public class TagAccumulatedIntegralFilter implements BaseTagFilter {
switch (integralRequest.getTimeRangeType()){
case LATELY:
if(integralBean.getCreate_Time().getTime()>
DateUtil.addNumForDay(DateUtil.getDate(),integralRequest.getTimeNum()).getTime()){
DateUtil.addNumForDay(DateUtil.getDate(),-integralRequest.getTimeNum()).getTime()){
result.add(integralBean);
}
break;
......
......@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
......@@ -37,16 +37,16 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Long> result=new ArrayList();
while (data.hasNext()){
Tuple2<Long,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
Tuple2<Long,Iterable<TrdEcuBrandLabelBean>> tp2=data.next();
long totalGodsNum=0;
long totalOrderTimes=0;
for(TrdEcuSalesLabelBean consumeBean:tp2._2()){
for(TrdEcuBrandLabelBean consumeBean:tp2._2()){
totalGodsNum+=consumeBean.getSeff_goods_num();
totalOrderTimes+=consumeBean.getOrder_times();
}
......
......@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
......@@ -39,7 +39,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
......@@ -47,8 +47,8 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
while (data.hasNext()){
double payAmt=0;
double totalAmt=0;
Tuple2<Long,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
for(TrdEcuSalesLabelBean consumeBean:tp2._2()){
Tuple2<Long,Iterable<TrdEcuBrandLabelBean>> tp2=data.next();
for(TrdEcuBrandLabelBean consumeBean:tp2._2()){
payAmt=consumeBean.getPay_amt();
totalAmt=consumeBean.getTotal_amt();
}
......
......@@ -56,7 +56,7 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
result=true;
break;
case LATELY:
if(receiptsTime.getTime()>DateUtil.addNumForDay(new Date(),consumeAmountRequest.getTimeNum()).getTime()){
if(receiptsTime.getTime()>DateUtil.addNumForDay(new Date(),-consumeAmountRequest.getTimeNum()).getTime()){
result=true;
}
break;
......
......@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
......@@ -38,7 +38,7 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days()))
.reduceByKey((x,y)->x>y?y:x)
......
......@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
......@@ -36,13 +36,13 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> consumeRDD=MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x,y)->x+y)
.mapPartitions(data->{
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Long,Long>tp2=data.next();
Tuple2<Long,Integer>tp2=data.next();
switch (consumeRequest.getNumberType()){
case gt:
if(tp2._2()>consumeRequest.getBeginNum()){
......
......@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TrdEcuSalesLabelBean;
import com.gic.spark.entity.bean.TrdEcuBrandLabelBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
......@@ -37,16 +37,16 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
@Override
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
JavaRDD<TrdEcuBrandLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEntId(enterpriseId), TrdEcuBrandLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Long> result=new ArrayList();
while (data.hasNext()){
Tuple2<Long,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
Tuple2<Long,Iterable<TrdEcuBrandLabelBean>> tp2=data.next();
int consumeTimes=0;
double payAmt=0;
for(TrdEcuSalesLabelBean consumeBean:tp2._2()){
for(TrdEcuBrandLabelBean consumeBean:tp2._2()){
consumeTimes+=consumeBean.getOrder_times();
payAmt+=consumeBean.getPay_amt();
}
......
......@@ -11,7 +11,8 @@ public class ConstantUtil {
public static final String TAB_MEMBER_USER="tab_member_user";
public static final String TAB_COUPON_LOG="tab_coupon_log";
public static final String TAB_INTEGRAL_CU_CHANGE_LOG="tab_integral_cu_change_log";
public static final String ADS_GIC_TRD_ECU_SALES_LABEL_D="demoads.ads_gic_trd_ecu_sales_label_d";
// public static final String ADS_GIC_TRD_ECU_SALES_LABEL_D="demoads.ads_gic_trd_ecu_sales_label_d";
public static final String ADS_GIC_TRD_ECU_BRAND_LABEL_D="demoads.ads_gic_trd_ecu_brand_label_d";
public static final String DWD_GIC_TRD_VIRTUAL_WDORDER_D="democdm.dwd_gic_trd_virtual_wdorder_d";
public static final String DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D="democdm.dwd_gic_trd_virtual_order_item_d";
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment