RocketMQ简单实例搭建 - 21IC中国电子网

git mq

RocketMQ简单实例搭建

2019-11-21
779次浏览
1、安装RocketMQ(在Linux环境下,我用的是Centos6.5)

安装RocketMQ需要jdk1.6以上, maven,git环境,以上环境自行百度命令安装。

git clone https://github.com/alibaba/RocketMQ.git  ##从github上下载RocketMQ开源项目

cd RocketMQ  ##进入文件夹

sh install.sh  ##开始安装

安装完之后可以看到下图这样:其中可以看到一个符号链接devenv如红框所示
这里写图片描述
然后

cd devenv/bin  ##进入链接的目录下的bin目录

nohup sh mqnamesrv -n "121.42.179.195:9876" &  ##配置nameserver,121.42.179.195是本机ip,也就是服务器外网地址

nohup sh mqbroker -n "121.42.179.195:9876" &  ##配置broker,121.42.179.195同上

之后

cat nohup.out

在输出的最低端,可以看到红框中的两句话则说明nameserver和broker启动成功。
这里写图片描述
如果服务器内存不够,你就会启动失败,可以修改runbroker.sh脚本(mqbroker文件中通过runbroker.sh脚本调用Broker的主函数com.alibaba.rocketmq.broker.BrokerStartup启动Broker)的JAVA_OPT参数

vi runbroker.sh 

我阿里云内存小,我就改成这样

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"
2、编写Consumer和Producer测试类

首先需要的jar包如下:



    4.0.0

    groupId
    RocketMQ
    1.0-SNAPSHOT
    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    1.6
                    1.6
                
            
        
    

    RocketMQTest
    http://maven.apache.org

    
        UTF-8
    

    
        
            com.alibaba.rocketmq
            rocketmq-client
            3.0.10
        
        
            com.alibaba.rocketmq
            rocketmq-all
            3.0.10
            pom
        
        
            ch.qos.logback
            logback-classic
            1.1.1
        
        
            ch.qos.logback
            logback-core
            1.1.1
        
        
            junit
            junit
            4.10
            test
        
    

Consumer类:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "rmq-group");

        consumer.setNamesrvAddr("121.42.179.195:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

Producer类:

public class Producer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("121.42.179.195:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000);  //每秒发送一次MQ
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        (new Date() + "Hello RocketMQ ,QuickStart" + i)
                                .getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

之后分别启动,Consumer效果如下
这里写图片描述
这里需要注意,
默认情况下,一台服务器只能启动一个Producer或Consumer实例,所以如果需要在一台服务器启动多个实例,需要设置实例的名称,如要再建一个producer:
producer.setNamesrvAddr(“121.42.179.195:9876”);
producer.setInstanceName(“Producer2”);

我要点评

评论暂时关闭。
幸运飞艇官网 吉林快3 海鸥娱乐系统 一分时时彩 北京幸运28 快乐赛车投注 内蒙古快3计划 德国时时彩 快乐赛车 秒速时时彩