Commit 12e6e589 by fudahua

binlog消息

parent d1e3b1da
package com.gic.haoban.manage.service.pojo;
import com.gic.binlog.base.entity.enums.GicRecordType;
import java.io.Serializable;
/**
* Created 2020/5/27.
*
* @author hua
*/
public class BinlogBasePojo implements Serializable {
protected int recordType;
protected String prefix;
public void setRecordType(int recordType) {
this.recordType = recordType;
}
public int getRecordType() {
return recordType;
}
public String getPrefix() {
return prefix;
}
}
package com.gic.haoban.manage.service.pojo;
import com.alibaba.fastjson.annotation.JSONField;
import java.io.Serializable;
/**
* Created 2020/5/27.
*
* @author hua
*/
public class ClerkSyncPojo extends BinlogBasePojo {
@JSONField(name = "clerk_id")
private String clerkId;
@JSONField(name = "storeId")
private String storeId;
@JSONField(name = "clerk_code")
private String clerkCode;
@JSONField(name = "enterprise_id")
private String enterpriseId;
private Integer status;
public ClerkSyncPojo() {
prefix="clerk";
}
public String getClerkId() {
return clerkId;
}
public void setClerkId(String clerkId) {
this.clerkId = clerkId;
}
public String getStoreId() {
return storeId;
}
public void setStoreId(String storeId) {
this.storeId = storeId;
}
public String getClerkCode() {
return clerkCode;
}
public void setClerkCode(String clerkCode) {
this.clerkCode = clerkCode;
}
public String getEnterpriseId() {
return enterpriseId;
}
public void setEnterpriseId(String enterpriseId) {
this.enterpriseId = enterpriseId;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
}
package com.gic.haoban.manage.service.task; package com.gic.haoban.manage.service.task;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.gic.binlog.base.entity.GicField;
import com.gic.binlog.base.entity.GicRecord; import com.gic.binlog.base.entity.GicRecord;
import com.gic.binlog.base.entity.enums.GicRecordType;
import com.gic.commons.util.BeanUtil;
import com.gic.commons.util.EntityUtil;
import com.gic.haoban.manage.service.pojo.BinlogBasePojo;
import com.gic.haoban.manage.service.pojo.ClerkSyncPojo;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListener;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* Created 2019/4/10. * Created 2019/4/10.
* *
...@@ -22,6 +33,7 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor ...@@ -22,6 +33,7 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
private final String CLERK = "tab_gic_clerk"; private final String CLERK = "tab_gic_clerk";
@Override @Override
public void onMessage(ConsumerRecord<String, GicRecord> record) { public void onMessage(ConsumerRecord<String, GicRecord> record) {
logger.info("message:{}", JSONObject.toJSONString(record.value())); logger.info("message:{}", JSONObject.toJSONString(record.value()));
...@@ -29,6 +41,33 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor ...@@ -29,6 +41,33 @@ public class KafkaMessageServiceImpl implements MessageListener<String, GicRecor
if(StringUtils.isBlank(message)){ if(StringUtils.isBlank(message)){
return; return;
} }
BinlogBasePojo pojo = binlogChange(record.value());
logger.info("pojo:{}", JSONObject.toJSONString(pojo));
}
private BinlogBasePojo binlogChange(GicRecord record) {
ClerkSyncPojo pojo=new ClerkSyncPojo();
pojo.setRecordType(record.getRecordType().value());
if(record.getDbName().equals(CLERK)) {
return binlogMap(record.getFieldList(),record.getRecordType(),ClerkSyncPojo.class);
}else if (record.getDbName().equals(STORE)) {
return binlogMap(record.getFieldList(),record.getRecordType(),ClerkSyncPojo.class);
}else{
return null;
}
}
private BinlogBasePojo binlogMap(List<GicField> list,GicRecordType recordType,Class<? extends BinlogBasePojo> aclass) {
Map<String,String> mid=new HashMap<>();
for (GicField gicField : list) {
mid.put(gicField.getName(),gicField.getValue());
}
BinlogBasePojo pojo = EntityUtil.changeEntityByJSON(aclass, mid);
pojo.setRecordType(recordType.value());
return pojo;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment