Ignite集成Spark之IgniteDataFrames
本系列共两篇文章,主要探讨如何将Ignite和Spark进行集成。
下面简要地回顾一下在第一篇文章中所谈到的内容。
Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。
Spark是一个流式数据和计算引擎,通常从HDFS或者其他存储中获取数据,一直以来,他都倾向于OLAP型业务,并且聚焦于MapReduce类型负载。
因此,这两种技术是可以互补的。
将Ignite与Spark整合
整合这两种技术会为Spark用户带来若干明显的好处:
- 通过避免大量的数据移动,获得真正可扩展的内存级性能;
- 提高RDD、DataFrame和SQL的性能;
- 在Spark作业之间更方便地共享状态和数据。
下图中显示了如何整合这两种技术,并且标注了显著的优势:
在第一篇文章中,主要聚焦于IgniteRDD,而本文会聚焦于IgniteDataFrames。
IgniteDataframes
Spark的DataFrame API为描述数据引入了模式的概念,Spark通过表格的形式进行模式的管理和数据的组织。
DataFrame是一个组织为命名列形式的分布式数据集,从概念上讲,DataFrame等同于关系数据库中的表,并允许Spark使用Catalyst查询优化器来生成高效的查询执行计划。而RDD只是跨集群节点分区化的元素集合。
Ignite扩展了DataFrames,简化了开发,改进了将Ignite作为Spark的内存存储时的数据访问时间,好处包括:
- 通过Ignite读写DataFrames时,可以在Spark作业之间共享数据和状态;
- 通过优化Spark的查询执行计划加快SparkSQL查询,这些主要是通过IgniteSQL引擎的高级索引以及避免了Ignite和Spark之间的网络数据移动实现的。
IgniteDataframes示例
下面通过一些代码以及搭建几个小程序的方式,了解Ignite DataFrames如何使用,如果想实际运行这些代码,可以从GitHub上下载。
一共会写两个Java的小应用,然后在IDE中运行,还会在这些Java应用中执行一些SQL。
一个Java应用会从JSON文件中读取一些数据,然后创建一个存储于Ignite的DataFrame,这个JSON文件Ignite的发行版中已经提供,另一个Java应用会从Ignite的DataFrame中读取数据然后使用SQL进行查询。
下面是写应用的代码:
public class DFWriter { private static final String CONFIG = "config/example-ignite.xml"; public static void main(String args[]) { Ignite ignite = Ignition.start(CONFIG); SparkSession spark = SparkSession .builder() .appName("DFWriter") .master("local") .config("spark.executor.instances", "2") .getOrCreate(); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); Dataset<Row> peopleDF = spark.read().json( resolveIgnitePath("resources/people.json").getAbsolutePath()); System.out.println("JSON file contents:"); peopleDF.show(); System.out.println("Writing DataFrame to Ignite."); peopleDF.write() .format(IgniteDataFrameSettings.FORMAT_IGNITE()) .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) .option(IgniteDataFrameSettings.OPTION_TABLE(), "people") .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id") .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated") .save(); System.out.println("Done!"); Ignition.stop(false); } }
在DFWriter中,首先创建了SparkSession,它包含了应用名,之后会使用spark.read().json()读取JSON文件并且输出文件内容,下一步是将数据写入Ignite存储。下面是DFReader的代码:
public class DFReader { private static final String CONFIG = "config/example-ignite.xml"; public static void main(String args[]) { Ignite ignite = Ignition.start(CONFIG); SparkSession spark = SparkSession .builder() .appName("DFReader") .master("local") .config("spark.executor.instances", "2") .getOrCreate(); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); System.out.println("Reading data from Ignite table."); Dataset<Row> peopleDF = spark.read() .format(IgniteDataFrameSettings.FORMAT_IGNITE()) .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) .option(IgniteDataFrameSettings.OPTION_TABLE(), "people") .load(); peopleDF.createOrReplaceTempView("people"); Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6"); sqlDF.show(); System.out.println("Done!"); Ignition.stop(false); } }
在DFReader中,初始化和配置与DFWriter相同,这个应用会执行一些过滤,需求是查找所有的id > 0 以及 < 6的人,然后输出结果。
在IDE中,通过下面的代码可以启动一个Ignite节点:
public class ExampleNodeStartup { public static void main(String[] args) throws IgniteException { Ignition.start("config/example-ignite.xml"); } }
到此,就可以对代码进行测试了。
运行应用
首先在IDE中启动一个Ignite节点,然后运行DFWriter应用,输出如下:
JSON file contents: +-------------------+---+------------------+ | department| id| name| +-------------------+---+------------------+ |Executive Committee| 1| Ivan Ivanov| |Executive Committee| 2| Petr Petrov| | Production| 3| John Doe| | Production| 4| Ann Smith| | Accounting| 5| Sergey Smirnov| | Accounting| 6|Alexandra Sergeeva| | IT| 7| Adam West| | Head Office| 8| Beverley Chase| | Head Office| 9| Igor Rozhkov| | IT| 10|Anastasia Borisova| +-------------------+---+------------------+ Writing DataFrame to Ignite. Done!
如果将上面的结果与JSON文件的内容进行对比,会显示两者是一致的,这也是期望的结果。
下一步会运行DFReader,输出如下:
Reading data from Ignite table. +-------------------+--------------+---+ | DEPARTMENT| NAME| ID| +-------------------+--------------+---+ |Executive Committee| Ivan Ivanov| 1| |Executive Committee| Petr Petrov| 2| | Production| John Doe| 3| | Production| Ann Smith| 4| | Accounting|Sergey Smirnov| 5| +-------------------+--------------+---+ Done!
这也是期望的输出。
总结
通过本文,会发现使用Ignite DataFrames是如何简单,这样就可以通过Ignite DataFrame进行数据的读写了。
未来,这些代码示例也会作为Ignite发行版的一部分进行发布。
关于Ignite和Spark的集成,内容就是这些了。