kafka集群部署与安装

下载安装包(http://kafka.apache.org

kafka_2.11-1.1.0.tgz

规划安装目录

/opt/module

上传安装包到node01服务器,并解压

cd /opt/software
tar -zxf kafka_2.11-1.1.0.tgz -C /opt/module/

修改配置文件

在node01上修改

node01执行以下命令进行修改配置文件

# 进入到kafka安装目录下有一个config目录,进行修改配置文件
cd  /kkb/install/kafka_2.11-1.1.0/config

vim server.properties

# 指定kafka对应的broker id ,唯一
broker.id=0
# 指定数据存放的目录
log.dirs=/opt/module/kafka_2.11-1.1.0/logs
# 指定zk地址
zookeeper.connect=node01:2181,node02:2181,node03:2181
# 指定是否可以删除topic ,默认是false 表示不可以删除delete.topic.enable=true
#指定broker主机名
host.name=node01

node01执行以下命令分发kafka安装目录到其他节点

cd /opt/module/
scp -r kafka_2.11-1.1.0/ node02:$PWD
scp -r kafka_2.11-1.1.0/ node03:$PWD

node02执行以下命令进行修改配置

cd /opt/module/kafka_2.11-1.1.0/config/
vi server.properties

#指定kafka对应的broker id ,唯一
broker.id=1
#指定数据存放的目录
log.dirs=/kkb/install/kafka_2.11-1.1.0/logs
#指定zk地址
zookeeper.connect=node01:2181,node02:2181,node03:2181
#指定是否可以删除topic ,默认是false 表示不可以删除
delete.topic.enable=true
#指定broker主机名
host.name=node02

node03执行以下命令进行修改配置

cd /opt/module/kafka_2.11-1.1.0/config/
vi server.properties

#指定kafka对应的broker id ,唯一
broker.id=2
#指定数据存放的目录
log.dirs=/kkb/install/kafka_2.11-1.1.0/logs
#指定zk地址
zookeeper.connect=node01:2181,node02:2181,node03:2181
#指定是否可以删除topic ,默认是false 表示不可以删除
delete.topic.enable=true
#指定broker主机名
host.name=node03

kafka集群启动和停止

启动

先启动zk集群

然后在所有节点执行脚本

  cd /opt/software/kafka_2.11-1.1.0/
  nohup bin/kafka-server-start.sh config/server.properties 2>&1 & 

一键启动kafka start_kafka.sh

#!/bin/sh
for host in node01 node02 node03
do
        ssh $host "source /etc/profile;nohup /opt/module/kafka_2.11-1.1.0/bin/kafka-server-start.sh /opt/module/kafka_2.11-1.1.0/config/server.properties >/dev/null 2>&1 &"
        echo "$host kafka is running"

done

停止 所有节点执行关闭kafka脚本

cd /opt/module/kafka_2.11-1.1.0/
bin/kafka-server-stop.sh 

一键停止kafka stop_kafka.sh

#!/bin/sh
for host in node01 node02 node03
do
  ssh $host "source /etc/profile;nohup /opt/module/kafka_2.11-1.1.0/bin/kafka-server-stop.sh &" 
  echo "$host kafka is stopping"
done

一键启动和停止脚本 kafkaCluster.sh

  #!/bin/sh
  case $1 in 
  "start"){
  for host in node01 node02 node03 
  do
    ssh $host "source /etc/profile; nohup /opt/module/kafka_2.11-1.1.0/bin/kafka-server-start.sh /opt/module/kafka_2.11-1.1.0/config/server.properties > /dev/null 2>&1 &"   
    echo "$host kafka is running..."  
  done  
  };;

  "stop"){
  for host in node01 node02 node03 
  do
    ssh $host "source /etc/profile; nohup /opt/module/kafka_2.11-1.1.0/bin/kafka-server-stop.sh >/dev/null  2>&1 &"   
    echo "$host kafka is stopping..."  
  done
  };;
  esac
启动
  sh kafkaCluster.sh start
停止

```shell
sh kafkaCluster.sh stop


##  kafka的命令行的管理使用

### 创建topic kafka-topics.sh
node01执行以下命令创建topic
```shell
    cd /opt/module/kafka_2.11-1.1.0/
    bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
    ```

### 查询所有的topic kafka-topics.sh
```shell
    cd /opt/module/kafka_2.11-1.1.0/
    bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 

查看topic的描述信息 kafka-topics.sh

    cd /kkb/install/kafka_2.11-1.1.0/
    bin/kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181  
 ```

### 删除topic kafka-topics.sh
```shell
    cd /opt/module/kafka_2.11-1.1.0/
    bin/kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 

node01模拟生产者写入数据到topic中

node01执行以下命令,模拟生产者写入数据到kafka当中去

  cd /opt/module/kafka_2.11-1.1.0/
  bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 

node01模拟消费者拉取topic中的数据

node02执行以下命令,模拟消费者消费kafka当中的数据

  cd /opt/module/kafka_2.11-1.1.0/
  bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning
  它会把消息的偏移量保存在zk上

  或者

  cd /opt/module/kafka_2.11-1.1.0/
  bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning
  它会把消息的偏移量保存在kafka集群内置的topic中
  ```
### 任意kafka服务器执行以下命令可以增加topic分区数

cd /opt/module/kafka_2.11-1.1.0

bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8

大数据技术

2022 高效Flink学习路线经验分享(持续更新中)

2021-7-24 23:37:03

Kafka大数据技术

Kafka浅谈、kafka简介

2021-11-4 22:57:33

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
购物车
优惠劵
搜索