Datahub 2.15版本的数据读写
产品概述产品优势使用场景
数据总线服务器datahub配置类的封装yml 配置 (保密原因自我补全)config配置类核心线程池+主线程+工作线程消费类生产者工具类
产品概述
DataHub基本介绍 阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布 (Publish),订阅 (Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。
DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点。DataHub与阿里云流计算引擎StreamCompute无缝连接,用户可以轻松使用SQL进行流数据分析。
DataHub服务也提供分发流式数据到各种云产品的功能,目前支持分发到MaxCompute(原ODPS),OSS等。
系统整体功能图
产品优势
高吞吐 最高支持单shard每日8000万Record级别的写入量。
实时性 通过 DataHub ,您可以实时的收集各种方式生成的数据并进行实时的处理,对您的业务产生快速的响应。
易用性 DataHub 提供丰富的SDK包,包括C++, JAVA, Pyhon, Ruby, Go等语言。 DataHub服务也提供Restful API规范,您可以用自己的方式实现访问接口。 除了SDK以外,DataHub 还提供一些常用的客户端插件,包括:Fluentd,LogStash,Flume等。您可以使用这些客户端工具往 DataHub 里面写入流式数据。 DataHub 同时支持强Schema的结构化数据(创建Tuple类型的Topic)和无类型的非结构化数据(创建Blob类型的Topic),您可以自由选择。 高可用 服务可用性不低于99.9%。 规模自动扩展,不影响对外服务;数据持久性不低于99.999%。 数据自动多重冗余备份。 动态伸缩 每个主题(Topic)的数据流吞吐能力可以动态扩展和减少,最高可达到每主题256000 Records/s的吞吐量。
高安全性 提供企业级多层次安全防护,多用户资源隔离机制; 提供多种鉴权和授权机制及白名单、主子账号功能。
使用场景
DataHub作为一个流式数据处理服务,结合阿里云众多云产品,可以构建一站式的数据处理服务。
流计算StreamCompute StreamCompute是阿里云提供的流计算引擎,提供使用类SQL的语言来进行流式计算。DataHub 和StreamCompute无缝结合,可以作为StreamCompute的数据源和输出源,具体可参考实时计算文档
流处理应用 用户可以编写应用订阅DataHub中的数据,并进行实时的加工,把加工后的结果输出。用户可以把应用计算产生的结果输出到DataHub中,并使用另外一个应用来处理上一个应用生成的流式数据,来构建数据处理流程的DAG。
流式数据归档 用户的流式数据可以归档到 MaxCompute(原ODPS)中。用户通过创建DataHub Connector,指定相关配置,即可创建将Datahub中流式数据定期归档的同步任务。
以上来自于阿里云平台 Datahub在线文档
好了,先进入正题,Datahub 2.15版本是支持协同消费的,单是要保证阿里云数据总线服务器是最新版本的,否则是不支持协同消费类的,但是低版本数据总线服务器是支持高版本sdk向下兼容的
数据总线服务器
一开始我们新建一个项目 项目新建完成以后,再新建这个项目下的topic
datahub配置类的封装
pom依赖
<dependency>
<groupId>com.aliyun.datahub
</groupId>
<artifactId>aliyun-sdk-datahub
</artifactId>
<version>2.15.0-public
</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub
</groupId>
<artifactId>datahub-client-library
</artifactId>
<version>1.1.8-public
</version>
</dependency>
yml 配置 (保密原因自我补全)
public:
datahub:
config:
startup: false
endpoint: XXXXXXXXXXXXXXXXXXXXXXXXXXXX
accessId: XXXXXXXXXXXXXXXXX
accessKey: XXXXXXXXXXXXXXXx
serivce:
- projectName: XXXXXXXX
topicGet: XXXXXXXXXXXXXXXXXX
topicSet: XXXXXXXXXXXXXXXXXX
subId: XXXXXXXXXXXXXXXXXX
config配置类
package com
.encdata
.oss
.datahubClient
;
import com
.encdata
.oss
.system
.domain
.dto
.DataHubConfigDTO
;
import com
.encdata
.oss
.system
.handler
.CustomerThreadPool
;
import com
.encdata
.oss
.system
.handler
.task
.DatahubConsumerTask
;
import lombok
.Data
;
import org
.springframework
.boot
.context
.properties
.ConfigurationProperties
;
import org
.springframework
.stereotype
.Component
;
import javax
.annotation
.PostConstruct
;
import java
.util
.ArrayList
;
import java
.util
.List
;
import java
.util
.Map
;
import static com
.encdata
.oss
.system
.controller
.BaseController
.executorService
;
@Data
@Component
@ConfigurationProperties(prefix
= "public.datahub.config")
public class DatahubConfig {
private Boolean startup
;
private String endpoint
;
public static String endpoints
;
private String accessId
;
public static String accessIds
;
private String accessKey
;
public static String accessKeys
;
private List
<Map> serivce
;
public static List
<Map> serivces
= new ArrayList<>();
private String projectName
;
private String topicGet
;
private String subId
;
@PostConstruct
public void datahubRecycling(){
endpoints
= this.endpoint
;
accessIds
= this.accessId
;
accessKeys
= this.accessKey
;
serivces
= this.serivce
;
if(startup
){
if(executorService
== null
){
executorService
= CustomerThreadPool
.createDefaultThreadPool();
}
for(int s
=0;s
<serivce
.size();s
++){
Map map
= serivce
.get(s
);
projectName
= map
.get("projectName").toString();
topicGet
= map
.get("topicGet").toString();
subId
= map
.get("subId").toString();
String
[] topics
= topicGet
.split(",");
for(int i
=0;i
<topics
.length
;i
++){
DataHubConfigDTO dataHubConfigDTO
= new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint
)
.setAccessId(accessId
)
.setAccessKey(accessKey
)
.setProjectName(projectName
)
.setTopicName(topics
[i
])
.setSubId(subId
);
executorService
.execute(new DatahubConsumerTask(dataHubConfigDTO
));
}
}
}
}
}
核心线程池+主线程+工作线程
package com
.encdata
.oss
.system
.handler
;
import java
.util
.concurrent
.*
;
public class CustomerThreadPool {
private static final int DEFAULT_CORE_POLL_SIZE
= 8;
private static final int DEFAULT_MAXIMUM_POOL_SIZE
= 1024;
private static final long DEFAULT_KEEP_ALIVE_TIME
= 0;
private static final TimeUnit DEFAULT_TIME_UNIT
= TimeUnit
.MICROSECONDS
;
private static final Integer MAX_WORK_QUEUE_CAPACITY
= 1024*10;
private static final BlockingQueue
<Runnable> DEFAULT_WORK_QUEUE
= new ArrayBlockingQueue<Runnable>(MAX_WORK_QUEUE_CAPACITY
);
private static final ThreadFactory DEFAULT_THREAD_FACTORY
= Executors
.defaultThreadFactory();
private static final RejectedExecutionHandler DEFAULT_HANDLER
= new ThreadPoolExecutor.CallerRunsPolicy();
public static ThreadPoolExecutor
createThreadPool(int corePoolSize
,
int maximumPoolSize
,
long keepAliveTime
,
TimeUnit unit
,
BlockingQueue
<Runnable> workQueue
,
ThreadFactory threadFactory
,
RejectedExecutionHandler handler
) {
return new ThreadPoolExecutor(corePoolSize
, maximumPoolSize
, keepAliveTime
, unit
, workQueue
,
threadFactory
, handler
);
}
public static ThreadPoolExecutor createThreadPool
(int corePoolSize
,
int maximumPoolSize
,
long keepAliveTime
,
TimeUnit unit
,
BlockingQueue
<Runnable> workQueue
,
ThreadFactory threadFactory
) {
return createThreadPool(corePoolSize
, maximumPoolSize
, keepAliveTime
, unit
, workQueue
, threadFactory
, DEFAULT_HANDLER
);
}
public static ThreadPoolExecutor
createThreadPool(int corePoolSize
,
int maximumPoolSize
,
long keepAliveTime
,
TimeUnit unit
,
BlockingQueue
<Runnable> workQueue
) {
return createThreadPool(corePoolSize
, maximumPoolSize
, keepAliveTime
, unit
, workQueue
, DEFAULT_THREAD_FACTORY
);
}
public static ThreadPoolExecutor
createThreadPool(int corePoolSize
,
int maximumPoolSize
,
long keepAliveTime
,
TimeUnit unit
) {
return createThreadPool(corePoolSize
, maximumPoolSize
, keepAliveTime
, unit
, DEFAULT_WORK_QUEUE
);
}
public static ThreadPoolExecutor
createDefaultThreadPool() {
return createThreadPool(DEFAULT_CORE_POLL_SIZE
, DEFAULT_MAXIMUM_POOL_SIZE
, DEFAULT_KEEP_ALIVE_TIME
, DEFAULT_TIME_UNIT
);
}
}
package com
.encdata
.oss
.system
.handler
.task
;
import com
.encdata
.oss
.system
.exception
.BaseException
;
public abstract class BaseWorkTask implements Runnable {
protected abstract void process() throws BaseException
;
}
package com
.encdata
.oss
.system
.handler
.task
;
import com
.encdata
.oss
.datahubClient
.singleSubscription
.SubscriptionExample
;
import com
.encdata
.oss
.system
.domain
.dto
.DataHubConfigDTO
;
import com
.encdata
.oss
.system
.exception
.BaseException
;
import lombok
.extern
.slf4j
.Slf4j
;
import static com
.encdata
.oss
.system
.controller
.BaseController
.executorService
;
@Slf4j
public class DatahubConsumerTask extends BaseWorkTask {
private DataHubConfigDTO dataHubConfigDTO
;
private SubscriptionExample subscriptionExample
;
public DatahubConsumerTask(DataHubConfigDTO dataHubConfigDTO
){
this.dataHubConfigDTO
= dataHubConfigDTO
;
}
@Override
public void run() {
if (log
.isDebugEnabled()) {
log
.debug("DataHub数据消费信息处理线程【开始】");
}
process();
if (log
.isDebugEnabled()) {
log
.debug("DataHub数据消费信息处理线程【结束】=====重新请求");
executorService
.execute(new DatahubConsumerTask(dataHubConfigDTO
));
}
}
@Override
protected void process() throws BaseException
{
new SubscriptionExample(dataHubConfigDTO
).Start();
}
}
消费类
package com
.encdata
.oss
.datahubClient
.singleSubscription
;
import com
.aliyun
.datahub
.DatahubClient
;
import com
.aliyun
.datahub
.DatahubConfiguration
;
import com
.aliyun
.datahub
.auth
.AliyunAccount
;
import com
.aliyun
.datahub
.common
.data
.RecordSchema
;
import com
.aliyun
.datahub
.exception
.DatahubClientException
;
import com
.aliyun
.datahub
.exception
.OffsetResetedException
;
import com
.aliyun
.datahub
.exception
.OffsetSessionChangedException
;
import com
.aliyun
.datahub
.exception
.SubscriptionOfflineException
;
import com
.aliyun
.datahub
.model
.*
;
import com
.encdata
.oss
.datahubClient
.TaskReleaseConsumer
;
import com
.encdata
.oss
.system
.domain
.dto
.DataHubConfigDTO
;
import java
.util
.List
;
class Consumer{
private String projectName
= null
;
private String topicName
= null
;
private String subId
= null
;
private String shardId
= null
;
private RecordSchema schema
= null
;
private DatahubClient client
= null
;
public Consumer(String projectName
, String topicName
, String subId
, String shardId
, RecordSchema schema
,
DatahubConfiguration conf
) {
this.projectName
= projectName
;
this.topicName
= topicName
;
this.subId
= subId
;
this.shardId
= shardId
;
this.schema
= schema
;
this.client
= new DatahubClient(conf
);
}
private void commit(OffsetContext offsetCtx
) {
client
.commitOffset(offsetCtx
);
}
public void run() {
try {
boolean bExit
= false;
OffsetContext offsetCtx
= client
.initOffsetContext(projectName
, topicName
, subId
, shardId
);
String cursor
= null
;
if (!offsetCtx
.hasOffset()) {
GetCursorResult cursorResult
= client
.getCursor(projectName
, topicName
, shardId
, GetCursorRequest
.CursorType
.OLDEST
);
cursor
= cursorResult
.getCursor();
} else {
GetCursorResult cursorResult
= client
.getCursor(projectName
, topicName
, shardId
, GetCursorRequest
.CursorType
.SEQUENCE
,
(offsetCtx
.getOffset().getSequence() + 1));
cursor
= cursorResult
.getCursor();
}
long recordNum
= 0L
;
while (!bExit
) {
try {
GetRecordsResult recordResult
= client
.getRecords(projectName
, topicName
, shardId
, cursor
, 10,
schema
);
List
<RecordEntry> records
= recordResult
.getRecords();
if (records
.size() == 0) {
commit(offsetCtx
);
Thread
.sleep(1000);
} else {
for (RecordEntry record
: records
) {
new TaskReleaseConsumer(topicName
,record
).DataParsing();
offsetCtx
.setOffset(record
.getOffset());
recordNum
++;
if (recordNum
% 100 == 0) {
commit(offsetCtx
);
}
}
cursor
= recordResult
.getNextCursor();
}
} catch (SubscriptionOfflineException e
) {
bExit
= true;
e
.printStackTrace();
} catch (OffsetResetedException e
) {
client
.updateOffsetContext(offsetCtx
);
cursor
= client
.getCursor(projectName
, topicName
, shardId
,
GetCursorRequest
.CursorType
.SEQUENCE
, (offsetCtx
.getOffset().getSequence() + 1)).getCursor();
System
.err
.println("Restart consume shard:" + shardId
+ ", reset offset:"
+ offsetCtx
.toObjectNode().toString() + ", cursor:" + cursor
);
} catch (OffsetSessionChangedException e
) {
bExit
= true;
e
.printStackTrace();
} catch (Exception e
) {
bExit
= true;
e
.printStackTrace();
}
}
} catch (Exception e
) {
e
.printStackTrace();
}
}
}
public class SubscriptionExample {
private String endpoint
;
private String accessId
;
private String accessKey
;
private String projectName
;
private String topicName
;
private String subId
;
private DatahubConfiguration conf
;
private DatahubClient client
;
public SubscriptionExample(DataHubConfigDTO dataHubConfigDTO
) {
this.endpoint
=dataHubConfigDTO
.getEndpoint();
this.accessId
=dataHubConfigDTO
.getAccessId();
this.accessKey
=dataHubConfigDTO
.getAccessKey();
this.projectName
=dataHubConfigDTO
.getProjectName();
this.topicName
=dataHubConfigDTO
.getTopicName();
this.subId
=dataHubConfigDTO
.getSubId();
this.conf
= new DatahubConfiguration(new AliyunAccount(accessId
, accessKey
), endpoint
);
this.client
= new DatahubClient(conf
);
}
public void Start() {
GetTopicResult topicResult
= client
.getTopic(projectName
, topicName
);
ListShardResult shardResult
= client
.listShard(projectName
, topicName
);
for (int i
= 0; i
< shardResult
.getShards().size(); ++i
) {
new Consumer(projectName
, topicName
, subId
, shardResult
.getShards().get(i
).getShardId(),
topicResult
.getRecordSchema(), conf
).run();
}
}
public static void main(String
[] args
) {
String endpoint
= "";
String accessId
= "";
String accessKey
= "";
String projectName
= "";
String topicName
= "";
String subId
= "";
DataHubConfigDTO dataHubConfigDTO
= new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint
)
.setAccessId(accessId
)
.setAccessKey(accessKey
)
.setProjectName(projectName
)
.setTopicName(topicName
)
.setSubId(subId
);
SubscriptionExample example
= new SubscriptionExample(dataHubConfigDTO
);
try {
example
.Start();
} catch (DatahubClientException e
) {
e
.printStackTrace();
}
}
}
生产者工具类
package com
.encdata
.oss
.datahubClient
;
import com
.aliyun
.datahub
.common
.data
.Field
;
import com
.aliyun
.datahub
.common
.data
.FieldType
;
import com
.aliyun
.datahub
.common
.data
.RecordSchema
;
import com
.aliyun
.datahub
.exception
.DatahubClientException
;
import com
.encdata
.oss
.datahubClient
.singleSubscription
.DatahubExample
;
import com
.encdata
.oss
.system
.domain
.dto
.DataHubConfigDTO
;
import com
.encdata
.oss
.system
.domain
.dto
.TaskCenterPRQDTO
;
import com
.encdata
.oss
.util
.ObjectParseUtils
;
import lombok
.extern
.slf4j
.Slf4j
;
import java
.util
.Map
;
import static com
.encdata
.oss
.datahubClient
.DatahubConfig
.*
;
@Slf4j
public class TaskReleaseProduction<T>{
private static String projectName
;
private static String topicSet
;
private static DataHubConfigDTO dataHubConfigDTO
;
private T dto
;
public TaskReleaseProduction(T dto
){
this.dto
=dto
;
}
public void DataProduction(String topic
){
for (int s
= 0; s
< serivces
.size(); s
++) {
Map map
= serivces
.get(s
);
projectName
= map
.get("projectName").toString();
topicSet
= map
.get("topicSet").toString();
dataHubConfigDTO
= new DataHubConfigDTO();
this.dataHubConfigDTO
.setEndpoint(endpoints
)
.setAccessId(accessIds
)
.setAccessKey(accessKeys
)
.setProjectName(projectName
)
.setTopicName(topicSet
);
if (topic
.equals(topicSet
)) {
datahubWriter(topic
);
}
}
}
public void datahubWriter(String topic
){
RecordSchema schema
= new RecordSchema();
Map map
= ObjectParseUtils
.objectToMap(dto
);
for(Object key
: map
.keySet()){
schema
.addField(new Field(key
.toString(), FieldType
.STRING
));
}
DatahubExample example
= new DatahubExample(dataHubConfigDTO
);
try {
example
.init(schema
);
example
.putRecords(dto
);
} catch (DatahubClientException e
) {
e
.printStackTrace();
}
}
public static void datahubSetmessage(TaskCenterPRQDTO dto
) {
String endpoint
= "";
String accessId
= "";
String accessKey
= "";
String projectName
= "";
String topicName
= "";
DataHubConfigDTO dataHubConfigDTO
= new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint
)
.setAccessId(accessId
)
.setAccessKey(accessKey
)
.setProjectName(projectName
)
.setTopicName(topicName
);
DatahubExample example
= new DatahubExample(dataHubConfigDTO
);
RecordSchema schema
= new RecordSchema();
Map map
= ObjectParseUtils
.objectToMap(dto
);
for(Object key
: map
.keySet()){
schema
.addField(new Field(key
.toString(), FieldType
.STRING
));
}
try {
example
.init(schema
);
example
.putRecords(dto
);
} catch (DatahubClientException e
) {
e
.printStackTrace();
}
}
public static void main(String
[] args
) {
String endpoint
= "";
String accessId
= "";
String accessKey
= "";
String projectName
= "";
String topicName
= "";
DataHubConfigDTO dataHubConfigDTO
= new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint
)
.setAccessId(accessId
)
.setAccessKey(accessKey
)
.setProjectName(projectName
)
.setTopicName(topicName
);
DatahubExample example
= new DatahubExample(dataHubConfigDTO
);
RecordSchema schema
= new RecordSchema();
TaskCenterPRQDTO dto
= new TaskCenterPRQDTO();
dto
.setBiz_sys_code("sys_task_center")
.setBiz_sys_name("任务中心")
.setBrief("任务发起测试")
.setClose_time("2020-06-21 00:00:00")
.setFlow_start_param("")
.setRef_id("1")
.setRemark("备注")
.setSubmit_by("test")
.setTc_flow_id("1")
.setTitle("测试标题");
Map map
= ObjectParseUtils
.objectToMap(dto
);
for(Object key
: map
.keySet()){
schema
.addField(new Field(key
.toString(), FieldType
.STRING
));
}
try {
example
.init(schema
);
example
.putRecords(dto
);
} catch (DatahubClientException e
) {
e
.printStackTrace();
}
}
}