RACSubject, созданный в dispatch_queue sendComplete, не попадает в объединенный сигнал

В блоке dispatch_async, работающем в очереди gcd DISPATCH_QUEUE_PRIORITY_DEFAULT: я создаю два объекта RACSubject, использую слияние RACSignal: и завершаю подписку. Затем, для целей этого теста (и для воспроизведения сценария в моем реальном коде), я отправляю sendComplete на оба из них. Подписка на завершение объединенного сигнала никогда не срабатывает. Я прикрепил к темам две подписки на завершение самостоятельно, они действительно срабатывают. Если я сделаю этот же тест в основном потоке вместо очереди gcd, он будет работать, как и ожидалось.

Есть ли способ заставить это работать, или мне придется провести рефакторинг, чтобы все мои темы попали в основной поток?

#import <ReactiveCocoa/ReactiveCocoa.h>

@interface rac_signal_testTests: SenTestCase
@end

@implementation rac_signal_testTests

- (void)setUp
{
    [super setUp];

    // Set-up code here.
}

- (void)tearDown
{
    // Tear-down code here.

    [super tearDown];
}

-(void)test_merged_subjects_will_complete_on_main_thread{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    __block BOOL completed_fired = NO;

    [merged subscribeCompleted:^{
        completed_fired = YES;
    }];

    [subject1 sendNext:@"1"];
    [subject2 sendNext:@"2"];

    [subject1 sendCompleted];
    [subject2 sendCompleted];

    STAssertTrue(completed_fired, nil);
}

//test proving that throttling isn't breaking the merged signal (initial hypothesis).
-(void)test_merged_subjects_will_complete_if_one_of_them_has_a_throttled_subscriber_on_main_thread{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    __block NSString * hit_subject2_next = nil;
    [[subject2 throttle:.5] subscribeNext:^(NSString *value){
        hit_subject2_next = value;
    }];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    __block BOOL completed_fired = NO;

    [merged subscribeCompleted:^{
        completed_fired = YES;
    }];

    [subject2 sendNext:@"2"];
    [subject2 sendCompleted];
    [subject1 sendCompleted];
    STAssertEqualObjects(@"2", hit_subject2_next, nil);
    STAssertTrue(completed_fired, nil);
}

-(void)test_merged_subjects_will_complete_if_on_gcd_queue{
    __block BOOL complete = NO;

    dispatch_queue_t global_default_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

    dispatch_async(global_default_queue, ^{
        RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
        RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

        __block NSString * hit_subject2_next = nil;

        RACScheduler *global_default_scheduler = [RACScheduler schedulerWithQueue:global_default_queue name:@"com.test.global_default"];

        RACSignal *sig1 = [subject1 deliverOn:RACScheduler.mainThreadScheduler];
        RACSignal *sig2 = [subject2 deliverOn:RACScheduler.mainThreadScheduler];

        [sig2    subscribeNext:^(NSString *value){
            hit_subject2_next = value;
        }];

        [sig2 subscribeCompleted:^{
            NSLog(@"hit sig2 complete");
        }];

        [sig1 subscribeCompleted:^{
            NSLog(@"hit sig1 complete");
        }];

        RACSignal *merged = [[RACSignal merge:@[sig1, sig2]] deliverOn:RACScheduler.mainThreadScheduler];

        [merged subscribeCompleted:^{
            complete = YES;
        }];

        [subject2 sendNext:@"2"];
//        if we dispatch the send complete calls to the main queue then this code works but that seems like it shoul be unnecessary.
//        dispatch_async(dispatch_get_main_queue(), ^{
            [subject1 sendCompleted];
            [subject2 sendCompleted];
//        });
    });

    NSDate *startTime = NSDate.date;
    do{
        [NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:.5]];
    }while(!complete && [NSDate.date timeIntervalSinceDate:startTime] <= 10.0);

    STAssertTrue(complete, nil);
}

@end

person Jon    schedule 01.08.2013    source источник


Ответы (1)


Так что это довольно хреновый случай, вызванный взаимодействием GCD и RAC. Строго говоря, бага нет. Но это удивительно удивительно и странно. Мы говорим об этом требовании в рекомендациях по дизайну по адресу https://github.com/ReactiveCocoa/ReactiveCocoa/blob/1bd47736f306befab64859602dbdea18f7f9a3f6/Documentation/DesignGuidelines.md#subscription-will-always-occur-on-a-scheduler..

Суть в том, что подписка всегда должна выполняться по известному планировщику. Это внутреннее требование RAC. Если вы просто используете старый добрый GCD, известного планировщика нет, поэтому RAC должен асинхронно отправлять подписку планировщику.

Итак, чтобы перейти к тесту:

[merged subscribeCompleted:^{
    complete = YES;
}];

Фактическая подписка происходит асинхронно, потому что нет известного планировщика. Подписка заканчивается после вызовов -sendCompleted и полностью их пропускает. Это действительно состояние гонки, но на самом деле вы, вероятно, никогда не увидите его успеха.

Исправление состоит в том, чтобы использовать RACSchedulers вместо GCD, если это возможно. Если вам нужно использовать определенную очередь GCD, вы можете использовать RACTargetQueueScheduler. Например, рабочая упрощенная версия вашего теста:

-(void)test_merged_subjects_will_complete_if_on_gcd_queue{
    __block BOOL complete = NO;

    dispatch_queue_t global_default_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

    RACScheduler *scheduler = [[RACTargetQueueScheduler alloc] initWithName:@"testScheduler" targetQueue:global_default_queue];
    [scheduler schedule:^{
        RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
        RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

        RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

        [merged subscribeCompleted:^{
            complete = YES;
        }];

        [subject1 sendCompleted];
        [subject2 sendCompleted];
    }];

    NSDate *startTime = NSDate.date;
    do{
        [NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:.5]];
    }while(!complete && [NSDate.date timeIntervalSinceDate:startTime] <= 10.0);

    STAssertTrue(complete, nil);
}

Поскольку подписка происходит из планировщика, subscribeCompleted: выполняется синхронно, получает завершенные события, и все ведет себя так, как вы ожидаете.

Если вам не нужно использовать определенную очередь GCD и вы просто хотите, чтобы это было сделано в неосновной очереди, сделайте что-то вроде:

[[RACScheduler scheduler] schedule:^{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    [merged subscribeCompleted:^{
        complete = YES;
    }];

    [subject1 sendCompleted];
    [subject2 sendCompleted];
}];

Надеюсь, это проясняет то, что вы видите. Дайте мне знать, если мне нужно что-то перефразировать.

person joshaber    schedule 01.08.2013
comment
Почему вы используете RACReplaySubject, где OP использует RACSubject? - person ipmcc; 02.08.2013
comment
Извините, это просто остатки от какого-то другого тестирования. Исправлена! - person joshaber; 03.08.2013
comment
Без проблем; Просто убедиться, что это не важно для решения. - person ipmcc; 03.08.2013