python2.7 版本使用以下代码向 Kafka 发送数据时正常,但是在 python3.7 版本使用 Kafka 报错:return '' % self.async;原因是 async 是 python3.7 版本的关键字引起的,通过命令执行 pip install kafka-python 就可以解决这个问题。

#该代码在2.7版本运行正常,但是3.7版本运行报错:return '<SimpleProducer batch=%s>' % self.async
# -- coding: UTF-8

import datetime
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['IP:9092'])
future = producer.send(b'account', json.dumps(
    {"method": "get", "step": "1", "type": "test", "testName": "kafka",
     "cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
     "info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S')

运行之后解决了上述问题,但还是会报错,return kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs. 第一反应是 Kafka 版本不对引起的,就在实例化 KafkaProducer 类添加了版本将 python2.7 的代码改为(将上面的代码改为如下,添加版本):

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
future = producer.send(b'account', json.dumps(
    {"method": "get", "step": "1", "type": "test", "testName": "kafka",
     "cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
     "info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

还是 return kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

发现不行,不是版本的问题,让我怀疑是否连接成功,将上诉代码修改为:

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
print(producer.config)##打印配置信息
future = producer.send(b'account', json.dumps(
    {"method": "get", "step": "1", "type": "test", "testName": "kafka",
     "cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
     "info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))


打印信息如上,配置生效了。debug 模式也显示 Connected。证明连接上了。目前是 send 函数有问题,3.x 版本的 send 函数有问题。通过排查发现是 send 函数体里面的 self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) 这个调用类函数有问题,接收的参数是 str 类型,但是我们传的类型是 byte 类型,将上述代码修改后又会报错

修改 send 函数的参数 topic 名称以 str 类型传入

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
print(producer.config)##打印配置信息
future = producer.send('account', json.dumps(
    {"method": "get", "step": "1", "type": "test", "testName": "kafka",
     "cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
     "info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

运行后继续报错

Debug 打开,发现是在进入 send 函数体中, assert type(value_bytes) in (bytes, bytearray, memoryview, type(None)) 报错,value_bytes 是 str 类型

assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))

正确的 send 数据到 Kafka 代码如下:

# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092')
future = producer.send('account', json.dumps(
    {"method": "get", "step": "1", "type": "test", "testName": "kafka",
     "cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
     "info": "demo{}".format(1)}).encode())
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

Kafka 使用相关链接


↙↙↙阅读原文可查看相关链接,并与作者交流