前言
紧接着上篇的源码实现分析,继续分析RACSignal的变换操作的底层实现。
目录
- 1.过滤操作
- 2.组合操作
一. 过滤操作
过滤操作也属于一种变换,根据过滤条件,过滤出符合条件的值。变换出来的新的信号是原信号的一个子集。
1. filter: (在父类RACStream中定义的)
这个filter:操作在any:的实现中用到过了。
- (instancetype)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
filter:中传入一个闭包,是用筛选的条件。如果满足筛选条件的即返回原信号的值,否则原信号的值被“吞”掉,返回空的信号。这个变换主要是用flattenMap的。
2. ignoreValues
- (RACSignal *)ignoreValues {
return [[self filter:^(id _) {
return NO;
}] setNameWithFormat:@"[%@] -ignoreValues", self.name];
}
由上面filter的实现,这里把筛选判断条件永远的传入NO,那么原信号的值都会被变换成empty信号,故变换之后的信号为空信号。
3. ignore: (在父类RACStream中定义的)
- (instancetype)ignore:(id)value {
return [[self filter:^ BOOL (id innerValue) {
return innerValue != value && ![innerValue isEqual:value];
}] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];
}
ignore:的实现还是由filter:实现的。传入的筛选判断条件是一个值,当原信号发送的值中是这个值的时候,就替换成空信号。
4. distinctUntilChanged (在父类RACStream中定义的)
- (instancetype)distinctUntilChanged {
Class class = self.class;
return [[self bind:^{
__block id lastValue = nil;
__block BOOL initial = YES;
return ^(id x, BOOL *stop) {
if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];
initial = NO;
lastValue = x;
return [class return:x];
};
}] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
}
distinctUntilChanged的实现是用bind来完成的。每次变换中都记录一下原信号上一次发送过来的值,并与这一次进行比较,如果是相同的值,就“吞”掉,返回empty信号。只有和原信号上一次发送的值不同,变换后的新信号才把这个值发送出来。
关于distinctUntilChanged,这里关注的是两两信号之间的值是否不同,有时候我们可能需要一个类似于NSSet的信号集,distinctUntilChanged就无法满足了。在ReactiveCocoa 2.5的这个版本也并没有向我们提供distinct的变换函数。
我们可以自己实现类似的变换。实现思路也不难,可以把之前每次发送过来的信号都用数组存起来,新来的信号都去数组里面查找一遍,如果找不到,就把这个值发送出去,如果找到了,就返回empty信号。效果如上图。
5. take: (在父类RACStream中定义的)
- (instancetype)take:(NSUInteger)count {
Class class = self.class;
if (count == 0) return class.empty;
return [[self bind:^{
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop) {
if (taken < count) {
++taken;
if (taken == count) *stop = YES;
return [class return:value];
} else {
return nil;
}
};
}] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}
take:实现也非常简单,借助bind函数来实现的。入参的count是原信号取值的个数。在bind的闭包中,taken计数从0开始取原信号的值,当taken取到count个数的时候,就停止取值。
在take:的基础上我们还可以继续改造出新的变换方式。比如说,想取原信号中执行的第几个值。类似于elementAt的操作。这个操作在ReactiveCocoa 2.5的这个版本也并没有直接向我们提供出来。
其实实现很简单,只需要判断taken是否等于我们要取的那个位置就可以了,等于的时候把原信号的值发送出来,并*stop = YES。
// 我自己增加实现的方法
- (instancetype)elementAt:(NSUInteger)index {
Class class = self.class;
return [[self bind:^{
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop) {
if (index == 0) {
*stop = YES;
return [class return:value];
}
if (taken == index) {
*stop = YES;
return [class return:value];
} else if (taken < index){
taken ++;
return [class empty];
}else {
return nil;
}
};
}] setNameWithFormat:@"[%@] -elementAt: %lu", self.name, (unsigned long)index];
}
6. takeLast:
- (RACSignal *)takeLast:(NSUInteger)count {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x) {
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];
while (valuesTaken.count > count) {
[valuesTaken removeObjectAtIndex:0];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
for (id value in valuesTaken) {
[subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
}
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
}
takeLast:的实现也是按照套路来。先创建一个新信号,return的时候订阅原信号。在函数内部用一个valuesTaken来保存原信号发送过来的值,原信号发多少,就存多少,直到个数溢出入参给定的count,就溢出数组第0位。这样能保证数组里面始终都装着最后count个原信号的值。
当原信号发送completed信号的时候,把数组里面存的值都sendNext出去。这里要注意的也是该变换发送信号的时机。如果原信号一直没有completed,那么takeLast:就一直没法发出任何信号来。
7. takeUntilBlock: (在父类RACStream中定义的)
- (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^{
return ^ id (id value, BOOL *stop) {
if (predicate(value)) return nil;
return [class return:value];
};
}] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
}
takeUntilBlock:是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么新信号停止发送新信号,因为它被置为nil了。和函数名的意思是一样的,take原信号的值,Until直到闭包满足条件。
8. takeWhileBlock: (在父类RACStream中定义的)
- (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
return [[self takeUntilBlock:^ BOOL (id x) {
return !predicate(x);
}] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
}
takeWhileBlock:的信号集是takeUntilBlock:的信号集的补集。全集是原信号。takeWhileBlock:底层还是调用takeUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。
9. takeUntil:
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
void (^triggerCompletion)(void) = ^{
[disposable dispose];
[subscriber sendCompleted];
};
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
triggerCompletion();
} completed:^{
triggerCompletion();
}];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed) {
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:selfDisposable];
}
return disposable;
}] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
}
takeUntil:的实现也是“经典套路”——return一个新信号,在新信号中订阅原信号。入参是一个信号signalTrigger,这个信号是一个Trigger。一旦signalTrigger发出第一个信号,就会触发triggerCompletion( )闭包,在这个闭包中,会调用triggerCompletion( )闭包。
void (^triggerCompletion)(void) = ^{
[disposable dispose];
[subscriber sendCompleted];
};
一旦调用了triggerCompletion( )闭包,就会把原信号取消订阅,并给变换的新的信号订阅者sendCompleted。
如果入参signalTrigger一直没有sendNext,那么原信号就会一直sendNext:。
10. takeUntilReplacement:
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
[selfDisposable dispose];
[subscriber sendNext:x];
} error:^(NSError *error) {
[selfDisposable dispose];
[subscriber sendError:error];
} completed:^{
[selfDisposable dispose];
[subscriber sendCompleted];
}];
if (!selfDisposable.disposed) {
selfDisposable.disposable = [[self
concat:[RACSignal never]]
subscribe:subscriber];
}
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[replacementDisposable dispose];
}];
}];
}
- 原信号concat:了一个[RACSignal never]信号,这样原信号就一直不会disposed,会一直等待replacement信号的到来。
- 控制selfDisposable是否被dispose,控制权来自于入参的replacement信号,一旦replacement信号sendNext,那么原信号就会取消订阅,接下来的事情就会交给replacement信号了。
- 变换后的新信号sendNext,sendError,sendCompleted全部都由replacement信号来发送,最终新信号完成的时刻也是replacement信号完成的时刻。
11. skip: (在父类RACStream中定义的)
- (instancetype)skip:(NSUInteger)skipCount {
Class class = self.class;
return [[self bind:^{
__block NSUInteger skipped = 0;
return ^(id value, BOOL *stop) {
if (skipped >= skipCount) return [class return:value];
skipped++;
return class.empty;
};
}] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}
skip:信号集和take:信号集是补集关系,全集是原信号。take:是取原信号的前count个信号,而skip:是从原信号第count + 1位开始取信号。
skipped是一个游标,每次原信号发送一个值,就比较它和入参skipCount的大小。如果不比skipCount大,说明还需要跳过,所以就返回empty信号,否则就把原信号的值发送出来。
通过类比take系列方法,可以发现在ReactiveCocoa 2.5的这个版本也并没有向我们提供skipLast:的变换函数。这个变换函数的实现过程也不难,我们可以类比takeLast:来实现。
实现的思路也不难,原信号每次发送过来的值,都用一个数组存储起来。skipLast:是想去掉原信号最末尾的count个信号。
我们先来分析一下:假设原信号有n个信号,从0 - (n-1),去掉最后的count个,前面还剩n - count个信号。那么从 原信号的第 count + 1位的信号开始发送,到原信号结束,这样中间就正好是发送了 n - count 个信号。
分析清楚后,代码就很容易了:
// 我自己增加实现的方法
- (RACSignal *)skipLast:(NSUInteger)count {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x) {
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];
while (valuesTaken.count > count) {
[subscriber sendNext:valuesTaken[0] == RACTupleNil.tupleNil ? nil : valuesTaken[0]];
[valuesTaken removeObjectAtIndex:0];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -skipLast: %lu", self.name, (unsigned long)count];
}
原信号每发送过来一个信号就存入数组,当数组里面的个数大于count的时候,就是需要我们发送信号的时候,这个时候每次都把数组里面第0位发送出去即可,数组维护了一个FIFO的队列。这样就实现了skipLast:的效果了。
12. skipUntilBlock: (在父类RACStream中定义的)
- (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^{
__block BOOL skipping = YES;
return ^ id (id value, BOOL *stop) {
if (skipping) {
if (predicate(value)) {
skipping = NO;
} else {
return class.empty;
}
}
return [class return:value];
};
}] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
}
skipUntilBlock: 的实现可以类比takeUntilBlock: 的实现。
skipUntilBlock: 是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么skipping = NO。skipping为NO,以后原信号发送的每个值都原封不动的发送出去。predicate( )闭包不满足条件的时候,即会一直skip原信号的值。和函数名的意思是一样的,skip原信号的值,Until直到闭包满足条件,就不再skip了。
13. skipWhileBlock: (在父类RACStream中定义的)
- (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
return [[self skipUntilBlock:^ BOOL (id x) {
return !predicate(x);
}] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
}
skipWhileBlock:的信号集是skipUntilBlock:的信号集的补集。全集是原信号。skipWhileBlock:底层还是调用skipUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。
到这里skip系列方法就结束了,对比take系列的方法,少了2个方法,在ReactiveCocoa 2.5的这个版本中 takeUntil: 和 takeUntilReplacement:这两个方法没有与之对应的skip方法。
// 我自己增加实现的方法
- (RACSignal *)skipUntil:(RACSignal *)signalTrigger {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
__block BOOL sendTrigger = NO;
void (^triggerCompletion)(void) = ^{
sendTrigger = YES;
};
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
triggerCompletion();
} completed:^{
triggerCompletion();
}];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed) {
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
if (sendTrigger) {
[subscriber sendNext:x];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:selfDisposable];
}
return disposable;
}] setNameWithFormat:@"[%@] -skipUntil: %@", self.name, signalTrigger];
}
skipUntil实现方法也很简单,当入参的signalTrigger开发发送信号的时候,就让原信号sendNext把值发送出来,否则就把原信号的值“吞”掉。
skipUntilReplacement:就没什么意义了,把原信号经过skipUntilReplacement:变换之后得到的新的信号就是Replacement信号。所以说这个操作也就没意义了。
14. groupBy:transform:
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
NSCParameterAssert(keyBlock != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
NSMutableDictionary *groups = [NSMutableDictionary dictionary];
NSMutableArray *orderedGroups = [NSMutableArray array];
return [self subscribeNext:^(id x) {
id<NSCopying> key = keyBlock(x);
RACGroupedSignal *groupSubject = nil;
@synchronized(groups) {
groupSubject = groups[key];
if (groupSubject == nil) {
groupSubject = [RACGroupedSignal signalWithKey:key];
groups[key] = groupSubject;
[orderedGroups addObject:groupSubject];
[subscriber sendNext:groupSubject];
}
}
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
} error:^(NSError *error) {
[subscriber sendError:error];
[orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
} completed:^{
[subscriber sendCompleted];
[orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
}];
}] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
}
看groupBy:transform:的实现,依旧是老“套路”。return 一个新的RACSignal,在新的信号里面订阅原信号。
groupBy:transform:的重点就在subscribeNext中了。
- 首先解释一下两个入参。两个入参都是闭包,keyBlock返回值是要作为字典的key,transformBlock的返回值是对原信号发出来的值x进行变换。
- 先创建一个NSMutableDictionary字典groups,和NSMutableArray数组orderedGroups。
- 从字典里面取出key对应的value,这里的key对应着keyBlock返回值。value的值是一个RACGroupedSignal信号。如果找不到对应的key值,就新建一个RACGroupedSignal信号,并存入字典对应的key值,与之对应。
- 新变换之后的信号,订阅之后,RACGroupedSignal进行sendNext,这是一个信号,如果transformBlock不为空,就发送transformBlock变换之后的值。
- sendError和sendCompleted都要分别对数组orderedGroups里面每个RACGroupedSignal都要进行sendError或者sendCompleted。因为要对数组里面每个信号都执行一个操作,所以需要调用makeObjectsPerformSelector:withObject:方法。
经过groupBy:transform:变换之后,原信号会根据keyBlock进行分组。
写出测试代码,来看看平时应该怎么用。
RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *signalGroup = [signalA groupBy:^id<NSCopying>(NSNumber *object) {
return object.integerValue > 3 ? @"good" : @"bad";
} transform:^id(NSNumber * object) {
return @(object.integerValue * 10);
}];
[[[signalGroup filter:^BOOL(RACGroupedSignal *value) {
return [(NSString *)value.key isEqualToString:@"good"];
}] flatten]subscribeNext:^(id x) {
NSLog(@"subscribeNext: %@", x);
}];
假设原信号发送的1,2,3,4,5是代表的成绩的5个等级。当成绩大于3的都算“good”,小于3的都算“bad”。
signalGroup是原信号signalA经过groupBy:transform:得到的新的信号,这个信号是一个高阶的信号,因为它里面并不是直接装的是值,signalGroup这个信号里面装的还是信号。signalGroup里面有两个分组,分别是“good”分组和“bad”分组。
想从中取出这两个分组里面的值,需要进行一次filter:筛选。筛选之后得到对应分组的高阶信号。这时还要再进行一个flatten操作,把高阶信号变成低阶信号,再次订阅才能取到其中的值。
订阅新信号的值,输出如下:
subscribeNext: 40
subscribeNext: 50
关于flatten的实现:
- (instancetype)flatten {
__weak RACStream *stream __attribute__((unused)) = self;
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}
flatten操作就是调用了flattenMap:把值传进去了。
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
flatten是把高阶信号变换成低阶信号的常用操作。flattenMap:具体实现上篇文章分析过了,这里不再赘述。
15. groupBy:
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
}
groupBy:操作就是groupBy:transform:的缩减版,transform传入的为nil。
关于groupBy:可以干的事情很多,可以进行很高级的分组操作。这里可以举一个例子:
// 简单算法题,分离数组中的相同的元素,如果元素个数大于2,则组成一个新的数组,结果得到多个包含相同元素的数组,
// 例如[1,2,3,1,2,3]分离成[1,1],[2,2],[3,3]
RACSignal *signal = @[@1, @2, @3, @4,@2,@3,@3,@4,@4,@4].rac_sequence.signal;
NSArray * array = [[[[signal groupBy:^NSString *(NSNumber *object) {
return [NSString stringWithFormat:@"%@",object];
}] map:^id(RACGroupedSignal *value) {
return [value sequence];
}] sequence] map:^id(RACSignalSequence * value) {
return value.array;
}].array;
for (NSNumber * num in array) {
NSLog(@"最后的数组%@",num);
}
// 最后输出 [1,2,3,4,2,3,3,4,4,4]变成[1],[2,2],[3,3,3],[4,4,4,4]
二. 组合操作
1. startWith: (在父类RACStream中定义的)
- (instancetype)startWith:(id)value {
return [[[self.class return:value]
concat:self]
setNameWithFormat:@"[%@] -startWith: %@", self.name, [value rac_description]];
}
startWith:的实现很简单,就是先构造一个只发送一个value的信号,然后这个信号发送完毕之后接上原信号。得到的新的信号就是在原信号前面新加了一个值。
2. concat: (在父类RACStream中定义的)
这里说的concat:是在父类RACStream中定义的。
- (instancetype)concat:(RACStream *)stream {
return nil;
}
父类中定义的这个方法就返回一个nil,具体的实现还要子类去重写。
3. concat: (在父类RACStream中定义的)
+ (instancetype)concat:(id<NSFastEnumeration>)streams {
RACStream *result = self.empty;
for (RACStream *stream in streams) {
result = [result concat:stream];
}
return [result setNameWithFormat:@"+concat: %@", streams];
}
这个concat:后面跟着一个数组,数组里面包含这很多信号,concat:依次把这些信号concat:连接串起来。
4. merge:
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}
return [[[RACSignal
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
for (RACSignal *signal in copiedSignals) {
[subscriber sendNext:signal];
}
[subscriber sendCompleted];
return nil;
}]
flatten]
setNameWithFormat:@"+merge: %@", copiedSignals];
}
merge:后面跟一个数组。先会新建一个数组copiedSignals,把传入的信号都装到数组里。然后依次发送数组里面的信号。由于新信号也是一个高阶信号,因为sendNext会把各个信号都依次发送出去,所以需要flatten操作把这个信号转换成值发送出去。
从上图上看,上下两个信号就像被拍扁了一样,就成了新信号的发送顺序。
5. merge:
- (RACSignal *)merge:(RACSignal *)signal {
return [[RACSignal
merge:@[ self, signal ]]
setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
}
merge:后面参数也可以跟一个信号,那么merge:就是合并这两个信号。具体实现和merge:多个信号是一样的原理。
6. zip: (在父类RACStream中定义的)
+ (instancetype)zip:(id<NSFastEnumeration>)streams {
return [[self join:streams block:^(RACStream *left, RACStream *right) {
return [left zipWith:right];
}] setNameWithFormat:@"+zip: %@", streams];
}
zip:后面可以跟一个数组,数组里面装的是各种信号流。
它的实现是调用了join: block: 实现的。
+ (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
RACStream *current = nil;
// 第一步
for (RACStream *stream in streams) {
if (current == nil) {
current = [stream map:^(id x) {
return RACTuplePack(x);
}];
continue;
}
current = block(current, stream);
}
// 第二步
if (current == nil) return [self empty];
return [current map:^(RACTuple *xs) {
NSMutableArray *values = [[NSMutableArray alloc] init];
// 第三步
while (xs != nil) {
[values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
xs = (xs.count > 1 ? xs.first : nil);
}
// 第四步
return [RACTuple tupleWithObjectsFromArray:values];
}];
}
join: block: 的实现可以分为4步:
依次打包各个信号流,把每个信号流都打包成元组RACTuple。首先第一个信号流打包成一个元组,这个元组里面就一个信号。接着把第一个元组和第二个信号执行block( )闭包里面的操作。传入的block( )闭包执行的是zipWith:的操作。这个操作是把两个信号“压”在一起。具体实现分析请看第一篇文章里面分析过的,这里就不再赘述了。得到第二个元组,里面装着是第一个元组和第二个信号。之后每次循环都执行类似的操作,再把第二个元组和第三个信号进行zipWith:操作,以此类推下去,直到所有的信号流都循环一遍。
经过第一步的循环操作之后,还是nil,那么肯定就是空信号了,就返回empty信号。
这一步是把之前第一步打包出来的结果,还原回原信号的过程。经过第一步的循环之后,current会是类似这个样子,(((1), 2), 3),第三步就是为了把这种多重元组解出来,每个信号流都依次按照顺序放在数组里。注意观察current的特点,最外层的元组,是一个值和一个元组,所以从最外层的元组开始,一层一层往里“剥”。while循环每次都取最外层元组的last,即那个单独的值,插入到数组的第0号位置,然后取出first即是里面一层的元组。然后依次循环。由于每次都插入到数组0号的位置,类似于链表的头插法,最终数组里面的顺序肯定也保证是原信号的顺序。
第四步就是把还原成原信号的顺序的数组包装成元组,返回给map操作的闭包。
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array {
return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];
}
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert {
RACTuple *tuple = [[self alloc] init];
if (convert) {
NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];
for (id object in array) {
[newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];
}
tuple.backingArray = newArray;
} else {
tuple.backingArray = [array copy];
}
return tuple;
}
在转换过程中,入参convertNullsToNils的含义是,是否把数组里面的NSNull转换成RACTupleNil。
这里转换传入的是NO,所以就是把数组原封不动的copy一份。
测试代码:
RACSignal *signalD = [RACSignal interval:3 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalO = [RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalE = [RACSignal interval:4 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalB = [RACStream zip:@[signalD,signalO,signalE]];
[signalB subscribeNext:^(id x) {
NSLog(@"最后接收到的值 = %@",x);
}];
打印输出:
2016-11-29 13:07:57.349 最后接收到的值 = <RACTuple: 0x608000011440> (
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:07:54 +0000",
"2016-11-29 05:07:57 +0000"
)
2016-11-29 13:08:01.350 最后接收到的值 = <RACTuple: 0x608000010c60> (
"2016-11-29 05:07:59 +0000",
"2016-11-29 05:07:55 +0000",
"2016-11-29 05:08:01 +0000"
)
2016-11-29 13:08:05.352 最后接收到的值 = <RACTuple: 0x60000001a350> (
"2016-11-29 05:08:02 +0000",
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:08:05 +0000"
)
最后输出的信号以时间最长的为主,最后接到的信号是一个元组,里面依次包含zip:数组里每个信号在一次“压”缩周期里面的值。
7. zip: reduce: (在父类RACStream中定义的)
+ (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
RACStream *result = [self zip:streams];
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
}
zip: reduce:是一个组合的方法。具体实现可以拆分成两部分,第一部分是先执行zip:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。zip:完成之后,紧接着进行reduceEach:操作。
这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。
- (instancetype)reduceEach:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}
reduceEach:操作在上篇已经分析过了。它会动态的构造闭包,对原信号每个元组,执行reduceBlock( )闭包里面的方法。具体分析见上篇。一般用法如下:
[RACStream zip:@[ stringSignal, intSignal ] reduce:^(NSString *string, NSNumber *number) {
return [NSString stringWithFormat:@"%@: %@", string, number];
}];
8. zipWith: (在父类RACStream中定义的)
- (instancetype)zipWith:(RACStream *)stream {
return nil;
}
这个方法就是在父类的RACStream中定义了,具体实现还要看RACStream各个子类的实现。
它就可以类比concat:在父类中的实现,也是直接返回一个nil。
- (instancetype)concat:(RACStream *)stream { return nil;}
在第一篇中分析了concat:和zipWith:在RACSignal子类中具体实现。忘记了具体实现的可以回去看看。
9. combineLatestWith:
- (RACSignal *)combineLatestWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// 初始化第一个信号的一些标志变量
__block id lastSelfValue = nil;
__block BOOL selfCompleted = NO;
// 初始化第二个信号的一些标志变量
__block id lastOtherValue = nil;
__block BOOL otherCompleted = NO;
// 这里是一个判断是否sendNext的闭包
void (^sendNext)(void) = ^{ };
// 订阅第一个信号
RACDisposable *selfDisposable = [self subscribeNext:^(id x) { }];
[disposable addDisposable:selfDisposable];
// 订阅第二个信号
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { }];
[disposable addDisposable:otherDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
}
大体实现思路比较简单,在新信号里面分别订阅原信号和入参signal信号。
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (disposable) {
lastSelfValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
selfCompleted = YES;
if (otherCompleted) [subscriber sendCompleted];
}
}];
先来看看原信号订阅的具体实现:
在subscribeNext闭包中,记录下原信号最新发送的x值,并保存到lastSelfValue中。从此lastSelfValue变量每次都保存原信号发送过来的最新的值。然后再调用sendNext( )闭包。
在completed闭包中,selfCompleted中记录下原信号发送完成。这是还要判断otherCompleted是否完成,即入参信号signal是否发送完成,只有两者都发送完成了,组合的新信号才能算全部发送完成。
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (disposable) {
lastOtherValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
otherCompleted = YES;
if (selfCompleted) [subscriber sendCompleted];
}
}];
这是对入参信号signal的处理实现。和原信号的处理方式完全一致。现在重点就要看看sendNext( )闭包中都做了些什么。
void (^sendNext)(void) = ^{
@synchronized (disposable) {
if (lastSelfValue == nil || lastOtherValue == nil) return;
[subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
}
};
在sendNext( )闭包中,如果lastSelfValue 或者 lastOtherValue 其中之一有一个为nil,就return,因为这个时候无法结合在一起。当两个信号都有值,那么就把这两个信号的最新的值打包成元组发送出来。
可以看到,每个信号每发送出来一个新的值,都会去找另外一个信号上一个最新的值进行结合。
这里可以对比一下类似的zip:操作
zip:操作是会把新来的信号的值存起来,放在数组里,然后另外一个信号发送一个值过来就和数组第0位的值相互结合成新的元组信号发送出去,并分别移除数组里面第0位的两个值。zip:能保证每次结合的值都是唯一的,不会一个原信号的值被多次结合到新的元组信号中。但是combineLatestWith:是不能保证这一点的,在原信号或者另外一个信号新信号发送前,每次发送信号都会结合当前最新的信号,这里就会有反复结合的情况。
10. combineLatest:
+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
return [left combineLatestWith:right];
}] setNameWithFormat:@"+combineLatest: %@", signals];
}
combineLatest:的实现就是把入参数组里面的每个信号都调用一次join: block:方法。传入的闭包是把两个信号combineLatestWith:一下。combineLatest:的实现就是2个操作的组合。具体实现上面也都分析过,这里不再赘述。
11. combineLatest: reduce:
+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
RACSignal *result = [self combineLatest:signals];
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
}
combineLatest: reduce: 的实现可以类比zip: reduce:的实现。
具体实现可以拆分成两部分,第一部分是先执行combineLatest:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。combineLatest:完成之后,紧接着进行reduceEach:操作。
这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。
12. combinePreviousWithStart: reduce:(在父类RACStream中定义的)
这个方法的实现也是多个变换操作组合在一起的。
- (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {
NSCParameterAssert(reduceBlock != NULL);
return [[[self
scanWithStart:RACTuplePack(start)
reduce:^(RACTuple *previousTuple, id next) {
id value = reduceBlock(previousTuple[0], next);
return RACTuplePack(next, value);
}]
map:^(RACTuple *tuple) {
return tuple[1];
}]
setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, [start rac_description]];
}
combinePreviousWithStart: reduce:的实现完全可以类比scanWithStart:reduce:的实现。举个例子来说明他们俩的不同。
RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;
RACSignal *signalA = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
return @(previous.integerValue + next.integerValue);
}].signal;
RACSignal *signalB = [numbers scanWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
return @(previous.integerValue + next.integerValue);
}].signal;
signalA输出如下:
1
3
5
7
signalB输出如下:
1
3
6
10
现在应该不同点应该很明显了。combinePreviousWithStart: reduce:实现的是两两之前的加和,而scanWithStart:reduce:实现的累加。
为什么会这样呢,具体看看combinePreviousWithStart: reduce:的实现。
虽然combinePreviousWithStart: reduce:也是调用了scanWithStart:reduce:,但是初始值是RACTuplePack(start)元组,聚合reduce的过程也有所不同:
id value = reduceBlock(previousTuple[0], next);
return RACTuplePack(next, value);
依次调用reduceBlock( )闭包,传入previousTuple[0], next,这里reduceBlock( )闭包是进行累加的操作,所以就是把前一个元组的第0位加上后面新来的信号的值。得到的值拼成新的元组,新的元组由next和value值构成。
如果打印出上述例子中combinePreviousWithStart: reduce:的加合过程中每个信号的值,如下:
<RACTuple: 0x608000200010> (
1,
1
)
<RACTuple: 0x60000001fe70> (
2,
3
)
<RACTuple: 0x60000001fe90> (
3,
5
)
<RACTuple: 0x60000001feb0> (
4,
7
)
由于这样拆成元组之后,下次再进行操作的时候,还可以拿到前一个信号的值,这样就不会形成累加的效果。
13. sample:
- (RACSignal *)sample:(RACSignal *)sampler {
NSCParameterAssert(sampler != nil);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
NSLock *lock = [[NSLock alloc] init];
__block id lastValue;
__block BOOL hasValue = NO;
RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) { // 暂时省略 }];
samplerDisposable.disposable = [sampler subscribeNext:^(id _) { // 暂时省略 }];
return [RACDisposable disposableWithBlock:^{
[samplerDisposable dispose];
[sourceDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
}
sample:内部实现也是对原信号和入参信号sampler分别进行订阅。具体实现就是这两个信号订阅内部都干了些什么。
RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[lock lock];
hasValue = YES;
lastValue = x;
[lock unlock];
} error:^(NSError *error) {
[samplerDisposable dispose];
[subscriber sendError:error];
} completed:^{
[samplerDisposable dispose];
[subscriber sendCompleted];
}];
这是对原信号的操作,原信号的操作在subscribeNext中就记录了两个变量的值,hasValue记录原信号有值,lastValue记录了原信号的最新的值。这里加了一层NSLock锁进行保护。
在发生error的时候,先把sampler信号取消订阅,然后再sendError:。当原信号完成的时候,同样是先把sampler信号取消订阅,然后再sendCompleted。
samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
BOOL shouldSend = NO;
id value;
[lock lock];
shouldSend = hasValue;
value = lastValue;
[lock unlock];
if (shouldSend) {
[subscriber sendNext:value];
}
} error:^(NSError *error) {
[sourceDisposable dispose];
[subscriber sendError:error];
} completed:^{
[sourceDisposable dispose];
[subscriber sendCompleted];
}];
这是对入参信号sampler的操作。shouldSend默认值是NO,这个变量控制着是否sendNext:值。只有当原信号有值的时候,hasValue = YES,所以shouldSend = YES,这个时候才能发送原信号的值。这里我们并不关心入参信号sampler的值,从subscribeNext:^(id _)这里可以看出, _代表并不需要它的值。
在发生error的时候,先把原信号取消订阅,然后再sendError:。当sampler信号完成的时候,同样是先把原信号取消订阅,然后再sendCompleted。
经过sample:变换就会变成这个样子。只是把原信号的值都移动到了sampler信号发送信号的时刻,值还是和原信号的值一样。
最后
关于RACSignal的变换操作还剩下 冷热信号转换操作,高阶信号操作,下篇接着继续分析。最后请大家多多指教。