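CAT usage notes
I won't go through CAT's design ideas and implementation principles here. This post mainly records some problems I ran into while using CAT, such as distributed logview and instrumentation for Cache and DB monitoring. There are not many of them, but they are fairly typical.
(The CAT version covered in this post is 1.3.6.)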
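1. Linking the message tree for distributed logview
I have used two approaches so far: one for RPC calls between dubbo applications, and one for REST service calls over HTTP. First, how the message tree works: when a message is traced across services, three attributes (root message id, parent message id, child message id) link the messages into a tree. The key is how these three ids are obtained and passed along.
There are two points here. The first is how CAT builds the message tree:
We implement Cat's Context interface, then call Cat.logRemoteCallClient(context) to fill the context object with node data (the method creates a message tree object to obtain each node's message id and writes the ids into the context). When the remote server receives this context, it calls Cat.logRemoteCallServer(context), which reads the message ids and assembles the message tree.
The second is how the ids should be passed:
RPC calls between dubbo applications: each call carries an RPC context that holds the call information, parameters, status, and so on. The message ids can be put into RpcContext as attachments, and invoker.invoke(invocation) then carries them to the server. Through dubbo's SPI extension mechanism, implement com.alibaba.dubbo.rpc.Filter to read the contents of RpcContext.
REST-style HTTP requests: the caller puts the message ids into HTTP headers; the provider intercepts the request with a filter and reads the message ids from the headers. With this instrumentation in place, the message tree is linked.
Enough talk, here is the code.
1) Partial implementation for dubbo calls (you first need to be familiar with dubbo's SPI configuration, the CAT monitoring configuration, and so on)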
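To make the id hand-off concrete, here is a minimal model of the linking described above. It is a simplified sketch, not CAT's implementation: `TreeIds`, `TraceLinkSketch`, `newMessageId` and the three key names are hypothetical stand-ins (the real keys are `Cat.Context.ROOT`, `Cat.Context.PARENT` and `Cat.Context.CHILD`), and real CAT message ids have their own format.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for a message tree: only the three ids used for linking. */
class TreeIds {
    String messageId;
    String parentMessageId;
    String rootMessageId;
}

class TraceLinkSketch {
    // Placeholder key names standing in for Cat.Context.ROOT / PARENT / CHILD.
    static final String ROOT = "root-id";
    static final String PARENT = "parent-id";
    static final String CHILD = "child-id";

    private static final AtomicInteger SEQ = new AtomicInteger();

    /** Hypothetical id generator, used only to get distinct ids. */
    static String newMessageId(String domain) {
        return domain + "-" + SEQ.incrementAndGet();
    }

    /** Caller side: conceptually what happens in logRemoteCallClient. */
    static Map<String, String> callClient(TreeIds callerTree, String domain) {
        if (callerTree.messageId == null) {
            callerTree.messageId = newMessageId(domain);
        }
        // The first caller in a chain is its own root.
        String root = callerTree.rootMessageId != null
                ? callerTree.rootMessageId
                : callerTree.messageId;
        Map<String, String> ctx = new HashMap<>();
        ctx.put(ROOT, root);
        ctx.put(PARENT, callerTree.messageId);
        ctx.put(CHILD, newMessageId(domain)); // id reserved for the callee's tree
        return ctx;
    }

    /** Callee side: conceptually what happens in logRemoteCallServer. */
    static TreeIds callServer(Map<String, String> ctx) {
        TreeIds tree = new TreeIds();
        tree.messageId = ctx.get(CHILD);
        tree.parentMessageId = ctx.get(PARENT);
        tree.rootMessageId = ctx.get(ROOT);
        return tree;
    }
}
```

In a chain A -> B -> C, B's tree gets A's id as both parent and root, and C's tree gets B's id as parent while keeping A's id as root. Whatever transport is used (RpcContext attachments or HTTP headers) only has to carry the three strings unchanged.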
    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.dubbo.common.Constants;
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.utils.StringUtils;
    import com.alibaba.dubbo.remoting.RemotingException;
    import com.alibaba.dubbo.remoting.TimeoutException;
    import com.alibaba.dubbo.rpc.Filter;
    import com.alibaba.dubbo.rpc.Invocation;
    import com.alibaba.dubbo.rpc.Invoker;
    import com.alibaba.dubbo.rpc.Result;
    import com.alibaba.dubbo.rpc.RpcContext;
    import com.alibaba.dubbo.rpc.RpcException;
    import com.dianping.cat.Cat;
    import com.dianping.cat.message.Event;
    import com.dianping.cat.message.Message;
    import com.dianping.cat.message.Transaction;
    import com.dianping.cat.message.internal.AbstractMessage;

    public class DubboCatFilter implements Filter {

        private static final ThreadLocal<Cat.Context> CAT_CONTEXT = new ThreadLocal<Cat.Context>();

        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String sideKey = url.getParameter(Constants.SIDE_KEY);
            String loggerName = invoker.getInterface().getSimpleName() + "." + invocation.getMethodName();
            String type = "PigeonCall";
            if (Constants.PROVIDER_SIDE.equals(sideKey)) {
                type = "PigeonService";
            }
            Transaction t = Cat.newTransaction(type, loggerName);
            Result result = null;
            try {
                Cat.Context context = getContext();
                if (Constants.CONSUMER_SIDE.equals(sideKey)) {
                    createConsumerCross(url, t);
                    // Consumer: generate the ids and write them into the context
                    Cat.logRemoteCallClient(context);
                } else {
                    createProviderCross(url, t);
                    // Provider: read the ids received from the consumer
                    Cat.logRemoteCallServer(context);
                }
                setAttachment(context);
                result = invoker.invoke(invocation);
                if (result.hasException()) {
                    // Record an event when the invoked interface returned an exception
                    Throwable throwable = result.getException();
                    Event event = null;
                    if (RpcException.class == throwable.getClass()) {
                        Throwable cause = throwable.getCause();
                        if (cause != null && cause.getClass() == TimeoutException.class) {
                            event = Cat.newEvent("DUBBO_TIMEOUT_ERROR", loggerName);
                        } else {
                            event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName);
                        }
                    } else if (RemotingException.class.isAssignableFrom(throwable.getClass())) {
                        event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName);
                    } else {
                        event = Cat.newEvent("DUBBO_BIZ_ERROR", loggerName);
                    }
                    event.setStatus(result.getException());
                    completeEvent(event);
                    t.addChild(event);
                    t.setStatus(result.getException().getClass().getSimpleName());
                } else {
                    t.setStatus(Message.SUCCESS);
                }
                return result;
            } catch (RuntimeException e) {
                Event event = null;
                if (RpcException.class == e.getClass()) {
                    Throwable cause = e.getCause();
                    if (cause != null && cause.getClass() == TimeoutException.class) {
                        event = Cat.newEvent("DUBBO_TIMEOUT_ERROR", loggerName);
                    } else {
                        event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName);
                    }
                } else {
                    event = Cat.newEvent("DUBBO_BIZ_ERROR", loggerName);
                }
                event.setStatus(e);
                completeEvent(event);
                t.addChild(event);
                t.setStatus(e.getClass().getSimpleName());
                if (result == null) {
                    throw e;
                } else {
                    return result;
                }
            } finally {
                t.complete();
                CAT_CONTEXT.remove();
            }
        }

        // Map-backed implementation of Cat.Context that carries the three message ids
        static class DubboCatContext implements Cat.Context {

            private Map<String, String> properties = new HashMap<String, String>();

            @Override
            public void addProperty(String key, String value) {
                properties.put(key, value);
            }

            @Override
            public String getProperty(String key) {
                return properties.get(key);
            }
        }

        // Copy the three ids into the RPC attachments so they travel with the call
        private void setAttachment(Cat.Context context) {
            RpcContext.getContext().setAttachment(Cat.Context.ROOT, context.getProperty(Cat.Context.ROOT));
            RpcContext.getContext().setAttachment(Cat.Context.CHILD, context.getProperty(Cat.Context.CHILD));
            RpcContext.getContext().setAttachment(Cat.Context.PARENT, context.getProperty(Cat.Context.PARENT));
        }

        private Cat.Context getContext() {
            Cat.Context context = CAT_CONTEXT.get();
            if (context == null) {
                context = initContext();
                CAT_CONTEXT.set(context);
            }
            return context;
        }

        // Build a context from the ids found in the incoming RPC attachments, if any
        private Cat.Context initContext() {
            Cat.Context context = new DubboCatContext();
            Map<String, String> attachments = RpcContext.getContext().getAttachments();
            if (attachments != null && attachments.size() > 0) {
                for (Map.Entry<String, String> entry : attachments.entrySet()) {
                    if (Cat.Context.CHILD.equals(entry.getKey())
                            || Cat.Context.ROOT.equals(entry.getKey())
                            || Cat.Context.PARENT.equals(entry.getKey())) {
                        context.addProperty(entry.getKey(), entry.getValue());
                    }
                }
            }
            return context;
        }

        private void createConsumerCross(URL url, Transaction t) {
            // getProviderAppName(url) is not included in this partial listing
            Event crossAppEvent = Cat.newEvent("PigeonCall.app", getProviderAppName(url));
            Event crossServerEvent = Cat.newEvent("PigeonCall.server", url.getHost());
            Event crossPortEvent = Cat.newEvent("PigeonCall.port", url.getPort() + "");
            crossAppEvent.setStatus(Event.SUCCESS);
            crossServerEvent.setStatus(Event.SUCCESS);
            crossPortEvent.setStatus(Event.SUCCESS);
            completeEvent(crossAppEvent);
            completeEvent(crossPortEvent);
            completeEvent(crossServerEvent);
            t.addChild(crossAppEvent);
            t.addChild(crossPortEvent);
            t.addChild(crossServerEvent);
        }

        private void createProviderCross(URL url, Transaction t) {
            String consumerAppName = RpcContext.getContext().getAttachment(Constants.APPLICATION_KEY);
            if (StringUtils.isEmpty(consumerAppName)) {
                consumerAppName = RpcContext.getContext().getRemoteHost() + ":" + RpcContext.getContext().getRemotePort();
            }
            Event crossAppEvent = Cat.newEvent("PigeonService.app", consumerAppName);
            Event crossServerEvent = Cat.newEvent("PigeonService.client", url.getHost());
            crossAppEvent.setStatus(Event.SUCCESS);
            crossServerEvent.setStatus(Event.SUCCESS);
            completeEvent(crossAppEvent);
            completeEvent(crossServerEvent);
            t.addChild(crossAppEvent);
            t.addChild(crossServerEvent);
        }

        private void completeEvent(Event event) {
            AbstractMessage message = (AbstractMessage) event;
            message.setCompleted(true);
        }
    }
2) Partial implementation for http-restful calls
CatHttpClientProxy.java
    public void requestByGet(String url) {
        Transaction t = Cat.newTransaction("PigeonCall", "method000");
        // Create the default httpClient instance
        CloseableHttpClient httpClient = HttpClients.createDefault();
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(5000).setConnectionRequestTimeout(1000)
                .setSocketTimeout(5000).build();
        try {
            HttpGet httpGet = new HttpGet(url);
            httpGet.setConfig(requestConfig);
            // Linking instrumentation: CatHttpContext is not shown here; it is assumed to be
            // a Map-backed Cat.Context like DubboCatContext in the dubbo example
            Cat.Context context = new CatHttpContext();
            this.createConsumerCross(url, t);
            Cat.logRemoteCallClient(context);
            httpGet.setHeader(Cat.Context.ROOT, context.getProperty(Cat.Context.ROOT));
            httpGet.setHeader(Cat.Context.PARENT, context.getProperty(Cat.Context.PARENT));
            httpGet.setHeader(Cat.Context.CHILD, context.getProperty(Cat.Context.CHILD));
            System.out.println("Executing GET request: " + httpGet.getURI());
            // Send the GET request; the response holds the HTTP headers and the result entity
            CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
            try {
                // Response entity (does not include the headers)
                HttpEntity entity = httpResponse.getEntity();
                if (null != entity) {
                    System.out.println("Response status: " + httpResponse.getStatusLine());
                    System.out.println("-------------------------------------------------");
                    System.out.println("Response body: " + EntityUtils.toString(entity));
                }
            } finally {
                httpResponse.close();
            }
            t.setStatus(Transaction.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            t.setStatus(e.getClass().getSimpleName());
        } finally {
            t.complete();
            try {
                closeHttpClient(httpClient);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void createConsumerCross(String url, Transaction t) {
        // "serverName", "serverIp" and "serverPort" are placeholders for the provider's real values
        Event crossAppEvent = Cat.newEvent("PigeonCall.app", "serverName");
        Event crossServerEvent = Cat.newEvent("PigeonCall.server", "serverIp");
        Event crossPortEvent = Cat.newEvent("PigeonCall.port", "serverPort");
        crossAppEvent.setStatus(Event.SUCCESS);
        crossServerEvent.setStatus(Event.SUCCESS);
        crossPortEvent.setStatus(Event.SUCCESS);
        completeEvent(crossAppEvent);
        completeEvent(crossPortEvent);
        completeEvent(crossServerEvent);
        t.addChild(crossAppEvent);
        t.addChild(crossPortEvent);
        t.addChild(crossServerEvent);
    }

    private void completeEvent(Event event) {
        AbstractMessage message = (AbstractMessage) event;
        message.setCompleted(true);
    }

    private void closeHttpClient(CloseableHttpClient client) throws IOException {
        if (client != null) {
            client.close();
        }
    }
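The client code above covers only the calling side. On the provider side, the filter has to read the same three headers back into a context before calling Cat.logRemoteCallServer(context). The sketch below shows just the extraction step on a plain Map so that it runs without the servlet API; `HeaderContextSketch`, `extract` and `isLinked` are hypothetical names. In a real servlet filter the values would come from `HttpServletRequest.getHeader(name)`, which already matches header names case-insensitively.

```java
import java.util.HashMap;
import java.util.Map;

class HeaderContextSketch {

    /** Copies the wanted headers into a context map, matching names case-insensitively. */
    static Map<String, String> extract(Map<String, String> headers, String... wantedKeys) {
        Map<String, String> ctx = new HashMap<>();
        for (String key : wantedKeys) {
            for (Map.Entry<String, String> header : headers.entrySet()) {
                if (header.getKey().equalsIgnoreCase(key) && header.getValue() != null) {
                    ctx.put(key, header.getValue());
                    break;
                }
            }
        }
        return ctx;
    }

    /** True only if every wanted id arrived; otherwise the request should start its own tree. */
    static boolean isLinked(Map<String, String> ctx, String... wantedKeys) {
        for (String key : wantedKeys) {
            if (!ctx.containsKey(key)) {
                return false;
            }
        }
        return true;
    }
}
```

Checking that all three ids are present before linking is a design choice of this sketch: a request from a caller without instrumentation then simply becomes the root of a new tree instead of a half-linked one.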
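2. Detailed monitoring of redis cache with CAT
Inside the CAT source, caches are recognized by convention: transaction types are matched against the string "Cache.", and memcached monitoring is supported by checking for "Cache.memcached". There is no explicit support for redis, so the source has to be modified to also check for "Cache.redis".
1) Class to modify: cat-home - com.dianping.cat.report.page.statistics.task.utilization.TransactionReportVisitor.java
Add support for redis: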
private static final String REDIS = "Cache.redis";
    public TransactionReportVisitor() {
        m_types.add("URL");
        m_types.add("Service");
        m_types.add("PigeonService");
        m_types.add("Call");
        m_types.add("PigeonCall");
        m_types.add("SQL");
        m_types.add(MEMCACHED);
        m_types.add(REDIS);
    }

    @Override
    public void visitType(TransactionType type) {
        String typeName = type.getId();
        Domain domain = m_report.findOrCreateDomain(m_domain);
        if ("Service".equals(typeName)) {
            typeName = "PigeonService";
        } else if ("Call".equals(typeName)) {
            typeName = "PigeonCall";
        } else if (typeName.startsWith(MEMCACHED)) {
            typeName = MEMCACHED;
        } else if (typeName.startsWith(REDIS)) {
            typeName = REDIS;
        }
        ......
    }
2) Class to modify: cat-core - com.dianping.cat.config.server.ServerConfigManager.java
Add support for redis:
    public boolean isCacheTransaction(String type) {
        return StringUtils.isNotEmpty(type)
                && (type.startsWith("Cache.memcached") || type.startsWith("Cache.redis"));
    }
3) Class to modify: cat-consumer - com.dianping.cat.consumer.storage.StorageAnalyzer.java
Add support for redis:
    private void processCacheTransaction(MessageTree tree, Transaction t) {
        String cachePrefix = "Cache.";
        String ip = "Default";
        String domain = tree.getDomain();
        String cacheType = t.getType().substring(cachePrefix.length());
        String name = t.getName();
        String method = name.substring(name.lastIndexOf(":") + 1);
        List<Message> messages = t.getChildren();
        for (Message message : messages) {
            if (message instanceof Event) {
                String type = message.getType();
                if (type.equals("Cache.memcached.server") || type.equals("Cache.redis.server")) {
                    ip = message.getName();
                    int index = ip.indexOf(":");
                    if (index > -1) {
                        ip = ip.substring(0, index);
                    }
                }
            }
        }
        ......
    }
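The parsing in processCacheTransaction implies a naming convention for the client-side instrumentation: the transaction type starts with "Cache.redis", the method is whatever follows the last ":" in the transaction name, and a child event of type "Cache.redis.server" carries "ip:port" as its name. The following sketch restates those rules as plain string functions so they can be checked in isolation. `CacheNamingSketch` and its method names are hypothetical, and a name such as "user:get" is only an assumed example of a key prefix followed by the method.

```java
class CacheNamingSketch {

    static final String CACHE_PREFIX = "Cache.";

    /** Same check as the patched isCacheTransaction, without the StringUtils dependency. */
    static boolean isCacheTransaction(String type) {
        return type != null && !type.isEmpty()
                && (type.startsWith("Cache.memcached") || type.startsWith("Cache.redis"));
    }

    /** "Cache.redis" -> "redis" */
    static String cacheType(String transactionType) {
        return transactionType.substring(CACHE_PREFIX.length());
    }

    /** Everything after the last ':'; the whole name if there is no ':'. */
    static String method(String transactionName) {
        return transactionName.substring(transactionName.lastIndexOf(':') + 1);
    }

    /** "ip:port" -> "ip"; a name without ':' is returned unchanged. */
    static String serverIp(String serverEventName) {
        int index = serverEventName.indexOf(':');
        return index > -1 ? serverEventName.substring(0, index) : serverEventName;
    }
}
```

Under these rules, a redis GET would be recorded as a transaction of type "Cache.redis" whose name ends in ":get", with a nested "Cache.redis.server" event named after the redis node.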
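3. Detailed monitoring of the DB with CAT
If your ORM framework is mybatis, consider implementing an Interceptor to monitor the DB at a low level. CAT's DB instrumentation also follows a convention, so the code contains hard-coded values. The instrumentation looks like this: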
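    MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
    // Derive "MapperClass.method" from the statement id
    String[] strArr = mappedStatement.getId().split("\\.");
    String methodName = strArr[strArr.length - 2] + "." + strArr[strArr.length - 1];
    // Use the derived name, not the string literal "methodName"
    Transaction t = Cat.newTransaction("SQL", methodName);
    // Get the SQL command type
    SqlCommandType sqlCommandType = mappedStatement.getSqlCommandType();
    Cat.logEvent("SQL.Method", sqlCommandType.name().toLowerCase());
    // serverIp and dbName are not defined in this excerpt and must be supplied by the plugin
    String JDBC_CONNECTION = "jdbc:mysql://%s:3306/%s?useUnicode=true";
    Cat.logEvent("SQL.Database", String.format(JDBC_CONNECTION, serverIp, dbName));
    // Not shown: proceed with the invocation, set the status on t, and call t.complete()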
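The two string conventions used by this instrumentation (the short statement name and the JDBC URL recorded under "SQL.Database") can be isolated as follows. This is a minimal sketch with hypothetical names (`SqlNamingSketch`, `shortName`, `databaseEventName`); unlike the excerpt above, it falls back to the full id when the statement id has fewer than two dot-separated segments, instead of failing with an array index error.

```java
class SqlNamingSketch {

    /** "com.example.mapper.UserMapper.selectById" -> "UserMapper.selectById" */
    static String shortName(String statementId) {
        String[] parts = statementId.split("\\.");
        if (parts.length < 2) {
            // No namespace in the id: keep it as-is rather than index out of bounds.
            return statementId;
        }
        return parts[parts.length - 2] + "." + parts[parts.length - 1];
    }

    /** Builds the value logged under "SQL.Database" from host, port and database name. */
    static String databaseEventName(String host, int port, String dbName) {
        return String.format("jdbc:mysql://%s:%d/%s?useUnicode=true", host, port, dbName);
    }
}
```

Passing the port as a parameter is a choice of this sketch; the excerpt above fixes it at 3306.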
The Spring configuration is as follows:
    <bean id="sessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="configLocation" value="classpath:mybatis.xml"/>
        <!-- plugin configuration -->
        <property name="plugins">
            <array>
                <bean class="com.kubbo.java.common.cat.CatMybatisPlugin"></bean>
            </array>
        </property>
    </bean>
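The above lists only one possible solution for each problem, as a reference for others who are studying CAT. I have only just started with CAT myself, so there are bound to be mistakes; corrections and discussion are welcome.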
http://my.oschina.net/u/129971/blog/688371