线程切换的操作在 Rx 里面非常常用,主要有 subscribeOn observeOn 他们都需要一个 Scheduler 参数,很明显这个是接口 可以实现各种 调度器。根据这个我们可以这样写...
//Scheduler.kt
abstract class Scheduler {
abstract fun createWorker(): Worker
abstract class Worker {
abstract fun schedule(action: () -> Unit)
}
}
Worker 是实际工作的,有个抽象方法 schedule 需要我们去实现
Executor 是 Java 里面线程池的执行器, Executors 里面有各种已经实现的线程池,至此 我们可以来实现一个ExecutorScheduler
//ExecutorScheduler.kt
class ExecutorScheduler(private var executor: Executor) : Scheduler() {
override fun createWorker(): Worker {
return WorkerImpl(executor)
}
private class WorkerImpl(private var executor: Executor) : Worker() {
override fun schedule(action: () -> Unit) {
executor.execute {
action()
}
}
}
}
参考 rxjava 建立一个 Schedulers 存放 Scheduler,这样就直接可以调用 Schedulers.io()
class Schedulers {
companion object {
private val io = ExecutorScheduler(Executors.newSingleThreadExecutor())
fun io(): Scheduler {
return io
}
}
}
实现 Android 特有 的 AndroidScheduler,主要利用 Looper
//LooperScheduler.kt
class LooperScheduler(looper: Looper) : Scheduler() {
private var handler : Handler = Handler(looper)
override fun createWorker(): Worker = LooperSchedulerWorker(handler)
private class LooperSchedulerWorker(private var handler : Handler) : Worker() {
override fun schedule(action: () -> Unit) {
handler.post {
action()
}
}
}
}
//AndroidSchedulers.kt
class AndroidSchedulers {
companion object {
private val mainThread = LooperScheduler(Looper.getMainLooper())
fun mainThread(): Scheduler {
return mainThread
}
}
}
实现 subscribeOn observeOn
//Observable.kt
fun <R> lift(operator: Operator<R, T>): Observable<R> {
return create(OnSubscribeLift(onSubscribe!!, operator))
}
fun subscribeOn(scheduler: Scheduler): Observable<T> {
return create(OperatorSubscribeOn(this, scheduler))
}
fun observeOn(scheduler: Scheduler): Observable<T> {
return lift(OperatorObserveOn(scheduler))
}
interface Operator<T, R> {
fun call(subscriber: Subscriber<T>) : Subscriber<R>
}
//OnSubscribeLift.kt
class OnSubscribeLift<T, R>(private var parent: Observable.OnSubscribe<T>, private var operator: Observable.Operator<R, T>) : Observable.OnSubscribe<R>{
override fun call(subscriber: Subscriber<R>) {
try {
val st = operator.call(subscriber)
st.onStart()
parent.call(st)
}catch (e: Exception) {
subscriber.onError(e)
}
}
}
在 rxjava 里面 subscribeOn 只有调用的第一次起作用,而 observeOn 则是看最近的那次调用。
原因在于 subscribeOn 调度的是执行 OnSubscribe 因此多次调用最上游的还是 在 第一次的线程那里执行的,除非更改下游 观察者所在的线程 也就是 observeOn。如果不明白的,可以多看这部分代码或者 rxjava 的源码加深理解。
//OperatorSubscribeOn.kt
class OperatorSubscribeOn<T>(private var source: Observable<T> , private var scheduler: Scheduler) : Observable.OnSubscribe<T>{
override fun call(subscriber: Subscriber<T>) {
val worker = scheduler.createWorker()
worker.schedule {
source.subscribe(SubscribeOnSubscriber(subscriber))
}
}
class SubscribeOnSubscriber<T>(private var actual: Subscriber<T>): Subscriber<T>() {
override fun onCompleted() {
actual.onCompleted()
}
override fun onError(t: Throwable) {
actual.onError(t)
}
override fun onNext(t: T) {
actual.onNext(t)
}
}
}
//OperatorObserveOn.kt
class OperatorObserveOn<T>(private var scheduler: Scheduler): Observable.Operator<T, T> {
override fun call(subscriber: Subscriber<T>): Subscriber<T> {
val worker = scheduler.createWorker()
return object : Subscriber<T>(){
override fun onCompleted() {
worker.schedule {
subscriber.onCompleted()
}
}
override fun onError(t: Throwable) {
worker.schedule {
subscriber.onError(t)
}
}
override fun onNext(t: T) {
worker.schedule {
subscriber.onNext(t)
}
}
}
}
}
//Test.kt
Observable.just("1", "2", "3")
.subscribeOn(Schedulers.io())
.map {
it.toInt() + 1
}
.filter {
it != 1
}
.map {
it
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Subscriber<Int>() {
override fun onCompleted() {
}
override fun onError(t: Throwable) {
}
override fun onNext(t: Int) {
System.out.println(t)
}
})