Intro to Reactive Programming

Reactive programming is a paradigm built on the idea that you have a stream of data that needs processing. It relies heavily on asynchronous, non-blocking operations and, although it is not the same as the functional paradigm, it uses similar concepts, such as declarative code and the avoidance of shared state and side effects.

A Bunch of Concepts (hang in there)

The root elements of Reactive are the Publisher and the Subscriber (or Observable and Observer). The Publisher is an emitter of signals while the Subscriber is a consumer of said signals.

A Publisher can have several Subscribers, and each Subscriber receives the same data. The exception is a Subscriber that subscribes after some data has already been emitted: it may or may not receive that data, depending on the nature of the Publisher.

We say signals and not data because there are three types of signals that a Publisher can emit. The first is data, by far the most common: it is the value you expect to receive from the Publisher. The second is the complete signal, which notifies the Subscriber that the Publisher has no more data to send and has completed the stream.

The third signal is an error. The error is itself a kind of data, but it describes a special situation that the Subscriber might not be able to handle. Although the Subscriber is not required to do anything special when it receives an error, it must know that the error is a terminal state, and afterwards it should consider itself unsubscribed from the Publisher.

Although it is not required, most implementations of Reactive code use the concept of backpressure. This means that the Subscriber is the one that requests data, and the Publisher may send data only after it has been requested.

Take notice, however, that the Publisher is not required to have the data immediately available; if it does not, it will emit the data when it is ready.
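To make the request-driven flow concrete, here is a minimal sketch. It assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for the Publisher, Subscriber and Subscription types, which are walked through in the next section. The names numbers() and Collector are hypothetical, made up for this illustration, and everything runs synchronously on a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    // Hypothetical helper: a synchronous publisher over a fixed set of values.
    static Flow.Publisher<Integer> numbers(Integer... data) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 0;          // index of the next element to emit
            boolean done = false;  // true once completed or cancelled

            @Override
            public void request(long n) {
                // Emit at most n elements, never more than was asked for.
                for (long i = 0; i < n && !done; i++) {
                    if (next < data.length) {
                        subscriber.onNext(data[next++]);
                    } else {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical subscriber that only stores what it receives and never
    // requests on its own, so the caller decides how much demand to signal.
    static class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { System.out.println("Completed"); }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        numbers(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(collector);

        System.out.println(collector.received); // [] (nothing requested yet)
        collector.subscription.request(3);
        System.out.println(collector.received); // [1, 2, 3]
        collector.subscription.request(2);
        System.out.println(collector.received); // [1, 2, 3, 4, 5]
    }
}
```

Nothing reaches the Collector until request() is called, and each call delivers at most the number of elements asked for.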

Nice talk, but how do I code it?

Well, perhaps we can stay a little longer in the domain of theory, yes? For now, let's implement something using just the Publisher and Subscriber interfaces.

First, the Publisher has just one method to implement: subscribe(). This method receives one parameter, the Subscriber, and returns nothing. It is the method you call when you want to subscribe to the Publisher, and it has a single responsibility: to call the onSubscribe() method of the Subscriber, passing it a Subscription object.

Obviously your Publisher also needs to have the data that it is going to stream. This data can come in many forms and from many sources; let's use a really simple one here, an array of Integers.

Publisher<Integer> publisher = new Publisher<>() {
    final Integer[] data = new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        Subscription subscription = new Subscription() {...};
        subscriber.onSubscribe(subscription);
    }
};

Since the Publisher is stateless and doesn't even keep a reference to its Subscribers, it needs another entity to interact with them. That entity is the Subscription. The Subscription keeps track of what data has been sent and what data needs to be sent next. It is also responsible for the backpressure mechanism.

The Subscription implements two methods. The first is request(), which receives a long telling how many data elements the Subscriber is ready to receive. This method doesn't return the data itself; instead it calls another method of the Subscriber with the data or, if the Publisher has no more data to emit, the method that delivers the complete signal.

The second method is cancel(), which the Subscriber uses to let the Subscription know that it is done processing and that no more data should be sent (there is no guarantee that no data will be sent, just that the Subscriber doesn't want any more).

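Subscription subscription = new Subscription() {
    int next = 0;          // index of the next element to emit
    boolean done = false;  // true once the stream has completed or been cancelled
    @Override
    public void request(long n) {
        // Emit up to n elements; stop early if the stream ends or is cancelled.
        for (long i = 0; i < n && !done; i++) {
            if (next < data.length) {
                subscriber.onNext(data[next++]);
            } else {
                done = true;
                subscriber.onComplete();
            }
        }
    }
    @Override
    public void cancel() {
        // Cancelling only stops further emissions; it is not a completion signal.
        done = true;
    }
};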
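To see cancel() in action, here is a small sketch in which the Subscriber gives up after three elements. It assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) in place of the Publisher, Subscriber and Subscription types used in this article. ArraySubscription and FirstThree are hypothetical names for this illustration. The sketch also assumes that cancel() only stops further emissions and sends no complete signal, which is how I read the Reactive Streams rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class CancelSketch {

    // Hypothetical array-backed Subscription, written as a named class.
    static class ArraySubscription implements Flow.Subscription {
        private final Integer[] data;
        private final Flow.Subscriber<? super Integer> subscriber;
        private int next = 0;          // index of the next element to emit
        private boolean done = false;  // true once completed or cancelled

        ArraySubscription(Integer[] data, Flow.Subscriber<? super Integer> subscriber) {
            this.data = data;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            for (long i = 0; i < n && !done; i++) {
                if (next < data.length) {
                    subscriber.onNext(data[next++]);
                } else {
                    done = true;
                    subscriber.onComplete();
                }
            }
        }

        @Override
        public void cancel() {
            done = true; // stop emitting; no onComplete() is sent
        }
    }

    // Hypothetical subscriber that cancels once it has seen three elements.
    static class FirstThree implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == 3) {
                subscription.cancel();   // done processing, ask for nothing more
            } else {
                subscription.request(1);
            }
        }

        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Integer[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        Flow.Publisher<Integer> publisher = s -> s.onSubscribe(new ArraySubscription(data, s));

        FirstThree subscriber = new FirstThree();
        publisher.subscribe(subscriber);

        System.out.println(subscriber.received);  // [1, 2, 3]
        System.out.println(subscriber.completed); // false
    }
}
```

The remaining seven elements are never emitted, and the Subscriber never sees a complete signal because the stream was abandoned rather than finished.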

Finally, the Subscriber is where things get a little more interesting. It has four methods: one for each signal it can receive, plus an extra one that tells it when its subscription has been established and it is finally ready to ask for data.

The onSubscribe() method receives the Subscription and stores a reference to it for future use. In our simple implementation it also requests the first piece of data. Even after subscribing to the Publisher, the Subscriber only receives data after requesting it through the Subscription.

The onNext() method is where you (finally) receive the data and get the opportunity to do something with it. Since each request here asks for a single element, you must now request more through the Subscription's request() method.

The onError() method is where you get a (surprise) error from the Publisher, in the form of an exception. Since the error is a terminal state, the Subscriber should not expect any more data after receiving it.

The final method is onComplete(), and it is just that: a signal that tells the Subscriber that no more data will arrive from that point on.

Subscriber<Integer> subscriber = new Subscriber<>() {
    Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        System.out.println("Subscribed");
        subscription.request(1);
    }
    @Override
    public void onNext(Integer o) {
        System.out.println("Number " + o);
        subscription.request(1);
    }
    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }
    @Override
    public void onComplete() {
        System.out.println("Completed");
    }
};
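The array example never fails, so onError() is never exercised. As a sketch of the error signal being terminal, the following assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) and a made-up data source: quotients() is a hypothetical publisher that emits 10 divided by each divisor and signals an error when a divisor is zero. Recorder is an equally hypothetical subscriber that logs every signal it gets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class ErrorSignalSketch {

    // Hypothetical publisher: emits 10 / d for each divisor d, failing on d == 0.
    static Flow.Publisher<Integer> quotients(int... divisors) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 0;          // index of the next divisor to use
            boolean done = false;  // true after a terminal signal or cancel

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !done; i++) {
                    if (next == divisors.length) {
                        done = true;
                        subscriber.onComplete();
                        return;
                    }
                    Integer value;
                    try {
                        value = 10 / divisors[next++];
                    } catch (ArithmeticException e) {
                        done = true;
                        subscriber.onError(e); // terminal: nothing is sent after this
                        return;
                    }
                    subscriber.onNext(value);
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical subscriber that records each signal as a line of text.
    static class Recorder implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            log.add("onNext " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            log.add("onError " + t.getClass().getSimpleName());
        }

        @Override
        public void onComplete() {
            log.add("onComplete");
        }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        quotients(5, 2, 0, 1).subscribe(recorder);
        // [onNext 2, onNext 5, onError ArithmeticException]
        System.out.println(recorder.log);
    }
}
```

The last divisor is never processed and no complete signal follows the error: once onError() has been called, the stream is over.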

With all these classes implemented, we can finally make the call that kicks things into action.

publisher.subscribe(subscriber);

At this point we receive the expected result:

Subscribed
Number 1
Number 2
Number 3
Number 4
Number 5
Number 6
Number 7
Number 8
Number 9
Number 10
Completed
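If you want to run the whole thing yourself, the pieces can be assembled into a single file. The snippets in this article don't show their imports, so this sketch assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+), which have the same shape. The class name ReactiveIntro is made up, and cancel() here only sets a flag instead of signalling completion.

```java
import java.util.concurrent.Flow;

class ReactiveIntro {

    public static void main(String[] args) {
        Flow.Publisher<Integer> publisher = new Flow.Publisher<>() {
            final Integer[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

            @Override
            public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
                // Each subscriber gets its own Subscription with its own position.
                subscriber.onSubscribe(new Flow.Subscription() {
                    int next = 0;          // index of the next element to emit
                    boolean done = false;  // true once completed or cancelled

                    @Override
                    public void request(long n) {
                        for (long i = 0; i < n && !done; i++) {
                            if (next < data.length) {
                                subscriber.onNext(data[next++]);
                            } else {
                                done = true;
                                subscriber.onComplete();
                            }
                        }
                    }

                    @Override
                    public void cancel() {
                        done = true;
                    }
                });
            }
        };

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                System.out.println("Subscribed");
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Number " + item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
            }
        };

        publisher.subscribe(subscriber);
    }
}
```

Because everything is synchronous, each onNext() call requests the next element from inside the previous request() call. That is fine for ten elements, but it is one of the reasons real implementations are more involved than this.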

Final Words

Right now this entire Reactive thing seems like a lot of work to accomplish little to nothing, and that is true. That is why we rely on implementations of Reactive Programming, like Reactor and RxJava.

This conceptual introduction, although not necessary for writing code with reactive frameworks, will make it easier to understand how things work when we talk about real-world implementations, and will help make sense of some things that would otherwise seem illogical.

But not now. Let's wrap this up, since there is plenty to digest.