Commit 7188430b by guos

会员标签4.0

parent 7c1a591f
##sharding.config
member.sharding.4=member_sharding_4.0_test
marketing.sharding=marketing_sharding_test
##MQ-zk
mq.zk.host=10.105.70.186
mq.zk.port=2199
mq.secret.id=AKIDnV4ThRrApLformrMwOfopiT3Vmx1LeCY
mq.secret.key=JRiqnbEi8iloggEes692mcREC8uY1V5b
\ No newline at end of file
package com.gic.spark.entity.bean;
import java.io.Serializable;
import java.sql.Timestamp;
/**
* @description:
* @author: wangxk
* @date: 2020/4/28
*/
public class TagCouponBean implements Serializable{
private Integer coupon_id;
private Integer ecu_Id;
private Integer scu_Id;
private Integer acu_Id;
private Integer mcu_Id;
private java.sql.Timestamp effect_End_Time;
private Integer status;
public Integer getCoupon_id() {
return coupon_id;
}
public void setCoupon_id(Integer coupon_id) {
this.coupon_id = coupon_id;
}
public Integer getEcu_Id() {
return ecu_Id;
}
public void setEcu_Id(Integer ecu_Id) {
this.ecu_Id = ecu_Id;
}
public Integer getScu_Id() {
return scu_Id;
}
public void setScu_Id(Integer scu_Id) {
this.scu_Id = scu_Id;
}
public Integer getAcu_Id() {
return acu_Id;
}
public void setAcu_Id(Integer acu_Id) {
this.acu_Id = acu_Id;
}
public Integer getMcu_Id() {
return mcu_Id;
}
public void setMcu_Id(Integer mcu_Id) {
this.mcu_Id = mcu_Id;
}
public Timestamp getEffect_End_Time() {
return effect_End_Time;
}
public void setEffect_End_Time(Timestamp effect_End_Time) {
this.effect_End_Time = effect_End_Time;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
}
......@@ -13,6 +13,7 @@ public class TagIntegralBean implements Serializable {
private int enterprise_Id;
private int cu_Type;
private int cu_Id;
private int file_ecu_id;
private int integral_Value;
private java.sql.Timestamp create_Time;
......@@ -55,4 +56,12 @@ public class TagIntegralBean implements Serializable {
public void setCreate_Time(Timestamp create_Time) {
this.create_Time = create_Time;
}
public int getFile_ecu_id() {
return file_ecu_id;
}
public void setFile_ecu_id(int file_ecu_id) {
this.file_ecu_id = file_ecu_id;
}
}
package com.gic.spark.entity.enumtype;
/**
* @description:
* @author: wangxk
* @date: 2020/4/29
*/
public enum CouponType {
/**
* 待领取
*/
UNCLAIMED,
/**
* 已领取未核销
*/
NO_CANCEL,
/**
*
*/
NO_CANCEL_EXPIRES,
/**
*
*/
CANCEL,
/**
*
*/
OCCUPY,
/**
*
*/
DONATION_IN,
/**
*
*/
YET_DONATION
}
......@@ -7,6 +7,8 @@ package com.gic.spark.entity.enumtype;
*/
public enum DomainType {
ECU_INFO("ecuInfo"),
ACU_INFO("acuInfo"),
SCU_INFO("scuInfo"),
......
package com.gic.spark.entity.request;
import com.gic.spark.entity.enumtype.NumberType;
import java.util.List;
/**
* @description:
* 卡卷类
......@@ -7,7 +11,72 @@ package com.gic.spark.entity.request;
* @date: 2020/4/21
*/
public class CouponRequest extends FilterRequest{
private static final long serialVersionUID = 4411295146033929579L;
/**
* 最小数字
*/
private Integer beginNum;
/**
* 最大数字
*/
private Integer endNum;
/**
* 相等数字
*/
private Integer equalNum;
private NumberType numberType;
private List<Integer>couponList;
private String tagCode;
public Integer getBeginNum() {
return beginNum;
}
public void setBeginNum(Integer beginNum) {
this.beginNum = beginNum;
}
public Integer getEndNum() {
return endNum;
}
public void setEndNum(Integer endNum) {
this.endNum = endNum;
}
public Integer getEqualNum() {
return equalNum;
}
public void setEqualNum(Integer equalNum) {
this.equalNum = equalNum;
}
public NumberType getNumberType() {
return numberType;
}
public void setNumberType(NumberType numberType) {
this.numberType = numberType;
}
public List<Integer> getCouponList() {
return couponList;
}
public void setCouponList(List<Integer> couponList) {
this.couponList = couponList;
}
public String getTagCode() {
return tagCode;
}
public void setTagCode(String tagCode) {
this.tagCode = tagCode;
}
}
......@@ -12,6 +12,8 @@ import java.util.List;
*/
public abstract class FilterRequest implements Serializable{
private static final long serialVersionUID = 4411295146033929579L;
protected DomainType domainType;
protected List<String> cuVals;
......
......@@ -15,5 +15,5 @@ public interface BaseTagFilter {
List<DataSourceEntity> necessarySourceList();
JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest filterOption);
JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest request);
}
......@@ -3,6 +3,8 @@ package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
import java.util.ArrayList;
......@@ -15,7 +17,7 @@ import java.util.List;
*/
public class TagAboutExpireIntegralFilter implements BaseTagFilter{
private DataSourceSharding memberSharding=new DataSourceSharding("member_sharding_4.0_test","tab_integral_cu_change_log");
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MEMBER_SHARDING_4, ConstantUtil.TAB_INTEGRAL_CU_CHANGE_LOG);
private static TagAboutExpireIntegralFilter instance;
public static TagAboutExpireIntegralFilter getInstance() {
......@@ -29,7 +31,7 @@ public class TagAboutExpireIntegralFilter implements BaseTagFilter{
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(memberSharding);
result.add(dataSourceSharding);
return result;
}
......
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagCouponBean;
import com.gic.spark.entity.request.CouponRequest;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description:
* @author: wangxk
* @date: 2020/4/29
*/
public class TagCouponFilter implements BaseTagFilter{
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MARKETING_SHARDING, ConstantUtil.TAB_COUPON_LOG);
private static TagCouponFilter instance;
public static TagCouponFilter getInstance(){
if(null==instance){
instance=new TagCouponFilter();
}
return instance;
}
private TagCouponFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceSharding);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest request) {
CouponRequest couponRequest=(CouponRequest)request;
JavaRDD<TagCouponBean> couponBeanRDD= MysqlRddManager.getPojoFromDataset(dataSourceSharding.getDatasetByEnterpriseId(enterpriseId),TagCouponBean.class).javaRDD()
.filter(data->couponRequest.getCouponList().contains(data.getCoupon_id()));
couponBeanRDD.mapPartitions(data->{
Set<Integer>result=new HashSet();
while (data.hasNext()){
TagCouponBean couponBean=data.next();
switch (couponRequest.getTagCode()){
}
}
return result.iterator();
});
return null;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.entity.DataSourceSharding;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagCouponBean;
import com.gic.spark.entity.request.CouponRequest;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.util.AppEnvUtil;
import com.gic.spark.util.ConstantUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @description:
......@@ -14,6 +27,7 @@ import java.util.List;
*/
public class TagCurrentCouponNumFilter implements BaseTagFilter{
private DataSourceSharding dataSourceSharding=new DataSourceSharding(AppEnvUtil.MARKETING_SHARDING, ConstantUtil.TAB_COUPON_LOG);
private static TagCurrentCouponNumFilter instance;
public static TagCurrentCouponNumFilter getInstance() {
if(null==instance){
......@@ -25,11 +39,94 @@ public class TagCurrentCouponNumFilter implements BaseTagFilter{
@Override
public List<DataSourceEntity> necessarySourceList() {
return null;
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceSharding);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest filterOption) {
return null;
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, FilterRequest request) {
CouponRequest couponRequest=(CouponRequest)request;
JavaRDD<TagCouponBean> couponBeanRDD=MysqlRddManager.getPojoFromDataset(dataSourceSharding.getDatasetByEnterpriseId(enterpriseId),TagCouponBean.class).javaRDD()
.filter(data->(data.getStatus()==3||data.getStatus()==4)&&data.getEffect_End_Time().getTime()>System.currentTimeMillis());
JavaRDD<Integer>ecuRDD=couponBeanRDD.mapPartitionsToPair(data->{
List<Tuple2<String,Integer>>result=new ArrayList();
while (data.hasNext()){
TagCouponBean couponBean=data.next();
switch(couponRequest.getDomainType()){
case ECU_INFO:
result.add(Tuple2.apply(couponBean.getEcu_Id().toString(),1));
break;
case ACU_INFO:
if(null!=couponBean.getAcu_Id()&&couponRequest.getCuVals().contains(couponBean.getAcu_Id())){
result.add(Tuple2.apply(couponBean.getAcu_Id()+"&"+couponBean.getEcu_Id(),1));
}
break;
case SCU_INFO:
if(null!=couponBean.getAcu_Id()&&couponRequest.getCuVals().contains(couponBean.getScu_Id())){
result.add(Tuple2.apply(couponBean.getScu_Id()+"&"+couponBean.getEcu_Id(),1));
}
break;
case MCU_INFO:
if(null!=couponBean.getMcu_Id()&&couponRequest.getCuVals().contains(couponBean.getMcu_Id())){
result.add(Tuple2.apply(couponBean.getMcu_Id()+"&"+couponBean.getEcu_Id(),1));
}
break;
default:break;
}
}
return result.iterator();
}).reduceByKey((x,y)->x+y)
.mapPartitions(data->{
Set<Integer> result=new HashSet();
while (data.hasNext()){
Tuple2<String,Integer>tp2=data.next();
switch (couponRequest.getDomainType()){
case ECU_INFO:
if(numberComputeHandle(couponRequest,tp2._2())){
result.add(Integer.valueOf(tp2._1()));
}
break;
default:
if(numberComputeHandle(couponRequest,tp2._2())){
result.add(Integer.valueOf(tp2._1().split("&")[1]));
}
break;
}
}
return result.iterator();
});
return ecuRDD;
}
public static boolean numberComputeHandle(CouponRequest couponRequest,int num){
boolean result=false;
switch (couponRequest.getNumberType()){
case eq:
result=num==couponRequest.getEqualNum();
break;
case lt:
result=num<couponRequest.getEndNum();
break;
case lte:
result=num<=couponRequest.getEndNum();
break;
case gt:
result=num>couponRequest.getBeginNum();
break;
case gte:
result=num>=couponRequest.getBeginNum();
break;
case between:
result=num>=couponRequest.getBeginNum()&&num<=couponRequest.getEndNum();
break;
default:break;
}
return result;
}
}
......@@ -81,6 +81,7 @@ public class TagConstant {
public static final String TAG_CODE_ACCUMULATED_INTEGRAL="accumulatedIntegral_f";//累计获取积分
public static final String TAG_CODE_ABOUT_EXPIRE_INTEGRAL="aboutExpireIntegral";//即将过期积分
public static final String TAG_DOMAIN_ECU_INFO="flag";
public static final String TAG_DOMAIN_ACU_INFO="acuInfo";
public static final String TAG_DOMAIN_SCU_INFO="scuInfo";
public static final String TAG_DOMAIN_MCU_INFO="mcuInfo";
......@@ -102,4 +103,11 @@ public class TagConstant {
public static final int CU_TYPE_ACU=1;
public static final int CU_TYPE_SCU=2;
public static final int CU_TYPE_MCU=3;
public static final String TAG_COMPONENT_CITY="component.city";//省市组件
public static final String TAG_COMPONENT_STORE="component.store";//门店组件
public static final String TAG_COMPONENT_COMMODITY="component.commodity";//商品组件
public static final String TAG_COMPONENT_COUPON="component.coupon";//卡券组件
public static final String TAG_COMPONENT_CLERK="component.clerk";//导购选择器
}
......@@ -5,12 +5,15 @@ import com.gic.spark.entity.TagConditionValDTO;
import com.gic.spark.entity.enumtype.DomainType;
import com.gic.spark.entity.enumtype.NumberType;
import com.gic.spark.entity.enumtype.TimeRangeType;
import com.gic.spark.entity.request.CouponRequest;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.request.IntegralRequest;
import com.gic.spark.util.DateUtil;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -24,17 +27,112 @@ public class TagValueParser {
public static FilterRequest parseFilterValue(TagConditionDTO conditionDTO){
FilterRequest request=null;
switch (conditionDTO.getTagEsFieldName()){
//积分
case TagConstant.TAG_CODE_ACCUMULATED_INTEGRAL:
request=getIntegralRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_ABOUT_EXPIRE_INTEGRAL:
request=getIntegralRequest(conditionDTO.getTagTemplateElInfo());
break;
//卡卷
case TagConstant.TAG_CODE_CURRENT_COUPON_NUM:
request=getCurrentCouponNumRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_UNCLAIMED_COUPON:
request=getCouponRequest(conditionDTO.getTagTemplateElInfo(),conditionDTO.getTagEsFieldName());
break;
case TagConstant.TAG_CODE_GET_NO_CANCEL_COUPON:
request=getCouponRequest(conditionDTO.getTagTemplateElInfo(),conditionDTO.getTagEsFieldName());
break;
case TagConstant.TAG_CODE_NO_CANCEL_EXPIRES_COUPON:
request=getCouponRequest(conditionDTO.getTagTemplateElInfo(),conditionDTO.getTagEsFieldName());
break;
case TagConstant.TAG_CODE_CANCEL_COUPON:
request=getCouponRequest(conditionDTO.getTagTemplateElInfo(),conditionDTO.getTagEsFieldName());
break;
case TagConstant.TAG_CODE_OCCUPY_COUPON:
request=getCouponRequest(conditionDTO.getTagTemplateElInfo(),conditionDTO.getTagEsFieldName());
break;
case TagConstant.TAG_CODE_DONATIONINCOUPON:
request=getCouponRequest(conditionDTO.getTagTemplateElInfo(),conditionDTO.getTagEsFieldName());
break;
case TagConstant.TAG_CODE_YETDONATIONCOUPON:
request=getCouponRequest(conditionDTO.getTagTemplateElInfo(),conditionDTO.getTagEsFieldName());
break;
default:break;
}
return request;
}
private static FilterRequest getCouponRequest(List<TagConditionValDTO> conditionValDTOList, String tagCode) {
CouponRequest request=new CouponRequest();
for(TagConditionValDTO conditionValDTO:conditionValDTOList){
switch (conditionValDTO.getKey()){
case TagConstant.TAG_COMPONENT_COUPON:
request.setTagCode(tagCode);
List<Integer>couponList=new ArrayList();
if(StringUtils.isNotEmpty(conditionValDTO.getVal())){
String[] couponIds= conditionValDTO.getVal().split(",");
for (String couponId : couponIds) {
couponList.add(Integer.valueOf(couponId));
}
}
request.setCouponList(couponList);
break;
default:break;
}
}
return request;
}
private static FilterRequest getCurrentCouponNumRequest(List<TagConditionValDTO> conditionValDTOList) {
CouponRequest request=new CouponRequest();
for(TagConditionValDTO conditionValDTO:conditionValDTOList){
String[] keys=conditionValDTO.getKey().split(".");
if(keys.length==3){
setDomainHandle(request,keys[1],conditionValDTO.getVal());
}else{
switch (keys[0]){
case TagConstant.TAG_KEY_TYPE_NUMBER:
switch (keys[1]){
case TagConstant.TAG_COMPUTE_TYPE_GT:
request.setBeginNum(Integer.parseInt(conditionValDTO.getVal()));
request.setNumberType(NumberType.gt);
break;
case TagConstant.TAG_COMPUTE_TYPE_GTE:
request.setBeginNum(Integer.parseInt(conditionValDTO.getVal()));
request.setNumberType(NumberType.gte);
break;
case TagConstant.TAG_COMPUTE_TYPE_LT:
request.setEndNum(Integer.parseInt(conditionValDTO.getVal()));
request.setNumberType(NumberType.lt);
break;
case TagConstant.TAG_COMPUTE_TYPE_LTE:
request.setEndNum(Integer.parseInt(conditionValDTO.getVal()));
request.setNumberType(NumberType.lte);
break;
case TagConstant.TAG_COMPUTE_TYPE_EQ:
request.setEqualNum(Integer.parseInt(conditionValDTO.getVal()));
request.setNumberType(NumberType.eq);
break;
case TagConstant.TAG_COMPUTE_TYPE_BETWEEN:
String[] vals=conditionValDTO.getVal().split(",");
request.setBeginNum(Integer.parseInt(vals[0]));
request.setEndNum(Integer.parseInt(vals[1]));
request.setNumberType(NumberType.between);
break;
default:break;
}
break;
default:break;
}
}
}
return request;
}
private static FilterRequest getIntegralRequest(List<TagConditionValDTO> conditionValDTOList) {
IntegralRequest request=new IntegralRequest();
for(TagConditionValDTO conditionValDTO:conditionValDTOList){
......@@ -101,6 +199,9 @@ public class TagValueParser {
private static void setDomainHandle(FilterRequest request,String key,String val){
switch (key){
case TagConstant.TAG_DOMAIN_ECU_INFO:
request.setDomainType(DomainType.ECU_INFO);
break;
case TagConstant.TAG_DOMAIN_ACU_INFO:
request.setDomainType(DomainType.ACU_INFO);
break;
......
package com.gic.spark.util;
import org.apache.commons.configuration.ConfigurationException;
import java.util.Properties;
/**
* @description:
* @author: wangxk
* @date: 2018/7/25
*/
public class AppEnvUtil {
public static String MEMBER_SHARDING_4;
public static String MARKETING_SHARDING;
static {
try {
Properties properties = new Properties();
properties.load(AppEnvUtil.class.getClassLoader().getResourceAsStream("application.properties"));
MEMBER_SHARDING_4=properties.getProperty("member.sharding.4");
MARKETING_SHARDING=properties.getProperty("marketing.sharding");
} catch (Exception e) {
e.printStackTrace();
System.out.println("读取配置信息失败!");
}
}
public static void main(String[] args) throws ConfigurationException {
}
}
package com.gic.spark.util;
/**
* @description:
* @author: wangxk
* @date: 2020/4/28
*/
public class ConstantUtil {
public static final String TAB_INTEGRAL_CU_CHANGE_LOG="tab_integral_cu_change_log";
public static final String TAB_COUPON_LOG="tab_coupon_log";
}
##sharding.config
member.sharding.4="member_sharding_4.0_test
marketing.sharding=marketing_sharding
##MQ-zk
#
mq.zk.host=10.105.231.13
#mq.zk.host=115.159.227.79
mq.zk.port=2199
mq.secret.id=AKIDb6PWWN8gYIuKYaobq00qV1PglIl46Ynq
mq.secret.key=0oiSuCLJ8uX0BGUsBJIg0EKEw25TileO
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment