【Android】RxJava之初始篇

【Android】RxJava之初始篇

关于RxJava

RxJava是ReactiveX推出在Java VM环境下使用的异步操作库。除了在Java环境ReactiveX也为其他编程语言推出Rx库,例如Py、Js、Go等。网上有很多关于对RxJava的介绍和使用,在Android开发中也有很多项目使用RxJava。那为什么还要使用RxJava呢,Android开发也有提供异步操作的方法供开发者使用,我想应该是RxJava比起Handle、AsyncTask简洁优雅。

  • 1 RxJava采用链式调用,在程序逻辑上清晰简洁
  • 2 采用扩展式观察者设计模式

关于观察者模式以及其他RxJava的介绍这个就不做重复,下面内容主要围绕RxJava和RxAndroid使用。对于RxJava官方文档已经有详细介绍,本节是以学习讨论为主,如存在错误的地方希望大家可以指出。

被观察者Observable

使用RxJava需要创建Observable,Observable用于发射数据。如下Observable的create方法需要传入一个OnSubscribe,其继承于Action1<Subscriber<? super T>>,Action中的Subscriber就是订阅者。

public static <T> Observable<T> create(OnSubscribe<T> f) { 


    return new Observable<T>(RxJavaHooks.onCreate(f)); 



}  

另外create方法中需要实现接口call,返回subscriber对象。call方法实现在observable订阅后要执行的事件流。subscriber.onNext发射data,subscriber.onCompleted可以表示发射事件结束。接着调用observable的subscribe方法实现被订阅后执行的事件流。

Observable<String> observable = Observable 


                .create(new Observable.OnSubscribe<String>() { 


 


            @Override 


            public void call(Subscriber<? super String> subscriber) { 


                subscriber.onNext("1"); 


                subscriber.onNext("2"); 


                subscriber.onNext("3"); 


                subscriber.onNext("4"); 


                subscriber.onNext("5"); 


            } 


}); 


Subscriber<String> subscriber = new  Subscriber<String>() { 


            @Override 


            public void onCompleted() { 


 


            } 


 


            @Override 


            public void onError(Throwable e) { 


 


            } 


 


            @Override 


            public void onNext(String s) { 


                System.out.print(s + '\n'); 


            } 


}; 


observable.subscribe(subscriber); 


//输出结果 print: 


//1 


//2 


//3 


//4 



//5   

Observable除了使用create方法创建外还可以使用from或者just快速设置发射的事件流,简化了create的步骤。

【Android】RxJava之初始篇 

Observable<String> o = Observable.from("a", "b", "c");  

【Android】RxJava之初始篇 

Observable<String> o = Observable.just("one object"); 

说好的异步操作

RxJava的线程由Schedulers调度者控制,通过它来控制具体操作在什么线程中进行。

  • Schedulers.immediate() 在当前线程中执行
  • Schedulers.newThread() 为每一个任务开辟线程执行
  • Schedulers.computation() 计算任务运行的线程
  • Schedulers.io() IO任务运行的线程....
  • AndroidSchedulers.mainThread() Android 主线程运行

对于线程的控制主要由subscribeOn()和observeOn()两个方法控制:

  • subscribeOn 控制Observable.OnSubscribe所处的线程,等同于Observable create、just、from时所处的线程。
  • observeOn 控制Subscriber的线程,也可以说是控制事件被执行时所在的线程。
Observable 


        .just(1,2,3) 


        .subscribeOn(Schedulers.io()) 


        .observeOn(AndroidSchedulers.mainThread()) 


        .subscribe(new Subscriber<Integer>() { 


            @Override 


            public void onCompleted() { 


 


            } 


 


            @Override 


            public void onError(Throwable e) { 


 


            } 


 


            @Override 


            public void onNext(Integer integer) { 


                 System.out.print(integer + '\n');                        


            } 


}); 


//输出结果 print: 


//1 


//2 



//3  

写下上面的RxJava链式调用的代码,有没有觉得比以前使用的异步调用清爽许多,对处女座还说这很治愈!

操作符Operators

ReactiveX提供超级多的操作符,每个操作符都具有不同的功能,但目的都是在Observable和Subscribe之间变换和修改发射出去的事件流。这节介绍几个比较常见简单的操作符,之后有机会再写一节操作符篇详细说说每个操作符的作用。附上官方操作符文档看看就知道有多少多了。

  • Map() 
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { 


        return create(new OnSubscribeMap<T, R>(this, func)); 



    }   

【Android】RxJava之初始篇

首先先介绍一个操作符map,map实现Func1接口将T类型数据变换为R类型数据,返回R类型数据。例如传入Integer类型的事件队列,经过map加工之后以String类型返回。

Observable 


                .just(1,2,3) 


                .subscribeOn(Schedulers.io()) 


                .observeOn(AndroidSchedulers.mainThread()) 


                .map(new Func1<Integer, String>() { 


                    @Override 


                    public String call(Integer integer) { 


                        return integer + ""; 


                    } 


                }) 


                .subscribe(new Subscriber<String>() { 


                    ...... 


                    @Override 


                    public void onNext(String str) { 


                        System.out.print(str + '\n'); 


                    } 


                }); 


//输出结果 print: 


//1 


//2 



//3  
  • Filter() 
public final Observable<T> filter(Func1<? super T, Boolean> predicate) { 


       return create(new OnSubscribeFilter<T>(this, predicate)); 



   }   

【Android】RxJava之初始篇

filter和map一样实现Func1接口不过它变换之后的类型为boolean,对发射的事件流进行筛选,当变换后的boolean值为true,订阅者才能收到通过筛选的事件,反之该事件不被消费。例如事件流筛选要求当int值可被2整除才能继续传递,所以最后订阅者可消费的事件为2,4,6,8,10。

Observable 


                .just(1,2,3,4,5,6,7,8,9,10) 


                .subscribeOn(Schedulers.io()) 


                .observeOn(AndroidSchedulers.mainThread()) 


                .filter(new Func1<Integer, Boolean>() { 


                    @Override 


                    public Boolean call(Integer integer) { 


                        return integer % 2 == 0; 


                    } 


                }) 


                .map(new Func1<Integer, String>() { 


                    @Override 


                    public String call(Integer integer) { 


                        return integer + ""; 


                    } 


                }) 


                .subscribe(new Subscriber<String>() { 


                   ...... 


                    @Override 


                    public void onNext(String str) { 


                        System.out.print(str + '\n'); 


                        Log.i("subscribe", str); 


                    } 


                }); 


//输出结果 print: 


//2 


//3 


//4 


//6 


//8 



//10  
  • Skip()
public final Observable<T> skip(int count) { 


        return lift(new OperatorSkip<T>(count)); 


    } 

【Android】RxJava之初始篇

skip操作符表示跳过前几个事件从某一个事件开始发射事件,下标从0开始。

 Observable 


                .just(1,2,3,4,5,6,7,8,9,10) 


                .subscribeOn(Schedulers.io()) 


                .observeOn(AndroidSchedulers.mainThread()) 


                .skip(3) 


                .map(new Func1<Integer, String>() { 


                    @Override 


                    public String call(Integer integer) { 


                        return integer + ""; 


                    } 


                }) 


                .subscribe(new Subscriber<String>() { 


                    ...... 


                    @Override 


                    public void onNext(String s) { 


                        System.out.print(s + '\n'); 


                        Log.i("subscribe", s); 


                    } 


                }); 


//输出结果 print: 


//4 


//5 


//6 


//7 


//8 


//9 



//10  
  • Range()
public static Observable<Integer> range(int start, int count) { 


        if (count < 0) { 


            throw new IllegalArgumentException("Count can not be negative"); 


        } 


        if (count == 0) { 


            return Observable.empty(); 


        } 


        if (start > Integer.MAX_VALUE - count + 1) { 


            throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE"); 


        } 


        if(count == 1) { 


            return Observable.just(start); 


        } 


        return Observable.create(new OnSubscribeRange(start, start + (count - 1))); 



    }   
【Android】RxJava之初始篇 

range操作符可以理解为just,from传递一个连续的int类型待发射数组,n为起始int值,m为Count。例如n = 1,m = 5 int数组就是{1,2,3,4,5}

相关推荐