
Real-time incremental sync with canal + kafka + flink, part 2: implementing the kafkaTomysql consumer

Views: 31   Published: 2024-01-03 09:51:57

Overview: this part takes the MySQL binlog events pushed to Kafka in the previous post, processes them as a Flink stream, and writes them into MySQL.
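As a minimal illustration of the final hop of this pipeline (not part of the original code; the class and method names here are hypothetical, and it assumes canal's usual JSON event shape of database/table/type plus row data), the sketch below turns one row of an INSERT event into an SQL statement for the target MySQL:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper showing the kind of work the downstream sink performs:
// turning one row of a canal "INSERT" event into an SQL statement.
public class BinlogRowToSql {

    public static String toInsertSql(String database, String table, Map<String, String> row) {
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        StringJoiner vals = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, String> e : row.entrySet()) {
            cols.add("`" + e.getKey() + "`");
            // Naive quoting for illustration only; real code should bind
            // values through PreparedStatement parameters instead.
            vals.add("'" + e.getValue().replace("'", "''") + "'");
        }
        return "INSERT INTO `" + database + "`.`" + table + "` " + cols + " VALUES " + vals;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("name", "alice");
        // → INSERT INTO `demo_db`.`t_user` (`id`, `name`) VALUES ('1', 'alice')
        System.out.println(toInsertSql("demo_db", "t_user", row));
    }
}
```

In the real system this conversion happens inside the separately deployed subscriber jar described below; this sketch only makes the data flow concrete.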

一、Overview of the Kafka consumer code

(1) The invoker, KafkaConsumerInvoke, is not covered here; it is much the same as in the previous post.

(2) Example consumer code: KafkaConsumerService

```java
@Component
@Scope("prototype")
public class KafkaConsumerService {

    private static final Logger PRINT = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final String V_0_10 = "0.10"; // sdk version: Kafka 0.10 and above
    private static final String MAIN_CLASS_NAME = "com.ztesoft.zstream.Boot"; // main class name
    private static final String ZK_CONFIG = "config.properties"; // zk config file
    private static final String KTS_LOG_PATH = "ktslog"; // realtime log path
    private static final String CLUSTER_CDH5_8_4 = "cdh5.8.4"; // default hbase cluster version
    private static final String RUM_MODE_SPARK = "spark"; // run mode

    private KtsElement ktsElement;

    @Autowired
    private DataIntegWorkitemMapper dataIntegWorkitemMapper;

    public void init(KtsElement ktsElement) {
        this.ktsElement = ktsElement;
    }

    /**
     * Upload the jar. (The actual Kafka-to-MySQL sink runs in a separate service,
     * packaged as a jar and deployed on the server.)
     */
    public void uploadJar() {
        String kafkaVersion = ktsElement.getKafkaVersion(); // kafka version
        if (StringUtils.isEmpty(kafkaVersion)) {
            kafkaVersion = V_0_10; // default to 0.10+
        }
        if (DataSource.KAFKA.equalsIgnoreCase(ktsElement.getOriginDatasourceType())) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Use sdk kafka version is " + kafkaVersion, PRINT);
        }
        // Detect the cluster version; "**" means match the first one by default
        String clusterVersion = "**";
        if (ktsElement.getTargetDataSource() != null
                && DataSource.HBASE.equals(ktsElement.getTargetDataSource().getDatasourceType())) {
            Map<String, String> clusterInfo = ClusterInfoUtil.getClusterInfo(ktsElement.getTargetDataSource());
            clusterVersion = clusterInfo.get("clusterVersion");
        }
        if (StringUtils.isEmpty(clusterVersion) || "**".equals(clusterVersion)) {
            clusterVersion = CLUSTER_CDH5_8_4;
        }
        // Upload the sdk jar; guard against multiple shards uploading it at the same time
        String taskInstId = ktsElement.getTaskInstId();
        DataIntegWorkitemExample example = new DataIntegWorkitemExample();
        example.createCriteria().andTaskInstIdEqualTo(Integer.parseInt(taskInstId));
        List<DataIntegWorkitem> dataIntegWorkitems = dataIntegWorkitemMapper.selectByExample(example);
        boolean isSplitWorkitem = dataIntegWorkitems.size() > 1;
        SSHHelper.uploadResource(ktsElement.getSshDataSource(), "kafka/" + clusterVersion + "/" + kafkaVersion,
                PlugInvokeConstant.KTS_JAR, ktsElement.getPath(), isSplitWorkitem, ktsElement.getWorkitemId(), PRINT);
    }

    /**
     * Pre-create the task paths.
     */
    public void preOperation() {
        SAScriptHelper.mkdir(ktsElement.getSshDataSource(), PRINT, ktsElement.getTaskPath());
        // Create the realtime log path: jump-server path + ktslog + task id
        SAScriptHelper.mkdir(ktsElement.getSshDataSource(), PRINT,
                FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()) + KTS_LOG_PATH + "/"
                        + ktsElement.getTaskId());
        SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
    }

    /**
     * Upload the mapping json. This json describes the field mapping; it will be
     * explained in a later demo of the feature.
     */
    public void uploadJson() {
        // Delete first, then upload
        try {
            SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT,
                    ktsElement.getTaskPath() + File.separator + PlugInvokeConstant.KTS_JSON);
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "rm " + ktsElement.getTaskPath() + File.separator
                    + PlugInvokeConstant.KTS_JSON + " error :" + e.getMessage(), PRINT);
        }
        try {
            boolean flag = SSHHelper.transferFile(ktsElement.getDtoJson().getBytes("UTF-8"),
                    PlugInvokeConstant.KTS_JSON, ktsElement.getTaskPath(), ktsElement.getSshDataSource(), null);
            if (flag) {
                SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
            }
            else {
                throw new SyDevOpsRuntimeException("Failed to upload " + PlugInvokeConstant.KTS_JSON);
            }
        }
        catch (Exception e) {
            throw new SyDevOpsRuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Upload the shell script. As noted above, the actual sink runs in a separate
     * service packaged as a jar; here we launch that jar through a shell script.
     */
    public void uploadShell() {
        // Copy config.properties in only if it does not exist yet; never overwrite an existing one
        StringBuffer configFilePath = new StringBuffer();
        configFilePath.append(ktsElement.getTaskPath()).append(File.separator).append(ZK_CONFIG);
        boolean configExist;
        try {
            SAScriptHelper.find(ktsElement.getSshDataSource(), PRINT, configFilePath.toString());
            configExist = true;
        }
        catch (Exception e) {
            configExist = false;
        }
        if (!configExist) {
            try {
                StringBuffer cpConfigCmd = new StringBuffer();
                cpConfigCmd.append("\\cp -f ").append(ktsElement.getPath()).append(ZK_CONFIG).append(" ")
                        .append(ktsElement.getTaskPath()).append(File.separator).append(ZK_CONFIG);
                SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cpConfigCmd.toString());
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to cp " + ZK_CONFIG, e);
            }
        }
        // Branch on the run mode
        String runMode = ktsElement.getRunMode();
        if (StringUtils.equalsIgnoreCase(runMode, RUM_MODE_SPARK)) {
            // In cluster mode the script must already be in the sdk directory
            StringBuffer shellPathBuf = new StringBuffer();
            shellPathBuf.append(ktsElement.getPath()).append(File.separator)
                    .append(PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL);
            try {
                SAScriptHelper.find(ktsElement.getSshDataSource(), PRINT, shellPathBuf.toString());
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to find " + shellPathBuf.toString(), e);
            }
        }
        else {
            /**
             * <pre>
             * #!/bin/sh
             * \cp -f /home/dhpuser/sdkdir/config.properties /home/dhpuser/sdkdir/38327/config.properties
             * java -cp /home/dhpuser/sdkdir/KTSSubscriber.jar com.ztesoft.zstream.Boot /home/dhpuser/sdkdir/38327/ 2>&1 &
             * echo $! > /home/dhpuser/sdkdir/38327/KTSSubscriber.pid
             * </pre>
             */
            StringBuffer shell = new StringBuffer();
            shell.append("#!/bin/sh \n");
            // // Copy the zk config file into the start directory
            // shell.append("\\cp -f ").append(ktsElement.getPath()).append(ZK_CONFIG).append(" ")
            //         .append(ktsElement.getTaskPath()).append(ZK_CONFIG).append("\n");
            if (ktsElement.isSplit()) {
                shell.append("java ").append(ktsElement.getSdkArgument()).append(" -cp ")
                        .append(ktsElement.getKtsJarPath()).append(" ").append(MAIN_CLASS_NAME).append(" ")
                        .append(ktsElement.getTaskPath()).append(" ").append(ktsElement.getTaskInstId());
            }
            else {
                shell.append("java ").append(ktsElement.getSdkArgument()).append(" -cp ")
                        .append(ktsElement.getKtsJarPath()).append(" ").append(MAIN_CLASS_NAME).append(" ")
                        .append(ktsElement.getTaskPath());
            }
            shell.append(" 2>&1 & \n");
            shell.append("echo $! > ").append(ktsElement.getKtsPidPath()).append("\n");
            // Delete first, then upload
            try {
                SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT,
                        ktsElement.getTaskPath() + File.separator + PlugInvokeConstant.KTS_START_SHELL);
            }
            catch (Exception e) {
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "rm " + ktsElement.getTaskPath() + File.separator
                        + PlugInvokeConstant.KTS_START_SHELL + " error :" + e.getMessage(), PRINT);
            }
            try {
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Upload KTSstart.sh: " + shell.toString(), PRINT);
                boolean flag = SSHHelper.transferFile(shell.toString().getBytes("UTF-8"),
                        PlugInvokeConstant.KTS_START_SHELL, ktsElement.getTaskPath(), ktsElement.getSshDataSource(), null);
                if (flag) {
                    SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
                }
                else {
                    throw new SyDevOpsRuntimeException("Failed to upload " + PlugInvokeConstant.KTS_START_SHELL);
                }
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException(e.getMessage(), e);
            }
        }
    }

    /**
     * Launch the jar remotely.
     */
    public void executeStartShell() {
        // Branch on the run mode
        String runMode = ktsElement.getRunMode();
        if (StringUtils.equalsIgnoreCase(runMode, RUM_MODE_SPARK)) {
            /**
             * <pre>
             * Parameters:
             * confPath=$1    working directory
             * logPath=$2     log directory
             * pidPath=$3     pid file
             * sdkArgument=$4 jvm arguments, environment variables, etc. (start arguments)
             *
             * cd /opt/testSDK/;nohup ./KTSSparksubmit.sh /opt/testSDK/4642 /tmp/ditagent/ktslog/4642/KTSSubscriber.log /opt/testSDK/4642/KTSSubscriber.pid  > /opt/testSDK/4642/KTSSparksubmit.log 2>&1 &
             * </pre>
             */
            // Realtime task run log
            String param1 = ktsElement.getTaskPath();
            StringBuffer logPathBuf = new StringBuffer();
            logPathBuf.append(FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()))
                    .append(KTS_LOG_PATH).append(File.separator).append(ktsElement.getTaskId())
                    .append(File.separator).append(PlugInvokeConstant.KTS_LOG);
            String param2 = logPathBuf.toString();
            String param3 = ktsElement.getKtsPidPath();
            String param4 = ktsElement.getSdkArgument();
            // spark-submit log
            StringBuffer sparkSubmitLogBuf = new StringBuffer();
            sparkSubmitLogBuf.append(ktsElement.getTaskPath()).append(File.separator)
                    .append(PlugInvokeConstant.KTS_SPARK_SUBMIT_LOG);
            String path = ktsElement.getPath();
            StringBuffer cmdBuf = new StringBuffer();
            cmdBuf.append("cd ").append(path).append(";");
            cmdBuf.append("nohup ./").append(PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL);
            cmdBuf.append(" ").append(param1).append(" ").append(param2).append(" ").append(param3)
                    .append(" ").append(param4);
            cmdBuf.append(" > ").append(sparkSubmitLogBuf).append(" 2>&1 &");
            try {
                SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cmdBuf.toString());
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Start KTSSubscriber job successful !", PRINT);
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to execute " + PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL, e);
            }
        }
        else {
            /**
             * cd /home/dhpuser/sdkdir/38327/;nohup ./KTSstart.sh > /dev/null 2>&1 &
             */
            String path = FileUtils.transformationPath(ktsElement.getTaskPath());
            StringBuffer cmdBuf = new StringBuffer();
            cmdBuf.append("cd ").append(path).append(";nohup ./").append(PlugInvokeConstant.KTS_START_SHELL)
                    .append(" > /dev/null 2>&1 &");
            try {
                SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cmdBuf.toString());
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Start KTSSubscriber job successful !", PRINT);
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to execute " + PlugInvokeConstant.KTS_START_SHELL, e);
            }
            finally {
                SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT, path + PlugInvokeConstant.KTS_START_SHELL);
            }
        }
    }

    public ComponentResult monitor() {
        // PID=$(cat /KTSSubscriber.pid) ps -ef|grep -v grep|grep 'xx/xxx/xxxx'
        StringBuffer cmd = new StringBuffer();
        cmd.append("PID=$(cat " + ktsElement.getKtsPidPath() + ") \n");
        cmd.append("ps -ef|grep -v grep|grep $PID");
        String process = null;
        try {
            process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
        }
        catch (Exception e) {
            PRINT.error("Monitor error 1:" + e.getMessage(), e);
        }
        // An empty result does not necessarily mean the process was killed;
        // it may be a network issue, or a grep issue
        if (StringUtils.isEmpty(process)) {
            // Possibly a network hiccup; wait a moment
            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                PRINT.error("Thread.sleep InterruptedException:" + e.getMessage(), e);
            }
            try {
                process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
            }
            catch (Exception e) {
                PRINT.error("Monitor error 2:" + e.getMessage(), e);
            }
            if (StringUtils.isEmpty(process)) {
                // Still nothing; maybe grep is the problem, so try a different command
                // ps -ef|grep -v grep|grep 'xx/xxx/xxxx'
                cmd = new StringBuffer();
                cmd.append("ps -ef|grep -v grep|grep '").append(ktsElement.getTaskPath()).append("'");
                try {
                    process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
                }
                catch (Exception e) {
                    PRINT.error("Monitor error 3:" + e.getMessage(), e);
                }
            }
        }
        if (StringUtils.isEmpty(process)) {
            throw new SyDevOpsRuntimeException(
                    "The process of KTSSubscriber is stopped! Task Id is " + ktsElement.getTaskId());
        }
        else {
            StringBuffer logFileBuf = new StringBuffer();
            logFileBuf.append(FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()))
                    .append(KTS_LOG_PATH).append(File.separator).append(ktsElement.getTaskId())
                    .append(File.separator).append(PlugInvokeConstant.KTS_LOG);
            String result = null;
            String tailCmd = "tail -50 " + logFileBuf.toString();
            // While the log is being compressed, cat can fail, so this must catch the exception
            try {
                result = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, tailCmd);
            }
            catch (Exception e) {
                result = "";
            }
            ComponentResult readerResult = new ReaderResult();
            readerResult.setInvokeResult(ComponentResult.RUNNING);
            readerResult.setInvokeMsg(tailCmd + "\n" + result);
            return readerResult;
        }
    }

    public void stop(boolean ignore) {
        boolean isExists = true;
        try {
            SAScriptHelper.find(ktsElement.getSshDataSource(), null, ktsElement.getKtsPidPath());
            PRINT.info(ktsElement.getKtsPidPath());
        }
        catch (Exception e) {
            isExists = false;
        }
        /**
         * <pre>
         *   PID=$(cat /KTSSubscriber.pid)
         *   kill -9 $PID
         *   pkill -f xx/xxx/xxxx
         * </pre>
         */
        StringBuffer cmd = new StringBuffer();
        if (isExists) {
            cmd.append("PID=$(cat " + ktsElement.getKtsPidPath() + ") \n");
            cmd.append("kill -9 $PID; \n");
        }
        cmd.append("pkill -f ").append(ktsElement.getTaskPath());
        try {
            String result = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
            PRINT.info(result);
        }
        catch (Exception e) {
            if (ObjectUtils.getStackTraceAsString(e).indexOf("Operation not permitted") != -1 || !ignore) {
                throw new SyDevOpsRuntimeException(e.getMessage(), e);
            }
        }
    }

    public ComponentResult getRunningResult() {
        ComponentResult readerResult = new ReaderResult();
        readerResult.setInvokeResult(ComponentResult.RUNNING);
        readerResult.setInvokeMsg("success");
        return readerResult;
    }

    public ComponentResult getStopResult() {
        ComponentResult readerResult = new ReaderResult();
        readerResult.setInvokeResult(ComponentResult.SUCCESS);
        readerResult.setInvokeMsg("success");
        return readerResult;
    }

    /**
     * Whether the sdk jar must be re-uploaded because of its version; by default it is not replaced.
     *
     * @param sdkPath
     * @return
     */
    public boolean isNeedReplaceSDKForVersion(String sdkPath) {
        // Stay compatible with older sdk jars that carry no version info
        try {
            String resourcesVersionCode = getVersionCodeForResources(sdkPath);
            if (StringUtils.isEmpty(resourcesVersionCode)) {
                // The jar under resources has no version info; nothing more to do
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Resources sdk not have versionCode.", PRINT);
                return false;
            }
            String serverVersionCode = getVersionCodeForServer();
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Resources versionCode = " + resourcesVersionCode, PRINT);
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Server versionCode = " + serverVersionCode, PRINT);
            // If the sdk jar under resources is newer than the one on the server,
            // return true, meaning the server's jar must be replaced
            if (StringUtils.isNotEmpty(resourcesVersionCode) && StringUtils.isNotEmpty(serverVersionCode)) {
                if (Long.parseLong(resourcesVersionCode) > Long.parseLong(serverVersionCode)) {
                    return true;
                }
            }
            else {
                // If the resources jar has version info but the server's does not, it should
                // also be replaced: the server is running an old jar
                if (StringUtils.isNotEmpty(resourcesVersionCode) && StringUtils.isEmpty(serverVersionCode)) {
                    // On reflection, this is a bit too risky ...
                    // return true;
                }
            }
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not compare the sdk version", PRINT);
        }
        return false;
    }

    /**
     * Read the sdk versionCode from the resources directory.
     *
     * @param sdkPath
     * @return
     */
    private String getVersionCodeForResources(String sdkPath) {
        try {
            ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); // container resource resolver
            Resource[] resources = resolver.getResources(sdkPath);
            for (Resource resource : resources) {
                if (PlugInvokeConstant.KTS_JAR.equalsIgnoreCase(resource.getFilename())) {
                    File sdkFile = resource.getFile();
                    JarFile jarFile = null;
                    try {
                        jarFile = new JarFile(sdkFile);
                        Attributes attributes = jarFile.getManifest().getMainAttributes();
                        return attributes.getValue("versionCode"); // version code (yyyyMMddHHmmss)
                    }
                    catch (Exception e) {
                        throw new SyDevOpsRuntimeException(e.getMessage(), e);
                    }
                    finally {
                        if (jarFile != null) {
                            jarFile.close();
                        }
                    }
                }
            }
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not read the resources sdk version info", PRINT);
        }
        return null;
    }

    /**
     * Read the sdk version info on the server side.
     *
     * @return
     */
    private String getVersionCodeForServer() {
        try {
            String versionFilePath = ktsElement.getPath() + File.separator + PlugInvokeConstant.KTS_VERSION;
            String version = SAScriptHelper.cat(ktsElement.getSshDataSource(), PRINT, versionFilePath);
            if (StringUtils.isNotEmpty(version)) {
                version = version.replaceAll("\n", "");
            }
            return version;
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not read the server sdk version info", PRINT);
        }
        return null;
    }

    /**
     * Write the sdk version info into a file next to the sdk jar.
     *
     * @param versionInfo
     */
    public void writeSDKVersionFile(String versionInfo) {
        if (StringUtils.isNotEmpty(versionInfo)) {
            try {
                StringBuffer echoVersionCmd = new StringBuffer();
                echoVersionCmd.append("echo ").append("\"").append(versionInfo).append("\"").append(" > ")
                        .append(ktsElement.getPath()).append(File.separator).append(PlugInvokeConstant.KTS_VERSION);
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO,
                        "Create sdk version file: " + echoVersionCmd.toString(), PRINT);
                SAScriptHelper.shell(ktsElement.getSshDataSource(), null, echoVersionCmd.toString());
            }
            catch (Exception e) {
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO,
                        "Write to " + PlugInvokeConstant.KTS_VERSION + " error", PRINT);
            }
        }
    }
}
```
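The heart of `uploadShell()` above is string-building the start script. A self-contained sketch of that logic (with illustrative stand-in names; the real method pulls these values from `KtsElement`) makes the generated script easier to see:

```java
// Standalone sketch of the start-shell string assembled by uploadShell()
// in local (non-spark) mode: launch the subscriber jar in the background
// and record its pid. All parameter values here are illustrative.
public class StartShellBuilder {

    public static String build(String jvmArgs, String jarPath, String mainClass,
                               String taskPath, String pidPath) {
        StringBuilder shell = new StringBuilder();
        shell.append("#!/bin/sh \n");
        // java <jvmArgs> -cp <jar> <mainClass> <taskPath> 2>&1 &
        shell.append("java ").append(jvmArgs).append(" -cp ").append(jarPath)
             .append(" ").append(mainClass).append(" ").append(taskPath);
        shell.append(" 2>&1 & \n");
        // $! is the pid of the background job just started
        shell.append("echo $! > ").append(pidPath).append("\n");
        return shell.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("-Xmx512m", "/home/dhpuser/sdkdir/KTSSubscriber.jar",
                "com.ztesoft.zstream.Boot", "/home/dhpuser/sdkdir/38327/",
                "/home/dhpuser/sdkdir/38327/KTSSubscriber.pid"));
    }
}
```

The pid file written by the last line is what `monitor()` later reads back (`PID=$(cat ...)`) to check that the subscriber is still alive.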
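The version check in `isNeedReplaceSDKForVersion()` works because the `versionCode` is a `yyyyMMddHHmmss` timestamp, so a plain numeric comparison orders versions correctly. The comparison can be isolated into a pure function (a sketch, with a hypothetical class name):

```java
// Sketch of the comparison behind isNeedReplaceSDKForVersion():
// versionCode is a yyyyMMddHHmmss timestamp, so parsing both sides as
// longs and comparing decides whether the local sdk jar is newer.
public class SdkVersionCompare {

    public static boolean needsReplace(String resourcesVersionCode, String serverVersionCode) {
        if (resourcesVersionCode == null || resourcesVersionCode.isEmpty()) {
            return false; // no local version info: keep the server's jar
        }
        if (serverVersionCode == null || serverVersionCode.isEmpty()) {
            return false; // mirrors the original, which deemed replacing in this case too risky
        }
        return Long.parseLong(resourcesVersionCode) > Long.parseLong(serverVersionCode);
    }

    public static void main(String[] args) {
        System.out.println(needsReplace("20240103095157", "20230101000000")); // local jar is newer
        System.out.println(needsReplace("20230101000000", "20240103095157")); // server is already newer
    }
}
```

Keeping "replace" as the rare branch matches the method's documented default of not replacing the server-side jar unless the local copy is provably newer.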

二、Flink integration with the real-time computing platform (packaged as a jar)

Flink itself is not described here; development work is divided up, and the big-data processing side is handled by the big-data engineers.
