温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spark 读取Hbase表数据并实现类似groupByKe

发布时间:2020-02-25 07:16:37 来源:网络 阅读:3000 作者:CaramelLatte 栏目:关系型数据库


一、概述 
程序运行环境很重要,本次测试基于: 
hadoop-2.6.5 
spark-1.6.2 
hbase-1.2.4 
zookeeper-3.4.6 
jdk-1.8 
废话不多说了,直接上需求


Andy column=baseINFO:age,  value=21

Andy column=baseINFO:gender,  value=0

Andy column=baseINFO:telphone_number, value=110110110

Tom  column=baseINFO:age, value=18

Tom  column=baseINFO:gender, value=1

Tom  column=baseINFO:telphone_number, value=120120120

如上表所示,将之用spark进行分组,达到这样的效果:

[Andy,(21,0,110110110)] 
[Tom,(18,1,120120120)] 
需求比较简单,主要是熟悉一下程序运行过程

二、具体代码


package com.union.bigdata.spark.hbase;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.mapreduce.TableSplit;import org.apache.hadoop.hbase.util.Base64;import org.apache.hadoop.hbase.util.Bytes;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableInputFormat;import org.apache.spark.api.java.JavaPairRDD;import org.apache.hadoop.hbase.protobuf.ProtobufUtil;import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple10;import scala.Tuple2;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class ReadHbase {     private static String appName = "ReadTable";     public static void main(String[] args) {         SparkConf sparkConf = new SparkConf();     //we can also run it at local:"local[3]"  the number 3 means 3 threads         sparkConf.setMaster("spark://master:7077").setAppName(appName);         JavaSparkContext jsc = new JavaSparkContext(sparkConf);         Configuration conf = HBaseConfiguration.create();         conf.set("hbase.zookeeper.quorum", "master");          conf.set("hbase.zookeeper.property.clientPort", "2181");          Scan scan = new Scan();         scan.addFamily(Bytes.toBytes("baseINFO"));         scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));         scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));         scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));         String scanToString = "";         try {             ClientProtos.Scan proto = ProtobufUtil.toScan(scan);             scanToString = Base64.encodeBytes(proto.toByteArray());         } catch (IOException io) {             System.out.println(io);         }         for (int i = 0; i < 2; i++) {             try {                 String tableName = "VIPUSER";                 conf.set(TableInputFormat.INPUT_TABLE, tableName);                 conf.set(TableInputFormat.SCAN, scanToString);                 //get the Result of query from the Table of Hbase                 JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(conf,                         TableInputFormat.class, ImmutableBytesWritable.class,                         Result.class);                 //group by row key like : [(Andy,110,21,0),(Tom,120,18,1)]                 JavaPairRDD<String, List<Integer>> art_scores = hBaseRDD.mapToPair(                         new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<Integer>>() {                             @Override                             public Tuple2<String, List<Integer>> call(Tuple2<ImmutableBytesWritable, Result> results) {                                 List<Integer> list = new ArrayList<Integer>();                                 byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));                                 byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));                                 byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));                 //the type of storage at Hbase is Byte Array,so we must let it be normal like Int,String and so on                                  list.add(Integer.parseInt(Bytes.toString(telphone_number)));                                 list.add(Integer.parseInt(Bytes.toString(age)));                                 list.add(Integer.parseInt(Bytes.toString(gender)));                                 return new Tuple2<String, List<Integer>>(Bytes.toString(results._1().get()), list);                             }                         }                 );                 //switch to Cartesian product                  JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart = art_scores.cartesian(art_scores);                 //use Row Key to delete the repetition from the last step "Cartesian product"                   JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart2 = cart.filter(                         new Function<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>, Boolean>() {                             public Boolean call(Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> tuple2Tuple2Tuple2) throws Exception {                                 return tuple2Tuple2Tuple2._1()._1().compareTo(tuple2Tuple2Tuple2._2()._1()) < 0;                             }                         }                 );                 System.out.println("Create the List 'collect'...");         //get the result we need                  List<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>> collect = cart2.collect();                  System.out.println("Done..");                  System.out.println(collect.size() > i ? collect.get(i):"STOP");                  if (collect.size() > i ) break;             } catch (Exception e) {                 System.out.println(e);             }         }     } }


三、程序运行过程分析 
1、spark自检以及Driver和excutor的启动过程 
实例化一个SparkContext(若在spark2.x下,这里初始化的是一个SparkSession对象),这时候启动SecurityManager线程去检查用户权限,OK之后创建sparkDriver线程,spark底层远程通信模块(akka框架实现)启动并监听sparkDriver,之后由sparkEnv对象来注册BlockManagerMaster线程,由它的实现类对象去监测运行资源 
2、zookeeper与Hbase的自检和启动 
第一步顺利完成之后由sparkContext对象去实例去启动程序访问Hbase的入口,触发之后zookeeper完成自己的一系列自检活动,包括用户权限、操作系统、数据目录等,一切OK之后初始化客户端连接对象,之后由Hbase的ClientCnxn对象来建立与master的完整连接 
3、spark job 的运行 
程序开始调用spark的action类方法,比如这里调用了collect,会触发job的执行,这个流程网上资料很详细,无非就是DAGScheduler搞的一大堆事情,连带着出现一大堆线程,比如TaskSetManager、TaskScheduler等等,最后完成job,返回结果集 
4、结束程序 
正确返回结果集之后,sparkContext利用反射调用stop()方法,这之后也会触发一系列的stop操作,主要线程有这些:BlockManager,ShutdownHookManager,后面还有释放actor的操作等等,最后一切结束,临时数据和目录会被删除,资源会被释放

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI