【Spring Boot + Datahub】阿里云流数据处理平台 基于2.15版本的数据读写【上】

    技术2022-07-12  79

    Datahub 2.15版本的数据读写

    产品概述产品优势使用场景 数据总线服务器datahub配置类的封装yml 配置 (保密原因自我补全)config配置类核心线程池+主线程+工作线程消费类生产者工具类

    产品概述

    DataHub基本介绍 阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布 (Publish),订阅 (Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。

    DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点。DataHub与阿里云流计算引擎StreamCompute无缝连接,用户可以轻松使用SQL进行流数据分析。

    DataHub服务也提供分发流式数据到各种云产品的功能,目前支持分发到MaxCompute(原ODPS),OSS等。

    系统整体功能图

    产品优势

    高吞吐 最高支持单shard每日8000万Record级别的写入量。

    实时性 通过 DataHub ,您可以实时的收集各种方式生成的数据并进行实时的处理,对您的业务产生快速的响应。

    易用性 DataHub 提供丰富的SDK包,包括C++, JAVA, Pyhon, Ruby, Go等语言。 DataHub服务也提供Restful API规范,您可以用自己的方式实现访问接口。 除了SDK以外,DataHub 还提供一些常用的客户端插件,包括:Fluentd,LogStash,Flume等。您可以使用这些客户端工具往 DataHub 里面写入流式数据。 DataHub 同时支持强Schema的结构化数据(创建Tuple类型的Topic)和无类型的非结构化数据(创建Blob类型的Topic),您可以自由选择。 高可用 服务可用性不低于99.9%。 规模自动扩展,不影响对外服务;数据持久性不低于99.999%。 数据自动多重冗余备份。 动态伸缩 每个主题(Topic)的数据流吞吐能力可以动态扩展和减少,最高可达到每主题256000 Records/s的吞吐量。

    高安全性 提供企业级多层次安全防护,多用户资源隔离机制; 提供多种鉴权和授权机制及白名单、主子账号功能。

    使用场景

    DataHub作为一个流式数据处理服务,结合阿里云众多云产品,可以构建一站式的数据处理服务。

    流计算StreamCompute StreamCompute是阿里云提供的流计算引擎,提供使用类SQL的语言来进行流式计算。DataHub 和StreamCompute无缝结合,可以作为StreamCompute的数据源和输出源,具体可参考实时计算文档

    流处理应用 用户可以编写应用订阅DataHub中的数据,并进行实时的加工,把加工后的结果输出。用户可以把应用计算产生的结果输出到DataHub中,并使用另外一个应用来处理上一个应用生成的流式数据,来构建数据处理流程的DAG。

    流式数据归档 用户的流式数据可以归档到 MaxCompute(原ODPS)中。用户通过创建DataHub Connector,指定相关配置,即可创建将Datahub中流式数据定期归档的同步任务。

    以上来自于阿里云平台 Datahub在线文档

    好了,先进入正题,Datahub 2.15版本是支持协同消费的,单是要保证阿里云数据总线服务器是最新版本的,否则是不支持协同消费类的,但是低版本数据总线服务器是支持高版本sdk向下兼容的

    数据总线服务器

    一开始我们新建一个项目 项目新建完成以后,再新建这个项目下的topic

    datahub配置类的封装

    pom依赖

    <dependency> <groupId>com.aliyun.datahub</groupId> <artifactId>aliyun-sdk-datahub</artifactId> <version>2.15.0-public</version> </dependency> <dependency> <groupId>com.aliyun.datahub</groupId> <artifactId>datahub-client-library</artifactId> <version>1.1.8-public</version> </dependency>

    yml 配置 (保密原因自我补全)

    # datahub配置文件 public: datahub: # 配置信息 config: #控制器 startup: false #应用服务器地址 endpoint: XXXXXXXXXXXXXXXXXXXXXXXXXXXX #用户权限accessId accessId: XXXXXXXXXXXXXXXXX #用户权限accessKey accessKey: XXXXXXXXXXXXXXXx serivce: #应用名称 - projectName: XXXXXXXX #应用消费——topic名称 topicGet: XXXXXXXXXXXXXXXXXX #应用生产——topic名称 topicSet: XXXXXXXXXXXXXXXXXX #topicid的订阅ID subId: XXXXXXXXXXXXXXXXXX

    config配置类

    package com.encdata.oss.datahubClient; import com.encdata.oss.system.domain.dto.DataHubConfigDTO; import com.encdata.oss.system.handler.CustomerThreadPool; import com.encdata.oss.system.handler.task.DatahubConsumerTask; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; import static com.encdata.oss.system.controller.BaseController.executorService; /** * Created by IntelliJ IDEA. * datahub 配置文件类 * @author liyiq * @date 2020/06/04 */ @Data @Component @ConfigurationProperties(prefix = "public.datahub.config") public class DatahubConfig { /** * datahub 控制器 */ private Boolean startup; /** * datahub应用服务器地址 */ private String endpoint; public static String endpoints; /** * datahub 用户权限accessId */ private String accessId; public static String accessIds; /** * datahub 用户权限accessKey */ private String accessKey; public static String accessKeys; /** * 应用配置信息集合 */ private List<Map> serivce; public static List<Map> serivces = new ArrayList<>(); /** * datahub 应用名称 */ private String projectName; /** * datahub 应用消费——topic名称 */ private String topicGet; /** * datahub topicid的订阅ID */ private String subId; /** * 初始化datahub协同消费类 * 支持订阅多个topic */ @PostConstruct public void datahubRecycling(){ //todo: 静态变量赋值 endpoints = this.endpoint; accessIds = this.accessId; accessKeys = this.accessKey; serivces = this.serivce; //todo:开启关闭datahub消费控制器 if(startup){ if(executorService == null){ executorService = CustomerThreadPool.createDefaultThreadPool(); } //todo: 多应用分割 for(int s=0;s<serivce.size();s++){ Map map = serivce.get(s); projectName = map.get("projectName").toString(); topicGet = map.get("topicGet").toString(); subId = map.get("subId").toString(); String[] topics = topicGet.split(","); //todo: 多topic循环监听 for(int i=0;i<topics.length;i++){ //todo:封装datahub配置DTO DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO(); dataHubConfigDTO .setEndpoint(endpoint) .setAccessId(accessId) .setAccessKey(accessKey) .setProjectName(projectName) .setTopicName(topics[i]) .setSubId(subId); //todo:初始化监听线程 executorService.execute(new DatahubConsumerTask(dataHubConfigDTO)); } } } } }

    核心线程池+主线程+工作线程

    package com.encdata.oss.system.handler; import java.util.concurrent.*; /** * 自定义线程池 * * Created by IntelliJ IDEA. * * @author yangyi * @date 2020/06/04 */ public class CustomerThreadPool { /** 线程池核心线程数,即线程池中常驻的线程数量 **/ private static final int DEFAULT_CORE_POLL_SIZE = 8; /** 线程池允许的最大线程数,非核心线程在超时之后会被清除,受限于 CAPACITY,需要根据实际的物理机配置去计算 **/ private static final int DEFAULT_MAXIMUM_POOL_SIZE = 1024; /** 线程没有任务执行时可以保持的时间【非核心线程】 **/ private static final long DEFAULT_KEEP_ALIVE_TIME = 0; /** keepAliveTime 的时间单位 **/ private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MICROSECONDS; /** 阻塞队列的最大容量 **/ private static final Integer MAX_WORK_QUEUE_CAPACITY = 1024*10; /** 任务阻塞队列,用于存储等待执行的任务,默认采用有界队列 **/ private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<Runnable>(MAX_WORK_QUEUE_CAPACITY); /** 线程工厂,用来创建线程,可自定义对线程的控制**/ private static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory(); /** * rejectHandler:当任务队列已满时,拒绝任务提交时的策略: AbortPolicy【默认】:丢掉任务,并抛RejectedExecutionException异常。 DiscardPolicy:直接丢掉任务,不抛异常。 DiscardOldestPolicy:丢掉最老的任务,然后调用execute立刻执行该任务(新进来的任务)。 CallerRunsPolicy【推荐】:在调用者的当前线程去执行这个任务。 */ private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy(); /** * 创建线程池 * @param corePoolSize 核心线程数 * @param maximumPoolSize 最大线程数 * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间 * @param unit keepAliveTime 的时间单位 * @param workQueue 任务阻塞队列,用于存储等待执行的任务 * @param threadFactory 线程工厂 * @param handler 当任务队列已满时,拒绝任务提交时的策略 * @return ThreadPoolExecutor */ public static ThreadPoolExecutor createThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } /*** * 创建线程池 * @param corePoolSize 核心线程数 * @param maximumPoolSize 最大线程数 * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间 * @param unit keepAliveTime 的时间单位 * @param workQueue 任务阻塞队列,用于存储等待执行的任务 * @param threadFactory 线程工厂 * @return ThreadPoolExecutor */ public static ThreadPoolExecutor createThreadPool (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_HANDLER); } /*** * 创建线程池 * @param corePoolSize 核心线程数 * @param maximumPoolSize 最大线程数 * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间 * @param unit keepAliveTime 的时间单位 * @param workQueue 任务阻塞队列,用于存储等待执行的任务 * @return ThreadPoolExecutor */ public static ThreadPoolExecutor createThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, DEFAULT_THREAD_FACTORY); } /*** * 创建线程池 * @param corePoolSize 核心线程数 * @param maximumPoolSize 最大线程数 * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间 * @param unit keepAliveTime 的时间单位 * @return ThreadPoolExecutor */ public static ThreadPoolExecutor createThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, DEFAULT_WORK_QUEUE); } /** * 创建默认的线程池,配置如下: * corePoolSize = 8; * maximumPoolSize = 1024; * keepAliveTime = 0; * unit = TimeUnit.MICROSECONDS * workQueue = new ArrayBlockingQueue<Runnable>(10240); * threadFactory = Executors.defaultThreadFactory(); * handler = new ThreadPoolExecutor.CallerRunsPolicy(); */ public static ThreadPoolExecutor createDefaultThreadPool() { return createThreadPool(DEFAULT_CORE_POLL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, DEFAULT_TIME_UNIT); } } package com.encdata.oss.system.handler.task; import com.encdata.oss.system.exception.BaseException; /** * 抽象工作线程,若想进行多线程工作,可以继承此抽象类,再重写相关方法实现具体的业务逻辑 * <p> * Created by IntelliJ IDEA. * * @author yangyi * @date 2020/06/04 */ public abstract class BaseWorkTask implements Runnable { //工作任务线程 /*@Override public void run() { // 工作示例 System.out.println(Thread.currentThread().getId() + " is start"); process(); System.out.println(Thread.currentThread().getId() + " is over"); }*/ /** * 业务处理方法 * * @throws BaseException 业务处理异常 */ protected abstract void process() throws BaseException; } package com.encdata.oss.system.handler.task; import com.encdata.oss.datahubClient.singleSubscription.SubscriptionExample; import com.encdata.oss.system.domain.dto.DataHubConfigDTO; import com.encdata.oss.system.exception.BaseException; import lombok.extern.slf4j.Slf4j; import static com.encdata.oss.system.controller.BaseController.executorService; /** * Created by IntelliJ IDEA. * 多应用,多topic 数据订阅线程 * @author liyiq * @date 2020/06/08 */ @Slf4j public class DatahubConsumerTask extends BaseWorkTask { /** * datahub配置类dto */ private DataHubConfigDTO dataHubConfigDTO; /** * datahub消费类 */ private SubscriptionExample subscriptionExample; public DatahubConsumerTask(DataHubConfigDTO dataHubConfigDTO){ this.dataHubConfigDTO = dataHubConfigDTO; } @Override public void run() { if (log.isDebugEnabled()) { log.debug("DataHub数据消费信息处理线程【开始】"); } process(); if (log.isDebugEnabled()) { log.debug("DataHub数据消费信息处理线程【结束】=====重新请求"); executorService.execute(new DatahubConsumerTask(dataHubConfigDTO)); } } /** * 创建数据消费线程 * @throws BaseException */ @Override protected void process() throws BaseException { new SubscriptionExample(dataHubConfigDTO).Start(); } }

    消费类

    package com.encdata.oss.datahubClient.singleSubscription; import com.aliyun.datahub.DatahubClient; import com.aliyun.datahub.DatahubConfiguration; import com.aliyun.datahub.auth.AliyunAccount; import com.aliyun.datahub.common.data.RecordSchema; import com.aliyun.datahub.exception.DatahubClientException; import com.aliyun.datahub.exception.OffsetResetedException; import com.aliyun.datahub.exception.OffsetSessionChangedException; import com.aliyun.datahub.exception.SubscriptionOfflineException; import com.aliyun.datahub.model.*; import com.encdata.oss.datahubClient.TaskReleaseConsumer; import com.encdata.oss.system.domain.dto.DataHubConfigDTO; import java.util.List; class Consumer{ private String projectName = null; private String topicName = null; private String subId = null; private String shardId = null; private RecordSchema schema = null; private DatahubClient client = null; public Consumer(String projectName, String topicName, String subId, String shardId, RecordSchema schema, DatahubConfiguration conf) { this.projectName = projectName; this.topicName = topicName; this.subId = subId; this.shardId = shardId; this.schema = schema; this.client = new DatahubClient(conf); } private void commit(OffsetContext offsetCtx) { client.commitOffset(offsetCtx); //System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString()); } public void run() { try { boolean bExit = false; // 首先初始化offset上下文 OffsetContext offsetCtx = client.initOffsetContext(projectName, topicName, subId, shardId); String cursor = null; // 开始消费的cursor if (!offsetCtx.hasOffset()) { // 之前没有存储过点位,先获取初始点位,比如这里获取当前该shard最早的数据 GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST); cursor = cursorResult.getCursor(); } else { // 否则,获取当前已消费点位的下一个cursor GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.SEQUENCE, (offsetCtx.getOffset().getSequence() + 1)); cursor = cursorResult.getCursor(); } /*System.out.println("Start consume shard:" + shardId + ", start offset:" + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);*/ long recordNum = 0L; while (!bExit) { try { GetRecordsResult recordResult = client.getRecords(projectName, topicName, shardId, cursor, 10, schema); List<RecordEntry> records = recordResult.getRecords(); if (records.size() == 0) { // 将最后一次消费点位上报 commit(offsetCtx); // 可以先休眠一会,再继续消费新记录 Thread.sleep(1000); //System.out.println("sleep 1s and continue consume records! shard id:" + shardId); } else { for (RecordEntry record : records) { // 处理记录逻辑 /*System.out.println("Consume shard:" + shardId + " thread process record:" + record.toJsonNode().toString());*/ new TaskReleaseConsumer(topicName,record).DataParsing(); // 上报点位,该示例是每处理100条记录上报一次点位 offsetCtx.setOffset(record.getOffset()); recordNum++; if (recordNum % 100 == 0) { commit(offsetCtx); } } cursor = recordResult.getNextCursor(); } } catch (SubscriptionOfflineException e) { // 订阅下线,退出 bExit = true; e.printStackTrace(); } catch (OffsetResetedException e) { // 点位被重置,更新offset上下文 client.updateOffsetContext(offsetCtx); cursor = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.SEQUENCE, (offsetCtx.getOffset().getSequence() + 1)).getCursor(); System.err.println("Restart consume shard:" + shardId + ", reset offset:" + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor); } catch (OffsetSessionChangedException e) { // 其他consumer同时消费了该订阅下的相同shard,退出 bExit = true; e.printStackTrace(); } catch (Exception e) { bExit = true; e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } } public class SubscriptionExample { private String endpoint; private String accessId; private String accessKey; private String projectName; private String topicName; private String subId; private DatahubConfiguration conf; private DatahubClient client; public SubscriptionExample(DataHubConfigDTO dataHubConfigDTO) { this.endpoint=dataHubConfigDTO.getEndpoint(); this.accessId=dataHubConfigDTO.getAccessId(); this.accessKey=dataHubConfigDTO.getAccessKey(); this.projectName=dataHubConfigDTO.getProjectName(); this.topicName=dataHubConfigDTO.getTopicName(); this.subId=dataHubConfigDTO.getSubId(); this.conf = new DatahubConfiguration(new AliyunAccount(accessId, accessKey), endpoint); this.client = new DatahubClient(conf); } public void Start() { GetTopicResult topicResult = client.getTopic(projectName, topicName); ListShardResult shardResult = client.listShard(projectName, topicName); for (int i = 0; i < shardResult.getShards().size(); ++i) { new Consumer(projectName, topicName, subId, shardResult.getShards().get(i).getShardId(), topicResult.getRecordSchema(), conf).run(); } } public static void main(String[] args) { String endpoint = ""; String accessId = ""; String accessKey = ""; String projectName = ""; String topicName = ""; String subId = ""; DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO(); dataHubConfigDTO .setEndpoint(endpoint) .setAccessId(accessId) .setAccessKey(accessKey) .setProjectName(projectName) .setTopicName(topicName) .setSubId(subId); SubscriptionExample example = new SubscriptionExample(dataHubConfigDTO); try { example.Start(); } catch (DatahubClientException e) { e.printStackTrace(); } } }

    生产者工具类

    package com.encdata.oss.datahubClient; import com.aliyun.datahub.common.data.Field; import com.aliyun.datahub.common.data.FieldType; import com.aliyun.datahub.common.data.RecordSchema; import com.aliyun.datahub.exception.DatahubClientException; import com.encdata.oss.datahubClient.singleSubscription.DatahubExample; import com.encdata.oss.system.domain.dto.DataHubConfigDTO; import com.encdata.oss.system.domain.dto.TaskCenterPRQDTO; import com.encdata.oss.util.ObjectParseUtils; import lombok.extern.slf4j.Slf4j; import java.util.Map; import static com.encdata.oss.datahubClient.DatahubConfig.*; /** * Created by IntelliJ IDEA. * 任务发布状态回调类 * @author liyiq * @date 2020/06/08 */ @Slf4j public class TaskReleaseProduction<T>{ private static String projectName; private static String topicSet; private static DataHubConfigDTO dataHubConfigDTO; /** * 状态回调成dto */ private T dto; public TaskReleaseProduction(T dto){ this.dto=dto; } /** * 向Datahub发送数据 */ public void DataProduction(String topic){ //todo: 通过不同的应用获取不同的topic for (int s = 0; s < serivces.size(); s++) { Map map = serivces.get(s); projectName = map.get("projectName").toString(); topicSet = map.get("topicSet").toString(); dataHubConfigDTO = new DataHubConfigDTO(); this.dataHubConfigDTO .setEndpoint(endpoints) .setAccessId(accessIds) .setAccessKey(accessKeys) .setProjectName(projectName) .setTopicName(topicSet); if (topic.equals(topicSet)) { datahubWriter(topic); } } } public void datahubWriter(String topic){ // Endpoint以Region: 华东1为例,其他Region请按实际情况填写 RecordSchema schema = new RecordSchema(); Map map = ObjectParseUtils.objectToMap(dto); for(Object key : map.keySet()){ schema.addField(new Field(key.toString(), FieldType.STRING)); } //todo: 根据泛型dto定义schema // switch (topic){ // case "task_center_platform_response": // schema.addField(new Field("ref_id", FieldType.STRING)); // schema.addField(new Field("biz_sys_code", FieldType.STRING)); // schema.addField(new Field("biz_sys_name", FieldType.STRING)); // schema.addField(new Field("title", FieldType.STRING)); // schema.addField(new Field("result_code", FieldType.STRING)); // schema.addField(new Field("result_msg", FieldType.STRING)); // break; // } //todo:单独写入数据方法 DatahubExample example = new DatahubExample(dataHubConfigDTO); try { example.init(schema); example.putRecords(dto); //example.getRecords(); } catch (DatahubClientException e) { e.printStackTrace(); } //todo:协同消费写入数据方法 // ProducerConfig config = new ProducerConfig(endpoints, accessIds, accessKeys); // Producer producer = new Producer(projectName, topicSet, config); // try { // List<RecordEntry> recordEntries = genRecords(schema); // DatahubWriter.sendRecords(producer, recordEntries); // } finally { // // 确保资源正确释放 // producer.close(); // } } /*private List<RecordEntry> genRecords(RecordSchema schema) { List<RecordEntry> recordEntries = new ArrayList<>(); RecordEntry entry = new RecordEntry(); //entry.addAttribute("key1", "value1"); //entry.addAttribute("key2", "value2"); TupleRecordData data = new TupleRecordData(schema); //todo: 通过key值定义同步dto数据到data Map map = ObjectParseUtils.objectToMap(dto); for (Object key : map.keySet()) { data.setField(key.toString(),map.get(key)); } entry.setRecordData(data); recordEntries.add(entry); return recordEntries; }*/ /*public static void main(String[] args) { // Endpoint以Region: 华东1为例,其他Region请按实际情况填写 String endpoint = "https://datahub.cn-shanghai-shga-d01.dh.alicloud.ga.sh"; String accessId = "0iWV0NCs805VuAAu"; String accessKey = "iEwlgpCnXDwT93YMVDb2G60my9ne81"; String projectName = "sjc_rwzx"; String topicName = "task_center_platform_request"; RecordSchema schema = new RecordSchema(); *//*TaskCenterPRPDTO dto = new TaskCenterPRPDTO(); dto.setBiz_sys_code("") .setBiz_sys_name("") .setRef_id("") .setResult_code("") .setResult_msg("") .setTitle("");*//* TaskCenterPRQDTO dto = new TaskCenterPRQDTO(); dto.setBiz_sys_code("test") .setBiz_sys_name("测试系统") .setBrief("测试任务") .setClose_time("") .setFlow_start_param("") .setRef_id("123") .setRemark("备注") .setSubmit_by("张三") .setTc_flow_id("") .setTitle("测试标题"); Map map = ObjectParseUtils.objectToMap(dto); // for(Object key : map.keySet()){ // schema.addField(new Field(key.toString(), FieldType.STRING)); // } schema.addField(new Field("ref_id", FieldType.STRING)); schema.addField(new Field("biz_sys_code", FieldType.STRING)); schema.addField(new Field("biz_sys_name", FieldType.STRING)); schema.addField(new Field("title", FieldType.STRING)); schema.addField(new Field("brief", FieldType.STRING)); schema.addField(new Field("submit_by", FieldType.STRING)); schema.addField(new Field("close_time", FieldType.STRING)); schema.addField(new Field("remark", FieldType.STRING)); schema.addField(new Field("tc_flow_id", FieldType.STRING)); schema.addField(new Field("flow_start_param", FieldType.STRING)); ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey); Producer producer = new Producer(projectName, topicName, config); // 根据场景控制循环 boolean stop = false; try { while (!stop) { List<RecordEntry> recordEntries = new ArrayList<>(); RecordEntry entry = new RecordEntry(); //entry.addAttribute("key1", "value1"); //entry.addAttribute("key2", "value2"); TupleRecordData data = new TupleRecordData(schema); //todo: 通过key值定义同步dto数据到data Map<String, Object> map1 = ObjectParseUtils.objectToMap(dto); for (Map.Entry<String, Object> entry1: map1.entrySet()) { data.setField(entry1.getKey(),entry1.getValue()); } entry.setRecordData(data); recordEntries.add(entry); DatahubWriter.sendRecords(producer, recordEntries); } } finally { // 确保资源正确释放 producer.close(); } }*/ public static void datahubSetmessage(TaskCenterPRQDTO dto) { String endpoint = ""; String accessId = ""; String accessKey = ""; String projectName = ""; String topicName = ""; DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO(); dataHubConfigDTO .setEndpoint(endpoint) .setAccessId(accessId) .setAccessKey(accessKey) .setProjectName(projectName) .setTopicName(topicName); DatahubExample example = new DatahubExample(dataHubConfigDTO); RecordSchema schema = new RecordSchema(); // schema.addField(new Field("ref_id", FieldType.STRING)); // schema.addField(new Field("biz_sys_code", FieldType.STRING)); // schema.addField(new Field("biz_sys_name", FieldType.STRING)); // schema.addField(new Field("title", FieldType.STRING)); // schema.addField(new Field("brief", FieldType.STRING)); // schema.addField(new Field("submit_by", FieldType.STRING)); // schema.addField(new Field("close_time", FieldType.STRING)); // schema.addField(new Field("remark", FieldType.STRING)); // schema.addField(new Field("tc_flow_id", FieldType.STRING)); // schema.addField(new Field("flow_start_param", FieldType.STRING)); Map map = ObjectParseUtils.objectToMap(dto); for(Object key : map.keySet()){ schema.addField(new Field(key.toString(), FieldType.STRING)); } try { example.init(schema); example.putRecords(dto); //example.getRecords(); //example.createADSDataConnector(); } catch (DatahubClientException e) { e.printStackTrace(); } } public static void main(String[] args) { String endpoint = ""; String accessId = ""; String accessKey = ""; String projectName = ""; String topicName = ""; DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO(); dataHubConfigDTO .setEndpoint(endpoint) .setAccessId(accessId) .setAccessKey(accessKey) .setProjectName(projectName) .setTopicName(topicName); DatahubExample example = new DatahubExample(dataHubConfigDTO); RecordSchema schema = new RecordSchema(); // schema.addField(new Field("ref_id", FieldType.STRING)); // schema.addField(new Field("biz_sys_code", FieldType.STRING)); // schema.addField(new Field("biz_sys_name", FieldType.STRING)); // schema.addField(new Field("title", FieldType.STRING)); // schema.addField(new Field("brief", FieldType.STRING)); // schema.addField(new Field("submit_by", FieldType.STRING)); // schema.addField(new Field("close_time", FieldType.STRING)); // schema.addField(new Field("remark", FieldType.STRING)); // schema.addField(new Field("tc_flow_id", FieldType.STRING)); // schema.addField(new Field("flow_start_param", FieldType.STRING)); TaskCenterPRQDTO dto = new TaskCenterPRQDTO(); dto.setBiz_sys_code("sys_task_center") .setBiz_sys_name("任务中心") .setBrief("任务发起测试") .setClose_time("2020-06-21 00:00:00") .setFlow_start_param("") .setRef_id("1") .setRemark("备注") .setSubmit_by("test") .setTc_flow_id("1") .setTitle("测试标题"); Map map = ObjectParseUtils.objectToMap(dto); for(Object key : map.keySet()){ schema.addField(new Field(key.toString(), FieldType.STRING)); } try { example.init(schema); example.putRecords(dto); //example.getRecords(); //example.createADSDataConnector(); } catch (DatahubClientException e) { e.printStackTrace(); } } }
    Processed: 0.009, SQL: 9