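Pulsar 2.5.0: Java client
Original title on the official site: "Pulsar Java client"
Translated: 2020-02-14
Original article: http://pulsar.apache.org/docs/en/client-libraries-java/
Translator's note: this article describes how to use the Java client to create producers and consumers and to read messages through the admin interface.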
Pulsar Java client
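The Java client can be used to create producers, consumers, and readers of messages. The current API version is 2.5.0, and it consists of two packages:
Package | Description | Maven Artifact |
---|---|---|
org.apache.pulsar.client.api | Producer and consumer API | org.apache.pulsar:pulsar-client:2.5.0 |
org.apache.pulsar.client.admin | Admin API | org.apache.pulsar:pulsar-client-admin:2.5.0 |
This chapter focuses on how to use the producer and consumer API. For the admin client API, read the Pulsar admin interface documentation.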
Installing the Java client
Maven
Add the following to your pom.xml file:
<!-- in your <properties> block -->
<pulsar.version>2.5.0</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
Add the following to your build.gradle file:
def pulsarVersion = '2.5.0'
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
}
Connection URLs
The client talks to Pulsar over the pulsar protocol. Typical URL forms are:
- Local cluster: pulsar://localhost:6650
- Multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
- Production cluster: pulsar://pulsar.us-west.example.com:6650
- TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
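The URL forms above share one shape: a scheme (pulsar or pulsar+ssl) followed by one or more comma-separated host:port pairs. The following is a minimal sketch of splitting such a URL with the standard library only. The class ServiceUrlSketch and its parse method are hypothetical names used for illustration; they are not part of the Pulsar client, which does its own parsing internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class ServiceUrlSketch {
    final boolean tls;
    final List<String> hosts;

    private ServiceUrlSketch(boolean tls, List<String> hosts) {
        this.tls = tls;
        this.hosts = hosts;
    }

    static ServiceUrlSketch parse(String url) {
        boolean tls;
        String rest;
        if (url.startsWith("pulsar+ssl://")) {
            tls = true;
            rest = url.substring("pulsar+ssl://".length());
        } else if (url.startsWith("pulsar://")) {
            tls = false;
            rest = url.substring("pulsar://".length());
        } else {
            throw new IllegalArgumentException("Unsupported scheme: " + url);
        }
        // Each comma-separated entry is expected to look like host:port.
        List<String> hosts = new ArrayList<>();
        for (String hostPort : rest.split(",")) {
            if (hostPort.isEmpty() || hostPort.indexOf(':') < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
            }
            hosts.add(hostPort);
        }
        return new ServiceUrlSketch(tls, hosts);
    }
}
```

For instance, parsing the multi-broker URL yields three host entries with tls set to false, while the pulsar+ssl form yields a single host with tls set to true.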
Creating a PulsarClient object
Default example:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Example with multiple brokers:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
.build();
For a local cluster, the default broker URL is pulsar://localhost:6650.
PulsarClient parameters can be customized through loadConf.
Configuration parameters
Type | Name | Default |
---|---|---|
String | serviceUrl | None |
String | authPluginClassName | None |
String | authParams | None |
long | operationTimeoutMs | 30000 |
long | statsIntervalSeconds | 60 |
int | numIoThreads | 1 |
int | numListenerThreads | 1 |
boolean | useTcpNoDelay | true |
boolean | useTls | false |
String | tlsTrustCertsFilePath | None |
boolean | tlsAllowInsecureConnection | false |
boolean | tlsHostnameVerificationEnable | false |
int | concurrentLookupRequest | 5000 |
int | maxLookupRequest | 50000 |
int | maxNumberOfRejectedRequestPerConnection | 50 |
int | keepAliveIntervalSeconds | 30 |
int | connectionTimeoutMs | 10000 |
int | requestTimeoutMs | 60000 |
int | defaultBackoffIntervalNanos | TimeUnit.MILLISECONDS.toNanos(100) |
long | maxBackoffIntervalNanos | TimeUnit.SECONDS.toNanos(30) |
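The parameter names in the table are the keys that a loadConf map would carry. The sketch below only builds such a map with the standard library; the class ClientConfSketch is a hypothetical name, and the chosen values are illustrative. It assumes that, with pulsar-client on the classpath, the map would then be handed to the builder as PulsarClient.builder().loadConf(conf).build().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; only the map keys come from the table above.
final class ClientConfSketch {
    static Map<String, Object> customClientConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("serviceUrl", "pulsar://localhost:6650");
        conf.put("operationTimeoutMs", 10000L);     // table default: 30000
        conf.put("numIoThreads", 4);                // table default: 1
        conf.put("keepAliveIntervalSeconds", 30);   // same as the table default
        conf.put("maxBackoffIntervalNanos", TimeUnit.SECONDS.toNanos(30));
        // Assumed usage with the Pulsar client library available:
        // PulsarClient client = PulsarClient.builder().loadConf(conf).build();
        return conf;
    }
}
```

Keeping the configuration in a plain map makes it easy to load the same settings from a properties file or environment variables before the client is built.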
Creating and using a Producer
To send messages, create a Producer object attached to a specific topic:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());
By default, the message schema is a byte array. The schema can be chosen to fit your use case; for example, a String schema:
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message");
Objects must be closed once they are no longer needed:
producer.close();
consumer.close();
client.close();
Close operations also have asynchronous variants:
producer.closeAsync()
.thenRun(() -> System.out.println("Producer closed"))
.exceptionally((ex) -> {
System.err.println("Failed to close producer: " + ex);
return null;
});
A Producer can use the default configuration or custom parameters. Custom configuration is applied through loadConf.
Type | Name | Default |
---|---|---|
String | topicName | null |
String | producerName | null |
long | sendTimeoutMs | 30000 |
boolean | blockIfQueueFull | false |
int | maxPendingMessages | 1000 |
int | maxPendingMessagesAcrossPartitions | 50000 |
MessageRoutingMode | messageRoutingMode | pulsar.RoundRobinDistribution |
HashingScheme | hashingScheme | HashingScheme.JavaStringHash |
ProducerCryptoFailureAction | cryptoFailureAction | ProducerCryptoFailureAction.FAIL |
long | batchingMaxPublishDelayMicros | TimeUnit.MILLISECONDS.toMicros(1) |
int | batchingMaxMessages | 1000 |
boolean | batchingEnabled | true |
CompressionType | compressionType | No compression |
Example of a custom Producer configuration:
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
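If the Producer publishes to a partitioned topic, a message routing mode determines which partition each message goes to. To learn more about message routing, read the Partitioned Topics cookbook.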
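To make the idea of a routing mode concrete, here is a minimal sketch of one plausible policy: messages with a key go to a partition derived from the key's Java string hash, and messages without a key are spread round-robin. The class RoutingSketch is a hypothetical name, and the exact arithmetic is an assumption for illustration; it loosely mirrors the RoundRobinDistribution and JavaStringHash defaults named in the Producer table but is not the client's actual implementation.

```java
// Hypothetical router for illustration; not the Pulsar client's routing code.
final class RoutingSketch {
    private final int numPartitions;
    private int nextRoundRobin = 0;

    RoutingSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Keyed messages always map to the same partition; keyless ones rotate.
    int choosePartition(String key) {
        if (key != null) {
            // Mask the sign bit so the modulo result is never negative.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
        int partition = nextRoundRobin;
        nextRoundRobin = (nextRoundRobin + 1) % numPartitions;
        return partition;
    }
}
```

Under this policy, per-key ordering is preserved because a key never changes partition, while keyless traffic is balanced across partitions.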
Messages can also be sent asynchronously. The call returns a future that wraps the MessageId:
producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId);
});
Example of configuring a Message:
producer.newMessage()
.key("my-message-key")
.value("my-async-message".getBytes())
.property("my-key", "my-value")
.property("my-other-key", "my-other-value")
.send();
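The builder chain can also be ended with sendAsync(), which returns a future.
Consumer
A Consumer is created from the PulsarClient object. The topic and the subscription name must be specified at creation time. Once the Consumer exists, it can consume messages from the subscribed topic.
Example: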
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
Use a while loop to listen for and receive messages. If processing fails, call negativeAcknowledge so that the message is redelivered later:
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
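The acknowledge and negative-acknowledge flow in the loop above can be simulated without a broker. The sketch below is a simplified model under stated assumptions: an in-memory queue stands in for the subscription, a message that fails is put back at the end of the queue immediately, and the class RedeliverySketch is a hypothetical name. A real broker redelivers after a delay (the Consumer configuration lists negativeAckRedeliveryDelayMicros), which this model ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of ack / negative-ack; not Pulsar code.
final class RedeliverySketch {
    // Messages listed in failOnce throw on their first processing attempt.
    // Returns the order in which messages were acknowledged.
    static List<String> consume(List<String> messages, Set<String> failOnce) {
        Deque<String> queue = new ArrayDeque<>(messages);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> acknowledged = new ArrayList<>();
        while (!queue.isEmpty()) {
            String msg = queue.poll();
            try {
                if (failOnce.contains(msg) && alreadyFailed.add(msg)) {
                    throw new IllegalStateException("processing failed: " + msg);
                }
                acknowledged.add(msg);   // stands in for consumer.acknowledge(msg)
            } catch (IllegalStateException e) {
                queue.addLast(msg);      // stands in for consumer.negativeAcknowledge(msg)
            }
        }
        return acknowledged;
    }
}
```

With messages a, b, c and b failing once, the acknowledged order is a, c, b: the failed message is not lost, it is simply processed again later.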
The default configuration can be used, or a custom one can be applied through loadConf.
Type | Name | Default |
---|---|---|
Set<String> | topicNames | Sets.newTreeSet() |
Pattern | topicsPattern | None |
String | subscriptionName | None |
SubscriptionType | subscriptionType | SubscriptionType.Exclusive |
int | receiverQueueSize | 1000 |
long | acknowledgementsGroupTimeMicros | TimeUnit.MILLISECONDS.toMicros(100) |
long | negativeAckRedeliveryDelayMicros | TimeUnit.MINUTES.toMicros(1) |
int | maxTotalReceiverQueueSizeAcrossPartitions | 50000 |
String | consumerName | null |
long | ackTimeoutMillis | 0 |
long | tickDurationMillis | 1000 |
int | priorityLevel | 0 |
ConsumerCryptoFailureAction | cryptoFailureAction | ConsumerCryptoFailureAction.FAIL |
SortedMap<String, String> | properties | new TreeMap<>() |
boolean | readCompacted | false |
SubscriptionInitialPosition | subscriptionInitialPosition | SubscriptionInitialPosition.Latest |
int | patternAutoDiscoveryPeriod | 1 |
RegexSubscriptionMode | regexSubscriptionMode | RegexSubscriptionMode.PersistentOnly |
DeadLetterPolicy | deadLetterPolicy | None |
boolean | autoUpdatePartitions | true |
boolean | replicateSubscriptionState | false |
Example:
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
Ways to receive messages
- Asynchronous receive
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
- Batch receive
Messages<byte[]> messages = consumer.batchReceive();
for (Message<byte[]> message : messages) {
// do something
}
consumer.acknowledge(messages);
Custom batch receive policy:
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(100)
.maxNumBytes(1024 * 1024)
.timeout(200, TimeUnit.MILLISECONDS)
.build())
.subscribe();
Default batch receive policy:
BatchReceivePolicy.builder()
.maxNumMessages(-1)
.maxNumBytes(10 * 1024 * 1024)
.timeout(100, TimeUnit.MILLISECONDS)
.build();
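A batch receive policy bounds a batch by message count, by total bytes, and by a timeout. The sketch below models only the first two limits over an in-memory list, treating a non-positive limit as "no limit" in the way the default maxNumMessages(-1) suggests. The class BatchPolicySketch is a hypothetical name and the cut-off rule is an assumption for illustration; the client's own batching logic, including the timeout, is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of count and byte limits; the timeout is deliberately left out.
final class BatchPolicySketch {
    // Takes messages from the front of queued until a limit would be exceeded.
    static List<byte[]> nextBatch(List<byte[]> queued, int maxNumMessages, long maxNumBytes) {
        List<byte[]> batch = new ArrayList<>();
        long bytes = 0;
        for (byte[] msg : queued) {
            boolean countFull = maxNumMessages > 0 && batch.size() >= maxNumMessages;
            // Always admit at least one message, even if it alone exceeds the byte limit.
            boolean bytesFull = maxNumBytes > 0 && !batch.isEmpty() && bytes + msg.length > maxNumBytes;
            if (countFull || bytesFull) {
                break;
            }
            batch.add(msg);
            bytes += msg.length;
        }
        return batch;
    }
}
```

With five queued messages of 4 bytes each, a count limit of 3 yields a batch of three, and a byte limit of 10 with no count limit yields a batch of two.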
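Multi-topic subscriptions
When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, for example persistent://public/default/my-topic. Since Pulsar 1.23.0-incubating, a consumer can subscribe to several topics at the same time. The list of topics can be defined in two ways:
- With a basic regular expression (regex), for example persistent://public/default/finance-.*
- With an explicitly defined list of topics
When subscribing to multiple topics by regex, all topics must be in the same namespace.
When subscribing to multiple topics, the Pulsar client automatically calls the Pulsar API to discover every topic that matches the expression and subscribes to all of them. If some of those topics do not exist yet, the consumer subscribes to them automatically once they are created.
No ordering guarantees
When a consumer subscribes to multiple topics, the ordering guarantees that Pulsar provides for a single topic no longer hold. If your use case requires ordering, we strongly recommend not using this feature.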
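Which topics a regex subscription would pick up can be checked offline with java.util.regex. The sketch below filters a list of topic names against a pattern; the class TopicPatternSketch and the sample topic names are hypothetical, and the sketch assumes a full match of the topic name against the pattern, which may differ in detail from how the client discovers topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; topic discovery in Pulsar goes through the broker.
final class TopicPatternSketch {
    // Returns the topics whose full name matches the pattern, in input order.
    static List<String> matching(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

For the pattern persistent://public/default/finance-.*, topics named finance-eu and finance-us in that namespace match, while a topic named orders does not.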
Subscribing with a regular expression
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
// Subscribe to a subset of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer someTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
Subscribing to a list of topics
List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
// Alternatively:
Consumer multiTopicConsumer = consumerBuilder
.topics(
"topic-1",
"topic-2",
"topic-3"
)
.subscribe();
Asynchronous subscription
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
.topics(topics)
.subscribeAsync()
.thenAccept(this::receiveMessageFromConsumer);
private void receiveMessageFromConsumer(Consumer consumer) {
consumer.receiveAsync().thenAccept(message -> {
// Do something with the received message
receiveMessageFromConsumer(consumer);
});
}
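Subscription modes
Pulsar offers several subscription modes to fit different real-world use cases.
To make the modes easier to compare, we create a producer and send 10 messages to a topic named my-topic, then look at how each mode delivers them.
Producer example: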
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
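Exclusive
In exclusive mode, only one consumer can attach to the subscription. If more than one consumer tries to subscribe to the topic with the same subscription, the additional consumer receives an error.
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();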
Failover
In failover mode, multiple consumers can attach to the same subscription. The consumers are sorted lexically, and the first consumer is initially the only one that receives messages; it is called the master consumer. When the master consumer disconnects, all messages (both unacknowledged and subsequent) are delivered to the next consumer in line.
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
Result:
// consumer1 is the active consumer, consumer2 is the standby consumer.
// consumer1 receives 5 messages and then crashes, consumer2 takes over as the active consumer.
Shared
In shared, or round robin, mode multiple consumers can attach to the same subscription. Messages are delivered round-robin across the consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all messages that were sent to it but not acknowledged are rescheduled for delivery to the remaining consumers.
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
// Both consumer1 and consumer2 are active consumers.
Result:
consumer1:
("key-1", "message-1-1")
("key-1", "message-1-3")
("key-2", "message-2-2")
("key-3", "message-3-1")
("key-4", "message-4-1")
consumer2:
("key-1", "message-1-2")
("key-2", "message-2-1")
("key-2", "message-2-3")
("key-3", "message-3-2")
("key-4", "message-4-2")
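The split shown above is what strict alternation between two consumers produces for the ten messages in send order. The sketch below reproduces it with a plain index-modulo dispatch; the class SharedDispatchSketch is a hypothetical name, and real delivery also depends on receiver queue sizes and consumer availability, so an actual run need not alternate this cleanly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin model; not the broker's dispatcher.
final class SharedDispatchSketch {
    // Message i goes to consumer i % numConsumers.
    static List<List<String>> dispatch(List<String> messages, int numConsumers) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % numConsumers).add(messages.get(i));
        }
        return perConsumer;
    }
}
```

Note that messages sharing a key end up on different consumers, which is why shared mode gives no per-key ordering.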
Key_Shared
Key_Shared is a subscription mode added in version 2.4.0.
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
// Both consumer1 and consumer2 are active consumers.
Result:
consumer1:
("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
consumer2:
("key-2", "message-2-1")
("key-2", "message-2-2")
("key-2", "message-2-3")
("key-4", "message-4-1")
("key-4", "message-4-2")
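In the result above, every message with a given key lands on the same consumer. One way to get that property is to hash the key into a fixed slot space and give each consumer a contiguous slice of slots. The sketch below does this with Java's String.hashCode over 65536 slots. The class KeySharedSketch is a hypothetical name, and both the hash function and the slot assignment are assumptions for illustration, so the consumer chosen for a key here need not match the listing above.

```java
// Hypothetical key-to-consumer mapping; not the broker's Key_Shared dispatcher.
final class KeySharedSketch {
    static final int HASH_SLOTS = 65536;

    // Returns the index of the consumer responsible for the key.
    static int consumerFor(String key, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("numConsumers must be positive");
        }
        // Mask the sign bit so the slot is in [0, HASH_SLOTS).
        int slot = (key.hashCode() & Integer.MAX_VALUE) % HASH_SLOTS;
        // Ceiling division so the slices cover every slot.
        int slotsPerConsumer = (HASH_SLOTS + numConsumers - 1) / numConsumers;
        return slot / slotsPerConsumer;
    }
}
```

Because the mapping depends only on the key and the number of consumers, repeated sends with the same key always reach the same consumer as long as the consumer set is unchanged.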
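Reader
With Pulsar's reader interface, an application can manage the cursor manually. When a reader, rather than a consumer, connects to a topic, you must specify where the reader starts reading. A reader can start from:
- The earliest available message in the topic
- The latest available message in the topic
- A specific message ID
Example: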
ReaderConfiguration conf = new ReaderConfiguration();
byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
while (true) {
Message message = reader.readNext();
// Process message
}
Reader configuration
Type | Name | Default |
---|---|---|
String | topicName | None |
int | receiverQueueSize | 1000 |
ReaderListener<T> | readerListener | None |
String | readerName | null |
String | subscriptionRolePrefix | null |
CryptoKeyReader | cryptoKeyReader | null |
ConsumerCryptoFailureAction | cryptoFailureAction | ConsumerCryptoFailureAction.FAIL |
boolean | readCompacted | false |
boolean | resetIncludeHead | false |
Reader key hash range
A reader can be restricted to messages whose key hash falls in given ranges. Range values lie between 0 and 65535, and the upper bound cannot exceed 65535.
Example:
pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
.create();
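The range constraint can be expressed as a small validation plus a membership test. The sketch below is a stand-alone model: the class HashRangeSketch is a hypothetical name (it is not the Pulsar Range class), and it assumes both ends of a range are inclusive.

```java
// Hypothetical model of a key hash range; both bounds are assumed inclusive.
final class HashRangeSketch {
    static final int MAX_HASH = 65535;
    final int start;
    final int end;

    HashRangeSketch(int start, int end) {
        if (start < 0 || end > MAX_HASH || start > end) {
            throw new IllegalArgumentException("range must satisfy 0 <= start <= end <= " + MAX_HASH);
        }
        this.start = start;
        this.end = end;
    }

    boolean contains(int hash) {
        return hash >= start && hash <= end;
    }

    // True when the hash falls in at least one of the ranges.
    static boolean accepts(int hash, HashRangeSketch... ranges) {
        for (HashRangeSketch range : ranges) {
            if (range.contains(hash)) {
                return true;
            }
        }
        return false;
    }
}
```

With the two ranges from the example, a key hash of 5000 is accepted and a key hash of 15000 is skipped, while constructing a range that ends at 70000 is rejected.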
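Schema
In a Pulsar application, if no schema is specified for a topic, producers and consumers handle raw bytes. In practice that is rarely what we want: we expect to send data in our own format and need support for different data types. Message schemas cover this case.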
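Conceptually, a schema is a pair of functions that turn a typed value into bytes and back. The sketch below shows that idea for strings using the standard library only. The interface SchemaSketch is a hypothetical stand-in written for illustration and is not the Pulsar Schema interface; UTF-8 is an assumed encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a message schema; not the Pulsar Schema interface.
interface SchemaSketch<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);

    // A string schema that assumes UTF-8 on both sides.
    SchemaSketch<String> STRING = new SchemaSketch<String>() {
        @Override
        public byte[] encode(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String decode(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };
}
```

A producer would apply encode before sending and a consumer would apply decode after receiving, so application code on both sides works with typed values instead of byte arrays.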
Schema example
public class SensorReading {
public float temperature;
public SensorReading(float temperature) {
this.temperature = temperature;
}
// A no-arg constructor is required
public SensorReading() {
}
public float getTemperature() {
return temperature;
}
public void setTemperature(float temperature) {
this.temperature = temperature;
}
}
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
The Java client currently supports the following schemas:
- Schema.BYTES
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
.topic("some-raw-bytes-topic")
.create();
//or
Producer<byte[]> bytesProducer = client.newProducer()
.topic("some-raw-bytes-topic")
.create();
- Schema.STRING
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("some-string-topic")
.create();
- Schema.JSON
Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
.topic("some-pojo-topic")
.create();
- Schema.PROTOBUF
Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
.topic("some-protobuf-topic")
.create();
- Schema.AVRO
Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
.topic("some-avro-topic")
.create();
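Authentication
Pulsar supports two authentication mechanisms:
- TLS
- Athenz
To use TLS authentication, enable TLS on the client (the original documentation refers to setUseTls; the builder example here calls enableTls(true)) and set the path to the TLS trust certificate.
Example: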
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
Authentication tlsAuth = AuthenticationFactory
.create(AuthenticationTls.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(tlsAuth)
.build();
Athenz requires TLS to be configured, and the following parameters must be initialized:
- tenantDomain
- tenantService
- providerDomain
- privateKey
privateKey supports three formats:
- file:///path/to/file
- file:/path/to/file
- data:application/x-pem-file;base64,<base64-encoded value>
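The three formats reduce to two cases: a file URI (with or without the empty authority) and an inline base64 data URI. The sketch below resolves either form to raw key bytes with the standard library. The class PrivateKeySourceSketch is a hypothetical name, and the sketch is only an illustration of the formats; it is not how the Athenz plugin loads keys.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

// Hypothetical loader for illustration; not the Athenz authentication plugin.
final class PrivateKeySourceSketch {
    private static final String DATA_PREFIX = "data:application/x-pem-file;base64,";

    // Returns the key bytes for a file: URI or an inline base64 data URI.
    static byte[] load(String privateKey) {
        if (privateKey.startsWith(DATA_PREFIX)) {
            return Base64.getDecoder().decode(privateKey.substring(DATA_PREFIX.length()));
        }
        if (privateKey.startsWith("file:")) {
            try {
                // Handles both file:///path/to/file and file:/path/to/file.
                return Files.readAllBytes(Paths.get(URI.create(privateKey)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        throw new IllegalArgumentException("Unsupported privateKey format: " + privateKey);
    }
}
```

The data form is convenient when the key comes from a secret store or an environment variable and no file exists on disk.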
Example:
Map<String, String> authParams = new HashMap<>();
authParams.put("tenantDomain", "shopping"); // Tenant domain name
authParams.put("tenantService", "some_app"); // Tenant service name
authParams.put("providerDomain", "pulsar"); // Provider domain name
authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
Authentication athenzAuth = AuthenticationFactory
.create(AuthenticationAthenz.class.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(athenzAuth)
.build();