安装Pulsar

wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz # 下载pulsar安装包
tar xvfz apache-pulsar-2.8.0-bin.tar.gz # 解压安装包
cd apache-pulsar-2.8.0 # 打开pulsar目录
bin/pulsar standalone  # 启动单机pulsar

Springboot集成Pulsar

配置yml参数

#pulsar配置
pulsar:
  # pulsar服务端地址
  url: pulsar://localhost:6650
  # 多个topic以逗号分隔
  topic: topic1
  # 消费者组
  subscription: pulsar-topicGroup

读取yml参数

package com.demo.Config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author zqf
 * @date 2021/6/22 16:33
 * @description
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "pulsar")
public class PulsarConfig {
    /**
     * pulsar服务端地址
     */
    private String url;
    /**
     * topic
     */
    private String topic;
    /**
     * 消费者组
     */
    private String subscription;
}

创建生产者

package com.demo.Pulsar;

import com.demo.Config.PulsarConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author zqf
 * @date 2021/6/22 16:36
 * @description
 */
@Log4j2
@Component
public class PulsarProducer {
    @Autowired
    private PulsarConfig config;
    PulsarClient client = null;
    Producer<byte[]> producer = null;
    @PostConstruct
    public void initPulsar() throws PulsarClientException {
        //构造Pulsar client
        client = PulsarClient.builder()
                .serviceUrl(config.getUrl())
                .build();
        //创建producer
        producer = client.newProducer()
                .topic(config.getTopic())
                //是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送
                .enableBatching(true)
                //消息压缩(四种压缩方式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不用做改动就能消费,开启后大约可以降低3/4带宽消耗和存储(官方测试)
                .compressionType(CompressionType.LZ4)
                //设置将对发送的消息进行批处理的时间段,
                // 10ms;可以理解为若该时间段内批处理成功,则一个batch中的消息数量不会被该参数所影响。
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                //设置发送超时0s;如果在sendTimeout过期之前服务器没有确认消息,则会发生错误。默认30s,设置为0代表无限制,建议配置为0
                .sendTimeout(0, TimeUnit.SECONDS)
                //批处理中允许的最大消息数。默认1000
                .batchingMaxMessages(1000)
                //设置等待接受来自broker确认消息的队列的最大大小,默认1000
                .maxPendingMessages(1000)
                //设置当消息队列中等待的消息已满时,Producer.send 和 Producer.sendAsync
                // 是否应该block阻塞。默认为false,达到maxPendingMessages后send操作会报错,设置为true后,send操作阻塞但是不报错。建议设置为true
                .blockIfQueueFull(true)
                //向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整
                .roundRobinRouterBatchingPartitionSwitchFrequency(10)
                //key_Shared模式要用KEY_BASED,才能保证同一个key的message在一个batch里
                .batcherBuilder(BatcherBuilder.DEFAULT)
                .create();
    }
    /**
     * 发送消息
     * @param key key
     * @param data data
     * @return MessageId
     * @author zqf
     * @since 2021/6/21 16:46
     */
    public MessageId sendMsg(String key, String data) {
        CompletableFuture<MessageId> future = producer.newMessage()
                .key(key)
                //异步发送
                .value(data.getBytes()).sendAsync();
        future.handle((v, ex) -> {
            if (ex == null) {
                log.info("Message persisted, MessageId:{}, data:{}", v, data);
            } else {
                log.error("发送消息失败msg:{} ", data, ex);
            }
            return null;
        });
        return future.join();
    }
}

创建消费者

package com.demo.Pulsar;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.demo.Config.PulsarConfig;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;


/**
 * @author zqf
 * @date 2021/6/23 13:18
 * @description
 */
@Slf4j
@Component
public class PulsarConsumer implements ApplicationRunner {
    @Autowired
    private PulsarConfig config;
    private PulsarClient client = null;
    private Consumer consumer = null;


    @Override
    public void run(ApplicationArguments args) {
        try {
            //构造Pulsar client
            client = PulsarClient.builder()
                    .serviceUrl(config.getUrl())
                    .build();
            //创建consumer
            consumer = client.newConsumer()
                    .topic(config.getTopic().split(Constants.COMMA))
                    .subscriptionName(config.getSubscription())
                    //指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式
                    .subscriptionType(SubscriptionType.Shared)
                    //指定从哪里开始消费还有Latest,valueof可选,默认Latest
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                    //指定消费失败后延迟多久broker重新发送消息给consumer,默认60s
                    .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
                    .autoUpdatePartitions(true)
                    .subscribe();
            // 开始消费
            receive();
        } catch (Exception e) {
            log.error("Pulsar Consumer初始化异常:", e);
        }
    }

    @Async("taskExecutor")
    public void receive() {
        while (!Thread.currentThread().isInterrupted()) {
            CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
            try {
                log.info("收到消息: key: {},Data: {}", asyncMessage.get().getKey(),new String(asyncMessage.get().getData()));
            } catch (InterruptedException | ExecutionException e) {
                log.error("Pulsar消费异常:", e);
            }
        }
    }
}

测试

package com.demo.Controller;

import com.demo.Common.lang.ResultBody;
import com.demo.Pulsar.PulsarProducer;
import com.demo.Utils.SnowFlake;
import org.apache.pulsar.client.api.MessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author zqf
 * @date 2021/6/23 19:43
 * @description
 */
@RestController
public class PulsarProducerTestController {
    @Autowired
    private PulsarProducer producer;
    @RequestMapping("/sendMsg")
    public ResultBody sendMessage(@RequestParam("word")String word){
        MessageId messageId = producer.sendMsg(String.valueOf(SnowFlake.nextId()), word);
        return ResultBody.ok().data("messageId",messageId);
    }
}
POST http://localhost:8088/sendMsg?word=测试测试测试

测试结果

最后修改:2023 年 03 月 16 日
如果觉得我的文章对你有用,请随意赞赏