Rxjs 响应式编程-第二章:序列的深入研究

Rxjs 响应式编程-第一章:响应式
Rxjs 响应式编程-第二章:序列的深入研究
Rxjs 响应式编程-第三章: 构建并发程序
Rxjs 响应式编程-第四章 构建完整的Web应用程序
Rxjs 响应式编程-第五章 使用Schedulers管理时间
Rxjs 响应式编程-第六章 使用Cycle.js的响应式Web应用程序

序列的深入研究

童年的回忆中的益智视频游戏,你必须使用各种技巧在屏幕上引导下降的水流。您可以拆分流,稍后将它们合并,或者使用倾斜的木板来改变它们的方向。你必须要有创造力才能使水达到最终目标。

我发现该游戏与使用Observable序列有很多相似之处。 Observable只是我们可以转换,组合和查询的事件流。 无论我们是在处理简单的Ajax回调还是在Node.js中处理字节数据都没关系。 我们发现流的方式是一样的。 一旦我们在流中思考,我们程序的复杂性就会降低。

在本章中,我们将重点介绍如何在程序中有效地使用序列。 到目前为止,我们已经介绍了如何创建Observable并使用它们进行简单的操作。为了释放它们的力量,我们必须知道将我们的程序输入和输出转换为带有我们程序流程的序列。

在我们弄清楚之前,我们将会遇到一些可以帮助我们开始操作序列的基本operator。接下来,我们将实现一个真实的应用程序,显示(几乎)实时发生的地震。 开始吧!

可视化的Observables

您将要学习我们在RxJS程序中最常使用的一些运算符。 谈论对序列的操作可能感觉很抽象。 为了帮助开发人员以简单的方式理解Operator,我们将使用标准的可视化表示序列,称为大理石图。 它们直观地表示异步数据流,您可以在RxJS的每个资源中找到它们。

让我们使用范围运算符,它返回一个Observable,它得到指定范围内的整数:Rx.Observable.range(1,3);

它的大理石图看起来像这样:

Rxjs 响应式编程-第二章:序列的深入研究

长箭头表示Observable,x轴表示时间。每个圆圈表示Observable通过内部调用onNext()传出的值。生成第三个值后,range调用了onCompleted,在图中用垂直线表示。

让我们看一个涉及几个Observable的例子。合并运算符采用两个不同的Observable并返回一个具有合并值的新Observable。 interval运算符返回一个Observable,它在给定的时间间隔内产生增量数,以毫秒为单位。

在下面的代码中,我们将合并两个不同的Observable,它们使用interval来以不同的间隔生成值:

var a = Rx.Observable.interval(200).map(function(i) { 
    return 'A' + i;
});
var b = Rx.Observable.interval(100).map(function(i) {
    return 'B' + i; 
    
});
Rx.Observable.merge(a, b).subscribe(function(x) {
    console.log(x);
});
B0, A0, B1, B2, A1, B3, B4...

合并运算符的大理石图如下所示:

Rxjs 响应式编程-第二章:序列的深入研究

这里,沿y轴的虚线箭头指向应用于序列A和B中每个元素的变换的最终结果。得到的Observable由C表示,其中包含A和B的合并元素。如果不同Observables同时传出元素,合并序列中这些元素的顺序是随机的。

基本序列运算符

在RxJS中转换Observables的数十个运算符中,最常用的是具有良好收集处理能力的其他语言也具有:map,filter和reduce。在JavaScript中,您可以在Array中找到这些operator。

RxJS遵循JavaScript约定,因此您会发现以下运算符的语法与数组运算符的语法几乎相同。实际上,我们将使用数组和Observables同时实现,以显示两个API的相似程度。

Map

map是最常用的序列转换运算符。它接受一个Observable和一个函数,并将该函数应用于源Observable中的每个值。 它返回一个带有转换值的新Observable。

Rxjs 响应式编程-第二章:序列的深入研究

JS Arrays

var src = [1, 2, 3, 4, 5];
var upper = src.map(function(name) {
    return name * 2; 
});
upper.forEach(logValue);

Observables

var src = Rx.Observable.range(1, 5); 
var upper = src.map(function(name) {
    return name * 2; 
});
upper.subscribe(logValue);

在这两种情况下,src都不会发生改变。

这段代码和后面的代码使用的logValue函数:

var logValue = function(val) { 
    console.log(val) 
};

有些情况下,我们传递给map的函数会进行一些异步计算来转换值。在这种情况下,map将无法按预期工作。 对于这些情况,最好使用flatMap,后续会介绍到。

Filter

filter接受一个Observable和一个函数,并使用该函数检测Observable中的每个元素。它返回一个Observable序列,其中包含函数返回true的所有元素。

Rxjs 响应式编程-第二章:序列的深入研究

JS Arrays

var isEven = (function(val) { return val % 2 !== 0; });
var src = [1, 2, 3, 4, 5];
var even = src.filter(isEven);
even.forEach(logValue);

Observables

var isEven = (function(val) { return val % 2 !== 0; });
var src = Rx.Observable.range(1, 5); 
var even = src.filter(isEven);
even.subscribe(logValue);

Reduce

reduce(也称为fold)接受一个Observable并返回一个始终包含单个项的新项,这是在每个元素上应用函数的结果。 该函数接收当前元素和函数先前调用的结果。

Rxjs 响应式编程-第二章:序列的深入研究

JS Arrays

var src = [1, 2, 3, 4, 5];
var sum = src.reduce(function(a, b) {
    return a + b;
});
console.log(sum);

Observables

var src = Rx.Observable.range(1, 5);
var sum = src.reduce(function(acc, x) {
    return acc + x;
});
sum.subscribe(logValue);

reduce是操作序列的强大操作符。事实上,它是称为聚合运算符的基本实现。

聚合运算符

聚合运算符处理序列并返回单个值。例如, Rx.Observable.first接受一个Observable和一个可选函数,并返回满足函数条件布尔值的第一个元素。

计算序列的平均值也是一个聚合操作.RxJS提供了实例运算符的平均值,但是为了本节的目的,我们想看看如何使用reduce实现它。每个聚合运算符都可以通过仅使用reduce来实现:

sequences/marble.js

var avg = Rx.Observable.range(0, 5)
    .reduce(function(prev, cur) {
        return {
            sum: prev.sum + cur,
            count: prev.count + 1
        };
    }, { sum: 0, count: 0 })
    .map(function(o) {
        return o.sum / o.count;
    });
    
var subscription = avg.subscribe(function(x) {
    console.log('Average is: ', x);
});
Average is: 2

在此代码中,我们使用reduce将每个新值添加到前一个值。因为reduce不能为我们提供序列中元素的总数,所以我们需要对它们进行计数。我们使用包含两个字段sum和count的对象组成的初始值调用reduce,其中我们将存储到目前为止的元素总数和总数。每个新元素都将返回具有更新值的同一对象。

当序列结束时,reduce可以通过调用onNex返回t包含最终总和和最终计数的对象。但在这里我们使用map来返回将总和除以计数的结果。

我们可以聚合无限Observables吗?

想象一下,我们正在编写一个程序,让用户在行走时获得平均速度。即使用户尚未完成行走,我们也需要能够使用我们目前所知的速度值进行计算。我们想要实时记录无限序列的平均值。 问题是如果序列永远不会结束,像reduce这样的聚合运算符将永远不会调用其Observers的onNext运算符。

对我们来说幸运的是,RxJS团队已经考虑过这种情况,并为我们提供了scan操作符,其作用类似于reduce但是会发出每个中间结果:

var avg = Rx.Observable.interval(1000)
    .scan(function (prev, cur) {
        return {
            sum: prev.sum + cur,
            count: prev.count + 1
        };
    }, { sum: 0, count: 0 })
    .map(function(o) {
        return o.sum / o.count;
    });
    
var subscription = avg.subscribe( function (x) {
    console.log(x);
});

这样,我们可以聚合需要很长时间才能完成或无限的序列。在前面的示例中,我们每秒生成一个增量整数,并调用scan替换先前的reduce。我们现在每秒得到生成值的平均值。

flatMap

如果你的Observable的结果是还是Observables,你要怎么处理?大多数情况下,您希望在单个序列中统一这些嵌套Observable中的项目。 这正是flatMap的作用。

flatMap运算符接收参数Observable A,其元素也是Observables,并返回一个子元素也是Observable的Observable。让我们用图表可视化它:

Rxjs 响应式编程-第二章:序列的深入研究

我们可以看到A(A1,A2,A3)中的每个元素也是可观察序列。 一旦我们使用变换函数将flatMap应用于A,我们得到一个Observable,其中包含A的不同子元素中的所有元素。

flatMap是一个功能强大的运算符,但它比我们迄今为止看到的运算符更难理解。可以把它想象成Observables的concatAll()

concatAll是一个函数,它接受一个数组数组并返回一个“flattened”单个数组,其中包含所有子数组的值,而不是子数组本身。 我们可以使用reduce来实现这样的功能:

function concatAll(source) {
    return source.reduce(function(a, b) {
        return a.concat(b); 
    });
}

我们会像这样使用它:

concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]]);
// [0, 1, 2, 3, 4, 5, 6, 7, 8]

flatMap做同样的事情,但它使Observables而不是数组变扁平。它需要一个源Observable和一个返回一个新的Observable的函数,并将该函数应用于源Observable中的每个元素,就像map一样。如果程序在这里停止,我们最终会得到一个会发出Observables的Observable。 但是flatMap向主序列发出每个新Observable发出的值,将所有Observable“扁平化”为一个主序列。 最后,我们获得了一个Observable。

取消序列

在RxJS中,我们可以取消正在运行的Observable。 这是一种优于其他异步通信形式的优势,例如回调和Promise,一旦被调用就无法直接取消(尽管某些Promise实现支持取消)。

我们可以通过两种主要方式取消Observable:隐式和显式。

显式取消:Disposable

Observables本身没有取消的方法。相反,当我们订阅Observable时,我们会得到一个代表该特定订阅的Disposable对象。然后我们可以在该对象中调用方法dispose,并且该订阅将停止从Observable接收通知。

在下面的示例中,我们将两个Observers订阅到计数器Observable,它每秒发出一个递增的整数。 两秒后,我们取消第二个订阅,我们可以看到它的输出停止但第一个订阅者的输出继续:

sequences/disposable.js

var counter = Rx.Observable.interval(1000);

var subscription1 = counter.subscribe(function(i) {
    console.log('Subscription 1:', i);
});

var subscription2 = counter.subscribe(function(i) {
    console.log('Subscription 2:', i);
});

setTimeout(function() { 
    console.log('Canceling subscription2!');
    subscription2.dispose();
}, 2000);
Subscription 1: 0 
Subscription 2: 0 
Subscription 1: 1 
Subscription 2: 1 
Canceling subscription2! 
Subscription 1: 2 
Subscription 1: 3 
Subscription 1: 4
...

隐式取消:通过Operater

大多数时候,Operater会自动取消订阅。当序列结束或满足操作条件时,rangetake等操作符将取消订阅。更高级的操作符,如withLatestFromflatMapLatest,将根据需要在内部创建和销毁订阅,因为它们处理的是运行中的几个可观察的内容。简而言之,大部分订阅的取消都不应该是你该担心的。

被封装之后的Observables

当您使用包含不提供取消的外部API的Observable时,Observable仍会在取消时停止发出通知,但基础API不一定会被取消。例如,如果您正在使用封装Promise的Observable,则Observable将在取消时停止发出,但不会取消基础Promise。

在下面的代码中,我们尝试取消对包含promise p的Observable的订阅,同时我们以传统的方式设置一个动作来解决promise。 promise应在五秒内resolve,但我们在创建后立即取消订阅:

var p = new Promise(function(resolve, reject) {
    window.setTimeout(resolve, 5000);
});

p.then(function() {
    console.log('Potential side effect!');
});

var subscription = Rx.Observable.fromPromise(p).subscribe(function(msg) {
    console.log('Observable resolved!');
});

subscription.dispose();

5秒后,我们看到:

Potential side effect!

如果我们取消对Observable的订阅,它会有效地阻止它接收通知。 但是promise的then方法仍在运行,表明取消Observable并不会取消关联的Promsie。

了解我们在Observable中使用的外部API的详细信息非常重要。您可能认为已取消序列,但底层API会继续运行并在程序中引起一些副作用。 这些错误真的很难捕捉到。

错误处理

我们不能在回调中使用传统的try / catch机制,因为它是同步的。 它将在任何异步代码之前运行,并且无法捕获任何错误。

在回调函数中,可以通过将错误(如果有)作为参数传递到回调函数。这是有用的,但它使代码非常脆弱。

让我们看看如何捕获Observables中的错误。

onError处理程序

还记得我们在上面上讨论了第一次与观察者联系的观察者可以调用的三种方法吗? 我们熟悉onNextonCompleted,但是我们还没有使用onError; 它是有效处理Observable序列中错误的关键。

为了了解它是如何工作的,我们将编写一个简单的函数来获取JSON字符串数组,并使用JSON.parse返回一个Observable,它发出从这些字符串解析的对象:

为了了解它是如何工作的,我们将编写一个简单的函数来获取JSON字符串组成的数组,并使用JSON.parse返回一个Observable,它发出从这些字符串解析的对象:

function getJSON(arr) {
    return Rx.Observable.from(arr).map(function(str) {
        var parsedJSON = JSON.parse(str);
        return parsedJSON;
    });
}

我们将带有三个JSON字符串的数组传递给getJSON,其中数组中的第二个字符串包含语法错误,因此JSON.parse将无法解析它。 然后我们将订阅结果,为onNext和onError提供处理程序:

getJSON([
    '{"1": 1, "2": 2}',
    '{"success: true}', // Invalid JSON string
    '{"enabled": true}'
]).subscribe(
    function(json) {
        console.log('Parsed JSON: ', json);
    },
    function(err) {
        console.log(err.message);
    }
)
Parsed JSON: { 1: 1, 2: 2 }
JSON.parse: unterminated string at line 1 column 8 of the JSON data

Observable为第一个结果发出解析的JSON,但在尝试解析第二个结果时抛出异常。 onError处理程序捕获并打印出来。默认行为是,每当发生错误时,Observable都会停止发出项目,并且不会调用onCompleted。

错误捕获

到目前为止,我们已经看到如何检测错误已经发生并对该信息做了些什么,但是我们无法对它做出响应并继续我们正在做的事情。Observable察实例具有catch运算符,它允许我们对Observable中的错误做出反应并继续使用另一个Observable。

catch接受一个Observable或一个接收错误的函数作为参数并返回另一个Observable。 在我们的场景中,如果原始Observable中存在错误,我们希望Observable发出包含error属性的JSON对象:

function getJSON(arr) {
    return Rx.Observable.from(arr).map(function(str) {
        var parsedJSON = JSON.parse(str);
        return parsedJSON;
    });
}

var caught = getJSON(['{"1": 1, "2": 2}', '{"1: 1}']).catch(
    Rx.Observable.return({
        error: 'There was an error parsing JSON'
    })
);

caught.subscribe(
    function(json) {
        console.log('Parsed JSON: ', json);
    },
    // Because we catch errors now, `onError` will not be executed
    function(e) {
        console.log('ERROR', e.message);
    }
);

在前面的代码中,我们创建了一个新的Observable,它使用catch运算符来捕获原始Observable中的错误。 如果出现错误,它将使用仅发出一个项目的Observable继续序列,并使用描述错误的error属性。 这是输出:

Parsed JSON: Object { 1: 1, 2: 2 }
Parsed JSON: Object { error: "There was an error parsing JSON" }

这是catch操作符的大理石图:

Rxjs 响应式编程-第二章:序列的深入研究

注意X表示序列出错。 在这种情况下,Observable值 - 三角形的不同形状意味着它们是来自另一个Observable的值。在这里,这是我们在发生错误时返回的Observable。

catch对于对序列中的错误作出反应非常有用,它的行为与传统的try / catch块非常相似。 但是,在某些情况下,忽略Observable中的项目发生的错误并让序列继续,这将是非常方便的。 在这些情况下,我们可以使用重试运算符。

序列重试

有时错误就会发生,我们无能为力。例如,可能存在请求远程数据的超时,因为用户具有不稳定的Internet连接,或者我们查询的远程服务器可能崩溃。在这些情况下,如果我们能够继续请求我们需要的数据直到成功,那将是很好的。 重试操作符的确如此:

sequences/error_handling.js

// This will try to retrieve the remote URL up to 5 times.
Rx.DOM.get('/products').retry(5)
.subscribe(
    function(xhr) { console.log(xhr); },
    function(err) { console.error('ERROR: ', err); }
);

在前面的代码中,我们创建了一个函数,该函数返回一个Observable,它使用XMLHttpRequest从URL检索内容。 因为我们的连接可能有点不稳定,所以我们在订阅它之前添加retry(5),确保在出现错误的情况下,它会在放弃并显示错误之前尝试最多五次。

使用重试时需要了解两件重要事项。首先,如果我们不传递任何参数,它将无限期地重试,直到序列完成没有错误。 如果Observable产生错误,这对性能是危险的。 如果我们使用同步Observable,它将具有与无限循环相同的效果。

其次,重试将始终重新尝试整个Observable序列,即使某些项目没有错误。如果您在处理项目时造成任何副作用,这一点很重要,因为每次重试都会重新应用它们。

制作实时地震可视化器

使用我们在本章中到目前为止所涵盖的概念,我们将构建一个使用RxJS的Web应用程序,以向我们展示实时发生地震的位置。我们首先要建立一个功能性的反应性实施方案,我们将随着时间的推移对其进行改进。 最终结果如下:

Rxjs 响应式编程-第二章:序列的深入研究

准备环境

我们将使用USGS(美国地质调查局)地震数据库,该数据库提供多种格式的实时地震数据集。 我们将以JSONP格式从每周数据集中获取数据。

我们还将使用Leaflet(一个JavaScript库)来渲染交互式地。让我们看看我们的index.html看起来如何,并重点介绍:

examples_earthquake/index.html

<!DOCTYPE html>
<html lang="en-us">
<head>
    <meta charset="utf-8">
    <link rel="stylesheet"
    href="http://cdn.leafletjs.com/leaflet-0.7.3/leaflet.css" />
    <script src="http://cdn.leafletjs.com/leaflet-0.7.3/leaflet.js"></script>
    <script src="../rx.all-4.0.0.js"></script>
    <title>Earthquake map</title>
    <style type="text/css">
        html, body {
        margin: 0;
        padding: 0;
        height: 100%;
        }
        #map { height: 100%; }
    </style>
</head>
<body>
<div id="map"></div>
    <script>
        var QUAKE_URL = 'http://earthquake.usgs.gov/earthquakes/feed/v1.0/' +
        'summary/all_day.geojsonp';
        function loadJSONP(url) {
            var script = document.createElement('script');
            script.src = url;
            var head = document.getElementsByTagName('head')[0];
            head.appendChild(script);
        }
        var map = L.map('map').setView([33.858631, -118.279602], 7);
        L.tileLayer('http://{s}.tile.osm.org/{z}/{x}/{y}.png').addTo(map);
    </script>
    <script src="code.js"></script>
</body>
</html>

检索地震位置

现在我们的HTML已准备就绪,我们可以为我们的应用程序编写逻辑。首先,我们需要知道我们获得了什么样的数据以及在地图上代表地震所需什么样的数据。

USGS网站给我们的JSONP数据看起来像这样:

examples_earthquake/jsonp_example.txt

eqfeed_callback({
    "type": "FeatureCollection",
    "metadata": {
        "generated": 1408030886000,
        "url": "http://earthquake.usgs.gov/earthquakes/...",
        "title": "USGS All Earthquakes, Past Day",
        "status": 200, "api": "1.0.13", "count": 134
    },
    "features": [
        {
            "type": "Feature",
            "properties": {
                "mag": 0.82,
                "title": "M 0.8 - 3km WSW of Idyllwild-Pine Cove, California",
                "place": "3km WSW of Idyllwild-Pine Cove, California",
                "time": 1408030368460,
                ...
            },
            "geometry": {
                "type": "Point",
                "coordinates": [ -116.7636667, 33.7303333, 17.33 ]
            },
            "id": "ci15538377"
        },
        ...
    ]
})

features数组包含一个对象,其中包含今天发生的每次地震的数据。 那是一大堆数据! 一天之内发生了多少次地震是令人惊讶的(并且可怕)。对于我们的程序,我们只需要每次地震的坐标,标题和大小。

我们首先要创建一个Observable来检索数据集并发出单个地震。 这是第一个版本:

examples_earthquake/code.js

var quakes = Rx.Observable.create(function(observer) {
    window.eqfeed_callback = function(response) {
        var quakes = response.features;
        quakes.forEach(function(quake) {
            observer.onNext(quake);
        });
    };
    loadJSONP(QUAKE_URL);
});

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

等等,那个明显的全局函数window.eqfeed_callback在我们的代码中做了什么? 好吧,事实证明,JSONP URL通常在URL中添加查询字符串,以指定处理响应的函数名称,但USGS站点不允许这样做,因此我们需要创建一个全局函数 他们决定我们必须使用的名称,即eqfeed_callback

我们的Observable按顺序发出所有地震。我们现在有地震数据生成器!我们不必关心异步流程或者必须将所有逻辑放在同一个函数中。只要我们订阅Observable,就会得到地震数据。

通过在地震观测中将地震检索“黑箱”,我们现在可以订阅并处理每次地震。 然后我们将为每个地震绘制一个圆,其大小与其大小成比例。

深入一些

我们可以做得更好吗?你打赌!在前面的代码中,我们仍然通过遍历数组并调用onNext来管理每个地震,即使我们在Observable中将其隔离。

这是可以使用flatMap的完美情况。我们将使用Rx.Observable.from检索数据并从features数组中生成一个Observable。 然后我们将Observable合并回主Observable中:

var quakes = Rx.Observable.create(function(observer) {
    window.eqfeed_callback = function(response) {
        observer.onNext(response);
        observer.onCompleted();
    };
    loadJSONP(QUAKE_URL);
}).flatMap(function transform(dataset) {
    return Rx.Observable.from(dataset.response.features);
});

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

我们不再手动管理流程了。 没有循环或条件来提取单个地震对象并将其传递出去。 这是就是发生了什么:

  1. onNext只发生一次,它产生整个JSON字符串。
  2. 由于我们只会产生一次,因此我们在onNext之后发出完成信号。
  3. 我们将flatMap调用链接到create的结果,因此flatMap将从Observable中获取每个结果(在这种情况下只有一个),将它用作transform函数的参数,并将该函数产生的Observable合并到源Observable。
  4. 这里我们采用包含所有地震的features数组,并从中创建一个Observable。由于flatMap,这将成为quakes变量将包含的实际Observable。

5.订阅不会改变; 它像以前一样继续处理地震的数据流。

始终有一种方法

到目前为止,我们已经使用了rx.all.js中包含的RxJS运算符,但通常还是需要借鉴其他基于RxJS的库附带的运算符。在我们的例子中,我们将看看RxJS-DOM。RxJS-DOM是一个外部库,其中包含一个处理JSONP请求的运算符:jsonpRequest。这为我们节省了一些代码,因为我们不需要使用讨厌的全局函数:

examples_earthquake/code1_2.js

var quakes = Rx.DOM.jsonpRequest({
    url: QUAKE_URL,
    jsonpCallback: 'eqfeed_callback'
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.map(function(quake) {
    return {
        lat: quake.geometry.coordinates[1],
        lng: quake.geometry.coordinates[0],
        size: quake.properties.mag * 10000
    };
});

quakes.subscribe(function(quake) {
    L.circle([quake.lat, quake.lng], quake.size).addTo(map);
});

请记住,要运行此代码,您需要在HTML中包含RxJS-DOM中的文件rx.dom.js。请注意我们如何添加一个map运算符,将地震对象转换为仅包含我们可视化所需信息的简单对象:纬度,经度和地震震级。 我们在subscribeoperator中写的功能越少越好。

实时标记

我们地震应用的版本不会实时更新地震图。为了实现这一点,我们将使用我们在本章前面看到的interval运算符 - 以及有用的distinct运算符。下面的代码,然后我们将完成更改:

examples_earthquake/code1_3.js

var quakes = Rx.Observable
.interval(5000)
.flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    }).retry(3);
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.distinct(function(quake) { return quake.properties.code; });

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

在前面的代码中,我们使用interval来发出新请求并以5秒的固定间隔处理它们。 interval创建一个Observable,每隔五秒发出一个递增的数字。我们对这些数字没有做任何事情; 相反,我们使用flatMap来检索jsonpRequest的数据。另请注意我们如何在首先检索列表时出现问题时再次尝试重试。

我们应用的最后一个运算符是distinct,它只发出之前未发出的元素。 它需要一个函数来返回属性以检查是否相等。 这样我们就不会重绘已经绘制过的地震。

在不到20行中,我们编写了一个应用程序,定期轮询外部JSONP URL,从其内容中提取具体数据,然后过滤掉已导入的地震。在那之后,我们在地图上表示地震,其大小与其大小成比例-所有这些都以独立,清晰和简洁的方式编写,而不依赖于外部状态。这表明了Observables的表现力。

改进的想法

这里有一些想法可以使用你新获得的RxJS技能,并使这个小应用程序更有趣:

  • 当用户将鼠标悬停在地震上时,提供一个弹出窗口,显示有关该特定地震的更多信息。 一种方法是从只有你想要显示的属性的地震中创建一个新的Observable,并在悬停时动态过滤它。
  • 在页面顶部放置一个计数器,显示当前到目前为止的地震次数,并每天重置

Operator详解

本章向您介绍了一些新的运算符,所以这里是对它们的回顾,以及我们在应用程序中使用它们的方法。 请记住,您始终可以在RxJS GitHub站点上找到Operator的完整API文档。

  • Rx.Observable.from

默认行为:同步

由于您在应用程序中使用的许多数据源都来自数组或迭代器,因此有一个运算符可以从中创建Observable。 from是您最常使用的Operator之一。

使用from,我们可以从数组,类似数组的对象(例如,arguments对象或DOM NodeLists)创建Observable,甚至可以实现可迭代协议的类型,例如StringmapSet

  • Rx.Observable.range

默认行为:同步

range运算符生成有限的Observable,它发出特定范围内的整数。它功能多样,可用于许多场景。 例如,您可以使用范围在像扫雷一样的游戏板上生成初始方块。

  • Rx.Observable.interval

默认行为:异步

每次需要生成时间间隔的值时,您可能会以interval运算符作为生成器开始。由于interval每x毫秒发出一次顺序整数(其中x是我们传递的参数),我们只需要将值转换为我们想要的任何值。 我们在第3章“构建并发程序”中的游戏很大程度上基于该技术。

  • Rx.Observable.distinct

默认行为:与filter的Observable相同

distinct是这些非常简单的Operator之一,可以节省大量的开发工作。它会过滤掉已经发出的任何值。 这使我们避免编写容易出错的样板代码,我们将对比传入的结果决定返回值。就是返回不同值。

Rxjs 响应式编程-第二章:序列的深入研究

distinct允许我们使用指定比较方法的函数。另外,我们可以不传递任何参数,它将使用严格的比较来比较数字或字符串等基本类型,并在更复杂的对象的情况下运行深度比较。

总结

在本章中,我们介绍了如何使用大理石图表直观地表示和理解Observable流程。我们已经介绍了最常见的运算符来转换Observables,更重要的是,我们只使用Observable序列构建了一个真实的世界应用程序,避免设置任何外部状态,循环或条件分支。我们以声明的方式表达了我们的整个程序,而不必编码完成手头任务的每一步。

在下一章中,我们将继续探索Observable序列,这次我们将介绍更高级的运算符,它们允许您控制程序中的流和数据,用之前无法想象的代码!

关注我的微信公众号,更多优质文章定时推送
Rxjs 响应式编程-第二章:序列的深入研究

相关推荐