问题背景
项目过程中可能会遇到,java 从数据库取了很多数据,但 java 本身不方便处理,所以传递给 python 去处理,如何传?
解决方法
调研了一些方法,譬如可以直接在 java 中调研 python,传入参数的方式,或者数据先存储到 excel 中,然后 python 去 excel 中读取,但是这些方法还是存在不便利和性能会有限制。之后,我尝试了增加个 rabbitmq 消息中间件进行不同语言之间的通信交互。
MQ 是消费 - 生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ 和 JMS 类似,但不同的是 JMS 是 SUN JAVA 消息中间件服务的一个标准和 API 定义,而 MQ 则是遵循了 AMQP 协议的具体实现和产品。RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。
上图是简单抽象的描述,具体到 rabbitmq 有很多详细的概念,这里不一一详述。当然,我们上面也说过 ,rabbitmq 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念,包括:Message(消息)、Publisher(消息生产者),Exchange(交换器),Binding(绑定),Queue(消息队列),Connection(网络连接),Channel(信道),Consumer(消息消费者),Virtual Host(虚拟主机),Broker(消息队列服务器)。
实际例子
生产者使用 java,往队列中写入待处理的消息。
java 代码(即生产者)
public class Producer1 {
public final static String QUEUE_NAME="data";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException{
long startTime=System.currentTimeMillis(); //获取开始时间
//数据准备
List<testObj> list = MockData.mockData();
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("192.168.43.211");
factory.setUsername("test");
factory.setPassword("123456");
//factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
List<testObj> list1 = new ArrayList<testObj>();
list1 = new MockData().mockData();
//发送消息到队列中
ObjectMapper mapper=new ObjectMapper();
String message = mapper.writeValueAsString(list1);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
long endTime=System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间: "+(endTime-startTime)/1000.0+"s");
}
}
消费者使用 python 代码,从消息队列中获取 java 生成者发送的消息进行处理。
python 代码(即消费者)
if __name__ == '__main__':
starttime = datetime.datetime.now()
# 创建socket链接
credentials = pika.PlainCredentials('test', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.43.211', 5672, '/', credentials))
# 创建管道
channel = connection.channel()
# 创建队列
queue_name = 'data'
channel.queue_declare(queue_name)
# 如果接受到消息就调用回调函数,准备接受消息
# 声明回调函数
def callback(ch, method, properties, body):
message = json.loads(body.decode())
endtime = datetime.datetime.now()
print("拿数据时间:{}".format(endtime - starttime))
list = message
for i in list:
print(i)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
我们可以看到 python 顺利拿到 java 传输的数据,至于你拿到数据后,后面要做什么复杂的操作、分析,那就是看你自己的需要了。
另外,你不妨尝试把模拟数据条数提高到 10 万条甚至更多, rabbitmq 基本能保持稳定有效。
小结
在解决不同语言程序之间数据传输问题上,方法各异,rabbitmq 是一个很好的选择,处理数据量大,且从时间效率上来说也快,而且很方便的进行系统的解耦。
更多内容可以学习《测试工程师 Python 工具开发实战》书籍