Android Introduction To Reactive Programming – RxJava, RxAndroid


RxJava has been around for quite some time and many developers have heard about its capabilities, but a lot of them haven’t started using it yet. If you are one of them, you are late to the party, but that’s OK; better late than never. A few developers I spoke with say either that there is no proper guide available (good articles are scattered across many websites rather than collected in a single place) or that they are afraid to start something new.

In this series, I aim to write tutorials covering basic to advanced topics in RxJava and RxAndroid.

I hope this series turns any novice developer into an RxAndroid developer in a few weeks’ time.

We’ll start with a basic theoretical explanation of a few terms you will come across in reactive programming and then move on to a few code examples.

1. What is Reactive Programming
Reactive Programming is basically event-based asynchronous programming. Everything is treated as an asynchronous data stream that can be observed, and an action takes place whenever the stream emits a value. You can create a data stream out of almost anything: variable changes, click events, HTTP calls, data storage, errors and so on. Asynchronous here means that a piece of code does not block its caller while it waits for a result; the work is often moved to a separate thread, so multiple code blocks can execute at the same time.

An advantage of the asynchronous approach is that, when each task runs on its own thread, all the tasks can start at the same time, and the time taken to complete all of them is roughly that of the longest task in the list rather than the sum of all of them. In mobile apps, running tasks on a background thread lets you deliver a seamless user experience without blocking the main thread.
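The timing claim can be tried out with plain Java before any Rx library is involved. The sketch below is only an illustration: the class name, the task names and the delays are made up, and it uses CompletableFuture from the JDK rather than RxJava.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ParallelTasksSketch {

    /** Simulates a slow task (for example a network call) that takes delayMs. */
    static String slowTask(String name, long delayMs) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + " done";
    }

    /** Starts all tasks at once on a small pool and waits for every result. */
    static List<String> runAll() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<CompletableFuture<String>> futures = List.of(
                    CompletableFuture.supplyAsync(() -> slowTask("A", 100), pool),
                    CompletableFuture.supplyAsync(() -> slowTask("B", 200), pool),
                    CompletableFuture.supplyAsync(() -> slowTask("C", 300), pool));
            // join() blocks until each future finishes; results keep the list order.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = runAll();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // The elapsed time should be close to the longest task (300 ms), not the sum (600 ms).
        System.out.println(results + " in about " + elapsedMs + " ms");
    }
}
```

Because the three tasks overlap, the total wait is governed by the slowest one. On Android the same idea keeps slow work off the main thread.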

A simple example (from Wikipedia) is x = y + z, where the sum of y and z is assigned to x. In reactive programming, when the value of y or z changes, the value of x is updated automatically without re-executing the x = y + z statement. This is achieved by observing the values of y and z.
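A minimal sketch of this idea in plain Java could look like the following. The Cell class and the sum() helper are hypothetical names invented for this illustration; they are not part of RxJava, which offers far richer building blocks for the same purpose.

```java
import java.util.ArrayList;
import java.util.List;

class ReactiveSumSketch {

    /** A value holder that notifies listeners whenever its value changes. */
    static class Cell {
        private int value;
        private final List<Runnable> listeners = new ArrayList<>();

        Cell(int initial) {
            value = initial;
        }

        int get() {
            return value;
        }

        void set(int newValue) {
            value = newValue;
            listeners.forEach(Runnable::run); // push the change to everyone observing
        }

        void onChange(Runnable listener) {
            listeners.add(listener);
        }
    }

    /** Builds a cell that always holds a + b by observing both inputs. */
    static Cell sum(Cell a, Cell b) {
        Cell result = new Cell(a.get() + b.get());
        Runnable recompute = () -> result.set(a.get() + b.get());
        a.onChange(recompute);
        b.onChange(recompute);
        return result;
    }

    public static void main(String[] args) {
        Cell y = new Cell(1);
        Cell z = new Cell(2);
        Cell x = sum(y, z);          // x = y + z, declared once
        System.out.println(x.get()); // 3
        y.set(10);                   // x follows y without re-running the statement
        System.out.println(x.get()); // 12
    }
}
```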
An array list can be a data stream, and an action can be taken as each item is emitted. Maybe you want to keep the even numbers and ignore the odd ones. This can be done with the usual loops and conditional statements, but reactive programming lets you achieve it with a completely different approach.
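To get a feel for the push-based style without adding a library yet, here is a sketch that uses java.util.concurrent.Flow, the reactive-streams interfaces that ship with JDK 9 and later. It is a stand-in for illustration, not RxJava itself, and the class and method names are assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class EvenNumberStreamSketch {

    /** Pushes every number through a publisher and keeps only the even ones. */
    static List<Integer> collectEvens(List<Integer> numbers) {
        List<Integer> evens = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        // try-with-resources closes the publisher, which signals onComplete.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // ask for every item
                }

                @Override
                public void onNext(Integer number) {
                    if (number % 2 == 0) {
                        evens.add(number); // the action taken for each emitted item
                    }
                }

                @Override
                public void onError(Throwable error) {
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    finished.countDown();
                }
            });
            numbers.forEach(publisher::submit); // the list becomes a stream of emissions
        }

        try {
            finished.await(); // items are delivered on another thread, so wait for completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return evens;
    }

    public static void main(String[] args) {
        System.out.println(collectEvens(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
    }
}
```

There is no loop over the list in the consumer: it only reacts to items as they arrive. RxJava expresses the same thing more compactly with its filter operator.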
When you build your app with Reactive Programming, the way you design your architecture and the way you write code change completely. It becomes even more powerful when combined with Clean Architecture, MVP, MVVM and other design patterns.
2. Reactive Extensions
Reactive Extensions (ReactiveX or Rx) is a family of libraries that follow Reactive Programming principles, i.e. they compose asynchronous and event-based programs by using observable sequences. These libraries provide a set of interfaces and methods that help developers write cleaner and simpler code.

Reactive Extensions are available in multiple languages: C++ (RxCpp), C# (Rx.NET), Java (RxJava), Kotlin (RxKotlin), Swift (RxSwift) and many more. We are specifically interested in RxJava and RxAndroid, as Android is our focus area.
3. What is RxJava
RxJava is the Java implementation of Reactive Extensions (from Netflix). Basically, it’s a library that composes asynchronous events by following the Observer pattern. You can create an asynchronous data stream on any thread, transform the data and have it consumed by an Observer on any thread. The library offers a wide range of operators like map, combineLatest, merge, filter and many more that can be applied to a data stream.

You will understand more about operators and transformations when you start working on actual code examples.
4. What is RxAndroid
RxAndroid is specific to the Android platform and adds a few classes on top of RxJava. More specifically, it introduces Android-specific Schedulers (AndroidSchedulers.mainThread()), which play a major role in supporting multithreading in Android applications. Schedulers basically decide the thread on which a particular piece of code runs, whether a background thread or the main thread. Apart from that, everything we use comes from the RxJava library.
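The idea of doing the work on one thread and consuming the result on another can be sketched with plain JDK classes. This is only an analogy: the two named single-thread executors below are assumptions standing in for a background scheduler and the Android main thread, and no RxJava or Android API is used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    /** Runs the work on a "background" thread and consumes the result on a "main-like" thread. */
    static String[] run() {
        ExecutorService background = Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName(); // where the work ran
                        return "payload";
                    }, background)
                    .thenAcceptAsync(data -> {
                        threads[1] = Thread.currentThread().getName(); // where the result was consumed
                    }, mainLike)
                    .join();
        } finally {
            background.shutdown();
            mainLike.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("work ran on: " + threads[0]);
        System.out.println("result consumed on: " + threads[1]);
    }
}
```

In RxJava the same hand-off is written declaratively by choosing a scheduler for each side of the stream.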

Even though there are a lot of Schedulers available, Schedulers.io() and AndroidSchedulers.mainThread() are the most extensively used in Android programming. Below is the list of available schedulers with a brief introduction to each.

Schedulers.io() – This is used to perform non-CPU-intensive (I/O-bound) operations such as making network calls, reading from disk / files, database operations etc. It maintains a pool of threads.
AndroidSchedulers.mainThread() – This provides access to the Android Main Thread / UI Thread. Operations like updating the UI and handling user interactions usually happen on this thread. We shouldn’t perform any intensive operations on this thread, as that makes the app glitchy or can trigger an ANR dialog.
Schedulers.newThread() – With this, a new thread is created each time a task is scheduled. It’s usually suggested not to use this scheduler unless there is a very long running operation. The threads created via newThread() won’t be reused.
Schedulers.computation() – This scheduler can be used to perform CPU-intensive operations like processing huge data sets, bitmap processing etc. The number of threads created by this scheduler depends on the number of CPU cores available.
Schedulers.single() – This scheduler executes all tasks sequentially, in the order they are added, on a single thread. It can be used when sequential execution is required.
Schedulers.immediate() – This scheduler executes the task immediately and synchronously on the current thread, blocking that thread until the task finishes. It belongs to RxJava 1.x and is not present in RxJava 2, where Schedulers.trampoline() is the suggested replacement.
Schedulers.trampoline() – It executes tasks on the current thread in First In – First Out order. Each scheduled task is queued and runs after the current one completes, so tasks execute one by one.
Schedulers.from() – This creates a scheduler from an Executor, which lets you limit the number of threads to be created. When the thread pool is fully occupied, tasks are queued.
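Schedulers are essentially managed thread pools, so the pooling and queuing behaviour described above can be observed with plain java.util.concurrent executors. The sketch below is an analogy under that assumption, not RxJava code: a fixed pool mimics what an Executor passed to Schedulers.from() would do, and a single-thread executor mimics the sequential behaviour of Schedulers.single(). All names are invented for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SchedulerPoolSketch {

    /** Submits taskCount tasks to a pool of poolSize threads and returns the distinct worker names. */
    static Set<String> workersUsed(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            // Tasks beyond poolSize wait in the executor's queue until a thread is free.
            pool.execute(() -> workers.add(Thread.currentThread().getName()));
        }
        shutdownAndWait(pool);
        return workers;
    }

    /** Runs tasks on one thread and returns the order in which they executed. */
    static List<Integer> singleThreadOrder(int taskCount) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        shutdownAndWait(single);
        return order;
    }

    private static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown(); // already submitted tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads used for 6 tasks on a pool of 2: " + workersUsed(2, 6).size());
        System.out.println("single-thread execution order: " + singleThreadOrder(5));
    }
}
```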
Now we have the basic concepts needed. Let’s start with some key concepts of RxJava that everyone should be aware of.

5. RxJava Basics: Observable, Observer
RxJava is all about two key components: Observable and Observer. In addition to these, there are other things like Schedulers, Operators and Subscription.

Observable: An Observable is a data stream that does some work and emits data.

Observer: An Observer is the counterpart of an Observable. It receives the data emitted by the Observable.

Subscription: The bond between an Observable and an Observer is called a Subscription. There can be multiple Observers subscribed to a single Observable.

Operator / Transformation: Operators modify the data emitted by an Observable before an Observer receives it.

Schedulers: Schedulers decide the thread on which an Observable should emit the data and on which an Observer should receive it, i.e. a background thread, the main thread etc.
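How these pieces fit together can be shown with a deliberately tiny toy model. Everything below (ToyObservable, ToyObserver and their methods) is hypothetical and written only for this explanation; it is not how RxJava is implemented, and it leaves out schedulers, disposal and everything else the real library handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ToyRxSketch {

    /** Receives what a ToyObservable pushes. */
    interface ToyObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    /** A stream, defined by what happens when an observer subscribes to it. */
    interface ToyObservable<T> {
        void subscribe(ToyObserver<T> observer);

        /** Emits the given items in order, then completes. */
        @SafeVarargs
        static <T> ToyObservable<T> just(T... items) {
            return observer -> {
                try {
                    for (T item : items) {
                        observer.onNext(item);
                    }
                    observer.onComplete();
                } catch (RuntimeException e) {
                    observer.onError(e);
                }
            };
        }

        /** Operator: returns a new stream whose items are transformed by fn. */
        default <R> ToyObservable<R> map(Function<T, R> fn) {
            return downstream -> subscribe(new ToyObserver<T>() {
                @Override
                public void onNext(T item) {
                    downstream.onNext(fn.apply(item));
                }

                @Override
                public void onError(Throwable error) {
                    downstream.onError(error);
                }

                @Override
                public void onComplete() {
                    downstream.onComplete();
                }
            });
        }
    }

    /** Subscribes an observer to a mapped stream and records everything it receives. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyObservable.just("Ant", "Bee", "Cat")
                .map(String::toUpperCase)
                .subscribe(new ToyObserver<String>() {
                    @Override
                    public void onNext(String item) {
                        log.add("Name: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        log.add("onError: " + error.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        log.add("All items are emitted!");
                    }
                });
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Nothing is emitted until subscribe() is called, and the operator sits between the source and the observer, changing each item on the way through.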
6. RxJava Basic Examples
Now we have a good theoretical knowledge of Reactive Programming, RxJava and RxAndroid. Let’s jump into some code examples to understand the concepts better.

Adding Dependencies
To get started, you need to add the RxJava and RxAndroid dependencies to your project’s build.gradle and sync the project.
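As a sketch, the entries in the app module’s build.gradle could look like the fragment below. The coordinates are the RxJava 2 and RxAndroid 2 artifacts, which match the Disposable and onComplete() API used in this article; the version numbers are assumptions, so check the projects’ release pages for the current ones.

```groovy
dependencies {
    // Version numbers are assumptions; replace them with the latest releases.
    implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
}
```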

The Basic Steps
1. Create an Observable that emits data. Below we create an Observable that emits a list of animal names. Here the just() operator is used to emit a few animal names.

Observable<String> animalsObservable = Observable.just("Ant", "Bee", "Cat", "Dog", "Fox");

2. Create an Observer that listens to the Observable. The Observer provides the interface methods below to track the state of the Observable.

onSubscribe(): Called when an Observer subscribes to the Observable.
onNext(): Called each time the Observable emits an item.
onError(): Called if an error occurs.
onComplete(): Called when the Observable has finished emitting all the items.

// TAG is assumed to be a String constant defined in the enclosing class.
Observer<String> animalsObserver = getAnimalsObserver();

private Observer<String> getAnimalsObserver() {
    return new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Called once, as soon as the Observer subscribes
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String s) {
            // Called for every emitted animal name
            Log.d(TAG, "Name: " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            // Called after the last item has been emitted
            Log.d(TAG, "All items are emitted!");
        }
    };
}

3. Make the Observer subscribe to the Observable so that it can start receiving the data. Here, you can notice two more methods, observeOn() and subscribeOn().

subscribeOn(Schedulers.io()): This tells the Observable to run the task on a background thread.
observeOn(AndroidSchedulers.mainThread()): This tells the Observer to receive the data on the Android UI thread so that you can take any UI-related actions.

animalsObservable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(animalsObserver);

If you run the program, you can see the output below in your LogCat.

onSubscribe
Name: Ant
Name: Bee
Name: Cat
Name: Dog
Name: Fox
All items are emitted!
That’s all, you just wrote your first RxJava program. We are going to learn more about Schedulers and Observers in subsequent articles. But for now this information is sufficient to get started.
