开始的开始,我先来吐槽一下这个RocketMQ,既然是开源项目,代码质量自不用说,但是文档也是很重要的,RocketMQ的文档首页竟然只有一则招聘广告。。。

安装运行

1
2
3
git clone https://github.com/alibaba/RocketMQ.git
cd RocketMQ
sh install.sh

把RocketMQ的目录放入你的bash_profile或者zsh_profile中:

1
2
echo "ROCKETMQ_HOME=`pwd`" >> ~/.bash_profile
source ~/.bash_profile

然后启动Name Server:

1
2
cd devenv/bin
sh mqnamesrv

Name Server会运行在9876端口上。然后我们启动Broker,我们这里只启动一个Broker,也就是单Master模式:

1
2
3
4
sh mqbroker -n localhost:9876

输出:
The broker[mazhibindeMacBook-Pro.local, 172.28.0.118:10911] boot success. serializeType=JSON and name server is localhost:9876

Broker的日志可以在~/logs/rocketmqlogs/broker.log中查看。

这样就可以开始编写生产者和消费者的代码了。RocketMQ提供了测试的工具,可以快速发送消息和接受消息:

1
2
3
4
5
6
7
export NAMESRV_ADDR=localhost:9876

# 发送消息
sh tools.sh com.alibaba.rocketmq.example.quickstart.Producer

# 接受消息
sh tools.sh com.alibaba.rocketmq.example.quickstart.Consumer

如果你看到很多输出,说明安装成功了。

编写生产者和消费者

添加依赖

1
2
3
4
5
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.5</version>
</dependency>

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Producer {

public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("localhost:9876");

try {
producer.start();

Message msg = new Message("PushTopic","push","1","Just for test".getBytes());

SendResult result = producer.send(msg);
System.out.println(result.toString());

} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer {

public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("localhost:9876");

try {
consumer.subscribe("PushTopic", "push");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Message msg = list.get(0);
System.out.println(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}

}
}

启动生产者,输出:

1
SendResult [sendStatus=SEND_OK, msgId=AC1C007620F629453F44340972120000,offsetMsgId=AC1C007600002A9F0000000000021AA2, messageQueue=MessageQueue [topic=PushTopic, brokerName=mazhibindeMacBook-Pro.local, queueId=1], queueOffset=0]

启动消费者输出:

1
MessageExt [queueId=1, storeSize=182, queueOffset=0, sysFlag=0, bornTimestamp=1470897034259, bornHost=/172.28.0.118:50525, storeTimestamp=1470897034305, storeHost=/172.28.0.118:10911, msgId=AC1C007600002A9F0000000000021AA2, commitLogOffset=137890, bodyCRC=1001808822, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=1, CONSUME_START_TIME=1470897111881, UNIQ_KEY=AC1C007620F629453F44340972120000, WAIT=true, TAGS=push}, body=13]]

参考资料