Loading data into RabbitMQ with Java, part 2: pushing data with the plain RabbitMQ client, plus some first impressions

henry · October 31, 2019 · 1422 views

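Following up on the previous post: first, thanks to 陈恒捷 and 恒温 for their comments. I learned something I had not known before, so I decided to try it myself.

As before, start by adding the RabbitMQ dependency: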

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.6.0</version>
      </dependency>

I searched online for RabbitMQ tutorials; these are the ones I found clearest:
https://www.cnblogs.com/lfalex0831/p/8963247.html
https://www.jianshu.com/p/6e6821604efc
https://www.jianshu.com/p/cd81afa8ade1 (a roundup of good blog posts)

With those as reference, here is the code:


import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class SendMessage {
    private final static String QUEUE_NAME = "risk-topic.risk-group";

    public static void main(String[] argv) throws Exception {

        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5672);
        // Virtual host; must match the one configured on the broker
        factory.setVirtualHost("/test1");
        factory.setConnectionTimeout(10000);
        factory.setUsername("admin");
        factory.setPassword("admin");

        // Input file, one message per line (FileReader uses the platform default charset)
        File file = new File("D:\\test.txt");

        // try-with-resources closes the reader, channel and connection,
        // so the client threads stop and the JVM can exit
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
             BufferedReader bReader = new BufferedReader(new FileReader(file))) {

            // Declare the queue; the arguments must match the queue that already exists on the broker
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message;
            // Read the file line by line; readLine() strips the line terminator only
            while ((message = bReader.readLine()) != null) {
                // Publish to the default exchange, which routes by queue name
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
        System.out.println("Done");

    }

}

Then check the result in RabbitMQ: the data was loaded successfully.
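The read-and-publish loop can also be exercised without a broker if reading lines is separated from sending them. The following is only a sketch under assumptions: `LinePublisher` and its `Sink` interface are hypothetical names that do not appear in the original program, and skipping blank lines is a choice made here, not something the original does.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LinePublisher {
    /** Hypothetical abstraction over "send one message body somewhere". */
    public interface Sink {
        void send(byte[] body) throws IOException;
    }

    /** Reads the source line by line, sends each non-empty line as UTF-8 bytes, and returns the number sent. */
    public static int publishLines(Reader source, Sink sink) throws IOException {
        int sent = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // assumption: blank lines are not worth publishing
                }
                sink.send(line.getBytes(StandardCharsets.UTF_8));
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        // In-memory sink standing in for the broker
        List<String> received = new ArrayList<>();
        int sent = publishLines(new StringReader("a\n\nb\n"),
                body -> received.add(new String(body, StandardCharsets.UTF_8)));
        System.out.println(sent + " messages: " + received);
    }
}
```

Against a real broker, the sink would presumably be a lambda such as `body -> channel.basicPublish("", QUEUE_NAME, null, body)`, with the source being a `FileReader` on the input file.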

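Problems I ran into while debugging
1. When getting a channel and declaring the queue, the declaration has to match the queue as it exists on the remote MQ

        // Get a channel from the connection
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);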

Calling channel.queueDeclare(QUEUE_NAME, false, false, false, null); as written fails with:

"C:\Program Files\Java\jdk1.8.0_202\bin\java.exe" -Dfile.encoding=UTF-8 -classpath "[long IDE classpath omitted; it ends with amqp-client-5.6.0.jar and slf4j-api-1.7.25.jar]" SendMessage
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at SendMessage.main(SendMessage.java:32)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'risk-topic.risk-group' in vhost '/test1': received 'false' but current is 'true', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
    ... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'risk-topic.risk-group' in vhost '/test1': received 'false' but current is 'true', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
    at java.lang.Thread.run(Thread.java:748)

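The error message names the mismatch: the existing queue was created with durable set to true, but this call passed false. The queue properties passed to queueDeclare (durable, exclusive, auto_delete, and the arguments map) have to match the existing queue. Passing true for durable, as in the full program above, makes the declaration succeed.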
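To make that kind of failure easier to read, the reply text can be picked apart with a regular expression. This is a minimal sketch, assuming the reply text has the same shape as in the stack trace above; other RabbitMQ versions may word it differently, and `PreconditionMismatch` is a hypothetical helper, not part of the client library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PreconditionMismatch {
    // Matches the reply-text format seen in the stack trace above (assumed, not guaranteed, to be stable)
    private static final Pattern INEQUIVALENT = Pattern.compile(
            "inequivalent arg '([^']+)' for queue '([^']+)' in vhost '([^']+)': "
            + "received '([^']*)' but current is '([^']*)'");

    /** Returns a short hint, or null if the text is not a queue-argument mismatch. */
    public static String explain(String replyText) {
        Matcher m = INEQUIVALENT.matcher(replyText);
        if (!m.find()) {
            return null;
        }
        return "queue " + m.group(2) + " in vhost " + m.group(3)
                + ": declare with " + m.group(1) + "=" + m.group(5)
                + " (client sent " + m.group(4) + ")";
    }

    public static void main(String[] args) {
        String replyText = "PRECONDITION_FAILED - inequivalent arg 'durable' for queue "
                + "'risk-topic.risk-group' in vhost '/test1': received 'false' but current is 'true'";
        System.out.println(explain(replyText));
    }
}
```

If the queue is known to exist already, `channel.queueDeclarePassive(QUEUE_NAME)` is another option: it only checks that the queue exists and does not assert any properties.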

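Some basic concepts
Server: also called the Broker. The server entity that accepts client connections and implements AMQP.
Connection: the network connection between an application and the Broker.
Channel: almost all operations happen on a Channel; it is the conduit for reading and writing messages. A client can open multiple Channels, and each Channel represents one session.
Message: the data passed between the server and the application. It consists of Properties and a Body.
Exchange: receives messages and forwards them to the bound queues according to the routing key.
Binding: the virtual link between an Exchange and a Queue; a binding can carry a routing key.
Routing key: a virtual address that the Exchange uses to decide how to route a particular message.
Queue: also called a Message Queue. It stores messages and delivers them to consumers.
Virtual Host: a logical grouping, similar to a permission group. One Virtual Host can contain any number of Exchanges and Queues and isolates them from those in other Virtual Hosts. Within one Virtual Host, no two Exchanges may share a name and no two Queues may share a name. Permissions are granted per Virtual Host.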
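How Exchange, Binding, Routing key and Queue fit together can be illustrated with a toy in-memory model. This is only a sketch of direct (exact-match) routing, not how RabbitMQ is implemented; `ToyDirectExchange` and its methods are hypothetical names. It also mirrors why `basicPublish("", QUEUE_NAME, ...)` works: every queue is bound to the default exchange under its own name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyDirectExchange {
    // Bindings: routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queues: queue name -> messages waiting in that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    public void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    public void bind(String queue, String routingKey) {
        declareQueue(queue);
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Copies the message into every queue bound with exactly this routing key; returns how many queues got it. */
    public int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    public List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        // Mimic the default exchange: the queue is bound under its own name
        exchange.bind("risk-topic.risk-group", "risk-topic.risk-group");
        exchange.publish("risk-topic.risk-group", "hello");
        exchange.publish("no-such-key", "unroutable"); // no binding matches, so no queue receives it
        System.out.println(exchange.messagesIn("risk-topic.risk-group"));
    }
}
```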

The Virtual Host also has to be configured here: /test1, matching the one on the remote broker.
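When the connection settings are given as a single AMQP URI instead of separate setters, a vhost that starts with a slash has to be percent-encoded in the path. The sketch below builds such a URI from the values used in this post; `AmqpUriBuilder` is a hypothetical helper, and the assumption is that the resulting string is then handed to the client, for example through `ConnectionFactory.setUri(String)`.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class AmqpUriBuilder {
    /** Percent-encodes one URI component; URLEncoder writes spaces as '+', so that is patched to %20. */
    public static String encode(String value) throws UnsupportedEncodingException {
        return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
    }

    /** Builds amqp://user:password@host:port/vhost with user, password and vhost encoded. */
    public static String build(String user, String password, String host, int port, String vhost)
            throws UnsupportedEncodingException {
        return "amqp://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port + "/" + encode(vhost);
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        // The vhost /test1 becomes %2Ftest1 in the URI path
        System.out.println(build("admin", "admin", "127.0.0.1", 5672, "/test1"));
    }
}
```

With the setters used in the program above, no encoding is needed: `factory.setVirtualHost("/test1")` takes the vhost name as-is.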
