一、RabbitMQ的主要組件
1. RabbitMQ Server:消息代理服務(wù)器,負(fù)責(zé)接收、存儲(chǔ)和轉(zhuǎn)發(fā)消息。
2. RabbitMQ Client:客戶端庫(kù),用于與RabbitMQ服務(wù)器進(jìn)行通信。
3. Exchange:消息交換機(jī),負(fù)責(zé)將生產(chǎn)者發(fā)送的消息路由到相應(yīng)的隊(duì)列。
4. Queue:消息隊(duì)列,用于存儲(chǔ)生產(chǎn)者發(fā)送的消息。
5. Virtual Host:虛擬主機(jī),用于隔離不同的應(yīng)用環(huán)境。
6. Connection:連接,表示客戶端與RabbitMQ服務(wù)器之間的通信通道。
7. Channel:通道,表示客戶端與Exchange之間的通信通道。
8. Binding:綁定,用于將Exchange與Queue關(guān)聯(lián)起來(lái),定義消息路由規(guī)則。
9. Producer:生產(chǎn)者,負(fù)責(zé)向Exchange發(fā)送消息。
10. Consumer:消費(fèi)者,負(fù)責(zé)從Queue中獲取消息并處理。
二、RabbitMQ的安裝與配置
1. 安裝Erlang環(huán)境:RabbitMQ基于Erlang語(yǔ)言開發(fā),因此需要先安裝Erlang環(huán)境??梢栽L問(wèn)Erlang官網(wǎng)(https://www.erlang.org/downloads)下載并安裝。
2. 安裝RabbitMQ:以Ubuntu為例,可以通過(guò)以下命令安裝RabbitMQ:
sudo apt-get update sudo apt-get install rabbitmq-server
3. 啟動(dòng)RabbitMQ服務(wù):
sudo systemctl start rabbitmq-server
4. 查看RabbitMQ服務(wù)狀態(tài):
sudo systemctl status rabbitmq-server
三、RabbitMQ的基本使用方法
1. 創(chuàng)建用戶和權(quán)限:在RabbitMQ服務(wù)器上執(zhí)行以下命令,創(chuàng)建一個(gè)名為"guest"的用戶,并賦予讀寫權(quán)限:
rabbitmqctl add_user guest guest@localhost rabbitmqctl set_permissions -p / guest ".*" ".*" ".*"
2. 啟動(dòng)管理插件:?jiǎn)⒂霉芾聿寮?,以便通過(guò)Web界面管理RabbitMQ服務(wù)器:
rabbitmq-plugins enable rabbitmq_management
3. 啟動(dòng)管理控制臺(tái):默認(rèn)情況下,管理控制臺(tái)監(jiān)聽在5672端口??梢允褂脼g覽器訪問(wèn)"http://localhost:15672",使用剛才創(chuàng)建的"guest"用戶登錄。
4. 創(chuàng)建交換機(jī)和隊(duì)列:在管理控制臺(tái)中,點(diǎn)擊"Exchanges",然后點(diǎn)擊"Add exchange",輸入交換機(jī)的名稱和類型(如direct),其他選項(xiàng)保持默認(rèn)即可。接著點(diǎn)擊"Queues",選擇剛剛創(chuàng)建的隊(duì)列(如果沒(méi)有創(chuàng)建隊(duì)列,請(qǐng)先創(chuàng)建一個(gè)),將其綁定到剛剛創(chuàng)建的交換機(jī)上。這樣就完成了交換機(jī)和隊(duì)列的創(chuàng)建。
5. 發(fā)送和接收消息:在管理控制臺(tái)中,點(diǎn)擊"Queues",找到剛剛創(chuàng)建的隊(duì)列,查看消息狀態(tài)(如等待中、已出隊(duì)等)。點(diǎn)擊"Messages",可以查看該隊(duì)列中的消息內(nèi)容。要發(fā)送一條消息,可以在生產(chǎn)者端執(zhí)行以下代碼:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='hello', exchange_type='direct')
channel.queue_declare(queue='hello')
channel.queue_bind(exchange='hello', queue='hello')
message = "Hello World!"
channel.basic_publish(exchange='hello', routing_key='hello', body=message)
print(" [x] Sent %r" % message)
connection.close()要在消費(fèi)者端接收消息,可以使用以下代碼:
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='hello', exchange_type='direct')
result = channel.queue_declare(queue='hello', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='hello', queue=queue_name)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()