RabbitMQ Connection Issues with TcpConnection
When using RabbitMQ, we ran into connection problems with the TcpConnection class. No error message was reported, yet the connection was never established, and inspecting queues and other state with rabbitmqctl turned up nothing. The official demo files on GitHub contained no relevant example of TcpConnection usage, and online resources were scarce.
Initial Investigation
With no answers online, we turned to the TcpConnection source code. First we used tcpdump to confirm that a connection request was really being sent: the client did contact the RabbitMQ server, and the TCP three-way handshake took place. The server then answered with an RST packet, and the attempt ended in a handshake_timeout error.
Review of RabbitMQ Server Logs
The RabbitMQ server logs showed that the failure had nothing to do with authentication; the connection attempt itself had failed. The server's heartbeat interval, which is set to 10 seconds, matched the time that passed before the client received the RST packet.
Code Analysis
We then analyzed the TcpConnection code. By printing log output from the source, we found that the monitor function was being called twice, and that this was where the problem lay.
TcpConnection Class
The code below is the TcpConnected state class from the TcpConnection source. Its constructor and its destructor both call the handler's monitor function. As far as we can tell, this design is meant to make threaded operation more efficient, since the actual connection can be set up in a separate thread.
// Constructor
TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler)
    : TcpState(connection, handler),
      _socket(socket),
      _out(std::move(buffer)),
      _in(4096)
{
    // If there is already an output buffer, we have to send it out first
    if (_out) _out.sendto(_socket);

    // Tell the handler to monitor the socket: always for readability,
    // and also for writability if there is still pending output
    _handler->monitor(_connection, _socket, _out ? readable | writable : readable);
}

// Destructor
virtual ~TcpConnected() noexcept
{
    // We no longer have to monitor the socket
    _handler->monitor(_connection, _socket, 0);

    // Close the socket
    close(_socket);
}
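The two monitor calls above form a pair: the first asks the handler to start watching the socket, and the second, with flags set to 0, asks it to stop. The following is a minimal sketch of the bookkeeping a handler might do in response. It does not use the AMQP-CPP headers: FdRegistry, kReadable, kWritable and replayLifecycle are hypothetical names introduced only for illustration, and the flag values are stand-ins for the library's readable and writable constants.

```cpp
#include <cassert>
#include <cstddef>
#include <map>

// Stand-ins for the library's readable / writable flags (illustrative values)
constexpr int kReadable = 1;
constexpr int kWritable = 2;

// Hypothetical helper: remembers which events each socket should be watched for
class FdRegistry
{
public:
    // Same shape as the handler's monitor(connection, fd, flags), minus the connection
    void monitor(int fd, int flags)
    {
        if (flags == 0) _watched.erase(fd);   // destructor path: stop watching
        else _watched[fd] = flags;            // constructor path, or a flag update
    }

    // Events currently requested for fd, or 0 if the fd is not monitored
    int flagsFor(int fd) const
    {
        auto it = _watched.find(fd);
        return it == _watched.end() ? 0 : it->second;
    }

    // Number of sockets currently being watched
    std::size_t size() const { return _watched.size(); }

private:
    std::map<int, int> _watched;
};

// Replays the sequence a connected socket goes through: register, update, unregister
inline bool replayLifecycle()
{
    FdRegistry registry;
    registry.monitor(7, kReadable);               // state object created
    registry.monitor(7, kReadable | kWritable);   // output pending: watch for writes too
    assert(registry.size() == 1);                 // a repeated call updates, it does not duplicate
    bool watched = registry.flagsFor(7) == (kReadable | kWritable);
    registry.monitor(7, 0);                       // state object destroyed
    return watched && registry.size() == 0;
}
```

Keying the registry by file descriptor means that a second monitor call for the same socket simply overwrites the earlier flags, so being called twice is harmless for this kind of handler.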
Modified Code
After several attempts at modifying the code, we finally solved the problem by working with the objects through pointers. The following code snippet connects to the RabbitMQ server successfully using TcpConnection:
int Broker::init(std::string host, int port, std::string username, std::string userpasswd, int svrid)
{
    // Create an instance of your own tcp handler
    _handle = new DSBrokerMessageHandle();

    // Address of the server
    AMQP::Address address(host, port, AMQP::Login(username, userpasswd), "/");

    // Create an AMQP connection object
    _connection = new AMQP::TcpConnection(_handle, address);

    // And create a channel (_connection is already a pointer, so pass it as-is)
    _channel = new AMQP::TcpChannel(_connection);

    // Callback for received messages; no ack is sent because we consume with AMQP::noack
    auto receiveMessageCallback = [this](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
    {
        // _channel->ack(deliveryTag);
    };

    // Callback invoked once the queue is declared: bind it and start consuming
    AMQP::QueueCallback callback = [this, receiveMessageCallback](const std::string &name, uint32_t msgcount, uint32_t consumercount)
    {
        _channel->bindQueue("service", name, name);
        _channel->bindQueue("service", name, "monitor");
        _channel->bindQueue("service", name, "heartbeat");
        _channel->consume(name, AMQP::noack).onReceived(receiveMessageCallback);
    };

    // Callback invoked once the exchange is declared: declare the queue
    AMQP::SuccessCallback success = [svrid, this, callback]()
    {
        // The queue is named after the server id; std::to_string avoids
        // the fixed 4-byte buffer, which overflows for ids of 1000 or more
        std::string quename = std::to_string(svrid);
        _channel->declareQueue(quename, AMQP::durable).onSuccess(callback);
    };

    // Use the channel object to call the AMQP method you like
    _channel->declareExchange("service", AMQP::fanout).onSuccess(success);
    return 0;
}
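Broker::init only queues the work; nothing is sent or received until something waits on the socket that the handler was asked to monitor and reports activity back to the connection. The sketch below shows one way such a loop could look, using poll from POSIX. It is an assumption about how a handler like DSBrokerMessageHandle might be driven, not the author's implementation: PollLoop, runOnce, demoWithPipe and the flag constants are hypothetical, and a pipe stands in for the broker socket so the example runs without a server. With AMQP-CPP, the callback passed to runOnce would typically forward to the connection's process(fd, flags) method.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <vector>
#include <poll.h>
#include <unistd.h>

// Stand-ins for the library's readable / writable flags (illustrative values)
constexpr int kReadable = 1;
constexpr int kWritable = 2;

// Hypothetical poll-based loop that a handler could use to honour monitor() requests
class PollLoop
{
public:
    using Callback = std::function<void(int fd, int flags)>;

    // Start, update, or (with flags == 0) stop watching a file descriptor
    void monitor(int fd, int flags)
    {
        if (flags == 0) _watched.erase(fd);
        else _watched[fd] = flags;
    }

    // Wait up to timeoutMs and call onReady for every active descriptor.
    // Returns the number of descriptors that were dispatched.
    int runOnce(int timeoutMs, const Callback &onReady)
    {
        // Translate the requested flags into a pollfd array
        std::vector<pollfd> fds;
        for (const auto &entry : _watched)
        {
            short events = 0;
            if (entry.second & kReadable) events |= POLLIN;
            if (entry.second & kWritable) events |= POLLOUT;
            fds.push_back(pollfd{entry.first, events, 0});
        }
        if (fds.empty()) return 0;
        if (poll(fds.data(), fds.size(), timeoutMs) <= 0) return 0;

        // Translate the results back and notify the caller
        int dispatched = 0;
        for (const auto &p : fds)
        {
            int flags = 0;
            if (p.revents & POLLIN) flags |= kReadable;
            if (p.revents & POLLOUT) flags |= kWritable;
            if (flags == 0) continue;
            onReady(p.fd, flags);
            ++dispatched;
        }
        return dispatched;
    }

private:
    std::map<int, int> _watched;
};

// Demonstrates the loop with a pipe in place of the broker socket.
// Returns the flags reported for the read end, or -1 on failure.
inline int demoWithPipe()
{
    int ends[2];
    if (pipe(ends) != 0) return -1;

    PollLoop loop;
    loop.monitor(ends[0], kReadable);            // what the handler's monitor() would do

    int seenFlags = 0;
    int count = -1;
    if (write(ends[1], "x", 1) == 1)             // simulate incoming data
    {
        count = loop.runOnce(1000, [&](int, int flags) { seenFlags = flags; });
    }

    loop.monitor(ends[0], 0);                    // stop watching before closing
    close(ends[0]);
    close(ends[1]);
    return count == 1 ? seenFlags : -1;
}
```

If no loop of this kind is running, the socket is opened but the AMQP handshake never progresses, which would be consistent with a connection that fails silently on the client while the server eventually gives up. This sketch ignores POLLHUP and POLLERR; a production loop would need to handle them.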
This article was originally published on Tencent Cloud Media Sharing Plan and is now available for sharing and discussion.