Created
March 28, 2025 11:58
-
-
Save ad3n/b4b593db083decd85506571921d146f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package requests | |
import (
	"crypto/tls"
	"errors"
	"io"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"sync/atomic"
	"time"

	"github.com/gabriel-vasile/mimetype"
	"github.com/gofiber/fiber/v3"
	"github.com/rs/zerolog/log"
	core "github.com/valyala/fasthttp"

	"lawang"
	"lawang/bytes"
	"lawang/configs"
	s "lawang/strings"
)
type fasthttp struct { | |
client core.Client | |
cache *lawang.Cache | |
} | |
func NewFastHttpClient(cache *lawang.Cache) *fasthttp { | |
return &fasthttp{ | |
cache: cache, | |
client: core.Client{ | |
Name: "Lawang Crawler", | |
TLSConfig: &tls.Config{ | |
InsecureSkipVerify: true, | |
}, | |
}, | |
} | |
} | |
func (f *fasthttp) Do(requestID string, endpoint *configs.Endpoint, request *Request, response *Response) error { | |
payload := core.AcquireRequest() | |
defer core.ReleaseRequest(payload) | |
payload.Header = *f.buildHeader(endpoint, request) | |
payload.Header.SetMethod(request.Method) | |
payload.Header.Set(configs.LAWANG_REQUEST_ID_HEADER, requestID) | |
f.buildRequest(payload, *endpoint.Backend, request) | |
coreResponse := core.AcquireResponse() | |
defer core.ReleaseResponse(coreResponse) | |
payload.SetTimeout(time.Duration(request.Timeout) * time.Second) | |
response.Success = true | |
start := time.Now() | |
err := f.do(payload, coreResponse) | |
elapsed := time.Since(start) | |
if err != nil { | |
log.Error().Err(err). | |
Str("app_id", configs.Config.Name). | |
Str("env", configs.Config.Environment). | |
Str("request_id", requestID). | |
Str("function", "requests.nethttp.request"). | |
Msg("Failed to call endpoint") | |
response.Body = s.StringToByte(err.Error()) | |
response.ContentType = bytes.ByteToString(coreResponse.Header.Peek(configs.LAWANG_CONTENT_TYPE_HEADER)) | |
if err == core.ErrTimeout { | |
response.StatusCode = core.StatusGatewayTimeout | |
return err | |
} | |
if coreResponse.StatusCode() != fiber.StatusOK { | |
response.StatusCode = coreResponse.StatusCode() | |
} | |
return err | |
} | |
if elapsed.Seconds() > configs.Config.SlowLogThreshold { | |
log.Info(). | |
Str("app_id", configs.Config.Name). | |
Str("env", configs.Config.Environment). | |
Str("request_id", requestID). | |
Str("function", "loggers.slow_log"). | |
Str("path", payload.URI().String()). | |
Str("execution_time", elapsed.String()). | |
Msg("Slow request detected") | |
} | |
response.StatusCode = coreResponse.StatusCode() | |
response.Headers = url.Values{} | |
body, err := coreResponse.BodyUncompressed() | |
if err != nil { | |
log.Error().Err(err). | |
Str("app_id", configs.Config.Name). | |
Str("env", configs.Config.Environment). | |
Str("request_id", requestID). | |
Str("function", "requests.fasthttps.request"). | |
Msg("Failed to read response") | |
return nil | |
} | |
coreResponse.Header.Del(configs.LAWANG_CONTENT_ENCODING_HEADER) | |
mimeType := mimetype.Detect(body).String() | |
response.Body = body | |
coreResponse.Header.VisitAll(func(k, v []byte) { | |
response.Headers.Add(bytes.ByteToString(k), bytes.ByteToString(v)) | |
}) | |
response.ContentType = bytes.ByteToString(coreResponse.Header.Peek(configs.LAWANG_CONTENT_TYPE_HEADER)) | |
if response.ContentType != mimeType { | |
response.ContentType = mimeType | |
} | |
return err | |
} | |
func (f *fasthttp) buildRequest(payload *core.Request, backend configs.Backend, request *Request) { | |
target := s.BuilderPool.Get() | |
defer s.BuilderPool.Put(target) | |
for k, v := range request.Params { | |
target.WriteString(":") | |
target.WriteString(k) | |
backend.Path = strings.Replace(backend.Path, target.String(), v, 1) | |
target.Reset() | |
} | |
queries := url.Values{} | |
for k, v := range backend.Queries { | |
queries.Add(k, v) | |
} | |
for k, v := range request.Queries { | |
queries.Add(k, v) | |
} | |
contentType := bytes.ByteToString(payload.Header.Peek(configs.LAWANG_CONTENT_TYPE_HEADER)) | |
body := buildBody(f.cache, &contentType, request.Files, request.RawBody) | |
byteBody := bytes.BufferPool.Get() | |
defer bytes.BufferPool.Put(byteBody) | |
io.Copy(byteBody, body) | |
payload.URI().SetScheme(backend.Protocol) | |
payload.URI().SetHost(backend.Host) | |
payload.URI().SetPath(backend.Path) | |
payload.URI().SetQueryString(queries.Encode()) | |
if contentType != bytes.ByteToString(payload.Header.Peek(configs.LAWANG_CONTENT_TYPE_HEADER)) { | |
payload.Header.Del(configs.LAWANG_CONTENT_TYPE_HEADER) | |
payload.Header.Set(configs.LAWANG_CONTENT_TYPE_HEADER, contentType) | |
} | |
payload.SetBody(byteBody.Bytes()) | |
} | |
func (f *fasthttp) do(request *core.Request, response *core.Response) error { | |
var init int32 = 0 | |
var maxRedirection int32 = 50 | |
for { | |
atomic.AddInt32(&init, 1) | |
if init > maxRedirection { | |
return core.ErrTooManyRedirects | |
} | |
if err := f.client.Do(request, response); err != nil { | |
return err | |
} | |
statusCode := response.Header.StatusCode() | |
if statusCode != core.StatusMovedPermanently && | |
statusCode != core.StatusFound && | |
statusCode != core.StatusSeeOther && | |
statusCode != core.StatusTemporaryRedirect && | |
statusCode != core.StatusPermanentRedirect { | |
break | |
} | |
location := response.Header.PeekBytes([]byte("Location")) | |
if len(location) == 0 { | |
return errors.New("redirect with missing Location header") | |
} | |
u := request.URI() | |
u.UpdateBytes(location) | |
response.Header.VisitAllCookie(func(key, value []byte) { | |
c := core.AcquireCookie() | |
c.ParseBytes(value) | |
if expire := c.Expire(); expire != core.CookieExpireUnlimited && expire.Before(time.Now()) { | |
request.Header.DelCookieBytes(key) | |
} else { | |
request.Header.SetCookieBytesKV(key, c.Value()) | |
} | |
core.ReleaseCookie(c) | |
}) | |
} | |
return nil | |
} | |
func (f *fasthttp) buildHeader(endpoint *configs.Endpoint, request *Request) *core.RequestHeader { | |
headers := f.cleaningHeaders(request.Headers, endpoint.AllowedHeaders) | |
if len(headers.Peek(configs.LAWANG_ACCEPT_ENCODING_HEADER)) == 0 { | |
headers.Set(configs.LAWANG_ACCEPT_ENCODING_HEADER, configs.LAWANG_CONTENT_ENCODING_GZIP) | |
} | |
for k, v := range endpoint.Backend.Headers { | |
headers.Set(k, v) | |
} | |
return headers | |
} | |
func (f *fasthttp) cleaningHeaders(headers http.Header, allowedHeaders string) *core.RequestHeader { | |
cleanHeaders := core.RequestHeader{} | |
allowed := strings.Split(allowedHeaders, ",") | |
for _, header := range allowed { | |
if h := headers.Get(s.ToLower(header)); h != "" { | |
cleanHeaders.Set(header, h) | |
} | |
} | |
contentType := headers.Get(s.ToLower(configs.LAWANG_CONTENT_TYPE_HEADER)) | |
if bytes.ByteToString(cleanHeaders.Peek(configs.LAWANG_CONTENT_TYPE_HEADER)) != contentType { | |
cleanHeaders.Set(configs.LAWANG_CONTENT_TYPE_HEADER, contentType) | |
} | |
return &cleanHeaders | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment