Tranquility

本页目录
  • 与Kafka集群交互
  • Druid使用Tranquility Kafka

本文以Kafka为例,介绍在E-MapReduce中如何使用Tranquility从Kafka集群采集数据,并实时推送至Druid集群。

Tranquility是一个以push方式向Druid实时发送数据的应用。它替用户解决了分区、多副本、服务发现、防止数据丢失等多个问题,简化了用户使用Druid的难度。它支持多种数据来源,包括Samza、Spark、Storm、Kafka、Flink等等。

与Kafka集群交互

首先是Druid集群与Kafka集群的交互。两个集群交互的配置方式大体和Hadoop集群类似,均需要设置连通性、hosts等。对于非安全Kafka集群,请按照以下步骤操作:
  1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
  2. 将 Kafka 集群的 hosts 写入到 Druid 集群每一个节点的 hosts 列表中,注意 Kafka 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx。
对于安全的 Kafka 集群,您需要执行下列操作(前两步与非安全 Kafka 集群相同):
  1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
  2. 将 Kafka 集群的 hosts 写入到 Druid 集群每一个节点的 hosts 列表中,注意 Kafka 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx。
  3. 设置两个集群间的 Kerberos 跨域互信(详情参考跨域互信),且最好做双向互信。
  4. 准备一个客户端安全配置文件: 
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=truekeyTab="/etc/ecm/druid-conf/druid.keytab"principal="druid@EMR.1234.COM";};

            之后将该配置文件同步到 Druid 集群的所有节点上,放置于某一个目录下面(例如/tmp/kafka/kafka_client_jaas.conf)。

  5、在 Druid 配置页面的 overlord.jvm 里新增如下选项:

Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf

  6、在 Druid 配置页面的 middleManager.runtime 里配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他JVM启动参数。

7、重启Druid服务。

Druid使用Tranquility Kafka

由于Tranquility是一个服务,它对于Kafka来说是消费者,对于Druid来说是客户端。您可以使用中立的机器来运行Tranquility,只要这台机器能够同时连通 Kafka 集群和 Druid 集群即可。

  1、Kafka端创建一个名为 pageViews 的 topic。

-- 如果开启了kafka 高安全:export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"--./bin/kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181/kafka-1.0.1 --partitions 1 --replication-factor 1 --topic pageViews

  2、下载 Tranquility 安装包,并解压至某一路径下。

  3、配置 datasource。

  这里假设您的 topic name 为 pageViews,并且每条 topic 都是如下形式的 json 文件:

  

{"time": "2018-05-23T11:59:43Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-23T11:59:44Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-23T11:59:45Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

对应的 dataSrouce 的配置如下:

{"dataSources" : {"pageViews-kafka" : {"spec" : {"dataSchema" : {"dataSource" : "pageViews-kafka","parser" : {"type" : "string","parseSpec" : {"timestampSpec" : {"column" : "time","format" : "auto"},"dimensionsSpec" : {"dimensions" : ["url", "user"],"dimensionExclusions" : ["timestamp","value"]},"format" : "json"}},"granularitySpec" : {"type" : "uniform","segmentGranularity" : "hour","queryGranularity" : "none"},"metricsSpec" : [{"name": "views", "type": "count"},{"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"}]},"ioConfig" : {"type" : "realtime"},"tuningConfig" : {"type" : "realtime","maxRowsInMemory" : "100000","intermediatePersistPeriod" : "PT10M","windowPeriod" : "PT10M"}},"properties" : {"task.partitions" : "1","task.replicants" : "1","topicPattern" : "pageViews"}}},"properties" : {"zookeeper.connect" : "localhost","druid.discovery.curator.path" : "/druid/discovery","druid.selectors.indexing.serviceName" : "druid/overlord","commit.periodMillis" : "15000","consumer.numThreads" : "2","kafka.zookeeper.connect" : "emr-header-1.cluster-500148518:2181,emr-header-2.cluster-500148518:2181,   emr-header-3.cluster-500148518:2181/kafka-1.0.1","kafka.group.id" : "tranquility-kafka",}}

  4、运行如下命令启动 Tranquility。

./bin/tranquility kafka -configFile 

  5、在 Kafka 端启动 producer 并发送一些数据。

./bin/kafka-console-producer.sh --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic pageViews

输入:

{"time": "2018-05-24T09:26:12Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-24T09:26:13Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-24T09:26:14Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

在Tranquility日志中查看相应的消息,在Druid端则可以看到启动了相应的实时索引 task。

——————————————————————————————
原文:https://help.aliyun.com/document_detail/72704.html

 

 

 

 

 

转载于:https://www.cnblogs.com/wynjauu/articles/10369007.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/449951.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Iot相关杂烩

人工智能就像人的大脑,而 IoT 就像人的神经网络 1)在天空中巨大的鸟群里,每一只鸟儿都实时判断自己和四周同伴的距离。这时,它们各自都是一个物联网节点。2)这些“节点”并不是简单地收集数据,而是在实时计…

水滴石穿C语言之指针、数组和函数

基本解释   1、指针的本质是一个与地址相关的复合类型,它的值是数据存放的位置(地址);数组的本质则是一系列的变量。   2、数组名对应着(而不是指向)一块内存,其地址与容量在生命期内保持…

告诉你银行在年底为存储做的小动作

25年前,银行的存款利率是10.98%,可谓巅峰时刻。15年前,银行的存款利率开始下降,降到了8%的利率。 到了5年前,银行的存款利率毫无回转之势,直线下降到了5%的利率。 而如今,我们无可奈何地接受了2…

爬虫学习(五)——百度贴吧的爬取

import osimport timeimport urllib.requestimport urllib.parse# 输入目标页码和吧名def header(): url "https://tieba.baidu.com/f?" baming input("请输入要爬取的吧名") start_page int(input("请输入起始页")) end_page …

什么是嵌入式设备?/ 嵌入式设备的定义

什么是嵌入式设备?/ 嵌入式设备的定义 区别于通用计算机的其他设备都可以称之为嵌入式设备 (个人电脑,服务器) 一段时期内,必备的硬件配置。 嵌入式开发包括哪些部分: 底层驱动开发: 关键字…

Linux mv命令、Linux cp命令、Linux scp命令

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到教程。 Linux mv命令用来为文件或目录改名、或将文件或目录移入其它位置。 语法 mv [options] source dest mv [options] source... director…

创业者谈:畏惧失败,但也要拥抱失败

摘要:本文作者为Paydirt创始人Tristan Gamilis,他在文中分享了如何面对创业过程中的失败。作为一个创业者,开始的时候并非全才,很多知识都是经历了创业中的失败,摸爬滚打之后才学会的。所以,我们在创业过程…

基于STM32F4移植W5500官方驱动库ioLibrary_Driver(转)

源: 基于STM32F4移植W5500官方驱动库ioLibrary_Driver 参考: 基于STM32W5500 的Ethernet和Internet移植 Upgrade W5500 Throughput on Nucleo STM32F401RE Using SPI DMA

redis 资料

redis是什么: Redis is an open source, BSD licensed, advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets. redis是开源,BSD许可,高级的key-value存储系统. 可以用来存储字…

Android应用开发——onStop的调用时机

onStop的调用时机,网上搜索到的说法大概是:“ onStop的调用是“The activity is no longer visible”,也就是完全不可见的时候调用的,这个完全不可见真的就是指视觉上的完全看不到而已,无论是按home键返回桌面&#xf…

UnaryOperator函数式接口

2019独角兽企业重金招聘Python工程师标准>>> 这是一个函数式接口&#xff0c;因此可以用作lambda表达式或方法引用的赋值目标。 可以看到UnaryOperator<T>继承了Function<T,T>接口&#xff0c;这里可是两个T,T,还增加了static修饰的identity()方法。 然…

从程序员到项目经理

推荐研发工程师必看的内容 从程序员到项目经理 从程序员到项目经理”&#xff0c;这个标题让我想起了很久以前一本书的名字《从Javascript到Java》。然而&#xff0c;从Javascript到Java充其量只是工具的更新&#xff0c;而从程序员到项目经理&#xff0c;却是一个脱胎换骨的过…

linux--命令rcp和scp

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 rcp代表“remote file copy”&#xff08;远程文件拷贝&#xff09;。该命令用于在计算机之间拷贝文件。rcp命令有两种格式。第一种格式…

Android Camera 2.0 Api

二次图像处理 Camera2的API扩展了对YUV的支持&#xff0c;及图像再处理支持。要知道是否据有这个能力&#xff0c;可以调getCameraCharacteristics()方法&#xff0c;检查REPROCESS_MAX_CAPTURE_STALL这个键值 。如果设备支持再处理&#xff0c;则可以调用createReprocessableC…

scala-数组操作

package com.bigdataimport scala.collection.mutable.ArrayBufferobject ArrayO {def main(args: Array[String]): Unit {val arrayBuffer ArrayBuffer[Int]()//默认情况下都是在ArrayBuffer末尾增加元素arrayBuffer 1arrayBuffer (4,5,6,7,8,9,10)arrayBuffer Array(1,2…

spring cloud微服务分布式云架构 - Spring Cloud集成项目简介

Spring Cloud集成项目有很多&#xff0c;下面我们列举一下和Spring Cloud相关的优秀项目&#xff0c;我们的企业架构中用到了很多的优秀项目&#xff0c;说白了&#xff0c;也是站在巨人的肩膀上去整合的。在学习Spring Cloud之前大家必须了解一下相关项目&#xff0c;希望可以…

Nokia落寞身影下 三星成为全球最大手机厂商

摘要&#xff1a;在诺基亚统治全球最大手机厂商宝座长达14年后&#xff0c;三星今年首次取代诺基亚&#xff0c;成为全球最大手机厂商。据IHS iSuppli的数据显示&#xff0c;三星预计今年手机出货量将占全球29&#xff05;&#xff0c;而落寞的诺基亚市场份额将下降到24&#x…

Linux中cp和scp命令的使用方法

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 Linux为我们提供了两个用于文件copy的命令&#xff0c;一个是cp&#xff0c;一个是scp&#xff0c;但是他们略有不同。 cp --- 主要是用…

Django:学习笔记(2)——创建第一个应用

Django&#xff1a;学习笔记(2)——创建第一个应用 创建应用 在 Django 中&#xff0c;每一个应用都是一个 Python 包&#xff0c;并且遵循着相同的约定。Django 自带一个工具&#xff0c;可以帮你生成应用的基础目录结构&#xff0c;这样你就能专心写代码&#xff0c;而不是创…

dubbo源码解析(十)远程通信——Exchange层

远程通讯——Exchange层 目标&#xff1a;介绍Exchange层的相关设计和逻辑、介绍dubbo-remoting-api中的exchange包内的源码解析。前言 上一篇文章我讲的是dubbo框架设计中Transport层&#xff0c;这篇文章我要讲的是它的上一层Exchange层&#xff0c;也就是信息交换层。官方文…