Pulsar安装和简单使用
安装Pulsar
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz # 下载pulsar安装包
tar xvfz apache-pulsar-2.8.0-bin.tar.gz # 解压安装包
cd apache-pulsar-2.8.0 # 打开pulsar目录
bin/pulsar standalone # 启动单机pulsar
Springboot集成Pulsar
配置yml参数
#pulsar配置
pulsar:
# pulsar服务端地址
url: pulsar://localhost:6650
# 多个topic以逗号分隔
topic: topic1
# 消费者组
subscription: pulsar-topicGroup
读取yml参数
package com.demo.Config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author zqf
* @date 2021/6/22 16:33
* @description
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "pulsar")
public class PulsarConfig {
/**
* pulsar服务端地址
*/
private String url;
/**
* topic
*/
private String topic;
/**
* 消费者组
*/
private String subscription;
}
创建生产者
package com.demo.Pulsar;
import com.demo.Config.PulsarConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* @author zqf
* @date 2021/6/22 16:36
* @description
*/
@Log4j2
@Component
public class PulsarProducer {
@Autowired
private PulsarConfig config;
PulsarClient client = null;
Producer<byte[]> producer = null;
@PostConstruct
public void initPulsar() throws PulsarClientException {
//构造Pulsar client
client = PulsarClient.builder()
.serviceUrl(config.getUrl())
.build();
//创建producer
producer = client.newProducer()
.topic(config.getTopic())
//是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送
.enableBatching(true)
//消息压缩(四种压缩方式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不用做改动就能消费,开启后大约可以降低3/4带宽消耗和存储(官方测试)
.compressionType(CompressionType.LZ4)
//设置将对发送的消息进行批处理的时间段,
// 10ms;可以理解为若该时间段内批处理成功,则一个batch中的消息数量不会被该参数所影响。
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
//设置发送超时0s;如果在sendTimeout过期之前服务器没有确认消息,则会发生错误。默认30s,设置为0代表无限制,建议配置为0
.sendTimeout(0, TimeUnit.SECONDS)
//批处理中允许的最大消息数。默认1000
.batchingMaxMessages(1000)
//设置等待接受来自broker确认消息的队列的最大大小,默认1000
.maxPendingMessages(1000)
//设置当消息队列中等待的消息已满时,Producer.send 和 Producer.sendAsync
// 是否应该block阻塞。默认为false,达到maxPendingMessages后send操作会报错,设置为true后,send操作阻塞但是不报错。建议设置为true
.blockIfQueueFull(true)
//向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整
.roundRobinRouterBatchingPartitionSwitchFrequency(10)
//key_Shared模式要用KEY_BASED,才能保证同一个key的message在一个batch里
.batcherBuilder(BatcherBuilder.DEFAULT)
.create();
}
/**
* 发送消息
* @param key key
* @param data data
* @return MessageId
* @author zqf
* @since 2021/6/21 16:46
*/
public MessageId sendMsg(String key, String data) {
CompletableFuture<MessageId> future = producer.newMessage()
.key(key)
//异步发送
.value(data.getBytes()).sendAsync();
future.handle((v, ex) -> {
if (ex == null) {
log.info("Message persisted, MessageId:{}, data:{}", v, data);
} else {
log.error("发送消息失败msg:{} ", data, ex);
}
return null;
});
return future.join();
}
}
创建消费者
package com.demo.Pulsar;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.demo.Config.PulsarConfig;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* @author zqf
* @date 2021/6/23 13:18
* @description
*/
@Slf4j
@Component
public class PulsarConsumer implements ApplicationRunner {
@Autowired
private PulsarConfig config;
private PulsarClient client = null;
private Consumer consumer = null;
@Override
public void run(ApplicationArguments args) {
try {
//构造Pulsar client
client = PulsarClient.builder()
.serviceUrl(config.getUrl())
.build();
//创建consumer
consumer = client.newConsumer()
.topic(config.getTopic().split(Constants.COMMA))
.subscriptionName(config.getSubscription())
//指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式
.subscriptionType(SubscriptionType.Shared)
//指定从哪里开始消费还有Latest,valueof可选,默认Latest
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
//指定消费失败后延迟多久broker重新发送消息给consumer,默认60s
.negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
.autoUpdatePartitions(true)
.subscribe();
// 开始消费
receive();
} catch (Exception e) {
log.error("Pulsar Consumer初始化异常:", e);
}
}
@Async("taskExecutor")
public void receive() {
while (!Thread.currentThread().isInterrupted()) {
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
try {
log.info("收到消息: key: {},Data: {}", asyncMessage.get().getKey(),new String(asyncMessage.get().getData()));
} catch (InterruptedException | ExecutionException e) {
log.error("Pulsar消费异常:", e);
}
}
}
}
测试
package com.demo.Controller;
import com.demo.Common.lang.ResultBody;
import com.demo.Pulsar.PulsarProducer;
import com.demo.Utils.SnowFlake;
import org.apache.pulsar.client.api.MessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author zqf
* @date 2021/6/23 19:43
* @description
*/
@RestController
public class PulsarProducerTestController {
@Autowired
private PulsarProducer producer;
@RequestMapping("/sendMsg")
public ResultBody sendMessage(@RequestParam("word")String word){
MessageId messageId = producer.sendMsg(String.valueOf(SnowFlake.nextId()), word);
return ResultBody.ok().data("messageId",messageId);
}
}
POST http://localhost:8088/sendMsg?word=测试测试测试
测试结果
当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »