2007年工作总结之-xx省UNICOM dxt互通短信网关系统SMSC

08年了,我也把07年所做的工作总结一下。望各位朋友不吝指教。

一、名词解释

SMPP:短消息点对点协议,SMPP协议是一个国际标准,有SMS论坛制定,官方网址为smsforum.net,截至2006年7月2日,最新版本是5.0。曾经流行的版本是3.3、3.4。SMPP协议向后兼容的。

SMC:短消息服务中心

ECME:扩展短消息实体

二、简要介绍基于SMPP协议研发的互通短信网关,所有unicom发向中国移动和中国电信的短信都通过此系统提交给unicom互通短信网关,系统角色为SMC,unicom充当ECME,由SMSC主动下发短消息(DeliverSM)给unicom短信中心,设计速度为250条/秒(运营商规定)。

系统采用spring进行ioc配置,h2内存数据库,数据库访问采用spring jdbc,基于多线程和Socket编程。

主要程序介绍:

三、主体程序介绍SMSCService:系统启动的主程序,实现启动、暂停监听服务,启动暂停消息下发服务功能。

while (keepRunning) {
			System.out.println();
			System.out.println("- 1 start listener");
			System.out.println("- 2 stop listener");
			System.out.println("- 3 client");
			System.out.println("- 4 start deliverMessage");
			System.out.println("- 5 stop deliverMessage");
			System.out.println("- 0 exit");
			System.out.print("> ");
			optionInt = -1;
			try {
				option = keyboard.readLine();
				optionInt = Integer.parseInt(option);
			} catch (Exception e) {
				optionInt = -1;
			}
			switch (optionInt) {
			case 1:
				startlistener();
				break;
			case 2:
				stoplistener();
				break;
			case 3:
				listclienter();
				break;
			case 4:
				startdeliverMessage();
				break;
			case 5:
				stopdeliverMessage();

SMSCListener:服务监听程序,当ECME连接上SMC后,生成SMSCSession。

public synchronized void start() throws IOException {
		debug.write("going to start SMSCListener on port " + port);
		if (!isReceiving) {
			serverConn = new com.logica.smpp.TCPIPConnection(port);
			serverConn.setReceiveTimeout(getAcceptTimeout());
			serverConn.open();
			keepReceiving = true;
			if (asynchronous) {
				debug.write("starting listener in separate thread.");
				Thread serverThread = new Thread(this);
				serverThread.start();
				debug.write("listener started in separate thread.");
			} else {
				debug.write("going to listen in the context of current thread.");
				run();
			}
		} else {
			debug.write("already receiving, not starting the listener.");
		}
	}
try {
			Connection connection = null;
			serverConn.setReceiveTimeout(getAcceptTimeout());
			connection = serverConn.accept();
			if (connection != null) {
				SMSCSession session = new SMSCSession(connection);
				session.setHashSet(hashSet);
				Thread thread = new Thread(session);
				thread.start();
				sessionFactory.register(session);				
				debug.write("SMSCListener launched a session on the accepted connection.");
			} else {
				debug.write("no connection accepted this time.");
			}
		} catch (InterruptedIOException e) {
			debug.write("InterruptedIOException accepting, timeout? -> " + e);
		} catch (IOException e) {
			event.write(e, "IOException accepting connection");
			keepReceiving = false;
		}

SMSCSession:实现ECME连接上来的消息解码,调度相应的服务(接受消息或发送消息)。

if (calendar.getTimeInMillis() < (enquire_LinkTime + enquire_LinkTimeout * 1000)) {
					//判断是否连接超时
					try {
						debug.write("SMSCSession going to receive a PDU");
						pdu = receiver.receive(getReceiveTimeout());
					} catch (Exception e) {
						debug
								.write("SMSCSession caught exception receiving PDU "
										+ e.getMessage());
					}
					if (pdu != null) {
						if (pdu.isRequest()) {
							debug.write("SMSCSession got request "
									+ pdu.debugString());
							requestWorker.clientRequest((Request) pdu);
						} else if (pdu.isResponse()) {
							debug.write("SMSCSession got response "
									+ pdu.debugString());
							requestWorker.clientResponse((Response) pdu);
						} else {
							debug
									.write("SMSCSession not reqest nor response => not doing anything.");
						}
					}
				}else
				{
					keepReceiving=false;
				}

RequestWorker:处理ECME和SMC之间的消息,缋SUBMIT_SM,SUBMIT_MULTI,ENQUIRE_LINK等消息。

// TODO 根据业务规则进行用户合法性验证
					commandStatus = checkIdentity((BindRequest) request);
					if (commandStatus == 0) { // authenticated
						System.out.println(request.debugString());
						BindResponse bindResponse = (BindResponse) request
								.getResponse();
						serverResponse(bindResponse);
						// success => bound
						bound = true;
					} else { // system id not authenticated
						// get the response
						response = request.getResponse();
						// set it the error command status
						response.setCommandStatus(commandStatus);
						// and send it to the client via serverResponse
						serverResponse(response);
						// bind failed, stopping the session
						session.stop();
					}

ResponseWorker:处理DELIVER_RESP消息,与发送消息的时间进行对比,实现超时重发和保存消息预处理。

if (isOutTime(sendsm.getDealTime())) {// dealTime如果没有填写,此时永远处理不到
									sendsm.setState(SendSM.STATE_SMSC_FAIL);
									reportFailQueue.put(sendsm, 100);
									//分类保存
									if (sendsm.getMessageType() == SendSM.MESSAGETYPE_CORP) {
										reportQueue.put(sendsm, 100);
									} else if (sendsm.getMessageType() == SendSM.MESSAGETYPE_CLIENT) {
										reportQueue_P.put(sendsm, 100);
									}
								} else {
									// 重新压回接收队列
									deliverQueue.put(sendsm, 100);
								}

DeliverMessage:从发送队列里取出待发短信,提交DELIVERSM消息至RequestWorker,由RequestWorker下发消息至ECME.

while (list != null && list.size() > 0) {
						try {
							sendsm = list.remove(list.size() - 1);
							if (sendsm != null) {
								deliverSM = this.convertSendsm(sendsm);
							} else {
								deliverSM = null;
							}
							if (deliverSM != null) {
								if (session.send(deliverSM)) {
									SMSCLog.SendLog(deliverSM);
									sendsm.setSequenceNumber(String
											.valueOf(deliverSM
													.getSequenceNumber()));
									sendsm.setDealTime(DataFormat.yyyy_MM_dd_HH_mm_ss
													.format(new Date()));
									deliverQueue.put(sendsm, 1000);
									
								}
							}
							Thread.sleep(intervalTime);
						} catch (Exception e) {
							log.error(e.getMessage(), e);
						}
					}

SaveService:基于spring jdbc实现的批量取出消息和批量保存的DAO实现。

count = saveQueue.drainTo(list, batchSize);
				
				if (logger.isDebugEnabled()) {
					logger.debug("get element count["+count+"] from saveQeueue.");
				}

				if (count > 0) {
					int total = batchSave.batachSave(list);
					if (logger.isDebugEnabled()) {
						logger.debug("Save ["+total+"] records.");
					}
				} 
				
				if (count < batchSize) {
					if (logger.isDebugEnabled()) {
						logger.debug("sleep "+sleepTime);
					}
					//小于0, 休眠
					Thread.sleep(sleepTime);
				} else {
					Thread.yield(); //让其他程序有机会运行
				}

其它若干辅助类,就不一一介绍了。

因为时间比较紧,系统是在logica smsc的基础修改完成的,本人1周完成。主是加入了业务所需要的东西。因涉及非技术问题,此处只取了一部份代码。

ps:

做这个项目的时候,主要是为适应xx省smsc经过huiway改造之后的话单分检,大家开始都没有注意有可能出现这种情况,时间紧(大约只有1周时间),压力大(自己以前还没有smsc角色的程序,对smpp中的DeliverSM消息不太熟,后来果然在上面载了跟头,测试时总发现下发出去的消息是乱码,反复测试,编码格式也换了9种,后来才发现有个DeliverSM中的setDataCoding没有设置,郁闷),经过自己的努力和同事的帮助。基本上在一周内完成,包括测试用例。

现在这个系统已上线,到目前为止已正常运行2个月,从未出现过大问题。运行平台为redhat enterprise 4。