Skip to content

Instantly share code, notes, and snippets.

@trunet
Last active January 6, 2021 00:56
Show Gist options
  • Save trunet/6370693 to your computer and use it in GitHub Desktop.
Save trunet/6370693 to your computer and use it in GitHub Desktop.
NTP protocol for Twisted using ntplib
from twisted.internet.protocol import DatagramProtocol
from twisted.python import log
from twisted.python.failure import Failure
from ntplib import *
import time
class Error(Exception):
    """Root of the exception hierarchy defined by this module."""
class TimeoutError(Error):
    """Failure reported when the NTP request times out."""
class NTPProtocol(DatagramProtocol):
    """One-shot NTP client: send a single query and report the reply.

    Constructing an instance immediately schedules the timeout and opens a
    UDP port on the reactor.  When the port starts, ``host`` is resolved, a
    mode 3 (client) packet is sent, and the first reply that parses as an
    NTP packet is handed to ``callback.callback`` as an ``NTPStats`` object.
    If that has not happened within ``timeout`` seconds, the port is closed
    and ``callback.errback`` receives a ``Failure`` wrapping ``TimeoutError``.

    Parameters:
        host: name or address of the NTP server (given to ``reactor.resolve``).
        callback: object providing ``callback(stats)`` and
            ``errback(failure)``, e.g. a Deferred.
        version: NTP version number written into the query packet.
        port: UDP port of the NTP server.
        timeout: seconds to wait, counted from construction, before giving up.
        reactor: reactor to use; the global reactor when ``None``.
    """

    def __init__(self, host, callback, version=2, port=123, timeout=3, reactor=None):
        self.host = host
        self.version = version
        self.port = port
        self.ip = None
        self.d = callback
        # Set once stop() has asked the port to close, so stop() is idempotent.
        self._stopped = False
        if reactor is None:
            from twisted.internet import reactor
        self._reactor = reactor
        self.timeout = self._reactor.callLater(timeout, self.timedOut)
        # Every attribute must be in place before this call: opening the port
        # is what leads to startProtocol() being invoked.
        self._reactor.listenUDP(0, self, maxPacketSize=256)

    def startProtocol(self):
        """Resolve the server name once the UDP port is up."""
        dr = self._reactor.resolve(self.host)
        dr.addCallback(self.connect)
        dr.addErrback(self.resolveError)

    def connect(self, ip):
        """Connect the UDP transport to the resolved address and send the query."""
        log.msg("Connecting to NTP Server " + ip)
        self.ip = ip
        self.transport.connect(self.ip, self.port)
        self.sendDatagram()

    def resolveError(self, failure):
        """Log a name-resolution failure.

        The failure is consumed here; the caller is notified later, through
        ``TimeoutError``, when the timeout expires.
        """
        log.err(str(failure))
        return None

    def timedOut(self):
        """Close the port and fail the caller's deferred with ``TimeoutError``."""
        log.err("Timed out")
        self.stop()
        self.d.errback(Failure(TimeoutError("Connection to NTP Server timed out")))

    def stop(self):
        """Close the UDP port; calling this more than once is harmless."""
        # Twisted resets ``transport`` to None after the port has shut down,
        # so a repeated or late call must not dereference it.
        if self._stopped or self.transport is None:
            return
        self._stopped = True
        # stopProtocol() is deliberately not called here: the port invokes it
        # itself (via doStop) once it has finished closing.
        self.transport.stopListening()

    def sendDatagram(self):
        """Send one NTP client request stamped with the current time."""
        # create the request packet - mode 3 is client
        query_packet = NTPPacket(mode=3, version=self.version, tx_timestamp=system_to_ntp_time(time.time()))
        self.transport.write(query_packet.to_data())

    def datagramReceived(self, datagram, host):
        """Parse a reply and deliver it to the caller exactly once.

        ``host`` is the sender's address; only its first element is used,
        for logging.
        """
        if not self.timeout.active():
            # The request was already answered or has timed out, so the
            # caller's deferred has fired; ignore duplicate or late packets.
            return
        # Take the arrival time first so later work does not skew it.
        dest_timestamp = system_to_ntp_time(time.time())
        log.msg("Received NTP packet from " + str(host[0]))
        stats = NTPStats()
        try:
            stats.from_data(datagram)
        except NTPException as exc:
            # Leave the timeout armed: a valid reply may still arrive, and if
            # none does the caller is still told via TimeoutError.
            log.err(exc, "Discarding invalid NTP packet")
            return
        stats.dest_timestamp = dest_timestamp
        # Cancel only after a successful parse, so a bad packet can never
        # leave the deferred without an outcome.
        self.timeout.cancel()
        self.d.callback(stats)
from protocol import NTPProtocol
from time import ctime
import sys
from twisted.internet import reactor, defer
from twisted.python import log
# Send Twisted's log output to standard output.
log.startLogging(sys.stdout)
class MyObj(object):
    """Example client that queries ``pool.ntp.org`` in an endless loop.

    Every finished request, successful or not, immediately starts the next
    one; there is no delay between queries.
    """

    def __init__(self):
        self.requestNTP()

    def printNtpStats(self, stats):
        """Print the reply's ``tx_time``, close the port and query again."""
        # The parenthesised single-argument form is valid both as a Python 2
        # print statement and as a Python 3 function call.
        print(ctime(stats.tx_time))
        self.protocol.stop()
        self.requestNTP()

    def printError(self, failure):
        """Log the failure, then start a new request."""
        log.err(str(failure))
        self.requestNTP()
        # Returning None marks the failure as handled.
        return None

    def requestNTP(self):
        """Create a fresh Deferred and a new NTPProtocol that will fire it."""
        self.d = defer.Deferred()
        self.d.addCallback(self.printNtpStats)
        self.d.addErrback(self.printError)
        self.protocol = NTPProtocol('pool.ntp.org', self.d)
# Instantiate MyObj (whose constructor issues the first NTP request) once the
# reactor is running, then start the reactor's event loop.
reactor.callWhenRunning(MyObj)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment