Commit df56af4f by songyinghui

Merge branch 'feature-202309-数据组队列迁移' into feature-content4-3

parents 09c79440 58337875
......@@ -207,6 +207,12 @@
<artifactId>gic-operating-api</artifactId>
<version>${gic-operating-api}</version>
</dependency>
<!-- 数仓 -->
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>holo-client</artifactId>
<version>2.2.10</version>
</dependency>
<dependency>
<groupId>com.gic</groupId>
<artifactId>gic-content-api</artifactId>
......
package com.gic.haoban.manage.service.dao.mapper.content.holo;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
/**
* @Author MUSI
* @Date 2023/9/4 4:11 PM
* @Description
* @Version
**/
@Slf4j
@Component
public class HoloDataSource implements InitializingBean, DisposableBean {
HoloClient client;
public synchronized void init() {
try {
Config appConfig = ConfigService.getAppConfig();
String holoUrl = appConfig.getProperty("holo.url", "");
String holoUserName = appConfig.getProperty("holo.username", "");
String holoPassword = appConfig.getProperty("holo.password", "");
HoloConfig holoConfig = new HoloConfig();
holoConfig.setJdbcUrl(holoUrl);
holoConfig.setUsername(holoUserName);
holoConfig.setPassword(holoPassword);
this.client = new HoloClient(holoConfig);
this.client.setAsyncCommit(false);
log.info("holo链接初始化完成");
}catch (Exception ex) {
log.info("初始化client 异常 >>>", ex);
}
}
@Override
public void destroy() throws Exception {
if (client != null) {
client.close();
}
}
@Override
public void afterPropertiesSet() throws Exception {
if (client == null) {
this.init();
}
}
public HoloClient getClient() {
if (this.client == null) {
this.init();
}
return this.client;
}
}
package com.gic.haoban.manage.service.entity.content;
import com.alibaba.fastjson.annotation.JSONField;
import java.util.Date;
import java.io.Serializable;
......@@ -56,10 +58,12 @@ public class TabHaobanTriggerCustomerDetail implements Serializable {
/**
* 创建时间
*/
@JSONField(format = "yyyy-MM-dd HH:dd:ss")
private Date createTime;
/**
* 修改时间
*/
@JSONField(format = "yyyy-MM-dd HH:dd:ss")
private Date updateTime;
/**
* 0正常; 1删除
......
......@@ -86,4 +86,6 @@ public class InteractRecordMessageBO implements Serializable {
* @see com.gic.haoban.manage.api.enums.content.ShareBizType
*/
private Integer bizType;
private String appKey;
}
package com.gic.haoban.manage.service.service.content.adaptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gic.haoban.manage.service.pojo.bo.content.message.InteractRecordMessageBO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
/**
* @Author MUSI
* @Date 2023/9/4 3:14 PM
* @Description
* @Version
**/
@Slf4j
@Component
public class MessageConvertHelper {
/**
* 转换消息
* @param text
* @return
*/
public InteractRecordMessageBO convertMessageBo(String text) {
JSONObject jsonObject = JSON.parseObject(text);
if (MapUtils.isEmpty(jsonObject)) {
return null;
}
InteractRecordMessageBO interactRecordMessageBO = new InteractRecordMessageBO();
String appKey = jsonObject.getString("app_key");
interactRecordMessageBO.setAppKey(appKey);
String gp = jsonObject.getString("gp");
JSONObject gpJson = JSON.parseObject(gp);
String cusp = jsonObject.getString("cusp");
JSONObject cuspJson = JSON.parseObject(cusp);
String sysp = jsonObject.getString("sysp");
JSONObject syspJson = JSON.parseObject(sysp);
String enterpriseId = gpJson.getString("merch_id");
interactRecordMessageBO.setEnterpriseId(enterpriseId);
String memberId = gpJson.getString("member_id");
interactRecordMessageBO.setMemberId(memberId);
String unionId = gpJson.getString("union_id");
interactRecordMessageBO.setUnionId(unionId);
String clerkId = cuspJson.getString("share_id");
interactRecordMessageBO.setClerkId(clerkId);
String storeId = cuspJson.getString("share_shop_id");
interactRecordMessageBO.setStoreId(storeId);
String materialId = cuspJson.getString("material_id");
interactRecordMessageBO.setMaterialId(materialId);
String shareMaterialChannel = cuspJson.getString("share_material_channel");
interactRecordMessageBO.setChannelSource(this.convertChannelSource(shareMaterialChannel));
String eventCode = jsonObject.getString("event_code");
interactRecordMessageBO.setEventType(this.convertEventType(eventCode));
Integer duration = jsonObject.getInteger("duration");
interactRecordMessageBO.setDurationTime(duration);
String orderNumber = cuspJson.getString("order_number");
interactRecordMessageBO.setOrderNumber(orderNumber);
String goodsId = cuspJson.getString("goods_id");
interactRecordMessageBO.setGoodsId(goodsId);
String runningUuid = jsonObject.getString("runningUuid");
interactRecordMessageBO.setBusinessUUId(runningUuid);
Long lastAccessTime = jsonObject.getLong("server_timestamp");
interactRecordMessageBO.setLastAccessTime(lastAccessTime);
String refUrl = syspJson.getString("ref_url");
interactRecordMessageBO.setRefUrl(refUrl);
return interactRecordMessageBO;
}
private Integer convertChannelSource(String shareMaterialChannel) {
if (StringUtils.isBlank(shareMaterialChannel)) {
return null;
}
switch (shareMaterialChannel) {
case "null" :
case "NULL":
return null;
case "朋友圈":
return 1;
case "客户群":
return 2;
case "对话框":
return 3;
default:
return 0;
}
}
private Integer convertEventType(String eventCode) {
if (StringUtils.isBlank(eventCode)) {
return null;
}
switch (eventCode) {
case "material_page":
case "material_page_duration":
return 1;
case "store_goods_detail":
return 2;
case "store_buy_sucess":
return 3;
case "store_add_cart":
return 4;
default:
return null;
}
}
}
package com.gic.haoban.manage.service.service.content.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.model.TableSchema;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.gic.haoban.common.utils.DateUtil;
import com.gic.haoban.manage.service.dao.mapper.content.TabHaobanTriggerCustomerDetailMapper;
import com.gic.haoban.manage.service.dao.mapper.content.holo.HoloDataSource;
import com.gic.haoban.manage.service.entity.content.TabHaobanTriggerCustomerDetail;
import com.gic.haoban.manage.service.pojo.bo.content.TriggerCustomerDetailBO;
import com.gic.haoban.manage.service.pojo.qo.content.TriggerCustomerDetailQO;
import com.gic.haoban.manage.service.service.content.TriggerCustomerDetailService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
......@@ -29,6 +43,9 @@ public class TriggerCustomerDetailServiceImpl implements TriggerCustomerDetailSe
@Autowired
private TabHaobanTriggerCustomerDetailMapper triggerCustomerDetailMapper;
@Autowired
private HoloDataSource holoDataSource;
@Override
public void save(TriggerCustomerDetailBO triggerCustomerDetailBO) {
......@@ -42,17 +59,53 @@ public class TriggerCustomerDetailServiceImpl implements TriggerCustomerDetailSe
@Override
public void batchSave(List<TriggerCustomerDetailBO> triggerCustomerDetailBos) {
if (CollectionUtils.isEmpty(triggerCustomerDetailBos)) {
log.info("无客户触达日志 >>>> ");
return;
}
List<TabHaobanTriggerCustomerDetail> customerDetails = triggerCustomerDetailBos
.stream()
.map(item -> {
TabHaobanTriggerCustomerDetail temp = new TabHaobanTriggerCustomerDetail();
BeanUtils.copyProperties(item, temp);
return temp;
})
.collect(Collectors.toList());
triggerCustomerDetailMapper.insertBatch(customerDetails);
Config appConfig = ConfigService.getAppConfig();
// 当前阶段 0双写 1 只写数据表
String triggerProcess = appConfig.getProperty("trigger.write", "1");
if (StringUtils.equals(triggerProcess, "0")) {
List<TabHaobanTriggerCustomerDetail> customerDetails = triggerCustomerDetailBos
.stream()
.map(item -> {
TabHaobanTriggerCustomerDetail temp = new TabHaobanTriggerCustomerDetail();
BeanUtils.copyProperties(item, temp);
return temp;
})
.collect(Collectors.toList());
triggerCustomerDetailMapper.insertBatch(customerDetails);
}
try {
HoloClient client = holoDataSource.getClient();
if (client == null) {
log.error("数仓数据源链接异常, 请检查!!!!{}", JSON.toJSONString(triggerCustomerDetailBos));
return;
}
TableSchema tableSchema = client.getTableSchema("tab_haoban_trigger_customer_detail");
SerializeConfig serializeConfig = new SerializeConfig();
serializeConfig.setPropertyNamingStrategy(PropertyNamingStrategy.SnakeCase);
// id, enterprise_id, wx_enterprise_id, biz_id, biz_type, clerk_id, staff_id,store_id, channel_type, customer_id, create_time, update_time, delete_flag
List<Put> puts = triggerCustomerDetailBos
.stream()
.map(item -> {
Put temp = new Put(tableSchema);
JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(item, serializeConfig));
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
temp.setObject(entry.getKey(), entry.getValue());
}
// 时间戳格式不行
temp.setObject("create_time", DateUtil.dateToStr(item.getCreateTime(), "yyyy-MM-dd HH:mm:ss"));
temp.setObject("update_time", DateUtil.dateToStr(item.getUpdateTime(), "yyyy-MM-dd HH:mm:ss"));
return temp;
})
.collect(Collectors.toList());
client.put(puts);
log.info("客户触达写数据组完成 >>>>>");
}catch (Exception ex) {
log.info("数据组写入异常 {}", JSON.toJSONString(triggerCustomerDetailBos), ex);
}
}
/**
......
package com.gic.haoban.manage.service.service.content.message;
import com.alibaba.fastjson.JSON;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.gic.dubbo.entity.ProviderLocalTag;
import com.gic.haoban.manage.api.enums.content.MaterialInteractRecordEventType;
import com.gic.haoban.manage.service.pojo.bo.content.message.InteractRecordMessageBO;
import com.gic.haoban.manage.service.service.content.adaptor.MessageConvertHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
......@@ -25,6 +28,8 @@ public class InteractRecordMessageListener implements MessageListener<String, St
@Autowired
InteractRecordMessageService interactRecordMessageService;
@Autowired
MessageConvertHelper messageConvertHelper;
private static final List<Integer> dealEventType = new ArrayList<>();
......@@ -40,11 +45,21 @@ public class InteractRecordMessageListener implements MessageListener<String, St
providerLocalTag.traceId = UUID.randomUUID().toString();
String value = consumerRecord.value();
log.info("【InteractRecordMessageListener】处理埋点事件 {}", value);
InteractRecordMessageBO recordMessageBO = JSON.parseObject(value, InteractRecordMessageBO.class);
InteractRecordMessageBO recordMessageBO = messageConvertHelper.convertMessageBo(value);
if (recordMessageBO == null) {
log.info("互动记录解析异常 {}", value);
return;
}
// 过滤环境
Config config = ConfigService.getAppConfig();
String envKey = config.getProperty("bury.point.env.app.key", "");
if (!StringUtils.equalsIgnoreCase(envKey, recordMessageBO.getAppKey())) {
log.info("埋点数据环境不正确 config.env: {}, appKey: {}", envKey, recordMessageBO.getAppKey());
return;
}
if (recordMessageBO.getEventType() == null || !dealEventType.contains(recordMessageBO.getEventType())) {
log.info("埋点事件类型非相关类型 {}", recordMessageBO.getEventType());
return;
......
......@@ -12,7 +12,7 @@
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.data.service}"/>
<entry key="group.id" value="haoban-3-data"/>
<entry key="group.id" value="${kafka.data.group}"/>
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer"
......
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.gic.api.base.commons.Page;
import com.gic.commons.util.UniqueIdUtils;
import com.gic.haoban.manage.service.pojo.bo.content.PotentialCustomerBO;
import com.gic.haoban.manage.service.pojo.bo.content.TriggerCustomerDetailBO;
import com.gic.haoban.manage.service.pojo.qo.content.PotentialCustomerQO;
import com.gic.haoban.manage.service.service.MemberQueryService;
import com.gic.haoban.manage.service.service.content.PotentialCustomerService;
import com.gic.haoban.manage.service.service.content.TriggerCustomerDetailService;
import com.gic.member.api.dto.MemberDTO;
import com.gic.member.api.service.MemberService;
import org.junit.Test;
......@@ -13,6 +18,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.Collections;
import java.util.Date;
/**
* @Author MUSI
* @Date 2023/3/30 10:15 PM
......@@ -29,6 +37,8 @@ public class PotentialCustomerServiceTest {
private MemberService memberService;
@Autowired
MemberQueryService memberQueryService;
@Autowired
private TriggerCustomerDetailService triggerCustomerDetailService;
String eid = "ff8080815dacd3a2015dacd3ef5c0000";
String wxEid = "ca66a01b79474c40b3e7c7f93daf1a3b";
......@@ -56,4 +66,32 @@ public class PotentialCustomerServiceTest {
JSONObject jsonObject = memberQueryService.queryDataSingle(eid, "ff8080818215878101821949a845003c", null);
System.out.println(jsonObject.toJSONString());
}
@Test
public void triggerTest(){
TriggerCustomerDetailBO triggerCustomerDetailBO = new TriggerCustomerDetailBO();
triggerCustomerDetailBO.setId(UniqueIdUtils.uniqueLong());
triggerCustomerDetailBO.setCustomerId("dadadada");
triggerCustomerDetailBO.setEnterpriseId(eid);
triggerCustomerDetailBO.setWxEnterpriseId(wxEid);
triggerCustomerDetailBO.setBizId("musidanxieru091210");
triggerCustomerDetailBO.setBizType(1);
triggerCustomerDetailBO.setClerkId(clerkId);
triggerCustomerDetailBO.setStaffId(staffId);
triggerCustomerDetailBO.setStoreId(staffId);
triggerCustomerDetailBO.setChannelType(1);
triggerCustomerDetailBO.setCustomerId(staffId);
triggerCustomerDetailBO.setCreateTime(new Date());
triggerCustomerDetailBO.setUpdateTime(new Date());
triggerCustomerDetailBO.setDeleteFlag(1);
triggerCustomerDetailBO.setId(UniqueIdUtils.uniqueLong());
triggerCustomerDetailService.batchSave(Collections.singletonList(triggerCustomerDetailBO));
triggerCustomerDetailBO.setId(UniqueIdUtils.uniqueLong());
triggerCustomerDetailBO.setBizId("musidanxieru091211");
triggerCustomerDetailService.batchSave(Collections.singletonList(triggerCustomerDetailBO));
long time = new Date().getTime();
System.out.println((int)time);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment