温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark与hbase怎么用

发布时间:2021-12-09 10:39:12 来源:亿速云 阅读:256 作者:小新 栏目:大数据

小编给大家分享一下spark与hbase怎么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

package hgs.spark.hbase import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.mapreduce.TableInputFormat object HbaseTest {   def main(args: Array[String]): Unit = {     val conf = new SparkConf          conf.setMaster("local").setAppName("local")          val context = new SparkContext(conf)          val hadoopconf = new HBaseConfiguration     hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")     hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")     val tableName = "test1"     hadoopconf.set(TableInputFormat.INPUT_TABLE, tableName)     hadoopconf.set(TableInputFormat.SCAN_ROW_START, "h")     hadoopconf.set(TableInputFormat.SCAN_ROW_STOP, "x")     hadoopconf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf1")     hadoopconf.set(TableInputFormat.SCAN_COLUMNS, "cf1:col1,cf1:col2")          /*val startrow = "h"     val stoprow = "w"          val scan = new Scan     scan.setStartRow(startrow.getBytes)     scan.setStartRow(stoprow.getBytes)          val proto = ProtobufUtil.toScan(scan)     val scanToString = Base64.encodeBytes(proto.toByteArray())     println(scanToString)     hadoopconf.set(TableInputFormat.SCAN, scanToString)     */     val hbaseRdd = context.newAPIHadoopRDD(hadoopconf,          classOf[TableInputFormat],          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],         classOf[org.apache.hadoop.hbase.client.Result])                  hbaseRdd.foreach(x=>{          val vale =  x._2.getValue("cf1".getBytes, "col1".getBytes)          val val2 = x._2.getValue("cf1".getBytes, "col2".getBytes)           println(new String(vale),new String(val2))         })     context.stop()       } }
package hgs.spark.hbase import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable object SparkToHbase {   def main(args: Array[String]): Unit = {     val conf = new SparkConf          conf.setMaster("local").setAppName("local")          val context = new SparkContext(conf)          val rdd = context.parallelize(List(("aaaaaaa","aaaaaaa"),("bbbbb","bbbbb")), 2)     val hadoopconf = new HBaseConfiguration     hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")     hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")     hadoopconf.set(TableOutputFormat.OUTPUT_TABLE, "test1")     //hadoopconf.set(TableOutputFormat., "test1")          val jobconf  = new JobConf(hadoopconf,this.getClass)     jobconf.set(TableOutputFormat.OUTPUT_TABLE, "test1")     jobconf.setOutputFormat(classOf[TableOutputFormat])          val exterrdd = rdd.map(x=>{              val put = new Put(x._1.getBytes)       put.add("cf1".getBytes, "col1".getBytes, x._2.getBytes)       (new ImmutableBytesWritable,put)     })          exterrdd.saveAsHadoopDataset(jobconf)          context.stop()                  } }

看完了这篇文章,相信你对“spark与hbase怎么用”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI