温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

IBatchSpout API怎么使用

发布时间:2021-12-23 13:55:56 来源:亿速云 阅读:194 作者:iii 栏目:云计算

这篇文章主要介绍“IBatchSpout API怎么使用”,在日常操作中,相信很多人在IBatchSpout API怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”IBatchSpout API怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

IBatchSpout是storm trident推出的一种可以批量发射的Spout。非事务性,基本的spout

1:Map getComponentConfiguration();定义配置,可以用backtype.storm.Config。

2:void open(Map conf, TopologyContext context); Spout的初始化方法 ,参数conf即是getComponentConfiguration定义的配置

3:Fields getOutputFields(); 声明输出的fields

4:void emitBatch(long batchId, TridentCollector collector); 批量发射tuple,本次的批次号为batchId

5:void ack(long batchId);批次号为batchId的数据处理成功

6:  void close();

一个例子

package storm.projectA; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import storm.trident.operation.TridentCollector; import storm.trident.spout.IBatchSpout; import backtype.storm.Config; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class MySpout implements IBatchSpout{  /**   *    */  private static final long serialVersionUID = 1L;  private long maxBatchSize;//每批次最大的数量  private BufferedReader br;//源文件流  HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();//保存发送过的所有数据,以便于重复发送  /**   * @param conf 配置   * @param context    */  @Override  public void open(Map conf, TopologyContext context) {   String filePath = (String)conf.get("filePath");   maxBatchSize = (Long)conf.get("maxBatchSize");   try {    br = new BufferedReader(new FileReader(filePath));   } catch (FileNotFoundException e) {    e.printStackTrace();   }  }  /*** spout的发送方法   * @param batchId 批次id   * @param collector 批量发射器   */  @Override  public void emitBatch(long batchId, TridentCollector collector) {   List<List<Object>> batch = batches.get(batchId);   if (batch == null) {    batch = new ArrayList<List<Object>>();    for (int i = 0; i < maxBatchSize; i++) {     try {      String line = br.readLine();      if(line == null){       break;      }      batch.add(new Values(line));     } catch (IOException e) {      e.printStackTrace();     }    }   }   for(List<Object> list : batch){             collector.emit(list);         }  }  @Override  public void ack(long batchId) {   batches.remove(batchId);  }  /**   * close 方法   */  @Override  public void close() {   if(br!=null){    try {     br.close();    } catch (IOException e) {     e.printStackTrace();    }   }     }  @Override  public Map getComponentConfiguration() {   Config conf = new Config();   //最大并行度 本地模式设置为1   conf.setMaxTaskParallelism(1);   conf.put("filePath", "D:\\aaa.txt");   conf.put("maxBatchSize", 2);   return conf;  }  /**   * 输出的fileds   */  @Override  public Fields getOutputFields() {   return new Fields("sentence");  } }

到此,关于“IBatchSpout API怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

api
AI