Skip to content

Instantly share code, notes, and snippets.

@taojy123
Last active December 10, 2020 03:02
Show Gist options
  • Save taojy123/db380e680251dba2b1ea8230b77964e4 to your computer and use it in GitHub Desktop.
Save taojy123/db380e680251dba2b1ea8230b77964e4 to your computer and use it in GitHub Desktop.
tcp 端口转发并拦截流量推到 kafka 中
package main
import (
"fmt"
"os"
"net"
"github.com/Shopify/sarama"
)
var port = "10086"
var dist = "127.0.0.1:7180"
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// config.Version = sarama.V0_11_0_2
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("create kafka producer error :%s\n", err.Error())
return
}
defer producer.AsyncClose()
lis, err := net.Listen("tcp", "0.0.0.0:"+port)
if err != nil {
fmt.Println(err)
return
}
defer lis.Close()
for {
conn, err := lis.Accept()
if err != nil {
fmt.Printf("建立连接错误: %v\n", err)
continue
}
go handle(conn, producer)
}
}
func handle(sconn net.Conn, producer sarama.AsyncProducer) {
defer sconn.Close()
fmt.Println("sconn:", sconn.RemoteAddr(), sconn.LocalAddr())
dconn, err := net.Dial("tcp", dist)
defer dconn.Close()
if err != nil {
fmt.Printf("连接 %v 失败: %v\n", dist, err)
return
}
fmt.Println("dconn:", dconn.RemoteAddr(), dconn.LocalAddr())
ExitChan := make(chan bool, 1)
go func(sconn net.Conn, dconn net.Conn, Exit chan bool, producer sarama.AsyncProducer) {
// _, err := io.Copy(dconn, sconn)
// if err != nil {
// fmt.Printf("往 %v 发送数据失败: %v\n", dist, err)
// }
for {
b := make([]byte, 1024)
n, err := sconn.Read(b)
if err != nil {
fmt.Println("sconn read failed: ", err)
dconn.Close()
break
}
if n == 0 {
continue
}
// fmt.Println("sconn read", n, b)
value := b[:n]
n, err = dconn.Write(value)
// fmt.Println("dconn write", n, value, err)
if err != nil {
break
}
msg := &sarama.ProducerMessage{
Topic: "groxy",
Key: sarama.StringEncoder("key"),
Value: sarama.ByteEncoder(value),
}
// msg.Value = sarama.ByteEncoder(value)
producer.Input() <- msg
select {
case <-producer.Successes():
// case suc := <-producer.Successes():
// fmt.Printf("offset: %d, timestamp: %s", suc.Offset, suc.Timestamp.String())
case fail := <-producer.Errors():
fmt.Printf("err: %s\n", fail.Err.Error())
}
fl, err := os.OpenFile("./send.txt", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
if err == nil {
defer fl.Close()
fl.Write(value)
fl.Write([]byte("\n===========\n"))
}
}
Exit <- true
}(sconn, dconn, ExitChan, producer)
go func(sconn net.Conn, dconn net.Conn, Exit chan bool) {
// _, err := io.Copy(sconn, dconn)
// if err != nil {
// fmt.Printf("从 %v 接收数据失败: %v\n", dist, err)
// }
for {
b := make([]byte, 1024)
n, err := dconn.Read(b)
if err != nil {
fmt.Println("dconn read failed: ", err)
sconn.Close()
break
}
if n == 0 {
continue
}
// fmt.Println("dconn read", n, b)
value := b[:n]
n, err = sconn.Write(value)
// fmt.Println("sconn write", n, value, err)
if err != nil {
break
}
fl, err := os.OpenFile("./recv.txt", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
if err == nil {
defer fl.Close()
fl.Write(value)
fl.Write([]byte("\n===========\n"))
}
}
Exit <- true
}(sconn, dconn, ExitChan)
<-ExitChan
<-ExitChan
fmt.Println(sconn.RemoteAddr(), sconn.LocalAddr(), dconn.RemoteAddr(), dconn.LocalAddr(), "closed")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment