RACSignal 原理:高阶信号、同步、副作用、多线程操作

原创:知识点总结性文章
创作不易,请珍惜,之后会持续更新,不断完善
个人比较喜欢做笔记和写总结,毕竟好记性不如烂笔头哈哈,这些文章记录了我的IOS成长历程,希望能与大家一起进步
温馨提示:由于简书不支持目录跳转,大家可通过command + F 输入目录标题后迅速寻找到你所需要的内容

目录

  • 一、高阶信号操作
    • 1、flattenMap: (在父类RACStream中定义的)
    • 2、flatten (在父类RACStream中定义的)
    • 3、flatten:
    • 4、concat
    • 5、switchToLatest
      1. switch: cases: default:
    • 7、if: then: else:
    • 8、catch:
    • 9、catchTo:
    • 10、try:
    • 11、tryMap:
    • 12、timeout: onScheduler:
    • 总结
  • 二、同步操作
    • 1、firstOrDefault: success: error:
    • 2、firstOrDefault:
    • 3、first
    • 4、waitUntilCompleted:
    • 5、toArray
  • 三、副作用操作
      1. doNext:
    • 2、doError:
    • 3、doCompleted:
    • 4、initially:
      1. finally:
  • 四. 多线程操作
    • 1、deliverOn:
    • 2、subscribeOn:
    • 3、deliverOnMainThread
  • 五、其他操作
    • 1、setKeyPath: onObject: nilValue:
    • 2、setKeyPath: onObject:

一、高阶信号操作

高阶操作大部分的操作是针对高阶信号的,也就是说信号里面发送的值还是一个信号。可以类比数组,这里就是多维数组,数组里面还是套的数组。

1、flattenMap: (在父类RACStream中定义的)

flattenMap:在整个RAC中具有很重要的地位,很多信号变换都是可以用flattenMap:来实现的。map:flattenfiltersequenceMany:这4个操作都是用flattenMap:来实现的。回顾一下map:的实现:

- (instancetype)map:(id (^)(id value))block {
    NSCParameterAssert(block != nil);
    
    Class class = self.class;
    return [[self flattenMap:^(id value) {
        return [class return:block(value)];
    }] setNameWithFormat:@"[%@] -map:", self.name];
}

map:的操作其实就是直接对原信号进行flattenMap:的操作,变换出来的新的信号的值是block(value)flatten的实现接下去会具体分析,这里先略过。

filter的实现和map:有点类似,也是对原信号进行flattenMap:的操作,只不过block(value)不是作为返回值,而是作为判断条件,满足这个闭包的条件,变换出来的新的信号返回值就是value,不满足的就返回empty信号。

- (instancetype)filter:(BOOL (^)(id value))block {
    NSCParameterAssert(block != nil);
    
    Class class = self.class;
    return [[self flattenMap:^ id (id value) {
        block(value) ? return [class return:value] :  return class.empty;
    }] setNameWithFormat:@"[%@] -filter:", self.name];
}

接下去要分析的高阶操作里面,switchToLatesttry:tryMap:的实现中也将会使用到flattenMap:flattenMap:的实现是调用了bind函数,对原信号进行变换,并返回block(value)的新信号。

- (instancetype)flattenMap:(RACStream * (^)(id value))block {
    Class class = self.class;
    
    return [[self bind:^{
        return ^(id value, BOOL *stop) {
            id stream = block(value) ?: [class empty];
            NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
            
            return stream;
        };
    }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

2、flatten (在父类RACStream中定义的)

flatten操作必须是对高阶信号进行操作,如果信号里面不是信号,即不是高阶信号,那么就会崩溃。崩溃信息如下:

*** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: 'Value returned from -flattenMap: is not a stream

所以flatten是对高阶信号进行的降阶操作。高阶信号每发送一次信号,经过flatten变换,由于flattenMap:操作之后,返回的新的信号的每个值就是原信号中每个信号的值。

如果对信号A,信号B,信号C进行merge:操作,可以达到和flatten一样的效果。

[RACSignal merge:@[signalA,signalB,signalC]];

merge:操作在上篇文章分析过,再来复习一下。现在在回来看这段代码,copiedSignals虽然是一个NSMutableArray,但是它近似合成了一个上图中的高阶信号。然后这些信号们每发送出来一个信号就发给订阅者。整个操作如flatten的字面意思一样,压平。

+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
    NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
    for (RACSignal *signal in signals) {
        [copiedSignals addObject:signal];
    }
    
    return [[[RACSignal
              createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
                  for (RACSignal *signal in copiedSignals) {
                      [subscriber sendNext:signal];
                  }
                  
                  [subscriber sendCompleted];
                  return nil;
              }]
             flatten]
            setNameWithFormat:@"+merge: %@", copiedSignals];
}

flatten的操作是可以选择3种操作选择的:mergeconcatswitchToLatest

public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
    switch strategy {
    case .merge:
        return self.merge()
    case .concat:
        return self.concat()
    case .latest:
        return self.switchToLatest()
    }
}

3、flatten:

flatten:操作也必须是对高阶信号进行操作,如果信号里面不是信号,即不是高阶信号,那么就会崩溃。flatten:的实现比较复杂,一步步的来分析。先来解释一些变量,数组的作用。

  • activeDisposables里面装的是当前正在订阅的订阅者们的disposables信号。
  • queuedSignals里面装的是被暂时缓存起来的信号,它们等待被订阅。
  • selfCompleted表示高阶信号是否Completed
  • subscribeToSignal闭包的作用是订阅所给的信号,这个闭包的入参参数就是一个信号,在闭包内部订阅这个信号,并进行一些操作。
  • recur是对subscribeToSignal闭包的一个弱引用,防止strong-weak循环引用,在下面会分析subscribeToSignal闭包,就会明白为什么recur要用weak修饰了。
  • completeIfAllowed的作用是在所有信号都发送完毕的时候,通知订阅者,给订阅者发送completed
  • 入参maxConcurrent的意思是最大可容纳同时被订阅的信号个数。

再来详细分析一下具体订阅的过程。flatten:的内部,订阅高阶信号发出来的信号,这部分的代码比较简单:

  1. 如果当前最大可容纳信号的个数 > 0 ,且activeDisposables数组里面已经装满到最大可容纳信号的个数,不能再装新的信号了。那么就把当前的信号缓存到queuedSignals数组中。

  2. 直到activeDisposables数组里面有空的位子可以加入新的信号,那么就调用subscribeToSignal( )闭包,开始订阅这个新的信号。

  3. 最后完成的时候标记变量selfCompletedYES,并且调用completeIfAllowed( )闭包。

- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
        NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
        NSMutableArray *queuedSignals = [NSMutableArray array];

        __block BOOL selfCompleted = NO;
        __block void (^subscribeToSignal)(RACSignal *);
        __weak __block void (^recur)(RACSignal *);
        recur = subscribeToSignal = ^(RACSignal *signal) { // 暂时省略};

        void (^completeIfAllowed)(void) = ^{ // 暂时省略};
        
        [compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
            if (signal == nil) return;
            
            NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
            
            @synchronized (subscriber) {
                if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
                    [queuedSignals addObject:signal];
                    return;
                }
            }
            
            subscribeToSignal(signal);
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (subscriber) {
                selfCompleted = YES;
                completeIfAllowed();
            }
        }]];
        
        return compoundDisposable;
    }] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
}

selfCompleted = YES 并且activeDisposables数组里面的信号都发送完毕,没有可以发送的信号了,即activeDisposables.count = 0,那么就给订阅者sendCompleted。这里值得一提的是,还需要把subscribeToSignal手动置为nil。因为在subscribeToSignal闭包中强引用了completeIfAllowed闭包,防止completeIfAllowed闭包被提早的销毁掉了。所以在completeIfAllowed闭包执行完毕的时候,需要再把subscribeToSignal闭包置为nil

void (^completeIfAllowed)(void) = ^{
    if (selfCompleted && activeDisposables.count == 0) {
        [subscriber sendCompleted];
        subscribeToSignal = nil;
    }
};

那么接下来需要看的重点就是subscribeToSignal( )闭包。

  1. activeDisposables先添加当前高阶信号发出来的信号的Disposable( 也就是入参信号的Disposable)
  2. 这里会对recur进行__strong,因为下面第6步会用到subscribeToSignal( )闭包,同样也是为了防止出现循环引用。
  3. 订阅入参信号,给订阅者发送信号。当发送完毕后,activeDisposables中移除它对应的Disposable
  4. 如果当前缓存的queuedSignals数组里面没有缓存的信号,那么就调用completeIfAllowed( )闭包。
  5. 如果当前缓存的queuedSignals数组里面有缓存的信号,那么就取出第0个信号,并在queuedSignals数组移除它。
  6. 把第4步取出的信号继续订阅,继续调用subscribeToSignal( )闭包。
recur = subscribeToSignal = ^(RACSignal *signal) {
    RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
    // 1
    @synchronized (subscriber) {
        [compoundDisposable addDisposable:serialDisposable];
        [activeDisposables addObject:serialDisposable];
    }

    serialDisposable.disposable = [signal subscribeNext:^(id x) {
        [subscriber sendNext:x];
    } error:^(NSError *error) {
        [subscriber sendError:error];
    } completed:^{
        // 2
        __strong void (^subscribeToSignal)(RACSignal *) = recur;
        RACSignal *nextSignal;
        // 3
        @synchronized (subscriber) {
            [compoundDisposable removeDisposable:serialDisposable];
            [activeDisposables removeObjectIdenticalTo:serialDisposable];
            // 4
            if (queuedSignals.count == 0) {
                completeIfAllowed();
                return;
            }
            // 5
            nextSignal = queuedSignals[0];
            [queuedSignals removeObjectAtIndex:0];
        }
        // 6
        subscribeToSignal(nextSignal);
    }];
};

总结一下:高阶信号每发送一个信号值,判断activeDisposables数组装的个数是否已经超过了maxConcurrent。如果装不下了就缓存进queuedSignals数组中。如果还可以装的下就开始调用subscribeToSignal( )闭包,订阅当前信号。

每发送完一个信号就判断缓存数组queuedSignals的个数,如果缓存数组里面已经没有信号了,那么就结束原来高阶信号的发送。如果缓存数组里面还有信号就继续订阅。如此循环,直到原高阶信号所有的信号都发送完毕。

整个flatten:的执行流程都分析清楚了,最后,关于入参maxConcurrent进行更进一步的解读。回看上面flatten:的实现中有这样一句话:

if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) 

那么maxConcurrent的值域就是最终决定flatten:表现行为。如果maxConcurrent < 0,会发生什么?程序会崩溃。因为在源码中有这样一行的初始化的代码。activeDisposables在初始化的时候会初始化一个大小为maxConcurrent的NSMutableArray。如果maxConcurrent < 0,那么这里初始化就会崩溃。

NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];

如果maxConcurrent = 0,会发生什么?那么flatten:就退化成flatten了。

如果maxConcurrent = 1,会发生什么?那么flatten:就退化成concat了。

如果maxConcurrent > 1,会发生什么?由于至今还没有遇到能用到maxConcurrent > 1的需求情况,所以这里暂时不展示图解了。maxConcurrent > 1之后,flatten的行为还依照高阶信号的个数和maxConcurrent的关系。如果高阶信号的个数<=maxConcurrent的值,那么flatten:又退化成flatten了。如果高阶信号的个数>maxConcurrent的值,那么多的信号就会进入queuedSignals缓存数组。


4、concat

这里的concat实现是在RACSignal里面定义的。一看源码就知道了,concat其实就是flatten:1

- (RACSignal *)concat {
    return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
}

当然在RACSignal中定义了concat:方法,这个方法在之前的文章已经分析过了,这里回顾对比一下。经过对比可以发现,虽然最终变换出来的结果类似,但是针对的信号的对象是不同的,concat是针对高阶信号进行降阶操作。concat:是把两个信号连接起来的操作。如果把高阶信号按照时间轴,从左往右,依次把每个信号都concat:连接起来,那么结果就是concat

- (RACSignal *)concat:(RACSignal *)signal {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

        RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            RACDisposable *concattedDisposable = [signal subscribe:subscriber];
            serialDisposable.disposable = concattedDisposable;
        }];

        serialDisposable.disposable = sourceDisposable;
        return serialDisposable;
    }] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}

5、switchToLatest

一个高阶信号经过switchToLatest降阶操作之后,能得到下图中的信号。

switchToLatest这个操作只能用在高阶信号上,如果原信号里面有不是信号的值,那么就会崩溃,崩溃信息如下:

***** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: '-switchToLatest requires that the source signal (<RACDynamicSignal: 0x608000038ec0> name: ) send signals.

switchToLatest操作中,先把原信号转换成热信号,connection.signal就是RACSubject类型的。对RACSubject进行flattenMap:变换。在flattenMap:变换中,connection.signal会先concat:一个never信号。这里concat:一个never信号的原因是为了内部的信号过早的结束而导致订阅者收到complete信号。flattenMap:变换中x也是一个信号,对x进行takeUntil:变换,效果就是下一个信号到来之前,x会一直发送信号,一旦下一个信号到来,x就会被取消订阅,开始订阅新的信号。

- (RACSignal *)switchToLatest {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACMulticastConnection *connection = [self publish];
        
        RACDisposable *subscriptionDisposable = [[connection.signal
                                                  flattenMap:^(RACSignal *x) {
                                                      NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
                                                      return [x takeUntil:[connection.signal concat:[RACSignal never]]];
                                                  }]
                                                 subscribe:subscriber];
        
        RACDisposable *connectionDisposable = [connection connect];
        return [RACDisposable disposableWithBlock:^{
            [subscriptionDisposable dispose];
            [connectionDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}

6. switch: cases: default:

实现中有3个断言,全部都是针对入参的要求。入参signal信号和cases字典都不能是nil。其次,cases字典里面所有key对应的value必须是RACSignal类型的。注意,defaultSignal是可以为nil的。

接下来的实现比较简单,对入参传进来的signal信号进行map变换,这里的变换是升阶的变换。

signal每次发送出来的一个值,就把这个值当做key值去cases字典里面去查找对应的value。当然value对应的是一个信号。如果value对应的信号不为空,就把signal发送出来的这个值map成字典里面对应的信号。如果value对应为空,那么就把原signal发出来的值mapdefaultSignal信号。

如果经过转换之后,得到的信号为nil,就会返回一个error信号。如果得到的信号不为nil,那么原信号完全转换完成就会变成一个高阶信号,这个高阶信号里面装的都是信号。最后再对这个高阶信号执行switchToLatest转换。

+ (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
    NSCParameterAssert(signal != nil);
    NSCParameterAssert(cases != nil);
    
    for (id key in cases) {
        id value __attribute__((unused)) = cases[key];
        NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
    }
    
    NSDictionary *copy = [cases copy];
    
    return [[[signal
              map:^(id key) {
                  if (key == nil) key = RACTupleNil.tupleNil;
                  
                  RACSignal *signal = copy[key] ?: defaultSignal;
                  if (signal == nil) {
                      NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
                      return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
                  }
                  
                  return signal;
              }]
             switchToLatest]
            setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
}

7、if: then: else:

入参boolSignaltrueSignalfalseSignal三个信号都不能为nilboolSignal里面都必须装的是NSNumber类型的值。

针对boolSignal进行map升阶操作,boolSignal信号里面的值如果是YES,那么就转换成trueSignal信号,如果为NO,就转换成falseSignal。升阶转换完成之后,boolSignal就是一个高阶信号,然后再进行switchToLatest操作。

+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
    NSCParameterAssert(boolSignal != nil);
    NSCParameterAssert(trueSignal != nil);
    NSCParameterAssert(falseSignal != nil);
    
    return [[[boolSignal
              map:^(NSNumber *value) {
                  NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
                  
                  return (value.boolValue ? trueSignal : falseSignal);
              }]
             switchToLatest]
            setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
}

8、catch:

当对原信号进行订阅的时候,如果出现了错误,会去执行catchBlock( )闭包,入参为刚刚产生的errorcatchBlock( )闭包产生的是一个新的RACSignal,并再次用订阅者订阅该信号。这里之所以说是高阶操作,是因为这里原信号发生错误之后,错误会升阶成一个信号。

- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
    NSCParameterAssert(catchBlock != NULL);
    
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
        
        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            RACSignal *signal = catchBlock(error);
            NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
            catchDisposable.disposable = [signal subscribe:subscriber];
        } completed:^{
            [subscriber sendCompleted];
        }];
        
        return [RACDisposable disposableWithBlock:^{
            [catchDisposable dispose];
            [subscriptionDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -catch:", self.name];
}

9、catchTo:

catchTo:的实现就是调用catch:方法,只不过原来catch:方法里面的catchBlock( )闭包,永远都只返回catchTo:的入参,signal信号。

- (RACSignal *)catchTo:(RACSignal *)signal {
    return [[self catch:^(NSError *error) {
        return signal;
    }] setNameWithFormat:@"[%@] -catchTo: %@", self.name, signal];
}

10、try:

try:可以用来进来信号的升阶操作。对原信号进行flattenMap变换,对信号发出来的每个值都调用一遍tryBlock( )闭包,如果这个闭包的返回值是YES,那么就返回[RACSignal return:value],如果闭包的返回值是NO,那么就返回error。原信号中如果都是值,那么经过try:操作之后,每个值都会变成RACSignal,于是原信号也就变成了高阶信号了。当然,如果在block`的实现中返回一个信号,这时就不会升阶了。返回的信号里面可以不返回信号,而是直接返回值。

- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
    NSCParameterAssert(tryBlock != NULL);
    
    return [[self flattenMap:^(id value) {
        NSError *error = nil;
        BOOL passed = tryBlock(value, &error);
        return (passed ? [RACSignal return:value] : [RACSignal error:error]);
    }] setNameWithFormat:@"[%@] -try:", self.name];
}

11、tryMap:

tryMap:的实现和try:的实现基本一致,唯一不同的就是入参闭包的返回值不同。在tryMap:中调用mapBlock( )闭包,返回是一个对象,如果这个对象不为nil,就返回[RACSignal return:mappedValue]。如果返回的对象是nil,那么就变换成error信号。

- (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
    NSCParameterAssert(mapBlock != NULL);
    
    return [[self flattenMap:^(id value) {
        NSError *error = nil;
        id mappedValue = mapBlock(value, &error);
        return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
    }] setNameWithFormat:@"[%@] -tryMap:", self.name];
}

12、timeout: onScheduler:

timeout: onScheduler:的实现很简单,它比正常的信号订阅多了一个timeoutDisposable操作。它在信号订阅的内部开启了一个scheduler,经过interval的时间之后,就会停止订阅原信号,并对订阅者sendError。这个操作的表意和方法名完全一致,经过interval的时间之后,就算timeout,那么就停止订阅原信号,并sendError

- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
    NSCParameterAssert(scheduler != nil);
    NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
    
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
        
        RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
            [disposable dispose];
            [subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
        }];
        
        [disposable addDisposable:timeoutDisposable];
        
        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [disposable dispose];
            [subscriber sendError:error];
        } completed:^{
            [disposable dispose];
            [subscriber sendCompleted];
        }];
        
        [disposable addDisposable:subscriptionDisposable];
        return disposable;
    }] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

总结

总结一下ReactiveCocoa中高阶信号的升阶 / 降阶操作:

升阶操作
  • map( 把值map成一个信号)
  • [RACSignal return:signal]
降阶操作
  • flatten(等效于flatten:0+merge:)
  • concat(等效于flatten:1)
  • flatten:1
  • switchToLatest
  • flattenMap:

这5种操作能将高阶信号变为低阶信号,但是最终降阶之后的效果就只有3种:switchToLatestflattenconcat。具体的图示见上面的分析。

ReactiveCocoa中还包含一些同步的操作,这些操作一般我们很少使用,除非真的很确定这样做了之后不会有什么问题,否则胡乱使用会导致线程死锁等一些严重的问题。


二、同步操作

1、firstOrDefault: success: error:

从源码上看,firstOrDefault: success: error:这种同步的方法很容易导致线程死锁。它在subscribeNexterrorcompleted的闭包里面都调用condition锁先lockunlock。如果一个信号发送值过来,都没有执行subscribeNexterrorcompleted这3个操作里面的任意一个,那么就会执行[condition wait],等待。

由于对原信号进行了take:1操作,所以只会对第一个值进行操作。执行完subscribeNexterrorcompleted这3个操作里面的任意一个,又会加一次锁,对外部传进来的入参successerror进行赋值,已便外部可以拿到里面的状态。最终返回信号是原信号中第一个next里面的值,如果原信号第一个值没有,比如直接error或者completed,那么返回的是defaultValue

done为YES表示已经成功执行了subscribeNexterrorcompleted这3个操作里面的任意一个。反之为NO

localSuccess为YES表示成功发送值或者成功发送完了原信号的所有值,期间没有发生错误。

conditionbroadcast操作是唤醒其他线程的操作,相当于操作系统里面互斥信号量的signal操作。

入参defaultValue是给内部变量value的一个初始值。当原信号发送出一个值之后,value的值时刻都会与原信号的值保持一致。

successerror是外部变量的地址,从外面可以监听到里面的状态。在函数内部赋值,在函数外面拿到它们的值。

- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
    NSCondition *condition = [[NSCondition alloc] init];
    condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
    
    __block id value = defaultValue;
    __block BOOL done = NO;
    
    // Ensures that we don't pass values across thread boundaries by reference.
    __block NSError *localError;
    __block BOOL localSuccess;
    
    [[self take:1] subscribeNext:^(id x) {
        // 加锁
        [condition lock];
        
        value = x;
        localSuccess = YES;
        
        done = YES;
        [condition broadcast];
        // 解锁
        [condition unlock];
    } error:^(NSError *e) {
        // 加锁
        [condition lock];
        
        if (!done) {
            localSuccess = NO;
            localError = e;
            
            done = YES;
            [condition broadcast];
        }
        // 解锁
        [condition unlock];
    } completed:^{
        // 加锁
        [condition lock];
        
        localSuccess = YES;
        
        done = YES;
        [condition broadcast];
        // 解锁
        [condition unlock];
    }];
    // 加锁
    [condition lock];
    while (!done) {
        [condition wait];
    }
    
    if (success != NULL) *success = localSuccess;
    if (error != NULL) *error = localError;
    // 解锁
    [condition unlock];
    return value;
}

2、firstOrDefault:

firstOrDefault:的实现就是调用了firstOrDefault: success: error:方法。只不过不需要传successerror,不关心内部的状态。最终返回信号是原信号中第一个next里面的值,如果原信号第一个值没有,比如直接error或者completed,那么返回的是defaultValue

- (id)firstOrDefault:(id)defaultValue {
    return [self firstOrDefault:defaultValue success:NULL error:NULL];
}

3、first

first方法就更加省略,连defaultValue也不传。最终返回信号是原信号中第一个next里面的值,如果原信号第一个值没有,比如直接error或者completed,那么返回的是nil

- (id)first {
    return [self firstOrDefault:nil];
}

4、waitUntilCompleted:

waitUntilCompleted:里面还是调用firstOrDefault: success: error:方法。返回值是success。只要原信号正常的发送完信号,success应该为YES,但是如果发送过程中出现了errorsuccess就为NO。success作为返回值,外部就可以监听到是否发送成功。

虽然这个方法可以监听到发送结束的状态,但是也尽量不要使用,因为它的实现调用了firstOrDefault: success: error:方法,这个方法里面有大量的锁的操作,一不留神就会导致死锁。

- (BOOL)waitUntilCompleted:(NSError **)error {
    BOOL success = NO;
    
    [[[self
       ignoreValues]
      setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
     firstOrDefault:nil success:&success error:error];
    
    return success;
}

5、toArray

经过collect之后,原信号所有的值都会被加到一个数组里面,取出信号的第一个值就是一个数组。所以执行完first之后第一个值就是原信号所有值的数组。

- (NSArray *)toArray {
    return [[[self collect] first] copy];
}

三、副作用操作

ReactiveCocoa 中还为我们提供了一些可以进行副作用操作的函数。

1. doNext:

doNext:能让我们在原信号sendNext之前,能执行一个block闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。

- (RACSignal *)doNext:(void (^)(id x))block {
    NSCParameterAssert(block != NULL);
    
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            block(x);
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -doNext:", self.name];
}

2、doError:

doError:能让我们在原信号sendError之前,能执行一个block闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。

- (RACSignal *)doError:(void (^)(NSError *error))block {
    NSCParameterAssert(block != NULL);
    
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            block(error);
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -doError:", self.name];
}

3、doCompleted:

doCompleted:能让我们在原信号sendCompleted之前,能执行一个block闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。

- (RACSignal *)doCompleted:(void (^)(void))block {
    NSCParameterAssert(block != NULL);
    
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            block();
            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -doCompleted:", self.name];
}

4、initially:

initially:能让我们在原信号发送之前,先调用了defer:操作,在return self之前先执行了一个闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。

- (RACSignal *)initially:(void (^)(void))block {
    NSCParameterAssert(block != NULL);
    
    return [[RACSignal defer:^{
        block();
        return self;
    }] setNameWithFormat:@"[%@] -initially:", self.name];
}

5. finally:

finally:操作调用了doError:doCompleted:操作,依次在sendError之前,sendCompleted之前,插入一个block( )闭包。这样当信号因为错误而要终止取消订阅,或者,发送结束之前,都能执行一段我们想要执行的副作用操作。

- (RACSignal *)finally:(void (^)(void))block {
    NSCParameterAssert(block != NULL);
    
    return [[[self
              doError:^(NSError *error) {
                  block();
              }]
             doCompleted:^{
                 block();
             }]
            setNameWithFormat:@"[%@] -finally:", self.name];
}

四. 多线程操作

RACSignal里面有3个关于多线程的操作。

1、deliverOn:

deliverOn:的入参是一个scheduler,当原信号subscribeNextsendErrorsendCompleted的时候,都去调用schedulerschedule方法。

- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            [scheduler schedule:^{
                [subscriber sendNext:x];
            }];
        } error:^(NSError *error) {
            [scheduler schedule:^{
                [subscriber sendError:error];
            }];
        } completed:^{
            [scheduler schedule:^{
                [subscriber sendCompleted];
            }];
        }];
    }] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
}

schedule的方法里面会判断当前currentScheduler是否为nil,如果是nil就调用backgroundScheduler去执行block( )闭包,如果不为nil,当前currentScheduler直接执行block( )闭包。

判断currentScheduler是否存在,看两点,一是当前线程的字典里面,是否存在RACSchedulerCurrentSchedulerKey( @"RACSchedulerCurrentSchedulerKey" ),如果存在对应的value,返回scheduler,二是看当前的类是不是在主线程,如果在主线程,返回mainThreadScheduler。如果两个条件都不存在,那么当前currentScheduler就不存在,返回nil

+ (instancetype)currentScheduler {
    RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
    if (scheduler != nil) return scheduler;
    if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

    return nil;
}

deliverOn:操作的特点是原信号发送sendNextsendErrorsendCompleted所在线程是确定的。


2、subscribeOn:

subscribeOn:操作就是在传入的scheduler的闭包内部订阅原信号的。它与deliverOn:操作就不同:subscribeOn:操作能够保证didSubscribe block( )闭包在入参scheduler中执行,但是不能保证原信号subscribeNextsendErrorsendCompleted在哪个scheduler中执行。

deliverOn:subscribeOn:正好反过来,能保证原信号subscribeNextsendErrorsendCompleted在哪个scheduler中执行,但是不能保证didSubscribe block( )闭包在哪个scheduler中执行。

- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
        
        RACDisposable *schedulingDisposable = [scheduler schedule:^{
            RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
            
            [disposable addDisposable:subscriptionDisposable];
        }];
        
        [disposable addDisposable:schedulingDisposable];
        return disposable;
    }] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
}

3、deliverOnMainThread

对比deliverOn:的源码实现,发现两者比较相似,只不过这里deliverOnMainThreadsendNextsendErrorsendCompleted都包在了performOnMainThread闭包中执行。

- (RACSignal *)deliverOnMainThread {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block volatile int32_t queueLength = 0;
        
        void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) { // 暂时省略};
        
        return [self subscribeNext:^(id x) {
            performOnMainThread(^{
                [subscriber sendNext:x];
            });
        } error:^(NSError *error) {
            performOnMainThread(^{
                [subscriber sendError:error];
            });
        } completed:^{
            performOnMainThread(^{
                [subscriber sendCompleted];
            });
        }];
    }] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
}

performOnMainThread闭包内部保证了入参block( )闭包一定是在主线程中执行。OSAtomicIncrement32OSAtomicDecrement32是原子操作,分别代表+1和-1。下面的if-else判断里面,不管是满足哪一条,最终都还是在主线程中执行block( )闭包。deliverOnMainThread能保证原信号subscribeNextsendErrorsendCompleted都在主线程MainThread中执行。

__block volatile int32_t queueLength = 0;

void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
    int32_t queued = OSAtomicIncrement32(&queueLength);
    if (NSThread.isMainThread && queued == 1) {
        block();
        OSAtomicDecrement32(&queueLength);
    } else {
        dispatch_async(dispatch_get_main_queue(), ^{
            block();
            OSAtomicDecrement32(&queueLength);
        });
    }
};

五、其他操作

1、setKeyPath: onObject: nilValue:

- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
    NSCParameterAssert(keyPath != nil);
    NSCParameterAssert(object != nil);
    
    keyPath = [keyPath copy];
    
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    
    __block void * volatile objectPtr = (__bridge void *)object;
    
    RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
        // 1
        __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
        [object setValue:x ?: nilValue forKeyPath:keyPath];
    } error:^(NSError *error) {
        __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
        
        NSCAssert(NO, @"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
        NSLog(@"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
        
        [disposable dispose];
    } completed:^{
        [disposable dispose];
    }];
    
    [disposable addDisposable:subscriptionDisposable];
    
#if DEBUG
    static void *bindingsKey = &bindingsKey;
    NSMutableDictionary *bindings;
    
    @synchronized (object) {
        // 2
        bindings = objc_getAssociatedObject(object, bindingsKey);
        if (bindings == nil) {
            bindings = [NSMutableDictionary dictionary];
            objc_setAssociatedObject(object, bindingsKey, bindings, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
        }
    }
    
    @synchronized (bindings) {
        NSCAssert(bindings[keyPath] == nil, @"Signal %@ is already bound to key path \"%@\" on object %@, adding signal %@ is undefined behavior", [bindings[keyPath] nonretainedObjectValue], keyPath, object, self);
        
        bindings[keyPath] = [NSValue valueWithNonretainedObject:self];
    }
#endif
    
    RACDisposable *clearPointerDisposable = [RACDisposable disposableWithBlock:^{
#if DEBUG
        @synchronized (bindings) {
            // 3
            [bindings removeObjectForKey:keyPath];
        }
#endif
        
        while (YES) {
            void *ptr = objectPtr;
            // 4
            if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr)) {
                break;
            }
        }
    }];
    
    [disposable addDisposable:clearPointerDisposable];
    
    [object.rac_deallocDisposable addDisposable:disposable];
    
    RACCompoundDisposable *objectDisposable = object.rac_deallocDisposable;
    return [RACDisposable disposableWithBlock:^{
        [objectDisposable removeDisposable:disposable];
        [disposable dispose];
    }];
}

代码虽然有点长,但是逐行读下来不是很难,需要注意的有4点地方,已经在上述代码里面标明了。接下来一一分析。

AssociatedObject关联对象

如果bindings字典不存在,那么就调用objc_setAssociatedObjectobject进行关联对象。参数是OBJC_ASSOCIATION_RETAIN_NONATOMIC。如果bindings字典存在,就用objc_getAssociatedObject取出字典。在字典里面重新更新绑定key-value值,key就是入参keyPathvalue是原信号。

取消订阅原信号的时候

当信号取消订阅的时候,移除所有的关联值。

[bindings removeObjectForKey:keyPath];
OSAtomicCompareAndSwapPtrBarrier

这个函数用于比较__oldValue是否与__theValue指针指向的内存位置的值匹配,如果匹配,则将__newValue的值存储到__theValue指向的内存位置。整个函数的返回值就是交换是否成功的BOOL值。

在这个while的死循环里面只有当OSAtomicCompareAndSwapPtrBarrier返回值为YES,才能退出整个死循环。返回值为YES就代表&objectPtr被置为了NULL,这样就确保了在线程安全的情况下,不存在野指针的问题了。

while (YES) {
    void *ptr = objectPtr;
    if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr))   {
        break;
    }
}

2、setKeyPath: onObject:

setKeyPath: onObject:就是调用setKeyPath: onObject: nilValue:方法,只不过nilValue传递的是nil

- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object {
    return [self setKeyPath:keyPath onObject:object nilValue:nil];
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352

推荐阅读更多精彩内容