本文在SpringMVC和MyBatis项目框架的基础上整合Spring Security作为权限管理。并且完全实现一套自定义的权限管理规则。

1.权限管理
在本例中所使用的权限管理的思路如下图所示,在系统中存在着许多帐号,同时存在着许多资源,在一个Web系统中一个典型的资源就是访问页面的URL,控制了这个就能够直接控制用户的访问权。

由于资源非常多,直接针对资源与用户进行设置关系会比较繁琐,因此针对同一类或者同一组的资源打个包,称为一组权限,这样将权限分配给用户的时候,一组权限中的资源也就都分配给用户了。

img

这个只是一个非常简单的权限管理方案,并且只能适用于较小的项目,因为此处给出这个只是为了便于理解自定义的Spring Security认证规则。

2.Spring Security的认证规则
要编写自定义的认证规则,首先需要对Spring Security中的认证规则有一定的了解,下面简单介绍下Spring Security的认证规则。

1)在Spring Security中每个URL都是一个资源,当系统启动的时候,Spring Security会根据配置将所有的URL与访问这个URL所需要的权限的映射数据加载到Spring Security中。

2)当一个请求访问一个资源时,Spring Security会判断这个URL是否需要权限验证,如果不需要,那么直接访问即可。

3)如果这个URL需要进行权限验证,那么Spring Security会检查当前请求来源所属用户是否登录,如果没有登录,则跳转到登录页面,进行登录操作,并加载这个用户的相关信息

4)如果登录,那么判断这个用户所拥有的权限是否包含访问这个URL所需要的权限,如果有则允许访问

5)如果没有权限,那么就给出相应的提示信息

3.自定义认证规则思路
根据上面一小节介绍的Spring Security认证的过程,我们相应的就能够分析出对于这个过程我们如果要修改的话,需要进行哪些方面的改动。

3.1.自定义SecurityMetadataSource
在Spring Security中的 SecurityMetadataSource 处于上面的步骤一中,也就是用于加载URL与权限对应关系的,对于这个我们需要自己进行定义

复制代码;)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.oolong.customsecurity;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.LogManager;
import org.springframework.security.access.ConfigAttribute;
import org.springframework.security.access.SecurityConfig;
import org.springframework.security.web.FilterInvocation;
import org.springframework.security.web.access.intercept.FilterInvocationSecurityMetadataSource;
import org.springframework.stereotype.Component;

/**
* 加载URL与权限资源,并提供根据URL匹配权限的方法
* @author weilu2
* @date 2016年12月17日 上午11:18:52
*
*/
@Component
public class CustomSecurityMetadataSource
implements FilterInvocationSecurityMetadataSource {

private Map<String, List<ConfigAttribute>> resources;

public CustomSecurityMetadataSource() {
loadAuthorityResources();
}

private void loadAuthorityResources() {
// 此处在创建时从数据库中初始化权限数据
// 将权限与资源数据整理成 Map<resource, List<Authority>> 的形式
// 注意:加载URL资源时,需要对资源进行排序,要由精确到粗略进行排序,让精确的URL优先匹配
resources = new HashMap<>();

// 此处先伪造一些数据
List<ConfigAttribute> authorityList = new ArrayList<>();
ConfigAttribute auth = new SecurityConfig("AUTH_WELCOME");
authorityList.add(auth);
resources.put("/welcome", authorityList);
}

@Override
public Collection<ConfigAttribute> getAttributes(Object object) throws IllegalArgumentException {

String url = ((FilterInvocation) object).getRequestUrl();

Set<String> keys = resources.keySet();

for (String k : keys) {
if (url.indexOf(k) >= 0) {
return resources.get(k);
}
}
return null;
}

@Override
public Collection<ConfigAttribute> getAllConfigAttributes() {
// TODO Auto-generated method stub
return null;
}

@Override
public boolean supports(Class<?> clazz) {
return true;
}

}

复制代码;)

在这个类中,实现了FilterInvocationSecurityMetadataSource接口,这个接口中的 getAttributes(Object object)方法能够根据请求的URL,获取这个URL所需要的权限,那么我们就可以在这个类初始化的时候将所有需要的权限加载进来,然后根据我们的规则进行获取,因此这里还需要编写一个加载数据的方法 loadAuthorityResources(),并且在构造函数中调用。

此处加载资源为了简化,只是随意填充了一些数据,实际可以从数据库中获取。

3.2.自定义AccessDecisionManager
编写自定义的决策管理器,决策管理器是Spring Security用来决定对于一个用户的请求是否基于通过的中心控制。

复制代码;)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.oolong.customsecurity;

import java.util.Collection;
import java.util.Iterator;

import org.apache.log4j.LogManager;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.access.ConfigAttribute;
import org.springframework.security.authentication.InsufficientAuthenticationException;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.stereotype.Component;

/**
* 进行决策,根据URL获得访问这个资源所需要的权限,然后在与当前用户所拥有的权限进行对比
* 如果当前用户拥有相关权限,就直接返回,否则抛出 AccessDeniedException异常
* @author weilu2
* @date 2016年12月17日 上午11:30:40
*
*/
@Component
public class CustomAccessDecisionManager implements AccessDecisionManager {

@Override
public void decide(Authentication authentication, Object object, Collection<ConfigAttribute> configAttributes)
throws AccessDeniedException, InsufficientAuthenticationException {

LogManager.getLogger("CustomAccessDecisionManager").info("decide invoke");

if (configAttributes == null) {
return;
}

if (configAttributes.size() <= 0) {
return;
}

Iterator<ConfigAttribute> authorities = configAttributes.iterator();
String needAuthority = null;

while(authorities.hasNext()) {
ConfigAttribute authority = authorities.next();

if (authority == null || (needAuthority = authority.getAttribute()) == null) {
continue;
}

LogManager.getLogger("CustomAccessDecisionManager").info("decide == " + needAuthority);

for (GrantedAuthority ga : authentication.getAuthorities()) {
if (needAuthority.equals(ga.getAuthority().trim())) {
return;
}
}
}
throw new AccessDeniedException("No Authority");
}

@Override
public boolean supports(ConfigAttribute attribute) {
return true;
}

@Override
public boolean supports(Class<?> clazz) {
return true;
}

}

复制代码;)

决策管理器最重要的就是这个 decide()方法,Spring Security会将当前登录用户信息包装到一个 Authentication对象中,并传入这个方法;并且调用 SecurityMetadataSource.getAttributes() 方法获取这个URL相关的权限以参数 Collection 的形式传入这个方法。

然后这个decide方法获取到这两个信息之后就可以进行对比决策了。如果当前用户允许登录,那么直接return即可。如果当前用户不许运行登录,则抛出一个 AccessDeniedException异常。

3.3.自定义 UserDetailsService 和 AuthenticationProvider

前面说过,要进行验证,除了有URL与权限的映射关系,还需要有用户的权限信息。要编写自定义的用户数据加载,就需要实现这两个接口。

3.3.1.UserDetailsService

复制代码;)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.oolong.customsecurity;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.LogManager;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.stereotype.Component;

import com.oolong.model.AccountInfoModel;
import com.oolong.model.AuthorityModel;

@Component
public class CustomUserDetailsService implements UserDetailsService {

@Override
public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {

LogManager.getLogger("CustomUserDetailsService").info("loadUserByUsername invoke");

// 提供到数据库查询该用户的权限信息
// 关于角色和权限的转换关系在此处处理,根据用户与角色的关系、角色与权限的关系,
// 将用户与权限的管理整理出来

// 此处伪造一些数据
// 伪造权限
AuthorityModel authority = new AuthorityModel("AUTH_WELCOME");
List<AuthorityModel> authorities = new ArrayList<>();
authorities.add(authority);

AccountInfoModel account = new AccountInfoModel("oolong", "12345");
account.setAuthorities(authorities);

return account;
}
}

复制代码;)

3.3.2.AuthenticationProvider
AuthenticationProvider用于包装UserDetailsService,并将其提供给 Spring Security使用。这个接口中最重要的是实现 retrieveUser() 方法,这个请参考接口的说明进行实现,此处不再赘述。

复制代码;)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.oolong.customsecurity;

import org.apache.log4j.LogManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.authentication.AuthenticationServiceException;
import org.springframework.security.authentication.BadCredentialsException;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.authentication.dao.AbstractUserDetailsAuthenticationProvider;
import org.springframework.security.core.AuthenticationException;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.stereotype.Component;

/**
* 这两个方法用于添加额外的检查功能,此处不需要添加,因此空着,直接实现这个抽象类即可。
* @author weilu2
* @date 2016年12月17日 下午12:20:27
*
*/
@Component
public class CustomUserDetailsAuthenticationProvider extends AbstractUserDetailsAuthenticationProvider {

@Autowired
private UserDetailsService userDetailsService;

public UserDetailsService getUserDetailService() {
return this.userDetailsService;
}

public void setUserDetailService(UserDetailsService userDetailsService) {
this.userDetailsService = userDetailsService;
}

@Override
protected void additionalAuthenticationChecks(UserDetails userDetails,
UsernamePasswordAuthenticationToken authentication) throws AuthenticationException {

}

@Override
protected UserDetails retrieveUser(String username, UsernamePasswordAuthenticationToken authentication)
throws AuthenticationException {

LogManager.getLogger("CustomUserDetailsAuthenticationProvider").info("retrieveUser invoke");

if (userDetailsService == null) {
throw new AuthenticationServiceException("");
}

UserDetails userDetails = userDetailsService.loadUserByUsername(username);

if (userDetails == null) {
throw new UsernameNotFoundException(username);
}

if (userDetails.getUsername().equals(authentication.getPrincipal().toString())
&& userDetails.getPassword().equals(authentication.getCredentials().toString())) {
return userDetails;
}

throw new BadCredentialsException(username + authentication.getCredentials());
}
}

复制代码;)

3.4.UserDetails和GrantedAuthority
这两个接口非常简单,请参考源码,此处不再赘述

4.配置
上面编写的这些自定义的实现都有了,但是仅仅这样是没有用的,如何配置能够让它们起作用呢?

复制代码;)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.oolong.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.web.access.ExceptionTranslationFilter;
import org.springframework.security.web.access.intercept.FilterSecurityInterceptor;

import com.oolong.customsecurity.CustomAccessDecisionManager;
import com.oolong.customsecurity.CustomSecurityMetadataSource;
import com.oolong.customsecurity.CustomUserDetailsAuthenticationProvider;
import com.oolong.customsecurity.TempHook;

@Configuration
@ComponentScan(basePackageClasses={TempHook.class})
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

@Autowired
private CustomUserDetailsAuthenticationProvider customAuthenticationProvider;

@Autowired
private CustomAccessDecisionManager customAccessDecisionManager;

@Autowired
private CustomSecurityMetadataSource customSecurityMetadataSource;

@Override
protected void configure(AuthenticationManagerBuilder auth) throws Exception {
auth.authenticationProvider(customAuthenticationProvider);
}

@Override
protected void configure(HttpSecurity http) throws Exception {
http.addFilterAfter(customFilterSecurityInterceptor(), ExceptionTranslationFilter.class);
http.formLogin();
}

@Bean
public FilterSecurityInterceptor customFilterSecurityInterceptor() {
FilterSecurityInterceptor fsi = new FilterSecurityInterceptor();
fsi.setAccessDecisionManager(customAccessDecisionManager);
fsi.setSecurityMetadataSource(customSecurityMetadataSource);

return fsi;
}
}

复制代码;)

在Spring MVC中,Spring Security是通过过滤器发挥作用的,因此我们就爱那个决策管理器与数据加载放到一个过滤器中,然后将这个过滤器插入到系统的过滤器链中。

此外,我们向系统中提供了一个用于检索用户的 AuthenticationProvicer。

还有,别忘记了,告诉系统,如果用户没有权限应该怎么办,http.formLogin(),告诉Spring Security要跳转到表单登录页面。

项目中使用到的RabbitMQ的总结:

rabbitMQ 本身基于erlang语言开发,erlang语言具有高并发的特性,其管理界面用起来也十分方便。单机吞吐量为万级,一般多用于中小型企业的消息中间件。

RabbitMQ结构图

这里写图片描述

几个概念说明

Broker: 消息交换机 指定消息按什么规则,路由到哪个队列

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue: 消息队列载体,每个消息都会被投入到一个或多个队列

Binding:绑定,它的作用就是将exchange 和queue进行绑定起来

Routing Key: 路由关键字,exchange根据这个关键字进行消息传递

vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离

producer:消息的投递者

consumer:消息的接收者

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

1.客户端连接消息队列服务器,打开一个channel。

2.客户端声明一个exchange, 并设置相关属性。

3.客户端声明一个queue,并设置相关属性。

4.客户端设置好route key,在exchange和queue之间建立好绑定关系。

5.客户端投递消息到exchange.

RabbitMQ 简介

RabbitMQ是一个消息代理:接受并转发消息,可以看成是一个邮局;

生产者(Producer):用于发送消息的程序

消费者(Consumer):等待接受消息的程序

队列:队列只能存在内存或磁盘里,本质上是一个消息转化中心。

RabbitMQ起源于金融系统,用于在分布式系统中存储转发消息,在易用性,扩展性,高可用性等方面表现不俗。

需要安装erlang语言开发,erlang语言本身就具有高并发特性。

spring集成rabbitMQ

step1: 添加相应的jar包:spring-rabbit amqp-client

step2:使用外部参数文件 application.properties

mq.host=127.0.0.1

mq.username=queue

mq.password=1234

mq.port=8001

step3:连接rabbitMQ服务器

1
2
3
4
5
<!-- 连接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
password="${mq.password}" port="${mq.port}" />

<rabbit:admin connection-factory="connectionFactory"/>

step4:声明一个rabbitMQ Template

1
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />

step5:在application.xml中声明一个交换机

rabbit:topic-exchange 标签的 name 属性就是在 RabbitMQ 服务器配置交换机的 name 值。

rabbit:binding 标签的 queue 属性是 6. 标签的 id 属性。

rabbit:binding 标签的 pattern 属性是在 RabbitMQ 服务器配置交换机与队列绑定时的 Routing key 值(路由)。

1
2
3
4
5
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
  • durable:是否持久化
  • exclusive:仅创建者可以使用的私有队列,断开后自动删除
  • auto-delete:当所有消费端连接断开后,是否自动删除队列
交换机的四种模式:

1.direct(直连交换机):根据routekey 将消息投递给对应队列的; 转发消息到route key 指定的队列上

2.topic(主题交换机):对key进行模式匹配,比如ab 可以传到所有的ab的queue

3.headers:(头交换机):转发到headers中的键值对完全匹配的队列。性能比较差不建议使用。

4.fanout:(扇形交换机):转发消息到所有绑定队列,忽略routingkey

交换价的属性:

1.持久性:durable,如果启用,交换器将会在server重启前都有效。

2.自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。

3.惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。

一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。如果没有队列绑定到交换机上,则发送到该交换机上的消息会丢失。

MQTT是什么?

第一次接触到mqtt协议是在做海底捞物流配送系统,当时使用到了rabbitmq 消息队列,发现开发同事配置中用到了mqtt协议,本来以为是用到了别的什么中间件,后来在看rabbitmq时无意间发现和mqtt有关联,就自己学习了下,内容如下:

什么是mqtt:message queuing telemetry transport portocol 的全称是消息队列遥感传输协议的缩写,是一种基于轻量级代理的发布/订阅模式的消息传输协议,运行在TCP协议栈上,为其提供有序,可靠,双向连接的网络连接保证。IBM开发的即时通讯协议,成为物联网的重要组成成分。该协议支持所有的平台,几乎可以把所有物联网物品和外部连接起来,被用来当作传感器和制动器的通信协议。

MQTT的特点:

mqtt协议是为大量计算能力有限,且工作在低带宽,不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

1.使用发布/订阅模式,提供一对多的消息发布,解除应用程序耦合。

2.对负载内容屏蔽的消息传输。

3.使用TCP/IP提供网络连接。

4.有三种消息发布服务质量:

至多一次

至少一次

只有一次

5.小型传输,开销很小 协议交换最小化,以降低网络流量。

心跳时间: keep alive timer

以秒为单位,定义服务器端从客户端接受消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的心跳机制。

如何将消息正确送达?

MQTT通过“主题”实现将消息从发布者客户端送达至接收者客户端。“主题”是附加在应用消息上的一个标签,发布者客户端将”主题“和消息发送至代理服务器,代理服务器将消息转发每一个订阅了该主题的订阅客户端。

RabbitMQ如何作为MQTT服务器使用?

rabbitmq中有实现mqtt插件,具体参考地址:

http://www.rabbitmq.com/mqtt.html

安装插件,只要启用rabbitmq的MQTT插件即可:rabbitmq-plugins enable rabbitmq-mqtt 启用后重启一下rabbitmq-server

消息队列MQ和RPC远程调用的区别:

什么是RPC?

RPC(Remote Procedure Call)远程过程调用,主要解决远程通信间的问题,不需要了解底层网络的通信机制。如:dubbo

RPC的一般需要经历4个步骤:

1、建立通信

首先要解决通讯的问题:即A机器想要调用B机器,首先得建立起通信连接,主要是通过在客户端和服务器之间建立TCP连接。

2、服务寻址

要解决寻址的问题,A服务器上如何连接到B服务器(如主机或IP地址)以及特定的端口,方法的名称是什么。

3、网络传输

1)序列化

当A服务器上的应用发起一个RPC调用时,调用方法和参数数据都需要先进行序列化。

2)反序列化

当B服务器接收到A服务器的请求之后,又需要对接收到的参数等信息进行反序列化操作。

4、服务调用

B服务器进行本地调用(通过代理Proxy)之后得到了返回值,此时还需要再把返回值发送回A服务器,同样也需要经过序列化操作,然后再经过网络传输将二进制数据发送回A服务器。

通常,一次完整的PRC调用需要经历如上4个步骤。

MQ(消息队列)

消息队列(MQ)是一种能实现生产者到消费者单向通信的通信模型,一般来说是指实现这个模型的中间件。

典型的MQ中间件:

RabbitMQ、ActiveMQ、Kafka等

典型的特点:

1、解耦

2、可靠投递

3、广播

4、最终一致性

5、流量削峰

6、消息投递保证

7、异步通信(支持同步)

8、提高系统吞吐、健壮性

典型的使用场景:秒杀业务中利用MQ来实现流量削峰,以及应用解耦使用。

RPC和MQ的区别和关联

1.在架构上,RPC和MQ的差异点是,Message有一个中间结点Message Queue,可以把消息存储。

img

2.同步调用:对于要立即等待返回处理结果的场景,RPC是首选。

3.MQ 的使用,一方面是基于性能的考虑,比如服务端不能快速的响应客户端(或客户端也不要求实时响应),需要在队列里缓存。

另外一方面,它更侧重数据的传输,因此方式更加多样化,除了点对点外,还有订阅发布等功能。

4.而且随着业务增长,有的处理端处理量会成为瓶颈,会进行同步调用改造为异步调用,这个时候可以考虑使用MQ。

RPC关注业务逻辑处理结果

为什么要用ELK?

常见的日志分析系统:直接在日志文件中grep,awk 就可以获取自己想要的信息。但在规模较大的场景中,次方法效率比较低。面临问题包括日志量太大如何归档,文本搜索太慢怎么办,如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集管理,访问。

一般大型系统是一个分布式部署的架构,不同服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。

一个完整的集中式日志系统。

一个完整的集中式日志系统,需要包含以下几个主要特点:

1.收集:采集多种来源的日志数据。

2.传输:能够稳定的把日志数据传输到中央系统

3.存储:如何存储日志数据。

4.分析:可以支持UI分析

5.警告:能够提供错误报告,监控机制。

ELK提供了一整套解决方案,并且都市开源软件,之间互相配合使用,完美链接,高效的满足了很多场合的应用。目前主流的一种日志系统。

ELK简介?

ELK是三个开源软件的缩写,分别表示:Elasticsearch,LogStash,Kibana 开源软件。 FileBeat 是一个轻量级的日志收集工具。

elasticsearch:分布式的搜索引擎,提供搜索,分析,存储数据三大功能。它的特点是 分布式,零配置,自动发现,索引自动分片,restful风格接口,多数据源,自动搜索负载等。

Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。

Filebeat隶属于Beats。目前Beats包含四种工具:

    1. Packetbeat(搜集网络流量数据)
    2. Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据)
    3. Filebeat(搜集文件数据)
    4. Winlogbeat(搜集 Windows 事件日志数据)

官方文档:

Filebeat:

https://www.elastic.co/cn/products/beats/filebeat
https://www.elastic.co/guide/en/beats/filebeat/5.6/index.html

Logstash:
https://www.elastic.co/cn/products/logstash
https://www.elastic.co/guide/en/logstash/5.6/index.html

Kibana:

https://www.elastic.co/cn/products/kibana

https://www.elastic.co/guide/en/kibana/5.5/index.html

Elasticsearch:
https://www.elastic.co/cn/products/elasticsearch
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/index.html

elasticsearch中文社区:
https://elasticsearch.cn/

ELK架构图:

架构图一:

img

这是最简单的一种ELK架构方式。优点是搭建简单,易于上手。缺点是Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。

此架构由Logstash分布于各个节点上搜集相关日志、数据,并经过分析、过滤后发送给远端服务器上的Elasticsearch进行存储。Elasticsearch将数据以分片的形式压缩存储并提供多种API供用户查询,操作。用户亦可以更直观的通过配置Kibana Web方便的对日志查询,并根据数据生成报表。

架构图二:

img

此种架构引入了消息队列机制,位于各个节点上的Logstash Agent先将数据/日志传递给Kafka(或者Redis),并将队列中消息或数据间接传递给Logstash,Logstash过滤、分析后将数据传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了Kafka(或者Redis),所以即使远端Logstash server因故障停止运行,数据将会先被存储下来,从而避免数据丢失。

架构图三:

img

此种架构将收集端logstash替换为beats,更灵活,消耗资源更少,扩展性更强。同时可配置Logstash 和Elasticsearch 集群用于支持大集群系统的运维日志数据监控和查询。

Filebeat工作原理:

Filebeat由两个主要组件组成:prospectors 和 harvesters。这两个组件协同工作将文件变动发送到指定的输出中。

img

Harvester(收割机):负责读取单个文件内容。每个文件会启动一个Harvester,每个Harvester会逐行读取各个文件,并将文件内容发送到制定输出中。Harvester负责打开和关闭文件,意味在Harvester运行的时候,文件描述符处于打开状态,如果文件在收集中被重命名或者被删除,Filebeat会继续读取此文件。所以在Harvester关闭之前,磁盘不会被释放。默认情况filebeat会保持文件打开的状态,直到达到close_inactive(如果此选项开启,filebeat会在指定时间内将不再更新的文件句柄关闭,时间从harvester读取最后一行的时间开始计时。若文件句柄被关闭后,文件发生变化,则会启动一个新的harvester。关闭文件句柄的时间不取决于文件的修改时间,若此参数配置不当,则可能发生日志不实时的情况,由scan_frequency参数决定,默认10s。Harvester使用内部时间戳来记录文件最后被收集的时间。例如:设置5m,则在Harvester读取文件的最后一行之后,开始倒计时5分钟,若5分钟内文件无变化,则关闭文件句柄。默认5m)。

Prospector(勘测者):负责管理Harvester并找到所有读取源。

1
`filebeat.prospectors:``- input_type: log``  ``paths:``    ``- /apps/logs/*/info.log`

Prospector会找到/apps/logs/*目录下的所有info.log文件,并为每个文件启动一个Harvester。Prospector会检查每个文件,看Harvester是否已经启动,是否需要启动,或者文件是否可以忽略。若Harvester关闭,只有在文件大小发生变化的时候Prospector才会执行检查。只能检测本地的文件。

Filebeat如何记录文件状态:

将文件状态记录在文件中(默认在/var/lib/filebeat/registry)。此状态可以记住Harvester收集文件的偏移量。若连接不上输出设备,如ES等,filebeat会记录发送前的最后一行,并再可以连接的时候继续发送。Filebeat在运行的时候,Prospector状态会被记录在内存中。Filebeat重启的时候,利用registry记录的状态来进行重建,用来还原到重启之前的状态。每个Prospector会为每个找到的文件记录一个状态,对于每个文件,Filebeat存储唯一标识符以检测文件是否先前被收集。

Filebeat如何保证事件至少被输出一次:

Filebeat之所以能保证事件至少被传递到配置的输出一次,没有数据丢失,是因为filebeat将每个事件的传递状态保存在文件中。在未得到输出方确认时,filebeat会尝试一直发送,直到得到回应。若filebeat在传输过程中被关闭,则不会再关闭之前确认所有时事件。任何在filebeat关闭之前为确认的时间,都会在filebeat重启之后重新发送。这可确保至少发送一次,但有可能会重复。可通过设置shutdown_timeout 参数来设置关闭之前的等待事件回应的时间(默认禁用)。

Logstash工作原理:

Logstash事件处理有三个阶段:inputs → filters → outputs。是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。

img

Input:输入数据到logstash。

一些常用的输入为:

file:从文件系统的文件中读取,类似于tial -f命令

syslog:在514端口上监听系统日志消息,并根据RFC3164标准进行解析

redis:从redis service中读取

beats:从filebeat中读取

Filters:数据中间处理,对数据进行操作。

一些常用的过滤器为:

grok:解析任意文本数据,Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。内置120多个解析语法。

官方提供的grok表达式:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
grok在线调试:https://grokdebug.herokuapp.com/

mutate:对字段进行转换。例如对字段进行删除、替换、修改、重命名等。

drop:丢弃一部分events不进行处理。

clone:拷贝 event,这个过程中也可以添加或移除字段。

geoip:添加地理信息(为前台kibana图形化展示使用)

Outputs:outputs是logstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。

一些常见的outputs为:

elasticsearch:可以高效的保存数据,并且能够方便和简单的进行查询。

file:将event数据保存到文件中。

graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。

Codecs:codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置。Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。

一些常见的codecs:

json:使用json格式对数据进行编码/解码。

multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息。

分类: ELK

参考文章

基本概念

HTTP超文本传输协议被用于web浏览器和网站服务器之间传递信息,http协议以明文方式发送内容,不提供任何方式的数据加密,如果攻击者截取了Web浏览器和网站服务器之间的传输报文,就可以直接读懂其中的信息,因此,HTTP协议不适合传输一些敏感信息,比如:信用卡号、密码等支付信息。

为了解决HTTP协议的这一缺陷,需要使用另一种协议:安全套接字层超文本传输协议HTTPS,为了数据传输的安全,HTTPS在HTTP的基础上加入了SSL协议,SSL依靠证书来验证服务器的身份,并为浏览器和服务器之间的通信加密。

HTTP和HTTPS的基本概念

  HTTP:是互联网上应用最为广泛的一种网络协议,是一个客户端和服务器端请求和应答的标准(TCP),用于从WWW服务器传输超文本到本地浏览器的传输协议,它可以使浏览器更加高效,使网络传输减少。

  HTTPS:是以安全为目标的HTTP通道,简单讲是HTTP的安全版,即HTTP下加入SSL层,HTTPS的安全基础是SSL,因此加密的详细内容就需要SSL。

  HTTPS协议的主要作用可以分为两种:一种是建立一个信息安全通道,来保证数据传输的安全;另一种就是确认网站的真实性。

TCP协议:

HTTP与HTTPS有什么区别?

  HTTP协议传输的数据都是未加密的,也就是明文的,因此使用HTTP协议传输隐私信息非常不安全,为了保证这些隐私数据能加密传输,于是网景公司设计了SSL(Secure Sockets Layer)协议用于对HTTP协议传输的数据进行加密,从而就诞生了HTTPS。简单来说,HTTPS协议是由SSL+HTTP协议构建的可进行加密传输、身份认证的网络协议,要比http协议安全。

  HTTPS和HTTP的区别主要如下:

  1、https协议需要到ca申请证书,一般免费证书较少,因而需要一定费用。

  2、http是超文本传输协议,信息是明文传输,https则是具有安全性的ssl加密传输协议。

  3、http和https使用的是完全不同的连接方式,用的端口也不一样,前者是80,后者是443。

  4、http的连接很简单,是无状态的;HTTPS协议是由SSL+HTTP协议构建的可进行加密传输、身份认证的网络协议,比http协议安全。

HTTP协议的工作原理

我们都知道HTTPS能够加密信息,以免敏感信息被第三方获取,所以很多银行网站或电子邮箱等等安全级别较高的服务都会采用HTTPS协议。

HTTP与HTTPS的区åˆ"-马海祥博客

 客户端在使用HTTPS方式与Web服务器通信时有以下几个步骤,如图所示。

  (1)客户使用https的URL访问Web服务器,要求与Web服务器建立SSL连接。

  (2)Web服务器收到客户端请求后,会将网站的证书信息(证书中包含公钥)传送一份给客户端。

  (3)客户端的浏览器与Web服务器开始协商SSL连接的安全等级,也就是信息加密的等级。

  (4)客户端的浏览器根据双方同意的安全等级,建立会话密钥,然后利用网站的公钥将会话密钥加密,并传送给网站。

  (5)Web服务器利用自己的私钥解密出会话密钥。

  (6)Web服务器利用会话密钥加密与客户端之间的通信。

img

HTTPS的优点

  尽管HTTPS并非绝对安全,掌握根证书的机构、掌握加密算法的组织同样可以进行中间人形式的攻击,但HTTPS仍是现行架构下最安全的解决方案,主要有以下几个好处:

  (1)使用HTTPS协议可认证用户和服务器,确保数据发送到正确的客户机和服务器;

  (2)HTTPS协议是由SSL+HTTP协议构建的可进行加密传输、身份认证的网络协议,要比http协议安全,可防止数据在传输过程中不被窃取、改变,确保数据的完整性。

  (3)HTTPS是现行架构下最安全的解决方案,虽然不是绝对安全,但它大幅增加了中间人攻击的成本。

  (4)谷歌曾在2014年8月份调整搜索引擎算法,并称“比起同等HTTP网站,采用HTTPS加密的网站在搜索结果中的排名将会更高”。

五、HTTPS的缺点

  虽然说HTTPS有很大的优势,但其相对来说,还是存在不足之处的:

  (1)HTTPS协议握手阶段比较费时,会使页面的加载时间延长近50%,增加10%到20%的耗电;

  (2)HTTPS连接缓存不如HTTP高效,会增加数据开销和功耗,甚至已有的安全措施也会因此而受到影响;

  (3)SSL证书需要钱,功能越强大的证书费用越高,个人网站、小网站没有必要一般不会用。

  (4)SSL证书通常需要绑定IP,不能在同一IP上绑定多个域名,IPv4资源不可能支撑这个消耗。

  (5)HTTPS协议的加密范围也比较有限,在黑客攻击、拒绝服务攻击、服务器劫持等方面几乎起不到什么作用。最关键的,SSL证书的信用链体系并不安全,特别是在某些国家可以控制CA根证书的情况下,中间人攻击一样可行。

参考文章 参考文章2

什么是消息队列?

消息队列就是在消息的传输过程中保存消息的容器。

消息队列是分布式应用间交换信息的一种技术。

img

MQ的基本概念:

1) 队列管理器(Queue Manage)

队列管理器是MQ系统中最上层的一个概念,由它为我们提供基于队列的消息服务。

2) 消息

在MQ中,我们把应用程序交由MQ传输的数据定义为消息。

3) 队列

队列是消息的安全存放地,队列存储消息直到它被应用程序处理。

4) 通道

通道是两个管理器之间的一种单向点对点的通信连接,如果需要双向,需要建立一对通道。

5) 监听器

为什么要使用消息队列?

1.系统解耦,异步访问

看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃……

mq-1

在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!

如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

mq-2

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口;

传统模式的缺点:

1) 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。

2) 订单系统和库存系统耦合。

而消息队列的出现解决了传统模式带来的弊端:

1) 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

2)库存系统:订阅下单的消息,采用推拉的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。也减少用户请求的响应时间。
2.削弱高峰

流量削峰也是消息队列中常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

传统处理模式:

  1. 可以控制活动的人数;
  2. 可以缓解短时间内高流量压垮应用;

    消息队列处理模式:

  3. 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;

  4. 秒杀业务根据消息队列中的请求信息,再做后续处理。

    传统模式的缺点:

    • 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

    中间件模式的的优点:

    • 系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
3.消息通信

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

消息队列的两种消息模式:点对点模式和分布订阅模式。 同步保证结果,异步保证效率

消息模式

1. 点对点模式和发布订阅模式:是否可以重复消费

1.1 P2P模式:

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点

  1. 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  3. 接收者在成功接收消息之后需向队列应答成功

    如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。、

1.2 Pub/sub模式:

包含三个角色:主题(Topic),发布者Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点

  1. 每个消息可以有多个消费者
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  3. 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

2. 推模式和拉模式:消息的更新者

推(push)模式是一种基于C/S机制、由服务器主动将信息送到客户器的技术。

  1. 在push模式应用中,服务器把信息送给客户器之前,并没有明显的客户请求。push事务由服务器发起。push模式可以让信息主动、快速地寻找用户/客户器,信息的主动性和实时性**比较好。但精确性较差,可能推送的信息并不一定满足客户的需求。

  2. 推送模式不能保证能把信息送到客户器,因为推模式采用了广播机制,如果客户器正好联网并且和服务器在同一个频道上,推送模式才是有效的。

  3. push模式无法跟踪状态,采用了开环控制模式,没有用户反馈信息。在实际应用中,由客户器向服务器发送一个申请,并把自己的地址(如IP、port)告知服务器,然后服务器就源源不断地把信息推送到指定地址。在多媒体信息广播中也采用了推模式。

拉(pull)模式与推模式相反,是由客户器主动发起的事务。

消息队列的优缺点:

优点可查看为什么使用MQ所述。

缺点:

1.系统可用性降低

系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用?

2.系统复杂性提高

硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?数据一致性?怎么保证消息传递的顺序性?

但是,我们该用还是要用的。

Kafka、ActiveMQ、RabbitMQ、RocketMQ对比:

(1)中小型软件公司,建议选RabbitMQ.一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。

不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。

不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。

(2)大型软件公司,根据具体使用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。

针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,根据业务场景选择,如果有日志采集,大数据领域的实时计算等场景,肯定是首选kafka了,几乎是全世界这个领域的事实性规范。具体该选哪个,看使用场景。

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

###

如何保证消息队列的高可用:

分析:在第二点说过了,引入消息队列后,系统的可用性下降。在生产中,没人使用单机模式的消息队列。因此,作为一个合格的程序员,应该对消息队列的高可用有很深刻的了解。

如果面试的时候,面试官问,你们的消息中间件如何保证高可用的?你的回答只是表明自己只会订阅和发布消息,面试官就会怀疑你是不是只是自己搭着玩,压根没在生产用过。请做一个爱思考,会思考,懂思考的程序员.

回答:要对消息队列的集群模式有深刻的了解。

RabbitMQ的集群模式:普通集群和镜像集群。

要求,在回答高可用的问题时,应该能逻辑清晰的画出自己的MQ集群架构或清晰的叙述出来。

————————————- ——–未完待续—————————————————

如何保证消息不被重复消费:

分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案。

回答:先来说一下为什么会造成重复消费?

其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下(如果还不懂,出门找一个kafka入门到精通教程),就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

如何解决?这个问题针对业务场景来答分以下几点

  (1)比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
  (2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
  (3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

补充:鉴别消息重复,并幂等处理重复消息:

1.鉴别重复消息:利用存储系统的唯一键,给每个消息加上一个MessageId.

2.幂等处理重复消息的方法:

​ 1.跟鉴别消息重复一样,利用messageId,重复即不处理。

​ 2.版本号,每个消息都带一个版本号,只处理比当前存储版本号高的消息

​ 3.状态机,跟业务耦合比较严重,根据业务类型判断。

RabbitMQ的ACK消息确认机制:防止消息丢失,如果消息者领取消息后没执行操作就挂掉了,或者执行抛出异常,也就是消费者执行失败了,rabbotMQ无从得知,就会导致消息丢失。

因此RabbitMQ中就有ACK消息确认机制,消息确认有两种模式:

1.自动模式,我们无需任何操作,在消息被消费者领取后,就会自动确认,消息也会被从队列删除。

2.手动模式,消息被消费后,我们需要调用RabbitMQ提供的API来实现消息确认。
我们在调用:channel.basicConsume()方法的时候,通过指定第二个参数来设置是自动还是手动:

如何保证消费的可靠性传输:

分析:我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费。如果无法做到可靠性传输,可能给公司带来千万级别的财产损失。

个可靠性传输,每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据

RabbitMQ

(1)生产者丢数据
从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

(2)消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步

1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。

(3)消费者丢数据
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。
至于解决方案,采用手动确认消息即可。

如何保证消息队列数据最终的一致性?

依靠消息凭证完成最终一致性。

——————————待完善——————-

如何保证消息的顺序性?

分析:其实并非所有的公司都有这种业务需求,但是还是对这个问题要有所复习。

回答:针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue)。然后只用一个消费者去消费该队列。

有的人会问:那如果为了吞吐量,有多个消费者去消费怎么办?

这个问题,没有固定回答的套路。比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。如果是这样一个业务场景,那只要重试就行。比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。

总之,针对这个问题,我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。

消息队列实现的原理:

JMS java message service

jms是java平台中一个面向消息中间件的技术规范,MQ可以基于JMS规范实现,也可以基于其他技术规范实现。

参考文献
博客1 博客2

java存储数据的6个地方

  1. 寄存器(register)
    最快的存储区域,位于处理器内部,不同于其他存储区。但是存储器的数量及其有限,所以寄存器由编译器根据需求进行分配。不能直接控制,也不能在程序中感觉到寄存器存在的任何迹象。

    —– 最快的存储区,由编译器根据需求进行分配,程序无法控制
  2. 堆栈(stack)
    位于通用RAM中,但通过”堆栈指针”可以从处理器哪里获得支持。堆栈指针向下移动,则分配新的内存;若向上移动,则释放那些内存。这是一种快速有效的分配存储方法,仅次于寄存器。创建程序时,java编译器必须知道存储在堆栈内所有数据的确切大小和生命周期,因为它们必须生成相应的代码。以便上下移动堆栈指针,这一约束限制了程序的灵活性,所以虽然某些JAVA数据存储在堆栈中——特别是对象引用,但是JAVA对象不存储其 中。具体方法结束后,系统自动释放JVM内存资源

    —– 存放基本类型的变量数据和对象,数组的引用,但对象本身不存放在栈中,而是 存放在堆(new 对象)或者常量池中(字符串常量存放在常量池中)。
  3. 堆(heap)

    一种通用的内存池(也存在于RAM中),用于存放所有的java对象。堆内存不同于堆栈的好处是:编译器不需要知道从堆里分配多少存储区,也不必知道存储数据在堆里面存活时间。因此,堆的灵活性较强。当你需要创建一个对象的时候,只需要new写一行简单的代码,当执行 这行代码时,会自动在堆里进行存储分配。当然,为这种灵活性必须要付出相应的代码。用堆进行存储分配比用堆栈进行存储存储需要更多的时间。一般由程序员分配释放,jvm不定时查看这个对象,如果没有引用指向这个对象就回收

—– 存放所有new 出来的对象

  1. 静态存储(static storage)–数据区

    这里的“静态”是指“在固定的位置”。静态存储里存放程序运行时一直存在的数据。你可用关键字static来标识一个对象的特定元素是静态的,但JAVA对象本身从来不会存放在静态存储空间里

    —– 存放静态成员(static 定义的)
  2. 常量存储(constant storage)

    常量值通常直接存放在程序代码内部,这样做是安全的,因为它们永远不会被改变。有时,在嵌入式系统中,常量本身会和其他部分分割离开,所以在这种情况下,可以选择将其放在ROM中 。常量池存储于堆中。

    ——存放字符串常量和基本类型常量(public static final)和对其他类型、方法、字段的符号引用
  3. 非RAM存储

​ 如果数据完全存活于程序之外,那么它可以不受程序的任何控制,在程序没有运行时也可以存在。

—–硬盘等永久存储空间

就速度而言,有如下关系:寄存器 >堆栈 > 堆 > 其它

下图大致描述了java的内存分配:

img

数据段:存放定义的static 定义的静态成员。

常量区:JVM为每个已加载的类型维护一个常量池,常量池就是这个类型用到的常量的一个有序集合。

代码段:存放程序中的二进制代码,而且多个对象共享一个代码空间区域

栈内存和堆内存

  1. java中栈和常量池中的对象可以共享,对于堆中的对象不可以共享。

  2. 堆栈中的数据大小和生命周期可以确定,当没有引用指向数据时,这个数据就会消失。堆中的对象的由垃圾回收器负责回收,因此大小和生命周期不需要确定,具有很大的灵活性。(堆最大的优势可以在运行期间动态地分配内存大小)

  3. 对于字符串:其对象的引用都是存储在栈中的,如果是编译期已经创建好(直接用双引号定义的)的就存储在常量池中,如果是运行期(new出来的)才能确定的就存储在堆中。对于equals相等的字符串,在常量池中永远只有一份,在堆中有多份。
    如下代码:java代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    String s1 = "china"; 

    String s2 = "china";

    String s3 = "china";

    String ss1 = new String("china");

    String ss2 = new String("china");

    String ss3 = new String("china");

img

​ 这里解释一下,对于通过 new 产生一个字符串(假设为 ”china” )时,会先去常量池中查找是否已经有了 ”china” 对象,如果没有则在常量池中创建一个此字符串对象,然后堆中再创建一个常量池中此 ”china” 对象的拷贝对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  public class TestStringConstant {
public static void main(String args[]) {
// 字符串常量,分配在常量池中,编译器会对其进行优化, Interned table
// 即当一个字符串已经存在时,不再重复创建一个相同的对象,而是直接将s2也指向"hello".
String s1 = "hello";
String s2 = "hello";
// new出来的对象,分配在heap中.s3与s4虽然它们指向的字符串内容是相同的,但是是两个不同的对象.
// 因此==进行比较时,其所存的引用是不同的,故不会相等
String s3 = new String("world");
String s4 = new String("world");

System.out.println(s1 == s2); // true
System.out.println(s3 == s4); // false
System.out.println(s3.equals(s4)); // true
// String中equals方法已经被重写过,比较的是内容是否相等.
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1
String a = "ab";
String bb = "b";
String b = "a" + bb;
System.out.println((a == b)); //result = false
2
String a = "ab";
final String bb = "b";
String b = "a" + bb;
System.out.println((a == b)); //result = true
3
String a = "ab";
final String bb = getBB();
String b = "a" + bb;
System.out.println((a == b)); //result = false
private static String getBB() {
return "b";
}
分析:
1】中,JVM对于字符串引用,由于在字符串的"+"连接中,有字符串引用存在,而引用的值在程序编译期是无法确定的,即"a" + bb无法被编译器优化,只有在程序运行期来动态分配并将连接后的新地址赋给b。所以上面程序的结果也就为false。 
2】和【1】中唯一不同的是bb字符串加了final修饰,对于final修饰的变量,它在编译时被解析为常量值的一个本地拷贝存储到自己的常量池中或嵌入到它的字节码流中。所以此时的"a" + bb和"a" + "b"效果是一样的。故上面程序的结果为true
3】JVM对于字符串引用bb,它的值在编译期无法确定,只有在程序运行期调用方法后,将方法的返回值和"a"来动态连接并分配地址为b,故上面程序的结果为false

4.对于基础类型的变量和常量:变量和引用存储在栈中,常量存储在常量池中。

引申问题:对于成员变量和局部变量:成员变量就是方法之外,类内部定义的变量;局部变量就是方法或语句内部定义的变量。局部变量必须初始化。形参是局部变量,局部变量的数据存在栈内存中,栈内存中的局部变量随着方法的消失而消失。成员变量存储在堆中的对象中,由垃圾回收机制负责回收。

java数据类型

  1. 基本数据类型

    int short long byte char float dubble boolean
    这种类型的定义是通过诸如int a = 3; long b = 255L;的形式来定义的,称为自动变量。值得注意的是,自动变量存的是字面值,不是类的实例,即不是类的引用,这里并没有类的存在。如int a = 3; 这里的a是一个指向int类型的引用,指向3这个字面值。这些字面值的数据,由于大小可知,生存期可知(这些字面值固定定义在某个程序块里面,程序块退出 后,字段值就消失了),出于追求速度的原因,就存在于栈中。 另外,栈有一个很重要的特殊性,就是存在栈中的数据可以共享。假设我们同时定义 int a = 3; int b = 3; 编 译器先处理int a = 3;首先它会在栈中创建一个变量为a的引用,然后查找有没有字面值为3的地址,没找到,就开辟一个存放3这个字面值的地址,然后将a指向3的地址。接着处 理int b = 3;在创建完b的引用变量后,由于在栈中已经有3这个字面值,便将b直接指向3的地址。这样,就出现了a与b同时均指向3的情况。 特 别注意的是,这种字面值的引用与类对象的引用不同。假定两个类对象的引用同时指向一个对象,如果一个对象引用变量修改了这个对象的内部状态,那么另一个对 象引用变量也即刻反映出这个变化。相反,通过字面值的引用来修改其值,不会导致另一个指向此字面值的引用的值也跟着改变的情况。如上例,我们定义完a与 b的值后,再令a=4;那么,b不会等于4,还是等于3。在编译器内部,遇到a=4;时,它就会重新搜索栈中是否有4的字面值,如果没有,重新开辟地址存 放4的值;如果已经有了,则直接将a指向这个地址。因此a值的改变不会影响到b的值。

  2. 包装类类型

    如Integer String Double等将相应的基本数据类型包装起来的类。这些类数据的引用全部存在于栈中,java用new()语句来显示地告诉编译器,在运行时才根据需要动态创建,因此比较灵活,但需要占用更多的时间。

  3. 为什么会存在两种类型?

    我们都知道在Java语言中,new一个对象存储在堆里,我们通过堆栈中的引用来使用这些对象;但是对于经常用到的一系列类型如int,如果我们用new将其存储在堆里就不是很有效——特别是简单的小的变量。
    所以就出现了基本类型,同C++一样,Java采用了相似的做法,对于这些类型不是用new关键字来创建,而是直接将变量的值存储在堆栈中,因此更加高效。

    有了基本类型之后为什么还要有包装器类型呢?我们知道Java是一个面相对象的编程语言,基本类型并不具有对象的性质,为了让基本类型也具有对象的特征,就出现了包装类型(如我们在使用集合类型Collection时就一定要使用包装类型而非基本类型),它相当于将基本类型“包装起来”,使得它具有了对象的性质,并且为其添加了属性和方法,丰富了基本类型的操作。

  4. 二者之间的区别

    1.声明方式不同,基本类型不适用new关键字,而包装类型需要使用new关键字来在堆中分配存储空间;

    2.存储方式及位置不同,基本类型是直接将变量值存储在堆栈中,而包装类型是将对象放在堆中,然后通过引用来使用;

    3.初始值不同,基本类型的初始值如int为0,boolean为false,而包装类型的初始值为null

    4.使用方式不同,基本类型直接赋值直接使用就好,而包装类型在集合如Collection、Map时会使用到。

    5.当比较包装类里面的数值是否相等时,用equals()方法;当测试两个包装类的引用是否指向同一个对象时,用==。

总结栈内存和堆内存

1.存储数据:栈用于存储基本数据类型和对象,数组的引用;堆存储所有new出来的对象。
2.灵活性:堆栈中的存储数据数据大小和生命周期都确定,当没有引用指向数据,数据就会消失。堆中的数据是在运行期间动态创建,因此数据大小和生命周期不确定。也因此灵活性更强。
3.回收机制:栈中数据在具体方法结束后,jvm自动释放资源;一般由程序员释放,jvm定期查看,如果没有引用指向该对象就回收。

4.共享性:栈和常量池中的数据共享,堆中的数据不共享。同一份数据在栈或常量池中只能有一份,堆中可以有多分。

1 什么是Feign

Feign是一个声明式的web service客户端,她的出现使开发web service客户端变得非常简单。她具备可插拨的注解支持,包括Feign注解和JAX-RS注解。使用Feign的时候只需要创建一个interface再加上@FeignClient就可以。Feign支持编码器和解码器,Spring Cloud Open Feign对Feign进行了增强,使其支持了Spring MVC注解,可以像Spring Web一样使用 HttpMessageConverters

Feign是一种声明式、模板化的HTTP客户端,在Spring Cloud中使用Feign,可以做到使用HTTP请求访问远程控服务,就像调用本地方法一样,开发者完全感觉不到这是在调用远程方法,更加感觉不到这是在访问HTTP请求。以下列出了一些Feign的特性。

  • 可插播的注解支持,包括Feign注解和JAX-RS注解。
  • 支持可插拨的HTTP编码器和解码器
  • 支持Hystrix以及她的fallback机制
  • 支持ribbon的负载均衡
  • 支持Http请求和响应的压缩

Feign是一个声明式的web service客户端,她出现的目的就是让web service调用变得更加简单。她整合了Ribbon和Hystrix,从而不需开发者再将两者进行整合。Feign还提供了HTTP请求的模板,通过编写简单的注解和接口就可以定义好HTTP请求的参数、格式、地址等信息。Feign会完全代理HTTP请求在使用中我们只需要注入相应的接口,传递好参数就行。

1.1 Feign开源地址

Spring Cloud Open Feign: https://github.com/spring-cloud/spring-cloud-openfeign

Open Feign: https://github.com/OpenFeign/feign

2 简单整合Feign

2.1 引入feign的maven依赖

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
</dependencies>

2.2 创建主程序并添加@EnableFeignClients注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.com.bmsmart;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

/**
* <pre>
* <p>Description: 应用程序入口 </p>
* @author wusong
* @date 2018-06-27 11:31
*/
@EnableFeignClients
@SpringBootApplication
@EnableDiscoveryClient
@Slf4j
public class FeignApplication {

public static void main(String[] args) {
SpringApplication.run(FeignApplication.class, args);
}

}

其中的@EnableFeignClients注解表示当前程序启动时,会进行包扫描,扫描所有标识了@FeignClient注解的类并进行处理。

2.3 编写Feign接口

1
2
3
4
5
6
7
8
@FeignClient("stores")
public interface StoreClient {
@RequestMapping(method = RequestMethod.GET, value = "/stores")
List<Store> getStores();

@RequestMapping(method = RequestMethod.POST, value = "/stores/{storeId}", consumes = "application/json")
Store update(@PathVariable("storeId") Long storeId, Store store);
}

@FeignClient注解中stores对应的值是eureka中任何一个客户端的名称。用于创建Ribbon的负载均衡器。你还可以使用url属性(只能是绝对值或仅主机名)指定URL 。应用程序上下文中bean的名称是接口的名称。要指定自己的别名值,可以使用注释@FeignClientqualifier值。

在上面代码中,Ribbon客户端负责去发现stores服务的物理地址,如果你在项目中使用了Eureka客户端,那么她将解析的是Eureka服务注册表中的服务,如果没有使用Eureka,则需要在配置文件中配置服务器列表 e.g.

1
2
3
stores:
ribbon:
listOfServers: example.com,google.com

2.4 编写Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequiredArgsConstructor
public class HelloFeignController {

private final StoreClient storeClient;

// 服务消费者对位提供的服务
@PostMapping(value = "/stores/{storeId}")
public Store update(@PathVariable(value ="storeId")String storeId) {
return storeClient.update(storeId);
}

}

以上就完成了Feign整个调用的基本流程。

3 Feign的工作原理

通过上面的整合,我们了解了Feign的基本使用。接着来介绍一下Feign的工作原理:

  • 在开发微服务时,我们会在主类中加入@EnableFeignClients注解开启对FeignClient的扫描加载处理,根据Feign Client的开发规范,定义接口并加@FeignClient注解
  • 当程序启动时,会进行包的扫描。扫描所有用@FeignClient注解进行修饰的类并将这些信息注入Spring IOC容器中,当定义Feign接口中的方法被调用时,通过JDK的代理方式,来生成具体的RequestTemplate。当生成代理时,Feign会为每个接口方法创建一个Request Template对象,该对象奉准过了HTTP请求的全部信息,例如请求参数,请求方法等。
  • 由RequestTemplate生成Request,然后把Request交给Client去处理,这里的client制的是JDK的URLConnection、apache的http client、或者OKhttp。最后client被封装到LoadBalanceClient类中,这个类结合Ribbon负载均衡发起服务之间的调用。

4 Feign的基础功能

4.2.1 @FeignClient注解介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.openfeign;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.core.annotation.AliasFor;

/**
* Annotation for interfaces declaring that a REST client with that interface should be
* created (e.g. for autowiring into another component). If ribbon is available it will be
* used to load balance the backend requests, and the load balancer can be configured
* using a <code>@RibbonClient</code> with the same name (i.e. value) as the feign client.
*
* @author Spencer Gibb
* @author Venil Noronha
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FeignClient {

/**
* The name of the service with optional protocol prefix. Synonym for {@link #name()
* name}. A name must be specified for all clients, whether or not a url is provided.
* Can be specified as property key, eg: ${propertyKey}.
*/
@AliasFor("name")
String value() default "";

/**
* The service id with optional protocol prefix. Synonym for {@link #value() value}.
*
* @deprecated use {@link #name() name} instead
*/
@Deprecated
String serviceId() default "";

/**
* The service id with optional protocol prefix. Synonym for {@link #value() value}.
*/
@AliasFor("value")
String name() default "";

/**
* Sets the <code>@Qualifier</code> value for the feign client.
*/
String qualifier() default "";

/**
* An absolute URL or resolvable hostname (the protocol is optional).
*/
String url() default "";

/**
* Whether 404s should be decoded instead of throwing FeignExceptions
*/
boolean decode404() default false;

/**
* A custom <code>@Configuration</code> for the feign client. Can contain override
* <code>@Bean</code> definition for the pieces that make up the client, for instance
* {@link feign.codec.Decoder}, {@link feign.codec.Encoder}, {@link feign.Contract}.
*
* @see FeignClientsConfiguration for the defaults
*/
Class<?>[] configuration() default {};

/**
* Fallback class for the specified Feign client interface. The fallback class must
* implement the interface annotated by this annotation and be a valid spring bean.
*/
Class<?> fallback() default void.class;

/**
* Define a fallback factory for the specified Feign client interface. The fallback
* factory must produce instances of fallback classes that implement the interface
* annotated by {@link FeignClient}. The fallback factory must be a valid spring
* bean.
*
* @see feign.hystrix.FallbackFactory for details.
*/
Class<?> fallbackFactory() default void.class;

/**
* Path prefix to be used by all method-level mappings. Can be used with or without
* <code>@RibbonClient</code>.
*/
String path() default "";

/**
* Whether to mark the feign proxy as a primary bean. Defaults to true.
*/
boolean primary() default true;

}

value: 指定FeignClient的名称,如果使用了Ribbon,value属性会作为微服务的名称,用于服务发现

qualifier: 使用客户端的别名

url: url用于调试,可以手动指定@FeignClient的调用地址

decode404: 当发生404错误的时候,如果该字段为true, 会调用decoder进行解码,否则抛出FeignException异常

configuration:feign配置类,可以自定义Feign的Encoder、Decoder、LogLevel、Contract等

fallback: 定义容错的处理类,当调用远程接口失败,或超时时,会调用对应接口的容错逻辑,fallback指定的类必须实现@FeignClient标记的接口

fallbackFactory: 工厂类,用于生成fallback类示例,通过这个属性,我们可以实现每个接口通用的容错逻辑,减少重复的代码

path: 定义当前FeignClient的统一前缀

4.2.2 feign开启GZIP压缩

spring cloud feign支持对请求和响应进行GZIP压缩,以提高通信效率。

1
2
3
4
5
6
7
8
feign:
compression:
request:
enabled: true
mime-types: text/xml,application/xml,application/json # 配置压缩支持的MIME TYPE
min-request-size: 2048 # 配置压缩数据大小的下限
response:
enabled: true # 配置响应GZIP压缩

4.2.3 feign client开启日志

Feign为每个FeignClient都提供了一个feign.Logger实例,可以在配置文件中开启日志,开启方式较为简单,分为两部

4.2.3.1 在application.yml中配置日志输出

1
2
3
logging:
level:
cn.com.bmsmart.StoreClient: debug

4.2.3.2 通过Java代码进行配置

通过在主类中配置

1
2
3
4
 @Bean
Logger.Level feignLoggerLevel() {
return Logger.Level.FULL;
}

通过用@Configuration修饰的类去配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class FeignServiceConfig {

/**
*
* Logger.Level 的具体级别如下:
* NONE:不记录任何信息
* BASIC:仅记录请求方法、URL以及响应状态码和执行时间
* HEADERS:除了记录 BASIC级别的信息外,还会记录请求和响应的头信息
* FULL:记录所有请求与响应的明细,包括头信息、请求体、元数据
* @return
*/
@Bean
Logger.Level feignLoggerLevel() {
return Logger.Level.FULL;
}
}

4.2.4 Feign超时配置

  • Feign的调用分两层,即Ribbon的调用和Hystrix的调用,高版本的Hystrix默认是关闭的。
1
feign.RetryableException: Read time out execution POST http://.....

出现上面报错信息,说明Ribbon处理超时这时设置ribbon的配置信息如下:

1
2
ribbon.ReadTimeOut: 120000 # 请求处理超时时间
ribbon.ConnectionTimeout: 3000 # 请求连接超时时间

如果开启Hystrix, hystrix超时报错信息如下:

1
com.netflix.hystrix.exception.HystrixRuntimeException: Demo#demo() timedout and no fallback available.

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
feign:
hystrix:
enabled: true
hystrix:
shareSecurityContext: true
command:
default:
circuitBreaker:
sleepWindowInMilliseconds: 10000
forceClosed: true
execution:
isolation:
thread:
timeoutInMilliseconds: 600000

5 Feign文件上传

在我们开发中,文件上传是一个比较常见的功能,Feign早先是不支持文件上传的,后来虽然支持,但是仍然有缺陷。

5.1 编写文件上传客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* <p>Description: 导入词汇</p>
*
* @param userId 用户id
* @param tradeId 行业id
* @param dicId 词典id
* @param fileName 文件名
* @param file 文件
* @return
*/
@PostMapping(value = "/importdic", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<JsonResult> importDic(@RequestParam(value = "userId") String userId,
@RequestParam(value = "tradeId") String tradeId,
@RequestParam(value = "dicId") String dicId,
@RequestParam(value = "fileName") String fileName,
@RequestPart("file") MultipartFile file
) {
byte[] bytes = new byte[0];
try {
bytes = file.getBytes();
} catch (IOException e) {
e.printStackTrace();
}
HashMap<String, Object> importDic = dicDetailService.importDic(userId, tradeId, dicId, fileName, bytes);
return ResponseMessageUtil.success(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getStatus(), importDic);
}

注意点:

5.2 编写Feign文件上传服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@FeignClient(name = "smartke-txgl", configuration = DicDetailFeignClient.MultipartSupportConfig.class)
public interface DicDetailFeignClient {
/**
* <p>Description: 导入词汇</p>
*
* @param userId 用户id
* @param tradeId 行业id
* @param dicId 词典id
* @param fileName 文件名
* @param file 文件
* @return
*/
@PostMapping(value = DIC_DETAIL_FEIGN_CLIENT_PREFIX + "/importdic", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<JsonResult> importDic(@RequestParam(value = "userId") String userId,
@RequestParam(value = "tradeId") String tradeId,
@RequestParam(value = "dicId") String dicId,
@RequestParam(value = "fileName") String fileName,
@RequestPart("file") MultipartFile file
);

/**
* <p>Description: 通过少量的配置, 让feign阔以支持MultipartFile </p>
* Created by wusong on 2018-08-23 14:15.
*/
@Configuration
public class MultipartSupportConfig {
@Inject
private ObjectFactory<HttpMessageConverters> messageConverters;

@Inject
private HttpServletRequest request;


@Bean
public RequestInterceptor overrideRequestHeader() {
return (template) -> {
template.header("User-Agent", request.getHeader("User-Agent"));
};
}


@Bean
public Encoder feignFormEncoder() {
return new SpringFormEncoder(new SpringEncoder(messageConverters));
}

}
}

注意上面SpringFormEncoder需要feign-form-spring以及feign-form

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!--feign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form-spring</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form</artifactId>
<version>3.3.0</version>
</dependency>