AMQP-CPP代码阅读

2020-09-01 | Tags: 源码

AMQP-CPP代码阅读

起源

AMQP-CPP项目地址是一个rabbitmq的c++客户端,已知在我们的项目里稳定运行(大概),但是最近在发送大规模量的数据的时候会有一些莫名奇妙的错误,查了一些资料之后初步认为是发送消息数据的socket代码有些问题,就用了几天好好看一下代码,希望可以提个pr。

记录

我们项目里用了2.8版本,所以先clone下来,切换到tag v2.8.0,这边主要看一下发送消息相关的代码。根据调用顺序一条一条往下看。

测试程序

首先我的测试程序大致是这样的:

class MyEventHandler: public AMQP::LibEventHandler {        
    public:     
    MyEventHandler(struct event_base *evbase) : AMQP::LibEventHandler(evbase) {}                                                                                            
    virtual void onError(AMQP::TcpConnection *connection, const char *message) {
        std::cout << "ERROR: " << message << std::endl;
      }

    virtual void onHeartbeat(AMQP::TcpConnection *connection) {
          std::cout << "heartbeat"  << std::endl;
          connection->heartbeat();
      }
  };

int main() {
    EventTask event_task;
    MyEventHandler handler(event_task.evbase());
    AMQP::TcpConnection connection{&handler, AMQP::Address("address")};
    std::thread event_thread{&EventTask::Run, &event_task};

    AMQP::TcpChannel channel{&connection};

    while(true) {
        channel.publish("", "test_reply", message);
        usleep(10000);
    }
    event_task.Stop();
    event_thread.join();
}

这里EventTask是我封装的一个的libevent运行线程类,因为AMQP-CPP支持使用libenvet进行数据传输,并且使用React模式进行使用,因此我这边为了在主线程中按需发送消息,使用了std::thread event_thread{&EventTask::Run, &event_task};将libevent的消息loop放到了单独的线程中去。

主线程

这边我们先看主线程中channel.publish的实现。 头文件在include/channel.h中,实现在src/channelimpl.cpp中,不得不说注释写的不少

bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope)
{
    // we are going to send out multiple frames, each one will trigger a call to the handler,
    // which in turn could destruct the channel object, we need to monitor that
    Monitor monitor(this);

    // @todo do not copy the entire buffer to individual frames

    // send the publish frame
    if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false;

    // channel still valid?
    if (!monitor.valid()) return false;

    // send header
    if (!send(BasicHeaderFrame(_id, envelope))) return false;

    // channel and connection still valid?
    if (!monitor.valid() || !_connection) return false;

    // the max payload size is the max frame size minus the bytes for headers and trailer
    uint32_t maxpayload = _connection->maxPayload();
    uint64_t bytessent = 0;

    // the buffer
    const char *data = envelope.body();
    uint64_t bytesleft = envelope.bodySize();

    // split up the body in multiple frames depending on the max frame size
    while (bytesleft > 0)
    {
        // size of this chunk
        uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);

        // send out a body frame
        if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false;

        // channel still valid?
        if (!monitor.valid()) return false;

        // update counters
        bytessent += chunksize;
        bytesleft -= chunksize;
    }

    // done
    return true;
}

这边monitor有关的都是做连接状态检测的,而且里面逻辑也比较绕,就不具体展开了,这个函数里面关键的是三个send,分别是send(BasicPublishFrame(_id, exchange, routingKey))send(BasicHeaderFrame(_id, envelope))send(BodyFrame(_id, data + bytessent, chunksize)),他们形式都是一样的,通过多态不同的Frame建立不同的数据帧,然后通过send函数进行发送。

这边是三个Frame的关系图

classDiagram class BasicPublishFrame { string exchange string routingKey BasicPublishFrame(int channel, string exchange, string routingKey) } class BasicHeaderFrame { BasicHeaderFrame(int channel, Metadata metadata) int bodySize Metadata metadata } class BodyFrame { BodyFrame(int channel, int size, char* payload) char* payload type() * override } class ExtFrame { ExtFrame(int channel, int size) fill(OutBuffer& buffer) * override char* payload } class BasicFrame { BasicFrame(int channel, int size) uint16_t classID() } BasicFrame <|-- BasicPublishFrame ExtFrame <|-- BodyFrame BasicFrame <|-- BasicHeaderFrame class MethodFrame { MethodFrame(int channel, int size) fill(OutBuffer& buffer) * override type() * override } MethodFrame <|-- BasicFrame class Frame { Frame() fill(OutBuffer& buffer) * } Frame <|-- ExtFrame ExtFrame <|-- MethodFrame

