rakuishi.com

RxJava を使って、非同期通信処理とテストを書いてみた

例えば、ある通信処理で得られるトークンをもとに、次の通信処理を行うケースを考えてみる。Android アプリでは、同期的な通信は許可されていないから、非同期処理を続けて書く必要になりそうだ。これを愚直に実装すると面倒なことになりそうだが、RxJava で解消しそうだった。

RxJava とは、 リアクティブプログラミングを可能にするライブラリである(知らない言葉を説明するために、知らない言葉が登場してくる)。リアクティブプログラミングについては詳しくないのですが、このライブラリを使えば前述のようなプログラムが可読性高く書けるようになるようだった。

この RxJava を使って、非同期通信処理とテストを書いてみた。

非同期通信処理

今書いている Android アプリ rakuishi/OK の通信処理(このブログの RSS を読み込む)とデータ加工(レスポンスから Feed モデルを作成する)とリストに反映する処理を貼り付けた。

createmapsubscribeOnobserveOnsubscribe のように、処理をチェーンして書いているのが分かる。

Observable
    .create(new Observable.OnSubscribe<Response>() {
        @Override
        public void call(Subscriber<? super Response> subscriber) {
            Request request = new Request.Builder()
                .url("http://rakuishi.com/index.xml")
                .get()
                .build();
            OkHttpClient okHttpClient = new OkHttpClient();
            try {
                Response response = okHttpClient.newCall(request).execute();
                subscriber.onNext(response);
                subscriber.onCompleted();
            } catch (IOException e) {
                subscriber.onError(e);
            }
        }
    })
    .map(new Func1<Response, Feed>() {
        @Override
        public Feed call(Response response) {
            Serializer serializer = new Persister();
            try {
                return serializer.read(Feed.class, response.body().string());
            } catch (Exception e) {
                throw new OnErrorFailedException(e);
            }
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.newThread())
    .subscribe(new Subscriber<Feed>() {
        @Override
        public void onCompleted() {
            // 完了
            mEmptyView.setVisibility(View.GONE);
        }

        @Override
        public void onError(Throwable e) {
            // 失敗
            ToastUtils.showLongMessage(getActivity(), e.getMessage());
            mEmptyView.setVisibility(View.GONE);
        }

        @Override
        public void onNext(Feed feed) {
            // 成功
            mListView.setAdapter(new FeedAdapter(getActivity(), feed.getList()));
        }
    });

Observable を作成する

まずは、create で元となる Observable を作成する。Observable は、目に見える / 観測できる / 見分けのつくという意味の単語。ここでは、OkHttpClient による同期通信処理の Response を返している。この Observable.create() は、Observable<Response> を返している。

ちなみに Observable を作成するメソッドは、create 以外にもたくさんある。

データを加工する

次に、map でデータ加工を行います。これは Observable<Response> から Observable<Feed> に変換する処理を書いている。[Simple](Transforming Observables) という XML パーサーを使用している。

非同期処理にする

observeOnsubscribeOn で、Observable と後述するコールバックを扱う Subscribe をどのスレッドで扱うかを指定しています。今回は、非同期処理を行うように指定。

結果を受け取る

subscribe メソッドを呼ぶと、その結果を処理できる。subscribe は、申し込むという意味の単語。

onNextonCompleted は、成功時と完了時に呼ばれるが、意味が被りそうなこのふたつが存在するのは、値を順番に返すような処理も作れるためだった。

onError は、一連の流れで例外が発生した場合に呼ばれる。つまり、失敗した時の処理はここに書いとけば良いから、可読性が上がる。

テストを書く

非同期通信処理のテストを書くときは、CountDownLatch を使うと思うのだけれど、たんに、observeOn(AndroidSchedulers.mainThread()), subscribeOn(Schedulers.newThread()) を呼ばずに同期処理でテストすれば良い。

@Test
public void testRequestFeed() throws Exception {
    OkAPIClient.getInstance().requestFeed()
        .subscribe(new Action1<Feed>() {
            @Override
            public void call(Feed feed) {
                assertNotNull(feed);
                assertEquals(feed.getTitle(), "rakuishi.com");
                assertEquals(feed.getLink(), "http://rakuishi.com/");
            }
        });
}

ここでは、subscribe に、Action1 を使用している。何かしらの処理が失敗している場合は、feed == null が得られる。

テストは、以下のブログを参考にした。

Android アプリで使うには、RxAndroid

実際には、RxJava を Android 用に書かれた RxAndroid を使っている。

例えば、非同期通信処理が終わった後、呼び出し元の画面が存在していなくて、ぬるぽで落ちる問題がある。それを回避するには、画面が破棄される前に subscrible を解除してあげる必要がある。

RxAndroid では、CompositeSubscription に、add し、それを画面が破棄される onDestroy 時に、unsubscribe すれば、OK です。

public class SampleFragment extends Fragment {

    private CompositeSubscription mSubscription = new CompositeSubscription();

    @Override
    public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
        View view = inflater.inflate(R.layout.fragment, container, false);
        request();
        return view;
    }

    @Override
    public void onDestroy() {
        mSubscription.unsubscribe();
        super.onDestroy();
    }

    private void request() {
        mSubscription.add(Observable
            .create(/* 省略 */)
            .map(/* 省略 */)
            .observeOn(/* 省略 */)
            .subscribeOn(/* 省略 */)
            .subscribe(/* 省略 */)
        );
    }
}

参考