Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
- if(acknowledgment != null) { System.out.println("Acknowledgment provided");
- acknowledgment.acknowledge(); }
- }
- <!-- Inbound Kafka adapter: delivers one record per message (mode="record") from the -->
- <!-- listener container to inputFromKafkaChannel; failures are sent to errorChannel. -->
- <!-- listener-container references "container1", the KafkaMessageListenerContainer -->
- <!-- bean declared in this file (the previous id, "kafkaMessageContainer", matched no -->
- <!-- bean visible here). -->
- <!-- NOTE(review): recovery-callback is set but no retry-template is; confirm the -->
- <!-- callback is actually used without one in the Spring Integration Kafka version in use. -->
- <int-kafka:message-driven-channel-adapter
-     id="kafkaMessageListener"
-     listener-container="container1"
-     auto-startup="true"
-     phase="100"
-     send-timeout="5000"
-     mode="record"
-     message-converter="messageConverter"
-     recovery-callback="recoveryCallback"
-     error-message-strategy="ems"
-     channel="inputFromKafkaChannel"
-     error-channel="errorChannel" />
- <!-- Passes every message from inputFromKafkaChannel through transformerBean.transform -->
- <!-- and publishes the result on the messageTransformer channel. -->
- <int:transformer
-     id="transformerid"
-     input-channel="inputFromKafkaChannel"
-     output-channel="messageTransformer"
-     ref="transformerBean"
-     method="transform" />
- <!-- Kafka listener container: consumes the topics in ${spring.kafka.topics} using -->
- <!-- String key/value deserializers and the consumer group ${spring.kafka.consumer.group-id}. -->
- <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
-   <!-- First constructor argument: the consumer factory and its Kafka client properties. -->
-   <constructor-arg>
-     <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
-       <constructor-arg>
-         <map>
-           <entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
-           <!-- Client-side auto commit is off; offset commits are driven by the container. -->
-           <entry key="enable.auto.commit" value="false" />
-           <!-- Has no effect while enable.auto.commit is false; kept for easy re-enabling. -->
-           <entry key="auto.commit.interval.ms" value="100" />
-           <entry key="session.timeout.ms" value="15000" />
-           <entry key="group.id" value="${spring.kafka.consumer.group-id}" />
-           <entry key="key.deserializer"
-                  value="org.apache.kafka.common.serialization.StringDeserializer" />
-           <entry key="value.deserializer"
-                  value="org.apache.kafka.common.serialization.StringDeserializer" />
-         </map>
-       </constructor-arg>
-     </bean>
-   </constructor-arg>
-   <!-- Second constructor argument: container properties (subscribed topics, ack mode). -->
-   <constructor-arg>
-     <bean class="org.springframework.kafka.listener.ContainerProperties">
-       <constructor-arg name="topics" value="${spring.kafka.topics}" />
-       <!-- MANUAL ack mode is required for the container to hand an Acknowledgment to the -->
-       <!-- listener; with the default mode the KafkaHeaders.ACKNOWLEDGMENT header is absent -->
-       <!-- and the handler's acknowledgment.acknowledge() call is never reached. -->
-       <property name="ackMode" value="MANUAL" />
-     </bean>
-   </constructor-arg>
- </bean>
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement