一、单表关联
给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——祖父母)表
二、maven设置
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.mk</groupId><artifactId>spark-test</artifactId><version>1.0</version><name>spark-test</name><url>http://spark.mk.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.11.1</scala.version><spark.version>2.4.4</spark.version><hadoop.version>2.6.0</hadoop.version></properties><dependencies><!-- scala依赖--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies><build><pluginManagement><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin></plugins></pluginManagement></build>
</project>
三、编程代码
public class SingleTableJoinApp implements SparkConfInfo {public static void main(String[] args) {String filePath = "E:\\spark\\childParent.txt";SparkSession sparkSession = new SingleTableJoinApp().getSparkConf("childParent");JavaPairRDD<String, String> childParent = sparkSession.sparkContext().textFile(filePath, 4).toJavaRDD().flatMap(v -> Arrays.asList(v.split("\n")).iterator()).mapToPair(v -> {if(v.matches("\\s+child\\s+parent\\s+")){return null;}String[] data = v.split("\\s+");if (data.length != 2) {return null;}return new Tuple2<>(data[0],data[1]);}).filter(v -> v != null).cache();JavaPairRDD<String, String> parentChild = childParent.mapToPair(v->new Tuple2(v._2, v._1));JavaPairRDD<String, Tuple2<String, String> > joinRdd = parentChild.join(childParent);List<Tuple2<String, String>> childGrand = joinRdd.mapToPair(v->new Tuple2<>(v._2._1, v._2._2)).sortByKey(true).collect();System.out.println("child\t\tgrand");childGrand.forEach(v -> System.out.println(v._1 + "\t\t" + v._2));sparkSession.stop();}
}public interface SparkConfInfo {default SparkSession getSparkConf(String appName){SparkConf sparkConf = new SparkConf();if(System.getProperty("os.name").toLowerCase().contains("win")) {sparkConf.setMaster("local[4]");System.out.println("使用本地模拟是spark");}else{sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");sparkConf.set("spark.driver.host","192.168.150.1");//本地ip,必须与spark集群能够相互访问,如:同一个局域网sparkConf.setJars(new String[] {".\\out\\artifacts\\spark_test\\spark-test.jar"});//项目构建生成的路径}SparkSession session = SparkSession.builder().appName(appName).config(sparkConf).config(sparkConf).getOrCreate();return session;}
}
childParent.txt文件内容
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
输出
child grand
Jone Mary
Jone Ben
Jone Alice
Jone Jesse
Mark Alice
Mark Jesse
Philip Alice
Philip Jesse
Tom Mary
Tom Ben
Tom Alice
Tom Jesse
四、join方法
<W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)
关联表返回相同可以的键值对