温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

hbase0.98.9中如何实现endpoints

发布时间:2021-11-24 15:53:46 来源:亿速云 阅读:194 作者:柒染 栏目:云计算

本篇文章为大家展示了hbase0.98.9中如何实现endpoints,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

定制一个endpoint的过程。

下面是实现过程:

1、定义接口描述文件(该功能有protobuf提供出来

option java_package = "coprocessor.endpoints.generated"; option java_outer_classname = "RowCounterEndpointProtos"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; message CountRequest { } message CountResponse {   required int64 count = 1 [default = 0]; } service RowCountService {   rpc getRowCount(CountRequest)     returns (CountResponse);   rpc getKeyValueCount(CountRequest)     returns (CountResponse); }

这个文件我直接拿的hbase提供的example中的例子。其中的语法应该有过类似经验的一看就清楚了,实在不清楚就请查查protobuf的帮助手册吧。

2、根据接口描述文件生成java接口类(该功能有protobuf提供出来)

有了接口描述文件,还需要生成java语言的接口类。这个需要借助protobuf提供的工具protoc。

$protoc --java_out=./ Examples.proto

简单解释下,protoc这个命令在你装了protobuf后就有了。Examples.proto这个是文件名,也就是刚才编写的那个接口描述文件。“--java_out”这个用来指定生成后的java类放的地方。

所以,这地方如果你没有装protobuf,你需要装一个,window和linux版都有,多说一句,如果你去装hadoop64位的编译环境的话,应该是要装protobuf。

3、实现接口

package coprocessor; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest; import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse; import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService; public class RowCounterEndpointExample extends RowCountService implements	Coprocessor, CoprocessorService {	private RegionCoprocessorEnvironment env;	public RowCounterEndpointExample() {	}	@Override	public Service getService() {	return this;	}	@Override	public void getRowCount(RpcController controller, CountRequest request,	RpcCallback<CountResponse> done) {	Scan scan = new Scan();	scan.setFilter(new FirstKeyOnlyFilter());	CountResponse response = null;	InternalScanner scanner = null;	try {	scanner = env.getRegion().getScanner(scan);	List<Cell> results = new ArrayList<Cell>();	boolean hasMore = false;	byte[] lastRow = null;	long count = 0;	do {	hasMore = scanner.next(results);	for (Cell kv : results) {	byte[] currentRow = CellUtil.cloneRow(kv);	if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {	lastRow = currentRow;	count++;	}	}	results.clear();	} while (hasMore);	response = CountResponse.newBuilder().setCount(count).build();	} catch (IOException ioe) {	ResponseConverter.setControllerException(controller, ioe);	} finally {	if (scanner != null) {	try {	scanner.close();	} catch (IOException ignored) {	}	}	}	done.run(response);	}	@Override	public void getKeyValueCount(RpcController controller,	CountRequest request, RpcCallback<CountResponse> done) {	CountResponse response = null;	InternalScanner scanner = null;	try {	scanner = env.getRegion().getScanner(new Scan());	List<Cell> results = new ArrayList<Cell>();	boolean hasMore = false;	long count = 0;	do {	hasMore = scanner.next(results);	for (Cell kv : results) {	count++;	}	results.clear();	} while (hasMore);	response = CountResponse.newBuilder().setCount(count).build();	} catch (IOException ioe) {	ResponseConverter.setControllerException(controller, ioe);	} finally {	if (scanner != null) {	try {	scanner.close();	} catch (IOException ignored) {	}	}	}	done.run(response);	}	@Override	public void start(CoprocessorEnvironment env) throws IOException {	if (env instanceof RegionCoprocessorEnvironment) {	this.env = (RegionCoprocessorEnvironment) env;	} else {	throw new CoprocessorException("Must be loaded on a table region!");	}	}	@Override	public void stop(CoprocessorEnvironment env) throws IOException {	// TODO Auto-generated method stub	} }

4、注册接口(Hbase功能,通过配置文件或者表模式方式注册

这部分,可以看hbase权威指南了,我就看这部分做的。

5、测试调用

package coprocessor; import java.io.IOException; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.ServiceException; import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest; import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse; import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService; import util.HBaseHelper; public class RowCounterEndpointClientExample {	public static void main(String[] args) throws ServiceException, Throwable {	Configuration conf = HBaseConfiguration.create();	HBaseHelper helper = HBaseHelper.getHelper(conf);	//helper.dropTable("testtable");	//helper.createTable("testtable", "colfam1", "colfam2");	System.out.println("Adding rows to table...");	helper.fillTable("testtable", 1, 10, 10, "colfam1", "colfam2");	HTable table = new HTable(conf, "testtable");	final CountRequest request = CountRequest.getDefaultInstance();	final Batch.Call<RowCountService, Long> call =new Batch.Call<RowCountService, Long>() {	public Long call(RowCountService counter)	throws IOException {	ServerRpcController controller = new ServerRpcController();	BlockingRpcCallback<CountResponse> rpcCallback = new BlockingRpcCallback<CountResponse>();	counter.getRowCount(controller, request, rpcCallback);	CountResponse response = rpcCallback.get();	if (controller.failedOnException()) {	throw controller.getFailedOn();	}	return (response != null && response.hasCount()) ? response	.getCount() : 0;	}	};	Map<byte[], Long> results = table.coprocessorService(	RowCountService.class, null, null, call);	for(byte[] b : results.keySet()){	System.err.println(Bytes.toString(b) + ":" + results.get(b));	} 	} }

上述内容就是hbase0.98.9中如何实现endpoints,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI