博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ReactiveCocoa 中 RACSignal 所有变换操作底层实现分析(中)
阅读量:6847 次
发布时间:2019-06-26

本文共 32380 字,大约阅读时间需要 107 分钟。

前言

紧接着的源码实现分析,继续分析RACSignal的变换操作的底层实现。

目录

  • 1.过滤操作
  • 2.组合操作

一. 过滤操作

过滤操作也属于一种变换,根据过滤条件,过滤出符合条件的值。变换出来的新的信号是原信号的一个子集。

1. filter: (在父类RACStream中定义的)

这个filter:操作在any:的实现中用到过了。

- (instancetype)filter:(BOOL (^)(id value))block {    NSCParameterAssert(block != nil);    Class class = self.class;    return [[self flattenMap:^ id (id value) {          if (block(value)) {            return [class return:value];        } else {            return class.empty;        }    }] setNameWithFormat:@"[%@] -filter:", self.name];}复制代码

filter:中传入一个闭包,是用筛选的条件。如果满足筛选条件的即返回原信号的值,否则原信号的值被“吞”掉,返回空的信号。这个变换主要是用flattenMap的。

2. ignoreValues

- (RACSignal *)ignoreValues {    return [[self filter:^(id _) {        return NO;    }] setNameWithFormat:@"[%@] -ignoreValues", self.name];}复制代码

由上面filter的实现,这里把筛选判断条件永远的传入NO,那么原信号的值都会被变换成empty信号,故变换之后的信号为空信号。

3. ignore: (在父类RACStream中定义的)

- (instancetype)ignore:(id)value {    return [[self filter:^ BOOL (id innerValue) {        return innerValue != value && ![innerValue isEqual:value];    }] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];}复制代码

ignore:的实现还是由filter:实现的。传入的筛选判断条件是一个值,当原信号发送的值中是这个值的时候,就替换成空信号。

4. distinctUntilChanged (在父类RACStream中定义的)

- (instancetype)distinctUntilChanged {    Class class = self.class;    return [[self bind:^{        __block id lastValue = nil;        __block BOOL initial = YES;        return ^(id x, BOOL *stop) {            if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];            initial = NO;            lastValue = x;            return [class return:x];        };    }] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];}复制代码

distinctUntilChanged的实现是用bind来完成的。每次变换中都记录一下原信号上一次发送过来的值,并与这一次进行比较,如果是相同的值,就“吞”掉,返回empty信号。只有和原信号上一次发送的值不同,变换后的新信号才把这个值发送出来。

关于distinctUntilChanged,这里关注的是两两信号之间的值是否不同,有时候我们可能需要一个类似于NSSet的信号集,distinctUntilChanged就无法满足了。在ReactiveCocoa 2.5的这个版本也并没有向我们提供distinct的变换函数。

我们可以自己实现类似的变换。实现思路也不难,可以把之前每次发送过来的信号都用数组存起来,新来的信号都去数组里面查找一遍,如果找不到,就把这个值发送出去,如果找到了,就返回empty信号。效果如上图。

5. take: (在父类RACStream中定义的)

- (instancetype)take:(NSUInteger)count {    Class class = self.class;    if (count == 0) return class.empty;    return [[self bind:^{        __block NSUInteger taken = 0;        return ^ id (id value, BOOL *stop) {            if (taken < count) {                ++taken;                if (taken == count) *stop = YES;                return [class return:value];            } else {                return nil;            }        };    }] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];}复制代码

take:实现也非常简单,借助bind函数来实现的。入参的count是原信号取值的个数。在bind的闭包中,taken计数从0开始取原信号的值,当taken取到count个数的时候,就停止取值。

在take:的基础上我们还可以继续改造出新的变换方式。比如说,想取原信号中执行的第几个值。类似于elementAt的操作。这个操作在ReactiveCocoa 2.5的这个版本也并没有直接向我们提供出来。

其实实现很简单,只需要判断taken是否等于我们要取的那个位置就可以了,等于的时候把原信号的值发送出来,并*stop = YES。

// 我自己增加实现的方法- (instancetype)elementAt:(NSUInteger)index {    Class class = self.class;    return [[self bind:^{        __block NSUInteger taken = 0;        return ^ id (id value, BOOL *stop) {            if (index == 0) {                *stop = YES;                return [class return:value];            }            if (taken == index) {                *stop = YES;                return [class return:value];            } else if (taken < index){                taken ++;                return [class empty];            }else {                return nil;            }        };    }] setNameWithFormat:@"[%@] -elementAt: %lu", self.name, (unsigned long)index];}复制代码

6. takeLast:

- (RACSignal *)takeLast:(NSUInteger)count {    return [[RACSignal createSignal:^(id
subscriber) { NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count]; return [self subscribeNext:^(id x) { [valuesTaken addObject:x ? : RACTupleNil.tupleNil]; while (valuesTaken.count > count) { [valuesTaken removeObjectAtIndex:0]; } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ for (id value in valuesTaken) { [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value]; } [subscriber sendCompleted]; }]; }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];}复制代码

takeLast:的实现也是按照套路来。先创建一个新信号,return的时候订阅原信号。在函数内部用一个valuesTaken来保存原信号发送过来的值,原信号发多少,就存多少,直到个数溢出入参给定的count,就溢出数组第0位。这样能保证数组里面始终都装着最后count个原信号的值。

当原信号发送completed信号的时候,把数组里面存的值都sendNext出去。这里要注意的也是该变换发送信号的时机。如果原信号一直没有completed,那么takeLast:就一直没法发出任何信号来。

7. takeUntilBlock: (在父类RACStream中定义的)

- (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate {    NSCParameterAssert(predicate != nil);    Class class = self.class;    return [[self bind:^{        return ^ id (id value, BOOL *stop) {            if (predicate(value)) return nil;            return [class return:value];        };    }] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];}复制代码

takeUntilBlock:是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么新信号停止发送新信号,因为它被置为nil了。和函数名的意思是一样的,take原信号的值,Until直到闭包满足条件。

8. takeWhileBlock: (在父类RACStream中定义的)

- (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate {    NSCParameterAssert(predicate != nil);    return [[self takeUntilBlock:^ BOOL (id x) {        return !predicate(x);    }] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];}复制代码

takeWhileBlock:的信号集是takeUntilBlock:的信号集的补集。全集是原信号。takeWhileBlock:底层还是调用takeUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。

9. takeUntil:

- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {    return [[RACSignal createSignal:^(id
subscriber) { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; void (^triggerCompletion)(void) = ^{ [disposable dispose]; [subscriber sendCompleted]; }; RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) { triggerCompletion(); } completed:^{ triggerCompletion(); }]; [disposable addDisposable:triggerDisposable]; if (!disposable.disposed) { RACDisposable *selfDisposable = [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [disposable dispose]; [subscriber sendCompleted]; }]; [disposable addDisposable:selfDisposable]; } return disposable; }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];}复制代码

takeUntil:的实现也是“经典套路”——return一个新信号,在新信号中订阅原信号。入参是一个信号signalTrigger,这个信号是一个Trigger。一旦signalTrigger发出第一个信号,就会触发triggerCompletion( )闭包,在这个闭包中,会调用triggerCompletion( )闭包。

void (^triggerCompletion)(void) = ^{   [disposable dispose];   [subscriber sendCompleted];  };复制代码

一旦调用了triggerCompletion( )闭包,就会把原信号取消订阅,并给变换的新的信号订阅者sendCompleted。

如果入参signalTrigger一直没有sendNext,那么原信号就会一直sendNext:。

10. takeUntilReplacement:

- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {    return [RACSignal createSignal:^(id
subscriber) { RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init]; RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) { [selfDisposable dispose]; [subscriber sendNext:x]; } error:^(NSError *error) { [selfDisposable dispose]; [subscriber sendError:error]; } completed:^{ [selfDisposable dispose]; [subscriber sendCompleted]; }]; if (!selfDisposable.disposed) { selfDisposable.disposable = [[self concat:[RACSignal never]] subscribe:subscriber]; } return [RACDisposable disposableWithBlock:^{ [selfDisposable dispose]; [replacementDisposable dispose]; }]; }];}复制代码

  1. 原信号concat:了一个[RACSignal never]信号,这样原信号就一直不会disposed,会一直等待replacement信号的到来。
  2. 控制selfDisposable是否被dispose,控制权来自于入参的replacement信号,一旦replacement信号sendNext,那么原信号就会取消订阅,接下来的事情就会交给replacement信号了。
  3. 变换后的新信号sendNext,sendError,sendCompleted全部都由replacement信号来发送,最终新信号完成的时刻也是replacement信号完成的时刻。

11. skip: (在父类RACStream中定义的)

- (instancetype)skip:(NSUInteger)skipCount {    Class class = self.class;    return [[self bind:^{        __block NSUInteger skipped = 0;        return ^(id value, BOOL *stop) {            if (skipped >= skipCount) return [class return:value];            skipped++;            return class.empty;        };    }] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];}复制代码

skip:信号集和take:信号集是补集关系,全集是原信号。take:是取原信号的前count个信号,而skip:是从原信号第count + 1位开始取信号。

skipped是一个游标,每次原信号发送一个值,就比较它和入参skipCount的大小。如果不比skipCount大,说明还需要跳过,所以就返回empty信号,否则就把原信号的值发送出来。

通过类比take系列方法,可以发现在ReactiveCocoa 2.5的这个版本也并没有向我们提供skipLast:的变换函数。这个变换函数的实现过程也不难,我们可以类比takeLast:来实现。

实现的思路也不难,原信号每次发送过来的值,都用一个数组存储起来。skipLast:是想去掉原信号最末尾的count个信号。

我们先来分析一下:假设原信号有n个信号,从0 - (n-1),去掉最后的count个,前面还剩n - count个信号。那么从 原信号的第 count + 1位的信号开始发送,到原信号结束,这样中间就正好是发送了 n - count 个信号。

分析清楚后,代码就很容易了:

// 我自己增加实现的方法- (RACSignal *)skipLast:(NSUInteger)count {    return [[RACSignal createSignal:^(id
subscriber) { NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count]; return [self subscribeNext:^(id x) { [valuesTaken addObject:x ? : RACTupleNil.tupleNil]; while (valuesTaken.count > count) { [subscriber sendNext:valuesTaken[0] == RACTupleNil.tupleNil ? nil : valuesTaken[0]]; [valuesTaken removeObjectAtIndex:0]; } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; }] setNameWithFormat:@"[%@] -skipLast: %lu", self.name, (unsigned long)count];}复制代码

原信号每发送过来一个信号就存入数组,当数组里面的个数大于count的时候,就是需要我们发送信号的时候,这个时候每次都把数组里面第0位发送出去即可,数组维护了一个FIFO的队列。这样就实现了skipLast:的效果了。

12. skipUntilBlock: (在父类RACStream中定义的)

- (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate {    NSCParameterAssert(predicate != nil);    Class class = self.class;    return [[self bind:^{        __block BOOL skipping = YES;        return ^ id (id value, BOOL *stop) {            if (skipping) {                if (predicate(value)) {                    skipping = NO;                } else {                    return class.empty;                }            }            return [class return:value];        };    }] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];}复制代码

skipUntilBlock: 的实现可以类比takeUntilBlock: 的实现。

skipUntilBlock: 是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么skipping = NO。skipping为NO,以后原信号发送的每个值都原封不动的发送出去。predicate( )闭包不满足条件的时候,即会一直skip原信号的值。和函数名的意思是一样的,skip原信号的值,Until直到闭包满足条件,就不再skip了。

13. skipWhileBlock: (在父类RACStream中定义的)

- (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate {    NSCParameterAssert(predicate != nil);    return [[self skipUntilBlock:^ BOOL (id x) {        return !predicate(x);    }] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];}复制代码

skipWhileBlock:的信号集是skipUntilBlock:的信号集的补集。全集是原信号。skipWhileBlock:底层还是调用skipUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。

到这里skip系列方法就结束了,对比take系列的方法,少了2个方法,在ReactiveCocoa 2.5的这个版本中 takeUntil: 和 takeUntilReplacement:这两个方法没有与之对应的skip方法。

// 我自己增加实现的方法- (RACSignal *)skipUntil:(RACSignal *)signalTrigger {    return [[RACSignal createSignal:^(id
subscriber) { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; __block BOOL sendTrigger = NO; void (^triggerCompletion)(void) = ^{ sendTrigger = YES; }; RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) { triggerCompletion(); } completed:^{ triggerCompletion(); }]; [disposable addDisposable:triggerDisposable]; if (!disposable.disposed) { RACDisposable *selfDisposable = [self subscribeNext:^(id x) { if (sendTrigger) { [subscriber sendNext:x]; } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [disposable dispose]; [subscriber sendCompleted]; }]; [disposable addDisposable:selfDisposable]; } return disposable; }] setNameWithFormat:@"[%@] -skipUntil: %@", self.name, signalTrigger];}复制代码

skipUntil实现方法也很简单,当入参的signalTrigger开发发送信号的时候,就让原信号sendNext把值发送出来,否则就把原信号的值“吞”掉。

skipUntilReplacement:就没什么意义了,把原信号经过skipUntilReplacement:变换之后得到的新的信号就是Replacement信号。所以说这个操作也就没意义了。

14. groupBy:transform:

- (RACSignal *)groupBy:(id
(^)(id object))keyBlock transform:(id (^)(id object))transformBlock { NSCParameterAssert(keyBlock != NULL); return [[RACSignal createSignal:^(id
subscriber) { NSMutableDictionary *groups = [NSMutableDictionary dictionary]; NSMutableArray *orderedGroups = [NSMutableArray array]; return [self subscribeNext:^(id x) { id
key = keyBlock(x); RACGroupedSignal *groupSubject = nil; @synchronized(groups) { groupSubject = groups[key]; if (groupSubject == nil) { groupSubject = [RACGroupedSignal signalWithKey:key]; groups[key] = groupSubject; [orderedGroups addObject:groupSubject]; [subscriber sendNext:groupSubject]; } } [groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x]; } error:^(NSError *error) { [subscriber sendError:error]; [orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error]; } completed:^{ [subscriber sendCompleted]; [orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)]; }]; }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];}复制代码

看groupBy:transform:的实现,依旧是老“套路”。return 一个新的RACSignal,在新的信号里面订阅原信号。

groupBy:transform:的重点就在subscribeNext中了。

  1. 首先解释一下两个入参。两个入参都是闭包,keyBlock返回值是要作为字典的key,transformBlock的返回值是对原信号发出来的值x进行变换。
  2. 先创建一个NSMutableDictionary字典groups,和NSMutableArray数组orderedGroups。
  3. 从字典里面取出key对应的value,这里的key对应着keyBlock返回值。value的值是一个RACGroupedSignal信号。如果找不到对应的key值,就新建一个RACGroupedSignal信号,并存入字典对应的key值,与之对应。
  4. 新变换之后的信号,订阅之后,RACGroupedSignal进行sendNext,这是一个信号,如果transformBlock不为空,就发送transformBlock变换之后的值。
  5. sendError和sendCompleted都要分别对数组orderedGroups里面每个RACGroupedSignal都要进行sendError或者sendCompleted。因为要对数组里面每个信号都执行一个操作,所以需要调用makeObjectsPerformSelector:withObject:方法。

经过groupBy:transform:变换之后,原信号会根据keyBlock进行分组。

写出测试代码,来看看平时应该怎么用。

RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id
subscriber) { [subscriber sendNext:@1]; [subscriber sendNext:@2]; [subscriber sendNext:@3]; [subscriber sendNext:@4]; [subscriber sendNext:@5]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog(@"signal dispose"); }]; }]; RACSignal *signalGroup = [signalA groupBy:^id
(NSNumber *object) { return object.integerValue > 3 ? @"good" : @"bad"; } transform:^id(NSNumber * object) { return @(object.integerValue * 10); }]; [[[signalGroup filter:^BOOL(RACGroupedSignal *value) { return [(NSString *)value.key isEqualToString:@"good"]; }] flatten]subscribeNext:^(id x) { NSLog(@"subscribeNext: %@", x); }];复制代码

假设原信号发送的1,2,3,4,5是代表的成绩的5个等级。当成绩大于3的都算“good”,小于3的都算“bad”。

signalGroup是原信号signalA经过groupBy:transform:得到的新的信号,这个信号是一个高阶的信号,因为它里面并不是直接装的是值,signalGroup这个信号里面装的还是信号。signalGroup里面有两个分组,分别是“good”分组和“bad”分组。

想从中取出这两个分组里面的值,需要进行一次filter:筛选。筛选之后得到对应分组的高阶信号。这时还要再进行一个flatten操作,把高阶信号变成低阶信号,再次订阅才能取到其中的值。

订阅新信号的值,输出如下:

subscribeNext: 40subscribeNext: 50复制代码

关于flatten的实现:

- (instancetype)flatten {    __weak RACStream *stream __attribute__((unused)) = self;    return [[self flattenMap:^(id value) {        return value;    }] setNameWithFormat:@"[%@] -flatten", self.name];}复制代码

flatten操作就是调用了flattenMap:把值传进去了。

- (instancetype)flattenMap:(RACStream * (^)(id value))block {    Class class = self.class;    return [[self bind:^{        return ^(id value, BOOL *stop) {            id stream = block(value) ?: [class empty];            NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);            return stream;        };    }] setNameWithFormat:@"[%@] -flattenMap:", self.name];}复制代码

flatten是把高阶信号变换成低阶信号的常用操作。flattenMap:具体实现上篇文章分析过了,这里不再赘述。

15. groupBy:

- (RACSignal *)groupBy:(id
(^)(id object))keyBlock { return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];}复制代码

groupBy:操作就是groupBy:transform:的缩减版,transform传入的为nil。

关于groupBy:可以干的事情很多,可以进行很高级的分组操作。这里可以举一个例子:

// 简单算法题,分离数组中的相同的元素,如果元素个数大于2,则组成一个新的数组,结果得到多个包含相同元素的数组,    // 例如[1,2,3,1,2,3]分离成[1,1],[2,2],[3,3]    RACSignal *signal = @[@1, @2, @3, @4,@2,@3,@3,@4,@4,@4].rac_sequence.signal;      NSArray * array = [[[[signal groupBy:^NSString *(NSNumber *object) {          return [NSString stringWithFormat:@"%@",object];      }] map:^id(RACGroupedSignal *value) {          return [value sequence];      }] sequence] map:^id(RACSignalSequence * value) {          return value.array;      }].array;    for (NSNumber * num in array) {        NSLog(@"最后的数组%@",num);    }   // 最后输出 [1,2,3,4,2,3,3,4,4,4]变成[1],[2,2],[3,3,3],[4,4,4,4]复制代码

二. 组合操作

1. startWith: (在父类RACStream中定义的)

- (instancetype)startWith:(id)value {    return [[[self.class return:value]             concat:self]            setNameWithFormat:@"[%@] -startWith: %@", self.name, [value rac_description]];}复制代码

startWith:的实现很简单,就是先构造一个只发送一个value的信号,然后这个信号发送完毕之后接上原信号。得到的新的信号就是在原信号前面新加了一个值。

2. concat: (在父类RACStream中定义的)

这里说的concat:是在父类RACStream中定义的。

- (instancetype)concat:(RACStream *)stream {    return nil;}复制代码

父类中定义的这个方法就返回一个nil,具体的实现还要子类去重写。

3. concat: (在父类RACStream中定义的)

+ (instancetype)concat:(id
)streams { RACStream *result = self.empty; for (RACStream *stream in streams) { result = [result concat:stream]; } return [result setNameWithFormat:@"+concat: %@", streams];}复制代码

这个concat:后面跟着一个数组,数组里面包含这很多信号,concat:依次把这些信号concat:连接串起来。

4. merge:

+ (RACSignal *)merge:(id
)signals { NSMutableArray *copiedSignals = [[NSMutableArray alloc] init]; for (RACSignal *signal in signals) { [copiedSignals addObject:signal]; } return [[[RACSignal createSignal:^ RACDisposable * (id
subscriber) { for (RACSignal *signal in copiedSignals) { [subscriber sendNext:signal]; } [subscriber sendCompleted]; return nil; }] flatten] setNameWithFormat:@"+merge: %@", copiedSignals];}复制代码

merge:后面跟一个数组。先会新建一个数组copiedSignals,把传入的信号都装到数组里。然后依次发送数组里面的信号。由于新信号也是一个高阶信号,因为sendNext会把各个信号都依次发送出去,所以需要flatten操作把这个信号转换成值发送出去。

从上图上看,上下两个信号就像被拍扁了一样,就成了新信号的发送顺序。

5. merge:

- (RACSignal *)merge:(RACSignal *)signal {    return [[RACSignal             merge:@[ self, signal ]]            setNameWithFormat:@"[%@] -merge: %@", self.name, signal];}复制代码

merge:后面参数也可以跟一个信号,那么merge:就是合并这两个信号。具体实现和merge:多个信号是一样的原理。

6. zip: (在父类RACStream中定义的)

+ (instancetype)zip:(id
)streams { return [[self join:streams block:^(RACStream *left, RACStream *right) { return [left zipWith:right]; }] setNameWithFormat:@"+zip: %@", streams];}复制代码

zip:后面可以跟一个数组,数组里面装的是各种信号流。

它的实现是调用了join: block: 实现的。

+ (instancetype)join:(id
)streams block:(RACStream * (^)(id, id))block { RACStream *current = nil; // 第一步 for (RACStream *stream in streams) { if (current == nil) { current = [stream map:^(id x) { return RACTuplePack(x); }]; continue; } current = block(current, stream); } // 第二步 if (current == nil) return [self empty]; return [current map:^(RACTuple *xs) { NSMutableArray *values = [[NSMutableArray alloc] init]; // 第三步 while (xs != nil) { [values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0]; xs = (xs.count > 1 ? xs.first : nil); } // 第四步 return [RACTuple tupleWithObjectsFromArray:values]; }];}复制代码

join: block: 的实现可以分为4步:

  1. 依次打包各个信号流,把每个信号流都打包成元组RACTuple。首先第一个信号流打包成一个元组,这个元组里面就一个信号。接着把第一个元组和第二个信号执行block( )闭包里面的操作。传入的block( )闭包执行的是zipWith:的操作。这个操作是把两个信号“压”在一起。具体实现分析请看里面分析过的,这里就不再赘述了。得到第二个元组,里面装着是第一个元组和第二个信号。之后每次循环都执行类似的操作,再把第二个元组和第三个信号进行zipWith:操作,以此类推下去,直到所有的信号流都循环一遍。

  2. 经过第一步的循环操作之后,还是nil,那么肯定就是空信号了,就返回empty信号。

  3. 这一步是把之前第一步打包出来的结果,还原回原信号的过程。经过第一步的循环之后,current会是类似这个样子,(((1), 2), 3),第三步就是为了把这种多重元组解出来,每个信号流都依次按照顺序放在数组里。注意观察current的特点,最外层的元组,是一个值和一个元组,所以从最外层的元组开始,一层一层往里“剥”。while循环每次都取最外层元组的last,即那个单独的值,插入到数组的第0号位置,然后取出first即是里面一层的元组。然后依次循环。由于每次都插入到数组0号的位置,类似于链表的头插法,最终数组里面的顺序肯定也保证是原信号的顺序。

  4. 第四步就是把还原成原信号的顺序的数组包装成元组,返回给map操作的闭包。

+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array {    return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];}+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert {    RACTuple *tuple = [[self alloc] init];    if (convert) {        NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];        for (id object in array) {            [newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];        }        tuple.backingArray = newArray;    } else {        tuple.backingArray = [array copy];    }    return tuple;}复制代码

在转换过程中,入参convertNullsToNils的含义是,是否把数组里面的NSNull转换成RACTupleNil。

这里转换传入的是NO,所以就是把数组原封不动的copy一份。

测试代码:

RACSignal *signalD = [RACSignal interval:3 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];    RACSignal *signalO = [RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];    RACSignal *signalE = [RACSignal interval:4 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];    RACSignal *signalB = [RACStream zip:@[signalD,signalO,signalE]];    [signalB subscribeNext:^(id x) {        NSLog(@"最后接收到的值 = %@",x);    }];复制代码

打印输出:

2016-11-29 13:07:57.349 最后接收到的值 = 
( "2016-11-29 05:07:56 +0000", "2016-11-29 05:07:54 +0000", "2016-11-29 05:07:57 +0000")2016-11-29 13:08:01.350 最后接收到的值 =
( "2016-11-29 05:07:59 +0000", "2016-11-29 05:07:55 +0000", "2016-11-29 05:08:01 +0000")2016-11-29 13:08:05.352 最后接收到的值 =
( "2016-11-29 05:08:02 +0000", "2016-11-29 05:07:56 +0000", "2016-11-29 05:08:05 +0000")复制代码

最后输出的信号以时间最长的为主,最后接到的信号是一个元组,里面依次包含zip:数组里每个信号在一次“压”缩周期里面的值。

7. zip: reduce: (在父类RACStream中定义的)

+ (instancetype)zip:(id
)streams reduce:(id (^)())reduceBlock { NSCParameterAssert(reduceBlock != nil); RACStream *result = [self zip:streams]; if (reduceBlock != nil) result = [result reduceEach:reduceBlock]; return [result setNameWithFormat:@"+zip: %@ reduce:", streams];}复制代码

zip: reduce:是一个组合的方法。具体实现可以拆分成两部分,第一部分是先执行zip:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。zip:完成之后,紧接着进行reduceEach:操作。

这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。

- (instancetype)reduceEach:(id (^)())reduceBlock {    NSCParameterAssert(reduceBlock != nil);    __weak RACStream *stream __attribute__((unused)) = self;    return [[self map:^(RACTuple *t) {        NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);        return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];    }] setNameWithFormat:@"[%@] -reduceEach:", self.name];}复制代码

reduceEach:操作在已经分析过了。它会动态的构造闭包,对原信号每个元组,执行reduceBlock( )闭包里面的方法。具体分析见上篇。一般用法如下:

[RACStream zip:@[ stringSignal, intSignal ] reduce:^(NSString *string, NSNumber *number) {       return [NSString stringWithFormat:@"%@: %@", string, number];   }];复制代码

8. zipWith: (在父类RACStream中定义的)

- (instancetype)zipWith:(RACStream *)stream {    return nil;}复制代码

这个方法就是在父类的RACStream中定义了,具体实现还要看RACStream各个子类的实现。

它就可以类比concat:在父类中的实现,也是直接返回一个nil。

- (instancetype)concat:(RACStream *)stream { return nil;}复制代码

在中分析了concat:和zipWith:在RACSignal子类中具体实现。忘记了具体实现的可以回去看看。

9. combineLatestWith:

- (RACSignal *)combineLatestWith:(RACSignal *)signal {    NSCParameterAssert(signal != nil);    return [[RACSignal createSignal:^(id
subscriber) { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; // 初始化第一个信号的一些标志变量 __block id lastSelfValue = nil; __block BOOL selfCompleted = NO; // 初始化第二个信号的一些标志变量 __block id lastOtherValue = nil; __block BOOL otherCompleted = NO; // 这里是一个判断是否sendNext的闭包 void (^sendNext)(void) = ^{ }; // 订阅第一个信号 RACDisposable *selfDisposable = [self subscribeNext:^(id x) { }]; [disposable addDisposable:selfDisposable]; // 订阅第二个信号 RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { }]; [disposable addDisposable:otherDisposable]; return disposable; }] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];}复制代码

大体实现思路比较简单,在新信号里面分别订阅原信号和入参signal信号。

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {    @synchronized (disposable) {        lastSelfValue = x ?: RACTupleNil.tupleNil;        sendNext();    }} error:^(NSError *error) {    [subscriber sendError:error];} completed:^{    @synchronized (disposable) {        selfCompleted = YES;        if (otherCompleted) [subscriber sendCompleted];    }}];复制代码

先来看看原信号订阅的具体实现:

在subscribeNext闭包中,记录下原信号最新发送的x值,并保存到lastSelfValue中。从此lastSelfValue变量每次都保存原信号发送过来的最新的值。然后再调用sendNext( )闭包。

在completed闭包中,selfCompleted中记录下原信号发送完成。这是还要判断otherCompleted是否完成,即入参信号signal是否发送完成,只有两者都发送完成了,组合的新信号才能算全部发送完成。

RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {    @synchronized (disposable) {        lastOtherValue = x ?: RACTupleNil.tupleNil;        sendNext();    }} error:^(NSError *error) {    [subscriber sendError:error];} completed:^{    @synchronized (disposable) {        otherCompleted = YES;        if (selfCompleted) [subscriber sendCompleted];    }}];复制代码

这是对入参信号signal的处理实现。和原信号的处理方式完全一致。现在重点就要看看sendNext( )闭包中都做了些什么。

void (^sendNext)(void) = ^{    @synchronized (disposable) {        if (lastSelfValue == nil || lastOtherValue == nil) return;        [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];    }};复制代码

在sendNext( )闭包中,如果lastSelfValue 或者 lastOtherValue 其中之一有一个为nil,就return,因为这个时候无法结合在一起。当两个信号都有值,那么就把这两个信号的最新的值打包成元组发送出来。

可以看到,每个信号每发送出来一个新的值,都会去找另外一个信号上一个最新的值进行结合。

这里可以对比一下类似的zip:操作

zip:操作是会把新来的信号的值存起来,放在数组里,然后另外一个信号发送一个值过来就和数组第0位的值相互结合成新的元组信号发送出去,并分别移除数组里面第0位的两个值。zip:能保证每次结合的值都是唯一的,不会一个原信号的值被多次结合到新的元组信号中。但是combineLatestWith:是不能保证这一点的,在原信号或者另外一个信号新信号发送前,每次发送信号都会结合当前最新的信号,这里就会有反复结合的情况。

10. combineLatest:

+ (RACSignal *)combineLatest:(id
)signals { return [[self join:signals block:^(RACSignal *left, RACSignal *right) { return [left combineLatestWith:right]; }] setNameWithFormat:@"+combineLatest: %@", signals];}复制代码

combineLatest:的实现就是把入参数组里面的每个信号都调用一次join: block:方法。传入的闭包是把两个信号combineLatestWith:一下。combineLatest:的实现就是2个操作的组合。具体实现上面也都分析过,这里不再赘述。

11. combineLatest: reduce:

+ (RACSignal *)combineLatest:(id
)signals reduce:(id (^)())reduceBlock { NSCParameterAssert(reduceBlock != nil); RACSignal *result = [self combineLatest:signals]; if (reduceBlock != nil) result = [result reduceEach:reduceBlock]; return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];}复制代码

combineLatest: reduce: 的实现可以类比zip: reduce:的实现。

具体实现可以拆分成两部分,第一部分是先执行combineLatest:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。combineLatest:完成之后,紧接着进行reduceEach:操作。

这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。

12. combinePreviousWithStart: reduce:(在父类RACStream中定义的)

这个方法的实现也是多个变换操作组合在一起的。

- (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {    NSCParameterAssert(reduceBlock != NULL);    return [[[self              scanWithStart:RACTuplePack(start)              reduce:^(RACTuple *previousTuple, id next) {                  id value = reduceBlock(previousTuple[0], next);                  return RACTuplePack(next, value);              }]             map:^(RACTuple *tuple) {                 return tuple[1];             }]            setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, [start rac_description]];}复制代码

combinePreviousWithStart: reduce:的实现完全可以类比scanWithStart:reduce:的实现。举个例子来说明他们俩的不同。

RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;      RACSignal *signalA = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {          return @(previous.integerValue + next.integerValue);      }].signal;    RACSignal *signalB = [numbers scanWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {        return @(previous.integerValue + next.integerValue);    }].signal;复制代码

signalA输出如下:

1357复制代码

signalB输出如下:

13610复制代码

现在应该不同点应该很明显了。combinePreviousWithStart: reduce:实现的是两两之前的加和,而scanWithStart:reduce:实现的累加。

为什么会这样呢,具体看看combinePreviousWithStart: reduce:的实现。

虽然combinePreviousWithStart: reduce:也是调用了scanWithStart:reduce:,但是初始值是RACTuplePack(start)元组,聚合reduce的过程也有所不同:

id value = reduceBlock(previousTuple[0], next); return RACTuplePack(next, value);复制代码

依次调用reduceBlock( )闭包,传入previousTuple[0], next,这里reduceBlock( )闭包是进行累加的操作,所以就是把前一个元组的第0位加上后面新来的信号的值。得到的值拼成新的元组,新的元组由next和value值构成。

如果打印出上述例子中combinePreviousWithStart: reduce:的加合过程中每个信号的值,如下:

( 1, 1)
( 2, 3)
( 3, 5)
( 4, 7)复制代码

由于这样拆成元组之后,下次再进行操作的时候,还可以拿到前一个信号的值,这样就不会形成累加的效果。

13. sample:

- (RACSignal *)sample:(RACSignal *)sampler {    NSCParameterAssert(sampler != nil);    return [[RACSignal createSignal:^(id
subscriber) { NSLock *lock = [[NSLock alloc] init]; __block id lastValue; __block BOOL hasValue = NO; RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init]; RACDisposable *sourceDisposable = [self subscribeNext:^(id x) { // 暂时省略 }]; samplerDisposable.disposable = [sampler subscribeNext:^(id _) { // 暂时省略 }]; return [RACDisposable disposableWithBlock:^{ [samplerDisposable dispose]; [sourceDisposable dispose]; }]; }] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];}复制代码

sample:内部实现也是对原信号和入参信号sampler分别进行订阅。具体实现就是这两个信号订阅内部都干了些什么。

RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {    [lock lock];    hasValue = YES;    lastValue = x;    [lock unlock];} error:^(NSError *error) {    [samplerDisposable dispose];    [subscriber sendError:error];} completed:^{    [samplerDisposable dispose];    [subscriber sendCompleted];}];复制代码

这是对原信号的操作,原信号的操作在subscribeNext中就记录了两个变量的值,hasValue记录原信号有值,lastValue记录了原信号的最新的值。这里加了一层NSLock锁进行保护。

在发生error的时候,先把sampler信号取消订阅,然后再sendError:。当原信号完成的时候,同样是先把sampler信号取消订阅,然后再sendCompleted。

samplerDisposable.disposable = [sampler subscribeNext:^(id _) {    BOOL shouldSend = NO;    id value;    [lock lock];    shouldSend = hasValue;    value = lastValue;    [lock unlock];    if (shouldSend) {        [subscriber sendNext:value];    }} error:^(NSError *error) {    [sourceDisposable dispose];    [subscriber sendError:error];} completed:^{    [sourceDisposable dispose];    [subscriber sendCompleted];}];复制代码

这是对入参信号sampler的操作。shouldSend默认值是NO,这个变量控制着是否sendNext:值。只有当原信号有值的时候,hasValue = YES,所以shouldSend = YES,这个时候才能发送原信号的值。这里我们并不关心入参信号sampler的值,从subscribeNext:^(id _)这里可以看出, _代表并不需要它的值。

在发生error的时候,先把原信号取消订阅,然后再sendError:。当sampler信号完成的时候,同样是先把原信号取消订阅,然后再sendCompleted。

经过sample:变换就会变成这个样子。只是把原信号的值都移动到了sampler信号发送信号的时刻,值还是和原信号的值一样。

最后

关于RACSignal的变换操作还剩下 冷热信号转换操作,高阶信号操作,下篇接着继续分析。最后请大家多多指教。

转载地址:http://cklul.baihongyu.com/

你可能感兴趣的文章
推荐几本最好的web前端开发技术图书
查看>>
ZOJ 1015 Fishing Net(判断弦图)
查看>>
D3D中剔除模式
查看>>
iOS开发系列--App扩展开发
查看>>
批量插入,更新,删除数据
查看>>
mysql 查看用户的权限
查看>>
JavaScript 函数节流和函数去抖应用场景辨析
查看>>
log4j的参数配置(转)
查看>>
[C++][基础]1_变量、常量和基本类型
查看>>
Android Service与Runnable整合并用
查看>>
Php综合手册
查看>>
[轉]javascript 的 location 各種用法
查看>>
MySQL 数据文件 说明
查看>>
测地膨胀和膨胀重建—lhMorpRDilate
查看>>
30 +最佳移动网络设计灵感的案例
查看>>
C++基础代码--20余种数据结构和算法的实现
查看>>
深入探索PowerPivot客户端和服务器端架构
查看>>
fash 3D 游戏
查看>>
Android 用户界面---广播通知(Toast Notifications)
查看>>
HDOJ 2090
查看>>