Thrift server 端 processor 笔记
摘要
在 Thrift 服务端的 processor,server,protocol,transport 四层中,不仅 server 的选择, processor/processorFactory 也会影响到服务端的并发。TMultiplexedProcessor 可以让一个服务端同时提供多种服务,但是对于同一种服务,TMultiplexedProcessor 并不能提供并发时的数据安全保证。
本文用到的 Thrift 服务定义:
service Calculate { i32 add(1: i32 i1, 2: i32 i2) }
processor 和 processorFactory
编译上述服务定义文件,得到 Calculate.h, 关注如下几个类:
class CalculateIf { public: virtual int32_t add(const int32_t i1, const int32_t i2) = 0; // ... }; class CalculateIfFactory { public: virtual CalculateIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0; virtual void releaseHandler(CalculateIf* /* handler */) = 0; // ... }; /* // from thrift/include/thrift/TDispatchProcessor.h class TDispatchProcessor: public TProcessor { // ... }; */ class CalculateProcessor: public ::apache::thrift::TDispachProcessor { public: CalculateProcessor(boost::shared_ptr<CalculateIf> iface); // ... }; class CalculateProcessorFactory: public ::apache::thrift::TProcessorFactory { public: CalculateProcessorFactory(const ::boost::shared_ptr<CalculateIfFactory>& handlerFactory); // ... };
CalculateIf 类是实现业务逻辑的最终载体,RPC 服务 add 函数最终由 CalculateIf (的派生类)实现:
class CalculateHandler: public CalculateIf { public: int32_t add(const int32_t i1, const int32_t i2) { return i1 + i2; } // ... };
随后,就可以实例化一个 CalculateProcessor:
boost::shared_ptr<CalculateHandler> handler(new CalculateHandler()); boost::shared_ptr<CalculateProcessor> CalProcessor(handler);
CalProcessor 作为参数之一用于实例化一个 Thrift 服务端。当 Thrift 服务端提供服务时,最终将通过 CalProcessor,调用 handler,提供 add 函数服务。这里 handler 指向一个 new 出来的 CalculateHandler 实例,所以每次提供服务时,Thrift 总会调用到 new 出来的这个 CalculateHandler。如果是在多线程环境下,就会出现多个线程访问这同一个实例,从而产生数据竞争问题(在本例中,可能这个问题不存在)。
所以,如果服务端要提供多线程服务(比如 TThreadedServer, TThreadPoolServer 等),是不能使用 CalculateProcessor 的,而应该使用 CalculateProcessorFactory。CalculateProcessorFactory 的构造函数接受 CalculateIfFactory 指针,在 Thrift 需要一个 handler 提供具体服务时,通过 CalculateIfFactory::getHandler 来新建一个 handler,用完后通过 CalculateIfFactory::releaseHandler 来释放 handler,从而保证对每个服务对象使用不同的 handler,避免数据竞争。
class CalHandlerFactory: public CalculateIfFactory { public: CalculateIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { return (new CalculateHandler); } void releaseHandler(CalculateIf* handler) { delete handler; } // ... };
构造 CalculateProcessorFactory:
boost::shared_ptr<CalHandlerFactory> handlerFactory(new CalHandlerFactory()); boost::shared_ptr<CalculateProcessorFactory> CalProcessorFactory(hanlderFactory);
关于 TMultiplexedProtocol 和 TMultiplexedProcessor
通常情况下,一个 Thrift Server 服务端,只可以提供一种服务。通过 TMultiplexedProcessor(用于服务端)和 TMultiplexedProtocol(用于客户端),Thrift 可以让一个服务端,同时提供多种服务。但是,一个客户端依然只能是使用一个服务,要使用多个服务,就要有多个不同的客户端实例。
class TMultiplexedProcessor: public TProcessor { public: void registerProcessor(const std::string& serviceName, shared_ptr<TProcessor> processor); // ... }; class TMultiplexedProtocol : public TProtocolDecorator { public: TMultiplexedProtocol(shared_ptr<TProtocol> _protocol, const std::string& _serviceName); // ... };
按照上文所述的步骤构造好 processor 后,约定一个服务名称,比如 "Calculate",通过 TMultiplexedProcessor::registerProcessor, 向这个多工处理器注册一个具体的服务处理,有多个服务就注册多个,然后将 TMultiplexedProcessor 用于构造 server。
在客户端方面,则同样是按普通流程构造好 protocol 后,将 protocol 和此前约定的服务名称 "Calculate" 传入 TMultiplexedProtocol,然后再使用 TMultiplexedProtocol 作为构造参数生成 client 实例。这样,一个多服务的的 Thrift 服务-客户模型就建立起来了。
虽然采用 TMultiplexedProcessor 的服务端可以同时提供多种不同的服务,但是笔者认为,对于同一种服务,TMultiplexedProcessor 不是并发安全的。原因在于观察 TMultiplexedProcessor::registerProcessor,发现其接受的参数是 TProcessor 而不是 TProcessorFactory。也就是说在同一个服务被并发访问的时候,实际上多个客户端访问的是服务端上的同一个 handler 实例,因此 TMultiplexedProcessor 并不能避免同一个服务被访问时的并发问题。