Last active
February 28, 2018 08:22
-
-
Save muxueqz/b5da6d1facbd7a8a6c4003c711d35911 to your computer and use it in GitHub Desktop.
Simple HTTP Put
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"files-bak":[ | |
"Z:\\tmp\\test.log" | |
], | |
"files":[ | |
"/tmp/test.log", | |
"/tmp/test-iconv.lua" | |
], | |
"files2":[ | |
"/tmp/test-seq.log2" | |
], | |
"batchsize": 10000, | |
"output": "http://upload_url/paysys_post" | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"fmt" | |
"github.com/ti/nasync" | |
"io" | |
"io/ioutil" | |
// "log" | |
"net/http" | |
"os" | |
"path/filepath" | |
"strings" | |
"time" | |
) | |
// Config mirrors the settings decoded from config.json. encoding/json matches
// object keys to these field names case-insensitively, so the lowercase keys
// "batchsize", "output" and "files" populate them.
type Config struct {
	// BatchSize is compared against the per-file line counter to decide when
	// a pass stops reading a file.
	BatchSize int

	// Output is the URL each log line is POSTed to.
	Output string

	// Files lists the log files to read, in processing order.
	Files []string
}
// myClient is the single HTTP client shared by every upload, so connections
// can be reused between requests. It allows a very large number of idle
// keep-alive connections per host and limits each request to ten seconds.
var myClient = func() *http.Client {
	transport := &http.Transport{MaxIdleConnsPerHost: 9999999}
	return &http.Client{
		Transport: transport,
		Timeout:   10 * time.Second,
	}
}()
func httpPost(url string, data string) { | |
resp, err := myClient.Post(url, | |
"application/x-www-form-urlencoded", | |
strings.NewReader(data)) | |
if resp.StatusCode != 200 { | |
panic(resp) | |
} | |
if err != nil { | |
fmt.Println(err) | |
} | |
io.Copy(ioutil.Discard, resp.Body) | |
resp.Body.Close() | |
} | |
// check is the fail-fast error helper: a nil error is a no-op, anything else
// panics with the error itself as the panic value.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
func process_logs() { | |
workdir, err := filepath.Abs(filepath.Dir(os.Args[0])) | |
if err != nil { | |
check(err) | |
} | |
var check_points map[string]int64 | |
check_points = make(map[string]int64) | |
async := nasync.New(100, 100) | |
defer async.Close() | |
config_file, err := os.Open(filepath.Join(workdir, "config.json")) | |
fmt.Println(filepath.Join(workdir, "config.json")) | |
config, err := ioutil.ReadAll(config_file) | |
if err != nil { | |
check(err) | |
} | |
var cfg Config | |
json.Unmarshal([]byte(config), &cfg) | |
fmt.Println([]byte(config)) | |
fmt.Println(cfg) | |
config_file.Close() | |
dump_file := filepath.Join(workdir, | |
"checkpoint.data") | |
checkpoint_fd, err := os.Open(dump_file) | |
checkpoint_read, err := ioutil.ReadAll(checkpoint_fd) | |
json.Unmarshal([]byte(checkpoint_read), &check_points) | |
checkpoint_fd.Close() | |
tmpfile := filepath.Join(workdir, | |
"checkpoint.tmp") | |
var URL = cfg.Output | |
var files = cfg.Files | |
_ = URL | |
for _, log_file := range files { | |
count := 0 | |
fmt.Println(log_file) | |
f, err := os.Open(log_file) | |
check(err) | |
var seek int64 = 0 | |
seek = check_points[log_file] | |
f.Seek(seek, os.SEEK_SET) | |
// reader := bufio.NewReader(f) | |
// // line, err := reader.ReadString('\n') | |
// // line, err = reader.ReadString('\n') | |
// // _ = line | |
// for { | |
// line, err := reader.ReadString('\n') | |
// if err != nil { | |
// if err == io.EOF { | |
// fmt.Printf("%#v\n", line) | |
// break | |
// } | |
// panic(err) | |
// } | |
// // async.Do(httpPost, URL, line) | |
// fmt.Println(line) // Println will add back the final '\n' | |
// seek += int64(len(line)) | |
// // fmt.Printf("seek: %s\n", string(seek), seek) | |
// check_points[log_file] = seek | |
// } | |
scanner := bufio.NewScanner(f) | |
for scanner.Scan() { | |
line := (scanner.Text()) // Println will add back the final '\n' | |
async.Do(httpPost, URL, line) // go httpPost(URL, line) | |
// fmt.Println(line) // Println will add back the final '\n' | |
seek += int64(len(line) + 1) // add '\n' length | |
check_points[log_file] = seek | |
count++ | |
if count > cfg.BatchSize { | |
break | |
} | |
} | |
f.Close() | |
fmt.Println(check_points) | |
json_data, err := json.Marshal(check_points) | |
check(err) | |
err = ioutil.WriteFile(tmpfile, json_data, 0644) | |
os.Rename(tmpfile, dump_file) | |
time.Sleep(time.Second / 10) | |
} | |
} | |
func main() { | |
for { | |
process_logs() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@liuyu 。。宇哥,这里是github,不是gitlab,之前代码不是说没用了么,所以就备份到github了