python2.7版本使用以下代码向Kafka发送数据时正常,但是在python3.7版本使用Kafka报错:return '' % self.async;原因是async是python3.7版本的关键字引起的,通过命令执行pip install kafka-python就可以解决这个问题。

#该代码在2.7版本运行正常,但是3.7版本运行报错:return '<SimpleProducer batch=%s>' % self.async
# -- coding: UTF-8

import datetime
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['IP:9092'])
future = producer.send(b'account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S')

运行之后解决了上述问题,但还是会报错,return kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs. 第一反应是Kafka版本不对引起的,就在实例化KafkaProducer类添加了版本将python2.7的代码改为(将上面的代码改为如下,添加版本):

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
future = producer.send(b'account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

还是return kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

发现不行,不是版本的问题,让我怀疑是否连接成功,将上诉代码修改为:

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
print(producer.config)##打印配置信息
future = producer.send(b'account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))


打印信息如上,配置生效了。debug模式也显示Connected。证明连接上了。目前是send函数有问题,3.x版本的send函数有问题。通过排查发现是send函数体里面的 self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)这个调用类函数有问题,接收的参数是str类型,但是我们传的类型是byte类型,将上述代码修改后又会报错

修改send函数的参数topic名称以str类型传入

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
print(producer.config)##打印配置信息
future = producer.send('account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

运行后继续报错

Debug打开,发现是在进入send函数体中, assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))报错,value_bytes是str类型

assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))

正确的send数据到Kafka代码如下:

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092')
future = producer.send('account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}).encode())
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

Kafka使用相关链接


↙↙↙阅读原文可查看相关链接,并与作者交流