使用php连接操作kafka

php基础

浏览数:171

2019-9-9

AD:资源代下载服务

使用php连接操作kafka,从安装kafka到引入php扩展来操作kafka。

一、安装

注:需安装JDK

1.官网下载kafka

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

2.解压文件

tar -xzf kafka_2.11-2.1.0.tgz

cd kafka_2.11-2.1.0

3.启动ZooKeeper(注意:kafka包自带了zookeeper)

bin/zookeeper-server-start.sh config/zookeeper.properties

4.启动kafka

bin/kafka-server-start.sh config/server.properties

5.创建一个topic(名为test)

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

可通过这个命令查看现有的topic列表

bin/kafka-topics.sh –list –zookeeper localhost:2181

6.发送一些消息:生产者producer(利用kafka自带客户端)

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

7.接收消息:消费者consumer

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

到此,kafka单机版就安装完成了。

二、使用php连接kafka

(一)让kafka支持外网访问

1.修改config/下的server.properties

listeners=PLAINTEXT://192.168.230.130:9092

host.name=192.168.230.130 #这里的ip填成你的kafka的ip

继续修改

advertised.listeners=PLAINTEXT://192.168.230.130:9092

2.至此,配置完成,记得重启kafka服务,可以下载kafka tools来验证是否可以外网访问

注:配置完以后,操作脚本时记得改为–bootstrap-server 192.168.230.130:9092,否则连不上。

(二)集成kafka-php,支持连接kafka

1.使用的是https://github.com/weiboad/kafka-php/来连接kafka,优点是不需要安装

php拓展。使用composer安装

2.按照github给的示例代码连接即可,需要注意的是

$config->setMetadataBrokerList(‘192.168.230.130:9092’);

这里填写的是kafka地址,不是zookeeper的。

3.offset可以这样设置

$config->setOffsetReset(‘earliest’);

(三)遇到的坑

github给的consumer建议通过脚本运行,当脚本被kill掉以后,再次重启脚本时,发现消费的数据漏掉了一条。后来通过这样解决了问题:

首先,先关闭consumer脚本

记得脚本里这样设置

$config->setOffsetReset(‘earliest’);

通过这个命令可以查看这个groupid当前的偏移量

bin/kafka-consumer-groups.sh –bootstrap-server 192.168.230.130:9092 –describe –group test3

通过这个命令,将这个groupid的偏移量设置成最早的

bin/kafka-consumer-groups.sh –bootstrap-server 192.168.230.130:9092 –reset-offsets –group test3 –topic test –to-earliest

再重新启动脚本,再消费数据时,就是正常的了,不会出现丢数据的情况;但是,在生产者持续写入时,还是有bug

后来发现是协议版本问题,换成这个就解决了:

$config->setBrokerVersion(‘1.0.0’);

作者:__sk