Commit 88c20b49 by guos

会员标签4.0

parent bc7ad72c
......@@ -63,8 +63,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
......
......@@ -2,6 +2,7 @@ package com.gic.spark.entity;
import com.gic.spark.entity.table.TabSceneCrowd;
import java.util.LinkedList;
import java.util.List;
/**
......@@ -11,28 +12,13 @@ import java.util.List;
*/
public class SceneCrowdDTO extends TabSceneCrowd{
private LinkedList<TagConditionGroupDTO> conditionGroupDTOList;
private long id;
private String scene_Crowd_Name;
private long real_Time;
private java.sql.Timestamp scene_Crowd_Expire_Time;
private long scene_Crowd_Category_Id;
private String scene_Crowd_Desc;
private long scene_Crowd_Update_Type;
private long scene_Crowd_Update_Type_Val;
private java.sql.Timestamp update_Time;
private long member_Size;
private java.sql.Timestamp create_Time;
private long valid_Flag;
private String tag_Condition_Group_Info;
private long delete_Flag;
private long enterprise_Id;
private String crowd_Param;
public SceneCrowdDTO() {
}
public SceneCrowdDTO(TabSceneCrowd sceneCrowd,List<TagConditionGroupDTO> conditionGroupDTOList){
public SceneCrowdDTO(TabSceneCrowd sceneCrowd,LinkedList<TagConditionGroupDTO> conditionGroupDTOList){
this.setId(sceneCrowd.getId());
this.setScene_Crowd_Name(sceneCrowd.getScene_Crowd_Name());
this.setReal_Time(sceneCrowd.getReal_Time());
......@@ -49,15 +35,15 @@ public class SceneCrowdDTO extends TabSceneCrowd{
this.setDelete_Flag(sceneCrowd.getDelete_Flag());
this.setEnterprise_Id(sceneCrowd.getEnterprise_Id());
this.setCrowd_Param(sceneCrowd.getCrowd_Param());
}
private List<TagConditionGroupDTO> conditionGroupDTOList;
this.setConditionGroupDTOList(conditionGroupDTOList);
}
public List<TagConditionGroupDTO> getConditionGroupDTOList() {
public LinkedList<TagConditionGroupDTO> getConditionGroupDTOList() {
return conditionGroupDTOList;
}
public void setConditionGroupDTOList(List<TagConditionGroupDTO> conditionGroupDTOList) {
public void setConditionGroupDTOList(LinkedList<TagConditionGroupDTO> conditionGroupDTOList) {
this.conditionGroupDTOList = conditionGroupDTOList;
}
}
package com.gic.spark.entity.request;
import java.io.Serializable;
/**
* Created by paste on 2018/7/25 11:26
* <p>
* TODO: class description
*/
public abstract class FilterRequest implements Serializable{
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.entity.request.FilterRequest;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
/**
* Created by paste on 2018/7/25 11:25
* <p>
* TODO: class description
*/
public interface BaseTagFilter {
List<DataSourceEntity> necessarySourceList();
JavaRDD<String> filterValidMember(Long enterpriseId, FilterRequest filterOption);
}
......@@ -54,4 +54,25 @@ public class TagConstant {
public static final String TAG_TIME_TYPE_AFTER_YEAR = "afteryear";//之后几年
public static final String TAG_TIME_TYPE_LAST_YEAR_HASCURRENTYEAR = "CRlastyearHasCurrentYear"; //最近几年包含当年
public static final String TAG_TIME_TYPE_ALL_TIME = "alltime";//目前为止
/*当前卡券数
待领取卡券
已领取未核销卡券
(未核销或未领取)已到期卡券
已核销卡券
已占用卡券
转赠中卡券
已转赠卡券
非会员卡券*/
public static final String TAG_CODE_CURRENT_COUPON_NUM="currentCouponNum"; //当前卡券数
public static final String TAG_CODE_UNCLAIMED_COUPON="unclaimedCoupon"; //待领取卡券
public static final String TAG_CODE_GET_NO_CANCEL_COUPON="getNoCancelCoupon"; //已领取未核销卡券
public static final String TAG_CODE_NO_CANCEL_EXPIRES_COUPON="noCancelExpiresCoupon";//未核销已过期卡券
public static final String TAG_CODE_CANCEL_COUPON="cancelCoupon";//已核销卡券
public static final String TAG_CODE_OCCUPY_COUPON="occupyCoupon";//已占用卡券
public static final String TAG_CODE_DONATIONINCOUPON="donationInCoupon";//转赠中卡券
public static final String TAG_CODE_YETDONATIONCOUPON="yetDonationCoupon";//已转增卡券
}
package com.gic.spark.tag;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.gic.spark.datasource.DataSourceManager;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlDatasource;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.datasource.sharding.ShardingRddManager;
import com.gic.spark.entity.SceneCrowdDTO;
import com.gic.spark.entity.TagConditionDTO;
import com.gic.spark.entity.TagConditionGroupDTO;
import com.gic.spark.entity.request.FilterRequest;
import com.gic.spark.entity.table.TabMemberTagAll;
import com.gic.spark.entity.table.TabSceneCrowd;
import com.gic.spark.filter.BaseTagFilter;
import com.gic.spark.util.DingtalkMessageUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import scala.Tuple2;
import javax.swing.text.TabSet;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
......@@ -60,13 +67,112 @@ public class TagProcess {
Dataset<TabMemberTagAll>memberTagAllDataset= mysqlRddManager.getPojo("tab_member_tag_all", TabMemberTagAll.class,null);
List<TabSceneCrowd> sceneCrowdList=sceneCrowdDataset.collectAsList();
for(TabSceneCrowd sceneCrowd:sceneCrowdList){
List<TagConditionGroupDTO>conditionGroupDTOList=JSONObject.parseArray(sceneCrowd.getTag_Condition_Group_Info(),TagConditionGroupDTO.class);
LinkedList<TagConditionGroupDTO>conditionGroupDTOList=JSONObject.parseObject(sceneCrowd.getTag_Condition_Group_Info(), new TypeReference<LinkedList<TagConditionGroupDTO>>(){});
sceneCrowdDTOList.add(new SceneCrowdDTO(sceneCrowd,conditionGroupDTOList));
}
}
public void process(){
public void process(boolean extractData){
Map<Long, List<TagProcessEntity>> tagGroupByEnterpriseMap = new HashMap<>();
Map<Long, BaseTagFilter> tagIdToFilterMap = new HashMap();
for(SceneCrowdDTO sceneCrowdDTO:sceneCrowdDTOList){
if(sceneCrowdDTO.getReal_Time()==1){
LinkedList<TagConditionGroupDTO> conditionGroupDTOS=sceneCrowdDTO.getConditionGroupDTOList();
for(int i=0;i<conditionGroupDTOS.size();i++){
TagProcessEntity entity=new TagProcessEntity();
entity.enterpriseId=sceneCrowdDTO.getEnterprise_Id();
entity.tagGroupId=sceneCrowdDTO.getId();
entity.level=i+1;
entity.tagList=conditionGroupDTOS.get(i).getConditionInfos();
for(TagConditionDTO conditionDTO:entity.tagList){//将tag同filter进行映射
BaseTagFilter tagFilter=getTagFilter(conditionDTO.getTagEsFieldName());
if(null!=tagFilter){
tagIdToFilterMap.put(conditionDTO.getTagId(),tagFilter);
}
}
if(!tagGroupByEnterpriseMap.containsKey(sceneCrowdDTO.getEnterprise_Id())){
tagGroupByEnterpriseMap.put(sceneCrowdDTO.getEnterprise_Id(),new ArrayList());
}
tagGroupByEnterpriseMap.get(sceneCrowdDTO.getEnterprise_Id()).add(entity);
}
}
}
//准备标签需要的数据
for (Map.Entry<Long, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
for (TagProcessEntity processEntity : enterpriseTagEntry.getValue()) {
for (TagConditionDTO conditionDTO : processEntity.tagList) {
if (tagIdToFilterMap.containsKey(conditionDTO.getTagId())) {
for (DataSourceEntity sourceEntity : tagIdToFilterMap.get(conditionDTO.getTagId()).necessarySourceList()) {
DataSourceManager.getInstance().addSourceEntity(sourceEntity, enterpriseTagEntry.getKey().intValue());
}
}
}
}
}
if (extractData) {
DataSourceManager.getInstance().extractDataToDatabase();
DingtalkMessageUtil.sendAlertMessage("extractData to hive finish !");
}
//处理标签数据
for (Map.Entry<Long, List<TagProcessEntity>> enterpriseTagEntry : tagGroupByEnterpriseMap.entrySet()) {
Long enterpriseId=enterpriseTagEntry.getKey();
List<JavaPairRDD<String, String>> filterRddList = new ArrayList<>();
for(TagProcessEntity entity:enterpriseTagEntry.getValue()){
for(TagConditionDTO conditionDTO:entity.tagList){
if(tagIdToFilterMap.containsKey(conditionDTO.getTagId())){
BaseTagFilter tagFilter=tagIdToFilterMap.get(conditionDTO.getTagId());
FilterRequest filterRequest=TagValueParser.parseFilterValue(conditionDTO);
final String groupId = entity.tagGroupId + "_" + conditionDTO.getTagId() + "_" + entity.level;
JavaPairRDD<String, String> rdd = tagFilter.filterValidMember(enterpriseId,filterRequest).mapToPair(data-> Tuple2.apply(data,groupId));
filterRddList.add(rdd);
}
}
}
if(filterRddList.size()>0){
JavaPairRDD<String,String>memberGroupRdd=null;
for(JavaPairRDD<String,String>rdd:filterRddList){
if(null==memberGroupRdd){
rdd=memberGroupRdd;
}else{
memberGroupRdd.union(rdd);
}
}
}
}
}
private static BaseTagFilter getTagFilter(String tagEsFieldName){
BaseTagFilter tagFilter=null;
switch (tagEsFieldName) {
//卡卷类
case TagConstant.TAG_CODE_CURRENT_COUPON_NUM:
break;
case TagConstant.TAG_CODE_UNCLAIMED_COUPON:
break;
case TagConstant.TAG_CODE_GET_NO_CANCEL_COUPON:
break;
case TagConstant.TAG_CODE_NO_CANCEL_EXPIRES_COUPON:
break;
case TagConstant.TAG_CODE_CANCEL_COUPON:
break;
case TagConstant.TAG_CODE_DONATIONINCOUPON:
break;
case TagConstant.TAG_CODE_YETDONATIONCOUPON:
break;
default:
DingtalkMessageUtil.sendAlertMessage("暂未支持的非实时标签: " + tagEsFieldName);
break;
}
return tagFilter;
}
}
package com.gic.spark.tag;
import com.gic.spark.entity.TagConditionDTO;
import java.util.List;
/**
* Created by paste on 2018/10/24 10:57
* <p>
* TODO: class description
*/
public class TagProcessEntity {
long enterpriseId;
long tagGroupId;
int level;
List<TagConditionDTO> tagList;
}
package com.gic.spark.tag;
import com.gic.spark.entity.TagConditionDTO;
import com.gic.spark.entity.request.FilterRequest;
/**
* @description:
* @author: wangxk
......@@ -7,4 +10,9 @@ package com.gic.spark.tag;
*/
public class TagValueParser {
public static FilterRequest parseFilterValue(TagConditionDTO conditionDTO){
FilterRequest request=null;
return request;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment