rocketmq相关的文档参考官方文档:https://github.com/apache/rocketmq/blob/master/docs/cn就可以全面掌握。
特性、框架、部署、demo等等都有详细介绍。不必要在网上找乱七八糟的使用教程来弄,越弄越乱。

下面是本人学习rocketmq中的一些想法和实践:先简单介绍一下rocketmq、搭建起一个本地单机的rocketmq、运行相关测试代码、最后再来讲rocketmq具体的框架结构。因为rocketmq是用java实现得,可以找相关书籍或源码视频来引导深入学习。

一、rocketmq特性

特性:https://github.com/apache/rocketmq/blob/master/docs/cn/features.md

  • 订阅与消费模式的消息队列,可以发送同步消息、异步消息、单向消息、延迟消息、事务消息等。
  • 支持消息过滤、消息失败重试、流量控制、集群高可用、消息保存/恢复高可靠等,而且是基于java的实现。
  • 基本的使用流程(简述):

    • 1.生产者发送消息到rocketmq服务端进行消息持久化等操作;
    • 2.消费者从rocketmq服务端中拉取消息进行消费处理;

    消息

    一般使用一个topic来管理一类的消息,生产者生产某topic的消息,消费者拉取某topic的消息来消费;为了进一步方便管理,还可以给每个消息都设置一个tag,消费者可以更细一步针对tag来进行消息的消费,这也就是消息过滤。

rocketmq中消息的定义:
https://github.com/apache/rocketmq/blob/3ae251751586b940b7467284966ec4fe93f86be1/common/src/main/java/org/apache/rocketmq/common/message/Message.java#L25

// 主要的属性
private String topic; // 消息的主题
private int flag; // tag过滤属性
private Map<String, String> properties;
private byte[] body;  // 消息的内容
private String transactionId; // 事务相关

二、rocketmq服务端搭建(docker本地单机)

自定安装好docker和docker-compose,或者可以参考-》ToolMan Docker《-

1.准备安装环境

# 配rockermq docker的映射路径
sudo mkdir -p /opt/rocketmq/rmqs/logs /opt/rocketmq/rmqs/store /opt/rocketmq/rmq/logs /opt/rocketmq/rmq/store /opt/rocketmq/config
# 开放权限
sudo chmod 777 /opt/rocketmq/rmqs/logs /opt/rocketmq/rmqs/store /opt/rocketmq/rmq/logs /opt/rocketmq/rmq/store /opt/rocketmq/config
# 配置rocketmq的broker运行参数
sudo touch /opt/rocketmq/config/broker.conf
# 运行nameserver、broker和console
sudo touch /opt/rocketmq/docker-compose.yml
cd /opt/rocketmq

2.broker.conf

# /opt/rocketmq/config/broker.conf 配置内容
# 所属集群名称
brokerClusterName=AntiAddiction
# broker名字
brokerName=broker-a
# 0-master;1-slave
brokerId=0
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间点,默认48小时
fileReservedTime=48
# 路由地址配置
# 主从配置等存在多个地址,使用分号分割
# namesrvAddr=rocketmq-nameserverip1:port;rocketmq-nameserverip2:port
# rmqnamesrv是broker同一docker网络下的nameserverip,可直接用docker-compose服务名指定
namesrvAddr=rmqnamesrv:9876
# 启动ip,需要修改为本地运行的ip地址
brokerIP1=192.168.1.22
# Broker 对外服务的监听端口
listenPort=10911
# 是否允许 Broker 自动创建 Topic,开发可设为true:由代码自动生成topic,生产环境设为false:在后台手动生成topic
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 限制的消息大小
maxMessageSize=65536
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

3.docker-compose.yml

# docker-compose.yml文件配置
# 参考:https://github.com/foxiswho/docker-rocketmq/blob/master/rmq/docker-compose.yml
version: '3.5'

services:
  rmqnamesrv:
    image: foxiswho/rocketmq:4.8.0
#    image: registry.cn-hangzhou.aliyuncs.com/foxiswho/rocketmq:4.7.0
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./rmqs/logs:/home/rocketmq/logs
      - ./rmqs/store:/home/rocketmq/store
    environment:
    # 更多系统配置参考:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md#6--%E7%B3%BB%E7%BB%9F%E9%85%8D%E7%BD%AE
      JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
    command: ["sh","mqnamesrv"]
    networks:
        rmq:
          aliases:
            - rmqnamesrv
  rmqbroker:
    image: foxiswho/rocketmq:4.8.0
#    image: registry.cn-hangzhou.aliyuncs.com/foxiswho/rocketmq:4.7.0
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./rmq/logs:/home/rocketmq/logs
      - ./rmq/store:/home/rocketmq/store
      - ./config/broker.conf:/etc/rocketmq/broker.conf
    environment:
    # 更多系统配置参考:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md#6--%E7%B3%BB%E7%BB%9F%E9%85%8D%E7%BD%AE
        JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"  
    command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf","-n","rmqnamesrv:9876"] # ,"autoCreateTopicEnable=true"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng # apacherocketmq/rocketmq-console-ng:2.0.0  可以自行弄个jar包代替使用最新的console
    # https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
    # https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md
    container_name: rmqconsole
    ports:
      - 8180:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

4.运行

# 单机运行
docker-compose up -d

标签: Docker, rocketmq

添加新评论