元数据治理框架Atlas研究——数据写入过程源码分析

概要
Atlas通过AtlasEntityStoreV2.createOrUpdate函数进行数据写入,了解AtlasEntityStoreV2.createOrUpdate这个重要函数有助于理解Atlas工作流程、优化写入性能。本文主要梳理createOrUpdate中各子模块的功能和逻辑。

源码解析
函数格式:EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications)
数据写入过程中的上下文信息存于EntityGraphDiscoveryContext context对象中,数据格式如下:

public final class EntityGraphDiscoveryContext {
    private final AtlasTypeRegistry               typeRegistry;
    private final EntityStream                    entityStream;
    private final List<String>                    referencedGuids          = new ArrayList<>();
    private final Set<AtlasObjectId>              referencedByUniqAttribs  = new HashSet<>();
    private final Map<String, AtlasVertex>        resolvedGuids            = new HashMap<>();
    private final Map<AtlasObjectId, AtlasVertex> resolvedIdsByUniqAttribs = new HashMap<>();
    private final Set<String>                     localGuids               = new HashSet<>();

step1:通过preCreateOrUpdate函数对entityStream预处理:找出entityStream中所有entity的AtlasObjectId,判断entity需要进行创建还是更新操作,创建新vertex

a) AtlasEntityGraphDiscoveryV2.discover,遍历所有entity的AtlasObjectId,放入到context中:

情况1)AtlasObject.guid!=null时将guid放入context.referencedGuids中;
  情况2)若AtlasObject.id为null,则表示该entity已经写入图数据库,将AtlasObject放入context.referencedByUniqAttribs

b) AtlasEntityGraphDiscoveryV2.resolveReferences,判断entity是否在图数据库中存在:

情况1)若context.referencedGuids中的guid在图数据库中存在对应vertex,将guid和vertex放入context.resolvedGuids中;
  情况2)若context.referencedGuids中的guid在图数据库中不存在对应vertex,将guid放入context.localGuidReference中;
  情况3)根据context.referencedByUniqAttribs中的AtlasObjectId找到对应顶点,将顶点放入resolvedIdsByUniqAttribs中,并将AtlasObjectId放入

c) 对不存在对应vertex的entity创建vertex,并放入resolvedGuids
d) 将需要创建的entity和需要更新的entity分别放入EntityMutationContext.entitiesCreated和EntityMutationContext.entitiesUpdated中

step2:对比entityStream中的entity和图数据库中的vertex,判断是否有属性发生变化,忽略不需要更新的entity,核心代码:

for(AtlasAttribute attribute:entityType.getAllAttributes().values()){
        if(!entity.getAttributes().containsKey(attribute.getName())){  // if value is not provided, current value will not be updated
        continue;
        }

        Object newVal=entity.getAttribute(attribute.getName());
        Object currVal=entityRetriever.getEntityAttribute(vertex,attribute);

        if(!attribute.getAttributeType().areEqualValues(currVal,newVal,context.getGuidAssignments())){
        hasUpdates=true;

        if(LOG.isDebugEnabled()){
        LOG.debug("found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}",guid,entity.getTypeName(),attribute.getName(),currVal,newVal);
        }

        break;
        }
}

注意:当attribute为referred AtlasObjectId时,客户端定义的guid是UnAssigned的,必然和数据库中存储的guid值不同,这会造成元数据的重复更新,这个目前Atlas有待改进的地方。举个例子,Hive_Table的db属性为Hive_Db的AtlasObjectId, 客户端向Atlas Server重复创建Hive_Table,db属性的值为AtlasObjectId{typeName=hive_db, guid=-1554620941},每次guid值为当前时间戳而不同,造成table元数据重复更新。

step3:操作权限验证:通过AtlasAuthorizationUtils.verifyAccess函数验证发起请求的用户是否有权限对各个entity进行写操作

step4:EntityGraphMapper.mapAttributesAndClassifications为vertex更新attributes,关于entity和vertex属性的映射,可以参考文章元数据治理框架Atlas研究——JanusGraph图数据库对象关系映射

step5:通过entityChangeNotifier.onEntitiesMutated为vertex创建全文索引,并通知audit模块记录所有的变更操作
注:在整个数据写入过程中,创建全文索引这一步骤会占用超过60%的时间,如果在实际使用中不需要用全文索引的功能,可以修改源码注释掉相应doFullTextMapping函数

step6:整个数据写入过程中,我们并未提到Atlas如何调用JanusGraph的api来向图数据库写入数据。其实,Atlas只需要通过JanusGraph api中的vertex、edge对象维护数据的图结构即可。Atlas对数据读写函数都添加了@GraphTransaction注解,这个注解确保在函数运行结束后调用graph.commit()函数将当前事务内的变更提交图数据库。具体的实现可以见GraphTransactionInterceptor类。

相关推荐