4

This might be a trivial question but I'm unable to find a solution to this seemingly easy task. As I'm new to ReactiveSwift and reactive programming I might simply miss something obvious.

Basically what I want to do is something like this:

signal.collect(timeInterval: .seconds(5))

I want to collect all values over a specific period of time from a signal. The resulting signal would produce an event every x seconds which would contain an array of the collected events from the first signal.

What is the best approach to do this in ReactiveSwift?

Sascha Wolf
  • 18,810
  • 4
  • 51
  • 73

2 Answers2

4

There's no built-in operator in ReactiveSwift for this task. Instead, you can use following approach, writing an extension:

import Foundation
import ReactiveSwift
import Result
public extension Signal {
    public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
        let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
        onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
            pipe.1.sendCompleted()
        }
        return Signal { observer in
            return self.observe { event in
                switch event {
                case let .value(value):
                    observer.send(value: value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }
        }.take(until: pipe.0)
    }

    public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
        return self.completeAfter(after: until).collect()
    }
}

And then use signal.collectUntil(5) method.

Another way is to use timer function from ReactiveSwift. Example (add to same extension, as above):

public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
    var signal: Signal<(), NoError>? = nil
    timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
        signal = innerSignal.map { _ in () }.take(first: 1)
    }
    return self.take(until: signal!).collect()
}

I, however, don't like this approach, because it's faking nature of SignalProducer type extracting inner signal.

Signal type itself has also timeout function, however it will be difficult to use it since it's error-producing. Example of how to use it (still, add to the same extension):

public func completeOnError() -> Signal<Value, Error> {
    return Signal { observer in
        return self.observe { event in
            switch(event) {
            case .value(let v): observer.send(value: v)
            case .failed(_): observer.sendCompleted()
            case .interrupted: observer.sendInterrupted()
            case .completed: observer.sendCompleted()
            }
        }
    }
}

public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
    return self
        .timeout(after: until,
                 raising: NSError() as! Error,
                 on: QueueScheduler())
        .completeOnError()
        .collect()
}

P.S. By choosing any of 3 options, mind passing correct scheduler or paramatrizing your solution with correct scheduler.

Petro Korienev
  • 4,007
  • 6
  • 34
  • 43
  • I've marked your answer as correct although it wasn't quite what I was looking for, but you've pointed me in the right direction. I've added an answer of my own which includes the extension I've written which solves my problem. – Sascha Wolf Mar 14 '17 at 10:38
1

Based on the answer by Petro Korienev (which sadly wasn't quite what I was looking for), I've created an extension which solves my problem. The extension follows the structure of the ReactiveSwift collect functions, to stay as close as possible to the intents of ReactiveSwift.

It will collect all sent values over a given timeInterval and then send them as array. On a terminating event it will also send the remaining values, if there are any.

extension Signal {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> {
        return Signal<[Value], Error> { observer in
            var values: [Value] = []
            let sendAction: () -> Void = {
                observer.send(value: values)

                values.removeAll(keepingCapacity: true)
            }
            let disposable = CompositeDisposable()
            let scheduleDisposable = scheduler.schedule(
                    after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate),
                    interval: timeInterval,
                    action: sendAction
            )

            disposable += scheduleDisposable
            disposable += self.observe { (event: Event<Value, Error>) in
                if event.isTerminating {
                    if !values.isEmpty {
                        sendAction()
                    }

                    scheduleDisposable?.dispose()
                }

                switch event {
                case let .value(value):
                    values.append(value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }

            return disposable
        }
    }
}

extension SignalProducer {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> {
        return lift { (signal: ProducedSignal) in
            signal.collect(timeInterval: timeInterval, on: scheduler)
        }
    }
}

extension DispatchTimeInterval {
    var timeInterval: TimeInterval {
        switch self {
        case let .seconds(s):
            return TimeInterval(s)
        case let .milliseconds(ms):
            return TimeInterval(TimeInterval(ms) / 1000.0)
        case let .microseconds(us):
            return TimeInterval(UInt64(us) * NSEC_PER_USEC) / TimeInterval(NSEC_PER_SEC)
        case let .nanoseconds(ns):
            return TimeInterval(ns) / TimeInterval(NSEC_PER_SEC)
        }
    }
}
Community
  • 1
  • 1
Sascha Wolf
  • 18,810
  • 4
  • 51
  • 73
  • 1
    Now I've got your point and it seems for me that what you're trying to achieve is closer to "sampling" approach. I've created a short gist, probalby might help you as well https://gist.github.com/soxjke/f9271d452292af67cfc4bb83593588d3 – Petro Korienev Mar 14 '17 at 12:30
  • @PetroKorienev, I tried both approaches yours from your Gist and Zeekers. But Both let events Drop if they come in short intervals around the Time a collection or Sample is send. Any ideas to mitigate that Problem? – Peter Schumacher Jun 12 '17 at 12:42
  • I got ahead of myself. The Problem was somewhere totally different which let it seem that Data was lost during Collection – Peter Schumacher Jun 12 '17 at 13:49