Thrift server 端 processor 笔记

摘要

在 Thrift 服务端的 processor,server,protocol,transport 四层中,不仅 server 的选择, processor/processorFactory 也会影响到服务端的并发。TMultiplexedProcessor 可以让一个服务端同时提供多种服务,但是对于同一种服务,TMultiplexedProcessor 并不能提供并发时的数据安全保证。


本文用到的 Thrift 服务定义:

service Calculate
{
    i32 add(1: i32 i1, 2: i32 i2)
}

processor 和 processorFactory

编译上述服务定义文件,得到 Calculate.h, 关注如下几个类:

class CalculateIf
{
public:
    virtual int32_t add(const int32_t i1, const int32_t i2) = 0;
// ...
};

class CalculateIfFactory
{
public:
    virtual CalculateIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0;
    virtual void releaseHandler(CalculateIf* /* handler */) = 0;
// ...
};

/*
// from thrift/include/thrift/TDispatchProcessor.h
class TDispatchProcessor: public TProcessor
{
// ...    
};
*/

class CalculateProcessor: public ::apache::thrift::TDispachProcessor
{
public:
    CalculateProcessor(boost::shared_ptr<CalculateIf> iface);
// ...
};

class CalculateProcessorFactory: public ::apache::thrift::TProcessorFactory
{
public:
    CalculateProcessorFactory(const ::boost::shared_ptr<CalculateIfFactory>& handlerFactory);
// ...
};

CalculateIf 类是实现业务逻辑的最终载体,RPC 服务 add 函数最终由 CalculateIf (的派生类)实现:

class CalculateHandler: public CalculateIf
{
public:
    int32_t add(const int32_t i1, const int32_t i2)
    {
        return i1 + i2;
    }
// ...
};

随后,就可以实例化一个 CalculateProcessor:

boost::shared_ptr<CalculateHandler> handler(new CalculateHandler());
boost::shared_ptr<CalculateProcessor> CalProcessor(handler);

CalProcessor 作为参数之一用于实例化一个 Thrift 服务端。当 Thrift 服务端提供服务时,最终将通过 CalProcessor,调用 handler,提供 add 函数服务。这里 handler 指向一个 new 出来的 CalculateHandler 实例,所以每次提供服务时,Thrift 总会调用到 new 出来的这个 CalculateHandler。如果是在多线程环境下,就会出现多个线程访问这同一个实例,从而产生数据竞争问题(在本例中,可能这个问题不存在)。

所以,如果服务端要提供多线程服务(比如 TThreadedServer, TThreadPoolServer 等),是不能使用 CalculateProcessor 的,而应该使用 CalculateProcessorFactory。CalculateProcessorFactory 的构造函数接受 CalculateIfFactory 指针,在 Thrift 需要一个 handler 提供具体服务时,通过 CalculateIfFactory::getHandler 来新建一个 handler,用完后通过 CalculateIfFactory::releaseHandler 来释放 handler,从而保证对每个服务对象使用不同的 handler,避免数据竞争。

class CalHandlerFactory: public CalculateIfFactory
{
public:
    CalculateIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
    {
        return (new CalculateHandler);
    }
    void releaseHandler(CalculateIf* handler)
    {
        delete handler;
    }
// ...
};

构造 CalculateProcessorFactory:

boost::shared_ptr<CalHandlerFactory> handlerFactory(new CalHandlerFactory());
boost::shared_ptr<CalculateProcessorFactory> CalProcessorFactory(hanlderFactory);

关于 TMultiplexedProtocol 和 TMultiplexedProcessor

通常情况下,一个 Thrift Server 服务端,只可以提供一种服务。通过 TMultiplexedProcessor(用于服务端)和 TMultiplexedProtocol(用于客户端),Thrift 可以让一个服务端,同时提供多种服务。但是,一个客户端依然只能是使用一个服务,要使用多个服务,就要有多个不同的客户端实例。

class TMultiplexedProcessor: public TProcessor
{
public:
    void registerProcessor(const std::string& serviceName, shared_ptr<TProcessor> processor);
// ...
};

class TMultiplexedProtocol : public TProtocolDecorator
{
public:
    TMultiplexedProtocol(shared_ptr<TProtocol> _protocol, const std::string& _serviceName);
// ...
};

按照上文所述的步骤构造好 processor 后,约定一个服务名称,比如 "Calculate",通过 TMultiplexedProcessor::registerProcessor, 向这个多工处理器注册一个具体的服务处理,有多个服务就注册多个,然后将 TMultiplexedProcessor 用于构造 server。

在客户端方面,则同样是按普通流程构造好 protocol 后,将 protocol 和此前约定的服务名称 "Calculate" 传入 TMultiplexedProtocol,然后再使用 TMultiplexedProtocol 作为构造参数生成 client 实例。这样,一个多服务的的 Thrift 服务-客户模型就建立起来了。

虽然采用 TMultiplexedProcessor 的服务端可以同时提供多种不同的服务,但是笔者认为,对于同一种服务,TMultiplexedProcessor 不是并发安全的。原因在于观察 TMultiplexedProcessor::registerProcessor,发现其接受的参数是 TProcessor 而不是 TProcessorFactory。也就是说在同一个服务被并发访问的时候,实际上多个客户端访问的是服务端上的同一个 handler 实例,因此 TMultiplexedProcessor 并不能避免同一个服务被访问时的并发问题。

相关推荐