
RxJava. Taking the magic out of it.

by admin

For a long time I was afraid to use RxJava in production. Its purpose and the way it works remained a mystery to me. Reading the source code did not add clarity, and the articles only confused me further. This is an attempt to answer two questions, "What problems does this technology solve better than the alternatives?" and "How does it work?", using analogies to classical Java and simple metaphors.


RxJava is an excellent substitute for the Streams API from Java 8 on earlier versions of Java. Since Android has not supported Java 8 all the way back to 4.0, Rx is the optimal solution there. This article looks at RxJava from that perspective, because I think it is the easiest to understand and because a truly reactive Android application built on pure Rx is difficult to implement.


We all know the Iterator pattern.

interface Iterator<T> {
    T next();
    boolean hasNext();
}

The interface hides a data source of some kind, and it doesn't matter which one. Iterator completely hides all implementation details by providing only two methods:
next – get the next element
hasNext – find out whether there is more data in the source
This pattern has one peculiarity: the consumer asks for data and waits ("hangs") until the source hands it over. That is why the source is usually a finite, often pre-built collection.
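To make the pull direction concrete, here is a minimal sketch using the standard `java.util.Iterator`. The class name `PullDemo` and the helper `drain` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PullDemo {
    // The consumer drives the loop: it asks for each element and waits for the answer.
    static List<Integer> drain(Iterator<Integer> source) {
        List<Integer> received = new ArrayList<>();
        while (source.hasNext()) {        // "is there more data?"
            received.add(source.next());  // "give me the next element" (may block)
        }
        return received;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        System.out.println(drain(data.iterator())); // prints [1, 2, 3]
    }
}
```

Nothing happens unless the consumer calls `next()`, so a slow source stalls whichever thread is doing the asking.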
Let’s do some refactoring.

interface Iterator<T> {
    T getNext();
    boolean isComplete();
}

I think you can see where this is going. Here is the Emitter interface from RxJava (for consumers it is duplicated in Observer, or Subscriber in RxJava 1):

interface Emitter<T> {
    void onNext(T value);
    void onComplete();
    void onError(Throwable error);
}

It is similar to Iterator, but works in the opposite direction: the source tells the consumer that there is new data.
This handles all the multithreading problems on the source side. If you are designing a UI, for example, you can count on all the code responsible for the GUI running serially. Incredibly convenient. Goodbye, callbacks! I won't miss you.
The analogy with Iterator is taken from [1].
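The reversed direction can be sketched in plain Java. The `Emitter` interface below is redeclared locally with the same three methods, so the example has no dependencies. The source `countTo` and the class `PushDemo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class PushDemo {
    // Same shape as the interface in the article, declared locally for the sketch.
    interface Emitter<T> {
        void onNext(T value);
        void onComplete();
        void onError(Throwable error);
    }

    // Hypothetical source: it decides when data is ready and calls the consumer.
    static void countTo(int limit, Emitter<Integer> consumer) {
        if (limit < 0) {
            consumer.onError(new IllegalArgumentException("negative limit"));
            return;
        }
        for (int i = 1; i <= limit; i++) {
            consumer.onNext(i);   // the source pushes; the consumer never asks
        }
        consumer.onComplete();
    }

    // Subscribes a logging consumer and returns everything it was told.
    static List<String> run(int limit) {
        List<String> log = new ArrayList<>();
        countTo(limit, new Emitter<Integer>() {
            @Override public void onNext(Integer value) { log.add("next " + value); }
            @Override public void onComplete() { log.add("complete"); }
            @Override public void onError(Throwable error) { log.add("error"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [next 1, next 2, next 3, complete]
    }
}
```

The consumer only reacts, so the source is free to choose when, and on which thread, each call happens.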


Now a little about the sources themselves. They come in many types: Observable, Single, Maybe… And they are all built like a cabbage (and like monads, but that's not important).
That is because once you create a source, you can wrap it in another source, which you can wrap in yet another source, and so on until OutOfMemory. (But since a typical source weighs less than 100 bytes, more likely until the battery runs out.)
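The cabbage structure can be imitated in a few lines. This is a toy model, not RxJava's classes: `Source`, `just` and `map` are declared here purely for illustration, and each wrapper simply keeps a reference to the source inside it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class CabbageDemo {
    // Toy stand-in for an Rx source: something a consumer can subscribe to.
    interface Source<T> {
        void subscribe(Consumer<T> consumer);
    }

    // The core of the cabbage: emits one fixed value.
    static <T> Source<T> just(T value) {
        return consumer -> consumer.accept(value);
    }

    // One leaf: wraps the upstream source and transforms whatever it emits.
    static <T, R> Source<R> map(Source<T> upstream, Function<T, R> fn) {
        return consumer -> upstream.subscribe(value -> consumer.accept(fn.apply(value)));
    }

    static List<String> run() {
        Source<Integer> core = just(42);
        Source<Integer> doubled = map(core, x -> x * 2);              // first wrapper
        Source<String> labelled = map(doubled, x -> "answer=" + x);   // second wrapper
        List<String> out = new ArrayList<>();
        labelled.subscribe(out::add);   // subscribing to the outer leaf reaches the core
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [answer=84]
    }
}
```

Each layer here is a small object holding one reference and one function, which is why stacking many of them costs so little memory.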
Let's wrap the answer to that very question in a source, using Observable.just(42).


As we know, computing the answer is quite a long operation. So let's wrap it in a source that does the calculation on a dedicated thread, using subscribeOn(computation()).


We also want the application not to crash when we display the answer. So we wrap it in a source that delivers the result on the main thread, using observeOn(mainThread()).


And finally, let's run it:

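// computation() and mainThread() are assumed to be static imports of
// io.reactivex.schedulers.Schedulers.computation and
// io.reactivex.android.schedulers.AndroidSchedulers.mainThread (RxJava 2 with RxAndroid).
Observable.just(42)
    .subscribeOn(computation())   // produce the value on a computation thread
    .observeOn(mainThread())      // deliver it on the Android main thread
    .subscribe(new DisposableObserver<Integer>() {
        @Override
        public void onNext(Integer answer) {
            System.out.print(answer);
        }

        @Override
        public void onComplete() {}

        @Override
        public void onError(Throwable e) {}
    });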

The console displays the answer, but what actually happened?
The subscribe method is defined in Observable. It performs checks and preparations and then calls subscribeActual, which each kind of source implements differently.
In our case subscribe called the subscribeActual of ObservableObserveOn, which calls the subscribe method of the source wrapped inside it and specifies which thread the result should be returned on.
Inside ObservableObserveOn lies ObservableSubscribeOn. Its subscribeActual runs the subscribe of its own wrapped source on the given thread.
Finally, inside ObservableSubscribeOn lies ObservableJust, which simply passes its value to onNext.
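The call sequence described above can be reproduced with a toy model. This is a sketch of the idea, not RxJava's implementation: the classes `Source`, `Just`, `SubscribeOn` and `ObserveOn` are declared here, plain executors stand in for schedulers, and a thread named "fake-main" plays the role of the Android main thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ChainDemo {
    // Records which thread each step ran on, so the hops are visible.
    static final List<String> TRACE = Collections.synchronizedList(new ArrayList<>());

    // Toy base class: subscribe() is shared, subscribeActual() differs per source.
    static abstract class Source<T> {
        final void subscribe(Consumer<T> observer) {
            if (observer == null) throw new NullPointerException("observer is null");
            subscribeActual(observer);
        }

        abstract void subscribeActual(Consumer<T> observer);

        Source<T> subscribeOn(Executor executor) { return new SubscribeOn<>(this, executor); }
        Source<T> observeOn(Executor executor) { return new ObserveOn<>(this, executor); }
    }

    // Innermost source: hands its value straight to the observer.
    static final class Just<T> extends Source<T> {
        private final T value;
        Just(T value) { this.value = value; }

        @Override void subscribeActual(Consumer<T> observer) {
            TRACE.add("emit@" + Thread.currentThread().getName());
            observer.accept(value);
        }
    }

    // Subscribes to the wrapped source on the executor's thread.
    static final class SubscribeOn<T> extends Source<T> {
        private final Source<T> upstream;
        private final Executor executor;
        SubscribeOn(Source<T> upstream, Executor executor) {
            this.upstream = upstream;
            this.executor = executor;
        }

        @Override void subscribeActual(Consumer<T> observer) {
            executor.execute(() -> upstream.subscribe(observer));
        }
    }

    // Subscribes right away, but re-posts every value to the executor's thread.
    static final class ObserveOn<T> extends Source<T> {
        private final Source<T> upstream;
        private final Executor executor;
        ObserveOn(Source<T> upstream, Executor executor) {
            this.upstream = upstream;
            this.executor = executor;
        }

        @Override void subscribeActual(Consumer<T> observer) {
            upstream.subscribe(value -> executor.execute(() -> observer.accept(value)));
        }
    }

    static List<String> run() {
        TRACE.clear();
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        CountDownLatch done = new CountDownLatch(1);
        try {
            new Just<Integer>(42)
                .subscribeOn(worker)
                .observeOn(fakeMain)
                .subscribe(answer -> {
                    TRACE.add("receive@" + Thread.currentThread().getName());
                    done.countDown();
                });
            done.await();   // wait until the observer has been called
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            fakeMain.shutdown();
        }
        return new ArrayList<>(TRACE);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [emit@worker, receive@fake-main]
    }
}
```

Reading `run()` from the outside in gives the same order as the text: the outer `ObserveOn` subscribes to `SubscribeOn`, which hops to the worker thread and subscribes to `Just`. The value then travels back out and is re-posted to the second thread.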
Naturally, just with a single number isn't very interesting. So here's a source that takes a list of products and looks up their prices. Prices can only be requested in batches of 20 items (the InAppBilling API has the same limitation).
This example is meant to demonstrate how things work, not to be used in real projects.
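The batching idea can be sketched in plain Java, without RxJava. Everything here is hypothetical: the `emitPrices` source, the `lookup` function that stands in for the billing call, the product ids and the flat price of 100. Only the limit of 20 comes from the text.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class PriceBatchDemo {
    static final int BATCH_SIZE = 20; // the limit mentioned in the text

    interface Emitter<T> {
        void onNext(T value);
        void onComplete();
        void onError(Throwable error);
    }

    // Hypothetical source: cuts the id list into chunks of at most 20,
    // asks the lookup for each chunk and pushes the resulting prices downstream.
    static void emitPrices(List<String> productIds,
                           Function<List<String>, Map<String, Integer>> lookup,
                           Emitter<Map<String, Integer>> emitter) {
        try {
            for (int from = 0; from < productIds.size(); from += BATCH_SIZE) {
                int to = Math.min(from + BATCH_SIZE, productIds.size());
                emitter.onNext(lookup.apply(productIds.subList(from, to)));
            }
        } catch (RuntimeException e) {
            emitter.onError(e);   // a failed lookup ends the sequence
            return;
        }
        emitter.onComplete();
    }

    // Runs the source against a fake lookup and reports the size of each batch.
    static List<Integer> batchSizes(int productCount) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < productCount; i++) {
            ids.add("sku-" + i);
        }
        List<Integer> sizes = new ArrayList<>();
        emitPrices(ids, batch -> {
            Map<String, Integer> prices = new LinkedHashMap<>();
            for (String id : batch) {
                prices.put(id, 100);   // made-up price
            }
            return prices;
        }, new Emitter<Map<String, Integer>>() {
            @Override public void onNext(Map<String, Integer> prices) { sizes.add(prices.size()); }
            @Override public void onComplete() {}
            @Override public void onError(Throwable error) { sizes.add(-1); }
        });
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(45)); // prints [20, 20, 5]
    }
}
```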
RxJava has a huge number of different source implementations. They all work on the same principle, and the details are well described in the documentation, so I won't dwell on them.


All operations on sources fall into 2 types:
Non-terminal operations return a new source that wraps the original (map, filter…)
Terminal operations execute the chain and retrieve the data (subscribe, blockingFirst…)
And yes, nothing executes until a terminal operation is performed. The chain can sit in memory as long as it likes without doing anything at all. And that's a good thing, because if nobody is receiving the data, why produce it? (Lazy evaluation included, no Haskell required!)
The analogy with the Streams API is taken from [2].
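The same laziness is easy to observe in the Java 8 Streams API itself. In this small sketch (the class name `LazyDemo` is made up), the `map` step records when it actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Non-terminal step: builds a new stream wrapping the original; nothing runs yet.
        Stream<Integer> chain = Stream.of(1, 2, 3).map(x -> {
            log.add("map " + x);
            return x * 10;
        });
        log.add("chain built");

        // Terminal step: only now is data pulled through the whole chain.
        List<Integer> result = chain.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // prints [chain built, map 1, map 2, map 3, result [10, 20, 30]]
    }
}
```

"chain built" is logged before any "map" entry, which shows that building the chain did no work.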

Dispose (Unsubscribe in RxJava 1)

You can interrupt the chain. This is done by calling dispose() on the DisposableObserver (unsubscribe() on the Subscriber in RxJava 1).
After that RxJava stops executing the chain, unsubscribes all the Observers and calls interrupt() on threads that are no longer needed.
A source can also find out whether execution has been interrupted. For this, ObservableEmitter has the isDisposed() method (isUnsubscribed() on the Subscriber in RxJava 1).
This has a logical but unpleasant consequence: since the Observer is responsible for error handling, once it is disposed every subsequent error crashes the application. I haven't yet found a solution that I'm willing to write about.
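The cooperation between the consumer's dispose() and the source's isDisposed() check can be sketched with a toy handle. The class `Handle` only borrows the method names from RxJava's Disposable; it and the `emitRange` source are declared here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class DisposeDemo {
    // Toy cancellation handle shared by the source and the consumer.
    static final class Handle {
        private final AtomicBoolean disposed = new AtomicBoolean();
        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    // Hypothetical source: checks the flag before every emission and stops early.
    static void emitRange(int from, int to, Handle handle, Consumer<Integer> observer) {
        for (int i = from; i <= to && !handle.isDisposed(); i++) {
            observer.accept(i);
        }
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Handle handle = new Handle();
        emitRange(1, 100, handle, value -> {
            received.add(value);
            if (value == 3) {
                handle.dispose();   // the consumer has seen enough
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```

The source stops only because it checks the flag itself. A source that never looks at it would keep producing values that nobody receives.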


– Lets you easily chain requests to the network, the database and so on, and run them asynchronously. This means your users get a faster and more responsive application.
– Contains no magic in itself. Only the composition and execution of chains of sources.
– (For me) Solves more problems than it creates!
Thank you all.
[1] Video
[2] Video
