您现在的位置是:首页 > 博客日记 > GoLang GoLang

go RabbitMQ封装以及使用

2019-09-17 17:47:26 【GoLang】 人已围观

封装RabbitMQ底层方法

  1. import (
  2. "bytes"
  3. "errors"
  4. "github.com/streadway/amqp"
  5. "strings"
  6. )
  7. var conn *amqp.Connection
  8. var channel *amqp.Channel
  9. var topics string
  10. var nodes string
  11. var hasMQ bool = false
  12. type Reader interface {
  13. Read(msg *string) (err error)
  14. }
  15. // 初始化 参数格式:amqp://用户名:密码@地址:端口号/host
  16. func SetupRMQ(rmqAddr string) (err error) {
  17. if channel == nil {
  18. conn, err = amqp.Dial(rmqAddr)
  19. if err != nil {
  20. return err
  21. }
  22. channel, err = conn.Channel()
  23. if err != nil {
  24. return err
  25. }
  26. hasMQ = true
  27. }
  28. return nil
  29. }
  30. // 是否已经初始化
  31. func HasMQ() bool {
  32. return hasMQ
  33. }
  34. // 测试连接是否正常
  35. func Ping() (err error) {
  36. if !hasMQ || channel == nil {
  37. return errors.New("RabbitMQ is not initialize")
  38. }
  39. err = channel.ExchangeDeclare("ping.ping", "topic", false, true, false, true, nil)
  40. if err != nil {
  41. return err
  42. }
  43. msgContent := "ping.ping"
  44. err = channel.Publish("ping.ping", "ping.ping", false, false, amqp.Publishing{
  45. ContentType: "text/plain",
  46. Body: []byte(msgContent),
  47. })
  48. if err != nil {
  49. return err
  50. }
  51. err = channel.ExchangeDelete("ping.ping", false, false)
  52. return err
  53. }
  54. // 发布消息
  55. func Publish(topic, msg string) (err error) {
  56. if topics == "" || !strings.Contains(topics, topic) {
  57. err = channel.ExchangeDeclare(topic, "topic", true, false, false, true, nil)
  58. if err != nil {
  59. return err
  60. }
  61. topics += " " + topic + " "
  62. }
  63. err = channel.Publish(topic, topic, false, false, amqp.Publishing{
  64. ContentType: "text/plain",
  65. Body: []byte(msg),
  66. })
  67. return nil
  68. }
  69. // 监听接收到的消息
  70. func Receive(topic, node string, reader func (msg *string)) (err error) {
  71. if topics == "" || !strings.Contains(topics, topic) {
  72. err = channel.ExchangeDeclare(topic, "topic", true, false,false, true, nil)
  73. if err != nil {
  74. return err
  75. }
  76. topics += " " + topic + " "
  77. }
  78. if nodes == "" || !strings.Contains(nodes, node) {
  79. _, err = channel.QueueDeclare(node, true, false,false, true, nil)
  80. if err != nil {
  81. return err
  82. }
  83. err = channel.QueueBind(node, topic, topic, true, nil)
  84. if err != nil {
  85. return err
  86. }
  87. nodes += " " + node + " "
  88. }
  89. msgs, err := channel.Consume(node, "", true, false, false, false, nil)
  90. if err != nil {
  91. return err
  92. }
  93. go func() {
  94. //fmt.Println(*msgs)
  95. for d := range msgs {
  96. s := bytesToString(&(d.Body))
  97. reader(s)
  98. }
  99. }()
  100. return nil
  101. }
  102. // 关闭连接
  103. func Close() {
  104. _ = channel.Close()
  105. _ = conn.Close()
  106. hasMQ = false
  107. }
  108. func bytesToString(b *[]byte) *string {
  109. s := bytes.NewBuffer(*b)
  110. r := s.String()
  111. return &r
  112. }

压入RabbitMQ队列

  1. func Mq() {
  2. err := mq.Publish("ceshi", "当前时间:"+time.Now().String())
  3. if err != nil {
  4. fmt.Println("err03 : ", err.Error())
  5. }
  6. mq.Close()
  7. }


关注TinyMeng博客,更多精彩分享,敬请期待!
 

很赞哦! ()