data-hook / gic-cloud / Commits / 8a3488fb

Commit 8a3488fb, authored Mar 15, 2022 by 陶光胜
Commit message: 取数平台调整 (adjustments to the data extraction platform)
Parent: c6fb459a

Showing 3 changed files, with 126 additions and 17 deletions (+126 −17):

HiveHelper.java (+22 −16)
...main/java/com/gic/cloud/data/hook/service/HiveHelper.java

FlatQueryResultServiceImpl.java (+66 −1)
...ud/data/hook/service/impl/FlatQueryResultServiceImpl.java

data-hook-flat-query-source.xml (+38 −0)
...ervice/src/main/resources/data-hook-flat-query-source.xml
gic-cloud-data-hook-service/src/main/java/com/gic/cloud/data/hook/service/HiveHelper.java
@@ -34,6 +34,8 @@ public class HiveHelper implements ApplicationContextAware {
    private static DataSource balaSearchSource = null;
    private static DataSource bigDataDownloadSource = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        log.debug("setApplicationContext", "准备初始化 Hive 连接池");
@@ -41,6 +43,7 @@ public class HiveHelper implements ApplicationContextAware {
        downloadSource = (DataSource) applicationContext.getBean("downloadHiveSource");
        balaDownloadSource = (DataSource) applicationContext.getBean("balaDownloadHiveSource");
        balaSearchSource = (DataSource) applicationContext.getBean("balaSearchHiveSource");
        bigDataDownloadSource = (DataSource) applicationContext.getBean("bigTaskdownloadHiveSource");
    }

    /** Get the Hive data source
@@ -56,10 +59,6 @@ public class HiveHelper implements ApplicationContextAware {
    public static Connection getHiveConnection() {
        Connection conn = null;
        try {
            // String url = "jdbc:hive2://115.159.205.44:10015/data_test";
            // //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
            // Class.forName("org.apache.hive.jdbc.HiveDriver");
            // conn = DriverManager.getConnection(url, "hadoop", "");
            conn = source.getConnection();
            return conn;
        } catch (Exception ex) {
@@ -78,10 +77,6 @@ public class HiveHelper implements ApplicationContextAware {
    public static Connection getDownloadHiveConnection() {
        Connection conn = null;
        try {
            // String url = "jdbc:hive2://115.159.205.44:10015/data_test";
            // //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
            // Class.forName("org.apache.hive.jdbc.HiveDriver");
            // conn = DriverManager.getConnection(url, "hadoop", "");
            conn = downloadSource.getConnection();
            return conn;
        } catch (Exception ex) {
@@ -102,10 +97,6 @@ public class HiveHelper implements ApplicationContextAware {
    public static Connection getBalaDownloadHiveConnection() {
        Connection conn = null;
        try {
            // String url = "jdbc:hive2://115.159.205.44:10015/data_test";
            // //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
            // Class.forName("org.apache.hive.jdbc.HiveDriver");
            // conn = DriverManager.getConnection(url, "hadoop", "");
            conn = balaDownloadSource.getConnection();
            return conn;
        } catch (Exception ex) {
@@ -125,10 +116,6 @@ public class HiveHelper implements ApplicationContextAware {
    public static Connection getBalaSearchHiveConnection() {
        Connection conn = null;
        try {
            // String url = "jdbc:hive2://115.159.205.44:10015/data_test";
            // //String url = "jdbc:hive2://10.0.0.3:10015/data_test";
            // Class.forName("org.apache.hive.jdbc.HiveDriver");
            // conn = DriverManager.getConnection(url, "hadoop", "");
            conn = balaSearchSource.getConnection();
            return conn;
        } catch (Exception ex) {
@@ -144,6 +131,25 @@ public class HiveHelper implements ApplicationContextAware {
        }
        // TRY CATCH OVER
    }

    public static Connection getBigDataDownloadHiveConnection() {
        Connection conn = null;
        try {
            conn = bigDataDownloadSource.getConnection();
            return conn;
        } catch (Exception ex) {
            DingtalkMessageUtil.sendAlertMessage("获取连接告警:获取连接超时",
                    "https://oapi.dingtalk.com/robot/send?access_token=157ad00c2b6491f2f0aac1d89121e4bd2d82d9d33cad0596b88dacfdc12fe455");
            ex.printStackTrace();
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            throw new RuntimeException("获取连接异常");
        }
        // TRY CATCH OVER
    }

    public static Connection getSearchHiveConnection(List<String> enterpriseIds) {
        Config config = ConfigService.getConfig("application");
        String customRouterEnterpriseId = config.getProperty("customRouterEnterpriseId", "");
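Note: the snippet below is not part of the commit. It is a minimal usage sketch showing how a caller might borrow a connection from the new bigTaskdownloadHiveSource pool through getBigDataDownloadHiveConnection() and return it; the class name and the SELECT 1 probe query are hypothetical.

import com.gic.cloud.data.hook.service.HiveHelper;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical caller: borrow a connection from the new big-task pool,
// run a placeholder query, and always return the connection to the pool.
public class BigTaskQueryExample {
    public static void runProbeQuery() {
        Connection conn = null;
        try {
            // Throws RuntimeException (and sends a DingTalk alert) when no connection can be obtained.
            conn = HiveHelper.getBigDataDownloadHiveConnection();
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 1")) {    // placeholder query
                while (rs.next()) {
                    // consume rows here
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();    // returns the connection to the DBCP pool
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}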
gic-cloud-data-hook-service/src/main/java/com/gic/cloud/data/hook/service/impl/FlatQueryResultServiceImpl.java
@@ -44,6 +44,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -65,6 +66,10 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
    public static final List<String> CARD = Arrays.asList("card_num", "receive_card_num");

    private static final Integer maxFields = 20;

    private static final Map<String, String> bigTaskRunningMap = new ConcurrentHashMap<>();

    @Autowired
    FlatQueryTableDao flatQueryTableDao;

    @Autowired
@@ -74,11 +79,13 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
    /** Download-condition list associated with self-service metric queries */
    protected List<FlatQueryTaskCondition> taskConditions = Lists.newArrayList();

    protected List<FlatQueryTaskCondition> bigTaskConditions = Lists.newArrayList();

    private FlatQueryResultServiceImpl() {
        log.debug("construct", "准备初始化 FlatQuery 查询服务");
        runDownloadTask(3);
        runBalaDownloadTask(3);
        runBigDataDownloadTask(3);
        runApplyTask(5);    // check task status every 5 seconds
    }
@@ -561,7 +568,18 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
log
.
debug
(
"自助指标当前正在执行的任务为:"
,
JSON
.
toJSONString
(
taskConditions
.
get
(
i
)));
if
(
taskConditions
.
get
(
i
).
getBuildPermitted
().
equals
(
Global
.
YES
)
&&
!
taskConditions
.
get
(
i
).
getEnterpriseIds
().
contains
(
"ff8080816dd0385e016ddca436d01fe1"
))
{
try
{
if
(
taskConditions
.
get
(
i
).
getAllFields
().
size
()
>=
maxFields
)
{
bigTaskConditions
.
add
(
taskConditions
.
get
(
i
));
taskConditions
.
remove
(
i
);
continue
;
}
else
{
if
(
bigTaskRunningMap
.
isEmpty
()){
bigTaskConditions
.
add
(
taskConditions
.
get
(
i
));
taskConditions
.
remove
(
i
);
continue
;
}
}
try
{
connection
=
HiveHelper
.
getDownloadHiveConnection
();
condition
=
taskConditions
.
remove
(
i
);
// 移除并获取第一个任务条件
break
;
...
...
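For readability, the routing rule added in the hunk above can be read as a single predicate applied to each build-permitted task: it is diverted to bigTaskConditions when it selects at least maxFields (20) fields, or when bigTaskRunningMap is empty (the big-task executor is idle and can absorb normal work); otherwise it stays on the normal download pool. The helper below only restates that rule; it is an illustrative sketch, not code from the commit, and the method name is hypothetical.

import java.util.Map;

// Illustrative only: restates the routing condition used in the loop above.
// FlatQueryTaskCondition is the project's own task descriptor type, so this
// would conceptually live inside FlatQueryResultServiceImpl.
static boolean shouldRouteToBigTaskQueue(FlatQueryTaskCondition task,
                                         Map<String, String> bigTaskRunningMap,
                                         int maxFields) {
    // at least maxFields columns selected, or the big-task pool currently has nothing running
    return task.getAllFields().size() >= maxFields || bigTaskRunningMap.isEmpty();
}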
@@ -590,7 +608,54 @@ public class FlatQueryResultServiceImpl implements IFlatQueryResultService {
        }, interval * 1000, interval * 1000, TimeUnit.MILLISECONDS);
    }

    /** Download-task execution timer */
    //private Timer downloadTaskTimer = new Timer();
    ScheduledExecutorService bigDataDownloadService = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("bigDataDownloadTimer-%d").daemon(true).build());

    /** Big-task download; when it is idle, normal tasks may also be routed here */
    private void runBigDataDownloadTask(Integer interval) {
        bigDataDownloadService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Connection connection = null;
                try {
                    if (bigTaskConditions != null && bigTaskConditions.size() > 0) {
                        FlatQueryTaskCondition condition = null;
                        //FlatQueryTaskCondition condition = taskConditions.remove(0); // remove and take the first task condition
                        for (int i = 0; i < bigTaskConditions.size(); i++) {
                            log.debug("自助指标当前正在执行的任务为:", JSON.toJSONString(bigTaskConditions.get(i)));
                            if (bigTaskConditions.get(i).getBuildPermitted().equals(Global.YES)) {
                                try {
                                    connection = HiveHelper.getBigDataDownloadHiveConnection();
                                    condition = bigTaskConditions.remove(i);    // remove and take the first task condition
                                    break;
                                } catch (Exception e) {
                                    log.debug("获取连接异常:", e.getMessage());
                                    e.printStackTrace();
                                    continue;
                                }
                            } // IF OVER
                        } // FOR OVER
                        bigTaskRunningMap.put(condition.getTaskId(), condition.getTaskId());
                        takeFile(condition, connection);
                        bigTaskRunningMap.remove(condition.getTaskId());
                    }
                    // ignore if there is no task
                } catch (Exception e) {
                    log.debug("自助指标下载异常", e.getMessage());
                    e.printStackTrace();
                } finally {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }, interval * 1000, interval * 1000, TimeUnit.MILLISECONDS);
    }

    /** Download-task execution timer */
    //private Timer balaDownloadTaskTimer = new Timer();
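The new executor follows the same pattern as the existing download timers: a single named daemon thread polling on a fixed interval. The standalone sketch below reproduces just that scheduling skeleton, assuming the BasicThreadFactory used here is org.apache.commons.lang3.concurrent.BasicThreadFactory; the poll body is a placeholder, not the project's download logic.

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

// Standalone sketch of the scheduling pattern used by runBigDataDownloadTask:
// one named daemon thread that wakes up every `interval` seconds.
public class BigTaskPollerSketch {
    public static void main(String[] args) throws InterruptedException {
        int interval = 3;    // seconds, the same value passed to runBigDataDownloadTask(3)
        ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder()
                        .namingPattern("bigDataDownloadTimer-%d")
                        .daemon(true)
                        .build());
        service.scheduleAtFixedRate(
                () -> System.out.println("poll bigTaskConditions here"),
                interval * 1000L, interval * 1000L, TimeUnit.MILLISECONDS);
        Thread.sleep(10_000L);    // keep the daemon thread alive long enough to observe a few ticks
    }
}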
gic-cloud-data-hook-service/src/main/resources/data-hook-flat-query-source.xml
@@ -160,4 +160,41 @@
        <property name="removeAbandonedTimeout" value="180" />
    </bean>

    <bean class="org.apache.commons.dbcp.BasicDataSource" id="bigTaskdownloadHiveSource" destroy-method="close">
        <property name="driverClassName" value="org.apache.hive.jdbc.HiveDriver" />
        <!--<property name="url" value="jdbc:hive2://115.159.205.44:10015/data_test" />-->
        <property name="url" value="${hive.bigTaskdownload.url}" />
        <property name="username" value="${hive.username}" />
        <property name="password" value="" />
        <property name="maxActive">
            <value>20</value>
        </property>
        <property name="maxIdle">
            <value>5</value>
        </property>
        <property name="minIdle">
            <value>1</value>
        </property>
        <property name="testWhileIdle">
            <value>true</value>
        </property>
        <property name="maxWait">
            <value>120000</value>
        </property>
        <property name="validationQuery" value="SELECT 1" />
        <!-- Do not test connections on borrow; it hurts performance badly -->
        <property name="testOnBorrow" value="false" />
        <!-- Run the idle-connection evictor every 30 seconds -->
        <property name="timeBetweenEvictionRunsMillis" value="30000" />
        <!-- Reclaim connections that have sat idle in the pool for 30 minutes -->
        <property name="minEvictableIdleTimeMillis" value="1800000" />
        <!-- Number of connections examined on each run of the idle-connection evictor (if any) -->
        <property name="numTestsPerEvictionRun" value="10" />
        <!-- Abandoned-connection recovery; only triggered when fewer than 3 connections remain available -->
        <property name="removeAbandoned" value="true" />
        <!-- Abandoned-connection recovery; a leaked connection may be removed after 180 seconds -->
        <property name="removeAbandonedTimeout" value="180" />
    </bean>
</beans>
\ No newline at end of file
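For reference, the new bigTaskdownloadHiveSource bean is equivalent to configuring a commons-dbcp BasicDataSource programmatically with the same pool settings. The sketch below only spells those parameters out in Java; the project itself wires this via the XML above, and the url and username arguments stand in for the ${hive.bigTaskdownload.url} and ${hive.username} placeholders.

import org.apache.commons.dbcp.BasicDataSource;

// Programmatic equivalent of the bigTaskdownloadHiveSource bean, for illustration only.
public class BigTaskPoolConfigSketch {
    public static BasicDataSource buildBigTaskDownloadSource(String url, String username) {
        BasicDataSource ds = new BasicDataSource();
        ds.setDriverClassName("org.apache.hive.jdbc.HiveDriver");
        ds.setUrl(url);                              // ${hive.bigTaskdownload.url}
        ds.setUsername(username);                    // ${hive.username}
        ds.setPassword("");
        ds.setMaxActive(20);
        ds.setMaxIdle(5);
        ds.setMinIdle(1);
        ds.setTestWhileIdle(true);
        ds.setMaxWait(120000);
        ds.setValidationQuery("SELECT 1");
        ds.setTestOnBorrow(false);                   // do not test on borrow
        ds.setTimeBetweenEvictionRunsMillis(30000);  // evictor runs every 30 s
        ds.setMinEvictableIdleTimeMillis(1800000);   // reclaim after 30 min idle
        ds.setNumTestsPerEvictionRun(10);
        ds.setRemoveAbandoned(true);
        ds.setRemoveAbandonedTimeout(180);           // leaked connections removable after 180 s
        return ds;
    }
}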