Commit 28dfa2fe by guos

会员标签4.0

parent 7de8101e
......@@ -3,7 +3,7 @@ package com.gic.spark;
import com.gic.sharding.sdk.ShardingConfigManager;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.config.SparkConfigManager;
import com.gic.spark.tag.TagProcess;
import com.gic.spark.tag.TagProcessManager;
/**
* @description:
......@@ -25,7 +25,7 @@ public class SparkTagProcess {
ShardingConfigManager.initDefualt();
SparkEnvManager.getInstance().init("member_tag_4.0");
SparkConfigManager.getInstance().init(isProd ? SparkConfigManager.EnvType.PRO : SparkConfigManager.EnvType.DEV, null);
TagProcess.getInstance().init(isProd);
TagProcess.getInstance().process(extractData);
TagProcessManager.getInstance().init(isProd);
TagProcessManager.getInstance().process(extractData);
}
}
......@@ -3,7 +3,7 @@ package com.gic.spark;
import com.gic.sharding.sdk.ShardingConfigManager;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.config.SparkConfigManager;
import com.gic.spark.tag.TagProcess;
import com.gic.spark.tag.TagProcessManager;
import java.util.ArrayList;
import java.util.List;
......@@ -36,14 +36,14 @@ public class SparkTagProcessByEnterpriseOrId {
ShardingConfigManager.initDefualt();
SparkEnvManager.getInstance().init("member_tag_4.0");
SparkConfigManager.getInstance().init(isProd ? SparkConfigManager.EnvType.PRO : SparkConfigManager.EnvType.DEV, null);
TagProcess.getInstance().init(isProd);
TagProcessManager.getInstance().init(isProd);
if(type.equals("enterprise")){
TagProcess.getInstance().setEnterpriseId(params);
TagProcessManager.getInstance().setEnterpriseId(params);
}else if(type.equals("tagGroup")){
TagProcess.getInstance().setTagGroupId(params);
TagProcessManager.getInstance().setTagGroupId(params);
}else{
return;
}
TagProcess.getInstance().process(extractData);
TagProcessManager.getInstance().process(extractData);
}
}
package com.gic.spark.datasource.entity;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.tag.TagProcess;
import com.gic.spark.tag.TagProcessManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
......@@ -25,7 +25,7 @@ public abstract class DataSourceEntity {
public abstract String getHiveTableName();
protected String getSchema(){
return TagProcess.getInstance().isProduction()?"tag4_prod":"tag4_test";
return TagProcessManager.getInstance().isProduction()?"tag4_prod":"tag4_test";
}
public Dataset<Row> getDatasetByEnterpriseId(Integer enterpriseId) {
......
package com.gic.spark.datasource.entity;
import com.gic.spark.application.SparkEnvManager;
import com.gic.spark.tag.TagProcess;
import com.gic.spark.util.HivePartitionUtil;
import com.gic.spark.util.SparkHiveUtil;
import com.google.common.base.Joiner;
......
......@@ -10,51 +10,51 @@ import java.sql.Timestamp;
*/
public class TagCouponBean implements Serializable{
private Integer coupon_id;
private Integer ecu_Id;
private Integer scu_Id;
private Integer acu_Id;
private Integer mcu_Id;
private Long coupon_id;
private Long ecu_Id;
private Long scu_Id;
private Long acu_Id;
private Long mcu_Id;
private java.sql.Timestamp effect_End_Time;
private Integer status;
public Integer getCoupon_id() {
public Long getCoupon_id() {
return coupon_id;
}
public void setCoupon_id(Integer coupon_id) {
public void setCoupon_id(Long coupon_id) {
this.coupon_id = coupon_id;
}
public Integer getEcu_Id() {
public Long getEcu_Id() {
return ecu_Id;
}
public void setEcu_Id(Integer ecu_Id) {
public void setEcu_Id(Long ecu_Id) {
this.ecu_Id = ecu_Id;
}
public Integer getScu_Id() {
public Long getScu_Id() {
return scu_Id;
}
public void setScu_Id(Integer scu_Id) {
public void setScu_Id(Long scu_Id) {
this.scu_Id = scu_Id;
}
public Integer getAcu_Id() {
public Long getAcu_Id() {
return acu_Id;
}
public void setAcu_Id(Integer acu_Id) {
public void setAcu_Id(Long acu_Id) {
this.acu_Id = acu_Id;
}
public Integer getMcu_Id() {
public Long getMcu_Id() {
return mcu_Id;
}
public void setMcu_Id(Integer mcu_Id) {
public void setMcu_Id(Long mcu_Id) {
this.mcu_Id = mcu_Id;
}
......
......@@ -10,42 +10,50 @@ import java.sql.Timestamp;
*/
public class TagIntegralBean implements Serializable {
private int enterprise_Id;
private int cu_Type;
private int cu_Id;
private int file_ecu_id;
private int integral_Value;
private Integer enterprise_Id;
private Integer cu_Type;
private Long cu_Id;
private Long file_ecu_id;
private Integer integral_Value;
private java.sql.Timestamp create_Time;
public int getEnterprise_Id() {
public Integer getEnterprise_Id() {
return enterprise_Id;
}
public void setEnterprise_Id(int enterprise_Id) {
public void setEnterprise_Id(Integer enterprise_Id) {
this.enterprise_Id = enterprise_Id;
}
public int getCu_Type() {
public Integer getCu_Type() {
return cu_Type;
}
public void setCu_Type(int cu_Type) {
public void setCu_Type(Integer cu_Type) {
this.cu_Type = cu_Type;
}
public int getCu_Id() {
public Long getCu_Id() {
return cu_Id;
}
public void setCu_Id(int cu_Id) {
public void setCu_Id(Long cu_Id) {
this.cu_Id = cu_Id;
}
public int getIntegral_Value() {
public Long getFile_ecu_id() {
return file_ecu_id;
}
public void setFile_ecu_id(Long file_ecu_id) {
this.file_ecu_id = file_ecu_id;
}
public Integer getIntegral_Value() {
return integral_Value;
}
public void setIntegral_Value(int integral_Value) {
public void setIntegral_Value(Integer integral_Value) {
this.integral_Value = integral_Value;
}
......@@ -56,12 +64,4 @@ public class TagIntegralBean implements Serializable {
public void setCreate_Time(Timestamp create_Time) {
this.create_Time = create_Time;
}
public int getFile_ecu_id() {
return file_ecu_id;
}
public void setFile_ecu_id(int file_ecu_id) {
this.file_ecu_id = file_ecu_id;
}
}
......@@ -9,94 +9,94 @@ import java.io.Serializable;
*/
public class TrdEcuSalesLabelBean implements Serializable {
private int ent_id; //企业id
private int card_region_id;//卡域id
private int order_channel_code;//消费渠道
private int ecu_id;//用户id
private String store_info_id;//门店id
private long order_times; //消费次数(根据配置取的)
private long goods_num;//净销售件数
private double pay_amt;//实付额
private double total_amt;//吊牌价总额
private int sleep_days;//休眠天数
public int getEnt_id() {
private Integer ent_id; //企业id
private Long mbr_area_id;//卡域id
private Integer order_channel_code;//消费渠道
private Long ecu_id;//用户id
private Integer store_info_id;//门店id
private Long order_times; //消费次数(根据配置取的)
private Long seff_goods_num;//净销售件数
private Double pay_amt;//实付额
private Double total_amt;//吊牌价总额
private Integer sleep_days;//休眠天数
public Integer getEnt_id() {
return ent_id;
}
public void setEnt_id(int ent_id) {
public void setEnt_id(Integer ent_id) {
this.ent_id = ent_id;
}
public int getCard_region_id() {
return card_region_id;
public Long getMbr_area_id() {
return mbr_area_id;
}
public void setCard_region_id(int card_region_id) {
this.card_region_id = card_region_id;
public void setMbr_area_id(Long mbr_area_id) {
this.mbr_area_id = mbr_area_id;
}
public int getOrder_channel_code() {
public Integer getOrder_channel_code() {
return order_channel_code;
}
public void setOrder_channel_code(int order_channel_code) {
public void setOrder_channel_code(Integer order_channel_code) {
this.order_channel_code = order_channel_code;
}
public int getEcu_id() {
public Long getEcu_id() {
return ecu_id;
}
public void setEcu_id(int ecu_id) {
public void setEcu_id(Long ecu_id) {
this.ecu_id = ecu_id;
}
public String getStore_info_id() {
public Integer getStore_info_id() {
return store_info_id;
}
public void setStore_info_id(String store_info_id) {
public void setStore_info_id(Integer store_info_id) {
this.store_info_id = store_info_id;
}
public long getOrder_times() {
public Long getOrder_times() {
return order_times;
}
public void setOrder_times(long order_times) {
public void setOrder_times(Long order_times) {
this.order_times = order_times;
}
public double getPay_amt() {
return pay_amt;
public Long getSeff_goods_num() {
return seff_goods_num;
}
public void setPay_amt(double pay_amt) {
this.pay_amt = pay_amt;
public void setSeff_goods_num(Long seff_goods_num) {
this.seff_goods_num = seff_goods_num;
}
public long getGoods_num() {
return goods_num;
public Double getPay_amt() {
return pay_amt;
}
public void setGoods_num(long goods_num) {
this.goods_num = goods_num;
public void setPay_amt(Double pay_amt) {
this.pay_amt = pay_amt;
}
public double getTotal_amt() {
public Double getTotal_amt() {
return total_amt;
}
public void setTotal_amt(double total_amt) {
public void setTotal_amt(Double total_amt) {
this.total_amt = total_amt;
}
public int getSleep_days() {
public Integer getSleep_days() {
return sleep_days;
}
public void setSleep_days(int sleep_days) {
public void setSleep_days(Integer sleep_days) {
this.sleep_days = sleep_days;
}
}
......@@ -8,118 +8,142 @@ import java.io.Serializable;
* @date: 2020/8/3
*/
public class TrdVirtualOrderBean implements Serializable {
private int ent_id; //企业id
private int area_id;//会员卡域ID
private int order_channel_code;//订单来源渠道(线下1,达摩微商城2,微盟微商城3,线上天猫4)
private int cu_id;
private int ecu_id;//用户id
private String store_info_id;//门店id
private int is_eff_order;//是否有效订单(0:否,1是)
private int is_mbr_order;//是否会员订单(0:否,1是)
private String order_id;
private String order_no;
private String oorder_no;
private long goods_num;//订单商品数量
private double pay_amt;//应付金额
private double paid_amt;//实付金额
private double total_amt;//吊牌价总额
private Integer ent_id; //企业id
private Long area_id;//会员卡域ID
private Integer order_channel_code;//订单来源渠道(线下1,达摩微商城2,微盟微商城3,线上天猫4)
private Long cu_id;
private Long ecu_id;//用户id
private Integer store_info_id;//门店id
private Integer is_eff_order;//是否有效订单(0:否,1是)
private Integer is_mbr_order;//是否会员订单(0:否,1是)
private Long order_id;//真实订单id
private String order_no;//虚拟订单流水号
private String oorder_no;//原订单流水号
private Long goods_num;//订单商品数量
private Double pay_amt;//应付金额
private Double paid_amt;//实付金额
private Double total_amt;//吊牌价总额
private String receipts_time;//单据日期
private String shop_id;//店铺id
private String virtual_id;//虚拟订单id
private Long shop_id;//店铺id
private Long virtual_id;//虚拟订单id
public int getEnt_id() {
public Integer getEnt_id() {
return ent_id;
}
public void setEnt_id(int ent_id) {
public void setEnt_id(Integer ent_id) {
this.ent_id = ent_id;
}
public int getArea_id() {
public Long getArea_id() {
return area_id;
}
public void setArea_id(int area_id) {
public void setArea_id(Long area_id) {
this.area_id = area_id;
}
public int getOrder_channel_code() {
public Integer getOrder_channel_code() {
return order_channel_code;
}
public void setOrder_channel_code(int order_channel_code) {
public void setOrder_channel_code(Integer order_channel_code) {
this.order_channel_code = order_channel_code;
}
public int getCu_id() {
public Long getCu_id() {
return cu_id;
}
public void setCu_id(int cu_id) {
public void setCu_id(Long cu_id) {
this.cu_id = cu_id;
}
public int getEcu_id() {
public Long getEcu_id() {
return ecu_id;
}
public void setEcu_id(int ecu_id) {
public void setEcu_id(Long ecu_id) {
this.ecu_id = ecu_id;
}
public String getStore_info_id() {
public Integer getStore_info_id() {
return store_info_id;
}
public void setStore_info_id(String store_info_id) {
public void setStore_info_id(Integer store_info_id) {
this.store_info_id = store_info_id;
}
public int getIs_eff_order() {
public Integer getIs_eff_order() {
return is_eff_order;
}
public void setIs_eff_order(int is_eff_order) {
public void setIs_eff_order(Integer is_eff_order) {
this.is_eff_order = is_eff_order;
}
public int getIs_mbr_order() {
public Integer getIs_mbr_order() {
return is_mbr_order;
}
public void setIs_mbr_order(int is_mbr_order) {
public void setIs_mbr_order(Integer is_mbr_order) {
this.is_mbr_order = is_mbr_order;
}
public long getGoods_num() {
public Long getOrder_id() {
return order_id;
}
public void setOrder_id(Long order_id) {
this.order_id = order_id;
}
public String getOrder_no() {
return order_no;
}
public void setOrder_no(String order_no) {
this.order_no = order_no;
}
public String getOorder_no() {
return oorder_no;
}
public void setOorder_no(String oorder_no) {
this.oorder_no = oorder_no;
}
public Long getGoods_num() {
return goods_num;
}
public void setGoods_num(long goods_num) {
public void setGoods_num(Long goods_num) {
this.goods_num = goods_num;
}
public double getPay_amt() {
public Double getPay_amt() {
return pay_amt;
}
public void setPay_amt(double pay_amt) {
public void setPay_amt(Double pay_amt) {
this.pay_amt = pay_amt;
}
public double getPaid_amt() {
public Double getPaid_amt() {
return paid_amt;
}
public void setPaid_amt(double paid_amt) {
public void setPaid_amt(Double paid_amt) {
this.paid_amt = paid_amt;
}
public double getTotal_amt() {
public Double getTotal_amt() {
return total_amt;
}
public void setTotal_amt(double total_amt) {
public void setTotal_amt(Double total_amt) {
this.total_amt = total_amt;
}
......@@ -131,43 +155,19 @@ public class TrdVirtualOrderBean implements Serializable {
this.receipts_time = receipts_time;
}
public String getOrder_no() {
return order_no;
}
public void setOrder_no(String order_no) {
this.order_no = order_no;
}
public String getOorder_no() {
return oorder_no;
}
public void setOorder_no(String oorder_no) {
this.oorder_no = oorder_no;
}
public String getShop_id() {
public Long getShop_id() {
return shop_id;
}
public void setShop_id(String shop_id) {
public void setShop_id(Long shop_id) {
this.shop_id = shop_id;
}
public String getOrder_id() {
return order_id;
}
public void setOrder_id(String order_id) {
this.order_id = order_id;
}
public String getVirtual_id() {
public Long getVirtual_id() {
return virtual_id;
}
public void setVirtual_id(String virtual_id) {
public void setVirtual_id(Long virtual_id) {
this.virtual_id = virtual_id;
}
}
......@@ -7,34 +7,34 @@ package com.gic.spark.entity.bean;
*/
public class TrdVirtualOrderItemBean {
private String ent_id;
private String ecu_id;
private String order_id; //真实订单
private Long ent_id;
private Long ecu_id;
private Long order_id; //真实订单
private String receipts_time;//单据时间
private String sku_code;
private String virtual_order_id;//虚拟订单id
private Long virtual_order_id;//虚拟订单id
public String getEnt_id() {
public Long getEnt_id() {
return ent_id;
}
public void setEnt_id(String ent_id) {
public void setEnt_id(Long ent_id) {
this.ent_id = ent_id;
}
public String getEcu_id() {
public Long getEcu_id() {
return ecu_id;
}
public void setEcu_id(String ecu_id) {
public void setEcu_id(Long ecu_id) {
this.ecu_id = ecu_id;
}
public String getOrder_id() {
public Long getOrder_id() {
return order_id;
}
public void setOrder_id(String order_id) {
public void setOrder_id(Long order_id) {
this.order_id = order_id;
}
......@@ -54,11 +54,11 @@ public class TrdVirtualOrderItemBean {
this.sku_code = sku_code;
}
public String getVirtual_order_id() {
public Long getVirtual_order_id() {
return virtual_order_id;
}
public void setVirtual_order_id(String virtual_order_id) {
public void setVirtual_order_id(Long virtual_order_id) {
this.virtual_order_id = virtual_order_id;
}
}
......@@ -36,7 +36,7 @@ public class TagAboutExpireIntegralFilter implements TagFilter {
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest filterOption) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest filterOption) {
return null;
......
......@@ -47,14 +47,14 @@ public class TagAccumulatedIntegralFilter implements TagFilter {
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagIntegralRequest integralRequest=(TagIntegralRequest)request;
Dataset<Row> dataset=dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("create_Time").isNotNull())
.filter(new Column("service_flag").equalTo(1));
JavaRDD<TagIntegralBean>integralRDD=MysqlRddManager.getPojoFromDataset(dataset,TagIntegralBean.class).javaRDD();
JavaRDD<Integer>ecuRdd=integralRDD.mapPartitions(data->{
JavaRDD<Long>ecuRdd=integralRDD.mapPartitions(data->{
List<TagIntegralBean>result=new ArrayList();
while (data.hasNext()){
TagIntegralBean integralBean=data.next();
......@@ -105,7 +105,7 @@ public class TagAccumulatedIntegralFilter implements TagFilter {
return result.iterator();
}).mapToPair(data-> Tuple2.apply(data.getCu_Id()+"@"+data.getFile_ecu_id(),data)).groupByKey()
.mapPartitions(data->{
Set<Integer>result=new HashSet();
Set<Long>result=new HashSet();
while (data.hasNext()){
Tuple2<String,Iterable<TagIntegralBean>> tp2=data.next();
int integralValue=0;
......@@ -115,33 +115,33 @@ public class TagAccumulatedIntegralFilter implements TagFilter {
switch (integralRequest.getNumberType()){
case eq:
if(integralRequest.getEqualNum()==integralValue){
result.add(Integer.parseInt(tp2._1().split("@")[1]));
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case gt:
if(integralValue>integralRequest.getBeginNum()){
result.add(Integer.parseInt(tp2._1().split("@")[1]));
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case gte:
if(integralValue>=integralRequest.getBeginNum()){
result.add(Integer.parseInt(tp2._1().split("@")[1]));
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case lt:
if(integralValue<integralRequest.getEndNum()){
result.add(Integer.parseInt(tp2._1().split("@")[1]));
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case lte:
if(integralValue<=integralRequest.getEndNum()){
result.add(Integer.parseInt(tp2._1().split("@")[1]));
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
case between:
if(integralValue>=integralRequest.getBeginNum()
&&integralValue<=integralRequest.getEndNum()){
result.add(Integer.parseInt(tp2._1().split("@")[1]));
result.add(Long.parseLong(tp2._1().split("@")[1]));
}
break;
default:break;
......
......@@ -35,19 +35,19 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Integer> result=new ArrayList();
List<Long> result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
Tuple2<Long,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
long totalGodsNum=0;
long totalOrderTimes=0;
for(TrdEcuSalesLabelBean consumeBean:tp2._2()){
totalGodsNum+=consumeBean.getGoods_num();
totalGodsNum+=consumeBean.getSeff_goods_num();
totalOrderTimes+=consumeBean.getOrder_times();
}
double jointRate=totalGodsNum/totalOrderTimes;
......
......@@ -37,17 +37,17 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Integer>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){
double payAmt=0;
double totalAmt=0;
Tuple2<Integer,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
Tuple2<Long,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
for(TrdEcuSalesLabelBean consumeBean:tp2._2()){
payAmt=consumeBean.getPay_amt();
totalAmt=consumeBean.getTotal_amt();
......
......@@ -44,15 +44,16 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest);
JavaPairRDD<String,Integer>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.filter(data->checkTime(commodityRequest,DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.mapToPair(data-> Tuple2.apply(data.getVirtual_id(),data.getEcu_id()));
JavaPairRDD<String,String> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD()
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
......@@ -63,7 +64,7 @@ public class TagConsumeCommodityFilter extends AbstractTagConsumRecordFilter{
.reduceByKey((x,y)->x);
JavaRDD<Integer>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
JavaRDD<Long>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
.filter(data->data._2()._2().isPresent())
.map(data->data._2()._1()).distinct();
......
......@@ -40,12 +40,12 @@ public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeTimeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(), DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19)))
.filter(data->checkTime(consumeTimeRequest,data._2().getTime()))
.reduceByKey((x,y)->x)
......
......@@ -39,13 +39,13 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD=MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->{
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{
boolean result=false;
if(StringUtils.isNotEmpty(data.getReceipts_time())){
Date receiptsTime=DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATE_10);
......
......@@ -36,16 +36,16 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days()))
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getSleep_days()))
.reduceByKey((x,y)->x>y?y:x)
.mapPartitions(data->{
List<Integer>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Integer>tp2=data.next();
Tuple2<Long,Integer>tp2=data.next();
switch (consumeRequest.getNumberType()){
case gt:
if(tp2._2()>consumeRequest.getBeginNum()){
......
......@@ -34,15 +34,15 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD=MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x,y)->x+y)
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getOrder_times())).reduceByKey((x,y)->x+y)
.mapPartitions(data->{
List<Integer>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Long>tp2=data.next();
Tuple2<Long,Long>tp2=data.next();
switch (consumeRequest.getNumberType()){
case gt:
if(tp2._2()>consumeRequest.getBeginNum()){
......
......@@ -40,13 +40,13 @@ public class TagCouponFilter implements TagFilter {
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagCouponRequest couponRequest=(TagCouponRequest)request;
JavaRDD<TagCouponBean> couponBeanRDD= MysqlRddManager.getPojoFromDataset(dataSourceSharding.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("status").isNotNull()),TagCouponBean.class).javaRDD()
.filter(data->couponRequest.getCouponList().contains(data.getCoupon_id()));
JavaRDD<Integer>ecuRDD=couponBeanRDD.mapPartitions(data->{
Set<Integer>result=new HashSet();
JavaRDD<Long>ecuRDD=couponBeanRDD.mapPartitions(data->{
Set<Long>result=new HashSet();
while (data.hasNext()){
TagCouponBean couponBean=data.next();
switch (couponRequest.getCouponType()){
......
......@@ -24,8 +24,7 @@ import java.util.Set;
*/
public class TagCurrentCouponNumFilter implements TagFilter {
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_MEMBER_USER);
// private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MARKETING_SHARDING, ConstantUtil.TAB_COUPON_LOG);
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MARKETING_SHARDING, ConstantUtil.TAB_COUPON_LOG);
private static TagCurrentCouponNumFilter instance;
public static TagCurrentCouponNumFilter getInstance() {
if(null==instance){
......@@ -44,13 +43,13 @@ public class TagCurrentCouponNumFilter implements TagFilter {
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagCouponRequest couponRequest=(TagCouponRequest)request;
JavaRDD<TagCouponBean> couponBeanRDD=MysqlRddManager.getPojoFromDataset(dataSourceSharding.getDatasetByEnterpriseId(enterpriseId),TagCouponBean.class).javaRDD()
.filter(data->(data.getStatus()==3||data.getStatus()==4)&&data.getEffect_End_Time().getTime()>System.currentTimeMillis());
JavaRDD<Integer>ecuRDD=couponBeanRDD.mapPartitionsToPair(data->{
JavaRDD<Long>ecuRDD=couponBeanRDD.mapPartitionsToPair(data->{
List<Tuple2<String,Integer>>result=new ArrayList();
while (data.hasNext()){
TagCouponBean couponBean=data.next();
......@@ -79,18 +78,18 @@ public class TagCurrentCouponNumFilter implements TagFilter {
return result.iterator();
}).reduceByKey((x,y)->x+y)
.mapPartitions(data->{
Set<Integer> result=new HashSet();
Set<Long> result=new HashSet();
while (data.hasNext()){
Tuple2<String,Integer>tp2=data.next();
switch (couponRequest.getDomainType()){
case ECU_INFO:
if(numberComputeHandle(couponRequest,tp2._2())){
result.add(Integer.valueOf(tp2._1()));
result.add(Long.parseLong(tp2._1()));
}
break;
default:
if(numberComputeHandle(couponRequest,tp2._2())){
result.add(Integer.valueOf(tp2._1().split("&")[1]));
result.add(Long.parseLong(tp2._1().split("&")[1]));
}
break;
}
......
......@@ -15,5 +15,5 @@ public interface TagFilter {
List<DataSourceEntity> necessarySourceList();
JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request);
JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request);
}
......@@ -44,12 +44,12 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest);
JavaPairRDD<String,Integer>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()
......@@ -60,7 +60,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
}
})
.mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1()));
JavaPairRDD<String,String> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD()
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
......@@ -71,7 +71,7 @@ public class TagFirstConsumeCommodityFilter extends AbstractTagConsumRecordFilte
.reduceByKey((x,y)->x);
JavaRDD<Integer>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
JavaRDD<Long>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
.filter(data->data._2()._2().isPresent())
.map(data->data._2()._1()).distinct();
......
......@@ -39,13 +39,13 @@ public class TagFirstConsumeTimeFilter extends AbstractTagConsumRecordFilter {
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeTimeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->StringUtils.isNotEmpty(data.getReceipts_time()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.reduceByKey((x,y)->x<y?x:y)
.filter(data->checkTime(consumeTimeRequest,data._2()))
......
......@@ -35,13 +35,13 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,channelRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()
......
......@@ -37,19 +37,19 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.groupByKey()
.mapPartitions(data->{
List<Integer>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer, Iterable<TrdVirtualOrderBean>> tp2=data.next();
Tuple2<Long, Iterable<TrdVirtualOrderBean>> tp2=data.next();
double firstConsumAmount=0;
TrdVirtualOrderBean firstConsumeAmountBean=null;
Map<String,TrdVirtualOrderBean>effectiveOrderMap=new HashMap();
......
......@@ -40,14 +40,14 @@ public class TagFirstOfflineConsumptionStoreFilter extends AbstractTagConsumReco
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&&StringUtils.isNotEmpty(data.getReceipts_time())
&&StringUtils.isNotEmpty(data.getStore_info_id()))
&&null!=data.getStore_info_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getStore_info_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
......
......@@ -39,14 +39,14 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time())
&& StringUtils.isNotEmpty(data.getShop_id()))
&& null!=data.getShop_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getShop_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
......
......@@ -44,13 +44,13 @@ public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFil
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest);
JavaRDD<TrdVirtualOrderItemBean> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD();
JavaRDD<Integer>ecuRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data.getEcu_id()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data->Tuple2.apply(data.getVirtual_id(),data.getEcu_id()))
.leftOuterJoin(orderItemRDD.mapToPair(data->Tuple2.apply(data.getVirtual_order_id(),data.getSku_code()))
.filter(data->StringUtils.isNotEmpty(data._2())
&&commodityRequest.getSkuCodeList().contains(data._2())))
......
......@@ -38,13 +38,13 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),configStatus==1?data.getPaid_amt():data.getPay_amt()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),configStatus==1?data.getPaid_amt():data.getPay_amt()))
.reduceByKey((x,y)->x+y)
.filter(data->{
boolean result=false;
......
......@@ -38,13 +38,13 @@ public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRe
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& StringUtils.isNotEmpty(data.getStore_info_id()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& null!=data.getStore_info_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getStore_info_id()))
.filter(data->storeRequest.getStoreList().contains(data._2()))
.reduceByKey((x,y)->x)
......
......@@ -38,13 +38,13 @@ public class TagHistoryOnlineConsumptionStoreFilter extends AbstractTagConsumRec
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getShop_id()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& null!=data.getShop_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getShop_id()))
.filter(data->storeRequest.getStoreList().contains(data._2()))
.reduceByKey((x,y)->x)
......
......@@ -44,12 +44,12 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeCommodityRequest commodityRequest=(TagConsumeCommodityRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,commodityRequest);
JavaPairRDD<String,Integer>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
JavaPairRDD<Long,Long>orderRdd= consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()
......@@ -60,7 +60,8 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
}
})
.mapToPair(data->Tuple2.apply(data._2().getVirtual_id(),data._1()));
JavaPairRDD<String,String> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD()
JavaPairRDD<Long,Long> orderItemRDD=MysqlRddManager.getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderItemBean.class).javaRDD()
.filter(data->{
if(StringUtils.isNotEmpty(data.getSku_code())
&&commodityRequest.getSkuCodeList().contains(data.getSku_code())){
......@@ -71,7 +72,7 @@ public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilt
.reduceByKey((x,y)->x);
JavaRDD<Integer>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
JavaRDD<Long>ecuRdd=orderRdd.leftOuterJoin(orderItemRDD)
.filter(data->data._2()._2().isPresent())
.map(data->data._2()._1()).distinct();
......
......@@ -38,13 +38,13 @@ public class TagLatelyConsumeTimeFilter extends AbstractTagConsumRecordFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeTimeRequest consumeTimeRequest=(TagConsumeTimeRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId)
.filter(new Column("is_eff_order").equalTo(1)), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeTimeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(), DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.reduceByKey((x,y)->x>y?x:y)
.filter(data->checkTime(consumeTimeRequest,data._2()))
......
......@@ -38,13 +38,13 @@ public class TagLatelyConsumptionChannelFilter extends AbstractTagConsumRecordFi
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,channelRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> StringUtils.isNotEmpty(data.getReceipts_time()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()
......
......@@ -40,19 +40,19 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
JavaRDD<Long>ecuRdd=consumeRecordRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.groupByKey()
.mapPartitions(data->{
List<Integer>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer, Iterable<TrdVirtualOrderBean>> tp2=data.next();
Tuple2<Long, Iterable<TrdVirtualOrderBean>> tp2=data.next();
double latelyConsumAmount=0;
TrdVirtualOrderBean latelyConsumeAmountBean=null;
Map<String,TrdVirtualOrderBean> effectiveOrderMap=new HashMap();
......
......@@ -39,14 +39,14 @@ public class TagLatelyOnlineConsumptionStoreFilter extends AbstractTagConsumReco
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time())
&& StringUtils.isNotEmpty(data.getShop_id()))
&& null!=data.getShop_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getShop_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
......
......@@ -38,13 +38,13 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->{
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{
boolean result=false;
if(StringUtils.isNotEmpty(data.getReceipts_time())){
Date receiptsTime= DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATE_10);
......@@ -66,10 +66,10 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
}).mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.groupByKey()
.mapPartitions(data->{
List<Integer>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer, Iterable<TrdVirtualOrderBean>> tp2=data.next();
Tuple2<Long, Iterable<TrdVirtualOrderBean>> tp2=data.next();
double topConsumAmount=0;
TrdVirtualOrderBean topConsumeAmountBean=null;
Map<String,TrdVirtualOrderBean> effectiveOrderMap=new HashMap();
......
......@@ -40,14 +40,14 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
storeRequest.setYearMonthDayType(YearMonthDayType.DAY);
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& StringUtils.isNotEmpty(data.getStore_info_id())
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& null!=data.getStore_info_id()
&& StringUtils.isNotEmpty(data.getReceipts_time()))
.filter(data->checkTime(storeRequest, DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getStore_info_id()))
......
......@@ -35,15 +35,15 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
storeRequest.setYearMonthDayType(YearMonthDayType.DAY);
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time())
&& StringUtils.isNotEmpty(data.getShop_id()))
&& null!=data.getShop_id())
.filter(data->checkTime(storeRequest, DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getShop_id())))
.filter(data->storeRequest.getStoreList().contains(data._2()._2()))
......
......@@ -35,15 +35,15 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeRequest consumeRequest=(TagConsumeRequest)request;
JavaRDD<TrdEcuSalesLabelBean> consumeRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdEcuSalesLabelBean.class).javaRDD();
consumeRDD=statisticsTypeHandle(consumeRDD,consumeRequest);
JavaRDD<Integer>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
JavaRDD<Long>ecuRdd=consumeRDD.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data)).groupByKey()
.mapPartitions(data->{
List<Integer> result=new ArrayList();
List<Long> result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
Tuple2<Long,Iterable<TrdEcuSalesLabelBean>> tp2=data.next();
int consumeTimes=0;
double payAmt=0;
for(TrdEcuSalesLabelBean consumeBean:tp2._2()){
......
......@@ -37,15 +37,15 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data-> data.getOrder_channel_code()==1
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data-> data.getOrder_channel_code()==1
&&StringUtils.isNotEmpty(data.getReceipts_time())
&&StringUtils.isNotEmpty(data.getStore_info_id()))
&&null!=data.getStore_info_id())
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getStore_info_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
......
......@@ -37,13 +37,13 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
public JavaRDD<Long> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeAmountRequest consumeAmountRequest=(TagConsumeAmountRequest)request;
JavaRDD<TrdVirtualOrderBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,consumeAmountRequest);
int configStatus= CommonUtil.getConfigStatus(enterpriseId);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->{
JavaRDD<Long>ecuRdd=consumeRecordRDD.filter(data->{
boolean result=false;
if(StringUtils.isNotEmpty(data.getReceipts_time())){
Date receiptsTime= DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATE_10);
......@@ -65,10 +65,10 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
}).mapToPair(data-> Tuple2.apply(data.getEcu_id(),data))
.groupByKey()
.mapPartitions(data->{
List<Integer>result=new ArrayList();
List<Long>result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer, Iterable<TrdVirtualOrderBean>> tp2=data.next();
Tuple2<Long, Iterable<TrdVirtualOrderBean>> tp2=data.next();
double topConsumAmount=0;
TrdVirtualOrderBean topConsumeAmountBean=null;
Map<String,TrdVirtualOrderBean> effectiveOrderMap=new HashMap();
......
......@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
......@@ -37,7 +38,6 @@ import org.elasticsearch.spark.rdd.EsRDDWriter;
import scala.Tuple2;
import scala.collection.JavaConversions;
import java.util.*;
import org.apache.spark.api.java.Optional;
import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
......@@ -46,21 +46,21 @@ import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
* @author: wangxk
* @date: 2020/4/15
*/
public class TagProcess {
public class TagProcessManager {
private List<SceneCrowdDTO>sceneCrowdDTOList=new ArrayList();
private MysqlRddManager member4RddManager;
private MysqlRddManager enterprise4RddManager;
private DataSourceSharding memberSharding4Datasource;
private boolean isProduction;
private static TagProcess instance;
public static TagProcess getInstance(){
private static TagProcessManager instance;
public static TagProcessManager getInstance(){
if(null==instance){
instance=new TagProcess();
instance=new TagProcessManager();
}
return instance;
}
private TagProcess(){}
private TagProcessManager(){}
public boolean isProduction() {
return isProduction;
......@@ -89,7 +89,7 @@ public class TagProcess {
}
member4RddManager=member4Datasource.buildRddManager();
enterprise4RddManager=enterprise4Datasource.buildRddManager();
memberSharding4Datasource = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_MEMBER_USER);
memberSharding4Datasource = new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_ENTERPRISE_USER);
List<TabSceneCrowd> sceneCrowdList=member4RddManager.getPojo("tab_scene_crowd", TabSceneCrowd.class,null)
.filter(new Column("valid_flag").equalTo(1))
......@@ -184,7 +184,7 @@ public class TagProcess {
DataSourceManager.getInstance().addSourceEntity(memberSharding4Datasource,enterpriseTagEntry.getKey().intValue());
}
/* if (extractData) {
if (extractData) {
DataSourceManager.getInstance().init(isProduction);
DataSourceManager.getInstance().extractDataToDatabase();
DingtalkMessageUtil.sendAlertMessage("extractData to hive finish !");
......@@ -193,7 +193,7 @@ public class TagProcess {
//处理标签数据
for (Map.Entry<Integer, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
Integer enterpriseId=enterpriseTagEntry.getKey();
List<JavaPairRDD<Integer, String>> filterRddList = new ArrayList();
List<JavaPairRDD<Long, String>> filterRddList = new ArrayList();
for(TagProcessEntity entity:enterpriseTagEntry.getValue()){
for(TagConditionDTO conditionDTO:entity.tagList){
if(tagIdToFilterMap.containsKey(conditionDTO.getTagId())){
......@@ -202,41 +202,41 @@ public class TagProcess {
final String groupId = entity.tagGroupId + "_" + conditionDTO.getTagId() + "_" + entity.level;
JavaPairRDD<Integer, String> rdd = tagFilter.filterValidMember(enterpriseId,filterRequest).mapToPair(data-> Tuple2.apply(data,groupId));
JavaPairRDD<Long, String> rdd = tagFilter.filterValidMember(enterpriseId,filterRequest).mapToPair(data-> Tuple2.apply(data,groupId));
filterRddList.add(rdd);
}
}
}
if(filterRddList.size()>0){
JavaPairRDD<Integer,String>memberGroupRdd=null;
for(JavaPairRDD<Integer,String>rdd:filterRddList){
JavaPairRDD<Long,String>memberGroupRdd=null;
for(JavaPairRDD<Long,String>rdd:filterRddList){
if(null==memberGroupRdd){
rdd=memberGroupRdd;
}else{
memberGroupRdd.union(rdd);
}
}
JavaPairRDD<Integer,Integer>memberUserRdd=memberDatasource.getDatasetByEnterpriseId(enterpriseId).select("id").javaRDD()
.mapToPair(data->Tuple2.apply((Integer)data.getAs("id"),(Integer)data.getAs("id")))
JavaPairRDD<Long,Long>userRdd=memberSharding4Datasource.getDatasetByEnterpriseId(enterpriseId).select("id").javaRDD()
.mapToPair(data->Tuple2.apply((Long)data.getAs("id"),(Long)data.getAs("id")))
.reduceByKey((x,y)->x);
JavaPairRDD<Integer,String>updateMemberGroupRdd=memberUserRdd.leftOuterJoin(memberGroupRdd.groupByKey())
JavaPairRDD<Long,String>updateMemberGroupRdd=userRdd.leftOuterJoin(memberGroupRdd.groupByKey())
.mapPartitionsToPair(data->{
List<Tuple2<Integer,String>> result=new ArrayList();
List<Tuple2<Long,String>> result=new ArrayList();
while (data.hasNext()){
Tuple2<Integer, Optional<Iterable<String>>> tp2=data.next()._2();
Integer mcuId=tp2._1();
Tuple2<Long, Optional<Iterable<String>>> tp2=data.next()._2();
Long id=tp2._1();
String groupId=tp2._2().isPresent()? Joiner.on(" ").join(Lists.newArrayList(tp2._2().get())):"";
JSONObject json = new JSONObject();
json.put("mcuId",mcuId);
json.put("tagGroup",groupId);
result.add(Tuple2.apply(mcuId,json.toString()));
json.put("id",id);
json.put("sceneTags_b",groupId);
result.add(Tuple2.apply(id,json.toString()));
}
return result.iterator();
});
JavaPairRDD<Integer,String>cacheMemberGroupRdd=updateMemberGroupRdd.cache();
JavaPairRDD<Long,String>cacheMemberGroupRdd=updateMemberGroupRdd.cache();
updateIndex(cacheMemberGroupRdd,enterpriseId);
......@@ -244,16 +244,16 @@ public class TagProcess {
cacheMemberGroupRdd.unpersist();
}
}*/
}
}
private void saveToHive(JavaPairRDD<Integer, String> updateMemberGroup,Integer enterpriseId){
private void saveToHive(JavaPairRDD<Long, String> updateMemberGroup,Integer enterpriseId){
JavaRDD<TagGroupInfo> tagGroupInfoRDD=updateMemberGroup.mapPartitions(data->{
List<TagGroupInfo>tagGroupInfoList=new ArrayList();
while(data.hasNext()){
Tuple2<Integer,String>tp2=data.next();
Tuple2<Long,String>tp2=data.next();
JSONObject json = JSONObject.parseObject(tp2._2());
if(StringUtils.isNotEmpty(json.getString("tagGroup"))){
tagGroupInfoList.add(new TagGroupInfo(json.getInteger("mcuId"),enterpriseId,json.getString("tagGroup")));
......@@ -268,7 +268,7 @@ public class TagProcess {
}
private void updateIndex(JavaPairRDD<Integer, String> updateMemberGroup, Integer enterpriseId) {
private void updateIndex(JavaPairRDD<Long, String> updateMemberGroup, Integer enterpriseId) {
Map<String, String> conf = new HashMap();
String indexName = EsRequestUtil.getESIindexName(enterpriseId);
......
......@@ -7,6 +7,7 @@ package com.gic.spark.util;
*/
public class ConstantUtil {
public static final String TAB_ENTERPRISE_USER="tab_enterprise_user";
public static final String TAB_MEMBER_USER="tab_member_user";
public static final String TAB_COUPON_LOG="tab_coupon_log";
public static final String TAB_INTEGRAL_CU_CHANGE_LOG="tab_integral_cu_change_log";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment