Original article: https://www.douyacun.com/article/de43463fc4bd3caa67ce22a9fbbbd993
ZooKeeper download: http://zookeeper.apache.org/releases.html#download
Extract: tar -zxvf apache-zookeeper-<version>-bin.tar.gz
Run: ./bin/zkServer.sh --config conf start
--config specifies the configuration directory
Kafka download: https://kafka.apache.org/downloads
Extract: tar -zxvf kafka_2.12-2.6.0.tgz
Run: ./bin/kafka-server-start.sh config/server.properties
Kafka configuration:
kafka-configs.sh
Views or modifies Kafka configuration.
Alias: typing the full command every time is tedious
alias kconfigs="~/Documents/kafka/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092"
View: --describe
kconfigs --describe --entity-type [topics/clients/users/brokers/broker-loggers] --all
Modify: --alter
kconfigs --alter --entity-type topics --entity-name test --add-config max.message.bytes=104858800
JVM configuration:
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
System configuration:
File descriptor limit: ulimit -n
/etc/security/limits.conf
soft nofile 102400
hard nofile 104800
File system type: xfs performs well for Kafka workloads
Swap: set vm.swappiness=1, so heavy swapping can trigger an alert instead of the OOM killer firing
Dirty page flush interval: 5 seconds by default
Disk capacity: size it for your message retention needs
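The swap and dirty-page knobs above are kernel sysctls; a sketch of the corresponding /etc/sysctl.conf entries (the values shown are the ones suggested above, not universal defaults you must use — tune them per host):

```
# /etc/sysctl.conf (illustrative values)
vm.swappiness=1                   # swap only as a last resort; heavy swapping becomes an alertable signal
vm.dirty_writeback_centisecs=500  # dirty-page flush interval in centiseconds; 500 = 5s, the kernel default
```

Apply with `sysctl -p` (the limits.conf changes instead take effect on the next login session).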
To run multiple broker instances locally, each needs its own port and broker.id: create a separate server.properties per instance and set the port.
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=~/Documents/kafka/data1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=~/Documents/kafka/data2
Start:
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties
./bin/kafka-server-start.sh -daemon config/server-1.properties
./bin/kafka-server-start.sh -daemon config/server-2.properties
Create a topic with 3 replicas / 1 partition:
alias ktopics="~/Documents/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094"
ktopics --create --replication-factor 3 --partitions 1 --topic test
View topic status:
ktopics --describe --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3,1,2
The partitioning strategy is the algorithm the producer uses to decide which partition each message is sent to.
Common partitioning strategies:
The Go package github.com/Shopify/sarama ships implementations of these strategies:
Round-robin: func NewRoundRobinPartitioner(topic string) Partitioner
Random: func NewRandomPartitioner(topic string) Partitioner
Hash: func NewHashPartitioner(topic string) Partitioner
Manual (caller picks the partition): func NewManualPartitioner(topic string) Partitioner
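The hash strategy is worth seeing concretely: sarama's HashPartitioner hashes the message key with FNV-1a and takes it modulo the partition count, so the same key always lands on the same partition (preserving per-key ordering). A stdlib-only sketch of that idea — the function name here is mine, not sarama's API:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition sketches a key-hash partitioner: hash the key with
// FNV-1a, then take it modulo the partition count. Deterministic, so
// a given key is always routed to the same partition.
func hashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Same key, same partition: per-key ordering is preserved.
	a := hashPartition([]byte("user-1001"), 3)
	b := hashPartition([]byte("user-1001"), 3)
	fmt.Println(a == b) // always true
}
```

Round-robin, by contrast, maximizes balance across partitions but gives up any per-key ordering guarantee.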
Compression:
Make sure the Producer and Consumer agree on the compression algorithm
const (
//CompressionNone no compression
CompressionNone CompressionCodec = iota
//CompressionGZIP compression using GZIP
CompressionGZIP
//CompressionSnappy compression using snappy
CompressionSnappy
//CompressionLZ4 compression using LZ4
CompressionLZ4
//CompressionZSTD compression using ZSTD
CompressionZSTD
)
吞吐量: LZ4 > Snappy > zstd > GZIP
压缩比:zstd > LZ4 > GZIP > Snappy
CPU: usage is broadly similar across algorithms; Snappy tends to use more CPU when compressing, while GZIP may use more when decompressing
How do you guarantee messages are not lost?
producer:
Handle send errors from the Errors() <-chan *ProducerError channel.
config.Producer.Retry: retries failed sends, 3 times by default.
config.Producer.RequiredAcks = WaitForAll: a message only counts as committed once all in-sync replicas have received it.
NoResponse RequiredAcks = 0
WaitForLocal RequiredAcks = 1
WaitForAll RequiredAcks = -1
broker:
unclean.leader.election.enable=false: brokers that have fallen too far behind may not take part in leader election.
replication.factor >= 3: keep no fewer than 3 replicas.
min.insync.replicas >= 1: the minimum number of replicas a message must be written to before it counts as committed; in production set it higher than 1.
replication.factor > min.insync.replicas: if the two are equal, losing a single replica makes the whole partition unavailable.
consumer:
cfg.Consumer.Offsets.AutoCommit.Enable = false
Offsets are committed automatically by default; disable this and commit manually only after the message has been processed.