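Spring Cloud Stream provides an abstraction layer over message queues. My company recently ran a training session on it for new colleagues,
so I am taking the opportunity to record the main points here.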
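Learning path
First, you need a reasonable understanding of Spring Boot and should be able to set up a Spring Boot project on your own.
Second, you need some familiarity with Maven. These two are the groundwork.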
Preparation
First, learn the basic principles of Kafka.
Reference sites:
http://kafka.apache.org/intro
https://www.jianshu.com/p/97011dab6c56
Then learn the basic principles of Spring Cloud Stream:
https://blog.csdn.net/yejingtao703/article/details/78059480
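Development setup
Kafka setup
Naturally, the first step is to download and install Kafka.
Download: see the Kafka Download page on kafka.apache.org.
The steps below assume a Linux environment.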
1. Unpack the archive
> tar -xzf kafka_2.12-2.2.0.tgz   # the version number may have changed since this was written
> cd kafka_2.12-2.2.0
2. Start ZooKeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
3. Start Kafka
> bin/kafka-server-start.sh config/server.properties
Windows users should use the equivalent scripts under bin\windows\*.bat instead.
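Before moving on, it can help to confirm that both processes are actually listening. The following is a minimal sketch in plain Java, assuming the stock config files are unchanged (ZooKeeper on port 2181, Kafka on port 9092) and that everything runs on localhost. `PortCheck` and `isListening` are names made up for this illustration; they are not part of Kafka or Spring.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for this post: checks whether something accepts
// TCP connections on a given host and port.
final class PortCheck {

    private PortCheck() {
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults from config/zookeeper.properties and
        // config/server.properties; adjust if you edited those files.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka     (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

An open port only shows that a process is accepting connections, not that the broker is healthy, so treat this as a quick sanity check rather than a real health probe.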
Spring Boot setup
Use the Spring Boot quick-start generator to create the project skeleton.
It is available at: https://start.spring.io
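Writing the code
On top of the Spring Boot project, follow the Spring Cloud Stream tutorial.
Address: https://cloud.spring.io/spring-cloud-stream-binder-kafka/
Demo repository: https://github.com/spring-cloud/spring-cloud-stream-samples
Based on the samples above, I put together a stripped-down version here and demonstrate it with the simplest possible example. Of course, if you have more …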
Adding the Maven dependencies
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>Fishtown.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Write the Java class:
package com.cqmaple.mq.example.kafka.demo;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

public class SampleRunner {

    //Following code is only used as a test harness.

    //Following source is used as test producer.
    @EnableBinding(TestSource.class)
    static class TestProducer {

        private AtomicBoolean semaphore = new AtomicBoolean(true);

        private String[] randomWords = new String[]{"foo", "bar", "foobar", "baz", "fox"};
        private Random random = new Random();

        @Bean
        @InboundChannelAdapter(channel = TestSource.OUTPUT, poller = @Poller(fixedDelay = "1000"))
        public MessageSource<String> sendTestData() {
            return () -> {
                int idx = random.nextInt(5);
                return new GenericMessage<>(randomWords[idx]);
            };
        }
    }

    //Following sink is used as test consumer for the above processor. It logs the data received through the processor.
    @EnableBinding(TestSink.class)
    static class TestConsumer {

        private final Log logger = LogFactory.getLog(getClass());

        @StreamListener(TestSink.INPUT)
        public void receive(String data) {
            logger.info("Data received..." + data);
        }
    }

    interface TestSink {

        String INPUT = "input1";

        @Input(INPUT)
        SubscribableChannel input1();
    }

    interface TestSource {

        String OUTPUT = "output1";

        @Output(TestSource.OUTPUT)
        MessageChannel output();
    }
}
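The `@Poller(fixedDelay = "1000")` on `sendTestData()` means the message source is asked for a new message roughly once per second. To make that behaviour easier to picture without Spring, here is a rough plain-Java analogue built on `ScheduledExecutorService`. `PolledSource`, `randomWords` and `poll` are hypothetical names for this sketch, and it only imitates the timing, not how Spring Integration implements polling.

```java
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the polled producer; no Spring or Kafka involved.
final class PolledSource {

    private static final String[] WORDS = {"foo", "bar", "foobar", "baz", "fox"};

    private PolledSource() {
    }

    // Same idea as sendTestData(): every call yields one random word.
    static Supplier<String> randomWords(Random random) {
        return () -> WORDS[random.nextInt(WORDS.length)];
    }

    // Calls source.get() repeatedly, waiting delayMs after each call finishes,
    // and hands every value to sink. The caller must shut the executor down.
    static ScheduledExecutorService poll(Supplier<String> source,
                                         Consumer<String> sink,
                                         long delayMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> sink.accept(source.get()), 0, delayMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler =
                poll(randomWords(new Random()), System.out::println, 1000);
        Thread.sleep(3500); // let a few polls happen
        scheduler.shutdownNow();
    }
}
```

In the real application the sink is the `output1` channel, so each polled word ends up as a message on the bound Kafka topic.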
Configuration file:
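# The kafka.streams.* keys configure the Kafka Streams binder. With only the plain Kafka binder
# on the classpath (as in the dependencies above) they are presumably not used.
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
# Both channels are bound to the same topic, so the consumer reads what the producer writes.
spring.cloud.stream.bindings.input1.destination: words
spring.cloud.stream.bindings.output1.destination: words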
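The two `destination` entries are what connect the producer to the consumer: `output1` and `input1` are different channel names, but both resolve to the topic `words`. The following toy model in plain Java sketches that lookup. `ToyBinder` and its methods are hypothetical and only illustrate the idea; the real binder talks to Kafka and does far more. The fallback of an unbound channel to its own name is my understanding of the default behaviour, so treat it as an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: ToyBinder is a made-up class, not a Spring Cloud Stream API.
final class ToyBinder {

    // channel name -> destination (topic) name, like
    // spring.cloud.stream.bindings.<channel>.destination
    private final Map<String, String> destinations = new HashMap<>();

    // destination name -> listeners attached through some input channel
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void bind(String channel, String destination) {
        destinations.put(channel, destination);
    }

    // Rough analogue of @StreamListener on an input channel.
    void subscribe(String inputChannel, Consumer<String> listener) {
        subscribers.computeIfAbsent(resolve(inputChannel), d -> new ArrayList<>())
                .add(listener);
    }

    // Rough analogue of sending a message to an output channel.
    void send(String outputChannel, String payload) {
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(resolve(outputChannel), Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }
    }

    // Assumption: a channel without an explicit destination uses its own name.
    private String resolve(String channel) {
        return destinations.getOrDefault(channel, channel);
    }

    public static void main(String[] args) {
        ToyBinder binder = new ToyBinder();
        binder.bind("input1", "words");
        binder.bind("output1", "words");
        binder.subscribe("input1", data -> System.out.println("Data received..." + data));
        binder.send("output1", "baz"); // prints "Data received...baz"
    }
}
```

If the two destinations differed, the listener in this model would never be called, which mirrors what happens when the topic names in the configuration do not match.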
Application class:
package com.cqmaple.mq.example.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
Output:
2019-06-26 15:19:38.866 INFO 8408 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Successfully joined group with generation 1
2019-06-26 15:19:38.868 INFO 8408 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Setting newly assigned partitions [words-0]
2019-06-26 15:19:38.878 INFO 8408 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Resetting offset for partition words-0 to offset 888.
2019-06-26 15:19:38.882 INFO 8408 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [words-0]
2019-06-26 15:19:39.733 INFO 8408 --- [container-0-C-1] onsumer$$EnhancerBySpringCGLIB$$d1631839 : Data received...baz
2019-06-26 15:19:40.683 INFO 8408 --- [container-0-C-1] onsumer$$EnhancerBySpringCGLIB$$d1631839 : Data received...baz