Ferramentas necessárias:
Crie uma aplicação com o nome de sua escolha no spring starter com as seguintes dependências necessárias
Dentro do arquivo pom.xml adicione a dependencia abaixo, pois vamos utilizar o Gson para serializar nossa mensagem recebida em String para um objeto de uma classe Java.
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.6</version> </dependency>
Na nossa segunda aplicação, que eu chamei de API Orquestradora, vamos configurar o elasticsearch e a fila Jms que utilizaremos para realizar o proposito da aplicação.
Criamos a classe ElasticsearchClientConfig onde configuramos o acesso a nosso elasticsearch rodando no docker.
ElasticsearchClientConfig
import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories; @Configuration @EnableElasticsearchRepositories(basePackages = "br.com.orquestrador.infrastructure.repository.elasticsearch") @ComponentScan(basePackages = { "br.com.orquestrador" }) public class ElasticsearchClientConfig extends AbstractElasticsearchConfiguration { @Override @Bean public RestHighLevelClient elasticsearchClient() { final ClientConfiguration clientConfiguration = ClientConfiguration .builder() .connectedTo("localhost:9200") .build(); return RestClients.create(clientConfiguration).rest(); } }
E também uma classe Repository com o nome UserRepository para utilizarmos o método de save para salvar usuário no elasticsearch e o metodo de buscar por nome.
UserRepository
import br.com.orquestrador.user.User; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Repository; import java.util.Optional; @Repository public interface UserRepository extends ElasticsearchRepository<User, String> { Optional<User> findByName(String name); }
Agora vamos configurar a parte de fila para acessar o ActiveMq rodando em nosso docker, para isto criamos a classe JmsConfig e adicionamos algumas configs da filas utilizadas no application.properties.
JmsConfig
import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; @Configuration @EnableJms public class JmsConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Value("${spring.activemq.user}") private String user; @Value("${spring.activemq.password}") private String password; @Bean public ActiveMQConnectionFactory connectionFactory() { if ( "".equals(user) ) { return new ActiveMQConnectionFactory(brokerUrl); } return new ActiveMQConnectionFactory(user, password, brokerUrl); } @Bean public JmsListenerContainerFactory jmsFactoryTopic(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setPubSubDomain(true); return factory; } @Bean public JmsTemplate jmsTemplate() { return new JmsTemplate(connectionFactory()); } @Bean public JmsTemplate jmsTemplateTopic() { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory()); jmsTemplate.setPubSubDomain( true ); return jmsTemplate; } }
Vamos configurar um Listener Kafka para ouvir nosso tópico que esta produzindo as mensagens na aplicação anterior.
Para isso criamos uma classe KafkaConsumer que ira ficar ouvindo nosso tópico e a cada mensagem produzida realizar a logica que definimos.
- Passo 1 - Serializar a mensagem recebida em um objeto Java
- Passo 2 - Enviar este objeto para uma fila do Activemq
- Passo 3 - Salvar os dados deste usuario no elasticsearch
KafkaConsumer
import br.com.orquestrador.application.ListenerKafka.dto.UserDto; import br.com.orquestrador.user.User; import br.com.orquestrador.user.UserService; import com.google.gson.Gson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @Autowired private Gson serializer; @Autowired private UserService userService; @Autowired private JmsTemplate jmsTemplate; @Autowired public KafkaConsumer(Gson serializer) { this.serializer = serializer; } @KafkaListener(topics = "${user.topic}", groupId = "${spring.kafka.consumer.group-id}") public void receive( @Payload String message) { logger.info("message received: {}", message); UserDto usuarioDto = serializer.fromJson(message, UserDto.class); jmsTemplate.convertAndSend("queue.sample", message); User usuario = usuarioDto.converte(); logger.info(usuario.toString()); String messageFinal = userService.save(usuario); logger.info(messageFinal); } }
Abaixo segue o link do github onde a aplicação esta armazenada para que possam conferir como a mesma ficou.
Top comments (0)