Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
gic-spark-tag-4.0
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wangxiaokang
gic-spark-tag-4.0
Commits
7de8101e
Commit
7de8101e
authored
Aug 12, 2020
by
guos
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
会员标签4.0
parent
f00fdf9b
Hide whitespace changes
Inline
Side-by-side
Showing
37 changed files
with
545 additions
and
98 deletions
+545
-98
TrdEcuSalesLabelBean.java
.../java/com/gic/spark/entity/bean/TrdEcuSalesLabelBean.java
+1
-1
TrdVirtualOrderBean.java
...n/java/com/gic/spark/entity/bean/TrdVirtualOrderBean.java
+19
-1
TrdVirtualOrderItemBean.java
...va/com/gic/spark/entity/bean/TrdVirtualOrderItemBean.java
+64
-0
TagConsumeCommodityRequest.java
.../gic/spark/entity/request/TagConsumeCommodityRequest.java
+21
-0
AbstractTagConsumFilter.java
...in/java/com/gic/spark/filter/AbstractTagConsumFilter.java
+4
-4
AbstractTagConsumRecordFilter.java
...a/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
+4
-4
TagAssociatedPurchaseRateFilter.java
...com/gic/spark/filter/TagAssociatedPurchaseRateFilter.java
+4
-5
TagAverageDiscountFactorFilter.java
.../com/gic/spark/filter/TagAverageDiscountFactorFilter.java
+4
-4
TagConsumeCommodityFilter.java
.../java/com/gic/spark/filter/TagConsumeCommodityFilter.java
+72
-0
TagConsumeTimeFilter.java
src/main/java/com/gic/spark/filter/TagConsumeTimeFilter.java
+3
-3
TagConsumeTotalFilter.java
...main/java/com/gic/spark/filter/TagConsumeTotalFilter.java
+2
-2
TagConsumptionSleepDaysFilter.java
...a/com/gic/spark/filter/TagConsumptionSleepDaysFilter.java
+2
-2
TagConsumptionTimeFilter.java
...n/java/com/gic/spark/filter/TagConsumptionTimeFilter.java
+2
-3
TagFirstConsumeCommodityFilter.java
.../com/gic/spark/filter/TagFirstConsumeCommodityFilter.java
+80
-0
TagFirstConsumeTimeFilter.java
.../java/com/gic/spark/filter/TagFirstConsumeTimeFilter.java
+3
-4
TagFirstConsumptionChannelFilter.java
...om/gic/spark/filter/TagFirstConsumptionChannelFilter.java
+2
-2
TagFirstConsumptionMoneyFilter.java
.../com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
+8
-8
TagFirstOfflineConsumptionStoreFilter.java
...c/spark/filter/TagFirstOfflineConsumptionStoreFilter.java
+2
-3
TagFirstOnlineConsumptionStoreFilter.java
...ic/spark/filter/TagFirstOnlineConsumptionStoreFilter.java
+2
-3
TagHistoryConsumeCommodityFilter.java
...om/gic/spark/filter/TagHistoryConsumeCommodityFilter.java
+63
-0
TagHistoryConsumeTotalFilter.java
...va/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
+2
-2
TagHistoryOfflineConsumptionStoreFilter.java
...spark/filter/TagHistoryOfflineConsumptionStoreFilter.java
+2
-2
TagHistoryOnlineConsumptionStoreFilter.java
.../spark/filter/TagHistoryOnlineConsumptionStoreFilter.java
+2
-3
TagLatelyConsumeCommodityFilter.java
...com/gic/spark/filter/TagLatelyConsumeCommodityFilter.java
+80
-0
TagLatelyConsumeTimeFilter.java
...java/com/gic/spark/filter/TagLatelyConsumeTimeFilter.java
+3
-3
TagLatelyConsumptionChannelFilter.java
...m/gic/spark/filter/TagLatelyConsumptionChannelFilter.java
+2
-2
TagLatelyConsumptionMoneyFilter.java
...com/gic/spark/filter/TagLatelyConsumptionMoneyFilter.java
+8
-8
TagLatelyOnlineConsumptionStoreFilter.java
...c/spark/filter/TagLatelyOnlineConsumptionStoreFilter.java
+2
-2
TagLowestSingleConsumptionMoneyFilter.java
...c/spark/filter/TagLowestSingleConsumptionMoneyFilter.java
+8
-8
TagOfflineConsumptionStoreFilter.java
...om/gic/spark/filter/TagOfflineConsumptionStoreFilter.java
+2
-2
TagOnlineConsumptionStoreFilter.java
...com/gic/spark/filter/TagOnlineConsumptionStoreFilter.java
+2
-2
TagPerCustomerTransactionFilter.java
...com/gic/spark/filter/TagPerCustomerTransactionFilter.java
+4
-5
TagRecentlyOfflineConsumptionStoreFilter.java
...park/filter/TagRecentlyOfflineConsumptionStoreFilter.java
+2
-2
TagTopSingleConsumptionMoneyFilter.java
.../gic/spark/filter/TagTopSingleConsumptionMoneyFilter.java
+8
-8
TagConstant.java
src/main/java/com/gic/spark/tag/TagConstant.java
+6
-0
TagValueParser.java
src/main/java/com/gic/spark/tag/TagValueParser.java
+49
-0
ConstantUtil.java
src/main/java/com/gic/spark/util/ConstantUtil.java
+1
-0
No files found.
src/main/java/com/gic/spark/entity/bean/T
agConsume
Bean.java
→
src/main/java/com/gic/spark/entity/bean/T
rdEcuSalesLabel
Bean.java
View file @
7de8101e
...
...
@@ -7,7 +7,7 @@ import java.io.Serializable;
* @author: wangxk
* @date: 2020/5/7
*/
public
class
T
agConsumeBean
implements
Serializable
{
public
class
T
rdEcuSalesLabelBean
implements
Serializable
{
private
int
ent_id
;
//企业id
private
int
card_region_id
;
//卡域id
...
...
src/main/java/com/gic/spark/entity/bean/T
agConsumeRecord
Bean.java
→
src/main/java/com/gic/spark/entity/bean/T
rdVirtualOrder
Bean.java
View file @
7de8101e
...
...
@@ -7,7 +7,7 @@ import java.io.Serializable;
* @author: wangxk
* @date: 2020/8/3
*/
public
class
T
agConsumeRecord
Bean
implements
Serializable
{
public
class
T
rdVirtualOrder
Bean
implements
Serializable
{
private
int
ent_id
;
//企业id
private
int
area_id
;
//会员卡域ID
private
int
order_channel_code
;
//订单来源渠道(线下1,达摩微商城2,微盟微商城3,线上天猫4)
...
...
@@ -16,6 +16,7 @@ public class TagConsumeRecordBean implements Serializable {
private
String
store_info_id
;
//门店id
private
int
is_eff_order
;
//是否有效订单(0:否,1是)
private
int
is_mbr_order
;
//是否会员订单(0:否,1是)
private
String
order_id
;
private
String
order_no
;
private
String
oorder_no
;
private
long
goods_num
;
//订单商品数量
...
...
@@ -24,6 +25,7 @@ public class TagConsumeRecordBean implements Serializable {
private
double
total_amt
;
//吊牌价总额
private
String
receipts_time
;
//单据日期
private
String
shop_id
;
//店铺id
private
String
virtual_id
;
//虚拟订单id
public
int
getEnt_id
()
{
return
ent_id
;
...
...
@@ -152,4 +154,20 @@ public class TagConsumeRecordBean implements Serializable {
public
void
setShop_id
(
String
shop_id
)
{
this
.
shop_id
=
shop_id
;
}
public
String
getOrder_id
()
{
return
order_id
;
}
public
void
setOrder_id
(
String
order_id
)
{
this
.
order_id
=
order_id
;
}
public
String
getVirtual_id
()
{
return
virtual_id
;
}
public
void
setVirtual_id
(
String
virtual_id
)
{
this
.
virtual_id
=
virtual_id
;
}
}
src/main/java/com/gic/spark/entity/bean/TrdVirtualOrderItemBean.java
0 → 100644
View file @
7de8101e
package
com
.
gic
.
spark
.
entity
.
bean
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/12
*/
public
class
TrdVirtualOrderItemBean
{
private
String
ent_id
;
private
String
ecu_id
;
private
String
order_id
;
//真实订单
private
String
receipts_time
;
//单据时间
private
String
sku_code
;
private
String
virtual_order_id
;
//虚拟订单id
public
String
getEnt_id
()
{
return
ent_id
;
}
public
void
setEnt_id
(
String
ent_id
)
{
this
.
ent_id
=
ent_id
;
}
public
String
getEcu_id
()
{
return
ecu_id
;
}
public
void
setEcu_id
(
String
ecu_id
)
{
this
.
ecu_id
=
ecu_id
;
}
public
String
getOrder_id
()
{
return
order_id
;
}
public
void
setOrder_id
(
String
order_id
)
{
this
.
order_id
=
order_id
;
}
public
String
getReceipts_time
()
{
return
receipts_time
;
}
public
void
setReceipts_time
(
String
receipts_time
)
{
this
.
receipts_time
=
receipts_time
;
}
public
String
getSku_code
()
{
return
sku_code
;
}
public
void
setSku_code
(
String
sku_code
)
{
this
.
sku_code
=
sku_code
;
}
public
String
getVirtual_order_id
()
{
return
virtual_order_id
;
}
public
void
setVirtual_order_id
(
String
virtual_order_id
)
{
this
.
virtual_order_id
=
virtual_order_id
;
}
}
src/main/java/com/gic/spark/entity/request/TagConsumeCommodityRequest.java
0 → 100644
View file @
7de8101e
package
com
.
gic
.
spark
.
entity
.
request
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/12
*/
public
class
TagConsumeCommodityRequest
extends
AbstractFilterRequestTime
{
private
List
<
String
>
skuCodeList
;
public
List
<
String
>
getSkuCodeList
()
{
return
skuCodeList
;
}
public
void
setSkuCodeList
(
List
<
String
>
skuCodeList
)
{
this
.
skuCodeList
=
skuCodeList
;
}
}
src/main/java/com/gic/spark/filter/AbstractTagConsumFilter.java
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.entity.bean.T
agConsume
Bean
;
import
com.gic.spark.entity.bean.T
rdEcuSalesLabel
Bean
;
import
com.gic.spark.entity.request.TagConsumeRequest
;
import
com.gic.spark.util.ConstantUtil
;
import
org.apache.spark.api.java.JavaRDD
;
...
...
@@ -17,11 +17,11 @@ import java.util.List;
public
abstract
class
AbstractTagConsumFilter
implements
TagFilter
{
DataSourceHive
dataSourceHive
=
new
DataSourceHive
(
ConstantUtil
.
ADS_GIC_TRD_ECU_SALES_LABEL_D
);
protected
static
JavaRDD
<
T
agConsumeBean
>
statisticsTypeHandle
(
JavaRDD
<
TagConsumeBean
>
consumeRDD
,
TagConsumeRequest
consumeRequest
){
protected
static
JavaRDD
<
T
rdEcuSalesLabelBean
>
statisticsTypeHandle
(
JavaRDD
<
TrdEcuSalesLabelBean
>
consumeRDD
,
TagConsumeRequest
consumeRequest
){
consumeRDD
=
consumeRDD
.
mapPartitions
(
data
->{
List
<
T
agConsume
Bean
>
result
=
new
ArrayList
();
List
<
T
rdEcuSalesLabel
Bean
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
T
agConsume
Bean
consumeBean
=
data
.
next
();
T
rdEcuSalesLabel
Bean
consumeBean
=
data
.
next
();
switch
(
consumeRequest
.
getStatisticsType
()){
case
COMMODITYBRAND:
if
(
consumeRequest
.
getStatisticsValList
().
contains
(
consumeBean
.
getEnt_id
())){
...
...
src/main/java/com/gic/spark/filter/AbstractTagConsumRecordFilter.java
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.AbstractFilterRequestTime
;
import
com.gic.spark.util.ConstantUtil
;
...
...
@@ -23,11 +23,11 @@ public abstract class AbstractTagConsumRecordFilter implements TagFilter{
DataSourceHive
dataSourceHive
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_WDORDER_D
);
protected
static
JavaRDD
<
T
agConsumeRecordBean
>
statisticsTypeHandle
(
JavaRDD
<
TagConsumeRecord
Bean
>
consumeRecordRDD
,
AbstractFilterRequest
request
){
protected
static
JavaRDD
<
T
rdVirtualOrderBean
>
statisticsTypeHandle
(
JavaRDD
<
TrdVirtualOrder
Bean
>
consumeRecordRDD
,
AbstractFilterRequest
request
){
consumeRecordRDD
=
consumeRecordRDD
.
mapPartitions
(
data
->{
List
<
T
agConsumeRecord
Bean
>
result
=
new
ArrayList
();
List
<
T
rdVirtualOrder
Bean
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
T
agConsumeRecord
Bean
consumeRecordBean
=
data
.
next
();
T
rdVirtualOrder
Bean
consumeRecordBean
=
data
.
next
();
switch
(
request
.
getStatisticsType
()){
case
COMMODITYBRAND:
if
(
request
.
getStatisticsValList
().
contains
(
consumeRecordBean
.
getEnt_id
())){
...
...
src/main/java/com/gic/spark/filter/TagAssociatedPurchaseRateFilter.java
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsume
Bean
;
import
com.gic.spark.entity.bean.T
rdEcuSalesLabel
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeRequest
;
import
org.apache.spark.api.java.JavaRDD
;
...
...
@@ -38,16 +37,16 @@ public class TagAssociatedPurchaseRateFilter extends AbstractTagConsumFilter{
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
T
agConsumeBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsume
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdEcuSalesLabel
Bean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
.
mapPartitions
(
data
->{
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
T
agConsume
Bean
>>
tp2
=
data
.
next
();
Tuple2
<
Integer
,
Iterable
<
T
rdEcuSalesLabel
Bean
>>
tp2
=
data
.
next
();
long
totalGodsNum
=
0
;
long
totalOrderTimes
=
0
;
for
(
T
agConsume
Bean
consumeBean:
tp2
.
_2
()){
for
(
T
rdEcuSalesLabel
Bean
consumeBean:
tp2
.
_2
()){
totalGodsNum
+=
consumeBean
.
getGoods_num
();
totalOrderTimes
+=
consumeBean
.
getOrder_times
();
}
...
...
src/main/java/com/gic/spark/filter/TagAverageDiscountFactorFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsume
Bean
;
import
com.gic.spark.entity.bean.T
rdEcuSalesLabel
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeRequest
;
import
org.apache.spark.api.java.JavaRDD
;
...
...
@@ -39,7 +39,7 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
T
agConsumeBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsume
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdEcuSalesLabel
Bean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
.
mapPartitions
(
data
->{
...
...
@@ -47,8 +47,8 @@ public class TagAverageDiscountFactorFilter extends AbstractTagConsumFilter {
while
(
data
.
hasNext
()){
double
payAmt
=
0
;
double
totalAmt
=
0
;
Tuple2
<
Integer
,
Iterable
<
T
agConsume
Bean
>>
tp2
=
data
.
next
();
for
(
T
agConsume
Bean
consumeBean:
tp2
.
_2
()){
Tuple2
<
Integer
,
Iterable
<
T
rdEcuSalesLabel
Bean
>>
tp2
=
data
.
next
();
for
(
T
rdEcuSalesLabel
Bean
consumeBean:
tp2
.
_2
()){
payAmt
=
consumeBean
.
getPay_amt
();
totalAmt
=
consumeBean
.
getTotal_amt
();
}
...
...
src/main/java/com/gic/spark/filter/TagConsumeCommodityFilter.java
0 → 100644
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaPairRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/12
*/
public
class
TagConsumeCommodityFilter
extends
AbstractTagConsumRecordFilter
{
DataSourceHive
dataSourceHiveOrderItem
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D
);
private
static
TagConsumeCommodityFilter
instance
;
public
static
TagConsumeCommodityFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagConsumeCommodityFilter
();
}
return
instance
;
}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
result
.
add
(
dataSourceHiveOrderItem
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
JavaPairRDD
<
String
,
Integer
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
filter
(
data
->
checkTime
(
commodityRequest
,
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_id
(),
data
.
getEcu_id
()));
JavaPairRDD
<
String
,
String
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
.
filter
(
data
->{
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
())){
return
true
;
}
return
false
;
}).
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getVirtual_order_id
()))
.
reduceByKey
((
x
,
y
)->
x
);
JavaRDD
<
Integer
>
ecuRdd
=
orderRdd
.
leftOuterJoin
(
orderItemRDD
)
.
filter
(
data
->
data
.
_2
().
_2
().
isPresent
())
.
map
(
data
->
data
.
_2
().
_1
()).
distinct
();
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagConsumeTimeFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeTimeRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -42,8 +42,8 @@ public class TagConsumeTimeFilter extends AbstractTagConsumRecordFilter{
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
T
agConsumeRecord
Bean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
T
agConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrder
Bean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
T
rdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
DateUtil
.
strToDate
(
data
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
)))
...
...
src/main/java/com/gic/spark/filter/TagConsumeTotalFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
...
...
@@ -41,7 +41,7 @@ public class TagConsumeTotalFilter extends AbstractTagConsumRecordFilter {
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/filter/TagConsumptionSleepDaysFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsume
Bean
;
import
com.gic.spark.entity.bean.T
rdEcuSalesLabel
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeRequest
;
import
org.apache.spark.api.java.JavaRDD
;
...
...
@@ -38,7 +38,7 @@ public class TagConsumptionSleepDaysFilter extends AbstractTagConsumFilter{
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
T
agConsumeBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsume
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdEcuSalesLabel
Bean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
.
getSleep_days
()))
.
reduceByKey
((
x
,
y
)->
x
>
y
?
y:
x
)
...
...
src/main/java/com/gic/spark/filter/TagConsumptionTimeFilter.java
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsume
Bean
;
import
com.gic.spark.entity.bean.T
rdEcuSalesLabel
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeRequest
;
import
org.apache.spark.api.java.JavaRDD
;
...
...
@@ -37,7 +36,7 @@ public class TagConsumptionTimeFilter extends AbstractTagConsumFilter{
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
T
agConsumeBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsume
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdEcuSalesLabel
Bean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
.
getOrder_times
())).
reduceByKey
((
x
,
y
)->
x
+
y
)
.
mapPartitions
(
data
->{
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumeCommodityFilter.java
0 → 100644
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaPairRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/8/12
*/
public
class
TagFirstConsumeCommodityFilter
extends
AbstractTagConsumRecordFilter
{
DataSourceHive
dataSourceHiveOrderItem
=
new
DataSourceHive
(
ConstantUtil
.
DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D
);
private
static
TagFirstConsumeCommodityFilter
instance
;
public
static
TagFirstConsumeCommodityFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagFirstConsumeCommodityFilter
();
}
return
instance
;
}
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
List
<
DataSourceEntity
>
result
=
new
ArrayList
();
result
.
add
(
dataSourceHive
);
result
.
add
(
dataSourceHiveOrderItem
);
return
result
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeCommodityRequest
commodityRequest
=(
TagConsumeCommodityRequest
)
request
;
JavaRDD
<
TrdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrderBean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
commodityRequest
);
JavaPairRDD
<
String
,
Integer
>
orderRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
))
.
reduceByKey
((
x
,
y
)->{
if
(
DateUtil
.
strToDate
(
x
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()
<
DateUtil
.
strToDate
(
y
.
getReceipts_time
(),
DateUtil
.
FORMAT_DATETIME_19
).
getTime
()){
return
x
;
}
else
{
return
y
;
}
})
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
_2
().
getVirtual_id
(),
data
.
_1
()));
JavaPairRDD
<
String
,
String
>
orderItemRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHiveOrderItem
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrderItemBean
.
class
).
javaRDD
()
.
filter
(
data
->{
if
(
StringUtils
.
isNotEmpty
(
data
.
getSku_code
())
&&
commodityRequest
.
getSkuCodeList
().
contains
(
data
.
getSku_code
())){
return
true
;
}
return
false
;
}).
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getVirtual_order_id
(),
data
.
getVirtual_order_id
()))
.
reduceByKey
((
x
,
y
)->
x
);
JavaRDD
<
Integer
>
ecuRdd
=
orderRdd
.
leftOuterJoin
(
orderItemRDD
)
.
filter
(
data
->
data
.
_2
().
_2
().
isPresent
())
.
map
(
data
->
data
.
_2
().
_1
()).
distinct
();
return
ecuRdd
;
}
}
src/main/java/com/gic/spark/filter/TagFirstConsumeTimeFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeTimeRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -12,7 +12,6 @@ import org.apache.spark.sql.Column;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
/**
...
...
@@ -42,8 +41,8 @@ public class TagFirstConsumeTimeFilter extends AbstractTagConsumRecordFilter {
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
T
agConsumeRecord
Bean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
T
agConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrder
Bean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
T
rdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumptionChannelFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeChannelRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -37,7 +37,7 @@ public class TagFirstConsumptionChannelFilter extends AbstractTagConsumRecordFil
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeChannelRequest
channelRequest
=(
TagConsumeChannelRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
...
...
src/main/java/com/gic/spark/filter/TagFirstConsumptionMoneyFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
...
...
@@ -39,7 +39,7 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
@@ -49,11 +49,11 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
T
agConsumeRecord
Bean
>>
tp2
=
data
.
next
();
Tuple2
<
Integer
,
Iterable
<
T
rdVirtualOrder
Bean
>>
tp2
=
data
.
next
();
double
firstConsumAmount
=
0
;
T
agConsumeRecord
Bean
firstConsumeAmountBean
=
null
;
Map
<
String
,
T
agConsumeRecord
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
agConsumeRecord
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
T
rdVirtualOrder
Bean
firstConsumeAmountBean
=
null
;
Map
<
String
,
T
rdVirtualOrder
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
rdVirtualOrder
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
...
...
@@ -62,13 +62,13 @@ public class TagFirstConsumptionMoneyFilter extends AbstractTagConsumRecordFilte
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
T
agConsumeRecord
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
T
rdVirtualOrder
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
T
agConsumeRecord
Bean
amountBean:
effectiveOrderMap
.
values
()){
for
(
T
rdVirtualOrder
Bean
amountBean:
effectiveOrderMap
.
values
()){
if
(
null
==
firstConsumeAmountBean
){
firstConsumeAmountBean
=
amountBean
;
}
else
{
...
...
src/main/java/com/gic/spark/filter/TagFirstOfflineConsumptionStoreFilter.java
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.mysql.util.StringUtil
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -43,7 +42,7 @@ public class TagFirstOfflineConsumptionStoreFilter extends AbstractTagConsumReco
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagFirstOnlineConsumptionStoreFilter.java
View file @
7de8101e
...
...
@@ -2,8 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TagConsumeRecordBean
;
import
com.gic.spark.entity.enumtype.YearMonthDayType
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -42,7 +41,7 @@ public class TagFirstOnlineConsumptionStoreFilter extends AbstractTagConsumRecor
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagHistoryConsumeCommodityFilter.java
0 → 100644
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaPairRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
 * Tag filter selecting members who have EVER bought any of the requested
 * commodities (SKU codes), over the whole order history.
 *
 * <p>Joins the virtual-order header table (one row per order, carrying the
 * member ecu id) with the order-item detail table (one row per sku in an
 * order) and returns the distinct ecu ids of orders that contain a
 * requested sku_code.
 *
 * @author: wangxk
 * @date: 2020/8/12
 */
public class TagHistoryConsumeCommodityFilter extends AbstractTagConsumRecordFilter {

    // Hive source for the order-item detail rows; the order-header source
    // (dataSourceHive) is inherited from AbstractTagConsumRecordFilter.
    DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);

    private static TagHistoryConsumeCommodityFilter instance;

    /**
     * Lazily-created singleton accessor.
     *
     * <p>Synchronized so that concurrent first calls cannot each observe
     * {@code instance == null} and create two instances — the original
     * unsynchronized lazy check was not thread-safe.
     */
    public static synchronized TagHistoryConsumeCommodityFilter getInstance() {
        if (null == instance) {
            instance = new TagHistoryConsumeCommodityFilter();
        }
        return instance;
    }

    /**
     * Both the order-header source and the order-item source are needed by
     * {@link #filterValidMember}.
     */
    @Override
    public List<DataSourceEntity> necessarySourceList() {
        List<DataSourceEntity> result = new ArrayList<>();
        result.add(dataSourceHive);
        result.add(dataSourceHiveOrderItem);
        return result;
    }

    /**
     * @param enterpriseId tenant whose order data is scanned
     * @param request      must be a {@link TagConsumeCommodityRequest}
     *                     carrying the sku-code list to match
     * @return distinct ecu ids of members having at least one order item
     *         whose sku_code is in the requested list
     */
    @Override
    public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
        TagConsumeCommodityRequest commodityRequest = (TagConsumeCommodityRequest) request;

        JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager
                .getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class)
                .javaRDD();
        // Apply the shared channel/statistics-type narrowing from the parent class.
        consumeRecordRDD = statisticsTypeHandle(consumeRecordRDD, commodityRequest);

        JavaRDD<TrdVirtualOrderItemBean> orderItemRDD = MysqlRddManager
                .getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderItemBean.class)
                .javaRDD();

        // (virtual order id -> ecu id) inner-joined against the order items
        // whose sku_code is in the requested list. The inner join replaces
        // the original leftOuterJoin + Optional.isPresent() filter — the
        // result is identical but unmatched rows are never materialized.
        JavaRDD<Integer> ecuRdd = consumeRecordRDD
                .mapToPair(data -> Tuple2.apply(data.getVirtual_id(), data.getEcu_id()))
                .join(orderItemRDD
                        .mapToPair(data -> Tuple2.apply(data.getVirtual_order_id(), data.getSku_code()))
                        .filter(data -> StringUtils.isNotEmpty(data._2())
                                && commodityRequest.getSkuCodeList().contains(data._2())))
                .map(data -> data._2()._1())
                .distinct();
        return ecuRdd;
    }
}
src/main/java/com/gic/spark/filter/TagHistoryConsumeTotalFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
...
...
@@ -40,7 +40,7 @@ public class TagHistoryConsumeTotalFilter extends AbstractTagConsumRecordFilter{
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
src/main/java/com/gic/spark/filter/TagHistoryOfflineConsumptionStoreFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
import
org.apache.commons.lang.StringUtils
;
...
...
@@ -40,7 +40,7 @@ public class TagHistoryOfflineConsumptionStoreFilter extends AbstractTagConsumRe
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagHistoryOnlineConsumptionStoreFilter.java
View file @
7de8101e
...
...
@@ -2,10 +2,9 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
...
...
@@ -41,7 +40,7 @@ public class TagHistoryOnlineConsumptionStoreFilter extends AbstractTagConsumRec
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagLatelyConsumeCommodityFilter.java
0 → 100644
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.TrdVirtualOrderBean
;
import
com.gic.spark.entity.bean.TrdVirtualOrderItemBean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeCommodityRequest
;
import
com.gic.spark.util.ConstantUtil
;
import
com.gic.spark.util.DateUtil
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.spark.api.java.JavaPairRDD
;
import
org.apache.spark.api.java.JavaRDD
;
import
scala.Tuple2
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
 * Tag filter selecting members whose MOST RECENT order (latest
 * receipts_time) contains any of the requested commodities (SKU codes).
 *
 * <p>For each member the latest order is picked by reducing on
 * receipts_time, then joined against the order-item detail table filtered
 * to the requested sku codes.
 *
 * @author: wangxk
 * @date: 2020/8/12
 */
public class TagLatelyConsumeCommodityFilter extends AbstractTagConsumRecordFilter {

    // Hive source for the order-item detail rows; the order-header source
    // (dataSourceHive) is inherited from AbstractTagConsumRecordFilter.
    DataSourceHive dataSourceHiveOrderItem = new DataSourceHive(ConstantUtil.DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D);

    private static TagLatelyConsumeCommodityFilter instance;

    /**
     * Lazily-created singleton accessor.
     *
     * <p>Synchronized so that concurrent first calls cannot each observe
     * {@code instance == null} and create two instances — the original
     * unsynchronized lazy check was not thread-safe.
     */
    public static synchronized TagLatelyConsumeCommodityFilter getInstance() {
        if (null == instance) {
            instance = new TagLatelyConsumeCommodityFilter();
        }
        return instance;
    }

    /**
     * Both the order-header source and the order-item source are needed by
     * {@link #filterValidMember}.
     */
    @Override
    public List<DataSourceEntity> necessarySourceList() {
        List<DataSourceEntity> result = new ArrayList<>();
        result.add(dataSourceHive);
        result.add(dataSourceHiveOrderItem);
        return result;
    }

    /**
     * @param enterpriseId tenant whose order data is scanned
     * @param request      must be a {@link TagConsumeCommodityRequest}
     *                     carrying the sku-code list to match
     * @return distinct ecu ids of members whose latest order contains a
     *         requested sku_code
     */
    @Override
    public JavaRDD<Integer> filterValidMember(Integer enterpriseId, AbstractFilterRequest request) {
        TagConsumeCommodityRequest commodityRequest = (TagConsumeCommodityRequest) request;

        JavaRDD<TrdVirtualOrderBean> consumeRecordRDD = MysqlRddManager
                .getPojoFromDataset(dataSourceHive.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderBean.class)
                .javaRDD();
        // Apply the shared channel/statistics-type narrowing from the parent class.
        consumeRecordRDD = statisticsTypeHandle(consumeRecordRDD, commodityRequest);

        // Per member (ecu id) keep only the latest order by receipts_time,
        // then key the result by virtual order id for the join below.
        JavaPairRDD<String, Integer> orderRdd = consumeRecordRDD
                .filter(data -> StringUtils.isNotEmpty(data.getReceipts_time()))
                .mapToPair(data -> Tuple2.apply(data.getEcu_id(), data))
                .reduceByKey((x, y) -> {
                    long xTime = DateUtil.strToDate(x.getReceipts_time(), DateUtil.FORMAT_DATETIME_19).getTime();
                    long yTime = DateUtil.strToDate(y.getReceipts_time(), DateUtil.FORMAT_DATETIME_19).getTime();
                    return xTime > yTime ? x : y;
                })
                .mapToPair(data -> Tuple2.apply(data._2().getVirtual_id(), data._1()));

        // Distinct virtual order ids that contain at least one requested sku.
        // The boolean filter replaces the original if (...) return true; return false; form.
        JavaPairRDD<String, String> orderItemRDD = MysqlRddManager
                .getPojoFromDataset(dataSourceHiveOrderItem.getDatasetByEnterpriseId(enterpriseId), TrdVirtualOrderItemBean.class)
                .javaRDD()
                .filter(data -> StringUtils.isNotEmpty(data.getSku_code())
                        && commodityRequest.getSkuCodeList().contains(data.getSku_code()))
                .mapToPair(data -> Tuple2.apply(data.getVirtual_order_id(), data.getVirtual_order_id()))
                .reduceByKey((x, y) -> x);

        // Inner join keeps only members whose latest order matched a sku;
        // replaces the original leftOuterJoin + Optional.isPresent() filter
        // with an identical but cheaper formulation.
        JavaRDD<Integer> ecuRdd = orderRdd
                .join(orderItemRDD)
                .map(data -> data._2()._1())
                .distinct();
        return ecuRdd;
    }
}
src/main/java/com/gic/spark/filter/TagLatelyConsumeTimeFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeTimeRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -40,8 +40,8 @@ public class TagLatelyConsumeTimeFilter extends AbstractTagConsumRecordFilter{
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeTimeRequest
consumeTimeRequest
=(
TagConsumeTimeRequest
)
request
;
JavaRDD
<
T
agConsumeRecord
Bean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
T
agConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrder
Bean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
)
.
filter
(
new
Column
(
"is_eff_order"
).
equalTo
(
1
)),
T
rdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeTimeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
StringUtils
.
isNotEmpty
(
data
.
getReceipts_time
()))
...
...
src/main/java/com/gic/spark/filter/TagLatelyConsumptionChannelFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeChannelRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -40,7 +40,7 @@ public class TagLatelyConsumptionChannelFilter extends AbstractTagConsumRecordFi
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeChannelRequest
channelRequest
=(
TagConsumeChannelRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
channelRequest
);
...
...
src/main/java/com/gic/spark/filter/TagLatelyConsumptionMoneyFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
...
...
@@ -42,7 +42,7 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
@@ -52,11 +52,11 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
T
agConsumeRecord
Bean
>>
tp2
=
data
.
next
();
Tuple2
<
Integer
,
Iterable
<
T
rdVirtualOrder
Bean
>>
tp2
=
data
.
next
();
double
latelyConsumAmount
=
0
;
T
agConsumeRecord
Bean
latelyConsumeAmountBean
=
null
;
Map
<
String
,
T
agConsumeRecord
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
agConsumeRecord
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
T
rdVirtualOrder
Bean
latelyConsumeAmountBean
=
null
;
Map
<
String
,
T
rdVirtualOrder
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
rdVirtualOrder
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
...
...
@@ -65,13 +65,13 @@ public class TagLatelyConsumptionMoneyFilter extends AbstractTagConsumRecordFil
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
T
agConsumeRecord
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
T
rdVirtualOrder
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
T
agConsumeRecord
Bean
amountBean:
effectiveOrderMap
.
values
()){
for
(
T
rdVirtualOrder
Bean
amountBean:
effectiveOrderMap
.
values
()){
if
(
null
==
latelyConsumeAmountBean
){
latelyConsumeAmountBean
=
amountBean
;
}
else
{
...
...
src/main/java/com/gic/spark/filter/TagLatelyOnlineConsumptionStoreFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -41,7 +41,7 @@ public class TagLatelyOnlineConsumptionStoreFilter extends AbstractTagConsumReco
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagLowestSingleConsumptionMoneyFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
...
...
@@ -40,7 +40,7 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
@@ -69,11 +69,11 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
T
agConsumeRecord
Bean
>>
tp2
=
data
.
next
();
Tuple2
<
Integer
,
Iterable
<
T
rdVirtualOrder
Bean
>>
tp2
=
data
.
next
();
double
topConsumAmount
=
0
;
T
agConsumeRecord
Bean
topConsumeAmountBean
=
null
;
Map
<
String
,
T
agConsumeRecord
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
agConsumeRecord
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
T
rdVirtualOrder
Bean
topConsumeAmountBean
=
null
;
Map
<
String
,
T
rdVirtualOrder
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
rdVirtualOrder
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
...
...
@@ -82,13 +82,13 @@ public class TagLowestSingleConsumptionMoneyFilter extends AbstractTagConsumReco
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
T
agConsumeRecord
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
T
rdVirtualOrder
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
T
agConsumeRecord
Bean
amountBean:
effectiveOrderMap
.
values
()){
for
(
T
rdVirtualOrder
Bean
amountBean:
effectiveOrderMap
.
values
()){
topConsumAmount
=
configStatus
==
1
?
(
topConsumeAmountBean
.
getPaid_amt
()<
topConsumAmount
?
topConsumeAmountBean
.
getPaid_amt
():
topConsumAmount
):
(
topConsumeAmountBean
.
getPay_amt
()<
topConsumAmount
?
topConsumeAmountBean
.
getPay_amt
():
topConsumAmount
);
...
...
src/main/java/com/gic/spark/filter/TagOfflineConsumptionStoreFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.enumtype.YearMonthDayType
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
...
...
@@ -43,7 +43,7 @@ public class TagOfflineConsumptionStoreFilter extends AbstractTagConsumRecordFil
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
storeRequest
.
setYearMonthDayType
(
YearMonthDayType
.
DAY
);
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagOnlineConsumptionStoreFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.enumtype.YearMonthDayType
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
...
...
@@ -38,7 +38,7 @@ public class TagOnlineConsumptionStoreFilter extends AbstractTagConsumRecordFilt
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
storeRequest
.
setYearMonthDayType
(
YearMonthDayType
.
DAY
);
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()!=
1
...
...
src/main/java/com/gic/spark/filter/TagPerCustomerTransactionFilter.java
View file @
7de8101e
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.entity.DataSourceHive
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsume
Bean
;
import
com.gic.spark.entity.bean.T
rdEcuSalesLabel
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeRequest
;
import
org.apache.spark.api.java.JavaRDD
;
...
...
@@ -38,16 +37,16 @@ public class TagPerCustomerTransactionFilter extends AbstractTagConsumFilter{
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeRequest
consumeRequest
=(
TagConsumeRequest
)
request
;
JavaRDD
<
T
agConsumeBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsume
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdEcuSalesLabelBean
>
consumeRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdEcuSalesLabel
Bean
.
class
).
javaRDD
();
consumeRDD
=
statisticsTypeHandle
(
consumeRDD
,
consumeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRDD
.
mapToPair
(
data
->
Tuple2
.
apply
(
data
.
getEcu_id
(),
data
)).
groupByKey
()
.
mapPartitions
(
data
->{
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
T
agConsume
Bean
>>
tp2
=
data
.
next
();
Tuple2
<
Integer
,
Iterable
<
T
rdEcuSalesLabel
Bean
>>
tp2
=
data
.
next
();
int
consumeTimes
=
0
;
double
payAmt
=
0
;
for
(
T
agConsume
Bean
consumeBean:
tp2
.
_2
()){
for
(
T
rdEcuSalesLabel
Bean
consumeBean:
tp2
.
_2
()){
consumeTimes
+=
consumeBean
.
getOrder_times
();
payAmt
+=
consumeBean
.
getPay_amt
();
}
...
...
src/main/java/com/gic/spark/filter/TagRecentlyOfflineConsumptionStoreFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeStoreRequest
;
import
com.gic.spark.util.DateUtil
;
...
...
@@ -40,7 +40,7 @@ public class TagRecentlyOfflineConsumptionStoreFilter extends AbstractTagConsumR
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeStoreRequest
storeRequest
=(
TagConsumeStoreRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
storeRequest
);
JavaRDD
<
Integer
>
ecuRdd
=
consumeRecordRDD
.
filter
(
data
->
data
.
getOrder_channel_code
()==
1
...
...
src/main/java/com/gic/spark/filter/TagTopSingleConsumptionMoneyFilter.java
View file @
7de8101e
...
...
@@ -2,7 +2,7 @@ package com.gic.spark.filter;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.datasource.mysql.MysqlRddManager
;
import
com.gic.spark.entity.bean.T
agConsumeRecord
Bean
;
import
com.gic.spark.entity.bean.T
rdVirtualOrder
Bean
;
import
com.gic.spark.entity.request.AbstractFilterRequest
;
import
com.gic.spark.entity.request.TagConsumeAmountRequest
;
import
com.gic.spark.util.CommonUtil
;
...
...
@@ -39,7 +39,7 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
AbstractFilterRequest
request
)
{
TagConsumeAmountRequest
consumeAmountRequest
=(
TagConsumeAmountRequest
)
request
;
JavaRDD
<
T
agConsumeRecordBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TagConsumeRecord
Bean
.
class
).
javaRDD
();
JavaRDD
<
T
rdVirtualOrderBean
>
consumeRecordRDD
=
MysqlRddManager
.
getPojoFromDataset
(
dataSourceHive
.
getDatasetByEnterpriseId
(
enterpriseId
),
TrdVirtualOrder
Bean
.
class
).
javaRDD
();
consumeRecordRDD
=
statisticsTypeHandle
(
consumeRecordRDD
,
consumeAmountRequest
);
int
configStatus
=
CommonUtil
.
getConfigStatus
(
enterpriseId
);
...
...
@@ -68,11 +68,11 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
List
<
Integer
>
result
=
new
ArrayList
();
while
(
data
.
hasNext
()){
Tuple2
<
Integer
,
Iterable
<
T
agConsumeRecord
Bean
>>
tp2
=
data
.
next
();
Tuple2
<
Integer
,
Iterable
<
T
rdVirtualOrder
Bean
>>
tp2
=
data
.
next
();
double
topConsumAmount
=
0
;
T
agConsumeRecord
Bean
topConsumeAmountBean
=
null
;
Map
<
String
,
T
agConsumeRecord
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
agConsumeRecord
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
T
rdVirtualOrder
Bean
topConsumeAmountBean
=
null
;
Map
<
String
,
T
rdVirtualOrder
Bean
>
effectiveOrderMap
=
new
HashMap
();
List
<
T
rdVirtualOrder
Bean
>
noEffectiveOrderList
=
new
ArrayList
();
tp2
.
_2
().
forEach
(
element
->{
if
(
element
.
getPay_amt
()>
0
){
effectiveOrderMap
.
put
(
element
.
getOorder_no
(),
element
);
...
...
@@ -81,13 +81,13 @@ public class TagTopSingleConsumptionMoneyFilter extends AbstractTagConsumRecord
}
});
noEffectiveOrderList
.
forEach
(
noEffectiveOrder
->{
T
agConsumeRecord
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
T
rdVirtualOrder
Bean
effectiveOrder
=
effectiveOrderMap
.
get
(
noEffectiveOrder
.
getOorder_no
());
if
(
noEffectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
).
equals
(
effectiveOrder
.
getReceipts_time
().
substring
(
0
,
10
))){
effectiveOrder
.
setPaid_amt
(
effectiveOrder
.
getPaid_amt
()+
noEffectiveOrder
.
getPaid_amt
());
effectiveOrder
.
setPay_amt
(
effectiveOrder
.
getPay_amt
()+
noEffectiveOrder
.
getPay_amt
());
}
});
for
(
T
agConsumeRecord
Bean
amountBean:
effectiveOrderMap
.
values
()){
for
(
T
rdVirtualOrder
Bean
amountBean:
effectiveOrderMap
.
values
()){
topConsumAmount
=
configStatus
==
1
?
(
topConsumeAmountBean
.
getPaid_amt
()>
topConsumAmount
?
topConsumeAmountBean
.
getPaid_amt
():
topConsumAmount
):
(
topConsumeAmountBean
.
getPay_amt
()>
topConsumAmount
?
topConsumeAmountBean
.
getPay_amt
():
topConsumAmount
);
...
...
src/main/java/com/gic/spark/tag/TagConstant.java
View file @
7de8101e
...
...
@@ -56,6 +56,12 @@ public class TagConstant {
public static final String TAG_TIME_TYPE_ALL_TIME = "alltime";//目前为止
*/
//消费商品
public
static
final
String
TAG_CODE_FIRST_CONSUME_COMMODITY
=
"firstConsumeCommodity"
;
//首次消费商
public
static
final
String
TAG_CODE_LATELY_CONSUME_COMMODITY
=
"latelyConsumeCommodity"
;
//最近消费商
public
static
final
String
TAG_CODE_HISTORY_CONSUME_COMMODITY
=
"historyConsumeCommodity"
;
//历史消费商
public
static
final
String
TAG_CODE_CONSUME_COMMODITY
=
"consumeCommodity"
;
//消费商品
//消费渠道
public
static
final
String
TAG_CODE_FIRST_CONSUMPTION_CHANNEL
=
"firstConsumptionChannel"
;
//首次消费渠道
public
static
final
String
TAG_CODE_LATELY_CONSUMPTION_CHANNEL
=
"latelyConsumptionChannel"
;
//最近消费渠道
...
...
src/main/java/com/gic/spark/tag/TagValueParser.java
View file @
7de8101e
...
...
@@ -146,11 +146,60 @@ public class TagValueParser {
case
TagConstant
.
TAG_CODE_ONLINE_CONSUMPTION_STORE
:
request
=
getOnlineConsumeStoreRequest
(
conditionDTO
.
getTagTemplateElInfo
());
break
;
//消费商品
case
TagConstant
.
TAG_CODE_FIRST_CONSUME_COMMODITY
:
request
=
getConsumeCommodityRequest
(
conditionDTO
.
getTagTemplateElInfo
());
break
;
case
TagConstant
.
TAG_CODE_LATELY_CONSUME_COMMODITY
:
request
=
getConsumeCommodityRequest
(
conditionDTO
.
getTagTemplateElInfo
());
break
;
case
TagConstant
.
TAG_CODE_HISTORY_CONSUME_COMMODITY
:
request
=
getConsumeCommodityRequest
(
conditionDTO
.
getTagTemplateElInfo
());
break
;
case
TagConstant
.
TAG_CODE_CONSUME_COMMODITY
:
request
=
getConsumeCommodityRequest
(
conditionDTO
.
getTagTemplateElInfo
());
break
;
default
:
break
;
}
return
request
;
}
/**
 * Builds the filter request for the "consume commodity" family of tags
 * (first / lately / history / plain consumeCommodity) from the raw tag
 * condition key/value pairs.
 *
 * <p>Recognized condition keys:
 * <ul>
 *   <li>keys containing {@code "flag"} — delegated to
 *       {@code setStatisticsTypeHandle} to set the statistics type;</li>
 *   <li>{@code TagValKeyEnum.COMPONENT_COMMODITY} — SKU code list
 *       (currently unresolved, see TODO below);</li>
 *   <li>keys containing {@code "timeRange"} — time window, where the part
 *       after the dot selects fixed / lately / later semantics.</li>
 * </ul>
 *
 * @param conditionValDTOList raw condition values from the tag template
 * @return the populated {@link TagConsumeCommodityRequest}
 */
private static AbstractFilterRequest getConsumeCommodityRequest(List<TagConditionValDTO> conditionValDTOList) {
    TagConsumeCommodityRequest request = new TagConsumeCommodityRequest();
    for (TagConditionValDTO conditionValDTO : conditionValDTOList) {
        if (Pattern.compile("flag").matcher(conditionValDTO.getKey()).find()) {
            setStatisticsTypeHandle(request, conditionValDTO.getKey(), conditionValDTO.getVal());
        } else if (conditionValDTO.getKey().equals(TagValKeyEnum.COMPONENT_COMMODITY.getKey())) {
            // TODO: the SKU codes must be resolved via the commodity service
            // interface; until then the request carries a null list.
            List<String> skuCodeList = null;
            request.setSkuCodeList(skuCodeList);
        } else if (Pattern.compile("timeRange").matcher(conditionValDTO.getKey()).find()) {
            // BUG FIX: String.split takes a REGEX. split(".") matches every
            // character and returns an empty array, so keys[1] threw
            // ArrayIndexOutOfBoundsException for every timeRange condition.
            // The dot must be escaped to split on a literal '.'.
            String[] keys = conditionValDTO.getKey().split("\\.");
            switch (keys[1]) {
                case TagConstant.TAG_TIME_TYPE_FIXATION:
                    // fixed window: value is "beginDate,endDate"
                    String[] vals = conditionValDTO.getVal().split(",");
                    request.setBeginTime(DateUtil.stringToDate(vals[0]));
                    request.setEndTime(DateUtil.stringToDate(vals[1]));
                    request.setTimeRangeType(TimeRangeType.FIXATION);
                    break;
                case TagConstant.TAG_TIME_TYPE_LATELY:
                    // rolling window: last N days
                    request.setTimeNum(Integer.parseInt(conditionValDTO.getVal()));
                    request.setTimeRangeType(TimeRangeType.LATELY);
                    request.setYearMonthDayType(YearMonthDayType.DAY);
                    break;
                case TagConstant.TAG_TIME_TYPE_LATER:
                    // forward window: next N days
                    request.setTimeNum(Integer.parseInt(conditionValDTO.getVal()));
                    request.setTimeRangeType(TimeRangeType.LATER);
                    request.setYearMonthDayType(YearMonthDayType.DAY);
                    break;
                default:
                    break;
            }
        }
    }
    return request;
}
private
static
AbstractFilterRequest
getOnlineConsumeStoreRequest
(
List
<
TagConditionValDTO
>
conditionValDTOList
)
{
TagConsumeStoreRequest
request
=
new
TagConsumeStoreRequest
();
List
<
String
>
storeList
=
new
ArrayList
();
...
...
src/main/java/com/gic/spark/util/ConstantUtil.java
View file @
7de8101e
...
...
@@ -12,4 +12,5 @@ public class ConstantUtil {
public
static
final
String
TAB_INTEGRAL_CU_CHANGE_LOG
=
"tab_integral_cu_change_log"
;
public
static
final
String
ADS_GIC_TRD_ECU_SALES_LABEL_D
=
"demoads.ads_gic_trd_ecu_sales_label_d"
;
public
static
final
String
DWD_GIC_TRD_VIRTUAL_WDORDER_D
=
"democdm.dwd_gic_trd_virtual_wdorder_d"
;
public
static
final
String
DWD_GIC_TRD_VIRTUAL_ORDER_ITEM_D
=
"democdm.dwd_gic_trd_virtual_order_item_d"
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment