RabbitMQ Connection Issues with TcpConnection

While using RabbitMQ through the TcpConnection class, we ran into a connection problem. No error message was reported, yet the connection was never established, and checking with rabbitmqctl showed that the queue and related objects had not been created. The official demo on GitHub contained no relevant example of TcpConnection usage, and online resources were scarce.

Initial Investigation

After exhausting online resources, we decided to dig into the TcpConnection source code. We first used tcpdump to confirm that the connection request was really being sent: the capture showed the client contacting the RabbitMQ server and completing the TCP three-way handshake. Some time later, however, the server responded with an RST packet, and a handshake_timeout error was reported.

Review of RabbitMQ Server Logs

Upon reviewing the RabbitMQ server logs, we found that the issue was not related to authentication but to a connection attempt that never completed. The server's heartbeat interval, which is set to 10 seconds, matched the time that passed before the client received the RST packet.
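
For context, a handshake_timeout generally indicates that the TCP connection was accepted but the AMQP handshake did not complete in time. The first thing an AMQP 0-9-1 client sends after connecting is an 8-byte protocol header, so one way to narrow the problem down is to check whether those bytes ever appear in the tcpdump capture. The sketch below is only illustrative: kAmqpHeader and startsWithAmqpHeader are hypothetical names, not part of AMQP-CPP or RabbitMQ.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

// The 8-byte AMQP 0-9-1 protocol header: "AMQP", then 0, 0, 9, 1
constexpr std::array<std::uint8_t, 8> kAmqpHeader = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};

// Hypothetical helper: returns true if a captured payload begins with the header
bool startsWithAmqpHeader(const std::uint8_t *data, std::size_t size)
{
    return size >= kAmqpHeader.size() &&
           std::memcmp(data, kAmqpHeader.data(), kAmqpHeader.size()) == 0;
}
```

If the capture shows the three-way handshake but no payload starting with these bytes, the client most likely never wrote anything to the socket after connecting.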

Code Analysis

We then analyzed the TcpConnection code and found that the problem lay in the monitor function being called twice. Tracing through the source with log output showed where these monitor calls came from.

TcpConnection Class

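The two calls come from TcpConnected, the state class that TcpConnection uses once the socket is connected (note the TcpState base in the code below). Both its constructor and its destructor call the handler's monitor function: the constructor asks the handler to start watching the socket, and the destructor passes 0 to stop watching it. This is done to improve the efficiency of thread operations, allowing the actual connection to be performed in multiple threads.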

// Constructor
TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler)
    : TcpState(connection, handler),
      _socket(socket),
      _out(std::move(buffer)),
      _in(4096)
{
    // If there is already an output buffer, we have to send it out first
    if (_out) _out.sendto(_socket);

    // Tell the handler to monitor the socket, if there is an out
    _handler->monitor(_connection, _socket, _out.readable() | _out.writable());
}

// Destructor
virtual ~TcpConnected() noexcept
{
    // We no longer have to monitor the socket
    _handler->monitor(_connection, _socket, 0);

    // Close the socket
    close(_socket);
}
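
To make the role of monitor more concrete, here is a minimal sketch of an event loop that honours monitor-style registrations using poll(). It is not AMQP-CPP code: PollLoop, kReadable, and kWritable are hypothetical stand-ins, and a real handler would presumably forward readiness back to the connection rather than to a plain callback. The point is only that a registration with non-zero flags starts watching the descriptor, and a registration with 0 (as in the destructor above) stops it.

```cpp
#include <poll.h>
#include <unistd.h>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Stand-in flag values for this sketch (assumed; the library defines its own)
constexpr int kReadable = 1;
constexpr int kWritable = 2;

// Hypothetical minimal event loop driven by monitor()-style registrations
class PollLoop
{
public:
    // Invoked with (fd, ready flags) when a watched descriptor is ready
    using ReadyCallback = std::function<void(int, int)>;

    explicit PollLoop(ReadyCallback callback) : _callback(std::move(callback)) {}

    // Mirrors the handler's monitor(): flags == 0 means "stop watching this fd"
    void monitor(int fd, int flags)
    {
        if (flags == 0) _watched.erase(fd);
        else _watched[fd] = flags;
    }

    // Poll all watched descriptors once; returns how many were reported ready
    int runOnce(int timeoutMs)
    {
        std::vector<pollfd> fds;
        for (const auto &entry : _watched)
        {
            short events = 0;
            if (entry.second & kReadable) events |= POLLIN;
            if (entry.second & kWritable) events |= POLLOUT;
            fds.push_back(pollfd{entry.first, events, 0});
        }
        if (fds.empty()) return 0;
        if (poll(fds.data(), fds.size(), timeoutMs) <= 0) return 0;

        int ready = 0;
        for (const auto &item : fds)
        {
            int flags = 0;
            if (item.revents & POLLIN) flags |= kReadable;
            if (item.revents & POLLOUT) flags |= kWritable;
            if (flags == 0) continue;
            ++ready;
            _callback(item.fd, flags);
        }
        return ready;
    }

private:
    ReadyCallback _callback;
    std::map<int, int> _watched; // fd -> requested flags
};
```

If nothing ever polls the descriptors passed to monitor, no data is read from or written to the socket, which would be consistent with a connection that opens at the TCP level but never finishes the AMQP handshake.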

Modified Code

After several attempts at modifying the code, we finally solved the problem by working through pointers: the handler, the connection, and the channel are each allocated with new and accessed via pointers. The following snippet shows the code with which TcpConnection successfully connected to the RabbitMQ server:

int Broker::init(std::string host, int port, std::string username, std::string userpasswd, int svrid)
{
    // Create an instance of your own tcp handler
    _handle = new DSBrokerMessageHandle();

    // Address of the server
    AMQP::Address address(host, port, AMQP::Login(username, userpasswd), "/");

    // Create an AMQP connection object
    _connection = new AMQP::TcpConnection(_handle, address);

    // And create a channel
    _channel = new AMQP::TcpChannel(_connection);

    // Define a callback function for receiving messages
    auto receiveMessageCallback = [=](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
    {
        // _channel->ack(deliveryTag);
    };

    // Define a callback function for queue binding
    AMQP::QueueCallback callback = [=](const std::string &name, int msgcount, int consumercount)
    {
        _channel->bindQueue("service", name, name);
        _channel->bindQueue("service", name, "monitor");
        _channel->bindQueue("service", name, "heartbeat");
        _channel->consume(name, AMQP::noack).onReceived(receiveMessageCallback);
    };

    // Define a callback function for queue declaration
    AMQP::SuccessCallback success = [svrid, this, callback]()
    {
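        // Build the queue name from the server id; std::to_string (from <string>)
        // avoids overflowing a fixed 4-byte buffer when svrid exceeds 999
        std::string quename = std::to_string(svrid);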
        _channel->declareQueue(quename, AMQP::durable).onSuccess(callback);
    };

    // Use the channel object to call the AMQP method you like
    _channel->declareExchange("service", AMQP::fanout).onSuccess(success);

    return 0;
}

This article was originally published on Tencent Cloud Media Sharing Plan and is now available for sharing and discussion.