Last active
December 10, 2020 03:02
-
-
Save taojy123/db380e680251dba2b1ea8230b77964e4 to your computer and use it in GitHub Desktop.
TCP 端口转发并拦截流量推到 Kafka 中
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"net" | |
"github.com/Shopify/sarama" | |
) | |
// Proxy endpoints used by main and handle.
var (
	// port is the local TCP port the proxy listens on (bound on 0.0.0.0).
	port = "10086"
	// dist is the host:port address every accepted connection is forwarded to.
	dist = "127.0.0.1:7180"
)
func main() { | |
config := sarama.NewConfig() | |
config.Producer.RequiredAcks = sarama.WaitForAll | |
config.Producer.Partitioner = sarama.NewRandomPartitioner | |
config.Producer.Return.Successes = true | |
config.Producer.Return.Errors = true | |
// config.Version = sarama.V0_11_0_2 | |
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) | |
if err != nil { | |
fmt.Printf("create kafka producer error :%s\n", err.Error()) | |
return | |
} | |
defer producer.AsyncClose() | |
lis, err := net.Listen("tcp", "0.0.0.0:"+port) | |
if err != nil { | |
fmt.Println(err) | |
return | |
} | |
defer lis.Close() | |
for { | |
conn, err := lis.Accept() | |
if err != nil { | |
fmt.Printf("建立连接错误: %v\n", err) | |
continue | |
} | |
go handle(conn, producer) | |
} | |
} | |
func handle(sconn net.Conn, producer sarama.AsyncProducer) { | |
defer sconn.Close() | |
fmt.Println("sconn:", sconn.RemoteAddr(), sconn.LocalAddr()) | |
dconn, err := net.Dial("tcp", dist) | |
defer dconn.Close() | |
if err != nil { | |
fmt.Printf("连接 %v 失败: %v\n", dist, err) | |
return | |
} | |
fmt.Println("dconn:", dconn.RemoteAddr(), dconn.LocalAddr()) | |
ExitChan := make(chan bool, 1) | |
go func(sconn net.Conn, dconn net.Conn, Exit chan bool, producer sarama.AsyncProducer) { | |
// _, err := io.Copy(dconn, sconn) | |
// if err != nil { | |
// fmt.Printf("往 %v 发送数据失败: %v\n", dist, err) | |
// } | |
for { | |
b := make([]byte, 1024) | |
n, err := sconn.Read(b) | |
if err != nil { | |
fmt.Println("sconn read failed: ", err) | |
dconn.Close() | |
break | |
} | |
if n == 0 { | |
continue | |
} | |
// fmt.Println("sconn read", n, b) | |
value := b[:n] | |
n, err = dconn.Write(value) | |
// fmt.Println("dconn write", n, value, err) | |
if err != nil { | |
break | |
} | |
msg := &sarama.ProducerMessage{ | |
Topic: "groxy", | |
Key: sarama.StringEncoder("key"), | |
Value: sarama.ByteEncoder(value), | |
} | |
// msg.Value = sarama.ByteEncoder(value) | |
producer.Input() <- msg | |
select { | |
case <-producer.Successes(): | |
// case suc := <-producer.Successes(): | |
// fmt.Printf("offset: %d, timestamp: %s", suc.Offset, suc.Timestamp.String()) | |
case fail := <-producer.Errors(): | |
fmt.Printf("err: %s\n", fail.Err.Error()) | |
} | |
fl, err := os.OpenFile("./send.txt", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644) | |
if err == nil { | |
defer fl.Close() | |
fl.Write(value) | |
fl.Write([]byte("\n===========\n")) | |
} | |
} | |
Exit <- true | |
}(sconn, dconn, ExitChan, producer) | |
go func(sconn net.Conn, dconn net.Conn, Exit chan bool) { | |
// _, err := io.Copy(sconn, dconn) | |
// if err != nil { | |
// fmt.Printf("从 %v 接收数据失败: %v\n", dist, err) | |
// } | |
for { | |
b := make([]byte, 1024) | |
n, err := dconn.Read(b) | |
if err != nil { | |
fmt.Println("dconn read failed: ", err) | |
sconn.Close() | |
break | |
} | |
if n == 0 { | |
continue | |
} | |
// fmt.Println("dconn read", n, b) | |
value := b[:n] | |
n, err = sconn.Write(value) | |
// fmt.Println("sconn write", n, value, err) | |
if err != nil { | |
break | |
} | |
fl, err := os.OpenFile("./recv.txt", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644) | |
if err == nil { | |
defer fl.Close() | |
fl.Write(value) | |
fl.Write([]byte("\n===========\n")) | |
} | |
} | |
Exit <- true | |
}(sconn, dconn, ExitChan) | |
<-ExitChan | |
<-ExitChan | |
fmt.Println(sconn.RemoteAddr(), sconn.LocalAddr(), dconn.RemoteAddr(), dconn.LocalAddr(), "closed") | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment