The code is as follows:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class QueryHDFSData {

    static SparkConf sparkConf = new SparkConf().setAppName("HDFSQuery").setMaster("local[2]");
    static JavaSparkContext sc = new JavaSparkContext(sparkConf);
    static SQLContext sqlContext = new SQLContext(sc);

    public static void main(String[] args) {
        // JavaRDD<String> poi = sc.textFile("hdfs://node2:9000/user/flume/events/2015-11-27-21/events-.1448629506841");

        // Load the JSON file from HDFS into a DataFrame
        DataFrame df = sqlContext.read().json("hdfs://node2:9000/user/flume/events/2015-11-26-21/events-.1448543965316");
        // Print the inferred schema
        df.printSchema();
        // Register the DataFrame as a temporary table
        df.registerTempTable("poi");
        // Read data from the table with a SQL statement
        DataFrame poi = sqlContext.sql("SELECT * FROM poi WHERE cid=57425749418");
        // Print each matching row to the console
        JavaRDD<Row> row = poi.javaRDD();
        row.foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row r) throws Exception {
                System.out.println(r.mkString());
            }
        });
    }
}
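For comparison, the same filter can also be expressed with the DataFrame DSL instead of registering a temporary table and issuing SQL. This is a minimal sketch, assuming the same df as above and that the JSON records contain a numeric cid field:

        // Equivalent query using the DataFrame DSL (Spark 1.x API)
        DataFrame poiDsl = df.filter(df.col("cid").equalTo(57425749418L));
        // show() prints the first rows to the console in tabular form
        poiDsl.show();

Both forms should produce an equivalent query; registering a temp table is mainly convenient when the condition is easier to write in SQL or the table is reused across several queries.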