#该代码在2.7版本运行正常,但是3.7版本运行报错:return '<SimpleProducer batch=%s>' % self.async
# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['IP:9092'])
future = producer.send(b'account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
future = producer.send(b'account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
还是 return kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
发现不行,不是版本的问题,让我怀疑是否连接成功,将上诉代码修改为:
# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
print(producer.config)##打印配置信息
future = producer.send(b'account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
打印信息如上,配置生效了。debug 模式也显示 Connected。证明连接上了。目前是 send 函数有问题,3.x 版本的 send 函数有问题。通过排查发现是 send 函数体里面的 self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) 这个调用类函数有问题,接收的参数是 str 类型,但是我们传的类型是 byte 类型,将上述代码修改后又会报错
# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))
print(producer.config)##打印配置信息
future = producer.send('account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}))
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
运行后继续报错
assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
正确的 send 数据到 Kafka 代码如下:
# -- coding: UTF-8
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='IP:9092')
future = producer.send('account', json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),
"info": "demo{}".format(1)}).encode())
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))