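The figure below shows how a change to HBase data flows through Atlas:
After reading the code above, you may well wonder what the four interfaces implemented by HBaseAtlasCoprocessor are for. The first interface is shown below: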
    @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
    @InterfaceStability.Evolving
    public interface MasterCoprocessor extends Coprocessor {
        default Optional<MasterObserver> getMasterObserver() {
            return Optional.empty();
        }
    }

This interface lives in HBase's coprocessor package. Annoyingly, it carries no comments of its own, but it does extend the Coprocessor interface. The Coprocessor interface and its comments are shown below:
    /**
     * Building a coprocessor to observe Master operations.
     * <pre>
     * class MyMasterCoprocessor implements MasterCoprocessor {
     *   @Override
     *   public Optional<MasterObserver> getMasterObserver() {
     *     return new MyMasterObserver();
     *   }
     * }
     */
    @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
    @InterfaceStability.Evolving
    public interface Coprocessor {
        int VERSION = 1;

        /** Highest installation priority */
        int PRIORITY_HIGHEST = 0;
        /** High (system) installation priority */
        int PRIORITY_SYSTEM = Integer.MAX_VALUE / 4;
        /** Default installation priority for user coprocessors */
        int PRIORITY_USER = Integer.MAX_VALUE / 2;
        /** Lowest installation priority */
        int PRIORITY_LOWEST = Integer.MAX_VALUE;

        /**
         * Lifecycle state of a given coprocessor instance.
         */
        enum State {
            UNINSTALLED, INSTALLED, STARTING, ACTIVE, STOPPING, STOPPED
        }

        /**
         * Called by the {@link CoprocessorEnvironment} during it's own startup to initialize the
         * coprocessor.
         */
        default void start(CoprocessorEnvironment env) throws IOException {}

        /**
         * Called by the {@link CoprocessorEnvironment} during it's own shutdown to stop the
         * coprocessor.
         */
        default void stop(CoprocessorEnvironment env) throws IOException {}

        /**
         * Coprocessor endpoints providing protobuf services should override this method.
         * @return Iterable of {@link Service}s or empty collection. Implementations should never
         * return null.
         */
        default Iterable<Service> getServices() {
            return Collections.EMPTY_SET;
        }
    }

The comments on this interface tell us that observing the operations of the HBase Master requires implementing it. Read together with the Atlas source, the purpose of implementing it is to capture the operations that HMaster performs on the HBase cluster.
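To make the shape of these interfaces concrete, here is a minimal sketch of the same pattern: a coprocessor interface whose default method returns an empty Optional, and an implementing class that also plays the observer role. `DemoObserver`, `DemoCoprocessor`, `RecordingCoprocessor` and `fireCreateTable` are hypothetical stand-ins written for this illustration, not HBase classes, and returning `Optional.of(this)` is an assumption about how such a class would typically expose itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for an observer interface: every hook has an empty default body.
interface DemoObserver {
    default void preCreateTable(String tableName) {}
}

// Hypothetical stand-in for a coprocessor interface: by default no observer is exposed.
interface DemoCoprocessor {
    default Optional<DemoObserver> getObserver() {
        return Optional.empty();
    }
}

// One class implements both roles and hands itself out as the observer.
class RecordingCoprocessor implements DemoCoprocessor, DemoObserver {
    final List<String> seen = new ArrayList<>();

    @Override
    public Optional<DemoObserver> getObserver() {
        return Optional.of(this);
    }

    @Override
    public void preCreateTable(String tableName) {
        seen.add(tableName);
    }
}

class ObserverPatternDemo {
    // Hypothetical host side: the hook is invoked only when an observer is exposed.
    static void fireCreateTable(DemoCoprocessor cp, String tableName) {
        cp.getObserver().ifPresent(o -> o.preCreateTable(tableName));
    }

    public static void main(String[] args) {
        RecordingCoprocessor cp = new RecordingCoprocessor();
        fireCreateTable(cp, "t1");
        // A coprocessor that keeps the default exposes no observer, so nothing is called.
        fireCreateTable(new DemoCoprocessor() {}, "ignored");
        System.out.println(cp.seen);
    }
}
```

The default methods are what let an implementation override only the hooks it cares about and leave the rest empty.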
The second interface Atlas implements is MasterObserver. Part of its source and comments is shown below:
    /**
     * Defines coprocessor hooks for interacting with operations on the
     * {@link org.apache.hadoop.hbase.master.HMaster} process.
     */
    @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
    @InterfaceStability.Evolving
    public interface MasterObserver {
        /**
         * Called before a new table is created by
         * {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create
         * table RPC call.
         * @param ctx the environment to interact with the framework and master
         * @param desc the TableDescriptor for the table
         * @param regions the initial regions created for the table
         */
        default void preCreateTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
                TableDescriptor desc, RegionInfo[] regions) throws IOException {}

        // ... many more methods omitted
    }

This interface defines the hook functions for interacting with the HBase Master. As the method above shows, preCreateTable is invoked before HBase creates a table and receives the table's TableDescriptor, so through this interface we can obtain the table's properties.

The third interface Atlas implements is RegionObserver. Part of its code and comments is shown below:
    /**
     * Coprocessors implement this interface to observe and mediate client actions on the region.
     */
    @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
    @InterfaceStability.Evolving
    // TODO as method signatures need to break, update to
    // ObserverContext<? extends RegionCoprocessorEnvironment>
    // so we can use additional environment state that isn't exposed to coprocessors.
    public interface RegionObserver {
        /** Mutation type for postMutationBeforeWAL hook */
        enum MutationType {
            APPEND, INCREMENT
        }

        /**
         * Called before the region is reported as open to the master.
         * @param c the environment provided by the region server
         */
        default void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {}
    }

This interface is implemented to obtain information about data changes in HBase regions.

The fourth interface is RegionServerObserver. Part of its code and comments is shown below:
    @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
    @InterfaceStability.Evolving
    public interface RegionServerObserver {
        /**
         * Called before stopping region server.
         * @param ctx the environment to interact with the framework and region server.
         */
        default void preStopRegionServer(
                final ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {}
    }

This interface is likewise implemented to capture changes in HBase.

Taken together, HBaseAtlasCoprocessor implements these four interfaces in order to capture every change that happens in HBase. However, the source also contains fragments like the following:
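    @Override
    public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String ns) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> HBaseAtlasCoprocessor.preDeleteNamespace()");
        }
        try {
            activatePluginClassLoader();
            implMasterObserver.postDeleteNamespace(ctx, ns);
        } finally {
            deactivatePluginClassLoader();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== HBaseAtlasCoprocessor.preDeleteNamespace()");
        }
    }

In the code above, every change in HBase triggers an activation of the plugin class loader. The activation is needed because the class loader is deactivated again once initialization has finished. The code also contains the call implMasterObserver.postDeleteNamespace(ctx, ns);, whose meaning I have not fully worked out, so corrections from anyone who knows are welcome. Judging by the same-named class shown in the next step, it looks like a delegation to the implementation class that the plugin class loader loaded, but that is my reading rather than something confirmed here.

2. The previous step finished loading the class, so it can now be put to real use. The next piece of code captures the HBase change and sends it out: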
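As an aside before that code, the activate/deactivate bracket around the delegate call in postDeleteNamespace can be sketched as follows. This is only an illustration under an assumption: that activating the plugin class loader amounts to swapping the thread context class loader and deactivating restores the previous one. The bodies of activatePluginClassLoader() and deactivatePluginClassLoader() are not shown in this article, and `PluginClassLoaderDemo`, `PLUGIN_LOADER` and `runWithPluginLoader` are hypothetical names.

```java
import java.net.URL;
import java.net.URLClassLoader;

class PluginClassLoaderDemo {
    // Hypothetical plugin loader; a real one would be pointed at the plugin's jar files.
    static final ClassLoader PLUGIN_LOADER =
            new URLClassLoader(new URL[0], PluginClassLoaderDemo.class.getClassLoader());

    // Runs the delegate with PLUGIN_LOADER as the thread context class loader
    // and always restores the previous loader, even if the delegate throws.
    static void runWithPluginLoader(Runnable delegate) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(PLUGIN_LOADER);   // "activate" (assumed behavior)
        try {
            delegate.run();
        } finally {
            current.setContextClassLoader(previous);    // "deactivate" (assumed behavior)
        }
    }

    public static void main(String[] args) {
        runWithPluginLoader(() -> System.out.println("inside, plugin loader active: "
                + (Thread.currentThread().getContextClassLoader() == PLUGIN_LOADER)));
        System.out.println("outside, plugin loader active: "
                + (Thread.currentThread().getContextClassLoader() == PLUGIN_LOADER));
    }
}
```

Putting the restore step in a finally block is what keeps the host's own class loader in place after a failing hook call.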
    public class HBaseAtlasCoprocessor implements MasterCoprocessor, MasterObserver, RegionObserver, RegionServerObserver {
        private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasCoprocessor.class);

        final HBaseAtlasHook hbaseAtlasHook;

        public HBaseAtlasCoprocessor() {
            hbaseAtlasHook = HBaseAtlasHook.getInstance();
        }

        @Override
        public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> observerContext,
                TableDescriptor tableDescriptor, RegionInfo[] hRegionInfos) throws IOException {
            LOG.info("==> HBaseAtlasCoprocessor.postCreateTable()");
            // Send the captured data to the Kafka message server through the Atlas hook
            hbaseAtlasHook.sendHBaseTableOperation(tableDescriptor, null, HBaseAtlasHook.OPERATION.CREATE_TABLE, observerContext);
            if (LOG.isDebugEnabled()) {
                LOG.debug("<== HBaseAtlasCoprocessor.postCreateTable()");
            }
        }
    }

Although this HBaseAtlasCoprocessor has the same name as the one above, it lives in a different package. This class captures the data, whereas the class above activates the class loader.

3. HBaseAtlasHook source code
    // This will register Hbase entities into Atlas
    public class HBaseAtlasHook extends AtlasHook {
        private static volatile HBaseAtlasHook me;

        public static HBaseAtlasHook getInstance() {
            HBaseAtlasHook ret = me;
            if (ret == null) {
                try {
                    synchronized (HBaseAtlasHook.class) {
                        ret = me;
                        if (ret == null) {
                            me = ret = new HBaseAtlasHook(atlasProperties);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Caught exception instantiating the Atlas HBase hook.", e);
                }
            }
            return ret;
        }

        /**
         * Uses the Atlas notification framework to send the message to the Atlas message server.
         * @param tableDescriptor HBase table descriptor
         * @param tableName       table name
         * @param operation       operation performed on the table
         * @param ctx             context of the table operation
         */
        public void sendHBaseTableOperation(TableDescriptor tableDescriptor, final TableName tableName,
                final OPERATION operation, ObserverContext<MasterCoprocessorEnvironment> ctx) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("==> HBaseAtlasHook.sendHBaseTableOperation()");
            }
            try {
                final UserGroupInformation ugi = getUGI(ctx);
                final User user = getActiveUser(ctx);
                final String userName = (user != null) ? user.getShortName() : null;
                // Wrap the HBase operation in Atlas's own operation context.
                HBaseOperationContext hbaseOperationContext = handleHBaseTableOperation(tableDescriptor, tableName, operation, ugi, userName);
                // Send the HBase operation context built above to Atlas's Kafka server
                sendNotification(hbaseOperationContext);
            } catch (Throwable t) {
                LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): failed to send notification", t);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation()");
            }
        }
    }

In the HBaseAtlasHook code above, the first step is obtaining the HBaseAtlasHook instance, which is created with double-checked locking. Creating it requires the Atlas configuration, so where is that configuration read from? AtlasHook contains the following code:
    /**
     * A base class for atlas hooks.
     */
    public abstract class AtlasHook {
        static {
            try {
                atlasProperties = ApplicationProperties.get();
            } catch (Exception e) {
                LOG.info("Failed to load application properties", e);
            }
        }
        // ...
    }

    /**
     * Application properties used by Atlas.
     */
    public final class ApplicationProperties extends PropertiesConfiguration {
        public static Configuration get() throws AtlasException {
            if (instance == null) {
                synchronized (ApplicationProperties.class) {
                    if (instance == null) {
                        // public static final String APPLICATION_PROPERTIES = "atlas-application.properties";
                        set(get(APPLICATION_PROPERTIES));
                    }
                }
            }
            return instance;
        }
    }

This finally makes the origin of the configuration clear. When Atlas is registered with the HBase component, the Atlas configuration file (atlas-application.properties) has to be copied over as well, and what is read here is that copied file. AtlasHook loads it, HBaseAtlasHook is created from it, and once the hook exists sendHBaseTableOperation is called to send data to the Kafka server.

4. Sending the data. The sending code and its comments are shown below:
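As a short aside before that code, the double-checked locking used by getInstance() in step 3 can be isolated in a few lines. This is a minimal sketch rather than the Atlas class: `LazyHook` and its `constructions` counter are hypothetical names added so that the single construction can be observed.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyHook {
    // volatile makes the fully constructed instance visible to other threads.
    private static volatile LazyHook instance;

    // Hypothetical counter, only here to show that the constructor runs once.
    static final AtomicInteger constructions = new AtomicInteger();

    private LazyHook() {
        constructions.incrementAndGet();
    }

    static LazyHook getInstance() {
        LazyHook ret = instance;              // first check, without the lock
        if (ret == null) {
            synchronized (LazyHook.class) {
                ret = instance;               // second check, under the lock
                if (ret == null) {
                    instance = ret = new LazyHook();
                }
            }
        }
        return ret;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazyHook::getInstance);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("constructions = " + constructions.get());
    }
}
```

The local variable `ret` mirrors the original code: it keeps the common path down to a single volatile read once the instance exists.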
    // This will register Hbase entities into Atlas
    public class HBaseAtlasHook extends AtlasHook {
        /**
         * Sends the prepared HBase table operation context to the Atlas message server.
         * @param hbaseOperationContext context of the HBase operation
         */
        private void sendNotification(HBaseOperationContext hbaseOperationContext) {
            UserGroupInformation ugi = hbaseOperationContext.getUgi();
            if (ugi != null && ugi.getRealUser() != null) {
                ugi = ugi.getRealUser();
            }
            // The table-creation event is ultimately delivered as a List<HookNotification>;
            // every message ends up wrapped in a HookNotification
            notifyEntities(hbaseOperationContext.getMessages(), ugi);
        }
    }

    /**
     * A base class for atlas hooks.
     */
    public abstract class AtlasHook {
        ....

        /**
         * Notify atlas of the entity through message. The entity can be a
         * complex entity with reference to other entities.
         * De-duping of entities is done on server side depending on the
         * unique attribute on the entities.
         *
         * @param messages hook notification messages
         */
        protected void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi) {
            // Send the messages to the Kafka message server
            notifyEntities(messages, ugi, notificationMaxRetries);
        }

        /**
         * Chooses the send mode from the Atlas configuration: synchronous or asynchronous
         * (asynchronous by default).
         * Notify atlas of the entity through message. The entity can be a
         * complex entity with reference to other entities.
         * De-duping of entities is done on server side depending on the
         * unique attribute on the entities.
         *
         * @param messages   hook notification messages
         * @param maxRetries maximum number of retries while sending message to messaging system
         */
        public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
            if (executor == null) { // send synchronously
                notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
            } else {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
                    }
                });
            }
        }

        /**
         * Sends the messages to the Kafka server.
         * @param messages                messages to send
         * @param maxRetries              maximum number of retries after a failure
         * @param ugi                     UGI used for user authentication
         * @param notificationInterface   the Atlas notification framework
         * @param shouldLogFailedMessages whether failed messages are logged
         * @param logger                  logger for failed messages
         */
        @VisibleForTesting
        static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, UserGroupInformation ugi,
                NotificationInterface notificationInterface, boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
            if (messages == null || messages.isEmpty()) {
                return;
            }

            final int maxAttempts = maxRetries < 1 ? 1 : maxRetries;
            Exception notificationFailure = null;

            for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
                if (numAttempt > 1) { // retry attempt
                    try {
                        LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval);
                        Thread.sleep(notificationRetryInterval);
                    } catch (InterruptedException ie) {
                        LOG.error("Notification hook thread sleep interrupted");
                        break;
                    }
                }

                try {
                    // With a UGI (Kerberos authentication) the send runs inside ugi.doAs()
                    if (ugi == null) {
                        notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                    } else {
                        PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
                            @Override
                            public Object run() throws Exception {
                                notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                                return messages;
                            }
                        };
                        ugi.doAs(privilegedNotify);
                    }
                    notificationFailure = null; // notification sent successfully, reset error
                    break;
                } catch (Exception e) {
                    notificationFailure = e;
                    LOG.error("Failed to send notification - attempt #{}; error={}", numAttempt, e.getMessage());
                }
            }

            if (notificationFailure != null) {
                if (shouldLogFailedMessages && notificationFailure instanceof NotificationException) {
                    final List<String> failedMessages = ((NotificationException) notificationFailure).getFailedMessages();
                    for (String msg : failedMessages) {
                        logger.log(msg);
                    }
                }
                LOG.error("Giving up after {} failed attempts to send notification to Atlas: {}", maxAttempts, messages.toString(), notificationFailure);
            }
        }
        ....
    }

The code above shows that there are two ways of sending messages to the Kafka server: synchronously, or asynchronously through an executor. The configuration keys and the thread pool initialization are shown below:
    // Read from the configuration whether messages are sent synchronously or asynchronously
    boolean isAsync = atlasProperties.getBoolean(ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE);
    if (isAsync) {
        int minThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS, 1);
        int maxThreads = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 1);
        long keepAliveTimeMs = atlasProperties.getLong(ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS, 10000);
        int queueSize = atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE, 10000);

        executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(queueSize),
                new ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").setDaemon(true).build());
    }

By default, messages are sent to Kafka asynchronously.

5. How a message gets into Kafka. The source is shown below:
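Before that source, here is a compact sketch of the two ideas from step 4: a null executor means a synchronous send, and each send is retried a bounded number of times. It is a simplified illustration, not the Atlas implementation. `NotifySketch`, `Sender`, `sendWithRetry` and `dispatch` are hypothetical names, the retry count and interval are made-up values, and the pool sizes simply reuse the defaults visible in the configuration code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NotifySketch {
    // Hypothetical sender; stands in for the real notification interface.
    interface Sender {
        void send(String message) throws Exception;
    }

    // Tries up to max(1, maxRetries) times, sleeping between attempts.
    // Returns the number of attempts used, or -1 if every attempt failed.
    static int sendWithRetry(Sender sender, String message, int maxRetries, long retryIntervalMs) {
        int maxAttempts = Math.max(1, maxRetries);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt > 1) {
                try {
                    Thread.sleep(retryIntervalMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            try {
                sender.send(message);
                return attempt;
            } catch (Exception e) {
                // remember nothing here; the loop simply moves on to the next attempt
            }
        }
        return -1;
    }

    // A null executor means "send on the calling thread"; otherwise the send is queued.
    static void dispatch(ExecutorService executor, Sender sender, String message) {
        if (executor == null) {
            sendWithRetry(sender, message, 3, 10);
        } else {
            executor.submit(() -> { sendWithRetry(sender, message, 3, 10); });
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        Sender flaky = m -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("broker unavailable");
            }
        };
        System.out.println("attempts = " + sendWithRetry(flaky, "m1", 5, 10));

        ExecutorService executor = new ThreadPoolExecutor(1, 1, 10_000, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(10_000));
        dispatch(executor, m -> System.out.println("async: " + m), "m2");
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Queueing the send keeps the HBase operation that triggered the hook from waiting on Kafka, at the cost of the caller not seeing send failures directly.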
    /**
     * The Atlas notification framework: this interface is used to create consumers
     * and to send messages of a given type.
     * Interface to the Atlas notification framework.
     * <p>
     * Use this interface to create consumers and to send messages of a given notification type.
     * <ol>
     * <li>Atlas sends entity notifications
     * <li>Hooks send notifications to create/update types/entities. Atlas reads these messages
     * </ol>
     */
    public interface NotificationInterface {
        /**
         * Send the given messages.
         *
         * @param type     the message type
         * @param messages the list of messages to send
         * @param <T>      the message type
         *
         * @throws NotificationException if an error occurs while sending
         */
        <T> void send(NotificationType type, List<T> messages) throws NotificationException;
    }

    /**
     * Abstract notification interface implementation.
     */
    public abstract class AbstractNotification implements NotificationInterface {
        @Override
        public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
            List<String> strMessages = new ArrayList<>(messages.size());
            for (int index = 0; index < messages.size(); index++) {
                // Turn each message into its JSON string form and collect it in strMessages
                createNotificationMessages(messages.get(index), strMessages);
            }
            sendInternal(type, strMessages);
        }

        /**
         * Send the given messages.
         *
         * @param type     the message type
         * @param messages the array of messages to send
         *
         * @throws NotificationException if an error occurs while sending
         */
        protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
    }

    /**
     * Kafka specific access point to the Atlas notification framework.
     */
    @Component
    @Order(3)
    public class KafkaNotification extends AbstractNotification implements Service {
        @Override
        public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
            // Check whether the producer for the message server already exists
            if (producer == null) {
                // Create the producer for the message server
                createProducer();
            }
            // Send the messages
            sendInternalToProducer(producer, type, messages);
        }

        // Sends the messages to the Kafka server
        @VisibleForTesting
        void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException {
            // These messages come from a hook, so they go into the ATLAS_HOOK topic
            String topic = TOPIC_MAP.get(type);
            List<MessageContext> messageContexts = new ArrayList<>();

            // Asynchronously send every message to the given topic on the message server
            for (String message : messages) {
                ProducerRecord record = new ProducerRecord(topic, message);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending message for topic {}: {}", topic, message);
                }
                Future future = p.send(record);
                messageContexts.add(new MessageContext(future, message));
            }

            List<String> failedMessages = new ArrayList<>();
            Exception lastFailureException = null;

            for (MessageContext context : messageContexts) {
                try {
                    RecordMetadata response = context.getFuture().get();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), response.partition(), response.offset());
                    }
                } catch (Exception e) {
                    lastFailureException = e;
                    failedMessages.add(context.getMessage());
                }
            }

            if (lastFailureException != null) {
                throw new NotificationException(lastFailureException, failedMessages);
            }
        }
    }

At this point the message has reached Kafka. What follows is consuming the Kafka messages and persisting them.

6. Consuming Kafka messages. The consumer code is shown below:
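One detail of sendInternalToProducer is worth isolating before moving on to the consumer: every send is issued first, and only afterwards does the code wait on each future and collect the messages that failed. The sketch below shows that two-phase pattern without a Kafka dependency. `SendThenCollect` and `fakeSend` are hypothetical, the "blank payloads fail" rule is invented for the demonstration, and the method returns the failed messages where the original throws a NotificationException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class SendThenCollect {
    // Hypothetical asynchronous producer: blank payloads fail, everything else
    // completes with the offset it was given.
    static Future<Long> fakeSend(String message, long offset) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (message.trim().isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("blank message"));
        } else {
            future.complete(offset);
        }
        return future;
    }

    // Phase 1 issues every send without waiting; phase 2 waits on each future
    // and collects the messages whose send failed.
    static List<String> sendAll(List<String> messages) {
        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            futures.add(fakeSend(messages.get(i), i));
        }

        List<String> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get();
            } catch (Exception e) {
                failed.add(messages.get(i));
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> failed = sendAll(Arrays.asList("create table t1", " ", "drop table t2"));
        System.out.println("failed count = " + failed.size());
    }
}
```

Separating the two phases lets the sends overlap, while the caller still learns exactly which messages did not make it.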
    /**
     * Fetches the messages pushed to the message server.
     * Consumer of notifications from hooks e.g., hive hook etc.
     */
    @Component
    @Order(4)
    @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
    public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
        /**
         * Once NotificationHookConsumer has been initialized, starts a background job
         * that consumes the Kafka messages.
         * @throws AtlasException
         */
        @Override
        public void start() throws AtlasException {
            // Check in the configuration whether the consumer is disabled
            if (consumerDisabled) {
                LOG.info("No hook messages will be processed. {} = {}", CONSUMER_DISABLED, consumerDisabled);
                return;
            }
            startInternal(applicationProperties, null);
        }

        void startInternal(Configuration configuration, ExecutorService executorService) {
            if (consumers == null) {
                consumers = new ArrayList<>();
            }
            if (executorService != null) {
                executors = executorService;
            }
            // Read from the configuration whether HA is enabled; without HA the consumers are started inline
            if (!HAConfiguration.isHAEnabled(configuration)) {
                LOG.info("HA is disabled, starting consumers inline.");
                startConsumers(executorService);
            }
        }

        /**
         * Starts the background threads that consume Kafka messages.
         * @param executorService executor to which the consumer tasks are submitted asynchronously
         */
        private void startConsumers(ExecutorService executorService) {
            // Number of threads that consume Kafka messages, taken from the configuration; defaults to 1
            int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
            // Create as many consumers as configured
            List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads);

            if (executorService == null) {
                executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
                        new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
            }
            executors = executorService;

            for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) {
                // Create the consumer of Kafka messages
                HookConsumer hookConsumer = new HookConsumer(consumer);
                consumers.add(hookConsumer);
                // Start a thread that begins consuming messages
                executors.submit(hookConsumer);
            }
        }
    }

The code above uses start() from the Service interface to launch a background job that consumes the data sent by the hooks. The consumer tasks are submitted asynchronously to a thread pool, and the consumed messages are persisted to the database. The following code is HookConsumer's message-handling logic.
    /**
     * Fetches the messages pushed to the message server.
     * Consumer of notifications from hooks e.g., hive hook etc.
     */
    @Component
    @Order(4)
    @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
    public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
        ...
        class HookConsumer extends ShutdownableThread {
            ....
            /**
             * Overrides doWork() from ShutdownableThread. Once the consumer thread has started it keeps
             * consuming messages until the thread is shut down or an exception is thrown.
             * The comment on this method in ShutdownableThread, where it is declared as
             * "def doWork(): Unit", reads:
             *   This method is repeatedly invoked until the thread shuts down or this method throws an exception
             */
            @Override
            public void doWork() {
                LOG.info("==> HookConsumer doWork()");
                // Mark the consumer as runnable.
                shouldRun.set(true);

                // Check whether the Atlas service is healthy; while it has not started properly
                // the current thread sleeps, 1000 ms at a time
                if (!serverAvailable(new NotificationHookConsumer.Timer())) {
                    return;
                }

                try {
                    // Keep running for as long as the runnable flag is set
                    while (shouldRun.get()) {
                        try {
                            // Fetch data from the Kafka message server. The consumer was constructed for
                            // ATLAS_HOOK, so the messages consumed here are the ones in ATLAS_HOOK.
                            List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
                            // Iterate over every message fetched from Kafka
                            for (AtlasKafkaMessage<HookNotification> msg : messages) {
                                // Handle each message fetched from Kafka and commit it
                                handleMessage(msg);
                            }
                        } catch (IllegalStateException ex) {
                            adaptiveWaiter.pause(ex);
                        } catch (Exception e) {
                            if (shouldRun.get()) {
                                LOG.warn("Exception in NotificationHookConsumer", e);
                                adaptiveWaiter.pause(e);
                            } else {
                                break;
                            }
                        }
                    }
                } finally {
                    if (consumer != null) {
                        LOG.info("closing NotificationConsumer");
                        consumer.close();
                    }
                    LOG.info("<== HookConsumer doWork()");
                }
            }

            void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
                AtlasPerfTracer  perf        = null;
                HookNotification message     = kafkaMsg.getMessage();
                String           messageUser = message.getUser();
                long             startTime   = System.currentTimeMillis();
                boolean          isFailedMsg = false;
                AuditLog         auditLog    = null;

                // Performance trace logging
                if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                    perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
                }

                try {
                    // If this message has already been consumed (a replay), commit it and move on to the next one
                    if (failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
                        commit(kafkaMsg);
                        return;
                    }

                    // Pre-process the message fetched from Kafka, including its context
                    preProcessNotificationMessage(kafkaMsg);

                    // If the message is empty, commit it and move on to the next one
                    if (isEmptyMessage(kafkaMsg)) {
                        commit(kafkaMsg);
                        return;
                    }

                    // Used for intermediate conversions during create and update
                    for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
                        }

                        try {
                            RequestContext requestContext = RequestContext.get();
                            requestContext.setAttemptCount(numRetries + 1);
                            requestContext.setMaxAttempts(maxRetries);
                            requestContext.setUser(messageUser, null);
                            requestContext.setInNotificationProcessing(true);

                            switch (message.getType()) {
                                case ENTITY_CREATE: {
                                    final EntityCreateRequest      createRequest = (EntityCreateRequest) message;
                                    final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(createRequest.getEntities());

                                    if (auditLog == null) {
                                        auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                AtlasClient.API_V1.CREATE_ENTITY.getMethod(),
                                                AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
                                    }

                                    // Call the persistence interface to persist the hook message fetched from Kafka
                                    createOrUpdate(entities, false);
                                }
                                break;

                                case ENTITY_PARTIAL_UPDATE: {
                                    final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
                                    final Referenceable              referenceable        = partialUpdateRequest.getEntity();
                                    final AtlasEntitiesWithExtInfo   entities             = instanceConverter.toAtlasEntity(referenceable);

                                    if (auditLog == null) {
                                        auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                                String.format(AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), partialUpdateRequest.getTypeName()));
                                    }

                                    AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
                                    String          guid       = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType,
                                            Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object) partialUpdateRequest.getAttributeValue()));

                                    // There should only be one root entity
                                    entities.getEntities().get(0).setGuid(guid);

                                    createOrUpdate(entities, true);
                                }
                                break;

                                case ENTITY_DELETE: {
                                    final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;

                                    if (auditLog == null) {
                                        auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                                String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), deleteRequest.getTypeName()));
                                    }

                                    try {
                                        AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());

                                        atlasEntityStore.deleteByUniqueAttributes(type,
                                                Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
                                    } catch (ClassCastException cle) {
                                        LOG.error("Failed to delete entity {}", deleteRequest);
                                    }
                                }
                                break;

                                case ENTITY_FULL_UPDATE: {
                                    final EntityUpdateRequest      updateRequest = (EntityUpdateRequest) message;
                                    final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(updateRequest.getEntities());

                                    if (auditLog == null) {
                                        auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(),
                                                AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                    }

                                    createOrUpdate(entities, false);
                                }
                                break;

                                case ENTITY_CREATE_V2: {
                                    final EntityCreateRequestV2    createRequestV2 = (EntityCreateRequestV2) message;
                                    final AtlasEntitiesWithExtInfo entities        = createRequestV2.getEntities();

                                    if (auditLog == null) {
                                        auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                AtlasClientV2.API_V2.CREATE_ENTITY.getMethod(),
                                                AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
                                    }

                                    createOrUpdate(entities, false);
                                }
                                break;

                                case ENTITY_PARTIAL_UPDATE_V2: {
                                    final EntityPartialUpdateRequestV2 partialUpdateRequest = (EntityPartialUpdateRequestV2) message;
                                    final AtlasObjectId                entityId             = partialUpdateRequest.getEntityId();
                                    final AtlasEntityWithExtInfo       entity               = partialUpdateRequest.getEntity();

                                    if (auditLog == null) {
                                        auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(),
                                                AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                    }

                                    atlasEntityStore.updateEntity(entityId, entity, true);
                                }
                                break;

                                case ENTITY_FULL_UPDATE_V2: {
                                    final EntityUpdateRequestV2    updateRequest = (EntityUpdateRequestV2) message;
                                    final AtlasEntitiesWithExtInfo entities      = updateRequest.getEntities();

                                    if (auditLog == null) {
                                        auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(),
                                                AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                    }

                                    createOrUpdate(entities, false);
                                }
                                break;

                                case ENTITY_DELETE_V2: {
                                    final EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) message;
                                    final List<AtlasObjectId>   entities      = deleteRequest.getEntities();

                                    try {
                                        for (AtlasObjectId entity : entities) {
                                            if (auditLog == null) {
                                                auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
                                                        AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
                                                        String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), entity.getTypeName()));
                                            }

                                            AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());

                                            atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
                                        }
                                    } catch (ClassCastException cle) {
                                        LOG.error("Failed to do delete entities {}", entities);
                                    }
                                }
                                break;

                                default:
                                    throw new IllegalStateException("Unknown notification type: " + message.getType().name());
                            }

                            break;
                        } catch (Throwable e) {
                            RequestContext.get().resetEntityGuidUpdates();

                            if (numRetries == (maxRetries - 1)) {
                                String strMessage = AbstractNotification.getMessageJson(message);

                                LOG.warn("Max retries exceeded for message {}", strMessage, e);

                                isFailedMsg = true;
                                failedMessages.add(strMessage);

                                if (failedMessages.size() >= failedMsgCacheSize) {
                                    recordFailedMessages();
                                }
                                return;
                            } else {
                                LOG.warn("Error handling message", e);

                                try {
                                    LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
                                    Thread.sleep(consumerRetryInterval);
                                } catch (InterruptedException ie) {
                                    LOG.error("Notification consumer thread sleep interrupted");
                                }
                            }
                        } finally {
                            RequestContext.clear();
                        }
                    }

                    commit(kafkaMsg);
                } finally {
                    AtlasPerfTracer.log(perf);

                    long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0;

                    if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) {
                        String strMessage = AbstractNotification.getMessageJson(message);

                        LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset());
                        LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage);
                    }

                    if (auditLog != null) {
                        auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK);
                        auditLog.setTimeTaken(System.currentTimeMillis() - startTime);

                        AuditFilter.audit(auditLog);
                    }
                }
            }
            ....
        }
        ...
    }

At this point the message has been correctly fetched from the message server, and the persistence interface is then called to persist it.

7. Validation before the commit. The following code is the persistence entry point called in NotificationHookConsumer:
    /**
     * Creates or updates entities in Atlas.
     * @param entities        the entities to create or update
     * @param isPartialUpdate whether this is a partial update
     * @throws AtlasBaseException if an error occurs during processing
     */
    private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate) throws AtlasBaseException {
        List<AtlasEntity> entitiesList = entities.getEntities();
        AtlasEntityStream entityStream = new AtlasEntityStream(entities);

        // If no batch size is configured, or the data fits into a single batch, commit it in one call.
        if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
            // Call the create interface of AtlasEntityStore; the later persistence goes through the same interface.
            atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
        } else {
            Map<String, String> guidAssignments = new HashMap<>();

            for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
                int toIndex = fromIdx + commitBatchSize;

                if (toIndex > entitiesList.size()) {
                    toIndex = entitiesList.size();
                }

                List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex));

                updateProcessedEntityReferences(entitiesBatch, guidAssignments);

                AtlasEntitiesWithExtInfo batch       = new AtlasEntitiesWithExtInfo(entitiesBatch);
                AtlasEntityStream        batchStream = new AtlasEntityStream(batch, entityStream);

                EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);

                recordProcessedEntities(response, guidAssignments);

                RequestContext.get().resetEntityGuidUpdates();
                RequestContext.get().clearCache();
            }
        }
    }

The code above pre-processes the commit parameters and decides whether the data is committed in batches or in a single call. The code of the create interface follows:
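As a brief aside first, the batching decision in createOrUpdate boils down to slicing a list with subList. The sketch below isolates just that slicing. `BatchSplit.split` is a hypothetical helper written for this illustration; it leaves out the GUID bookkeeping that the original performs between batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchSplit {
    // Splits items into consecutive batches of at most batchSize elements.
    // A non-positive batchSize, or a list that already fits, yields one batch.
    static <T> List<List<T>> split(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        if (batchSize <= 0 || items.size() <= batchSize) {
            batches.add(new ArrayList<>(items));
            return batches;
        }
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5), 2));
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5), 0));
    }
}
```

Clamping the upper index with Math.min plays the same role as the explicit toIndex check in the original loop.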
    /**
     * Creates or updates data in the graph database from the given parameters.
     * @param entityStream    the AtlasEntityStream carrying the request
     * @param isPartialUpdate whether this is a partial update
     * @return the result returned after a successful create
     * @throws AtlasBaseException if the create fails
     */
    @Override
    @GraphTransaction
    public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException {
        return createOrUpdate(entityStream, isPartialUpdate, false);
    }

    // Common create-or-update method
    private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> createOrUpdate()");
        }

        // Check whether entityStream is null or empty. The check is needed because the Hive hook
        // passes a null entityStream for temporary tables.
        if (entityStream == null || !entityStream.hasNext()) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
        }

        AtlasPerfTracer perf = null;

        // Performance trace logging
        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()");
        }

        // Start a metric record
        MetricRecorder metric = RequestContext.get().startMetricRecord("createOrUpdate");

        try {
            // Create step: use the given entities to create the corresponding vertices in the graph.
            final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);

            // Check if authorized to create entities
            if (!RequestContext.get().isImportInProgress()) {
                for (AtlasEntity entity : context.getCreatedEntities()) {
                    AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)),
                            "create entity: type=", entity.getTypeName());
                }
            }

            // Update step
            // for existing entities, skip update if incoming entity doesn't have any change
            if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
                // Collect every entity that has no changes
                MetricRecorder checkForUnchangedEntities = RequestContext.get().startMetricRecord("checkForUnchangedEntities");

                List<AtlasEntity> entitiesToSkipUpdate = null;

                // Walk through all entities marked as updated
                for (AtlasEntity entity : context.getUpdatedEntities()) {
                    // GUID of the updated entity
                    String guid = entity.getGuid();
                    // Vertex that belongs to this GUID
                    AtlasVertex vertex = context.getVertex(guid);
                    // Entity type looked up by type name
                    AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
                    boolean hasUpdates = false;

                    if (!hasUpdates) {
                        hasUpdates = entity.getStatus() == AtlasEntity.Status.DELETED; // entity status could be updated during import
                    }

                    if (!hasUpdates && MapUtils.isNotEmpty(entity.getAttributes())) { // check for attribute value change
                        for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
                            if (!entity.getAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated
                                continue;
                            }

                            Object newVal  = entity.getAttribute(attribute.getName());
                            Object currVal = entityRetriever.getEntityAttribute(vertex, attribute);

                            if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
                                hasUpdates = true;

                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
                                }

                                break;
                            }
                        }
                    }

                    if (!hasUpdates && MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // check of relationsship-attribute value change
                        for (String attributeName : entityType.getRelationshipAttributes().keySet()) {
                            if (!entity.getRelationshipAttributes().containsKey(attributeName)) { // if value is not provided, current value will not be updated
                                continue;
                            }

                            Object         newVal           = entity.getRelationshipAttribute(attributeName);
                            String         relationshipType = AtlasEntityUtil.getRelationshipType(newVal);
                            AtlasAttribute attribute        = entityType.getRelationshipAttribute(attributeName, relationshipType);
                            Object         currVal          = entityRetriever.getEntityAttribute(vertex, attribute);

                            if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
                                hasUpdates = true;

                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("found relationship attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
                                }

                                break;
                            }
                        }
                    }

                    // if classifications are to be replaced, then skip updates only when no change in classifications
                    if (!hasUpdates && replaceClassifications) {
                        List<AtlasClassification> newVal  = entity.getClassifications();
                        List<AtlasClassification> currVal = entityRetriever.getAllClassifications(vertex);

                        if (!Objects.equals(currVal, newVal)) {
                            hasUpdates = true;

                            if (LOG.isDebugEnabled()) {
                                LOG.debug("found classifications update: entity(guid={}, typeName={}), currValue={}, newValue={}", guid, entity.getTypeName(), currVal, newVal);
                            }
                        }
                    }

                    if (!hasUpdates) {
                        if (entitiesToSkipUpdate == null) {
                            entitiesToSkipUpdate = new ArrayList<>();
                        }

                        if (LOG.isDebugEnabled()) {
                            LOG.debug("skipping unchanged entity: {}", entity);
                        }

                        entitiesToSkipUpdate.add(entity);
                    }
                }

                if (entitiesToSkipUpdate != null) {
                    context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
                }

                // Check if authorized to update entities
                if (!RequestContext.get().isImportInProgress()) {
                    for (AtlasEntity entity : context.getUpdatedEntities()) {
                        AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)),
                                "update entity: type=", entity.getTypeName());
                    }
                }

                RequestContext.get().endMetricRecord(checkForUnchangedEntities);
            }

            EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications);

            ret.setGuidAssignments(context.getGuidAssignments());

            // Notify the change listeners
            entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());

            if (LOG.isDebugEnabled()) {
                LOG.debug("<== createOrUpdate()");
            }

            return ret;
        } finally {
            RequestContext.get().endMetricRecord(metric);

            AtlasPerfTracer.log(perf);
        }
    }

The code above completes the validation of the data and the format conversion. All that remains is the last step: calling the JanusGraph interface to persist the data.

8. Persisting the data. The persistence code is shown below:
    public class EntityGraphMapper {

        public AtlasVertex createVertex(AtlasEntity entity) {
            // Generate a globally unique guid directly with java.util.UUID.
            final String guid = UUID.randomUUID().toString();

            return createVertexWithGuid(entity, guid);
        }

        /**
         * Creates a vertex from the given entity and guid.
         *
         * @param entity the entity data the vertex is created from
         * @param guid   the guid of this entity
         * @return the wrapper around the newly created vertex
         */
        public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("==> createVertex({})", entity.getTypeName());
            }

            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());

            // Create the graph vertex
            AtlasVertex ret = createStructVertex(entity);

            for (String superTypeName : entityType.getAllSuperTypes()) {
                AtlasGraphUtilsV2.addEncodedProperty(ret, SUPER_TYPES_PROPERTY_KEY, superTypeName);
            }

            AtlasGraphUtilsV2.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
            AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getEntityVersion(entity));

            GraphTransactionInterceptor.addToVertexCache(guid, ret);

            return ret;
        }

        private AtlasVertex createStructVertex(AtlasStruct struct) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("==> createStructVertex({})", struct.getTypeName());
            }

            // Add a new vertex to the graph
            final AtlasVertex ret = graph.addVertex();

            AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY, struct.getTypeName());
            AtlasGraphUtilsV2.setEncodedProperty(ret, STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
            AtlasGraphUtilsV2.setEncodedProperty(ret, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
            AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
            AtlasGraphUtilsV2.setEncodedProperty(ret, CREATED_BY_KEY, RequestContext.get().getUser());
            AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFIED_BY_KEY, RequestContext.get().getUser());

            if (LOG.isDebugEnabled()) {
                LOG.debug("<== createStructVertex({})", struct.getTypeName());
            }

            return ret;
        }
    }

At this point one complete pass through the data flow is finished. What follows is the call to the JanusGraph interface that persists the data.
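
The two ideas in this walkthrough are stamping a new vertex with its system properties, and skipping an update when no supplied attribute has changed. They can be sketched without Atlas or JanusGraph on the classpath. The following is only a minimal in-memory sketch: `EntityStoreSketch`, `Vertex`, and every field and method name in it are hypothetical and are not part of the Atlas or JanusGraph APIs. A plain map stands in for the graph, and the type names in `main` are illustrative values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

// Hypothetical in-memory stand-in for the flow above; not Atlas or JanusGraph API.
final class EntityStoreSketch {

    /** Stand-in for a graph vertex carrying the system properties. */
    static final class Vertex {
        final String guid;
        final String typeName;
        final List<String> superTypes;
        final Map<String, Object> attributes;
        final long createdAt;
        final String createdBy;
        String state = "ACTIVE";
        long modifiedAt;
        String modifiedBy;

        Vertex(String guid, String typeName, List<String> superTypes,
               Map<String, Object> attributes, long requestTime, String user) {
            this.guid = guid;
            this.typeName = typeName;
            this.superTypes = new ArrayList<>(superTypes);
            this.attributes = new LinkedHashMap<>(attributes); // defensive copy
            this.createdAt = requestTime;
            this.createdBy = user;
            this.modifiedAt = requestTime;
            this.modifiedBy = user;
        }
    }

    // guid -> vertex; plays the role of the graph in this sketch
    private final Map<String, Vertex> verticesByGuid = new LinkedHashMap<>();

    /** Creates a vertex with a random guid and the creation-time system properties. */
    String create(String typeName, List<String> superTypes,
                  Map<String, Object> attributes, String user, long requestTime) {
        String guid = UUID.randomUUID().toString();
        verticesByGuid.put(guid, new Vertex(guid, typeName, superTypes, attributes, requestTime, user));
        return guid;
    }

    /**
     * Applies the incoming attributes only if at least one supplied value differs.
     * Attributes absent from the incoming map are left untouched.
     *
     * @return true if the vertex was modified, false if the update was skipped
     */
    boolean update(String guid, Map<String, Object> incoming, String user, long requestTime) {
        Vertex vertex = verticesByGuid.get(guid);
        if (vertex == null) {
            throw new IllegalArgumentException("unknown guid: " + guid);
        }
        boolean hasUpdates = false;
        for (Map.Entry<String, Object> e : incoming.entrySet()) {
            if (!Objects.equals(vertex.attributes.get(e.getKey()), e.getValue())) {
                hasUpdates = true;
                break;
            }
        }
        if (!hasUpdates) {
            return false; // unchanged entity: skip the write entirely
        }
        vertex.attributes.putAll(incoming);
        vertex.modifiedAt = requestTime;
        vertex.modifiedBy = user;
        return true;
    }

    Vertex get(String guid) {
        return verticesByGuid.get(guid);
    }

    public static void main(String[] args) {
        EntityStoreSketch store = new EntityStoreSketch();
        Map<String, Object> attrs = new LinkedHashMap<>();
        attrs.put("name", "orders");
        String guid = store.create("hbase_table", Arrays.asList("DataSet"), attrs, "admin", 1000L);
        System.out.println("same payload applied: " + store.update(guid, attrs, "admin", 2000L));
        attrs.put("name", "orders_v2");
        System.out.println("changed payload applied: " + store.update(guid, attrs, "admin", 3000L));
    }
}
```

The sketch leaves out what the real code does around these steps: authorization checks, relationship attributes, classification replacement, change notifications, and the surrounding graph transaction.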