Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
gic-spark-tag-4.0
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wangxiaokang
gic-spark-tag-4.0
Commits
39dc367a
Commit
39dc367a
authored
Apr 20, 2020
by
guos
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
会员标签4.0
parent
88c20b49
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
18 changed files
with
385 additions
and
223 deletions
+385
-223
TagGroupInfo.java
src/main/java/com/gic/spark/entity/TagGroupInfo.java
+22
-0
TabMemberTagAll.java
...main/java/com/gic/spark/entity/table/TabMemberTagAll.java
+0
-177
TabSceneCrowd.java
src/main/java/com/gic/spark/entity/table/TabSceneCrowd.java
+27
-27
BaseTagFilter.java
src/main/java/com/gic/spark/filter/BaseTagFilter.java
+1
-1
TagCancelCouponFilter.java
...main/java/com/gic/spark/filter/TagCancelCouponFilter.java
+35
-0
TagCurrentCouponNumFilter.java
.../java/com/gic/spark/filter/TagCurrentCouponNumFilter.java
+35
-0
TagDonationInCouponFilter.java
.../java/com/gic/spark/filter/TagDonationInCouponFilter.java
+35
-0
TagGetNoCancelCouponFilter.java
...java/com/gic/spark/filter/TagGetNoCancelCouponFilter.java
+33
-0
TagNoCancelExpiresCouponFilter.java
.../com/gic/spark/filter/TagNoCancelExpiresCouponFilter.java
+34
-0
TagOccupyCouponFilter.java
...main/java/com/gic/spark/filter/TagOccupyCouponFilter.java
+34
-0
TagUnclaimedCouponFilter.java
...n/java/com/gic/spark/filter/TagUnclaimedCouponFilter.java
+34
-0
TagYetDonationCouponFilter.java
...java/com/gic/spark/filter/TagYetDonationCouponFilter.java
+34
-0
TagProcess.java
src/main/java/com/gic/spark/tag/TagProcess.java
+0
-0
TagProcessEntity.java
src/main/java/com/gic/spark/tag/TagProcessEntity.java
+2
-2
TagValueParser.java
src/main/java/com/gic/spark/tag/TagValueParser.java
+1
-0
EsRequestUtil.java
src/main/java/com/gic/spark/util/EsRequestUtil.java
+4
-16
ESShardPartitioner.scala
...in/scala/org/elasticsearch/spark/ESShardPartitioner.scala
+26
-0
EsRddFactory.scala
src/main/scala/org/elasticsearch/spark/EsRddFactory.scala
+28
-0
No files found.
src/main/java/com/gic/spark/entity/TagGroupInfo.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
entity
;
import
java.io.Serializable
;
/**
* @description:
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagGroupInfo
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
8710695590658340671L
;
private
Integer
mcu_id
;
private
Integer
enterprise_id
;
private
String
tag_group_info
;
public
TagGroupInfo
(
Integer
mcu_id
,
Integer
enterprise_id
,
String
tag_group_info
)
{
this
.
mcu_id
=
mcu_id
;
this
.
enterprise_id
=
enterprise_id
;
this
.
tag_group_info
=
tag_group_info
;
}
}
src/main/java/com/gic/spark/entity/table/TabMemberTagAll.java
deleted
100644 → 0
View file @
88c20b49
package
com
.
gic
.
spark
.
entity
.
table
;
public
class
TabMemberTagAll
{
private
long
tag_Id
;
private
long
enterprise_Id
;
private
long
link_Id
;
private
String
tag_Name
;
private
long
tag_Type
;
private
String
tag_Describe
;
private
long
category_Id
;
private
long
real_Time_Flag
;
private
long
delete_Flag
;
private
String
remark
;
private
String
es_Field
;
private
String
tag_Code
;
private
long
tag_Group_Id
;
private
long
contain_Flag
;
private
long
create_User_Id
;
private
java
.
sql
.
Timestamp
create_Time
;
private
java
.
sql
.
Timestamp
update_Time
;
public
long
getTag_Id
()
{
return
tag_Id
;
}
public
void
setTag_Id
(
long
tag_Id
)
{
this
.
tag_Id
=
tag_Id
;
}
public
long
getEnterprise_Id
()
{
return
enterprise_Id
;
}
public
void
setEnterprise_Id
(
long
enterprise_Id
)
{
this
.
enterprise_Id
=
enterprise_Id
;
}
public
long
getLink_Id
()
{
return
link_Id
;
}
public
void
setLink_Id
(
long
link_Id
)
{
this
.
link_Id
=
link_Id
;
}
public
String
getTag_Name
()
{
return
tag_Name
;
}
public
void
setTag_Name
(
String
tag_Name
)
{
this
.
tag_Name
=
tag_Name
;
}
public
long
getTag_Type
()
{
return
tag_Type
;
}
public
void
setTag_Type
(
long
tag_Type
)
{
this
.
tag_Type
=
tag_Type
;
}
public
String
getTag_Describe
()
{
return
tag_Describe
;
}
public
void
setTag_Describe
(
String
tag_Describe
)
{
this
.
tag_Describe
=
tag_Describe
;
}
public
long
getCategory_Id
()
{
return
category_Id
;
}
public
void
setCategory_Id
(
long
category_Id
)
{
this
.
category_Id
=
category_Id
;
}
public
long
getReal_Time_Flag
()
{
return
real_Time_Flag
;
}
public
void
setReal_Time_Flag
(
long
real_Time_Flag
)
{
this
.
real_Time_Flag
=
real_Time_Flag
;
}
public
long
getDelete_Flag
()
{
return
delete_Flag
;
}
public
void
setDelete_Flag
(
long
delete_Flag
)
{
this
.
delete_Flag
=
delete_Flag
;
}
public
String
getRemark
()
{
return
remark
;
}
public
void
setRemark
(
String
remark
)
{
this
.
remark
=
remark
;
}
public
String
getEs_Field
()
{
return
es_Field
;
}
public
void
setEs_Field
(
String
es_Field
)
{
this
.
es_Field
=
es_Field
;
}
public
String
getTag_Code
()
{
return
tag_Code
;
}
public
void
setTag_Code
(
String
tag_Code
)
{
this
.
tag_Code
=
tag_Code
;
}
public
long
getTag_Group_Id
()
{
return
tag_Group_Id
;
}
public
void
setTag_Group_Id
(
long
tag_Group_Id
)
{
this
.
tag_Group_Id
=
tag_Group_Id
;
}
public
long
getContain_Flag
()
{
return
contain_Flag
;
}
public
void
setContain_Flag
(
long
contain_Flag
)
{
this
.
contain_Flag
=
contain_Flag
;
}
public
long
getCreate_User_Id
()
{
return
create_User_Id
;
}
public
void
setCreate_User_Id
(
long
create_User_Id
)
{
this
.
create_User_Id
=
create_User_Id
;
}
public
java
.
sql
.
Timestamp
getCreate_Time
()
{
return
create_Time
;
}
public
void
setCreate_Time
(
java
.
sql
.
Timestamp
create_Time
)
{
this
.
create_Time
=
create_Time
;
}
public
java
.
sql
.
Timestamp
getUpdate_Time
()
{
return
update_Time
;
}
public
void
setUpdate_Time
(
java
.
sql
.
Timestamp
update_Time
)
{
this
.
update_Time
=
update_Time
;
}
}
src/main/java/com/gic/spark/entity/table/TabSceneCrowd.java
View file @
39dc367a
...
...
@@ -3,29 +3,29 @@ package com.gic.spark.entity.table;
public
class
TabSceneCrowd
{
private
long
id
;
private
int
id
;
private
String
scene_Crowd_Name
;
private
long
real_Time
;
private
int
real_Time
;
private
java
.
sql
.
Timestamp
scene_Crowd_Expire_Time
;
private
long
scene_Crowd_Category_Id
;
private
int
scene_Crowd_Category_Id
;
private
String
scene_Crowd_Desc
;
private
long
scene_Crowd_Update_Type
;
private
long
scene_Crowd_Update_Type_Val
;
private
int
scene_Crowd_Update_Type
;
private
int
scene_Crowd_Update_Type_Val
;
private
java
.
sql
.
Timestamp
update_Time
;
private
long
member_Size
;
private
int
member_Size
;
private
java
.
sql
.
Timestamp
create_Time
;
private
long
valid_Flag
;
private
int
valid_Flag
;
private
String
tag_Condition_Group_Info
;
private
long
delete_Flag
;
private
long
enterprise_Id
;
private
int
delete_Flag
;
private
int
enterprise_Id
;
private
String
crowd_Param
;
public
long
getId
()
{
public
int
getId
()
{
return
id
;
}
public
void
setId
(
long
id
)
{
public
void
setId
(
int
id
)
{
this
.
id
=
id
;
}
...
...
@@ -39,11 +39,11 @@ public class TabSceneCrowd {
}
public
long
getReal_Time
()
{
public
int
getReal_Time
()
{
return
real_Time
;
}
public
void
setReal_Time
(
long
real_Time
)
{
public
void
setReal_Time
(
int
real_Time
)
{
this
.
real_Time
=
real_Time
;
}
...
...
@@ -57,11 +57,11 @@ public class TabSceneCrowd {
}
public
long
getScene_Crowd_Category_Id
()
{
public
int
getScene_Crowd_Category_Id
()
{
return
scene_Crowd_Category_Id
;
}
public
void
setScene_Crowd_Category_Id
(
long
scene_Crowd_Category_Id
)
{
public
void
setScene_Crowd_Category_Id
(
int
scene_Crowd_Category_Id
)
{
this
.
scene_Crowd_Category_Id
=
scene_Crowd_Category_Id
;
}
...
...
@@ -75,20 +75,20 @@ public class TabSceneCrowd {
}
public
long
getScene_Crowd_Update_Type
()
{
public
int
getScene_Crowd_Update_Type
()
{
return
scene_Crowd_Update_Type
;
}
public
void
setScene_Crowd_Update_Type
(
long
scene_Crowd_Update_Type
)
{
public
void
setScene_Crowd_Update_Type
(
int
scene_Crowd_Update_Type
)
{
this
.
scene_Crowd_Update_Type
=
scene_Crowd_Update_Type
;
}
public
long
getScene_Crowd_Update_Type_Val
()
{
public
int
getScene_Crowd_Update_Type_Val
()
{
return
scene_Crowd_Update_Type_Val
;
}
public
void
setScene_Crowd_Update_Type_Val
(
long
scene_Crowd_Update_Type_Val
)
{
public
void
setScene_Crowd_Update_Type_Val
(
int
scene_Crowd_Update_Type_Val
)
{
this
.
scene_Crowd_Update_Type_Val
=
scene_Crowd_Update_Type_Val
;
}
...
...
@@ -102,11 +102,11 @@ public class TabSceneCrowd {
}
public
long
getMember_Size
()
{
public
int
getMember_Size
()
{
return
member_Size
;
}
public
void
setMember_Size
(
long
member_Size
)
{
public
void
setMember_Size
(
int
member_Size
)
{
this
.
member_Size
=
member_Size
;
}
...
...
@@ -120,11 +120,11 @@ public class TabSceneCrowd {
}
public
long
getValid_Flag
()
{
public
int
getValid_Flag
()
{
return
valid_Flag
;
}
public
void
setValid_Flag
(
long
valid_Flag
)
{
public
void
setValid_Flag
(
int
valid_Flag
)
{
this
.
valid_Flag
=
valid_Flag
;
}
...
...
@@ -138,20 +138,20 @@ public class TabSceneCrowd {
}
public
long
getDelete_Flag
()
{
public
int
getDelete_Flag
()
{
return
delete_Flag
;
}
public
void
setDelete_Flag
(
long
delete_Flag
)
{
public
void
setDelete_Flag
(
int
delete_Flag
)
{
this
.
delete_Flag
=
delete_Flag
;
}
public
long
getEnterprise_Id
()
{
public
int
getEnterprise_Id
()
{
return
enterprise_Id
;
}
public
void
setEnterprise_Id
(
long
enterprise_Id
)
{
public
void
setEnterprise_Id
(
int
enterprise_Id
)
{
this
.
enterprise_Id
=
enterprise_Id
;
}
...
...
src/main/java/com/gic/spark/filter/BaseTagFilter.java
View file @
39dc367a
...
...
@@ -15,5 +15,5 @@ public interface BaseTagFilter {
List
<
DataSourceEntity
>
necessarySourceList
();
JavaRDD
<
String
>
filterValidMember
(
Long
enterpriseId
,
FilterRequest
filterOption
);
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
);
}
src/main/java/com/gic/spark/filter/TagCancelCouponFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* 已核销卡券
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagCancelCouponFilter
implements
BaseTagFilter
{
private
static
TagCancelCouponFilter
instance
;
public
static
TagCancelCouponFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagCancelCouponFilter
();
}
return
instance
;
}
private
TagCancelCouponFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/filter/TagCurrentCouponNumFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* 当前卡券数
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagCurrentCouponNumFilter
implements
BaseTagFilter
{
private
static
TagCurrentCouponNumFilter
instance
;
public
static
TagCurrentCouponNumFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagCurrentCouponNumFilter
();
}
return
instance
;
}
private
TagCurrentCouponNumFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/filter/TagDonationInCouponFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* 转赠中卡券
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagDonationInCouponFilter
implements
BaseTagFilter
{
private
static
TagDonationInCouponFilter
instance
;
public
static
TagDonationInCouponFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagDonationInCouponFilter
();
}
return
instance
;
}
private
TagDonationInCouponFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/filter/TagGetNoCancelCouponFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagGetNoCancelCouponFilter
implements
BaseTagFilter
{
private
static
TagGetNoCancelCouponFilter
instance
;
public
static
TagGetNoCancelCouponFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagGetNoCancelCouponFilter
();
}
return
instance
;
}
private
TagGetNoCancelCouponFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/filter/TagNoCancelExpiresCouponFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* 未核销已过期卡券
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagNoCancelExpiresCouponFilter
implements
BaseTagFilter
{
private
static
TagNoCancelExpiresCouponFilter
instance
;
public
static
TagNoCancelExpiresCouponFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagNoCancelExpiresCouponFilter
();
}
return
instance
;
}
private
TagNoCancelExpiresCouponFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/filter/TagOccupyCouponFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* 已占用卡券
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagOccupyCouponFilter
implements
BaseTagFilter
{
private
static
TagOccupyCouponFilter
instance
;
public
static
TagOccupyCouponFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagOccupyCouponFilter
();
}
return
instance
;
}
private
TagOccupyCouponFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/filter/TagUnclaimedCouponFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* 待领取卡券
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagUnclaimedCouponFilter
implements
BaseTagFilter
{
private
static
TagUnclaimedCouponFilter
instance
;
public
static
TagUnclaimedCouponFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagUnclaimedCouponFilter
();
}
return
instance
;
}
private
TagUnclaimedCouponFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/filter/TagYetDonationCouponFilter.java
0 → 100644
View file @
39dc367a
package
com
.
gic
.
spark
.
filter
;
import
com.gic.spark.datasource.entity.DataSourceEntity
;
import
com.gic.spark.entity.request.FilterRequest
;
import
org.apache.spark.api.java.JavaRDD
;
import
java.util.List
;
/**
* @description:
* 已转增卡券
* @author: wangxk
* @date: 2020/4/20
*/
public
class
TagYetDonationCouponFilter
implements
BaseTagFilter
{
private
static
TagYetDonationCouponFilter
instance
;
public
static
TagYetDonationCouponFilter
getInstance
()
{
if
(
null
==
instance
){
instance
=
new
TagYetDonationCouponFilter
();
}
return
instance
;
}
private
TagYetDonationCouponFilter
(){};
@Override
public
List
<
DataSourceEntity
>
necessarySourceList
()
{
return
null
;
}
@Override
public
JavaRDD
<
Integer
>
filterValidMember
(
Integer
enterpriseId
,
FilterRequest
filterOption
)
{
return
null
;
}
}
src/main/java/com/gic/spark/tag/TagProcess.java
View file @
39dc367a
This diff is collapsed.
Click to expand it.
src/main/java/com/gic/spark/tag/TagProcessEntity.java
View file @
39dc367a
...
...
@@ -10,8 +10,8 @@ import java.util.List;
* TODO: class description
*/
public
class
TagProcessEntity
{
long
enterpriseId
;
long
tagGroupId
;
int
enterpriseId
;
int
tagGroupId
;
int
level
;
List
<
TagConditionDTO
>
tagList
;
}
src/main/java/com/gic/spark/tag/TagValueParser.java
View file @
39dc367a
...
...
@@ -13,6 +13,7 @@ public class TagValueParser {
public
static
FilterRequest
parseFilterValue
(
TagConditionDTO
conditionDTO
){
FilterRequest
request
=
null
;
return
request
;
}
}
src/main/java/com/gic/spark/util/EsRequestUtil.java
View file @
39dc367a
...
...
@@ -12,26 +12,14 @@ import java.util.Map;
*/
public
class
EsRequestUtil
{
public
static
String
getESIindexName
(
String
enterprise_id
,
boolean
isProd
)
{
String
esIndexName
=
""
;
String
url
;
if
(
isProd
)
{
url
=
"http://demogic.com/screening/getEsNameAlias.json?enterpriseId="
+
enterprise_id
+
"&code=member"
;
}
else
{
url
=
"http://gicdev.demogic.com/screening/getEsNameAlias.json?enterpriseId="
+
enterprise_id
+
"&code=member"
;
}
public
static
String
getESIindexName
(
Integer
enterpriseId
)
{
String
url
=
"https://four.gicdev.com/member-config/member/index-version/"
+
enterpriseId
;
HttpResponse
response
=
HttpClient
.
getHttpResponseByGet
(
url
);
int
responseCode
=
response
.
getStatusLine
().
getStatusCode
();
if
(
responseCode
==
200
)
{
JSONObject
jsonObject
=
JSONObject
.
parseObject
(
HttpClient
.
getResponseString
(
response
));
// System.out.println("jsonObject "+jsonObject.toJSONString());
Map
result
=
(
Map
)
jsonObject
.
get
(
"response"
);
int
code
=
(
Integer
)
result
.
get
(
"errcode"
);
if
(
code
==
1
)
{
esIndexName
=
(
String
)
result
.
get
(
"name"
);
}
return
HttpClient
.
getResponseString
(
response
);
}
return
esIndexName
;
return
null
;
}
}
src/main/scala/org/elasticsearch/spark/ESShardPartitioner.scala
0 → 100644
View file @
39dc367a
package
org.elasticsearch.spark
import
org.apache.commons.logging.LogFactory
import
org.apache.spark.Partitioner
import
org.elasticsearch.hadoop.cfg.PropertiesSettings
import
org.elasticsearch.hadoop.rest.RestRepository
class
ESShardPartitioner
(
settings
:
String
)
extends
Partitioner
{
protected
val
log
=
LogFactory
.
getLog
(
this
.
getClass
())
protected
var
_numPartitions
=
-
1
override
def
numPartitions
:
Int
=
{
val
newSettings
=
new
PropertiesSettings
().
load
(
settings
)
val
repository
=
new
RestRepository
(
newSettings
)
val
targetShards
=
repository
.
getWriteTargetPrimaryShards
(
newSettings
.
getNodesClientOnly
())
repository
.
close
()
_numPartitions
=
targetShards
.
size
()
*
25
_numPartitions
}
override
def
getPartition
(
key
:
Any
)
:
Int
=
{
val
shardId
=
ShardAlg
.
shard
(
key
.
toString
(),
_numPartitions
)
shardId
}
}
src/main/scala/org/elasticsearch/spark/EsRddFactory.scala
0 → 100644
View file @
39dc367a
package
org.elasticsearch.spark
import
org.elasticsearch.spark.rdd.EsRDDWriter
/**
* Created by paste on 2018/9/18 19:19
*
* TODO: class description
*/
class
EsRddFactory
{
}
object
EsRddFactory
{
def
createEsRdd
(
serializedSettings
:
String
,
runtimeMetadata
:
Boolean
=
false
)
:
EsRDDWriter
[
Nothing
]
=
{
val
test
=
new
EsRDDWriter
(
serializedSettings
,
runtimeMetadata
)
// new EsRddW
test
}
def
createJsonWriter
(
serializedSettings
:
String
,
runtimeMetadata
:
Boolean
=
false
)
:
EsRDDWriter
[
String
]
=
{
val
test
=
new
EsRDDWriter
[
String
](
serializedSettings
,
runtimeMetadata
)
// new EsRddW
test
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment