基于nodejs将mongodb的数据实时同步到elasticsearch
一、前言
因公司需要选用elasticsearch做全文检索,持久化存储选用的是mongodb,但是希望mongodb里面的数据发生改变可以实时同步到elasticsearch上,一开始主要使用elasticsearch v1.7.2的版本,mongo-river可以搞定这个问题。随着elasticsearch的升级,发现elasticsearch已经放弃了mongo-river,咋整......Google之后发现一神器mongo-connector,国外大神用python写的工具而且MongoDB官网也极力推荐使用。But,我们需要把文档中的附件信息也同步到elasticsearch上,mongo-connector对附件同步支持的不是很好。能不能有个nodejs版本的数据同步?GitHub上发现一个大神写的node-elasticsearch-sync很好用,但是功能太简单了,不支持复杂的数据筛选,也不支持附件同步。活人不能被尿憋死,参考node-elasticsearch-sync自己写了一个同步工具node-mongodb-es-connector。
二、准备工作
2.1 安装mongodb
安装mongodb可以去官网下载:
PS:关于如何搭建mongodb replica集群:
https://www.cnblogs.com/ljhdo...
2.2 安装elasticsearch
安装elasticsearch可以去官网下载:
https://www.elastic.co/cn/dow...
PS:关于elasticsearch-head、kibana、logstash等相关安装请自己google吧
2.3 安装nodejs
安装nodejs可以去官网下载:
PS:别忘记安装npm,如何安装请自己google吧
以上是使用node-mongodb-es-connector的前提
2.4 node-mongodb-es-connector下载地址
github: https://github.com/zhr8521007...
npm: https://www.npmjs.com/package...
三、文件结构
├── crawlerDataConfig 项目构建配置(这里添加你要同步数据的配置) │ ├── mycarts.json 一个index一个配置文件(唯一需要自己增加或者修改的配置文件,这个文件只是提供了一个例子,不用可以删除) │ └── …… ├── lib │ ├── pool │ │ ├── elasticsearchPool.js elasticsearch连接池 │ │ ├── mongoDBPool.js mongodb连接池 │ ├── promise │ │ ├── elasticsearchPromise.js elasticsearch方法类(增删改查) │ │ ├── mongoPromise.js mongodb方法类(增删改查) │ ├── util │ │ ├── fsWatcher.js 配置文件监控类(主要监控crawlerDataConfig目录里面的配置文件) │ │ ├── logger.js 日志类 │ │ ├── oplogFactory.js mongo-oplog触发事件之后的执行方法(增删改) │ │ ├── tail.js 监听mongodb数据是否发生变化 │ │ ├── util.js 工具类 │ ├── main.js 主方法(主要是第一次启动立刻同步mongodb里面的数据到elasticsearch) ├── logs │ ├── logger-2018-03-23.log 同步数据打印日志 │ └── …… ├── test │ ├── img │ │ ├── elasticsearch.jpg 图片不解释 │ │ ├── mongoDB.jpg 图片不解释 │ │ └── structure.jpg 图片不解释 │ └── test.js 测试类(啥也没写) ├── app.js 启动文件 ├── index.js 接口文件(只提供配置文件的增删改) ├── package-lock.json ├── package.json ├── ReadMe.md 英文文档(markdown) ├── README.zh-CN.md 中文文档(markdown) └── LICENSE
mycarts.json文件(这个文件只是提供了一个例子)
{ "mongodb": { "m_database": "myTest", "m_collectionname": "carts", "m_filterfilds": { "version" : "2.0" }, "m_returnfilds": { "cName": 1, "cPrice": 1, "cImgSrc": 1 }, "m_connection": { "m_servers": [ "localhost:29031", "localhost:29032", "localhost:29033" ], "m_authentication": { "username": "UserAdmin", "password": "pass1234", "authsource":"admin", "replicaset":"my_replica", "ssl":false } }, "m_documentsinbatch": 5000, "m_delaytime": 1000 }, "elasticsearch": { "e_index": "mycarts", "e_type": "carts", "e_connection": { "e_server": "http://localhost:9200", "e_httpauth": { "username": "EsAdmin", "password": "pass1234" } }, "e_pipeline": "mypipeline", "e_iscontainattachment": true } }
- m_database - MongoDB里需要监听的数据库.
- m_collectionname - MongoDB里需要监听的collection.
- m_filterfilds - MongoDB里的查询条件,目前支持一些简单的查询条件.(默认值为null)
- m_returnfilds - MongoDB需要返回的字段.(默认值为null)
m_connection
- m_servers - MongoDB服务器的地址.(replica结构,数组格式)
m_authentication - 如果需要MongoDB的登录验证使用下面配置(默认值为null).
- username - MongoDB连接的用户名. - password - MongoDB连接的密码. - authsource - MongoDB用户认证,默认为admin. - replicaset - MongoDB的replica结构的名字. - ssl- MongoDB的ssl.(默认值为false).
- m_documentsinbatch - 一次性从mongodb往Elasticsearch里传入数据的条数.
(你可以设置比较大的值,默认为1000.).
m_delaytime - 每次进elasticsearch数据的间隔时间(默认值为1000ms).
- e_index - ElasticSearch里的index.
- e_type - ElasticSearch里的type,这里的type主要为了使用bulk.
e_connection
- e_server - ElasticSearch的连接字符串.
e_httpauth - 如果ElasticSearch需要登录验证使用下面配置(默认值为null).
- username - ElasticSearch连接的用户名.
- password - ElasticSearch连接的密码.
- e_pipeline - ElasticSearch 中pipeline的名称.(没有pipeline就填null)
- e_iscontainattachment - pipeline是否包含附件规则(默认值为false).
四、如何使用
用户可以事先在/crawlerDataConfig目录下编辑好自己的配置文件,文件必须以json格式存放.
在文件根目录下,打开cmd命令窗口,输入以下信息:
node app.js
项目启动后,修改配置文件 (如:mycarts.json),数据也会实时同步。或者修改mongodb中的某一条数据,也会实时同步到elasticsearch中。
PS:如何把mongodb的主文档和附件信息都同步到elasticsearch中?
利用elasticsearch的pipeline来实现。
首先需要创建一个pipeline到elasticsearch中:
PUT _ingest/pipeline/mypipeline { "description" : "Extract attachment information from arrays", "processors" : [ { "foreach": { "field": "attachments", "processor": { "attachment": { "target_field": "_ingest._value.attachment", "field": "_ingest._value.data" } } } } ] }
然后在配置文件中(如:mycarts.json)修改节点数据即可
"e_pipeline": "mypipeline"
五、结果展示
mongodb里面的数据
elasticsearch里面的数据
相关推荐
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。