This is a short intro to start using Apache SPARK with Cassandra, running SQL on the Cassandra tables.
Note that I am not running a SPARK cluster, I am running “local”, to me this is really convenient, not having to run a SPARK server and workers for something so small. So for playing around with SPARK and Cassandra this is really good.
I am using Scala and SBT.
Something I was struggling hard with, to get the dependency versions right. It is crucial that you do not do like I did first, use version 1.5.2 of Spark and 1.5.0 for SparkCassandraConnector, this will NOT work. I constantly got exception with java.lang.NoSuchMethodException, so incredibly frustrating to try out version after version.
|
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(Lorg/apache/spark/sql/sources/BaseRelation;)V |
build.sbt
|
val sparkV = "1.5.0" val sparkCassandraConnectorV = "1.5.0-RC1" val cassandraDriverV = "3.0.0-rc1" libraryDependencies += "joda-time" % "joda-time" % "2.9.1" libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.10" libraryDependencies += "ch.qos.logback" % "logback-core" % "1.1.3" libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" libraryDependencies += "jline" % "jline" % "2.12.1" libraryDependencies += "org.apache.spark" % "spark-core_2.11" % sparkV exclude("org.scala-lang","scala-compiler") exclude("jline","jline") libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % sparkV libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % sparkCassandraConnectorV exclude("org.apache.spark","spark-sql_2.11") libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % cassandraDriverV mainClass in (Compile, run) := Some("com.tsoft.dreamtel.spark.poc.SparkTest") |
A small Scala program to show how it works
SparkTest.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
package com.tsoft.dreamtel.spark.poc import org.apache.spark.sql.cassandra.CassandraSQLContext import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, SQLContext} import com.datastax.spark.connector._ import org.apache.spark.{SparkConf, SparkContext} object SparkTest { def main(args: Array[String]): Unit = { testCassandraSQL() } def testCassandraSQL(): Unit = { val conf = new SparkConf(true).set("spark.cassandra.connection.host","127.0.0.1") val sc = new SparkContext("local", "Tobias-Local-SPARK-Context", conf) val cc = new CassandraSQLContext(sc) val sqlrdd: DataFrame = cc.sql("SELECT firstname from playground.individual WHERE firstname like 'L%'") // WHERE firstname like 'S%'") sqlrdd.foreach( e => println("ROW: "+e.getString(0) ) ) } } } |
The output…
|
ROW: Leona ROW: Lillian ROW: Liana ROW: Lilian ROW: Lovisa ROW: Layla ROW: Lennon ROW: Leonidas ROW: Lova ROW: Léon ROW: Léon ROW: Laila ROW: Leonardo |