这里的作用是将不同的信息填充到Frame中,以方便后续拼二进制数据结构,其中BasicPublishFrame记录了exchange, routingKey的信息,BasicHeaderFrame记录了消息的props信息,BodyFrame记录了实际的消息内容。

然后在看ChannelImpl::send的代码,其中的关键是connection->send(frame)。跟踪进去看看是ConnectionImpl::send,会进行一系列的状态判断,

然后通过的话会执行PassthroughBuffer buffer(parent, handler, frame)。这里的parent类型是Connection,handler类型是ConnectionHandler,frame是我们刚刚传进来的frame。

然后看看那PassthroughBuffer是个啥,以下是PassthroughBuffer的UML图

classDiagram class PassthroughBuffer { Connection* connection ConnectionHandler* handler append(void* data, size_t size)* override PassthroughBuffer(connection, handler, frame) ~PassthroughBuffer() flash() } class OutBuffer { append(void* data, size_t size)* add(...) } OutBuffer <|-- PassthroughBuffer

首先执行PassthroughBuffer的初始化函数,除了connection和handler的赋值之外,执行了一句frame.fill(*this); 这里的fill根据不同的frame实例执行相应的函数实现,函数的形式为virtual void Frame::fill(OutBuffer& buffer),里面实现基本都是调用buffer.add(...),作用是将之前构建的frame的相应内容使用buffer的add函数处理。这里的OutBuffer::add函数调用了OutBuffer::append,然后多态成传进来的PassthroughBuffer::append

PassthroughBuffer::append代码如下

virtual void append(const void *data, size_t size) override
{
    // flush existing buffers if data would not fit
    if (_size > 0 && _size + size > 4096) flush();

    // if data would not fit anyway, we send it immediately
    if (size > 4096) return _handler->onData(_connection, (const char *)data, size);

    // copy data into the buffer
    memcpy(_buffer + _size, data, size);

    // update the size
    _size += size;
}

大致的意思是把frame.fill进来的数据攒一下,小于4096B的话放在buffer里暂存,然后一次性调用handler->onData,提升传输效率。(这里的逻辑写的有点绕,第一次看的时候还以为有问题,会把数据搞乱,后面debug了一下才发现是没问题),最后如果还有多的数据没有调用handler->onData则在PassthroughBuffer的析构函数里一次性处理。

这边评价一下这个代码,个人感觉比较冗余,一个很简单的buffer积累后整体onData处理的逻辑,使用了一整个类来做这个事情,还使用了一个继承类。直接导致了frame,buffer这两个实例的函数来回调用,对看代码的人真是一个考验。

接下来就可以看handler->onData的代码了,找到ConnectionHandler::onData发现这个是一个基类的虚函数,这里我们就要看handler的实际类型,看看ConnectionIpml中handler的初始化函数,

形式为

ConnectionImpl::ConnectionImpl(Connection *parent, ConnectionHandler *handler, const Login &login, const std::string &vhost) :_parent(parent),_handler(handler), _login(login),_vhost(vhost)

这个函数是由Connection的初始化函数调用的。

形式为

Connection::Connection(ConnectionHandler *handler, const Login &login, const std::string &vhost) : _implementation(this, handler, login, vhost) {}

这里 this(Connection)传给了parent(Connection), handler(ConnectionHandler)传给了handler(ConnectionHandler),还没什么问题,在往前看,调用这个初始化函数的代码

TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
_state(new TcpResolver(this, address.hostname(), address.port(), handler)),
_connection(this, address.login(), address.vhost()) {}

这里就比较迷惑了,先不看TcpResolver的部分,这个_connection(this, address.login(), address.vhost())中this是TcpConnection,却传给了ConnectionHandler *handler,然后在看了一下TcpConnection的定义

class TcpConnection :
private ConnectionHandler,
private Watchable

TcpConnection是private继承ConnectionHandler的,也不知道这个脑洞是怎么样的。

而且更进一步的,private继承是不能做隐式类型转换的,这里是怎么把this(TcpConnection)传给handler(ConnectionHandler)的呢?这边整理了一下逻辑,大概简化一下是这样的:

Class C {
    int c = 3;
    C(A* obj) {
        c = obj->a;
    }
}

Class A {
    int a = 1;
}

Class B : private A {
    int b = 2;
    C c_;
    B(): c_(this) {};
}

int main() {
    B b;
    cout << b.c_.c;
    return 0
}

这边编译和执行都是没问题的,打印结果是1。这边在B的初始化函数中初始化C的时候就把B隐式转换成了A。对于这个操作,我的解释为,当B执行初始化函数的时候,因为继承A,所以先隐式执行了A的初始化函数,然后执行c_(this),这个时候B()还没执行完,因此B的this只有A的部分,这个this也不被认为是一个完整的B,因此可以传给C(A* obj)并正常执行......。

这个怎么说呢,感觉太hack了,我自己肯定是不会这么写的。对于private继承,Effective C++里写了,可以转化成多写一个成员类并继承原有Base类的形式结构上会清晰许多。这边这么写也应该是有他自己的考量,但是这个实现简直太绕了,给看代码的人带来很大的麻烦,我甚至一度以为这块并不是真正执行的代码,还有另一个继承类的代码在执行......。

搞完了这一大堆的事情之后,我终于知道处理数据的handler->onData语句中实际调用 的是TcpConnection::onData函数,翻到函数实现后发现这个只执行了一句_state->send(buffer, size),这边这个state就是

TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
_state(new TcpResolver(this, address.hostname(), address.port(), handler)),
_connection(this, address.login(), address.vhost()) {}

_state(new TcpResolver(this, address.hostname(), address.port(), handler))的这部分,又是一番折腾之后发现,这边这个state也是一个虚基类(TcpState),这边初始化的时候先把state赋值成派生类TcpResolver,这个派生类负责与mq进行tcp连接的操作,当连接完成后state会被替换成另一个派生类TcpConnected进行实际的消息数据发送操作。

找到TcpConnected::send,实现如下

virtual void send(const char *buffer, size_t size) override
{
    // is there already a buffer of data that can not be sent?
    if (_out) return _out.add(buffer, size);

    // there is no buffer, send the data right away
    auto result = ::send(_socket, buffer, size, AMQP_CPP_MSG_NOSIGNAL);

    // number of bytes sent
    size_t bytes = result < 0 ? 0 : result;

    // ok if all data was sent
    if (bytes >= size) return;

    // add the data to the buffer
    _out.add(buffer + bytes, size - bytes);

    // start monitoring the socket to find out when it is writable
    _handler->monitor(_connection, _socket, readable | writable);
}

这里是一个典型的使用event发送数据的过程,如果发送buffer(_out)里没有数据,就直接进行一次socket传输,如果有剩下的数据,就放到out里,然后添加一个write的event,触发libevent的写回调函数。如果out里还有数据没有发送完,就把新的数据直接加到out里等待libevent处理。

这里的handler->monitor就是这个框架进行二次封装之后触发write的event的方法。

到这里我们在主线程中调用channel.publish("", "test_reply", message);发送消息的过程就结束了,对于一次性没有发送完的数据,会触发libevent分发线程的回调函数,接下来看一下相关代码。

libevent分发线程

前面有许多和建立event有关的代码,这里就忽略了,直接在include/libevent.h中找到watcher::callback()

static void callback(evutil_socket_t fd, short what, void *connection_arg)
{
    // retrieve the connection
    TcpConnection *connection = static_cast<TcpConnection*>(connection_arg);

    // setup amqp flags
    int amqp_flags = 0;
    if (what & EV_READ)
        amqp_flags |= AMQP::readable;
    if (what & EV_WRITE)
        amqp_flags |= AMQP::writable;

    // tell the connection that its filedescriptor is active
    connection->process(fd, amqp_flags);
}

这边可以看到判断了一下传进来的what就是read和write的flag,然后就调用了connection->process(fd, amqp_flags);

这边的connection就是单纯的TcpConnection,找一下代码,在TcpConnection::process中调用了state->process 这里的state就是上文中会派生为TcpResolverTcpConnected的东西,这边的话直接看TcpConnected::process的相关代码

// can we write more data to the socket?
if (flags & writable)
{
    // send out the buffered data
    auto result = _out.sendto(_socket);

    // are we in an error state?
    if (result <  0 && reportError()) return nextState(monitor);

    // if buffer is empty by now, we no longer have to check for 
    // writability, but only for readability
    if (!_out) _handler->monitor(_connection, _socket, readable);
}

这边就是经过tcp进行实际发送数据的代码了,经过检查也没什么问题。

其实到这里这边发送大规模量的数据的时候会有一些莫名奇妙的错误的问题以及找到了,因为我使用的是主线程发送,libevent单独线程进行回调,涉及到两个线程,但是源码中完全没有多线程保护相关的代码,因此当消息数据规模变大TcpConnected::send中第一次send不能完全把数据发送而要放入out中缓存的时候,两个线程会竞争out及相关的变量资源,导致了各种莫名其妙的错误...。

解决方案

对于这个原因感觉还是挺失望的,本以为可以找个bug提个pr,没想到是自己的用法不对...,不过document里也没说多线程是否安全,本来想当然的以为这样的用法是没错的,可见使用第三方代码之前有条件的话还是稍微读一下会比较好。

既然如此,就要找点方法把这个使用的问题解决掉。

  1. 一个直观的想法就是把两个线程中所有产生冲突的地方使用mutex锁起来,保证没有冲突,这样调用的代码不用修改就可以正常使用了。不过看了下源码,纠葛实在太复杂,涉及的变量也非常多,改完估计就崩了,这个方案还是pass了。
  2. 既然是两个线程中参与发数据导致的冲突,那就想办法把发消息的过程整合到libevent的循环过程中去。一个比较通用的方法是建立一个管道,使用libevent来额外监听管道的out端fd,当需要发消息的时候,主线程往管道里写消息数据,libevent监听到消息数据,在回调函数中把消息再publish,这样所有AMQP-CPP的发消息过程都在一个线程中,就不会有问题了。
  3. 还有一个方案是既然是两个线程参与导致的问题,那就把AMQP-CPP换掉,使用一个阻塞式的rabbitmq客户端,用一个线程解决这个问题。那就使用官方推荐的rabbitmq-c,一个c实现的rabbitmq客户端。简单看了一下代码,非常的清爽,发送的流程符合人类的顺序认知,也没有AMQP-CPP那么多class的弯弯绕,测了一下性能也很好。

最后取舍了一下,因为AMQP-CPP实现的实在太绕了,而且里面有两次莫名的内存拷贝,如果用管道的话不可避免的还要增加一次内存拷贝的过程。因此对于发送消息的代码,决定修改为rabbitmq-c来实现。之后看时间,把接收消息部分也用rabbitmq-c代替。

由于我们自己对消息队列的调用又包了一层,因此修改成本不高,在我们自己的实现里把底层的AMQP-CPP调用修改为rabbitmq-c就完成了,实际测试一下,运行效率很好,问题解决!

总结

当初选型的时候还比较年轻,对OO还是比较有好感,加上AMQP-CPP的实现第一观感的确不错,可以写出类似这样

channel.commitTransaction()
    .onSuccess([]() {
        // all messages were successfully published
    })
    .onError([](const char *message) {
        // none of the messages were published
        // now we have to do it all over again
    });

的连环注册回调的代码。简单看了下代码的风格也非常不错。

经过实际使用之后,还是发现不少小问题,花了不少精力来解决。然后现在又仔细看了一下代码,发现内部的相互调用是如此的复杂,还多了两次无谓的内存拷贝。虽然代码写的如此复杂肯定是作者有他自己对于兼容性,泛用性和扩展性的考量,也无形中增大了代码阅读的难度,运行效率上也有不小的影响。因此最后还是打算逐步使用rabbitmq-c把这部分的代码代替掉。

这个代码里有关于回调注册的代码风格的确不错,也学习了这样的写法。但是关于管理connection和handler的部分,还是太复杂了,还学到了趁初始化还没完成就把对象传出去的写法......,个人感觉不是一个很好的例子。之后有空的话可以尝试整理一下相关class的关系,给出一个更清晰的结构。