温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Apache Atlas是如何构建自己的API

发布时间:2021-12-21 11:00:08 来源:亿速云 阅读:203 作者:柒染 栏目:大数据

本篇文章为大家展示了Apache Atlas是如何构建自己的API,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

Apache Atlas是一个优秀的服务治理组件,用于企业Hadoop集群上的数据治理和元数据管理的数据治理工具。接下来我们将讨论构建自己的Java API,这些Java API可使用Apache atlas客户端与Apache Atlas交互以在其中创建新的实体和类型。

一、Atlas客户端Maven依赖关系

以下依赖项可用于pom.xml文件

  <dependency>       <groupId>org.apache.atlas</groupId>       <artifactId>atlas-client</artifactId>       <version>0.7-incubating</version>   </dependency>   <dependency>       <groupId>org.apache.atlas</groupId>       <artifactId>atlas-typesystem</artifactId>       <version>0.7-incubating</version>   </dependency>   <dependency>       <groupId>org.apache.atlas</groupId>       <artifactId>atlas-notification</artifactId>       <version>0.7-incubating</version>   </dependency>   <dependency>       <groupId>org.apache.atlas</groupId>       <artifactId>atlas-repository</artifactId>       <version>0.7-incubating</version>   </dependency>

二、设置atlas-application.properties

Apache Atlas客户端使用atlas-application属性在我们的API和Apache Atlas服务器之间建立连接。这些属性应放置在resources/atlas-application.properties中

#########  Security Properties  ######### # SSL config atlas.enableTLS=false #########  Server Properties  ######### atlas.rest.address=http://192.168.5.95:21000 atlas.hook.demo.kafka.retries=1 atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181 atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092 atlas.kafka.zookeeper.session.timeout.ms=4000 atlas.kafka.zookeeper.connection.timeout.ms=2000 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=1000 atlas.kafka.hook.group.id=atlas

三、创建与Atlas服务器的连接

要与Apache atlas Server,baseUrl和用户名创建连接,必须在AtlasClient构造函数中传递密码

final AtlasClient atlasClient = new AtlasClient             (new String[]{"http://192.168.5.95:21000"},                     new String[]{"admin",                             "admin"});

四、关于Type相关的测试类

public class AtlasTypesTest {     final AtlasClient atlasClient = new AtlasClient             (new String[]{"http://192.168.5.95:21000"},                     new String[]{"admin",                             "admin"});     static final String DATABASE_TYPE = "DB_Sync";     static final String COLUMN_TYPE = "Column_Sync";     static final String TABLE_TYPE = "Table_Sync";     static final String VIEW_TYPE = "View_Sync";     public static final String DB_ATTRIBUTE = "db";     static final String STORAGE_DESC_TYPE = "StorageDesc";     public static final String COLUMNS_ATTRIBUTE = "columns";     public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";     private static final String[] TYPES =             {DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess",                     "ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"};     /**      * 组织定义types      * @return      */     TypesDef createTypeDefinitions() {         HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil                 .createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null,                         TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),                         attrDef("description", DataTypes.STRING_TYPE.getName()), attrDef("locationUri", DataTypes.STRING_TYPE.getName()),                         attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()));         HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil                 .createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE.getName()),                         attrDef("dataType", DataTypes.STRING_TYPE.getName()), attrDef("comment", DataTypes.STRING_TYPE.getName()));         HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil                 .createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"),                         new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),                         new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),                         attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()),                         attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()), attrDef("retention", DataTypes.LONG_TYPE.getName()),                         attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()),                         attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()), attrDef("tableType", DataTypes.STRING_TYPE.getName()),                         attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()),                         new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),                                 Multiplicity.COLLECTION, true, null));         HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil                 .createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"),                         new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),                         new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE),                                 Multiplicity.COLLECTION, false, null));         return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),                 ImmutableList.of(),                 ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef));     }     private void createTypes() throws Exception {         TypesDef typesDef = createTypeDefinitions();         String typesAsJSON =  TypesSerialization.toJson(typesDef);         System.out.println("typesAsJSON = " + typesAsJSON);         atlasClient.createType(typesAsJSON);         verifyTypesCreated();     }     private void verifyTypesCreated() throws Exception {         List<String> types = atlasClient.listTypes();         for (String type : TYPES) {             assert types.contains(type);         }     }     AttributeDefinition attrDef(String name, String dT) {         return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);     }     AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,                                 String reverseAttributeName) {         return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);     }     @Test     public void createNewTypes() throws Exception {         createTypes();     } }

五、关于Entities相关的测试类

public class AtlasEntitiesTest {     final AtlasClient atlasClient = new AtlasClient             (new String[]{"http://192.168.5.95:21000"},                     new String[]{"admin",                             "admin"});     /**      * 创建实例并返创建的Id对象      * @param referenceable      * @return      * @throws Exception      */     private Id createInstance(Referenceable referenceable) throws Exception {         String typeName = referenceable.getTypeName();         String entityJSON = InstanceSerialization.toJson(referenceable, true);         System.out.println("Submitting new entity= " + entityJSON);         List<String> guids = atlasClient.createEntity(entityJSON);         System.out.println("created instance for type " + typeName + ", guid: " + guids);         return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),                 referenceable.getTypeName());     }     /**      * 创建数据库实例并返创建的数据库Id对象      * @param name      * @param description      * @param owner      * @param locationUri      * @param traitNames      * @return      * @throws Exception      */     Id database(String name, String description, String owner, String locationUri, String... traitNames)             throws Exception {         Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);         referenceable.set("name", name);         referenceable.set("description", description);         referenceable.set("owner", owner);         referenceable.set("locationUri", locationUri);         referenceable.set("createTime", System.currentTimeMillis());         return createInstance(referenceable);     }     /**      * 创建列的实例并返创建的列的实例对象      * @param name      * @param dataType      * @param comment      * @param traitNames      * @return      * @throws Exception      */     Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {         Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);         referenceable.set("name", name);         referenceable.set("dataType", dataType);         referenceable.set("comment", comment);         return referenceable;     }     /**      * 创建表的实例并返创建的表的Id对象      * @param name      * @param description      * @param dbId      * @param sd      * @param owner      * @param tableType      * @param columns      * @param traitNames      * @return      * @throws Exception      */     Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,              List<Referenceable> columns, String... traitNames) throws Exception {         Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);         referenceable.set("name", name);         referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);         referenceable.set("description", description);         referenceable.set("owner", owner);         referenceable.set("tableType", tableType);         referenceable.set("createTime", System.currentTimeMillis());         referenceable.set("lastAccessTime", System.currentTimeMillis());         referenceable.set("retention", System.currentTimeMillis());         referenceable.set("db", dbId);         referenceable.set("sd", sd);         referenceable.set("columns", columns);         return createInstance(referenceable);     }     /**      * 创建视图的实例并返创建的视图的Id对象      * @param name      * @param dbId      * @param inputTables      * @param traitNames      * @return      * @throws Exception      */     Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {         Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);         referenceable.set("name", name);         referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);         referenceable.set("db", dbId);         referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);         return createInstance(referenceable);     }     /**      * 原始存储描述符      * @param location      * @param inputFormat      * @param outputFormat      * @param compressed      * @return      * @throws Exception      */     Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)             throws Exception {         Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);         referenceable.set("location", location);         referenceable.set("inputFormat", inputFormat);         referenceable.set("outputFormat", outputFormat);         referenceable.set("compressed", compressed);         return referenceable;     }     @Test     public void createEntities() throws Exception {         //创建数据库实例         Id syncDB = database("sy_sync", "Sync Database", "root", "");         //存储描述符         Referenceable sd =                 storageDescriptor("", "TextInputFormat", "TextOutputFormat",                         true);         //创建列实例         //1、数据源         List<Referenceable> databaseColumns = ImmutableList                 .of(column("id", "long", "id"),                         column("name", "string", "name"),                         column("type", "string", "type"),                         column("url", "string", "url"),                         column("database_name", "string", "database name"),                         column("username", "string", "username"),                         column("password","string","password"),                         column("description", "string", "description"),                         column("create_time", "string", "create time"),                         column("update_time", "string", "update time"),                         column("create_id", "long", "user id"),                         column("update_id", "long", "user id"));         //2、同步文件夹         List<Referenceable> syncFolderColumns = ImmutableList                 .of(column("id", "long", "id"),                         column("name", "string", "name"),                         column("description", "string", "description"),                         column("create_time", "string", "create time"),                         column("update_time", "string", "update time"),                         column("create_id", "long", "user id"),                         column("update_id", "long", "user id"));         //创建表实例         Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns);         Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns);         //创建视图实例     }     @Test     public void getEntity() throws AtlasServiceException {         Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b");         System.out.println(InstanceSerialization.toJson(referenceable, true));     } }

上述内容就是Apache Atlas是如何构建自己的API,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI