前言
- 背压,响应式编程中的概念。意思是当服务端资源不足的时候能够通知客户端请求慢一点。
- 在RSocket中使用Lease机制实现,Lease的构造方法里有两个参数,ttl和nums-of-requests,两个参数通知客户端在接下来的ttl时间里服务端最多可以处理nums-of-requests个请求。
机制详解
以下机制都是需要我们自己处理的业务逻辑,RSocket底层机制只定义了lease帧和发送lease的方法。
1、针对服务端
- a、服务端首先要评估自己的剩余资源,比如自己的异步队列还剩多少空闲长度,然后根据这个剩余资源去创建一个lease对象,表明在接下来的ttl时间里我还有这么多资源供你请求
- b、服务端应该每隔一个时间给客户端发送lease以告知自己还有多少剩余资源
2、针对客户端
- a、客户端要有租约处理器,这个处理器应该位于客户端订阅者的上游,即先处理租约再执行业务逻辑
- b、客户端可以缓存到当前时间为止的最新租约,并且能够通知下游新租约的到来
- c、客户端应该具有延迟机制,在初始没有有效租约lease消息到来之前不能发送消息
- d、客户端应该具有延迟重试机制,在没有新的有效的租约lease消息到来之前不要发送消息
代码解析
此代码源于官方git,不过官方git没有给注释,对初学者并不友好
服务端
- 服务端代码
服务端代码包括了一个阻塞队列,和一个工作线程。工作线程消费阻塞队列里的消息。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketServer;
import io.rsocket.lease.Leases;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
public class LeaseServer {
private static final String SERVER_TAG = "server";
public static void main(String[] args) throws InterruptedException {
// Queue for incoming messages represented as Flux
// Imagine that every fireAndForget that is pushed is processed by a worker
int queueCapacity = 50;
BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
// emulating a worker that process data from the queue
Thread workerThread =
new Thread(
() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
String message = messagesQueue.take();
System.out.println("消费者线程处理消息:" + message);
Thread.sleep(100000); // emulating processing
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
workerThread.start();
CloseableChannel server = getFireAndForgetServer(messagesQueue, workerThread);
TimeUnit.MINUTES.sleep(10);
server.dispose();
}
/**
* 收到fireAndForget消息之后让消息入队。
* 启动租约机制,5秒有效期和队列剩余容量可供请求
*
* @param messagesQueue
* @param workerThread
*
* @return
*/
private static CloseableChannel getFireAndForgetServer(BlockingQueue<String> messagesQueue, Thread workerThread) {
CloseableChannel server =
RSocketServer.create((setup, sendingSocket) ->
Mono.just(new RSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
// add element. if overflows errors and terminates execution
// specifically to show that lease can limit rate of fnf requests in
// that example
try {
if (!messagesQueue.offer(payload.getDataUtf8())) {
System.out.println("Queue has been overflowed. Terminating execution");
sendingSocket.dispose();
workerThread.interrupt();
}
} finally {
payload.release();
}
return Mono.empty();
}
}))
.lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
.bindNow(TcpServerTransport.create("localhost", 7000));
return server;
}
}
- 服务端租约生成器
服务端租约生成器包括了租约生成。主要根据剩余空闲队列的长度来生成租约,租约有效时间可以自己设置。租约以Flux即流的方式向客户端传输。
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import io.rsocket.lease.Lease;
import io.rsocket.lease.LeaseStats;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* This is a class responsible for making decision on whether Responder is ready to receive new
* FireAndForget or not base in the number of messages enqueued. <br>
* In the nutshell this is responder-side rate-limiter logic which is created for every new
* connection.<br>
* In real-world projects this class has to issue leases based on real metrics
*/
@Slf4j
public class LeaseCalculator implements Function<Optional<LeaseStats>, Flux<Lease>> {
final String tag;
final BlockingQueue<?> queue;
public LeaseCalculator(String tag, BlockingQueue<?> queue) {
this.tag = tag;
this.queue = queue;
}
@Override
public Flux<Lease> apply(Optional<LeaseStats> leaseStats) {
log.info("{} stats are {}", tag, leaseStats.isPresent() ? "present" : "absent");
Duration ttlDuration = Duration.ofSeconds(10);
// The interval function is used only for the demo purpose and should not be
// considered as the way to issue leases.
// For advanced RateLimiting with Leasing
// consider adopting https://github.com/Netflix/concurrency-limits#server-limiter
// 每2秒发送租约,租约内容为队列容量和10秒有效期
return Flux.interval(Duration.ofSeconds(0), ttlDuration.dividedBy(2))
.handle((__, sink) -> {
// put queue.remainingCapacity() + 1 here if you want to observe that app is
// terminated because of the queue overflowing
int requests = queue.remainingCapacity();
// reissue new lease only if queue has remaining capacity to
// accept more requests
if (requests > 0) {
long ttl = ttlDuration.toMillis();
sink.next(Lease.create((int) ttl, requests));
}
});
}
}
客户端
- 客户端代码
客户端代码包括了持有租约后的逻辑处理和未持有租约之前的阻塞等待。
import java.util.Objects;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.lease.Leases;
import io.rsocket.lease.MissingLeaseException;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.ByteBufPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
public class LeaseClient {
private static final String CLIENT_TAG = "client";
public static void main(String[] args) {
LeaseReceiver receiver = new LeaseReceiver(CLIENT_TAG);
RSocket clientRSocket =
RSocketConnector.create()
.lease(() -> Leases.create().receiver(receiver))
.connect(TcpClientTransport.create("localhost", 7000))
.block();
Objects.requireNonNull(clientRSocket);
// generate stream of fnfs
Flux.generate(() -> 0L, (state, sink) -> {
// 给下游订阅者发送单个消息
sink.next(state);
return state + 1;
})
// 等待新的租约到来再继续执行下边的,不然就在这阻塞
.delaySubscription(receiver.notifyWhenNewLease().then())
// 新租约到来之后,flatten和order这些流的帧
.concatMap(tick -> {
System.out.println("客户端发射消息" + tick);
// 有订阅者之后再创建mono
return Mono.defer(() -> clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
// retry.indefinitely表示非立即重试,也就是说下一次重试没有确定时间
.retryWhen(Retry.indefinitely()
// 只有在租约到期的错误的时候才开始等待新租约
.filter(t -> t instanceof MissingLeaseException)
// 执行重试之前的信号,也就是新的租约到来的时候才会重试
.doBeforeRetryAsync(
rs -> {
// 在重试之前会阻塞,直到新的租约到来
System.out.println("租约到期:" + rs);
return receiver.notifyWhenNewLease().then();
}));
})
.blockLast();
clientRSocket.onClose().block();
}
}
- 客户端租约处理器
客户端租约处理器包括了客户端业务逻辑之前的租约拦截和处理,包括打租约日志,通知下游订阅者新的有效租约的到来。
import java.util.function.Consumer;
import io.rsocket.lease.Lease;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
/**
* Requester-side Lease listener.<br>
* In the nutshell this class implements mechanism to listen (and do appropriate actions as
* needed) to incoming leases issued by the Responder
*/
@Slf4j
public class LeaseReceiver implements Consumer<Flux<Lease>> {
final String tag;
// 缓存最后一个租约,每当新的订阅者订阅的时候就回放这个租约
final ReplayProcessor<Lease> lastLeaseReplay = ReplayProcessor.cacheLast();
public LeaseReceiver(String tag) {
this.tag = tag;
}
@Override
public void accept(Flux<Lease> receivedLeases) {
receivedLeases.subscribe(
l -> {
log.info("{} received leases - ttl: {}, requests: {}", tag, l.getTimeToLiveMillis(),
l.getAllowedRequests());
lastLeaseReplay.onNext(l);
});
}
/**
* 通知下游新的有效租约的到来
*/
public Mono<Lease> notifyWhenNewLease() {
return lastLeaseReplay.filter(l -> l.isValid()).next();
}
}
总结
这种应用层背压机制或者限流机制对我们基于其他协议实现背压也是有帮助的,我们可以以这种思路来设计限流或者背压。
- 服务端评估自己剩余空闲资源
- 服务端定时给客户端发送自己的空闲资源,即lease,这个lease可以是我们自己定义的一种数据结构,必须包含ttl和剩余资源
- 客户端以lease为凭证执行业务逻辑,没有lease不执行(阻塞)
- 客户端有有效lease的时候持续执行,或者按照时间执行发送逻辑