import java.net.InetAddress._
import com.datastax.spark.connector._
import org.apache.spark.sql.{SparkSession, SaveMode}
import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf}
import org.apache.spark.sql.functions._
object cassSpark {
  def main(args: Array[String]): Unit = {
    val cassIps = Set("192.168.0.121", "192.168.0.122", "192.168.0.123")
    // Build the SparkSession with the Cassandra contact points and a local warehouse directory.
    val spark = SparkSession.builder()
      .appName("test")
      .config("spark.cassandra.connection.host", cassIps.mkString(","))
      .config("spark.sql.warehouse.dir", "file:///home/tester")
      .getOrCreate()
    import spark.implicits._
    // Resolve the contact points and build a connector configuration.
    val cassConf = new CassandraConnectorConf(cassIps.map(getByName(_)))
    // Create the keyspace and table directly through the driver session, then insert a few sample rows.
    new CassandraConnector(cassConf).withSessionDo { session =>
      session.execute(
        "CREATE KEYSPACE IF NOT EXISTS test " +
          "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
      session.execute("DROP TABLE IF EXISTS test.testtbl")
      session.execute("CREATE TABLE test.testtbl (var0 text, var1 text, var2 double, PRIMARY KEY(var0, var1))")
      session.execute("INSERT INTO test.testtbl(var0, var1, var2) VALUES ('T', 'A', 23.1)")
      session.execute("INSERT INTO test.testtbl(var0, var1, var2) VALUES ('T', 'B', 17.5)")
      session.execute("INSERT INTO test.testtbl(var0, var1, var2) VALUES ('U', 'B', 11.9)")
      session.execute("INSERT INTO test.testtbl(var0, var1, var2) VALUES ('U', 'A', 25.3)")
    }
    // Append two more rows from an RDD with the connector's saveToCassandra.
    val collection = spark.sparkContext.parallelize(Seq(("T", "C", 17.0), ("U", "C", 5.0)))
    collection.saveToCassandra("test", "testtbl", SomeColumns("var0", "var1", "var2"))
    // Read the table back as a DataFrame through the Cassandra data source.
    val cass_tbl = spark.read.format("org.apache.spark.sql.cassandra")
      .option("keyspace", "test")
      .option("table", "testtbl")
      .load()
    // Write the DataFrame out as CSV; Spark 2.x ships a built-in csv source,
    // so the external com.databricks.spark.csv package is no longer needed.
    cass_tbl.write.format("csv").option("header", "true").save("file:///home/tester/test.csv")
    // Pivot the (var1, var2) pairs into one map per var0: wrap each pair in a
    // single-entry map, collect the maps per group, and merge them with a UDF.
    val concatMap = udf((maps: Seq[Map[String, Double]]) => maps.flatten.toMap)
    val cass_tbl_agg = cass_tbl
      .withColumn("var2_map", map($"var1", $"var2"))
      .groupBy($"var0")
      .agg(concatMap(collect_list($"var2_map")).alias("var2"))
    // Create the target table from the DataFrame schema; ignore it if it already exists.
    try {
      cass_tbl_agg.createCassandraTable("test", "testtbl_trans")
    } catch {
      case _: com.datastax.driver.core.exceptions.AlreadyExistsException => ()
      case e: Exception => throw e
    }
    // Append the aggregated rows to the new table.
    cass_tbl_agg.write.format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "testtbl_trans", "keyspace" -> "test"))
      .mode(SaveMode.Append)
      .save()
  }
}
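// Follow-up sketch, not part of the original listing: read test.testtbl_trans
// back through the same Cassandra data source to confirm the append worked.
// Assumes a live `spark` session with the connector on the classpath, e.g. in
// spark-shell started with the spark-cassandra-connector package.
val check = spark.read.format("org.apache.spark.sql.cassandra")
  .option("keyspace", "test")
  .option("table", "testtbl_trans")
  .load()
check.show(truncate = false)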