Consume Kafka data with Spark Streaming and save it to Redis and HBase to achieve real-time processing.
import org.apache.hadoop.hbase.client.{Admin, Connection}
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

object StreamingKafka extends Logging {
  def main(args: Array[String]): Unit = {
    val brokers = ConfigUtil.getConfig(Constants.KAFKA_BOOTSTRAP_SERVERS)
    val topics = Array(ConfigUtil.getConfig(Constants.CHENG_DU_GPS_TOPIC), ConfigUtil.getConfig(Constants.HAI_KOU_GPS_TOPIC))
    val conf = new SparkConf().setMaster("local[1]").setAppName("sparkKafka")
    val group: String = "gps_consum_group"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group,
      "auto.offset.reset" -> "latest", // earliest, latest, or none
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val context: SparkContext = sparkSession.sparkContext
    context.setLogLevel("WARN")
    // val streamingContext = new StreamingContext(conf, Seconds(5))
    // create the StreamingContext
    val streamingContext: StreamingContext = new StreamingContext(context, Seconds(1))
    val result: InputDStream[ConsumerRecord[String, String]] =
      HbaseTools.getStreamingContextFromHBase(streamingContext, kafkaParams, topics, group, "(.*)gps_topic")
    result.foreachRDD(eachRdd => {
      if (!eachRdd.isEmpty()) {
        eachRdd.foreachPartition(eachPartition => {
          val connection: Connection = HBaseUtil.getConnection
          val jedis: Jedis = JedisUtil.getJedis
          // check whether the HBase tables exist; create them if they do not
          val admin: Admin = connection.getAdmin
          if (!admin.tableExists(TableName.valueOf(Constants.HTAB_GPS))) {
            val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_GPS))
            htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
            admin.createTable(htabgps)
          }
          if (!admin.tableExists(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))) {
            val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))
            htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
            admin.createTable(htabgps)
          }
          eachPartition.foreach(record => {
            // save the record to HBase and Redis
            val consumerRecords: ConsumerRecord[String, String] = HbaseTools.saveToHBaseAndRedis(connection, jedis, record)
          })
          JedisUtil.returnJedis(jedis)
          connection.close()
        })
        // update the offsets
        val offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // result.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // commit the offsets to Kafka's own offset storage instead
        for (eachrange <- offsetRanges) {
          val startOffset: Long = eachrange.fromOffset // start offset
          val endOffset: Long = eachrange.untilOffset  // end offset
          val topic: String = eachrange.topic
          val partition: Int = eachrange.partition
          HbaseTools.saveBatchOffset(group, topic, partition + "", endOffset)
        }
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
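
The listing above depends on project-specific helpers (ConfigUtil, Constants, HBaseUtil, JedisUtil, HbaseTools) whose source is not shown here. As a rough, hypothetical sketch of what the offset-management side of HbaseTools could look like, the code below resumes the direct stream from offsets previously stored in HBase and writes back each partition's end offset after a batch has been processed; the offset table name, row-key layout, and method signatures are assumptions, not the original implementation.

// Hypothetical sketch only -- NOT the original HbaseTools.
// Assumed: an HBase table "kafka_offset_store" with column family "f" and
// row key "<group>:<topic>:<partition>", plus an HBaseUtil-style Connection passed in by the caller.
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object OffsetStoreSketch {
  private val OFFSET_TABLE = "kafka_offset_store"   // assumed table name
  private val FAMILY       = Bytes.toBytes("f")     // assumed column family
  private val QUALIFIER    = Bytes.toBytes("offset")

  // Read the last saved offsets for a consumer group from HBase; an empty map means "start fresh".
  // partitions is the (assumed) number of partitions per topic.
  def readOffsets(conn: Connection, group: String, topics: Seq[String], partitions: Int): Map[TopicPartition, Long] = {
    val table = conn.getTable(TableName.valueOf(OFFSET_TABLE))
    try {
      (for {
        topic     <- topics
        partition <- 0 until partitions
        result = table.get(new Get(Bytes.toBytes(s"$group:$topic:$partition")))
        if !result.isEmpty
      } yield new TopicPartition(topic, partition) -> Bytes.toString(result.getValue(FAMILY, QUALIFIER)).toLong).toMap
    } finally table.close()
  }

  // Create the direct stream, resuming from stored offsets when any exist.
  def createStream(ssc: StreamingContext, kafkaParams: Map[String, Object], topics: Seq[String],
                   fromOffsets: Map[TopicPartition, Long]): InputDStream[ConsumerRecord[String, String]] = {
    val strategy =
      if (fromOffsets.isEmpty) ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      else ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, strategy)
  }

  // Persist the end offset of one partition after the batch has been written to HBase/Redis.
  def saveOffset(conn: Connection, group: String, topic: String, partition: String, offset: Long): Unit = {
    val table = conn.getTable(TableName.valueOf(OFFSET_TABLE))
    try {
      val put = new Put(Bytes.toBytes(s"$group:$topic:$partition"))
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(offset.toString))
      table.put(put)
    } finally table.close()
  }
}

Under these assumptions, getStreamingContextFromHBase would roughly correspond to readOffsets plus createStream, and saveBatchOffset to saveOffset. Because enable.auto.commit is false and offsets are saved only after the batch data has been written to HBase and Redis, the job provides at-least-once delivery: a failure between the write and the offset save can replay records, but never skips them.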