Skip to content

spring-cloud-starter-stream-kafka 消息格式问题

最近在对接kafka的过程中,因为是与三方服务对接,而对方为非spring 项目。

由于之前项目都是使用的spring-cloud-starter-stream-kafka, 数据格式一直都是默认的、或者json。

而这次传输的数据又是文本数据,所以直接使用的字符。

但是在对方解码的时候,所有数据的第一列数据老是出异常。

经过排查为默认spring-cloud-starter-stream-kafka 的header需要被解析。

开始对接的时候我使用的kafka的命令行进行测试的,没问题。原来命令行默认header默认不解析。

查询了相关的配置文档之后:

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.2.RELEASE/single/spring-cloud-stream-binder-kafka.html

相关解释:

headerMode:

When set to raw, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when inbound data is coming from outside Spring Cloud Stream applications.

Default: embeddedHeaders.

在生产者中添加:

spring:
  cloud:
    stream:
      bindings:
        xxx:
          destination: xxx-topic
          #contentType: application/json
          producer:
            headerMode: raw
        xxx-input:
          destination: xxx-topic
          #contentType: application/json
          consumer:
            headerMode: raw

生产者与消费者分别添加红色部分的配置,即可。表示消息头部信息为原始格式。

 

顺带还有一个参数,就是消息内容格式(contentType),蓝色部分(被注释掉的哦):

contentType:

The content type of the channel. See “Chapter 8, Content Type Negotiation”.

Default: null (no type coercion is performed).

 

可以使用的值:text/plain,application/json,application/x-java-object,application/octet-stream等等。

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注