上下文
记的学英语的时候,总是不记的某个词是什么意思,然后就看不下去了,只能问周围的同学或者老师或者去查词典,他们的建议是通过上下文去推测这个词大概的意思,反正我那会上学时没理解,所以英文一直比较差。
现在英语水平也没提高多少,尽管有点领会。
后来慢慢理解了一些,因为有些词有很多种意思,放在某个场景下可能是一个意思,放在另外一个场景下又是其它的意思,这里不举例子了,上文有一定的相似度。
RPC客户端上下文
客户端由于是一个独立的环境,所以可以认为它处于一个属于自己的上下文中,与其它的端隔离。在上下文中可以指定一些公共的参数来提供给接口使用,比如:
RPC版本号
客户端请求ID 各类自定义的公共参数
RPC服务端上下文
服务端同理,也处于一个属于自己的上下文中,与其它端隔离。
RPC上下文的作用
基本线程级别的访问,让客户端或者服务端能够像访问本地变量一样访问RPC框架级别的变量。比如我们想将客户端的一个请求ID传递给服务端,这个请求ID作用于所有接口,比如RPC的调用链追踪,有两种方式:
接口中增加请求ID参数
这个方案显然是不能接受的,因为需要改的接口过多。
接口不改的情况下,在RPC框架中提供一个上下文,其中包含请求ID
这个方案显然成本最小,比如这种样调用:RpcContext.getRequestId();
- 上图中的Context是指RPC框架级变量的一个本地副本,为了简化调用复杂度。
- 上图中的Invocation是RPC框架级的变量,它与上面提到的Context相互配合,做到调用端与框架本身的解耦合
实现
定义上下文对象
在RpcContext对象中增加一个map类型的参数对象,可以存放任意扩展的参数。
public class RpcContext { private Map contextParameters; public Map getContextParameters() { return contextParameters; } public void setContextParameters(Map contextParameters) { this.contextParameters = contextParameters; } private static final ThreadLocal rpcContextThreadLocal=new ThreadLocal (){ @Override protected RpcContext initialValue() { RpcContext context= new RpcContext(); context.setContextParameters(Maps.newHashMap()); return context; } }; public static RpcContext getContext() { return rpcContextThreadLocal.get(); } public static void removeContext() { rpcContextThreadLocal.remove(); } private RpcContext() { }}
RPC请求对象中增加上下文参数
RpcRequest增加如下字段,用于服务端调用。
private Map contextParameters;
RpcInvocation接口中增加上下文参数
在后续新增加的过滤器使用。
private Map contextParameters;
客户端代理
RpcProxy在组装RpcRequest对象时,从RpcContext中获取最新的参数传递给RpcReuest,从而传递给服务端。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); //... request.setContextParameters(RpcContext.getContext().getContextParameters()); //... }
客户端上下文过滤器
主要作用就是从本线线程变量中获取参数传递给RpcInvocation。
注解上的order属性文章后面详细介绍
@ActiveFilter(group = { ConstantConfig.CONSUMER},order = -1000)public class ClientContextFilter implements RpcFilter { private final static Logger logger = LoggerFactory.getLogger(ClientContextFilter.class); @Override public Object invoke(RpcInvoker invoker, RpcInvocation invocation) { Map contextParameters=invocation.getContextParameters(); if(null==contextParameters){ contextParameters= Maps.newHashMap(); } Map contextParametersFromRpcContext= RpcContext.getContext().getContextParameters(); if(null!=contextParametersFromRpcContext) { contextParameters.putAll(contextParametersFromRpcContext); } Object rpcResponse=invoker.invoke(invocation); logger.info("ClientContextFilter.invoke end"); return rpcResponse; }}
服务端上下文过滤器
服务端上下文过滤器与客户端的作用相反,是从RpcInvocation中获取参数传递给本地线程变量RpcContext,后面在执行服务端方法时就可以方便的通过RpcContext获取指定变量。
@ActiveFilter(group = { ConstantConfig.PROVIDER},order = -1000)public class ServerContextFilter implements RpcFilter { private final static Logger logger = LoggerFactory.getLogger(ServerContextFilter.class); @Override public Object invoke(RpcInvoker invoker, RpcInvocation invocation) { Map contextParameters=invocation.getContextParameters(); RpcContext.getContext().setContextParameters(contextParameters); Object rpcResponse=invoker.invoke(invocation); logger.info("ServerContextFilter.invoke end"); return rpcResponse; }}
过滤器排序
因为我们的RpcContext是个本地线程变量,而且Rpc服务端是多线程处理业务,所以需要在请求结束后及时的清理掉相关本地线程变量信息。这就需要清理上下文的过滤动作在最后执行,否则有会出现服务端方法还没有执行就被清空了参数。创建一个工具类,专门用来处理获取客户端以及服务端过滤器。
过滤器注解增加排序属性
增加order字段,升级排列。
@Target({ ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface ActiveFilter { String[] group() default {}; String[] value() default {}; int order() default 999999999;}
过滤器工具类
创建ActiveFilterUtil,包含下面两个函数。
过滤器排序函数
获取特定注解的类,然后根据注解上的排序属性升序排序。
private static List
获取RPC过滤器列表
提供给客户端以及服务端的一个协助方法,便于客户端以及服务构建过滤器职责链。
public static Map getFilterMap(boolean isServer){ List rpcFilterList=getActiveFilter(); Map filterMap=new HashMap<>(); for (Object filterBean : rpcFilterList) { Class [] interfaces = filterBean.getClass().getInterfaces(); ActiveFilter activeFilter=filterBean.getClass().getAnnotation(ActiveFilter.class); String includeFilterGroupName=!isServer?ConstantConfig.CONSUMER:ConstantConfig.PROVIDER; if(null!=activeFilter.group()&& Arrays.stream(activeFilter.group()).filter(p->p.contains(includeFilterGroupName)).count()==0){ continue; } for(Class clazz:interfaces) { if(clazz.isAssignableFrom(RpcFilter.class)){ filterMap.put(filterBean.getClass().getName(),(RpcFilter) filterBean); } } } return filterMap;}
客户端以及服务端初始化
获取过滤器map的逻辑改为调用上面ActiveFilterUtil.getFilterMap方法。
使用示例
客户端设置参数
设置一个RPC版本号的参数。
@RequestMapping("/{productId}") public Product getById(@PathVariable final long productId) throws UnknownHostException { RpcContext.getContext().addContextParameter("rpc-version","1.0"); //... }
服务端获取参数
简单的在服务端打印出RPC版本号。
logger.info("get context parameter from server,rpc-version={}",String.valueOf(RpcContext.getContext().getContextParameter("rpc-version")));
输出日志如下:
本文源码
文中代码是依赖上述项目的,如果有不明白的可下载源码