Kafka基础

                                        Kafka基础

1 消息队列

1.1 什么是消息队列

消息队列(MQ):消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。

1.2 为什么要有消息队列

当网站面对教大的流量冲击,在网站系统中一般会有一个消息存储/缓存系统(即消息队列,也叫消息中间件),网站就可以按照自己服务负载的能力来消费这些消息。

1.3 消息队列分类

主要分为两类:点对点(p2p)发布订阅(Pub/Sub)

(1)Peer-to-Peer

一般基于Pull或者Polling接收数据 ,发送到队列中的消息被一个而且仅仅一个接收者所接受,即使有多个接收者在同一个队列中侦听同一消息。即支持异步“即发即收”的消息传递方式,也支持同步请求/应答传送方式 。

(2)发布订阅

发布到同一个主题的消息,可被多个订阅者所接收 发布/订阅即可基于Push消费数据,也可基于Pull或者Polling消费数据。解耦能力比P2P模型更强

点对点与发布订阅的比较:

共同点: 消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。

不同点: 点对点(p2p)模型包括:消息队列(Queue)、发送者(Sender)、接收者(Receiver) 一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中) 发布订阅(pub/Sub)模式包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)

1.4 消息系统使用场景

(1)峰值处理能力:消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求

(2)解耦:各系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在

(3)冗余:部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险

(4)可恢复性:系统中部分键失效并不会影响整个系统,它恢复会仍然可从消息系统中获取并处理数据

(5)异步通信:在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理

(6)扩展 消息系统是统一的数据接口,各系统可独立扩展

1.5 常见的消息系统

Kafka/Jafka ,Redis ,RabbitMQ ,ZeroMQ ,ActiveMQ 等等

2 Kafka相关概念

2.1 简介

Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。

三大特点:

           ①高吞吐量:可以满足每秒百万级别消息的生产和消费——生产消费。

           ②持久性:有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。

           ③分布式:基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体

2.2 Kafka核心的概念

Topic(主题):Kafka处理的消息的不同分类

Broker(消息服务器代理):Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。

Partition(Topic物理上的分组 ):一个topic在broker中被分为1个或者多个

Message(消息):通信的基本单位,每个消息都属于一个partition

Producer(生产者):消息和数据的生产者,向Kafka的一个topic发布消息。

Consumer(消费者):消息和数据的消费者,定于topic并处理其发布的消息。

Zookeeper:协调kafka的正常运行。

3 配置

3.1 安装配置

安装包下载:http://archive.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz

解压安装好后,修改Kafka安装目录下的config/server.properties

## 当前kafka实例的id,必须为整数,一个集群中不可重复
broker.id=1 
## 生产到kafka中的数据存储的目录,目录需要手动创建
log.dirs=/home/refuel/opt/module/kafka_2.11-2.1.1/data/logs
## kafka数据在zk中的存储目录
zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka 

创建目录mkdir -p /home/refuel/opt/module/kafka_2.11-2.1.1/data/logs

启动命令home/refuel/opt/module/kafka_2.11-2.1.1/bin/kafka-server-start.sh -daemon home/refuel/opt/module/kafka_2.11-2.1.1/config/server.properties 

3.2 kafka在zookeeper中的目录

/kafka/cluster		/id  {"version":"1","id":"b9Imb3cBTQmOz6aSq-m_9Q"}  ==》代表的是一个kafka集群包含集群的版本,和集群的id/controller  {"version":1,"brokerid":2,"timestamp":"1568893911970"} ==》controller是kafka中非常重要的一个角色,控制器,控制partition的leader选举,topic的crud操作。brokerid意为由其id对应的broker承担controller的角色。/controller_epoch 2 代表的是controller的纪元,换句话说是代表controller的更迭,每当controller的brokerid更换一次,controller_epoch就+1./brokers/ids	 [1, 2, 3] ==》存放当前kafka的broker实例列表/topics	[spark, __consumer_offsets] ==》当前kafka中的topic列表/seqid	系统的序列id/consumers ==》老版本用于存储kafka消费者的信息,主要保存对应的offset,新版本中基本不用,此时用户的消费信息,保存在一个系统的topic中:__consumer_offsets/config	--->存放配置信息

4 Kafka的基本操作

4.1 topic的操作

topic是kafka非常重要的核心概念,是用来存储各种类型的数据的。关于topic的操作脚本:kafka-topics.sh 。

(1)创建topic

[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --create \
--topic spark\	## 指定要创建的topic的名称
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka \ ##指定kafka关联的zk地址
--partitions 3 \		##指定该topic的分区个数
--replication-factor 3	##指定副本因子

注意:指定副本因子的时候,不能大于broker实例个数

(2)查看topic的列表

[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --list  \
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka

(3)查看每一个topic的信息

[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --describe --topic spark \
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka

Partition:    当前topic对应的分区编号     

Replicas :  副本因子,当前kafka对应的partition所在的broker实例的broker.id的列表

Leader     :  该partition的所有副本中的leader领导者,处理所有kafka该partition读写请求

ISR         :  该partition的存活的副本对应的broker实例的broker.id的列表

(4)修改一个topic

[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --alter --topic spark --partitions 4 --zookeeper bigdata01:2181/kafka

注意:partition个数,只能增加,不能减少

(5)删除一个topic

[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --delete --topic spark --partitions 4 --zookeeper bigdata01:2181/kafka

(6)生产数据

[refuel@bigdata01 kafka]$ bin/kafka-console-producer.sh \
--topic spark \	-->指定数据被生产的topic
--broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 --->指定kafka的broker列表

(7)消费数据

[refuel@bigdata03 kafka]$ bin/kafka-console-consumer.sh \
--topic spark \
--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092

           没数据,原因在于消费者后于生产者启动,在消费者启动之前生产者消费的数据变不能直接获取。如果想要获取消费者启动之前生产者生产的数据,可以添加一个参数--from-beginning。

4.2 API编程

(1)kafka生产者的api操作

public static void main(String[] args) throws IOException {Properties properties = new Properties();properties.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));Producer<Integer, String> producer = new KafkaProducer<>(properties);//发送数据ProducerRecord<Integer, String> record = new ProducerRecord("spark", "product");producer.send(record);producer.close();}

producer.properties配置如下

bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

K: --->代表的是向topic中发送的每一条消息的key的类型,key可以为null

V: --->代表的是向topic中发送的每一条消息的value的类型

创建producer时指定的配置信息如下

bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092  ==》kafka的服务器
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer  ==》Key的序列化器
value.serializer=org.apache.kafka.common.serialization.StringSerializer  ==》value的序列化器
acks=[0|-1|1|all]  ==》消息确认机制0:	不做确认,直管发送消息即可-1|all: 不仅leader需要将数据写入本地磁盘,并确认,还需要同步的等待其它followers进行确认1:只需要leader进行消息确认即可,后期follower可以从leader进行同步
batch.size=1024  ==》每个分区内的用户缓存未发送record记录的空间大小。如果缓存区中的数据,没有沾满,也就是任然有未用的空间,那么也会将请求发送出去,为了较少请求次数,我们可以配置linger.ms大于0,
linger.ms=10  ==》不管缓冲区是否被占满,延迟10ms发送request
buffer.memory=10240  ==》控制的是一个producer中的所有的缓存空间
retries=0  ==》发送消息失败之后的重试次数

(2)kafka消费者的api操作

public static void main(String[] args) throws Exception {//消费者Properties properties = new Properties();properties.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);//订阅topicconsumer.subscribe(Arrays.asList("spark"));//从kafka对应的topic中拉取数据while (true) {ConsumerRecords<Integer, String> consumerRecords = consumer.poll(1000);for (ConsumerRecord<Integer, String> record : consumerRecords) {Integer key = record.key();String value = record.value();int partition = record.partition();long offset = record.offset();String topic = record.topic();System.out.println(String.format("topic:%s\tpartition:%d\toffset:%d\tkey:%d\tvalue:%s",topic,partition,offset,key,value));}}}

consumer.properties

bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
group.id=group01
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

5  Kafka的数据消费

kafka消费者在消费数据的时候,都是分组别的。不同组的消费不受影响,相同组内的消费,需要注意,如果partition有3个,消费者有3个,那么便是每一个消费者消费其中一个partition对应的数据;如果有2个消费者,此时一个消费者消费其中一个partition数据,另一个消费者消费2个partition的数据。如果有超过3个的消费者,同一时间只能最多有3个消费者能消费得到数据。

 bin/kafka-console-consumer.sh --topic spark \--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 \--group haha \		#消费者对应的消费者组--offset earliest \	#从什么位置(消息的偏移量)开始消费--partition 2		#消费哪一个分区中的数据	

offset:是kafka的topic中的partition中的每一条消息的标识,如何区分该条消息在kafka对应的partition的位置,就是用该偏移量。offset的数据类型是Long,8个字节长度。offset在分区内是有序的,分区间是不一定有序。如果想要kafka中的数据全局有序,就只能让partition个数为1。

在组内,kafka的topic的partition个数,代表了kafka的topic的并行度,同一时间最多可以有多个线程来消费topic的数据,所以如果要想提高kafka的topic的消费能力,应该增大partition的个数。

record进入分区的策略

每一条producerRecord有,topic名称、可选的partition分区编号,以及一对可选的key和value组成。

有三种策略进入分区:①如果指定的partition,那么直接进入该partition;②如果没有指定partition,但是指定了key,使用key的hash选择partition③如果既没有指定partition,也没有指定key,使用轮询的方式进入partition。

6.1 自定义分区

自定义分区的核心类

public interface Configurable {/*** Configure this class with the given key-value pairs 指定当前producer的配置信息*/void configure(Map<String, ?> configs);}public interface Partitioner extends Configurable, Closeable {/*** Compute the partition for the given record.计算给定记录的分区* @param topic The topic name  主题名字* @param key The key to partition on (or null if no key)* @param keyBytes key序列之后的字节数组的形式* @param value The value to partition on or null* @param valueBytes value序列之后的字节数组的形式* @param cluster The current cluster metadata 当前cluster的元数据信息*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);/*** This is called when partitioner is closed.分区结束之后被调用*/public void close(); 
}

(1)随机分区

public class RandomPartitioner implements Partitioner {private Random random = new Random();public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//返回当前topic的partition个数Integer partitionCount = cluster.partitionCountForTopic(topic);int partition = random.nextInt(partitionCount);System.out.println("partition: " + partition);return partition;}public void configure(Map<String, ?> configs) {}public void close() {}}

注册使用 :partitioner.class=com.refuel.bigdata.kafka.partitioner.RandomPartitioner

(2)hash分区

public class HashPartitioner implements Partitioner {public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {Integer partCount = cluster.partitionCountForTopic(topic);int partition = Math.abs(key.hashCode()) % partCount;System.out.println("key: " + key + "partition: " + partition);return partition;}public void configure(Map<String, ?> configs) {}public void close() {}}

注册使用:partitioner.class=com.refuel.bigdata.kafka.partitioner.HashPartitioner

(3)轮询分区

public class RoundRobinPartitioner implements Partitioner {  //定义一个原子计数器private AtomicInteger count = new AtomicInteger();  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int parCount = cluster.partitionCountForTopic(topic);int partition = count.getAndIncrement() % parCount;System.out.println("key: " + key + "\tpartition: " + partition);return partition;}public void configure(Map<String, ?> configs) {}public void close() {}
}

注册使用:partitioner.class=com.refuel.bigdata.kafka.partitioner.RoundRobinPartitioner

7 Kafka架构

(1)Topic

Kafka处理的消息的不同分类的逻辑概念,同一个Topic的消息可分布在一个或多个节点(Broker)上 一个Topic包含一个或者多个Partition 每条信息都属于且仅属于一个Topic Producer发布数据是,必须制定该消息发布到哪一个Topic,Consumer订阅消息时,也必须制定订阅哪个Topic的消息。

(2)Partition

物理概念,一个Partition只分布在一个Broker上(不考虑备份) 一个partition物理上对应一个文件夹 一个Partition包含多个Segment(Segment对用户透明) 一个Segment对应一个文件,Segment由一个个不可变记录组成 记录只会被append到Segment中,不会被单独删除或者修改 清除过期日志时,直接删除一个或多个Segment segment文件(log文件)文件名规范: 这个文件里面第一条消息的offset - 1

(3)broker

Kafka中的broker对于调用者而言都是透明的,也就是说各个broker的地位都是一样的,但是在kafka内部有区分,主要就是controller和非controller之分,controller的角色我们可以在zookeeper的对应目录/kafka/controller中获取对应的brokerid。

(4)offset

在kafka1.0以下的版本中使用zk来保存kafka消费者的offset(目录为/kafka/consumers/**),但是在kafka1.0以上,不再使用zookeeper来保存,主要原因在于,避免zookeeper负载过高,造成相关联的框架无法使用,此时在kafka提供了一个系统级别的topic:__consumer_offsets来报错偏移量信息。

7.1 消息flush和Retention策略

(1)flush策略:为了提供kafka的读写数据能力,先将接收数据到kafka内存,不可能无限制的保存在内存,所以必然会将数据flush到磁盘(partition的segement)文件,在flush的时候做了Durability(持久)和Latency(延迟)和Throughput(吞吐量)的权衡与取舍。

配置文件:

# The number of messages to accept before forcing a flush of data to disk
## 每当每一个topic接收到10000条message的时候,就会将数据flush到磁盘
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#每隔1s flush一次数据
log.flush.interval.ms=1000

(2)retention策略 :partition对应的文件,就保存在一个个的segment文件中,每一个文件默认大小是1G,但是log.retention.check.interval.ms监测频率是5分钟一次,所以segment文件可能会超过1G,此时就会启动retion策略,将文件裁剪到log.retention.bytes配置,如果超过了log.segment.bytes=1G配置,将会创建一个新的segment文件;默认情况,segment文件会保留7天。

配置文件:

# The minimum age of a log file to be eligible for deletion due to age
# 日志最小的保留时间:7天,超过这个时间,数据可能会被清理掉
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned(裁剪) from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
## segement文件如果超过log.retention.bytes的配置,将会可能被裁剪,直到小于log.retention.bytes配置
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 一个segment文件最大的大小,超过log.segment.bytes一个G,将会创建一个新的segment文件
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
## 每隔5分钟,检测一次retention策略是否达到
log.retention.check.interval.ms=300000

7.2 Kafka消息检索原理

(1)message的物理结构

结构 
8 bytes offset表示该消息在partition的第几条消息
4 bytes message size消息的大小
1 bytes magic sizekafka程序服务协议号
4 bytes src crc32校验
其他压缩、key,编码等
payload实际消息的数据

(2)partition分区目录说明:

.index文件和.log文件:

partition分区目录下的文件列表,主要包含两种类型的文件 x.index索引文件和x.log segment文件,其中x.log保存的是message信息,x.index保存的是索引数据。

①x.index中保存的内容:

a. index文件的序号就是message在日志文件中的相对偏移量 ​

b. OffsetIndex是稀疏索引,也就是说不会存储所有的消息的相对offset和position。也就是说index文件的序号对应的是log文件中的消息偏移量;index文件中的地址栏对应的是log文件中文件中的偏移字节。

因为一个partition下面有多个segment文件,segment文件当达到retention策略之后将会被裁剪或删除,同时partition中的offset是单调递增的,从0开始增加,但是segment文件中的消息在该文件中的偏移量指的是文件开头到该文件走过的字节长度,所以消息offset和文件中的偏移量不一样。

直接根据message的offset是无法直接读取到消息的,需要x.index中保存的相对偏移量来帮忙了。

(3)通过命令查看segment文件内容

kafka-run-class.sh kafka.tools.DumpLogSegments \
--print-data-log \		--->打印读取到的segment日志文件内容
--files 00000000000000000000.log	--->指定读取的segment日志文件

其中的offset是该条message在该partition中的偏移量,position为该条消息在该文件中的字节偏移量。

(4)消息检索

定位到具体的segment日志文件 由于log日志文件的文件名是这个文件中第一条消息的(offset-1). 因此可以根据offset定位到这个消息所在日志文件

(5)消息检索过程

定位到具体的segment日志文件。由于log日志文件的文件名是这个文件中第一条消息的(offset-1). 因此可以根据offset定位到这个消息所在日志文件

②计算查找的offset在日志文件的相对偏移量

segment文件中第一条消息的offset是已知的 。计算message相对偏移量:需要定位的offset - segment文件中第一条消息

③查找index索引文件, 可以定位到该消息在日志文件中的偏移字节为a. 综上, 直接读取文件夹.log中偏移a字节的数据即可。

如果查找的offset在日志文件的相对偏移量在index索引文件不存在, 可根据其在index索引文件最接近的上限偏移量,往下顺序查找。

 

8 flume和kafka的整合

flume主要是做日志数据(离线或实时)地采集。

(1)创建整合的topic

bin/kafka-topics.sh --create --topic flume-kafka --zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka --partitions 3 --replication-factor 3
Created topic "flume-kafka"

(2)编写flume-agent配置文件

flume-kafka-sink.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sinks.k1.kafka.topic = flume-kafka
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473748.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

系统总结学习 Python 的 14 张思维导图

本文主要涵盖了 Python 编程的核心知识&#xff08;暂不包括标准库及第三方库&#xff09;。 首先&#xff0c;按顺序依次展示了以下内容的一系列思维导图&#xff1a;基础知识&#xff0c;数据类型&#xff08;数字&#xff0c;字符串&#xff0c;列表&#xff0c;元组&#x…

LeetCode 1617. 统计子树中城市之间最大距离(枚举所有可能+图的最大直径)

文章目录1. 题目2. 解题1. 题目 给你 n 个城市&#xff0c;编号为从 1 到 n 。同时给你一个大小为 n-1 的数组 edges &#xff0c;其中 edges[i] [ui, vi] 表示城市 ui 和 vi 之间有一条双向边。 题目保证任意城市之间只有唯一的一条路径。换句话说&#xff0c;所有城市形成了…

MYSQL电脑客户端免安装教程以及出现问题解决方案

准备工作&#xff1a;window 7 64位旗舰版 MySQL 5.6.35免安装。 1. 下载MySQL 1.1 进入MySQL官网下载&#xff08;https://www.mysql.com/&#xff09;MySQL的安装包。 1.2. 根据自己电脑的位数(32位/64位)来下载响应的MySQL 、 2. 部署MySQL 2.1 解压压缩包到自己的某个盘…

[Kaggle] Digit Recognizer 手写数字识别(卷积神经网络)

文章目录1. 使用 LeNet 预测1.1 导入包1.2 建立 LeNet 模型1.3 读入数据1.4 定义模型1.5 训练1.6 绘制训练曲线1.7 预测提交2. 使用 VGG16 迁移学习2.1 导入包2.2 定义模型2.3 数据处理2.4 配置模型、训练2.5 预测提交Digit Recognizer 练习地址 相关博文&#xff1a; [Hands …

SparkCore基础

目录 Spark简介 1 什么是Spark 2 Spark特点 3 Spark分布式环境安装 3.1 Spark HA的环境安装 3.2 动态增删一个worker节点到集群 4 Spark核心概念 5 Spark案例 5.2 Master URL 5.3 spark日志的管理 5.4 WordCount案例程序的执行过程 6 Spark作业运行架构图&#xff…

LeetCode 1320. 二指输入的的最小距离(动态规划)

文章目录1. 题目2. 解题1. 题目 二指输入法定制键盘在 XY 平面上的布局如上图所示&#xff0c;其中每个大写英文字母都位于某个坐标处&#xff0c; 例如字母 A 位于坐标 (0,0)&#xff0c;字母 B 位于坐标 (0,1)&#xff0c;字母 P 位于坐标 (2,3) 且字母 Z 位于坐标 (4,1)。 …

SparkStreaming基础

目录 SparkStreaming基础 1 流式计算 1.1 常见的离线和流式计算框架 2 SparkStreaming简介 2.1 核心概念DStream 2.2 工作原理 2.3 Storm&#xff0c;SparkStreaming和Flink的对比 2.4 如何选择流式处理框架 3 SparkStreaming实时案例 3.1 StreamingContext和Receiver…

【Kaggle微课程】Natural Language Processing - 1. Intro to NLP

文章目录1. 使用 spacy 库进行 NLP2. Tokenizing3. 文本处理4. 模式匹配练习&#xff1a;食谱满意度调查1 在评论中找到菜单项2 对所有的评论匹配3 最不受欢迎的菜4 菜谱出现的次数learn from https://www.kaggle.com/learn/natural-language-processing 1. 使用 spacy 库进行…

【Kaggle微课程】Natural Language Processing - 2.Text Classification

文章目录1. bag of words2. 建立词袋模型3. 训练文本分类模型4. 预测练习&#xff1a;1. 评估方法2. 数据预处理、建模3. 训练4. 预测5. 评估模型6. 改进learn from https://www.kaggle.com/learn/natural-language-processing NLP中的一个常见任务是文本分类。这是传统机器学…

Django框架—富文本编辑器

借助富文本编辑器&#xff0c;网站的编辑人员能够像使用offfice一样编写出漂亮的、所见即所得的页面此处以tinymce为例&#xff0c;其它富文本编辑器的使用也是类似的在虚拟环境中安装包 pip install django-tinymce2.6.0安装完成后&#xff0c;可以使用在Admin管理中&#xf…

Python基础(二)--数据类型,运算符与流程控制

目录 Python基础&#xff08;二&#xff09;--数据类型&#xff0c;运算符与流程控制 1 数据类型 1.1 Python中的数据类型 1.2 整数类型&#xff08;int&#xff09; 1.3 布尔类型 1.4 浮点类型 1.5 复数类型 1.6 类型转换 2 运算符 2.1 算术运算符 2.2 布尔运算符 …

【Kaggle微课程】Natural Language Processing - 3. Word Vectors

文章目录1. 词嵌入 Word Embeddings2. 分类模型3. 文档相似度练习&#xff1a;1. 使用文档向量训练模型2. 文本相似度learn from https://www.kaggle.com/learn/natural-language-processing 1. 词嵌入 Word Embeddings 参考博文&#xff1a;05.序列模型 W2.自然语言处理与词…

Django搜索工具——全文检索

全文检索不同于特定字段的模糊查询&#xff0c;使用全文检索的效率更高&#xff0c;并且能够对于中文进行分词处理haystack&#xff1a;全文检索的框架&#xff0c;支持whoosh、solr、Xapian、Elasticsearc四种全文检索引擎&#xff0c;点击查看官方网站whoosh&#xff1a;纯Py…

LeetCode 787. K 站中转内最便宜的航班(Dijkstra最短路径 + 优先队列)

文章目录1. 题目2. 解题1. 题目 有 n 个城市通过 m 个航班连接。每个航班都从城市 u 开始&#xff0c;以价格 w 抵达 v。 现在给定所有的城市和航班&#xff0c;以及出发城市 src 和目的地 dst&#xff0c;你的任务是找到从 src 到 dst 最多经过 k 站中转的最便宜的价格。 如…

Windows Phone 资源管理与换肤思考

Windows Phone 资源管理与换肤思考 原文 Windows Phone 资源管理与换肤思考 新入手一台Windows 8的笔记本&#xff0c;安装了VS2013后&#xff0c;终于又可以开发WP了。公司暂时不愿意开发WP&#xff0c;那么咱就自行研究吧&#xff01; 在没有WP开发环境的时候&#xff0c;曾经…

Django完成异步工具——celery

情景&#xff1a;用户发起request&#xff0c;并等待response返回。在本些views中&#xff0c;可能需要执行一段耗时的程序&#xff0c;那么用户就会等待很长时间&#xff0c;造成不好的用户体验&#xff0c;比如发送邮件、手机验证码等使用celery后&#xff0c;情况就不一样了…

Python基础(三)--序列

Python基础&#xff08;三&#xff09;--序列 1 序列相关的概念 1.1 什么是序列 序列是一种可迭代对象&#xff0c;可以存储多个数据&#xff0c;并提供数据的访问。 序列中的数据称为元素&#xff0c;Python内置的序列类型有&#xff1a;列表&#xff08;list&#xff09;…

项目上线最后工作——布署环境

当项目开发完成后&#xff0c;需要将项目代码放到服务器上&#xff0c;这个服务器拥有固定的IP&#xff0c;再通过域名绑定&#xff0c;就可以供其它人浏览&#xff0c;对于python web开发&#xff0c;可以使用wsgi、apache服务器&#xff0c;此处以wsgi为例进行布署服务器首先…

Python基础(四)--字典与集合

Python基础&#xff08;四&#xff09;--字典与集合 1 字典 1.1 什么是字典 字典提供的是一种映射存储的方式。字典分为两个部分&#xff0c;一个是键&#xff08;key&#xff09;&#xff0c;一个是key所关联的值&#xff08;value&#xff09;。&#xff0c;一个键关联&am…

[Kaggle] Spam/Ham Email Classification 垃圾邮件分类(spacy)

文章目录1. 导入包2. 数据预览2. 特征组合3. 建模4. 训练5. 预测练习地址&#xff1a;https://www.kaggle.com/c/ds100fa19 相关博文&#xff1a; [Kaggle] Spam/Ham Email Classification 垃圾邮件分类&#xff08;RNN/GRU/LSTM&#xff09; [Kaggle] Spam/Ham Email Classifi…