Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
gic-spark-tag-4.0
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wangxiaokang
gic-spark-tag-4.0
Commits
9001b712
Commit
9001b712
authored
Aug 10, 2020
by
guos
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
会员标签4.0
parent
ddbf228e
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
29 changed files
with
1771 additions
and
75 deletions
+1771
-75
TagConsumeRecordBean.java
.../java/com/gic/spark/entity/bean/TagConsumeRecordBean.java
+146
-0
TimeRangeType.java
...ain/java/com/gic/spark/entity/enumtype/TimeRangeType.java
+5
-0
YearMonthDayType.java
.../java/com/gic/spark/entity/enumtype/YearMonthDayType.java
+35
-0
AbstractFilterRequest.java
...a/com/gic/spark/entity/request/AbstractFilterRequest.java
+21
-0
AbstractFilterRequestDouble.java
...gic/spark/entity/request/AbstractFilterRequestDouble.java
+58
-0
AbstractFilterRequestTime.java
...m/gic/spark/entity/request/AbstractFilterRequestTime.java
+70
-0
TagChannelRequest.java
.../java/com/gic/spark/entity/request/TagChannelRequest.java
+21
-0
TagConsumeAmountRequest.java
...com/gic/spark/entity/request/TagConsumeAmountRequest.java
+73
-0
TagConsumeRequest.java
.../java/com/gic/spark/entity/request/TagConsumeRequest.java
+0
-19
TagConsumeTimeRequest.java
...a/com/gic/spark/entity/request/TagConsumeTimeRequest.java
+14
-0
TabDataActuallyPaidConfig.java
...com/gic/spark/entity/table/TabDataActuallyPaidConfig.java
+77
-0
AbstractTagConsumRecordFilter.java
...a/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
+87
-0
TagConsumeTimeFilter.java
src/main/java/com/gic/spark/filter/TagConsumeTimeFilter.java
+55
-0
TagConsumeTotalFilter.java
...main/java/com/gic/spark/filter/TagConsumeTotalFilter.java
+110
-0
TagFirstConsumeTimeFilter.java
.../java/com/gic/spark/filter/TagFirstConsumeTimeFilter.java
+56
-0
TagFirstConsumptionChannelFilter.java
...om/gic/spark/filter/TagFirstConsumptionChannelFilter.java
+61
-0
TagFirstConsumptionMoneyFilter.java
.../com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
+124
-0
TagHistoryConsumeTotalFilter.java
...va/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
+89
-0
TagLatelyConsumeTimeFilter.java
...java/com/gic/spark/filter/TagLatelyConsumeTimeFilter.java
+55
-0
TagLatelyConsumptionChannelFilter.java
...m/gic/spark/filter/TagLatelyConsumptionChannelFilter.java
+64
-0
TagLatelyConsumptionMoneyFilter.java
...com/gic/spark/filter/TagLatelyConsumptionMoneyFilter.java
+127
-0
TagLowestSingleConsumptionMoneyFilter.java
...c/spark/filter/TagLowestSingleConsumptionMoneyFilter.java
+137
-0
TagTopSingleConsumptionMoneyFilter.java
.../gic/spark/filter/TagTopSingleConsumptionMoneyFilter.java
+136
-0
TagConstant.java
src/main/java/com/gic/spark/tag/TagConstant.java
+22
-3
TagFilterFactory.java
src/main/java/com/gic/spark/tag/TagFilterFactory.java
+85
-38
TagProcess.java
src/main/java/com/gic/spark/tag/TagProcess.java
+25
-13
TagValueParser.java
src/main/java/com/gic/spark/tag/TagValueParser.java
+0
-0
CommonUtil.java
src/main/java/com/gic/spark/util/CommonUtil.java
+17
-2
ConstantUtil.java
src/main/java/com/gic/spark/util/ConstantUtil.java
+1
-0
No files found.
src/main/java/com/gic/spark/entity/bean/TagConsumeRecordBean.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
bean
;
import
java.io.Serializable
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/3
*/
public
class
TagConsumeRecordBean
implements
Serializable
{
private
int
ent_id
;
//企业id
private
int
area_id
;
//会员卡域ID
private
int
order_channel_code
;
//订单来源渠道(线下1,达摩微商城2,微盟微商城3,线上天猫4)
private
int
cu_id
;
private
int
ecu_id
;
//用户id
private
String
store_info_id
;
//门店id
private
int
is_eff_order
;
//是否有效订单(0:否,1是)
private
int
is_mbr_order
;
//是否会员订单(0:否,1是)
private
String
order_no
;
private
String
oorder_no
;
private
long
goods_num
;
//订单商品数量
private
double
pay_amt
;
//应付金额
private
double
paid_amt
;
//实付金额
private
double
total_amt
;
//吊牌价总额
private
String
receipts_time
;
//单据日期
public
int
getEnt_id
()
{
return
ent_id
;
}
public
void
setEnt_id
(
int
ent_id
)
{
this
.
ent_id
=
ent_id
;
}
public
int
getArea_id
()
{
return
area_id
;
}
public
void
setArea_id
(
int
area_id
)
{
this
.
area_id
=
area_id
;
}
public
int
getOrder_channel_code
()
{
return
order_channel_code
;
}
public
void
setOrder_channel_code
(
int
order_channel_code
)
{
this
.
order_channel_code
=
order_channel_code
;
}
public
int
getCu_id
()
{
return
cu_id
;
}
public
void
setCu_id
(
int
cu_id
)
{
this
.
cu_id
=
cu_id
;
}
public
int
getEcu_id
()
{
return
ecu_id
;
}
public
void
setEcu_id
(
int
ecu_id
)
{
this
.
ecu_id
=
ecu_id
;
}
public
String
getStore_info_id
()
{
return
store_info_id
;
}
public
void
setStore_info_id
(
String
store_info_id
)
{
this
.
store_info_id
=
store_info_id
;
}
public
int
getIs_eff_order
()
{
return
is_eff_order
;
}
public
void
setIs_eff_order
(
int
is_eff_order
)
{
this
.
is_eff_order
=
is_eff_order
;
}
public
int
getIs_mbr_order
()
{
return
is_mbr_order
;
}
public
void
setIs_mbr_order
(
int
is_mbr_order
)
{
this
.
is_mbr_order
=
is_mbr_order
;
}
public
long
getGoods_num
()
{
return
goods_num
;
}
public
void
setGoods_num
(
long
goods_num
)
{
this
.
goods_num
=
goods_num
;
}
public
double
getPay_amt
()
{
return
pay_amt
;
}
public
void
setPay_amt
(
double
pay_amt
)
{
this
.
pay_amt
=
pay_amt
;
}
public
double
getPaid_amt
()
{
return
paid_amt
;
}
public
void
setPaid_amt
(
double
paid_amt
)
{
this
.
paid_amt
=
paid_amt
;
}
public
double
getTotal_amt
()
{
return
total_amt
;
}
public
void
setTotal_amt
(
double
total_amt
)
{
this
.
total_amt
=
total_amt
;
}
public
String
getReceipts_time
()
{
return
receipts_time
;
}
public
void
setReceipts_time
(
String
receipts_time
)
{
this
.
receipts_time
=
receipts_time
;
}
public
String
getOrder_no
()
{
return
order_no
;
}
public
void
setOrder_no
(
String
order_no
)
{
this
.
order_no
=
order_no
;
}
public
String
getOorder_no
()
{
return
oorder_no
;
}
public
void
setOorder_no
(
String
oorder_no
)
{
this
.
oorder_no
=
oorder_no
;
}
}
src/main/java/com/gic/spark/entity/enumtype/TimeRangeType.java
View file @
9001b712
...
...
@@ -8,6 +8,11 @@ package com.gic.spark.entity.enumtype;
public
enum
TimeRangeType
{
/**
* 当前 :年/月/日
*/
CURRENT
(
"current"
),
/**
* 之后
*/
LATER
(
"later"
),
...
...
src/main/java/com/gic/spark/entity/enumtype/YearMonthDayType.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
enumtype
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/5
*/
public
enum
YearMonthDayType
{
/**
* 日
*/
DAY
(
"day"
),
/**
* 月
*/
MONTH
(
"month"
),
/**
* 年
*/
YEAR
(
"year"
);
private
String
value
;
YearMonthDayType
(
String
value
)
{
this
.
value
=
value
;
}
public
String
getValue
()
{
return
value
;
}
public
void
setValue
(
String
value
)
{
this
.
value
=
value
;
}
}
src/main/java/com/gic/spark/entity/request/AbstractFilterRequest.java
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
request
;
import
com.gic.spark.entity.enumtype.DomainType
;
import
com.gic.spark.entity.enumtype.StatisticsType
;
import
java.io.Serializable
;
import
java.util.List
;
...
...
@@ -14,6 +15,10 @@ public abstract class AbstractFilterRequest implements Serializable{
private
static
final
long
serialVersionUID
=
4411295146033929579L
;
public
StatisticsType
statisticsType
;
//统计类型
private
List
<
String
>
statisticsValList
;
protected
DomainType
domainType
;
protected
List
<
String
>
cuVals
;
...
...
@@ -33,4 +38,20 @@ public abstract class AbstractFilterRequest implements Serializable{
public
void
setCuVals
(
List
<
String
>
cuVals
)
{
this
.
cuVals
=
cuVals
;
}
public
StatisticsType
getStatisticsType
()
{
return
statisticsType
;
}
public
void
setStatisticsType
(
StatisticsType
statisticsType
)
{
this
.
statisticsType
=
statisticsType
;
}
public
List
<
String
>
getStatisticsValList
()
{
return
statisticsValList
;
}
public
void
setStatisticsValList
(
List
<
String
>
statisticsValList
)
{
this
.
statisticsValList
=
statisticsValList
;
}
}
src/main/java/com/gic/spark/entity/request/AbstractFilterRequestDouble.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
request
;
import
com.gic.spark.entity.enumtype.NumberType
;
/**
* @description:
* @author: wangxk
* @date: 2020/5/6
*/
public
class
AbstractFilterRequestDouble
extends
AbstractFilterRequest
{
/**
* 最小数字
*/
private
Double
beginNum
;
/**
* 最大数字
*/
private
Double
endNum
;
/**
* 相等数字
*/
private
Double
equalNum
;
private
NumberType
numberType
;
public
Double
getBeginNum
()
{
return
beginNum
;
}
public
void
setBeginNum
(
Double
beginNum
)
{
this
.
beginNum
=
beginNum
;
}
public
Double
getEndNum
()
{
return
endNum
;
}
public
void
setEndNum
(
Double
endNum
)
{
this
.
endNum
=
endNum
;
}
public
Double
getEqualNum
()
{
return
equalNum
;
}
public
void
setEqualNum
(
Double
equalNum
)
{
this
.
equalNum
=
equalNum
;
}
public
NumberType
getNumberType
()
{
return
numberType
;
}
public
void
setNumberType
(
NumberType
numberType
)
{
this
.
numberType
=
numberType
;
}
}
src/main/java/com/gic/spark/entity/request/AbstractFilterRequestTime.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
request
;
import
com.gic.spark.entity.enumtype.TimeRangeType
;
import
com.gic.spark.entity.enumtype.YearMonthDayType
;
import
java.sql.Timestamp
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/5
*/
public
class
AbstractFilterRequestTime
extends
AbstractFilterRequest
{
/**
* 开始时间
*/
private
Timestamp
beginTime
;
/**
* 结束时间
*/
private
Timestamp
endTime
;
/**
* 最近 年/月/日 时间
*/
private
int
timeNum
;
private
TimeRangeType
timeRangeType
;
private
YearMonthDayType
yearMonthDayType
;
public
Timestamp
getBeginTime
()
{
return
beginTime
;
}
public
void
setBeginTime
(
Timestamp
beginTime
)
{
this
.
beginTime
=
beginTime
;
}
public
Timestamp
getEndTime
()
{
return
endTime
;
}
public
void
setEndTime
(
Timestamp
endTime
)
{
this
.
endTime
=
endTime
;
}
public
int
getTimeNum
()
{
return
timeNum
;
}
public
void
setTimeNum
(
int
timeNum
)
{
this
.
timeNum
=
timeNum
;
}
public
TimeRangeType
getTimeRangeType
()
{
return
timeRangeType
;
}
public
void
setTimeRangeType
(
TimeRangeType
timeRangeType
)
{
this
.
timeRangeType
=
timeRangeType
;
}
public
YearMonthDayType
getYearMonthDayType
()
{
return
yearMonthDayType
;
}
public
void
setYearMonthDayType
(
YearMonthDayType
yearMonthDayType
)
{
this
.
yearMonthDayType
=
yearMonthDayType
;
}
}
src/main/java/com/gic/spark/entity/request/TagChannelRequest.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
request
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/10
*/
public
class
TagChannelRequest
extends
AbstractFilterRequest
{
private
List
<
String
>
ChannelVals
;
public
List
<
String
>
getChannelVals
()
{
return
ChannelVals
;
}
public
void
setChannelVals
(
List
<
String
>
channelVals
)
{
ChannelVals
=
channelVals
;
}
}
src/main/java/com/gic/spark/entity/request/TagConsumeAmountRequest.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
request
;
import
com.gic.spark.entity.enumtype.StatisticsType
;
import
com.gic.spark.entity.enumtype.TimeRangeType
;
import
com.gic.spark.entity.enumtype.YearMonthDayType
;
import
java.util.Date
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/5
*/
public
class
TagConsumeAmountRequest
extends
AbstractFilterRequestDouble
{
/**
* 开始时间
*/
private
Date
beginTime
;
/**
* 结束时间
*/
private
Date
endTime
;
/**
* 最近 年/月/日 时间
*/
private
int
timeNum
;
private
TimeRangeType
timeRangeType
;
private
YearMonthDayType
yearMonthDayType
;
public
Date
getBeginTime
()
{
return
beginTime
;
}
public
void
setBeginTime
(
Date
beginTime
)
{
this
.
beginTime
=
beginTime
;
}
public
Date
getEndTime
()
{
return
endTime
;
}
public
void
setEndTime
(
Date
endTime
)
{
this
.
endTime
=
endTime
;
}
public
int
getTimeNum
()
{
return
timeNum
;
}
public
void
setTimeNum
(
int
timeNum
)
{
this
.
timeNum
=
timeNum
;
}
public
TimeRangeType
getTimeRangeType
()
{
return
timeRangeType
;
}
public
void
setTimeRangeType
(
TimeRangeType
timeRangeType
)
{
this
.
timeRangeType
=
timeRangeType
;
}
public
YearMonthDayType
getYearMonthDayType
()
{
return
yearMonthDayType
;
}
public
void
setYearMonthDayType
(
YearMonthDayType
yearMonthDayType
)
{
this
.
yearMonthDayType
=
yearMonthDayType
;
}
}
src/main/java/com/gic/spark/entity/request/TagConsumeRequest.java
View file @
9001b712
...
...
@@ -12,23 +12,4 @@ import java.util.List;
*/
public
class
TagConsumeRequest
extends
AbstractFilterRequestInteger
{
public
StatisticsType
statisticsType
;
//统计类型
private
List
<
String
>
statisticsValList
;
public
StatisticsType
getStatisticsType
()
{
return
statisticsType
;
}
public
void
setStatisticsType
(
StatisticsType
statisticsType
)
{
this
.
statisticsType
=
statisticsType
;
}
public
List
<
String
>
getStatisticsValList
()
{
return
statisticsValList
;
}
public
void
setStatisticsValList
(
List
<
String
>
statisticsValList
)
{
this
.
statisticsValList
=
statisticsValList
;
}
}
src/main/java/com/gic/spark/entity/request/TagConsumeTimeRequest.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
request
;
import
com.gic.spark.entity.enumtype.StatisticsType
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/7
*/
public
class
TagConsumeTimeRequest
extends
AbstractFilterRequestTime
{
}
src/main/java/com/gic/spark/entity/table/TabDataActuallyPaidConfig.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
entity
.
table
;
public
class
TabDataActuallyPaidConfig
{
private
Integer
actually_Paid_Config_Id
;
private
Integer
enterprise_Id
;
private
Integer
classify
;
private
Integer
config_Status
;
private
Integer
status
;
private
java
.
sql
.
Timestamp
create_Time
;
private
java
.
sql
.
Timestamp
update_Time
;
public
Integer
getActually_Paid_Config_Id
()
{
return
actually_Paid_Config_Id
;
}
public
void
setActually_Paid_Config_Id
(
Integer
actually_Paid_Config_Id
)
{
this
.
actually_Paid_Config_Id
=
actually_Paid_Config_Id
;
}
public
Integer
getEnterprise_Id
()
{
return
enterprise_Id
;
}
public
void
setEnterprise_Id
(
Integer
enterprise_Id
)
{
this
.
enterprise_Id
=
enterprise_Id
;
}
public
Integer
getClassify
()
{
return
classify
;
}
public
void
setClassify
(
Integer
classify
)
{
this
.
classify
=
classify
;
}
public
Integer
getConfig_Status
()
{
return
config_Status
;
}
public
void
setConfig_Status
(
Integer
config_Status
)
{
this
.
config_Status
=
config_Status
;
}
public
Integer
getStatus
()
{
return
status
;
}
public
void
setStatus
(
Integer
status
)
{
this
.
status
=
status
;
}
public
java
.
sql
.
Timestamp
getCreate_Time
()
{
return
create_Time
;
}
public
void
setCreate_Time
(
java
.
sql
.
Timestamp
create_Time
)
{
this
.
create_Time
=
create_Time
;
}
public
java
.
sql
.
Timestamp
getUpdate_Time
()
{
return
update_Time
;
}
public
void
setUpdate_Time
(
java
.
sql
.
Timestamp
update_Time
)
{
this
.
update_Time
=
update_Time
;
}
}
src/main/java/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequestTime
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
/**
* @description:
* 消费记录
* @author: wangxk
* @date: 2020/8/3
*/
public
abstract
class
AbstractTagConsumRecordFilter
implements
TagFilter
{
DataSourceHive
dataSourceHive
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_WDORDER_D
);
protected
static
JavaRDD
<
TagConsumeRecordBean
>
statisticsTypeHandle
(
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
,
AbstractFilterRequest
request
){
consumeRecordRDD
=
consumeRecordRDD
.
mapPartitions
(
data
->{
List
<
TagConsumeRecordBean
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
TagConsumeRecordBean
consumeRecordBean
=
data
.
next
();
switch
(
request
.
getStatisticsType
()){
case
COMMODITYBRAND:
if
(
request
.
getStatisticsValList
().
contains
(
consumeRecordBean
.
getEnt_id
())){
result
.
add
(
consumeRecordBean
);
}
break
;
case
CHANNEL:
if
(
request
.
getStatisticsValList
().
contains
(
consumeRecordBean
.
getOrder_channel_code
())){
result
.
add
(
consumeRecordBean
);
}
break
;
case
MCUINFO:
if
(
request
.
getStatisticsValList
().
contains
(
consumeRecordBean
.
getArea_id
())){
result
.
add
(
consumeRecordBean
);
}
break
;
default
:
break
;
}
}
return
result
.
iterator
();
});
return
consumeRecordRDD
;
}
protected
static
boolean
checkTime
(
AbstractFilterRequestTime
request
,
long
consumeTime
){
Boolean
result
=
false
;
switch
(
request
.
getTimeRangeType
()){
case
FIXATION:
if
(
consumeTime
>=
request
.
getBeginTime
().
getTime
()
&&
consumeTime
<=
request
.
getEndTime
().
getTime
())
result
=
true
;
break
;
case
LATELY:
switch
(
request
.
getYearMonthDayType
()){
case
DAY:
if
(
consumeTime
>
DateUtil
.
addNumForDay
(
new
Date
(),
request
.
getTimeNum
()).
getTime
()){
result
=
true
;
}
break
;
case
MONTH:
if
(
consumeTime
>
DateUtil
.
addNumForMonth
(
new
Date
(),
request
.
getTimeNum
()).
getTime
()){
result
=
true
;
}
break
;
case
YEAR:
if
(
consumeTime
>
DateUtil
.
addNumForYear
(
new
Date
(),
request
.
getTimeNum
()).
getTime
()){
result
=
true
;
}
break
;
default
:
break
;
}
break
;
default
:
break
;
}
return
result
;
}
}
src/main/java/com/gic/spark/filter/TagConsumeTimeFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeTimeRequest
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.sql.Column
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/10
*/
public
class
TagConsumeTimeFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagConsumeTimeFilter
instance
;
public
static
TagConsumeTimeFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagConsumeTimeFilter
();
}
return
instance
;
}
private
TagConsumeTimeFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
)))
.
filter
(
data
->
checkTime
(
consumeTimeRequest
,
data
.
_2
().
getTime
()))
.
reduceByKey
((
x
,
y
)->
x
)
.
map
(
data
->
data
.
_1
());
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagConsumeTotalFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/3
*/
public
class
TagConsumeTotalFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagConsumeTotalFilter
instance
;
public
static
TagConsumeTotalFilter
getInstance
(){
if
(
null
==
instance
){
instance
=
new
TagConsumeTotalFilter
();
}
return
instance
;
}
private
TagConsumeTotalFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->{
boolean
result
=
false
;
if
(
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
())){
Date
receiptsTime
=
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATE_10
);
switch
(
consumeAmountRequest
.
getTimeRangeType
()){
case
FIXATION:
if
(
receiptsTime
.
getTime
()>=
consumeAmountRequest
.
getBeginTime
().
getTime
()
&&
receiptsTime
.
getTime
()<=
consumeAmountRequest
.
getEndTime
().
getTime
())
result
=
true
;
break
;
case
LATELY:
if
(
receiptsTime
.
getTime
()>
DateUtil
.
addNumForDay
(
new
Date
(),
consumeAmountRequest
.
getTimeNum
()).
getTime
()){
result
=
true
;
}
break
;
default
:
break
;
}
}
return
result
;
}).
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
configStatus
==
1
?
data
.
getPaid_amt
():
data
.
getPay_amt
()))
.
reduceByKey
((
x
,
y
)->
x
+
y
)
.
filter
(
data
->{
boolean
result
=
false
;
switch
(
consumeAmountRequest
.
getNumberType
()){
case
between:
if
(
data
.
_2
()>=
consumeAmountRequest
.
getBeginNum
()
&&
data
.
_2
()<=
consumeAmountRequest
.
getEndNum
()){
result
=
true
;
}
break
;
case
lt:
if
(
data
.
_2
()<
consumeAmountRequest
.
getEndNum
()){
result
=
true
;
}
break
;
case
gt:
if
(
data
.
_2
()>
consumeAmountRequest
.
getBeginNum
()){
result
=
true
;
}
break
;
case
eq:
if
(
data
.
_2
()==
consumeAmountRequest
.
getEqualNum
()){
result
=
true
;
}
break
;
case
lte:
if
(
data
.
_2
()<=
consumeAmountRequest
.
getEndNum
()){
result
=
true
;
}
break
;
case
gte:
if
(
data
.
_2
()>=
consumeAmountRequest
.
getBeginNum
()){
result
=
true
;
}
break
;
default
:
break
;
}
return
result
;
}).
map
(
data
->
data
.
_1
());
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagFirstConsumeTimeFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeTimeRequest
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.sql.Column
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/7
*/
public
class
TagFirstConsumeTimeFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagFirstConsumeTimeFilter
instance
;
public
static
TagFirstConsumeTimeFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagFirstConsumeTimeFilter
();
}
return
instance
;
}
private
TagFirstConsumeTimeFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()))
.
reduceByKey
((
x
,
y
)->
x
<
y
?
x:
y
)
.
filter
(
data
->
checkTime
(
consumeTimeRequest
,
data
.
_2
()))
.
map
(
data
->
data
.
_1
());
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagFirstConsumptionChannelFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagChannelRequest
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/10
*/
public
class
TagFirstConsumptionChannelFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagFirstConsumptionChannelFilter
instance
;
public
static
TagFirstConsumptionChannelFilter
getInstance
()
{
return
instance
;
}
private
TagFirstConsumptionChannelFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagChannelRequest
channelRequest
=(
TagChannelRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
reduceByKey
((
x
,
y
)->{
if
(
DateUtil
.
strToDate
(
x
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()
<
DateUtil
.
strToDate
(
y
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()){
return
x
;
}
else
{
return
y
;
}
}).
filter
(
data
->{
if
(
channelRequest
.
getChannelVals
().
contains
(
data
.
_2
().
getOrder_channel_code
())){
return
true
;
}
return
false
;
}).
map
(
data
->
data
.
_1
());
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.*
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/6
*/
public
class
TagFirstConsumptionMoneyFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagFirstConsumptionMoneyFilter
instance
;
public
static
TagFirstConsumptionMoneyFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagFirstConsumptionMoneyFilter
();
}
return
instance
;
}
private
TagFirstConsumptionMoneyFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
groupByKey
()
.
mapPartitions
(
data
->{
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
TagConsumeRecordBean
>>
tp2
=
data
.
next
();
double
firstConsumAmount
=
0
;
TagConsumeRecordBean
firstConsumeAmountBean
=
null
;
Map
<
String
,
TagConsumeRecordBean
>
effectiveOrderMap
=
new
HashMap
();
List
<
TagConsumeRecordBean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
}
else
{
noEffectiveOrderList
.
add
(
element
);
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
TagConsumeRecordBean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
TagConsumeRecordBean
amountBean:
effectiveOrderMap
.
values
()){
if
(
null
==
firstConsumeAmountBean
){
firstConsumeAmountBean
=
amountBean
;
}
else
{
if
(
DateUtil
.
stringToDate
(
amountBean
.
getReceipts_time
()).
getTime
()
<
DateUtil
.
stringToDate
(
firstConsumeAmountBean
.
getReceipts_time
()).
getTime
()){
firstConsumeAmountBean
=
amountBean
;
}
}
}
firstConsumAmount
=
null
!=
firstConsumeAmountBean
?
(
configStatus
==
1
?
firstConsumeAmountBean
.
getPaid_amt
():
firstConsumeAmountBean
.
getPay_amt
())
:
firstConsumAmount
;
switch
(
consumeAmountRequest
.
getNumberType
()){
case
between:
if
(
firstConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()
&&
firstConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lt:
if
(
firstConsumAmount
<
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gt:
if
(
firstConsumAmount
>
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
eq:
if
(
firstConsumAmount
==
consumeAmountRequest
.
getEqualNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lte:
if
(
firstConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gte:
if
(
firstConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
default
:
break
;
}
}
return
result
.
iterator
();
});
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/6
*/
public
class
TagHistoryConsumeTotalFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagHistoryConsumeTotalFilter
instance
;
public
static
TagHistoryConsumeTotalFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagHistoryConsumeTotalFilter
();
}
return
instance
;
}
private
TagHistoryConsumeTotalFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
configStatus
==
1
?
data
.
getPaid_amt
():
data
.
getPay_amt
()))
.
reduceByKey
((
x
,
y
)->
x
+
y
)
.
filter
(
data
->{
boolean
result
=
false
;
switch
(
consumeAmountRequest
.
getNumberType
()){
case
between:
if
(
data
.
_2
()>=
consumeAmountRequest
.
getBeginNum
()
&&
data
.
_2
()<=
consumeAmountRequest
.
getEndNum
()){
result
=
true
;
}
break
;
case
lt:
if
(
data
.
_2
()<
consumeAmountRequest
.
getEndNum
()){
result
=
true
;
}
break
;
case
gt:
if
(
data
.
_2
()>
consumeAmountRequest
.
getBeginNum
()){
result
=
true
;
}
break
;
case
eq:
if
(
data
.
_2
()==
consumeAmountRequest
.
getEqualNum
()){
result
=
true
;
}
break
;
case
lte:
if
(
data
.
_2
()<=
consumeAmountRequest
.
getEndNum
()){
result
=
true
;
}
break
;
case
gte:
if
(
data
.
_2
()>=
consumeAmountRequest
.
getBeginNum
()){
result
=
true
;
}
break
;
default
:
break
;
}
return
result
;
}).
map
(
data
->
data
.
_1
());
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagLatelyConsumeTimeFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeTimeRequest
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
org.apache.spark.sql.Column
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/10
*/
public
class
TagLatelyConsumeTimeFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagLatelyConsumeTimeFilter
instance
;
public
static
TagLatelyConsumeTimeFilter
getInstance
(){
if
(
null
==
instance
){
instance
=
new
TagLatelyConsumeTimeFilter
();
}
return
instance
;
}
private
TagLatelyConsumeTimeFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()))
.
reduceByKey
((
x
,
y
)->
x
>
y
?
x:
y
)
.
filter
(
data
->
checkTime
(
consumeTimeRequest
,
data
.
_2
()))
.
map
(
data
->
data
.
_1
());
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagLatelyConsumptionChannelFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagChannelRequest
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/10
*/
public
class
TagLatelyConsumptionChannelFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagLatelyConsumptionChannelFilter
instance
;
public
static
TagLatelyConsumptionChannelFilter
getInstance
(){
if
(
null
==
instance
){
instance
=
new
TagLatelyConsumptionChannelFilter
();
}
return
instance
;
}
private
TagLatelyConsumptionChannelFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagChannelRequest
channelRequest
=(
TagChannelRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
reduceByKey
((
x
,
y
)->{
if
(
DateUtil
.
strToDate
(
x
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()
>
DateUtil
.
strToDate
(
y
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()){
return
x
;
}
else
{
return
y
;
}
}).
filter
(
data
->{
if
(
channelRequest
.
getChannelVals
().
contains
(
data
.
_2
().
getOrder_channel_code
())){
return
true
;
}
return
false
;
}).
map
(
data
->
data
.
_1
());
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagLatelyConsumptionMoneyFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/6
*/
public
class
TagLatelyConsumptionMoneyFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagLatelyConsumptionMoneyFilter
instance
;
public
static
TagLatelyConsumptionMoneyFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagLatelyConsumptionMoneyFilter
();
}
return
instance
;
}
private
TagLatelyConsumptionMoneyFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
groupByKey
()
.
mapPartitions
(
data
->{
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
TagConsumeRecordBean
>>
tp2
=
data
.
next
();
double
latelyConsumAmount
=
0
;
TagConsumeRecordBean
latelyConsumeAmountBean
=
null
;
Map
<
String
,
TagConsumeRecordBean
>
effectiveOrderMap
=
new
HashMap
();
List
<
TagConsumeRecordBean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
}
else
{
noEffectiveOrderList
.
add
(
element
);
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
TagConsumeRecordBean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
TagConsumeRecordBean
amountBean:
effectiveOrderMap
.
values
()){
if
(
null
==
latelyConsumeAmountBean
){
latelyConsumeAmountBean
=
amountBean
;
}
else
{
if
(
DateUtil
.
stringToDate
(
amountBean
.
getReceipts_time
()).
getTime
()
>
DateUtil
.
stringToDate
(
latelyConsumeAmountBean
.
getReceipts_time
()).
getTime
()){
latelyConsumeAmountBean
=
amountBean
;
}
}
}
latelyConsumAmount
=
null
!=
latelyConsumeAmountBean
?
(
configStatus
==
1
?
latelyConsumeAmountBean
.
getPaid_amt
():
latelyConsumeAmountBean
.
getPay_amt
())
:
latelyConsumAmount
;
switch
(
consumeAmountRequest
.
getNumberType
()){
case
between:
if
(
latelyConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()
&&
latelyConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lt:
if
(
latelyConsumAmount
<
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gt:
if
(
latelyConsumAmount
>
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
eq:
if
(
latelyConsumAmount
==
consumeAmountRequest
.
getEqualNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lte:
if
(
latelyConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gte:
if
(
latelyConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
default
:
break
;
}
}
return
result
.
iterator
();
});
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagLowestSingleConsumptionMoneyFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.*
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/6
*/
public
class
TagLowestSingleConsumptionMoneyFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagLowestSingleConsumptionMoneyFilter
instance
;
public
static
TagLowestSingleConsumptionMoneyFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagLowestSingleConsumptionMoneyFilter
();
}
return
instance
;
}
private
TagLowestSingleConsumptionMoneyFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->{
boolean
result
=
false
;
if
(
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
())){
Date
receiptsTime
=
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATE_10
);
switch
(
consumeAmountRequest
.
getTimeRangeType
()){
case
FIXATION:
if
(
receiptsTime
.
getTime
()>=
consumeAmountRequest
.
getBeginTime
().
getTime
()
&&
receiptsTime
.
getTime
()<=
consumeAmountRequest
.
getEndTime
().
getTime
())
result
=
true
;
break
;
case
LATELY:
if
(
receiptsTime
.
getTime
()>
DateUtil
.
addNumForDay
(
new
Date
(),
consumeAmountRequest
.
getTimeNum
()).
getTime
()){
result
=
true
;
}
break
;
default
:
break
;
}
}
return
result
;
}).
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
groupByKey
()
.
mapPartitions
(
data
->{
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
TagConsumeRecordBean
>>
tp2
=
data
.
next
();
double
topConsumAmount
=
0
;
TagConsumeRecordBean
topConsumeAmountBean
=
null
;
Map
<
String
,
TagConsumeRecordBean
>
effectiveOrderMap
=
new
HashMap
();
List
<
TagConsumeRecordBean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
}
else
{
noEffectiveOrderList
.
add
(
element
);
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
TagConsumeRecordBean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
TagConsumeRecordBean
amountBean:
effectiveOrderMap
.
values
()){
topConsumAmount
=
configStatus
==
1
?
(
topConsumeAmountBean
.
getPaid_amt
()<
topConsumAmount
?
topConsumeAmountBean
.
getPaid_amt
():
topConsumAmount
):
(
topConsumeAmountBean
.
getPay_amt
()<
topConsumAmount
?
topConsumeAmountBean
.
getPay_amt
():
topConsumAmount
);
}
switch
(
consumeAmountRequest
.
getNumberType
()){
case
between:
if
(
topConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()
&&
topConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lt:
if
(
topConsumAmount
<
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gt:
if
(
topConsumAmount
>
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
eq:
if
(
topConsumAmount
==
consumeAmountRequest
.
getEqualNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lte:
if
(
topConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gte:
if
(
topConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
default
:
break
;
}
}
return
result
.
iterator
();
});
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagTopSingleConsumptionMoneyFilter.java
0 → 100644
View file @
9001b712
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.*
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/6
*/
public
class
TagTopSingleConsumptionMoneyFilter
extends
AbstractTagConsumRecordFilter
{
private
static
TagTopSingleConsumptionMoneyFilter
instance
;
public
static
TagTopSingleConsumptionMoneyFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagTopSingleConsumptionMoneyFilter
();
}
return
instance
;
}
private
TagTopSingleConsumptionMoneyFilter
(){}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
TagConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecordBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->{
boolean
result
=
false
;
if
(
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
())){
Date
receiptsTime
=
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATE_10
);
switch
(
consumeAmountRequest
.
getTimeRangeType
()){
case
FIXATION:
if
(
receiptsTime
.
getTime
()>=
consumeAmountRequest
.
getBeginTime
().
getTime
()
&&
receiptsTime
.
getTime
()<=
consumeAmountRequest
.
getEndTime
().
getTime
())
result
=
true
;
break
;
case
LATELY:
if
(
receiptsTime
.
getTime
()>
DateUtil
.
addNumForDay
(
new
Date
(),
consumeAmountRequest
.
getTimeNum
()).
getTime
()){
result
=
true
;
}
break
;
default
:
break
;
}
}
return
result
;
}).
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
groupByKey
()
.
mapPartitions
(
data
->{
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
TagConsumeRecordBean
>>
tp2
=
data
.
next
();
double
topConsumAmount
=
0
;
TagConsumeRecordBean
topConsumeAmountBean
=
null
;
Map
<
String
,
TagConsumeRecordBean
>
effectiveOrderMap
=
new
HashMap
();
List
<
TagConsumeRecordBean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
}
else
{
noEffectiveOrderList
.
add
(
element
);
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
TagConsumeRecordBean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
TagConsumeRecordBean
amountBean:
effectiveOrderMap
.
values
()){
topConsumAmount
=
configStatus
==
1
?
(
topConsumeAmountBean
.
getPaid_amt
()>
topConsumAmount
?
topConsumeAmountBean
.
getPaid_amt
():
topConsumAmount
):
(
topConsumeAmountBean
.
getPay_amt
()>
topConsumAmount
?
topConsumeAmountBean
.
getPay_amt
():
topConsumAmount
);
}
switch
(
consumeAmountRequest
.
getNumberType
()){
case
between:
if
(
topConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()
&&
topConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lt:
if
(
topConsumAmount
<
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gt:
if
(
topConsumAmount
>
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
eq:
if
(
topConsumAmount
==
consumeAmountRequest
.
getEqualNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
lte:
if
(
topConsumAmount
<=
consumeAmountRequest
.
getEndNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
case
gte:
if
(
topConsumAmount
>=
consumeAmountRequest
.
getBeginNum
()){
result
.
add
(
tp2
.
_1
());
}
break
;
default
:
break
;
}
}
return
result
.
iterator
();
});
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/tag/TagConstant.java
View file @
9001b712
...
...
@@ -56,8 +56,27 @@ public class TagConstant {
public static final String TAG_TIME_TYPE_ALL_TIME = "alltime";//目前为止
*/
// 卡券
//消费渠道
public
static
final
String
TAG_CODE_FIRST_CONSUMPTION_CHANNEL
=
"firstConsumptionChannel"
;
//首次消费渠道
public
static
final
String
TAG_CODE_LATELY_CONSUMPTION_CHANNEL
=
"latelyConsumptionChannel"
;
//最近消费渠道
//消费时间
public
static
final
String
TAG_CODE_FIRST_CONSUME_TIME
=
"firstConsumeTime"
;
//首次消费时间
// public static final String TAG_CODE_HISTORY_CONSUME_TIME="historyConsumeTime";//历史消费时
public
static
final
String
TAG_CODE_LATELY_CONSUME_TIME
=
"latelyConsumeTime"
;
//最近消费时间
public
static
final
String
TAG_CODE_CONSUME_TIME
=
"consumeTime"
;
//消费时间
public
static
final
String
TAG_CODE_CONSUME_SLEEP_DAY
=
"consumeSleepDay"
;
//消费休眠天数
//消费记录
public
static
final
String
TAG_CODE_CONSUME_TOTAL
=
"consumeTotal"
;
//消费总额
public
static
final
String
TAG_CODE_HISTORY_CONSUME_TOTAL
=
"historyConsumeTotal"
;
//历史消费总额
public
static
final
String
TAG_CODE_FIRST_CONSUMPTION_MONEY
=
"firstConsumptionMoney"
;
//首次消费金额
public
static
final
String
TAG_CODE_LATELY_CONSUMPTION_MONEY
=
"latelyConsumptionMoney"
;
//最近消费金额
public
static
final
String
TAG_CODE_TOP_SINGLE_CONSUMPTION_MONEY
=
"topSingleConsumptionMoney"
;
//最高单笔消费金额
public
static
final
String
TAG_CODE_LOWEST_SINGLE_CONSUMPTION_MONEY
=
"lowestSingleConsumptionMoney"
;
//最低单笔消费金
// 卡券
public
static
final
String
TAG_CODE_CURRENT_COUPON_NUM
=
"currentCouponNum"
;
//当前卡券数
public
static
final
String
TAG_CODE_UNCLAIMED_COUPON
=
"unclaimedCoupon"
;
//待领取卡券
public
static
final
String
TAG_CODE_GET_NO_CANCEL_COUPON
=
"getNoCancelCoupon"
;
//已领取未核销卡券
...
...
@@ -85,7 +104,7 @@ public class TagConstant {
public
static
final
String
TAG_DOMAIN_MCU_INFO
=
"mcuInfo"
;
public
static
final
String
TAG_STATISTICS_CHANNEL
=
"channel"
;
public
static
final
String
TAG_STATISTICS_
BRAND
=
"b
rand"
;
public
static
final
String
TAG_STATISTICS_
COMMODITY_BRAND
=
"commodityB
rand"
;
public
static
final
String
TAG_STATISTICS_MCU_INFO
=
"mcuInfo"
;
public
static
final
String
TAG_KEY_TYPE_TIME_RANGE
=
"timeRange"
;
...
...
src/main/java/com/gic/spark/tag/TagFilterFactory.java
View file @
9001b712
...
...
@@ -24,54 +24,101 @@ public class TagFilterFactory {
TagFilter
tagFilter
=
null
;
switch
(
tagEsFieldName
)
{
//积分信息
//
case TagConstant.TAG_CODE_ACCUMULATED_INTEGRAL:
//
tagFilter= TagAccumulatedIntegralFilter.getInstance();
//
break;
//
case TagConstant.TAG_CODE_ABOUT_EXPIRE_INTEGRAL:
//
tagFilter= TagAboutExpireIntegralFilter.getInstance();
//
break;
case
TagConstant
.
TAG_CODE_ACCUMULATED_INTEGRAL
:
tagFilter
=
TagAccumulatedIntegralFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_ABOUT_EXPIRE_INTEGRAL
:
tagFilter
=
TagAboutExpireIntegralFilter
.
getInstance
();
break
;
//卡卷类
case
TagConstant
.
TAG_CODE_CURRENT_COUPON_NUM
:
tagFilter
=
TagCurrentCouponNumFilter
.
getInstance
();
break
;
//
case TagConstant.TAG_CODE_UNCLAIMED_COUPON:
//
tagFilter= TagCouponFilter.getInstance();
//
break;
//
case TagConstant.TAG_CODE_GET_NO_CANCEL_COUPON:
//
tagFilter= TagCouponFilter.getInstance();
//
break;
//
case TagConstant.TAG_CODE_NO_CANCEL_EXPIRES_COUPON:
//
tagFilter= TagCouponFilter.getInstance();
//
break;
//
case TagConstant.TAG_CODE_CANCEL_COUPON:
//
tagFilter=TagCouponFilter.getInstance();
//
break;
//
case TagConstant.TAG_CODE_DONATION_IN_COUPON:
//
tagFilter=TagCouponFilter.getInstance();
//
break;
//
case TagConstant.TAG_CODE_YET_DONATION_COUPON:
//
tagFilter=TagCouponFilter.getInstance();
//
break;
case
TagConstant
.
TAG_CODE_UNCLAIMED_COUPON
:
tagFilter
=
TagCouponFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_GET_NO_CANCEL_COUPON
:
tagFilter
=
TagCouponFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_NO_CANCEL_EXPIRES_COUPON
:
tagFilter
=
TagCouponFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_CANCEL_COUPON
:
tagFilter
=
TagCouponFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_DONATION_IN_COUPON
:
tagFilter
=
TagCouponFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_YET_DONATION_COUPON
:
tagFilter
=
TagCouponFilter
.
getInstance
();
break
;
//消费统计
// case TagConstant.TAG_CODE_CONSUMPTION_TIME:
// tagFilter=TagConsumptionTimeFilter.getInstance();
// break;
// case TagConstant.TAG_CODE_PER_CUSTOMER_TRANSACTION:
// tagFilter=TagPerCustomerTransactionFilter.getInstance();
// break;
// case TagConstant.TAG_CODE_ASSOCIATED_PURCHASE_RATE:
// tagFilter=TagAssociatedPurchaseRateFilter.getInstance();
// break;
// case TagConstant.TAG_CODE_AVERAGE_DISCOUNT_FACTOR:
// tagFilter=TagAverageDiscountFactorFilter.getInstance();
// break;
// case TagConstant.TAG_CODE_CONSUMPTION_SLEEP_DAYS:
// tagFilter=TagConsumptionSleepDaysFilter.getInstance();
case
TagConstant
.
TAG_CODE_CONSUMPTION_TIME
:
tagFilter
=
TagConsumptionTimeFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_PER_CUSTOMER_TRANSACTION
:
tagFilter
=
TagPerCustomerTransactionFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_ASSOCIATED_PURCHASE_RATE
:
tagFilter
=
TagAssociatedPurchaseRateFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_AVERAGE_DISCOUNT_FACTOR
:
tagFilter
=
TagAverageDiscountFactorFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_CONSUMPTION_SLEEP_DAYS
:
tagFilter
=
TagConsumptionSleepDaysFilter
.
getInstance
();
break
;
//消费金额记录
case
TagConstant
.
TAG_CODE_CONSUME_TOTAL
:
tagFilter
=
TagConsumeTotalFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_HISTORY_CONSUME_TOTAL
:
tagFilter
=
TagHistoryConsumeTotalFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_FIRST_CONSUMPTION_MONEY
:
tagFilter
=
TagFirstConsumptionMoneyFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_LATELY_CONSUMPTION_MONEY
:
tagFilter
=
TagLatelyConsumptionMoneyFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_TOP_SINGLE_CONSUMPTION_MONEY
:
tagFilter
=
TagTopSingleConsumptionMoneyFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_LOWEST_SINGLE_CONSUMPTION_MONEY
:
tagFilter
=
TagLowestSingleConsumptionMoneyFilter
.
getInstance
();
break
;
//消费时间
case
TagConstant
.
TAG_CODE_FIRST_CONSUME_TIME
:
tagFilter
=
TagFirstConsumeTimeFilter
.
getInstance
();
break
;
// case TagConstant.TAG_CODE_HISTORY_CONSUME_TIME:
// tagFilter=
// break;
case
TagConstant
.
TAG_CODE_LATELY_CONSUME_TIME
:
tagFilter
=
TagLatelyConsumeTimeFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_CONSUME_TIME
:
tagFilter
=
TagConsumeTimeFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_CONSUME_SLEEP_DAY
:
// tagFilter=
break
;
//渠道
case
TagConstant
.
TAG_CODE_FIRST_CONSUMPTION_CHANNEL
:
tagFilter
=
TagFirstConsumptionChannelFilter
.
getInstance
();
break
;
case
TagConstant
.
TAG_CODE_LATELY_CONSUMPTION_CHANNEL
:
tagFilter
=
TagLatelyConsumptionChannelFilter
.
getInstance
();
break
;
default
:
DingtalkMessageUtil
.
sendAlertMessage
(
"暂未支持的非实时标签: "
+
tagEsFieldName
);
break
;
...
...
src/main/java/com/gic/spark/tag/TagProcess.java
View file @
9001b712
...
...
@@ -13,12 +13,10 @@ import com.gic.spark.entity.TagConditionDTO;
import
com.gic.spark.entity.TagConditionGroupDTO
;
import
com.gic.spark.entity.TagGroupInfo
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.table.TabDataActuallyPaidConfig
;
import
com.gic.spark.entity.table.TabSceneCrowd
;
import
com.gic.spark.filter.*
;
import
com.gic.spark.util.AppEnvUtil
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DingtalkMessageUtil
;
import
com.gic.spark.util.EsRequestUtil
;
import
com.gic.spark.util.*
;
import
com.google.common.base.Joiner
;
import
com.google.common.collect.Lists
;
import
org.apache.commons.lang.StringUtils
;
...
...
@@ -51,8 +49,9 @@ import static com.gic.spark.datasource.mysql.MysqlDatasource.JDBC_OPTIONS;
public
class
TagProcess
{
private
List
<
SceneCrowdDTO
>
sceneCrowdDTOList
=
new
ArrayList
();
private
MysqlRddManager
mysqlRddManager
;
private
DataSourceSharding
memberDatasource
;
private
MysqlRddManager
member4RddManager
;
private
MysqlRddManager
enterprise4RddManager
;
private
DataSourceSharding
memberSharding4Datasource
;
private
boolean
isProduction
;
private
static
TagProcess
instance
;
public
static
TagProcess
getInstance
(){
...
...
@@ -70,27 +69,40 @@ public class TagProcess {
public
void
init
(
boolean
isProd
){
this
.
isProduction
=
isProd
;
MysqlDatasource
member4Datasource
=
new
MysqlDatasource
();
MysqlDatasource
enterprise4Datasource
=
new
MysqlDatasource
();
if
(
isProduction
){
member4Datasource
.
setJdbcUrl
(
"jdbc:mysql://10.0.0.6:3306/gic-member4.0"
+
JDBC_OPTIONS
);
member4Datasource
.
setUser
(
"cdb_outerroot"
);
member4Datasource
.
setPassword
(
"@09ui%sbc09"
);
enterprise4Datasource
.
setJdbcUrl
(
"jdbc:mysql://10.0.0.6:3306/gic-member4.0"
+
JDBC_OPTIONS
);
enterprise4Datasource
.
setUser
(
"cdb_outerroot"
);
enterprise4Datasource
.
setPassword
(
"@09ui%sbc09"
);
}
else
{
member4Datasource
.
setJdbcUrl
(
"jdbc:mysql://10.0.0.6:3306/gic-member4.0"
+
JDBC_OPTIONS
);
member4Datasource
.
setUser
(
"cdb_outerroot"
);
member4Datasource
.
setPassword
(
"@09ui%sbc09"
);
enterprise4Datasource
.
setJdbcUrl
(
"jdbc:mysql://10.0.0.6:3306/gic-enterprise4.0"
+
JDBC_OPTIONS
);
enterprise4Datasource
.
setUser
(
"cdb_outerroot"
);
enterprise4Datasource
.
setPassword
(
"@09ui%sbc09"
);
}
mysqlRddManager
=
member4Datasource
.
buildRddManager
();
memberDatasource
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_MEMBER_USER
);
member4RddManager
=
member4Datasource
.
buildRddManager
();
enterprise4RddManager
=
enterprise4Datasource
.
buildRddManager
();
memberSharding4Datasource
=
new
DataSourceSharding
(
AppEnvUtil
.
MEMBER_SHARDING_4
,
ConstantUtil
.
TAB_MEMBER_USER
);
Dataset
<
TabSceneCrowd
>
sceneCrowdDataset
=
mysql
RddManager
.
getPojo
(
"tab_scene_crowd"
,
TabSceneCrowd
.
class
,
null
)
List
<
TabSceneCrowd
>
sceneCrowdList
=
member4
RddManager
.
getPojo
(
"tab_scene_crowd"
,
TabSceneCrowd
.
class
,
null
)
.
filter
(
new
Column
(
"valid_flag"
).
equalTo
(
1
))
.
filter
(
new
Column
(
"real_time"
).
equalTo
(
2
))
;
List
<
TabSceneCrowd
>
sceneCrowdList
=
sceneCrowdDataset
.
collectAsList
();
.
filter
(
new
Column
(
"real_time"
).
equalTo
(
2
))
.
collectAsList
();
for
(
TabSceneCrowd
sceneCrowd:
sceneCrowdList
){
LinkedList
<
TagConditionGroupDTO
>
conditionGroupDTOList
=
JSONObject
.
parseObject
(
sceneCrowd
.
getTag_Condition_Group_Info
(),
new
TypeReference
<
LinkedList
<
TagConditionGroupDTO
>>(){});
sceneCrowdDTOList
.
add
(
new
SceneCrowdDTO
(
sceneCrowd
,
conditionGroupDTOList
));
}
List
<
TabDataActuallyPaidConfig
>
dataActuallyPaidConfigList
=
enterprise4RddManager
.
getPojo
(
"tab_data_actually_paid_config"
,
TabDataActuallyPaidConfig
.
class
,
null
)
.
filter
(
new
Column
(
"status"
).
equalTo
(
1
))
.
collectAsList
();
dataActuallyPaidConfigList
.
forEach
(
data
->
CommonUtil
.
dataActuallyPaidConfigMap
.
put
(
data
.
getEnterprise_Id
(),
data
));
}
public
void
setEnterpriseId
(
List
<
Integer
>
enterpriseIdList
){
...
...
@@ -169,7 +181,7 @@ public class TagProcess {
}
}
}
DataSourceManager
.
getInstance
().
addSourceEntity
(
memberDatasource
,
enterpriseTagEntry
.
getKey
().
intValue
());
DataSourceManager
.
getInstance
().
addSourceEntity
(
member
Sharding4
Datasource
,
enterpriseTagEntry
.
getKey
().
intValue
());
}
/* if (extractData) {
...
...
src/main/java/com/gic/spark/tag/TagValueParser.java
View file @
9001b712
This diff is collapsed.
Click to expand it.
src/main/java/com/gic/spark/util/CommonUtil.java
View file @
9001b712
package
com
.
gic
.
spark
.
util
;
import
com.gic.spark.entity.table.TabDataActuallyPaidConfig
;
import
scala.Tuple2
;
import
java.util.HashMap
;
...
...
@@ -12,6 +13,20 @@ import java.util.Map;
*/
public
class
CommonUtil
{
public
static
Map
<
String
,
Integer
>
enterPerformanceMap
=
new
HashMap
();
public
static
Map
<
String
,
Tuple2
<
String
,
String
>>
userGroupMap
=
new
HashMap
();
public
static
Map
<
Integer
,
TabDataActuallyPaidConfig
>
dataActuallyPaidConfigMap
=
new
HashMap
();
/**
* 1:实付
* 0:关闭(应付)
* @param enterprise_Id
* @return
*/
public
static
Integer
getConfigStatus
(
Integer
enterprise_Id
){
TabDataActuallyPaidConfig
dataActuallyPaidConfig
=
dataActuallyPaidConfigMap
.
get
(
enterprise_Id
);
if
(
null
==
dataActuallyPaidConfig
||
null
==
dataActuallyPaidConfig
.
getConfig_Status
()){
return
0
;
}
else
{
return
dataActuallyPaidConfig
.
getConfig_Status
();
}
}
}
src/main/java/com/gic/spark/util/ConstantUtil.java
View file @
9001b712
...
...
@@ -11,4 +11,5 @@ public class ConstantUtil {
public
static
final
String
TAB_COUPON_LOG
=
"tab_coupon_log"
;
public
static
final
String
TAB_INTEGRAL_CU_CHANGE_LOG
=
"tab_integral_cu_change_log"
;
public
static
final
String
ADS_GIC_TRD_ECU_SALES_LABEL_D
=
"demoads.ads_gic_trd_ecu_sales_label_d"
;
public
static
final
String
DWD_GIC_TRD_VIRTUAL_WDORDER_D
=
"democdm.dwd_gic_trd_virtual_wdorder_d"
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment