The Spark DataFrame API provides table-like access to data from a variety of sources. Its
purpose is similar to Python's pandas library and R's data frames: collect and
organize data into a tabular format with named columns. DataFrames can be constructed from a
wide array of sources, including structured data files, Hive tables, and existing Spark
RDDs.
As user spark, upload the people.txt and people.json files to HDFS:
cd /usr/hdp/current/spark-client
su spark
hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt
hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json
Launch the Spark shell:
cd /usr/hdp/current/spark-client
su spark
./bin/spark-shell --num-executors 1 --executor-memory 512m --master yarn-client
At the Spark shell, type the following:
scala> val df = sqlContext.read.format("json").load("people.json")
Using df.show, display the contents of the DataFrame:
scala> df.show
15/11/10 11:24:10 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
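Before querying a DataFrame, it can help to check which columns and types Spark inferred from the JSON file. The following is a minimal sketch, assuming the same spark-shell session (so sqlContext is already defined) and that people.json has been uploaded as described above. The name df2 is introduced here only for illustration.

```scala
// Assumes a running spark-shell session where sqlContext is predefined
// and people.json is in the spark user's HDFS home directory.

// read.json(path) is a shorthand for read.format("json").load(path)
val df2 = sqlContext.read.json("people.json")

// Print the inferred schema as a tree of column names and types
df2.printSchema()

// List the column names as a comma-separated string
println(df2.columns.mkString(", "))
```

Inspecting the schema first makes it easier to spot columns that were inferred with an unexpected type.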
Here are additional examples of Scala-based DataFrame access, using the DataFrame
df defined in the previous subsection:
// Import the DataFrame functions API
scala> import org.apache.spark.sql.functions._
// Select all rows, but increment age by 1
scala> df.select(df("name"), df("age") + 1).show()
// Select people older than 21
scala> df.filter(df("age") > 21).show()
// Count people by age
scala> df.groupBy("age").count().show()
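The examples above import org.apache.spark.sql.functions._ but do not call any of its functions. As a hedged illustration of what that import provides, the sketch below uses avg from that package and sorts by a column. It assumes the df DataFrame from the same spark-shell session.

```scala
// Assumes the spark-shell session above, where df was loaded from people.json.
import org.apache.spark.sql.functions._

// Select only the name column
df.select("name").show()

// Compute the average age; avg is provided by org.apache.spark.sql.functions
df.agg(avg("age")).show()

// Sort rows by age in descending order
df.orderBy(df("age").desc).show()
```

Aggregate functions such as avg skip null values, so a row with a missing age does not contribute to the average.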
The following example uses the DataFrame API to specify a schema for
people.txt, and retrieve names from a temporary table associated with the
schema:
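// Import Row and other Spark SQL classes
import org.apache.spark.sql._
// Create an SQLContext from the existing SparkContext (sc)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Load people.txt from HDFS as an RDD of lines
val people = sc.textFile("people.txt")
// Encode the schema as a space-separated string of field names
val schemaString = "name age"
import org.apache.spark.sql.types.{StructType, StructField, StringType}
// Build the schema: one nullable string field per name
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert each comma-separated line into a Row
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD of Rows
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrame as a temporary table
peopleDataFrame.registerTempTable("people")
// Query the table with SQL
val results = sqlContext.sql("SELECT name FROM people")
// Print each name, accessing the first column by index
results.map(t => "Name: " + t(0)).collect().foreach(println)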
This will produce output similar to the following:
15/11/10 14:36:49 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool
15/11/10 14:36:49 INFO scheduler.DAGScheduler: ResultStage 13 (collect at :33) finished in 0.129 s
15/11/10 14:36:49 INFO scheduler.DAGScheduler: Job 10 finished: collect at :33, took 0.162827 s
Name: Michael
Name: Andy
Name: Justin
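The schema in the example above declares both fields as strings, so numeric comparisons on age rely on implicit casts. As a possible variation, the sketch below declares age as an integer. It assumes the same spark-shell session (sc and sqlContext available) and that every line of people.txt has the form "name, age" with a whole-number age. The names typedSchema, typedRows, typedPeople, and people_typed are hypothetical and used only for illustration.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Assumes sc and sqlContext from the spark-shell session above.
// Declare name as a string and age as an integer, both nullable
val typedSchema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

// Parse each line; toInt throws if the age field is not a whole number (assumed input format)
val typedRows = sc.textFile("people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))

// Apply the typed schema and register a separate temporary table
val typedPeople = sqlContext.createDataFrame(typedRows, typedSchema)
typedPeople.registerTempTable("people_typed")

// With an integer column, range predicates compare numbers rather than strings
sqlContext.sql("SELECT name FROM people_typed WHERE age >= 13 AND age <= 19").show()
```

Using a separate table name keeps the string-typed people table from the earlier example available for comparison.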

