Commit f00fdf9b by guos

Member Tag 4.0

parent 9001b712
package com.gic.spark;
import com.alibaba.fastjson.JSONObject;
import com.gic.spark.entity.enumtype.TimeRangeType;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.Calendar;
import java.util.Date;
import java.util.*;
/**
* @description:
......@@ -16,11 +16,17 @@ public class Test {
public static void main(String[] args){
// System.out.println("09017237595".matches("^[0-9a-zA-Z_-||.]{1,}$"));
//
// String str="2020-05-04";
//// String s=str.replaceAll("-","");
// String s=StringUtils.substringAfter(str, "-").replaceAll("-", "");
// System.out.println(s);
// StringBuilder st=new StringBuilder();
String str="2020-05-04";
// String s=str.replaceAll("-","");
String s=StringUtils.substringAfter(str, "-").replaceAll("-", "");
System.out.println(s);
StringBuilder st=new StringBuilder();
String str="{\"dm\":[\"7319\",\"7315\"]}";
Map<String,List<String>> map = JSONObject.parseObject(str,HashMap.class);
for(List<String> list:map.values()){
list.forEach(data->System.out.println("element==>"+data));
}
}
}
......@@ -23,6 +23,7 @@ public class TagConsumeRecordBean implements Serializable {
private double paid_amt;// actual paid amount
private double total_amt;// total tag-price amount
private String receipts_time;// receipt date
private String shop_id;// online shop id (offline orders use store_info_id)
public int getEnt_id() {
return ent_id;
......@@ -143,4 +144,12 @@ public class TagConsumeRecordBean implements Serializable {
public void setOorder_no(String oorder_no) {
this.oorder_no = oorder_no;
}
public String getShop_id() {
return shop_id;
}
public void setShop_id(String shop_id) {
this.shop_id = shop_id;
}
}
......@@ -4,6 +4,7 @@ import com.gic.spark.entity.enumtype.TimeRangeType;
import com.gic.spark.entity.enumtype.YearMonthDayType;
import java.sql.Timestamp;
import java.util.Date;
/**
* @description:
......@@ -14,11 +15,11 @@ public class AbstractFilterRequestTime extends AbstractFilterRequest{
/**
* Start time
*/
private Date beginTime;
/**
* End time
*/
private Date endTime;
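// begin/end time switched from java.sql.Timestamp to java.util.Date so that DateUtil.stringToDate(...) results can be assigned directly (see TagValueParser)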
/**
* Most recent year/month/day time span
*/
......@@ -28,19 +29,19 @@ public class AbstractFilterRequestTime extends AbstractFilterRequest{
private YearMonthDayType yearMonthDayType;
public Date getBeginTime() {
return beginTime;
}
public void setBeginTime(Date beginTime) {
this.beginTime = beginTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
......
......@@ -7,7 +7,7 @@ import java.util.List;
* @author: wangxk
* @date: 2020/8/10
*/
public class TagConsumeChannelRequest extends AbstractFilterRequest{
private List<String> ChannelVals;
......
package com.gic.spark.entity.request;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagConsumeStoreRequest extends AbstractFilterRequestTime{
private List<String>storeList;
public List<String> getStoreList() {
return storeList;
}
public void setStoreList(List<String> storeList) {
this.storeList = storeList;
}
}
......@@ -4,7 +4,7 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeChannelRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
......@@ -36,7 +36,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,channelRequest);
......
package com.gic.spark.filter;
import com.gic.mysql.util.StringUtil;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagFirstOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagFirstOfflineConsumptionStoreFilter instance;
public static TagFirstOfflineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagFirstOfflineConsumptionStoreFilter();
}
return instance;
}
private TagFirstOfflineConsumptionStoreFilter(){}
public static void setInstance(TagFirstOfflineConsumptionStoreFilter instance) {
TagFirstOfflineConsumptionStoreFilter.instance = instance;
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
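// First offline consumption store: offline records (order_channel_code == 1) with a receipts_time and store_info_id are reduced per ecu_id to the earliest receipt, and the member is kept only if that first store is in the requested store list.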
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&&StringUtils.isNotEmpty(data.getReceipts_time())
&&StringUtils.isNotEmpty(data.getStore_info_id()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getStore_info_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
<DateUtil.strToDate(y._1(),DateUtil.FORMAT_DATETIME_19).getTime()){
return x;
}else{
return y;
}
}).filter(data->storeRequest.getStoreList().contains(data._2()._2()))
.map(data->data._1());
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.enumtype.YearMonthDayType;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagFirstOnlineConsumptionStoreFilter instance;
public static TagFirstOnlineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagFirstOnlineConsumptionStoreFilter();
}
return instance;
}
private TagFirstOnlineConsumptionStoreFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
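// Same shape as the offline variant, but for online orders: order_channel_code != 1 and the shop_id field added to TagConsumeRecordBean in this commit.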
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time())
&& StringUtils.isNotEmpty(data.getShop_id()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getShop_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
<DateUtil.strToDate(y._1(),DateUtil.FORMAT_DATETIME_19).getTime()){
return x;
}else{
return y;
}
}).filter(data->storeRequest.getStoreList().contains(data._2()._2()))
.map(data->data._1());
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagHistoryOfflineConsumptionStoreFilter instance;
public static TagHistoryOfflineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagHistoryOfflineConsumptionStoreFilter();
}
return instance;
}
private TagHistoryOfflineConsumptionStoreFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
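// Historical offline consumption store: keeps any member with at least one offline record (order_channel_code == 1) at one of the requested stores; reduceByKey((x,y)->x) only de-duplicates ecu_ids.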
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& StringUtils.isNotEmpty(data.getStore_info_id()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getStore_info_id()))
.filter(data->storeRequest.getStoreList().contains(data._2()))
.reduceByKey((x,y)->x)
.map(data->data._1());
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagHistoryOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagHistoryOnlineConsumptionStoreFilter instance;
public static TagHistoryOnlineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagHistoryOnlineConsumptionStoreFilter();
}
return instance;
}
private TagHistoryOnlineConsumptionStoreFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getShop_id()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getShop_id()))
.filter(data->storeRequest.getStoreList().contains(data._2()))
.reduceByKey((x,y)->x)
.map(data->data._1());
return ecuRdd;
}
}
......@@ -4,7 +4,7 @@ import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeChannelRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
......@@ -39,7 +39,7 @@ public class TagLatelyConsumptionChannelFilter extends AbstractTagConsumRecordFi
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeChannelRequest channelRequest=(TagConsumeChannelRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,channelRequest);
......
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagLatelyOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagLatelyOnlineConsumptionStoreFilter instance;
public static TagLatelyOnlineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagLatelyOnlineConsumptionStoreFilter();
}
return instance;
}
private TagLatelyOnlineConsumptionStoreFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
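// Most recent online consumption shop: online records are reduced per ecu_id to the latest receipts_time, and the member is kept if that latest shop_id is in the requested list.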
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time())
&& StringUtils.isNotEmpty(data.getShop_id()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getShop_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
>DateUtil.strToDate(y._1(),DateUtil.FORMAT_DATETIME_19).getTime()){
return x;
}else{
return y;
}
}).filter(data->storeRequest.getStoreList().contains(data._2()._2()))
.map(data->data._1());
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.enumtype.YearMonthDayType;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagOfflineConsumptionStoreFilter instance;
public static TagOfflineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagOfflineConsumptionStoreFilter();
}
return instance;
}
private TagOfflineConsumptionStoreFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
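// Offline consumption store within a time window: the request is forced to day granularity and checkTime(...) restricts records to the configured range before store_info_id is matched against the requested stores.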
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
storeRequest.setYearMonthDayType(YearMonthDayType.DAY);
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()==1
&& StringUtils.isNotEmpty(data.getStore_info_id())
&& StringUtils.isNotEmpty(data.getReceipts_time()))
.filter(data->checkTime(storeRequest, DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),data.getStore_info_id()))
.filter(data->storeRequest.getStoreList().contains(data._2()))
.reduceByKey((x,y)->x)
.map(data->data._1());
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.enumtype.YearMonthDayType;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagOnlineConsumptionStoreFilter instance;
public static TagOnlineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagOnlineConsumptionStoreFilter();
}
return instance;
}
private TagOnlineConsumptionStoreFilter(){}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
storeRequest.setYearMonthDayType(YearMonthDayType.DAY);
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data->data.getOrder_channel_code()!=1
&& StringUtils.isNotEmpty(data.getReceipts_time())
&& StringUtils.isNotEmpty(data.getShop_id()))
.filter(data->checkTime(storeRequest, DateUtil.strToDate(data.getReceipts_time(),DateUtil.FORMAT_DATETIME_19).getTime()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getShop_id())))
.filter(data->storeRequest.getStoreList().contains(data._2()._2()))
.reduceByKey((x,y)->x)
.map(data->data._1());
return ecuRdd;
}
}
package com.gic.spark.filter;
import com.gic.spark.datasource.entity.DataSourceEntity;
import com.gic.spark.datasource.mysql.MysqlRddManager;
import com.gic.spark.entity.bean.TagConsumeRecordBean;
import com.gic.spark.entity.request.AbstractFilterRequest;
import com.gic.spark.entity.request.TagConsumeStoreRequest;
import com.gic.spark.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: wangxk
* @date: 2020/8/11
*/
public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFilter{
private static TagRecentlyOfflineConsumptionStoreFilter instance;
public static TagRecentlyOfflineConsumptionStoreFilter getInstance() {
if(null==instance){
instance=new TagRecentlyOfflineConsumptionStoreFilter();
}
return instance;
}
@Override
public List<DataSourceEntity> necessarySourceList() {
List<DataSourceEntity> result = new ArrayList();
result.add(dataSourceHive);
return result;
}
@Override
public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
TagConsumeStoreRequest storeRequest=(TagConsumeStoreRequest)request;
JavaRDD<TagConsumeRecordBean>consumeRecordRDD= MysqlRddManager.getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId),TagConsumeRecordBean.class).javaRDD();
consumeRecordRDD=statisticsTypeHandle(consumeRecordRDD,storeRequest);
JavaRDD<Integer>ecuRdd=consumeRecordRDD.filter(data-> data.getOrder_channel_code()==1
&&StringUtils.isNotEmpty(data.getReceipts_time())
&&StringUtils.isNotEmpty(data.getStore_info_id()))
.mapToPair(data-> Tuple2.apply(data.getEcu_id(),Tuple2.apply(data.getReceipts_time(),data.getStore_info_id())))
.reduceByKey((x,y)->{
if(DateUtil.strToDate(x._1(),DateUtil.FORMAT_DATETIME_19).getTime()
>DateUtil.strToDate(y._1(),DateUtil.FORMAT_DATETIME_19).getTime()){
return x;
}else{
return y;
}
}).filter(data->storeRequest.getStoreList().contains(data._2()._2()))
.map(data->data._1());
return ecuRdd;
}
}
......@@ -60,7 +60,17 @@ public class TagConstant {
public static final String TAG_CODE_FIRST_CONSUMPTION_CHANNEL="firstConsumptionChannel";// first consumption channel
public static final String TAG_CODE_LATELY_CONSUMPTION_CHANNEL="latelyConsumptionChannel";// most recent consumption channel
//consumption store / shop
public static final String TAG_CODE_FIRST_OFFLINE_CONSUMPTION_STORE="firstOfflineConsumptionStore";// first offline consumption store
public static final String TAG_CODE_RECENTLY_OFFLINE_CONSUMPTION_STORE="recentlyOfflineConsumptionStore";// most recent offline consumption store
public static final String TAG_CODE_HISTORY_OFFLINE_CONSUMPTION_STORE="historyOfflineConsumptionStore";// historical offline consumption store
public static final String TAG_CODE_OFFLINE_CONSUMPTION_STORE="offlineConsumptionStore";// offline consumption store
public static final String TAG_CODE_FIRST_ONLINE_CONSUMPTION_STORE="firstOnlineConsumptionStore";// first online consumption shop
public static final String TAG_CODE_LATELY_ONLINE_CONSUMPTION_STORE="latelyOnlineConsumptionStore";// most recent online consumption shop
public static final String TAG_CODE_HISTORY_ONLINE_CONSUMPTION_STORE="historyOnlineConsumptionStore";// historical online consumption shop
public static final String TAG_CODE_ONLINE_CONSUMPTION_STORE="onlineConsumptionStore";// online consumption shop
//consumption time
public static final String TAG_CODE_FIRST_CONSUME_TIME="firstConsumeTime";// first consumption time
// public static final String TAG_CODE_HISTORY_CONSUME_TIME="historyConsumeTime";// historical consumption time
public static final String TAG_CODE_LATELY_CONSUME_TIME="latelyConsumeTime";// most recent consumption time
......
......@@ -119,6 +119,35 @@ public class TagFilterFactory {
case TagConstant.TAG_CODE_LATELY_CONSUMPTION_CHANNEL:
tagFilter=TagLatelyConsumptionChannelFilter.getInstance();
break;
//offline consumption stores
case TagConstant.TAG_CODE_FIRST_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagFirstOfflineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_RECENTLY_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagRecentlyOfflineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_HISTORY_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagHistoryOfflineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_OFFLINE_CONSUMPTION_STORE:
tagFilter=TagOfflineConsumptionStoreFilter.getInstance();
break;
//online consumption shops
case TagConstant.TAG_CODE_FIRST_ONLINE_CONSUMPTION_STORE:
tagFilter=TagFirstOnlineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_LATELY_ONLINE_CONSUMPTION_STORE:
tagFilter=TagLatelyOnlineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_HISTORY_ONLINE_CONSUMPTION_STORE:
tagFilter=TagHistoryOnlineConsumptionStoreFilter.getInstance();
break;
case TagConstant.TAG_CODE_ONLINE_CONSUMPTION_STORE:
tagFilter=TagOnlineConsumptionStoreFilter.getInstance();
break;
default:
DingtalkMessageUtil.sendAlertMessage("暂未支持的非实时标签: " + tagEsFieldName); // alert: "non-realtime tag not yet supported: <field>"
break;
......
package com.gic.spark.tag;
import com.alibaba.fastjson.JSONObject;
import com.gic.spark.entity.TagConditionDTO;
import com.gic.spark.entity.TagConditionValDTO;
import com.gic.spark.entity.enumtype.*;
import com.gic.spark.entity.request.*;
import com.gic.spark.util.DateUtil;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.regex.Pattern;
import static com.gic.spark.entity.enumtype.TagValKeyEnum.COMPONENT_STORE;
/**
* @description:
* @author: wangxk
......@@ -111,21 +111,122 @@ public class TagValueParser {
case TagConstant.TAG_CODE_CONSUME_SLEEP_DAY:
// request=
break;
//consumption channel
case TagConstant.TAG_CODE_FIRST_CONSUMPTION_CHANNEL:
request=getConsumeChannelRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_LATELY_CONSUMPTION_CHANNEL:
request=getConsumeChannelRequest(conditionDTO.getTagTemplateElInfo());
break;
//offline consumption stores
case TagConstant.TAG_CODE_FIRST_OFFLINE_CONSUMPTION_STORE:
request=getOfflineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_RECENTLY_OFFLINE_CONSUMPTION_STORE:
request=getOfflineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_HISTORY_OFFLINE_CONSUMPTION_STORE:
request=getOfflineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_OFFLINE_CONSUMPTION_STORE:
request=getOfflineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
//online consumption shops
case TagConstant.TAG_CODE_FIRST_ONLINE_CONSUMPTION_STORE:
request=getOnlineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_LATELY_ONLINE_CONSUMPTION_STORE:
request=getOnlineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_HISTORY_ONLINE_CONSUMPTION_STORE:
request=getOnlineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
case TagConstant.TAG_CODE_ONLINE_CONSUMPTION_STORE:
request=getOnlineConsumeStoreRequest(conditionDTO.getTagTemplateElInfo());
break;
default:break;
}
return request;
}
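// Builds the online-shop request: keys containing "flag" set the statistics type, the component-store key carries a JSON map whose values are shop id lists (same format as the Test example above), and "timeRange.*" keys configure either a fixed interval or a relative window in days.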
private static AbstractFilterRequest getOnlineConsumeStoreRequest(List<TagConditionValDTO> conditionValDTOList) {
TagConsumeStoreRequest request=new TagConsumeStoreRequest();
List<String>storeList=new ArrayList();
for (TagConditionValDTO conditionValDTO : conditionValDTOList) {
if (Pattern.compile("flag").matcher(conditionValDTO.getKey()).find()) {
setStatisticsTypeHandle(request, conditionValDTO.getKey(), conditionValDTO.getVal());
} else if(conditionValDTO.getKey().equals(TagValKeyEnum.COMPONENT_STORE.getKey())){
String val=conditionValDTO.getVal();
Map<String,List<String>> storeMap = JSONObject.parseObject(val,HashMap.class);
for(List<String>lts:storeMap.values()){
lts.forEach(store->storeList.add(store));
}
request.setStoreList(storeList);
}else if(Pattern.compile("timeRange").matcher(conditionValDTO.getKey()).find()){
String[] keys=conditionValDTO.getKey().split(".");
switch (keys[1]){
case TagConstant.TAG_TIME_TYPE_FIXATION:
String[] vals=conditionValDTO.getVal().split(",");
request.setBeginTime(DateUtil.stringToDate(vals[0]));
request.setEndTime(DateUtil.stringToDate(vals[1]));
request.setTimeRangeType(TimeRangeType.FIXATION);
break;
case TagConstant.TAG_TIME_TYPE_LATELY:
request.setTimeNum(Integer.parseInt(conditionValDTO.getVal()));
request.setTimeRangeType(TimeRangeType.LATELY);
request.setYearMonthDayType(YearMonthDayType.DAY);
break;
case TagConstant.TAG_TIME_TYPE_LATER:
request.setTimeNum(Integer.parseInt(conditionValDTO.getVal()));
request.setTimeRangeType(TimeRangeType.LATER);
request.setYearMonthDayType(YearMonthDayType.DAY);
break;
default:break;
}
}
}
return request;
}
private static AbstractFilterRequest getOfflineConsumeStoreRequest(List<TagConditionValDTO> conditionValDTOList) {
TagConsumeStoreRequest request=new TagConsumeStoreRequest();
for (TagConditionValDTO conditionValDTO : conditionValDTOList) {
if (Pattern.compile("flag").matcher(conditionValDTO.getKey()).find()) {
setStatisticsTypeHandle(request, conditionValDTO.getKey(), conditionValDTO.getVal());
} else if(conditionValDTO.getKey().equals(TagValKeyEnum.COMPONENT_STORE.getKey())){
String val=conditionValDTO.getVal();
List<String> storeList=null; // TODO: the offline store list still has to be resolved through an API call
request.setStoreList(storeList);
}else if(Pattern.compile("timeRange").matcher(conditionValDTO.getKey()).find()){
String[] keys=conditionValDTO.getKey().split(".");
switch (keys[1]){
case TagConstant.TAG_TIME_TYPE_FIXATION:
String[] vals=conditionValDTO.getVal().split(",");
request.setBeginTime(DateUtil.stringToDate(vals[0]));
request.setEndTime(DateUtil.stringToDate(vals[1]));
request.setTimeRangeType(TimeRangeType.FIXATION);
break;
case TagConstant.TAG_TIME_TYPE_LATELY:
request.setTimeNum(Integer.parseInt(conditionValDTO.getVal()));
request.setTimeRangeType(TimeRangeType.LATELY);
request.setYearMonthDayType(YearMonthDayType.DAY);
break;
case TagConstant.TAG_TIME_TYPE_LATER:
request.setTimeNum(Integer.parseInt(conditionValDTO.getVal()));
request.setTimeRangeType(TimeRangeType.LATER);
request.setYearMonthDayType(YearMonthDayType.DAY);
break;
default:break;
}
}
}
return request;
}
private static AbstractFilterRequest getConsumeChannelRequest(List<TagConditionValDTO> conditionValDTOList) {
TagConsumeChannelRequest request = new TagConsumeChannelRequest();
for (TagConditionValDTO conditionValDTO : conditionValDTOList) {
if (Pattern.compile("flag").matcher(conditionValDTO.getKey()).find()) {
setStatisticsTypeHandle(request, conditionValDTO.getKey(), conditionValDTO.getVal());
......@@ -309,8 +410,8 @@ public class TagValueParser {
if(key.equals(TagValKeyEnum.CALENDAR_YMD.getKey())){
String[] vals=val.split(",");
request.setBeginTime(DateUtil.stringToDate(vals[0]));
request.setEndTime(DateUtil.stringToDate(vals[1]));
request.setTimeRangeType(TimeRangeType.FIXATION);
}else if(key.equals(TagValKeyEnum.CALENDAR_YMD_DAY_TODAY.getKey())){
request.setYearMonthDayType(YearMonthDayType.DAY);
......