Spark Connector for OneProxy

Spark Connector for OneProxy使用标准的JDBC接口访问OnePorxy集群,就将Spark计算集群和分库分表后的分布式数据库集群无缝连接起来, 专用的OneProxy Connector使Spark计算集群可感知数据库集群上的数据分布逻辑,透明地实现按分片的并行处理, 并在计算结点与存贮结点(数据库)之间实现点对点的通信,充分发挥整个数据库集群的吞吐能力(可访问备库)和整个Spark集群的计算能力, 真正实现同一份数据之上的实时数据分析和机器学习。 使用关系数据库作为存贮结点,具有很强的结点计算能力,并可创建多唯度的索引,以高效地进行数据过滤,天然解决了第二索引的问题; 在Spark中可以将更多的操作下放到数据库执行,成熟的数据库技术保证了数据存贮层的高性能,可以提供比Hadoop/HBase更高的性能; 关系数据库多用C&C++语言编译,有效管理内存并有高效的数据缓冲机制,历经数十年时间发展而打造的稳定性,是Hadoop/HBase结点目前所不能达到的。 Spark Connector支持OneProxy for MySQL和OneProxy for PostgreSQL,无需迁移数据就可以分析现有MySQL和PostgreSQL集群里的数据,无缝支持Spark 2.0.x和1.6.x版本。

构架图如下:

/

Spark下发的SQL通过OneProxy后,一样可以走读写分离,或仅从备库查询。OneProxy具有比Spark更好的数据分片机制,SQL下放到OneProxy后会做分区过滤(Partition Prune)操作,以减少数据扫描的范围。通过平民软件研发的OneProxy Connector for Spark就可以在MySQL集群上直接进行大数据分析了,只需要将jar包(下载页面)放到Spark(目前仅支持2.0.0)的jars目录下(同时将MySQL和PostgreSQL的JDBC驱动包也放到此目录)就完成安装了。

下面是一个Spark SQL的例子,其中“my_date”是一张使用OneProxy分片(Sharding)的表,如下所示:

spark-sql> CREATE TEMPORARY VIEW  my_date
         > USING com.onexsoft.oneproxy
         > OPTIONS (
         >   url    'jdbc:mysql://192.168.20.20:3307/test',
         >   user   'test',
         >   password   '123',
         >   dbtable 'my_date');
Time taken: 2.729 seconds
spark-sql> select * from my_date;
2016-08-02 00:00:00    	100
2016-09-01 01:00:00    	100
2016-10-01 01:00:00    	100
Time taken: 0.499 seconds, Fetched 3 row(s)

接下来是使用Scala在Spark Shell中的例子,如下所示:

scala> val oneproxy = com.onexsoft.oneproxy.SparkConn.mysql(
     |    "192.168.20.20:3307","test","123")
oneproxy: .....OneProxyContext = ....OneProxyContext@c212536

scala> val mydf = oneproxy.openTable(spark, "my_date")
mydf: org.apache.spark.sql.DataFrame = [id: timestamp, col2: int]

scala> mydf.show()
+--------------------+----+
|                  id|col2|
+--------------------+----+
|2016-08-02 00:00:...| 100|
|2016-09-01 01:00:...| 100|
|2016-10-01 01:00:...| 100|
+--------------------+----+

针对大表,分片的信息全部交给OneProxy来管理,OneProxy Connector会从OneProxy中获取分片信息,并在Spark中使用按OneProxy分片一致的并行策略。在前面的例子中虽然在Spark上没有定义分片,但实际上是并行执行的,并行度取决于OneProxy里定义的分片的个数。不再需要在Spark中定义JDBCRDD的“numPartitions / partitionColumn / lowerBound / upperBound”属性,也不要求SQL中要有两个绑定变量。“SparkConn”类定义了方便的连接OneProxy for MySQL或OneProxy for PostgreSQL的方法,返回的“OneProxyContext”对象主要实现了以下方法,以便你轻松地获取数据。详细用法如下所示:

  • 构造参数需要传三个参数,分别是连接OneProxy的JDBC url串、用户名、口令。
  • desc(SparkSession, sql:String)方法,根据SQL语句获取StructType类型(即字段结构信息)。
  • query(SparkSession, sql:String)方法,根据SQL语句获取JdbcRDD结果,请注意这个返回的是RDD格式。
  • openTable(SparkSession, tablename:String)方法,根据表名获取DataFrame结果 openQuery(SparkSession, sql:String, StructType)方法,根据自定议的SQL语句获取DataFrame结果,实际上是先query,再调用SparkSession.createDataFrame方法将RDD转换成DataFrame返回。
  • OneProxy的多种数据分片算法(Range/List/Hash/Composite)和关系数据库中的多索引支持,可以避免Hadoop/Hive单个RowKey带来的一些数据访问缺限。提供的接口很简单,目前并不地持在Spark中写入数据到MySQL集群,写入操作经由OneProxy直接操作就可以了。
  • OneProxy可以布署多台,再使用JDBC端的Load Balance功能以防单点连接,或每个Spark节点布署一个OneProxy实例,每个Spark只连接本地的OneProxy节点,以避免网络单点和热点。