Commit 84150980 by guos

会员标签4.0

parent 5dd09115
......@@ -2,6 +2,7 @@ package com.gic.spark;
import com.gic.spark.entity.enumtype.TimeRangeType;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.Calendar;
import java.util.Date;
......@@ -14,6 +15,11 @@ import java.util.Date;
public class Test {
public static void main(String[] args){
System.out.println("09017237595".matches("^[0-9a-zA-Z_-||.]{1,}$"));
// System.out.println("09017237595".matches("^[0-9a-zA-Z_-||.]{1,}$"));
String str="2020-05-04";
// String s=str.replaceAll("-","");
String s=StringUtils.substringAfter(str, "-").replaceAll("-", "");
System.out.println(s);
}
}
package com.gic.spark.entity.bean;
import java.io.Serializable;
/**
* @description:
* @author: wangxk
* @date: 2020/5/7
*/
public class TagConsumeBean implements Serializable {
private int ent_id; //企业id
private int card_region_id;//卡域id
private int order_channel_code;//消费渠道
private int ecu_id;//用户id
private String store_id;//门店id
private long order_times; //消费次数(根据配置取的)
private long goods_num;//净销售件数
private double pay_amt;//实付额
private double total_amt;//吊牌价总额
private int sleep_days;//休眠天数
public int getEnt_id() {
return ent_id;
}
public void setEnt_id(int ent_id) {
this.ent_id = ent_id;
}
public int getCard_region_id() {
return card_region_id;
}
public void setCard_region_id(int card_region_id) {
this.card_region_id = card_region_id;
}
public int getOrder_channel_code() {
return order_channel_code;
}
public void setOrder_channel_code(int order_channel_code) {
this.order_channel_code = order_channel_code;
}
public int getEcu_id() {
return ecu_id;
}
public void setEcu_id(int ecu_id) {
this.ecu_id = ecu_id;
}
public String getStore_id() {
return store_id;
}
public void setStore_id(String store_id) {
this.store_id = store_id;
}
public long getOrder_times() {
return order_times;
}
public void setOrder_times(long order_times) {
this.order_times = order_times;
}
public double getPay_amt() {
return pay_amt;
}
public void setPay_amt(double pay_amt) {
this.pay_amt = pay_amt;
}
public long getGoods_num() {
return goods_num;
}
public void setGoods_num(long goods_num) {
this.goods_num = goods_num;
}
public double getTotal_amt() {
return total_amt;
}
public void setTotal_amt(double total_amt) {
this.total_amt = total_amt;
}
public int getSleep_days() {
return sleep_days;
}
public void setSleep_days(int sleep_days) {
this.sleep_days = sleep_days;
}
}
package com.gic.spark.entity.enumtype;
/**
* @description:
* @author: wangxk
* @date: 2020/5/6
*/
public enum StatisticsType {
/**
* 品牌
*/
COMMODITYBRAND("commodityBrand"),
/**
* 会员卡
*/
MCUINFO("mcuInfo"),
/**
* 渠道
*/
CHANNEL("channel");
private String value;
StatisticsType(String value) {
this.value = value;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
......@@ -10,7 +10,7 @@ import java.util.List;
* <p>
* TODO: class description
*/
public abstract class FilterRequest implements Serializable{
public abstract class AbstractFilterRequest implements Serializable{
private static final long serialVersionUID = 4411295146033929579L;
......
package com.gic.spark.entity.request;
import com.gic.spark.entity.enumtype.CouponType;
import com.gic.spark.entity.enumtype.NumberType;
import java.util.List;
/**
* @description:
* 卡卷类
* @author: wangxk
* @date: 2020/4/21
* @date: 2020/5/6
*/
public class CouponRequest extends FilterRequest{
public class AbstractFilterRequestInteger extends AbstractFilterRequest{
/**
* 最小数字
......@@ -21,7 +17,6 @@ public class CouponRequest extends FilterRequest{
* 最大数字
*/
private Integer endNum;
/**
* 相等数字
*/
......@@ -29,10 +24,6 @@ public class CouponRequest extends FilterRequest{
private NumberType numberType;
private List<Integer>couponList;
private CouponType couponType;
public Integer getBeginNum() {
return beginNum;
}
......@@ -64,20 +55,4 @@ public class CouponRequest extends FilterRequest{
public void setNumberType(NumberType numberType) {
this.numberType = numberType;
}
public List<Integer> getCouponList() {
return couponList;
}
public void setCouponList(List<Integer> couponList) {
this.couponList = couponList;
}
public CouponType getCouponType() {
return couponType;
}
public void setCouponType(CouponType couponType) {
this.couponType = couponType;
}
}
package com.gic.spark.entity.request;
import com.gic.spark.entity.enumtype.StatisticsType;
import java.util.List;
/**
* @description:
* 消费统计分析
* @author: wangxk
* @date: 2020/5/6
*/
public class TagConsumeRequest extends AbstractFilterRequestInteger {
public StatisticsType statisticsType; //统计类型
private List<String>statisticsValList;
public StatisticsType getStatisticsType() {
return statisticsType;
}
public void setStatisticsType(StatisticsType statisticsType) {
this.statisticsType = statisticsType;
}
public List<String> getStatisticsValList() {
return statisticsValList;
}
public void setStatisticsValList(List<String> statisticsValList) {
this.statisticsValList = statisticsValList;
}
}
package com.gic.spark.entity.request;
import com.gic.spark.entity.enumtype.CouponType;
import com.gic.spark.entity.enumtype.NumberType;
import java.util.List;
/**
* @description:
* 卡卷类
* @author: wangxk
* @date: 2020/4/21
*/
public class TagCouponRequest extends AbstractFilterRequestInteger {
private List<Integer>couponList;
private CouponType couponType;
public List<Integer> getCouponList() {
return couponList;
}
public void setCouponList(List<Integer> couponList) {
this.couponList = couponList;
}
public CouponType getCouponType() {
return couponType;
}
public void setCouponType(CouponType couponType) {
this.couponType = couponType;
}
}
......@@ -10,23 +10,7 @@ import java.sql.Timestamp;
* @author: wangxk
* @date: 2020/4/26
*/
public class IntegralRequest extends FilterRequest {
/**
* 最小数字
*/
private Integer beginNum;
/**
* 最大数字
*/
private Integer endNum;
/**
* 相等数字
*/
private Integer equalNum;
private NumberType numberType;
public class TagIntegralRequest extends AbstractFilterRequestInteger {
/**
* 开始时间
......@@ -43,15 +27,6 @@ public class IntegralRequest extends FilterRequest {
private TimeRangeType timeRangeType;
public NumberType getNumberType() {
return numberType;
}
public void setNumberType(NumberType numberType) {
this.numberType = numberType;
}
public Timestamp getBeginTime() {
return beginTime;
}
......@@ -84,27 +59,4 @@ public class IntegralRequest extends FilterRequest {
this.timeRangeType = timeRangeType;
}
public Integer getBeginNum() {
return beginNum;
}
public void setBeginNum(Integer beginNum) {
this.beginNum = beginNum;
}
public Integer getEndNum() {
return endNum;
}
public void setEndNum(Integer endNum) {
this.endNum = endNum;
}
public Integer getEqualNum() {
return equalNum;
}
public void setEqualNum(Integer equalNum) {
this.equalNum = equalNum;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.entity.bean.TagConsumeBean;
import com.gic.spark.entity.request.AbstractFilterRequestInteger;
import com.gic.spark.entity.request.TagConsumeRequest;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/5/7
*/
public abstract class AbstractTagConsumFilter implements BaseTagFilter{
DataSourceHive dataSourceHive = new DataSourceHive(ConstantUtil.ADS_GIC_TRD_ECU_SALES_LABEL_D);
protected static JavaRDD<TagConsumeBean> statisticsTypeHandle(JavaRDD<TagConsumeBean> consumeRDD, TagConsumeRequest consumeRequest){
consumeRDD=consumeRDD.mapPartitions(data->{
List<TagConsumeBean> result=new ArrayList();
while (data.hasNext()){
TagConsumeBean consumeBean=data.next();
switch (consumeRequest.getStatisticsType()){
case COMMODITYBRAND:
if(consumeRequest.getStatisticsValList().contains(consumeBean.getEnt_id())){
result.add(consumeBean);
}
break;
case CHANNEL:
if(consumeRequest.getStatisticsValList().contains(consumeBean.getEnt_id())){
result.add(consumeBean);
}
break;
case MCUINFO:
if(consumeRequest.getStatisticsValList().contains(consumeBean.getEnt_id())){
result.add(consumeBean);
}
break;
default:break;
}
}
return result.iterator();
});
return consumeRDD;
}
}
package com.gic.spark.filter;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.tag.TagConstant;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
/**
* @description:
* @author: wangxk
* @date: 2020/4/27
*/
public abstract class AbstractTagFilter implements BaseTagFilter{
protected static boolean cuHandle(FilterRequest request, int cuType){
boolean result=false;
switch (request.getDomainType()){
case ACU_INFO:
result=TagConstant.CU_TYPE_ACU==cuType;
break;
case SCU_INFO:
result=TagConstant.CU_TYPE_SCU==cuType;
break;
case MCU_INFO:
result=TagConstant.CU_TYPE_MCU==cuType;
break;
default:break;
}
return result;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.request.AbstractFilterRequest;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
......@@ -15,5 +15,5 @@ public interface BaseTagFilter {
List<DataSourceEntity> necessarySourceList();
JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest request);
JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request);
}
......@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
......@@ -36,7 +36,7 @@ public class TagAboutExpireIntegralFilter implements BaseTagFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest filterOption) {
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest filterOption) {
return null;
......
......@@ -4,8 +4,8 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagIntegralBean;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.request.IntegralRequest;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagIntegralRequest;
import com.gic.spark.tag.TagConstant;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
......@@ -26,7 +26,7 @@ import java.util.Set;
* @author: wangxk
* @date: 2020/4/24
*/
public class TagAccumulatedIntegralFilter extends AbstractTagFilter{
public class TagAccumulatedIntegralFilter implements BaseTagFilter{
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_INTEGRAL_CU_CHANGE_LOG);
private static TagAccumulatedIntegralFilter instance;
......@@ -47,8 +47,8 @@ public class TagAccumulatedIntegralFilter extends AbstractTagFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest request) {
IntegralRequest integralRequest=(IntegralRequest)request;
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagIntegralRequest integralRequest=(TagIntegralRequest)request;
Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("create_Time").isNotNull())
.filter(new Column("service_flag").equalTo(1));
......
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* 连带率
* @author: wangxk
* @date: 2020/5/7
*/
public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
private static TagAssociatedPurchaseRateFilter instance;
public static TagAssociatedPurchaseRateFilter getInstance() {
if(null==instance){
instance=new TagAssociatedPurchaseRateFilter();
}
return instance;
}
private TagAssociatedPurchaseRateFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TagConsumeBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TagConsumeBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Integer> result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Iterable<TagConsumeBean>> tp2=data.next();
long totalGodsNum=0;
long totalOrderTimes=0;
for(TagConsumeBean consumeBean:tp2._2()){
totalGodsNum+=consumeBean.getGoods_num();
totalOrderTimes+=consumeBean.getOrder_times();
}
double jointRate=totalGodsNum/totalOrderTimes;
switch (consumeRequest.getNumberType()){
case gt:
if(jointRate>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(jointRate>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(jointRate<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(jointRate<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(jointRate==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(jointRate>=consumeRequest.getBeginNum()
&&jointRate<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* 平均折扣率
* @author: wangxk
* @date: 2020/5/7
*/
public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
private static TagAverageDiscountFactorFilter instance;
public static TagAverageDiscountFactorFilter getInstance() {
if(null==instance){
instance=new TagAverageDiscountFactorFilter();
}
return instance;
}
private TagAverageDiscountFactorFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TagConsumeBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TagConsumeBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Integer>result=new ArrayList();
while (data.hasNext()){
double payAmt=0;
double totalAmt=0;
Tuple2<Integer,Iterable<TagConsumeBean>> tp2=data.next();
for(TagConsumeBean consumeBean:tp2._2()){
payAmt=consumeBean.getPay_amt();
totalAmt=consumeBean.getTotal_amt();
}
double avgDiscountRate=payAmt/totalAmt;
switch (consumeRequest.getNumberType()){
case gt:
if(avgDiscountRate>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(avgDiscountRate>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(avgDiscountRate<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(avgDiscountRate<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(avgDiscountRate==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(avgDiscountRate>=consumeRequest.getBeginNum()
&&avgDiscountRate<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.entity.request.FilterRequest;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
/**
* @description:
* 已核销卡券
* @author: wangxk
* @date: 2020/4/20
*/
public class TagCancelCouponFilter implements BaseTagFilter{
private static TagCancelCouponFilter instance;
public static TagCancelCouponFilter getInstance() {
if(null==instance){
instance=new TagCancelCouponFilter();
}
return instance;
}
private TagCancelCouponFilter(){};
@Override
public List<DataSourceEntity> necessarySourceList() {
return null;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest filterOption) {
return null;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* 消费休眠天数
* @author: wangxk
* @date: 2020/5/8
*/
public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
private static TagConsumptionSleepDaysFilter instance;
public static TagConsumptionSleepDaysFilter getInstance() {
if(null==instance){
instance=new TagConsumptionSleepDaysFilter();
}
return instance;
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TagConsumeBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TagConsumeBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days()))
.reduceByKey((x,y)->x>y?y:x)
.mapPartitions(data->{
List<Integer>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Integer>tp2=data.next();
switch (consumeRequest.getNumberType()){
case gt:
if(tp2._2()>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(tp2._2()>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(tp2._2()<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(tp2._2()==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(tp2._2()>=consumeRequest.getBeginNum()
&&tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
*消费次数
* @author: wangxk
* @date: 2020/5/6
*/
public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
private static TagConsumptionTimeFilter instance;
public static TagConsumptionTimeFilter getInstance(){
if(null==instance){
instance=new TagConsumptionTimeFilter();
}
return instance;
}
private TagConsumptionTimeFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TagConsumeBean> consumeRDD=MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TagConsumeBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x,y)->x+y)
.mapPartitions(data->{
List<Integer>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Long>tp2=data.next();
switch (consumeRequest.getNumberType()){
case gt:
if(tp2._2()>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(tp2._2()>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(tp2._2()<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(tp2._2().intValue()==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(tp2._2()>=consumeRequest.getBeginNum()
&&tp2._2()<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
return ecuRdd;
}
}
......@@ -4,14 +4,12 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagCouponBean;
import com.gic.spark.entity.request.CouponRequest;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.request.TagCouponRequest;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.util.collection.unsafe.sort.RadixSort;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
......@@ -42,8 +40,8 @@ public class TagCouponFilter implements BaseTagFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest request) {
CouponRequest couponRequest=(CouponRequest)request;
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagCouponRequest couponRequest=(TagCouponRequest)request;
JavaRDD<TagCouponBean> couponBeanRDD= MysqlRddManager.getPojoFromDataset(dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("status").isNotNull()),TagCouponBean.class).javaRDD()
.filter(data->couponRequest.getCouponList().contains(data.getCoupon_id()));
......
......@@ -4,14 +4,11 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagCouponBean;
import com.gic.spark.entity.request.CouponRequest;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.request.TagCouponRequest;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import java.util.ArrayList;
......@@ -45,8 +42,8 @@ public class TagCurrentCouponNumFilter implements BaseTagFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest request) {
CouponRequest couponRequest=(CouponRequest)request;
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagCouponRequest couponRequest=(TagCouponRequest)request;
JavaRDD<TagCouponBean> couponBeanRDD=MysqlRddManager.getPojoFromDataset(dataSourceSharding.getDatasetByEnterpriseId(enterpriseId),TagCouponBean.class).javaRDD()
.filter(data->(data.getStatus()==3||data.getStatus()==4)&&data.getEffect_End_Time().getTime()>System.currentTimeMillis());
......@@ -104,7 +101,7 @@ public class TagCurrentCouponNumFilter implements BaseTagFilter{
return ecuRDD;
}
public static boolean numberComputeHandle(CouponRequest couponRequest,int num){
public static boolean numberComputeHandle(TagCouponRequest couponRequest, int num){
boolean result=false;
switch (couponRequest.getNumberType()){
case eq:
......
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceHive;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeRequest;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* 客单价
* @author: wangxk
* @date: 2020/5/7
*/
public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
private static TagPerCustomerTransactionFilter instance;
public static TagPerCustomerTransactionFilter getInstance(){
if(null==instance){
instance=new TagPerCustomerTransactionFilter();
}
return instance;
}
private TagPerCustomerTransactionFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TagConsumeBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TagConsumeBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Integer> result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Iterable<TagConsumeBean>> tp2=data.next();
int consumeTimes=0;
double payAmt=0;
for(TagConsumeBean consumeBean:tp2._2()){
consumeTimes+=consumeBean.getOrder_times();
payAmt+=consumeBean.getPay_amt();
}
double CusSinglePiece=payAmt/consumeTimes;
switch (consumeRequest.getNumberType()){
case gt:
if(CusSinglePiece>consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case gte:
if(CusSinglePiece>=consumeRequest.getBeginNum()){
result.add(tp2._1());
}
break;
case lt:
if(CusSinglePiece<consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case lte:
if(CusSinglePiece<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
break;
case eq:
if(CusSinglePiece==consumeRequest.getEqualNum()){
result.add(tp2._1());
}
break;
case between:
if(CusSinglePiece>=consumeRequest.getBeginNum()
&&CusSinglePiece<=consumeRequest.getEndNum()){
result.add(tp2._1());
}
default:break;
}
}
return result.iterator();
});
return ecuRdd;
}
}
......@@ -57,16 +57,7 @@ public class TagConstant {
*/
/*当前卡券数
待领取卡券
已领取未核销卡券
(未核销或未领取)已到期卡券
已核销卡券
已占用卡券
转赠中卡券
已转赠卡券
非会员卡券*/
// 卡券
public static final String TAG_CODE_CURRENT_COUPON_NUM="currentCouponNum"; //当前卡券数
public static final String TAG_CODE_UNCLAIMED_COUPON="unclaimedCoupon"; //待领取卡券
public static final String TAG_CODE_GET_NO_CANCEL_COUPON="getNoCancelCoupon"; //已领取未核销卡券
......@@ -76,18 +67,27 @@ public class TagConstant {
public static final String TAG_CODE_DONATION_IN_COUPON="donationInCoupon";//转赠中卡券
public static final String TAG_CODE_YET_DONATION_COUPON="yetDonationCoupon";//已转增卡券
// 积分信息
/*累计获取积分
即将过期积分*/
// 积分信息
public static final String TAG_CODE_ACCUMULATED_INTEGRAL="accumulatedIntegral_f";//累计获取积分
public static final String TAG_CODE_ABOUT_EXPIRE_INTEGRAL="aboutExpireIntegral";//即将过期积分
// 消费统计
public static final String TAG_CODE_CONSUMPTION_TIME="consumptionTime";//消费次数
public static final String TAG_CODE_PER_CUSTOMER_TRANSACTION="perCustomerTransaction";//客单价
public static final String TAG_CODE_ASSOCIATED_PURCHASE_RATE="associatedPurchaseRate";//连带率
public static final String TAG_CODE_AVERAGE_DISCOUNT_FACTOR="averageDiscountFactor";//平均折扣率
public static final String TAG_CODE_CONSUMPTION_SLEEP_DAYS="consumptionSleepDays";//消费休眠天数
public static final String TAG_DOMAIN_ECU_INFO="flag";
public static final String TAG_DOMAIN_ACU_INFO="acuInfo";
public static final String TAG_DOMAIN_SCU_INFO="scuInfo";
public static final String TAG_DOMAIN_MCU_INFO="mcuInfo";
public static final String TAG_STATISTICS_CHANNEL="channel";
public static final String TAG_STATISTICS_BRAND="brand";
public static final String TAG_STATISTICS_MCU_INFO="mcuInfo";
public static final String TAG_KEY_TYPE_TIME_RANGE="timeRange";
public static final String TAG_KEY_TYPE_NUMBER="number";
......
......@@ -12,7 +12,7 @@ import com.gic.spark.entity.SceneCrowdDTO;
import com.gic.spark.entity.TagConditionDTO;
import com.gic.spark.entity.TagConditionGroupDTO;
import com.gic.spark.entity.TagGroupInfo;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.table.TabSceneCrowd;
import com.gic.spark.filter.*;
import com.gic.spark.util.DingtalkMessageUtil;
......@@ -38,7 +38,6 @@ import scala.Tuple2;
import scala.collection.JavaConversions;
import java.util.*;
import org.apache.spark.api.java.Optional;
import scala.collection.JavaConversions;
import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
......@@ -149,7 +148,7 @@ public class TagProcess {
for(TagConditionDTO conditionDTO:entity.tagList){
if(tagIdToFilterMap.containsKey(conditionDTO.getTagId())){
BaseTagFilter tagFilter=tagIdToFilterMap.get(conditionDTO.getTagId());
FilterRequest filterRequest=TagValueParser.parseFilterValue(conditionDTO);
AbstractFilterRequest filterRequest=TagValueParser.parseFilterValue(conditionDTO);
final String groupId = entity.tagGroupId + "_" + conditionDTO.getTagId() + "_" + entity.level;
......@@ -278,7 +277,7 @@ public class TagProcess {
tagFilter= TagCouponFilter.getInstance();
break;
case TagConstant.TAG_CODE_CANCEL_COUPON:
tagFilter=TagCancelCouponFilter.getInstance();
tagFilter=TagCouponFilter.getInstance();
break;
case TagConstant.TAG_CODE_DONATION_IN_COUPON:
tagFilter=TagCouponFilter.getInstance();
......@@ -286,6 +285,24 @@ public class TagProcess {
case TagConstant.TAG_CODE_YET_DONATION_COUPON:
tagFilter=TagCouponFilter.getInstance();
break;
//消费统计
case TagConstant.TAG_CODE_CONSUMPTION_TIME:
tagFilter=TagConsumptionTimeFilter.getInstance();
break;
case TagConstant.TAG_CODE_PER_CUSTOMER_TRANSACTION:
tagFilter=TagPerCustomerTransactionFilter.getInstance();
break;
case TagConstant.TAG_CODE_ASSOCIATED_PURCHASE_RATE:
tagFilter=TagAssociatedPurchaseRateFilter.getInstance();
break;
case TagConstant.TAG_CODE_AVERAGE_DISCOUNT_FACTOR:
tagFilter=TagAverageDiscountFactorFilter.getInstance();
break;
case TagConstant.TAG_CODE_CONSUMPTION_SLEEP_DAYS:
tagFilter=TagConsumptionSleepDaysFilter.getInstance();
break;
default:
DingtalkMessageUtil.sendAlertMessage("暂未支持的非实时标签: " + tagEsFieldName);
break;
......
......@@ -9,4 +9,5 @@ public class ConstantUtil {
public static final String TAB_INTEGRAL_CU_CHANGE_LOG="tab_integral_cu_change_log";
public static final String TAB_COUPON_LOG="tab_coupon_log";
public static final String ADS_GIC_TRD_ECU_SALES_LABEL_D="demoads.ads_gic_trd_ecu_sales_label_d";
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment