Schedulers in Combine: Part 4: OperationQueue Scheduler
iOS, Combine. Estimated reading time: 8 minutes
In this article we review the last, but not least, scheduler available in the Combine framework: OperationQueue.
As you may remember from the previous articles, Scheduler is just a protocol that requires a conforming type to define WHEN and HOW a given task is executed.
OperationQueue, with its functionality and purpose, is another quite interesting mechanism. I will not cover all of its capabilities in this article; instead, you can refer to its documentation to refresh your knowledge.
In short, OperationQueue is a queue that regulates how operations are executed.
An operation queue executes its queued Operation objects based on their priority and readiness. After being added to an operation queue, the operation remains in its queue until it reports that it is finished with its task. You can’t directly remove an operation from a queue after it has been added.
It's also good to know that under the hood OperationQueue uses GCD, but provides an additional level of control over each task that is executed.
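As a small illustration of that extra level of control, here is a minimal sketch (not from the original article) that uses operation dependencies, something a plain GCD queue does not offer directly. The `EventLog` helper and the operation names are hypothetical and exist only for this example.

```swift
import Foundation

/// Hypothetical helper: a thread-safe list of events.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock(); defer { lock.unlock() }
        storage.append(event)
    }

    var events: [String] {
        lock.lock(); defer { lock.unlock() }
        return storage
    }
}

let log = EventLog()
let queue = OperationQueue() // concurrent by default

let download = BlockOperation { log.append("download") }
let parse = BlockOperation { log.append("parse") }

// `parse` does not become ready until `download` has finished,
// even though the queue itself is concurrent.
parse.addDependency(download)

// The order in which the operations are added does not matter here.
queue.addOperations([parse, download], waitUntilFinished: true)
print(log.events) // ["download", "parse"]
```

The dependency, not the queue configuration, is what enforces the order in this sketch.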
Related articles:
- Schedulers in Combine. Part 1: ImmediateScheduler
- Schedulers in Combine. Part 2: RunLoop Scheduler
- Schedulers in Combine. Part 3: DispatchQueue Scheduler
- Schedulers in Combine. Part 4: OperationQueue Scheduler
OperationQueue Scheduler
As we did in the previous articles of this series, we start reviewing OperationQueue as a Scheduler by checking HOW it executes tasks. This can be done with a simple piece of code:
import Combine
import Foundation

var subscription = Set<AnyCancellable>()
let operation = OperationQueue()
let publisher = [1, 2, 3, 4, 5].publisher

publisher
    .receive(on: operation) // deliver every value through the operation queue
    .sink { value in
        print("Recevied value \(value) on \(Thread.current)")
    }
    .store(in: &subscription)
If you expect to receive output like this:
Recevied value 1 on <NSThread: 0x600000b01000>{number = 1, name = (null)}
Recevied value 2 on <NSThread: 0x600000b00b80>{number = 1, name = (null)}
Recevied value 3 on <NSThread: 0x600000b1a140>{number = 1, name = (null)}
Recevied value 4 on <NSThread: 0x600000b16c40>{number = 1, name = (null)}
Recevied value 5 on <NSThread: 0x600000b04b40>{number = 1, name = (null)}
you will be surprised. The real output looks like this:
Recevied value 2 on <NSThread: 0x600000b01000>{number = 5, name = (null)}
Recevied value 5 on <NSThread: 0x600000b00b80>{number = 3, name = (null)}
Recevied value 4 on <NSThread: 0x600000b1a140>{number = 8, name = (null)}
Recevied value 3 on <NSThread: 0x600000b16c40>{number = 9, name = (null)}
Recevied value 1 on <NSThread: 0x600000b04b40>{number = 6, name = (null)}
First of all, you may notice that the values arrive in a different order, and also on different threads.
Why? As mentioned above, OperationQueue works on top of GCD, so it may use different threads to deliver these values, and the order is not guaranteed at all.
If you want to check how it works under the hood, you can refer to the open-source code, where you will find:
public func schedule(options: OperationQueue.SchedulerOptions?,
                     _ action: @escaping () -> Void) {
    let op = BlockOperation(block: action)
    addOperation(op)
}
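As you can see, every action is wrapped in a BlockOperation and added to the queue. Since the queue is concurrent by default, each operation may run on any available thread from the GCD pool, and this explains the output.
The BlockOperation definition: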
@available(iOS 4.0, *)
class BlockOperation : Operation {
    public convenience init(block: @escaping () -> Void)
    open func addExecutionBlock(_ block: @escaping () -> Void)
    open var executionBlocks: [@convention(block) () -> Void] { get }
}
But remember, OperationQueue provides a few additional points of control in comparison to GCD. One of them is maxConcurrentOperationCount, which sets the maximum number of operations that can execute concurrently.
By default it equals OperationQueue.defaultMaxConcurrentOperationCount. If you print this default value you will see -1, which means the queue determines the maximum dynamically, running as many operations as the system allows.
print(operation.maxConcurrentOperationCount) // -1
To fix the random execution order of tasks, we can modify operation by adding:
operation.maxConcurrentOperationCount = 1
The output is now ordered, as we wanted, but note the thread:
Recevied value 1 on <NSThread: 0x6000026f0d40>{number = 5, name = (null)}
Recevied value 2 on <NSThread: 0x6000026f0d40>{number = 5, name = (null)}
Recevied value 3 on <NSThread: 0x6000026f0d40>{number = 5, name = (null)}
Recevied value 4 on <NSThread: 0x6000026f9c00>{number = 3, name = (null)}
Recevied value 5 on <NSThread: 0x6000026f9c00>{number = 3, name = (null)}
So, even though we got the order, the thread is still any available one, as before. To deliver on the main thread using OperationQueue, we can do the following:
.receive(on: OperationQueue.main)
or even
operation.underlyingQueue = .main
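The result will be ordered and delivered on the main thread. Note that Apple's documentation for underlyingQueue states that its value must not be the main dispatch queue, so OperationQueue.main is the safer of the two options.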
We can also modify other properties of OperationQueue, such as qualityOfService or underlyingQueue.
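To make the ordering effect of maxConcurrentOperationCount = 1 checkable outside of a playground, here is a minimal sketch that collects the delivered values and waits for completion. The `Collector` helper and the semaphore are assumptions added for this example only; they are not part of the original code.

```swift
import Combine
import Foundation

/// Hypothetical helper: collects values behind a lock.
final class Collector: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [Int] = []

    func add(_ value: Int) {
        lock.lock(); defer { lock.unlock() }
        storage.append(value)
    }

    var values: [Int] {
        lock.lock(); defer { lock.unlock() }
        return storage
    }
}

let serialQueue = OperationQueue()
serialQueue.maxConcurrentOperationCount = 1 // run one operation at a time

let collector = Collector()
let finished = DispatchSemaphore(value: 0)

let cancellable = [1, 2, 3, 4, 5].publisher
    .receive(on: serialQueue)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { collector.add($0) })

finished.wait() // block the current thread until completion is delivered
print(collector.values)
```

With the serial configuration the values should arrive in publishing order, followed by the completion event. Removing the maxConcurrentOperationCount line brings back the unordered behaviour described above.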
Another point to keep in mind when using OperationQueue as a scheduler is the priority of operations.
Priority is defined as:
extension Operation {
    public enum QueuePriority : Int {
        case veryLow = -8
        case low = -4
        case normal = 0
        case high = 4
        case veryHigh = 8
    }
}
The default value is normal = 0, and the scheduler does not change it for the operations it creates. This means that if your OperationQueue already contains an operation with a higher priority and you then publish values through the same queue, the result may surprise you.
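Before looking at a long-running operation, the effect of queuePriority alone can be sketched with plain block operations. This is an illustrative example, not the article's code: the `EventLog` helper is hypothetical, and the queue is suspended while operations are added so that all of them are ready when the queue starts choosing.

```swift
import Foundation

/// Hypothetical helper: a thread-safe list of events.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock(); defer { lock.unlock() }
        storage.append(event)
    }

    var events: [String] {
        lock.lock(); defer { lock.unlock() }
        return storage
    }
}

let log = EventLog()
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
queue.isSuspended = true // hold execution until everything is enqueued

let low = BlockOperation { log.append("low") }
low.queuePriority = .low

let normal = BlockOperation { log.append("normal") } // default priority

let high = BlockOperation { log.append("high") }
high.queuePriority = .veryHigh

// Added in "wrong" order on purpose.
queue.addOperation(low)
queue.addOperation(normal)
queue.addOperation(high)

queue.isSuspended = false
queue.waitUntilAllOperationsAreFinished()
print(log.events)
```

Although the high-priority operation is added last, the queue should pick it first among the ready operations. Values scheduled by Combine get the normal priority, so they compete with other operations in the queue in the same way.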
To test this, let's create a custom AsyncOperation that takes some time to process. It may look something like this:
public class AsyncOperation: Operation {
    // MARK: - AsyncOperation
    public enum State: String {
        case ready
        case executing
        case finished

        // KVO key that corresponds to the state: "isReady", "isExecuting", "isFinished"
        fileprivate var keyPath: String {
            return "is" + rawValue.capitalized
        }
    }

    public var state = State.ready {
        willSet {
            willChangeValue(forKey: newValue.keyPath)
            willChangeValue(forKey: state.keyPath)
        }
        didSet {
            didChangeValue(forKey: oldValue.keyPath)
            didChangeValue(forKey: state.keyPath)
        }
    }
}

public extension AsyncOperation {
    // MARK: - AsyncOperation+Addition
    override var isReady: Bool {
        return super.isReady && state == .ready
    }

    override var isExecuting: Bool {
        return state == .executing
    }

    override var isFinished: Bool {
        return state == .finished
    }

    override var isAsynchronous: Bool {
        return true
    }

    override func start() {
        if isFinished {
            return
        }
        if isCancelled {
            state = .finished
            return
        }
        // Mark the operation as executing before the work starts,
        // so `main()` can never finish before the state is updated.
        state = .executing
        main()
    }

    override func cancel() {
        super.cancel()
        state = .finished
    }

    override func main() {
        preconditionFailure("Subclasses must implement `main`.")
    }
}

// subclass
final class AsyncLongAndHightPriorityOperation: AsyncOperation {
    override func main() {
        print("started heavy operation")
        // Simulate 4 seconds of work, then report completion to the queue.
        DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
            self.state = .finished
            print("finished the heavy operation")
        }
    }
}
A better AsyncOperation implementation can be found here. Now let's use the operation together with a publisher:
let heavyOperation = AsyncLongAndHightPriorityOperation()
heavyOperation.queuePriority = .high

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1

print("Started at date \(Date())")
queue.addOperation(heavyOperation)

publisher
    .receive(on: queue)
    .sink(receiveCompletion: { (completion) in
        print("Recevied completion \(completion) on \(Thread.current), date \(Date())")
    }, receiveValue: { (value) in
        print("Recevied value \(value) on \(Thread.current), date \(Date())")
    })
    .store(in: &subscription)
The result, as you may expect, shows a delay before the values are delivered:
Started at date 2020-12-13 10:39:58 +0000
started heavy operation
Recevied value 1 on <NSThread: 0x600003f38dc0>{number = 3, name = (null)}, date 2020-12-13 10:40:02 +0000
finished the heavy operation
Recevied value 2 on <NSThread: 0x600003f3cb40>{number = 5, name = (null)}, date 2020-12-13 10:40:02 +0000
Recevied value 3 on <NSThread: 0x600003f38dc0>{number = 3, name = (null)}, date 2020-12-13 10:40:02 +0000
Recevied value 4 on <NSThread: 0x600003f0d100>{number = 7, name = (null)}, date 2020-12-13 10:40:02 +0000
Recevied value 5 on <NSThread: 0x600003f0d100>{number = 7, name = (null)}, date 2020-12-13 10:40:02 +0000
Recevied completion finished on <NSThread: 0x600003f0d100>{number = 7, name = (null)}, date 2020-12-13 10:40:02 +0000

Think about what happens when you haven't specified queue.maxConcurrentOperationCount = 1, when you set a non-concurrent target queue, or even when you use OperationQueue.main. The result may surprise you.
Such an additional operation may even affect the sink output. Try the following:
let heavyOperation = AsyncLongAndHightPriorityOperation()
heavyOperation.queuePriority = .high

let queue = OperationQueue()
//queue.maxConcurrentOperationCount = 1 // <- comment this

print("Started at date \(Date())")
queue.addOperation(heavyOperation)

publisher
    .receive(on: queue)
    .sink(receiveCompletion: { (completion) in
        print("Recevied completion \(completion) on \(Thread.current), date \(Date())")
    }, receiveValue: { (value) in
        print("Recevied value \(value) on \(Thread.current), date \(Date())")
    })
    .store(in: &subscription)
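The output shows that some values are not received at all. A likely reason is that on a concurrent queue the completion event can be delivered before some of the values, and once a subscriber has received completion it ignores everything that follows.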

To resolve this, we can do the following:
.subscribe(on: queue)
.receive(on: OperationQueue.main)

As you can see, OperationQueue gives us additional options for controlling task execution, but be careful with it and make sure you configure the OperationQueue correctly.
Remember: by default, OperationQueue executes tasks concurrently.
SchedulerOptions
If we check the API, we find that these options contain nothing, so there is nothing to configure here.
/// Options that affect the operation of the operation queue scheduler.
public struct SchedulerOptions { }
SchedulerTimeType
SchedulerTimeType is a wrapper around Date:
public struct SchedulerTimeType: Strideable, Codable, Hashable {
    /// The date represented by this type.
    public var date: Date

    /// Initializes an operation queue scheduler time with the given date.
    ///
    /// - Parameter date: The date to represent.
    public init(_ date: Date) {
        self.date = date
    }
    ...
}
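Because SchedulerTimeType is Strideable, scheduler times can be shifted and compared. The following is a minimal sketch of that arithmetic; the fixed reference date is an assumption chosen only to keep the numbers exact and repeatable.

```swift
import Combine
import Foundation

// A fixed date (the Foundation reference date) instead of "now",
// so the results below do not depend on when the code runs.
let start = OperationQueue.SchedulerTimeType(Date(timeIntervalSinceReferenceDate: 0))

// Move two seconds forward using the scheduler's stride type.
let later = start.advanced(by: .seconds(2))

// Measure the distance between the two scheduler times.
let gap = start.distance(to: later)

print(later.date.timeIntervalSinceReferenceDate) // 2.0
print(gap.timeInterval)                          // 2.0
print(start < later)                             // true
```

In real code the starting point would usually be the `now` property of the queue, and the advanced value would be passed to schedule(after:).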
To set up future work, we can do the same as we already did with the other schedulers:
let source = Timer
    .publish(every: 1, on: .main, in: .common)
    .autoconnect()
    .scan(0, { counter, _ in
        let value = counter + 1
        print("tick ", value)
        return value
    })

source
    .receive(on: DispatchQueue.main)
    .sink { (value) in
        print("The value is \(value) in \(Thread.current) at \(Date())")
    }
    .store(in: &subscription)

// Cancel the subscription after roughly 4.5 seconds.
operation
    .schedule(
        after: .init(Date(timeIntervalSinceNow: 4.5)),
        tolerance: .seconds(1),
        options: nil
    ) {
        print("cancellation")
        subscription.removeAll()
    }

Under the hood, schedule(after:) uses a serial dispatch queue and marks each delayed operation as ready via asyncAfter(deadline:):
init(_ action: @escaping() -> Void, after: OperationQueue.SchedulerTimeType) {
    self.action = action
    readyFromAfter = false
    super.init()
    let deadline = DispatchTime.now() + after.date.timeIntervalSinceNow
    DelayReadyOperation.readySchedulingQueue.asyncAfter(deadline: deadline) { [weak self] in
        self?.becomeReady()
    }
}
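Note the difference between asyncAfter(deadline:) and asyncAfter(wallDeadline:): the former counts against a monotonic clock (DispatchTime) that does not advance while the system sleeps, while the latter follows the wall clock (DispatchWallTime).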
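Both variants are called the same way and differ only in the clock type. Here is a minimal sketch using a plain dispatch queue; the queue label and the `EventLog` helper are hypothetical, and the output is sorted because the example makes no claim about which block fires first.

```swift
import Dispatch
import Foundation

/// Hypothetical helper: a thread-safe list of events.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock(); defer { lock.unlock() }
        storage.append(event)
    }

    var events: [String] {
        lock.lock(); defer { lock.unlock() }
        return storage
    }
}

let log = EventLog()
let timerQueue = DispatchQueue(label: "demo.timers") // hypothetical label
let group = DispatchGroup()

group.enter()
// DispatchTime: measured against a monotonic clock.
timerQueue.asyncAfter(deadline: .now() + 0.1) {
    log.append("deadline")
    group.leave()
}

group.enter()
// DispatchWallTime: measured against the wall clock.
timerQueue.asyncAfter(wallDeadline: .now() + 0.1) {
    log.append("wallDeadline")
    group.leave()
}

group.wait() // block until both delayed blocks have run
print(log.events.sorted()) // ["deadline", "wallDeadline"]
```

For short delays in a running app the two behave alike; the difference matters when the device sleeps or the system clock changes between scheduling and firing.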
Pitfalls
- Be careful when using OperationQueue: make sure the queue is not busy with other operations (such as long-running ones) that could delay or reorder delivery
- Remember that by default every task is executed asynchronously and concurrently on any available thread, so the order is not guaranteed