Created
May 15, 2016 20:51
-
-
Save raztud/69ad220911cf28218c8b99a3cc94ab99 to your computer and use it in GitHub Desktop.
asyncio TCP server that writes the number of requests to Cassandra; Go client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import asyncio | |
import asyncio.streams | |
import uuid | |
from cassandra.cluster import Cluster | |
class MyServer:
    """Line-oriented TCP server that counts received lines in Cassandra.

    Every non-empty line read from any client increments the ``number``
    counter of the ``requests`` row whose ``id`` equals this server's
    ``uid``.
    """

    def __init__(self, uid):
        """Connect to the Cassandra node on 127.0.0.1 and select the keyspace.

        uid -- value interpolated verbatim into the ``WHERE id = ...``
               clause of the counter update.
        """
        self.server = None  # encapsulates the server sockets
        self.clients = {}  # task -> (reader, writer)
        self.uid = uid
        cluster = Cluster(['127.0.0.1'], protocol_version=4)
        self.session = cluster.connect()
        self.session.set_keyspace("raztud")

    def _accept_client(self, client_reader, client_writer):
        """
        Accept a new client connection and create a Task to handle it.
        self.clients is updated to keep track of the new client until
        its task finishes.
        """
        # start a new Task to handle this specific client connection
        task = asyncio.ensure_future(
            self._handle_client(client_reader, client_writer))
        self.clients[task] = (client_reader, client_writer)

        def client_done(task):
            print("client task done:", task, file=sys.stderr)
            # pop() instead of del: never raise from a done-callback.
            self.clients.pop(task, None)

        task.add_done_callback(client_done)

    async def _handle_client(self, client_reader, client_writer):
        """
        Handle the requests of one client. The protocol is line oriented:
        each received line triggers one counter update. The loop ends when
        the client disconnects or a line cannot be read/decoded, and the
        connection is closed on the way out.
        """
        loop = asyncio.get_event_loop()
        # The statement does not change between requests; build it once.
        query = "update requests set number = number+1 WHERE id = {0}".format(self.uid)
        try:
            while True:
                try:
                    data = (await client_reader.readline()).decode("utf-8")
                except (OSError, ValueError):
                    # Connection errors, over-long lines and undecodable
                    # bytes are treated like a disconnect. Cancellation is
                    # deliberately NOT caught here so the task can be stopped.
                    data = None
                if not data:  # an empty string means the client disconnected
                    break
                # session.execute() blocks until Cassandra answers; run it in
                # the default executor so other clients keep being served.
                await loop.run_in_executor(None, self.session.execute, query)
                # No reply is written at the moment; drain() keeps flow
                # control in place should a response be added.
                await client_writer.drain()
        finally:
            # Release the client socket however the loop ended.
            client_writer.close()

    def start(self, loop):
        """
        Start the TCP server so that it listens on 127.0.0.1:8080.
        For each client that connects, _accept_client gets called.
        This method runs the loop until the server sockets are ready
        to accept connections.
        """
        # start_server() binds to the loop that is running it, which is
        # `loop` here because it is driven by run_until_complete().
        self.server = loop.run_until_complete(
            asyncio.start_server(self._accept_client, '127.0.0.1', 8080))

    def stop(self, loop):
        """
        Stop the TCP server, i.e. close the listening socket(s).
        This method runs the loop until the server sockets are closed.
        Calling it when the server is not running is a no-op.
        """
        if self.server is not None:
            self.server.close()
            loop.run_until_complete(self.server.wait_closed())
            self.server = None
def main():
    """Run the counting server until the process is interrupted.

    A fresh time-based UUID identifies this server run; it is printed and
    handed to MyServer, which uses it as the id of the counter row.
    """
    # Create the loop explicitly instead of relying on the implicit loop
    # creation of get_event_loop(), which newer Python versions deprecate.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    uid = str(uuid.uuid1())
    print("Start with uuid: {0}".format(uid))

    # creates a server and starts listening to TCP connections
    server = MyServer(uid=uid)
    server.start(loop)
    try:
        loop.run_forever()
    finally:
        # MyServer exposes stop(loop), not close(); calling close() raised
        # AttributeError here and prevented the loop from being closed.
        server.stop(loop)
        loop.close()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"net" | |
"os" | |
"sync" | |
) | |
func main() { | |
var numberOfMessages = flag.Int("noMsg", 100, "number of messages") | |
flag.Parse() | |
fmt.Printf("Send %d messagess. Please wait...\n", *numberOfMessages) | |
conn, err := net.Dial("tcp", "localhost:8080") | |
if err != nil { | |
fmt.Println(err) | |
conn.Close() | |
} | |
var wg sync.WaitGroup | |
for i := 0; i < *numberOfMessages; i++ { | |
wg.Add(1) | |
go sendRequest(&wg, conn) | |
} | |
wg.Wait() | |
conn.Close() | |
} | |
func sendRequest(wg *sync.WaitGroup, conn net.Conn) { | |
defer wg.Done() | |
strEcho := "Hello\n" | |
_, err := conn.Write([]byte(strEcho)) | |
if err != nil { | |
println("Write to server failed:", err.Error()) | |
os.Exit(1) | |
} | |
// reply := make([]byte, 1024) | |
// _, err = conn.Read(reply) | |
// if err != nil { | |
// println("Write to server failed:", err.Error()) | |
// } | |
// | |
// println("reply from server=", string(reply)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment