Combine brings in developer’s life a lot of nice additions and make it’s better. Using publishers improve data flow and allow us produce and transform input into required data representations. This save for us a lot of time and effort.

I was wondering how this Publisher’s mechanism works in Combine, so i started from simplest thing - investigating key-components - Publisher, Subscription and Subscribers.

Powerful trio

To get idea what’s going on, let’s inspect each components in details.

Looking forward, here is small scheme, that demonstrate this trio workflow:

trio


Subscriber

Subscriber - is a very first item that should be checked, thus it’s, as mention in official doc, “a protocol that declares a type that can receive input from a publisher”.

So this component describe requirements for types, that can get information from source. If we check API, we can found a protocol declaration like following one:

public protocol Subscriber: CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

Here, we can see few functions that should be available for such type.

As u can already guess, start points - is creating a Subscriber. You connect a subscriber to a publisher by calling the publisher’s subscribe(_:) method. Then publisher notify Subscriber, that he receive subscription by calling receive(subscription:). Next step should be done by Subscriber - he ask in subscription some values, and subscription post them to publisher, which notify Subscriber about result by calling receive(_:).

Last, but not least - when there is nothing more to do or some error occured - publisher calls receive(completion:).

We can show this process on earlier provided diagram as next:

subscriber-life


Publisher

Next one in our list - Publisher - “a type can transmit a sequence of values over time”.

This type specially created for value transmitting to subsciber and has next requirements:

public protocol Publisher {

  associatedtype Output
  associatedtype Failure : Error

  func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

As u can see, there is only 1 method - receive(subscriber:). After publisher receive it’s subscriber, he become able to call all methods from Subscriber, defined by it’s contract.

Again, we can display this on our diagram as following:

publisher-life


Subscription

Subscription - last component, that combine previous components together.

We may think about - as a bridge between Publisher and Subscriber.

Protocol for Subscription:

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
  func request(_ demand: Subscribers.Demand)
}

All that subscription can do - accept request to provide data using request(_:) method. Another option, that u can observe by looking at adopted protocols is cancel. Yes, u have an option to cancel u’r previous request.

And, as and before, here is diagram for Subscription:

subscription-life


Practice

To make things fully understandable just theory is not good enough. Let’s craft our own Publisher that simulate well-known map(_:) function.

According to Apple doc - this function “transforms all elements from the upstream publisher with a provided closure”. So, we can easelly transform our components in to alternative represenatation without changing data stream. Great.

Before we go to next steps, it’s good to understand meaning of Upstream and Downstream concepts.

Downstream - it’s an item, that add’s value to another or depends on it in any other way, and Upstream - vise versa.

There is a good article about it available here

Custom Publisher

Let’s start from Publisher, because we need to define our types and required input-output values.

To do so, we can place our publisher in an extension to Publishers (as it done with other publishers) - let’s name it Mapper.

Note Publishers - s at the end, not Publisher

We also shoud define type of input and output, and transform closure (to allow data transformation). And last step - define subscribe(:) method, where we should subscribe our custom Subscriber.

All together it looks like next:

extension Publishers {
    public struct Mapper<Upstream: Publisher, Output>: Publisher {
        public typealias Failure = Upstream.Failure
        public let upstream: Upstream
        public let transformClosure: (Upstream.Output) -> Output
        
        public init(
            upstream: Upstream,
            transform: @escaping (Upstream.Output) -> Output
        ) {
            self.upstream = upstream
            self.transformClosure = transform
        }
        
        public func receive<S>(subscriber: S)
        where S: Subscriber,
              Output == S.Input,
              S.Failure == Upstream.Failure {
            upstream.subscribe(
                MapperSubscriber<S>( // this one is not yet defined
                    subscriber: subscriber,
                    mapClosure: transformClosure
                )
            )
        }
    }
}

Custom Subscriber

The next step - Subscriber.

I named it MapperSubscriber, and we also would like to limit access to this subscriber and make it usable for our Publisher only - MapperSubscriber will be placed in extension to Publishers.Mapper.

Subscriber has Input and Output according to protocol requrements. So, to transform our values we should define tranformClosure that accept Input and return Output.

Next step - implement all method required by protocol. Here we simply dublicate functionality by calling similar functions on subscriber:

func receive(subscription: Subscription) {
    subscriber.receive(subscription: subscription)
}
    
func receive(_ input: Input) -> Subscribers.Demand {
    subscriber.receive(mapClosure(input))
}
    
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
    subscriber.receive(completion: completion)
}

Take a closer look at receive(_:) - here, in the place where we get items, the magic begins - we call mapClosure(input), and let someone else to deside, how to transform data in stream.

Combining all together:

extension Publishers.Mapper {
    private struct MapperSubscriber<S: Subscriber>: Subscriber
    where S.Input == Output,
          S.Failure == Upstream.Failure {
        
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure
        
        private let subscriber: S
        private let mapClosure: (Input) -> Output
        let combineIdentifier: CombineIdentifier = .init()
        
        fileprivate init(
            subscriber: S,
            mapClosure: @escaping (Input) -> Output
        ) {
            self.subscriber = subscriber
            self.mapClosure = mapClosure
        }
        
        func receive(subscription: Subscription) {
            subscriber.receive(subscription: subscription)
        }
        
        func receive(_ input: Input) -> Subscribers.Demand {
            subscriber.receive(mapClosure(input))
        }
        
        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            subscriber.receive(completion: completion)
        }
    }
}

Extension

To make it more usable and pretty - let’s add an extension to Publisher type:

extension Publisher {
    public func mappper<Result>(
        _ transform: @escaping (Output) -> Result
    ) -> Publishers.Mapper<Self, Result> {
        Publishers.Mapper(upstream: self, transform: transform)
    }
}

Test

To test our new addition, we can use simple snippet like following:

let token = [1,2,3,4] // remember to store token somewhere
    .publisher
    .mappper { value in
        "\(value)"
    }
    .sink { (completionn) in

    } receiveValue: { (result) in
        print("mapped values :", result, type(of: result))
    }

and output:

mapped values : 1 String
mapped values : 2 String
mapped values : 3 String
mapped values : 4 String

download source code

Resources