温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据

发布时间:2021-08-02 17:32:58 来源:亿速云 阅读:440 作者:Leah 栏目:开发技术

本篇文章为大家展示了Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

简介

通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink

Maven

 <dependency>    <groupId>io.streamnative.connectors</groupId>    <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>    <version>2.7.3</version>  </dependency>    <!-- JAR repositories -->    <repositories>         <repository>             <id>central</id>             <layout>default</layout>             <url>https://repo1.maven.org/maven2</url>         </repository>         <repository>             <id>bintray-streamnative-maven</id>             <name>bintray</name>             <url>https://dl.bintray.com/streamnative/maven</url>         </repository>     </repositories>

CODE

使用PulsarMetadataReader获取元数据

package com.levi.demo; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; /**  * Test.  *  * @author levi  * @version 1.0  **/ public class Test {     public static void main(String[] args)  {         final ClientConfigurationData configurationData = new ClientConfigurationData();         configurationData.setServiceUrl("pulsar://127.0.0.1:6650");         //Your Pulsar Token         final AuthenticationToken token =                 new AuthenticationToken(                         "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");          configurationData.setAuthentication(token);           try (final PulsarMetadataReader reader =                      new PulsarMetadataReader("http://127.0.0.1:8443",                              configurationData,                              "",                              new HashMap(),                              -1,                              -1)) {             //获取namespaces             final List<String> namespaces = reader.listNamespaces();             System.out.println("namespaces: " + namespaces.toString());                          for (final String namespace : namespaces) {                 //获取Topics                 final List<String> topics = reader.getTopics(namespace);                 System.out.println("topic: " + topics.toString());                                  for (String topic : topics) {                     //获取字段SchemaInfo                     final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);                     final String name = schemaInfo.getName();                     System.out.println("SchemaName:" + name); //topicName                     final SchemaType type = schemaInfo.getType();                      System.out.println("SchemaType:" + type.toString());// "JSON"...                     final Map<String, String> properties = schemaInfo.getProperties();                     System.out.println(properties);                      final String schemaDefinition = schemaInfo.getSchemaDefinition();                     System.out.println(schemaDefinition); // Field info.                 }             }         } catch (IOException | PulsarAdminException e) {             e.printStackTrace();         }     } }

上述内容就是Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI