Last active
June 6, 2024 09:40
-
-
Save AFutureD/57530807815efdb7da6faa3391e7c8eb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package *.gateway.fliter; | |
import lombok.RequiredArgsConstructor; | |
import org.springframework.cloud.gateway.filter.GatewayFilterChain; | |
import org.springframework.cloud.gateway.filter.GlobalFilter; | |
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; | |
import org.springframework.core.Ordered; | |
import org.springframework.http.MediaType; | |
import org.springframework.http.codec.HttpMessageReader; | |
import org.springframework.http.codec.ServerCodecConfigurer; | |
import org.springframework.http.server.reactive.ServerHttpRequest; | |
import org.springframework.stereotype.Component; | |
import org.springframework.util.LinkedMultiValueMap; | |
import org.springframework.util.MultiValueMap; | |
import org.springframework.util.StringUtils; | |
import org.springframework.web.server.ServerWebExchange; | |
import reactor.core.publisher.Mono; | |
import java.io.UnsupportedEncodingException; | |
import java.net.URLDecoder; | |
import java.nio.charset.Charset; | |
import static com.yigegroup.gateway.constants.GatewayConstants.EXCHANGE_CACHED_FORM_DATA; | |
// Difficult to use `AdaptCachedBodyGlobalFilter`, so using this instead. | |
@Component | |
@RequiredArgsConstructor | |
public class GlobalCacheRequestFilter implements GlobalFilter, Ordered { | |
private final ServerCodecConfigurer serverCodecConfigurer; | |
@Override | |
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { | |
return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> { | |
if (serverHttpRequest == exchange.getRequest()) { | |
return chain.filter(exchange); | |
} | |
ServerWebExchange mutatedExchange = exchange.mutate().request(serverHttpRequest).build(); | |
return chain.filter(mutatedExchange); | |
}); | |
} | |
@Override | |
public int getOrder() { | |
return -1024; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package *.gateway.fliter; | |
import com.alibaba.fastjson.JSONObject; | |
import com.yigegroup.core.common.util.RequestHolder; | |
import lombok.Data; | |
import lombok.SneakyThrows; | |
import lombok.extern.slf4j.Slf4j; | |
import org.reactivestreams.Publisher; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.cloud.gateway.filter.GatewayFilterChain; | |
import org.springframework.cloud.gateway.filter.GlobalFilter; | |
import org.springframework.cloud.gateway.route.Route; | |
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; | |
import org.springframework.core.Ordered; | |
import org.springframework.core.io.buffer.DataBuffer; | |
import org.springframework.core.io.buffer.DataBufferFactory; | |
import org.springframework.core.io.buffer.DataBufferUtils; | |
import org.springframework.core.io.buffer.DefaultDataBufferFactory; | |
import org.springframework.http.HttpHeaders; | |
import org.springframework.http.MediaType; | |
import org.springframework.http.codec.HttpMessageReader; | |
import org.springframework.http.server.reactive.ServerHttpRequest; | |
import org.springframework.http.server.reactive.ServerHttpResponse; | |
import org.springframework.http.server.reactive.ServerHttpResponseDecorator; | |
import org.springframework.stereotype.Component; | |
import org.springframework.web.reactive.function.server.HandlerStrategies; | |
import org.springframework.web.server.ServerWebExchange; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import java.nio.CharBuffer; | |
import java.nio.charset.StandardCharsets; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.function.Consumer; | |
import static org.springframework.http.MediaType.*; | |
@Data | |
class GatewayIn { | |
// 请求时间(请求到达服务器时间戳) | |
private long t; | |
private GatewayRequestCtx context; | |
private String headers; | |
private String request; | |
} | |
@Data | |
class GatewayLogRecordDTO { | |
// 客户端环境 | |
private String env; | |
private long outstamp; | |
private GatewayIn in; | |
private GatewayOut out; | |
// The only constructor for this Class. | |
// Notice: Try not use AllArgsConstructor or other Annotations. | |
public GatewayLogRecordDTO(GatewayIn in, GatewayOut out) { | |
this.in = in; | |
this.out = out; | |
this.tryMapCriticalInfo(in); | |
} | |
// In this method, it's not allowed to directly call `this.in` property, instead you should call parameter `in`. | |
public void tryMapCriticalInfo(GatewayIn in) { | |
this.env = in.getContext().getEnv(); | |
this.outstamp = out.getT(); | |
} | |
} | |
@Data | |
class GatewayOut { | |
// 服务端返回的信息。 | |
// 一般得,这里只提供客户端需要的内容,更细节的信息,通过 TraceID 获取。 | |
// 返回时间 | |
private long t; | |
private String headers; | |
private String response; | |
private long spent; | |
} | |
@Data | |
class GatewayRequestCtx { | |
private String requestID; | |
private String api; | |
private String apiVersion; | |
private String env; | |
private String station; | |
private String language; | |
private String country; | |
private String uid; | |
private String ip; | |
private List<String> tags; | |
// 客户端发出请求的时间戳 | |
private String t; | |
// 请求方法 GET、POST、DELETE、UPDATE | |
private String method; | |
private String param; | |
} | |
@Slf4j | |
@Component | |
public class LogFilter implements GlobalFilter, Ordered { | |
@Value("${spring.profiles.active}") | |
private String active; | |
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders(); | |
@Override | |
public int getOrder() { | |
return -1023; | |
} | |
@Override | |
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { | |
GatewayIn recordIn = this.prepareGatewayIn(exchange); | |
GatewayOut recordOut = new GatewayOut(); | |
Consumer<String> receiveResponse = (String responseBody) -> { | |
long stamp = System.currentTimeMillis(); | |
HttpHeaders responseHeaders = exchange.getResponse().getHeaders(); | |
responseHeaders.set("x-rc-outstamp", String.valueOf(stamp)); | |
recordOut.setResponse(responseBody); | |
recordOut.setHeaders(JSONObject.toJSONString(responseHeaders.toSingleValueMap())); | |
recordOut.setT(stamp); | |
recordOut.setSpent(stamp - recordIn.getT()); | |
}; | |
ServerHttpResponseDecorator decoratedResponse = responseDecorator(exchange, receiveResponse); | |
// 记录普通的 | |
return chain.filter( | |
exchange.mutate().response(decoratedResponse).build() | |
) | |
.then(Mono.fromRunnable(() -> { | |
// 打印日志 | |
GatewayLogRecordDTO record = new GatewayLogRecordDTO(recordIn, recordOut); | |
writeAccessLog(record); | |
})); | |
} | |
private void writeAccessLog(GatewayLogRecordDTO record) { | |
log.info(JSONObject.toJSONString(record)); | |
} | |
private Route getGatewayRoute(ServerWebExchange exchange) { | |
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); | |
} | |
private GatewayRequestCtx tryPrepareRequestContext(ServerWebExchange exchange) { | |
ServerHttpRequest request = exchange.getRequest(); | |
Map<String, String> requestMap = request.getHeaders().toSingleValueMap(); | |
Map<String, String> queryParams = request.getQueryParams().toSingleValueMap(); | |
String requestPath = request.getPath().pathWithinApplication().value(); | |
String ipAddress = RequestHolder.getServerHttpRequestIpAddress(request); | |
GatewayRequestCtx ctx = new GatewayRequestCtx(); | |
ctx.setRequestID(requestMap.get("X-Request-ID")); | |
ctx.setApi(requestPath); | |
ctx.setUid(requestMap.get("x-rc-uid")); | |
ctx.setTags(Collections.emptyList()); | |
ctx.setIp(ipAddress); | |
ctx.setT(requestMap.get("x-rc-timestamp")); | |
ctx.setMethod(request.getMethodValue()); | |
ctx.setParam(JSONObject.toJSONString(queryParams)); | |
return ctx; | |
} | |
private GatewayIn prepareGatewayIn(ServerWebExchange exchange) { | |
GatewayRequestCtx ctx = this.tryPrepareRequestContext(exchange); | |
Map<String, String> requestMap = exchange.getRequest().getHeaders().toSingleValueMap(); | |
String body = this.prepareRequestBody(exchange); | |
GatewayIn recordIn = new GatewayIn(); | |
recordIn.setT(System.currentTimeMillis()); | |
recordIn.setContext(ctx); | |
recordIn.setHeaders(JSONObject.toJSONString(requestMap)); | |
recordIn.setRequest(body); | |
return recordIn; | |
} | |
@SneakyThrows | |
private String prepareRequestBody(ServerWebExchange exchange) { | |
ServerHttpRequest request = exchange.getRequest(); | |
MediaType mediaType = request.getHeaders().getContentType(); | |
DataBuffer cachedRequestBody = exchange.getAttribute(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR); | |
if (cachedRequestBody == null) { | |
return null; | |
} | |
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(cachedRequestBody.asByteBuffer()); | |
String bodyStr = charBuffer.toString(); | |
if (APPLICATION_JSON.isCompatibleWith(mediaType)) { | |
return bodyStr; | |
} else if ( | |
APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType) | |
|| MULTIPART_FORM_DATA.isCompatibleWith(mediaType) | |
) { | |
// TODO Parse form data. | |
return bodyStr; | |
} | |
return null; | |
} | |
// ModifyResponseBodyGatewayFilterFactory.java | |
private ServerHttpResponseDecorator responseDecorator( | |
ServerWebExchange exchange, | |
Consumer<String> action | |
) { | |
ServerHttpResponse response = exchange.getResponse(); | |
DataBufferFactory bufferFactory = response.bufferFactory(); | |
return new ServerHttpResponseDecorator(response) { | |
@Override | |
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { | |
if (body instanceof Flux) { | |
Flux<? extends DataBuffer> fluxBody = Flux.from(body); | |
return super.writeWith(fluxBody.buffer().map(dataBuffers -> { | |
// 合并多个流集合,解决返回体分段传输 | |
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); | |
DataBuffer join = dataBufferFactory.join(dataBuffers); | |
byte[] content = new byte[join.readableByteCount()]; | |
join.read(content); | |
// 释放掉内存 | |
DataBufferUtils.release(join); | |
// 大于 1GB 丢弃 | |
if (content.length <= 1024 * 1024 * 1024) { | |
String responseResult = new String(content, StandardCharsets.UTF_8); | |
action.accept(responseResult); | |
} | |
return bufferFactory.wrap(content); | |
})); | |
} else if (body instanceof Mono) { | |
Mono<? extends DataBuffer> monoBody = Mono.from(body); | |
return super.writeWith(monoBody.map(dataBuffer -> { | |
byte[] content = new byte[dataBuffer.readableByteCount()]; | |
dataBuffer.read(content); | |
// 释放掉内存 | |
DataBufferUtils.release(dataBuffer); | |
// 大于 1GB 丢弃 | |
if (content.length <= 1024 * 1024 * 1024) { | |
String responseResult = new String(content, StandardCharsets.UTF_8); | |
action.accept(responseResult); | |
} | |
return bufferFactory.wrap(content); | |
})); | |
} | |
// if body is not a flux. never got there. | |
return super.writeWith(body); | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment