Storm Spout n’obtient pas Ack

J’ai commencé à utiliser Storm, je crée donc une topologie simple à l’aide de ce tutoriel.

Lorsque je gère ma topologie avec LocalCluster et que tout semble LocalCluster pour le LocalCluster , mon problème est que je ne reçois pas d’accusé de réception sur le tuple, ce qui signifie que mon bec ack n’est jamais appelé.

mon code est ci-dessous – savez-vous pourquoi ack n’est pas appelé?

donc ma topologie ressemble à ceci

 public StormTopology build() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(HelloWorldSpout.class.getSimpleName(), helloWorldSpout, spoutParallelism); HelloWorldBolt bolt = new HelloWorldBolt(); builder.setBolt(HelloWorldBolt.class.getSimpleName(), bolt, boltParallelism) .shuffleGrouping(HelloWorldSpout.class.getSimpleName()); } 

Mon bec ressemble à ceci

 public class HelloWorldSpout extends BaseRichSpout implements ISpout { private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("int")); } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } private static Boolean flag = false; public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ this.collector.emit(new Values(6)); flag = true; } } @Override public void ack(Object msgId) { System.out.println("[HelloWorldSpout] ack on msgId" + msgId); } public void fail(Object msgId){ System.out.println("[HelloWorldSpout] fail on msgId" + msgId); } } 

et mon boulon ressemble à ceci

 @SuppressWarnings("serial") public class HelloWorldBolt extends BaseRichBolt{ private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; logger.info("preparing HelloWorldBolt"); } public void execute(Tuple tuple) { System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0)); this.collector.ack(tuple); } public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } } 

Votre méthode emit () dans le bec n’a qu’un seul argument, de sorte que le tuple n’est pas ancré. C’est pourquoi vous ne recevez pas de rappel de la méthode ack () dans le bec, même si vous encaissez le tuple dans le verrou.

Pour que cela fonctionne, vous devez modifier votre bec pour émettre un deuxième argument qui est l’identifiant du message. C’est cet identifiant qui est renvoyé à la méthode ack () dans le bec:

 public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ Object msgId = "ID 6"; // this can be any object this.collector.emit(new Values(6), msgId); flag = true; } } @Override public void ack(Object msgId) { // msgId should be "ID 6" System.out.println("[HelloWorldSpout] ack on msgId" + msgId); }