Skip to content

Instantly share code, notes, and snippets.

@bbrodriges
Created April 16, 2012 11:16
Show Gist options
  • Save bbrodriges/2397845 to your computer and use it in GitHub Desktop.
Save bbrodriges/2397845 to your computer and use it in GitHub Desktop.
Yandex.Maps http spoofer
#!/usr/bin/env python
import sys, os, gzip, zlib, StringIO, time, datetime, getopt, httplib, re, random
class pyagger:
""" Yandex Maps Spoofer (talks like an Android client) """
class mpoint:
""" A map point with speed, direction and time of capture """
pointtemplate = "<point lat=\"%.6f\" lon=\"%.6f\" avg_speed=\"%d\" direction=\"%d\" time=\"%s\" />"
def __init__(self, lat, lon, avg_speed, direction, ctime = None):
if ctime is None:
if pyagger.lasttime is None:
pyagger.lasttime = datetime.datetime.now()
ctime = pyagger.lasttime
else:
pyagger.lasttime = ctime
self.lat, self.lon, self.avg_speed, self.direction, self.ctime = lat, lon, avg_speed, direction, ctime
pyagger.lasttime += datetime.timedelta(milliseconds = pyagger.pointdelay)
def get_xml(self):
return self.mfilter(self.pointtemplate % (self.lat, self.lon, self.avg_speed, self.direction, self.ctime.strftime("%d%m%Y:%H%M%S")))
def to_xml(self, lat, lon, avg_speed, direction, ctime):
return self.mfilter(self.pointtemplate % (lat, lon, avg_speed, direction, ctime.strftime("%d%m%Y:%H%M%S")))
def mfilter(self, xmlstr):
#print "tst " +re.sub(r"\w+\=\"\-\d+\"\s","",xmlstr)
return re.sub(r"\w+\=\"\-\d+\"\s","",xmlstr)
#
# Alternative hardcoded key (uuid is usually used instead), unused, let it be.
#
altkey = "A35D8D69227C3E57807B03D6BDCE7B89F6A1229005856BD03744ACAC8CF0ADB4C6C96F59ECD465B37E051ACA63309CF038AF527BAA6F1F2E5E1B71E6E0B46402DD30BA3DFD2DC20E5103".decode('hex')
#
# XML templates
#
xmltemplate = "<?xml version=\"1.0\" encoding=\"utf8\" ?><traffic_collect>@points@</traffic_collect>"
#pointtemplate = "<point lat=\"@lat@\" lon=\"@lon@\" avg_speed=\"@avg_speed@\" direction=\"@direction@\" time=\"@time@\" />"
pointdelay = 6000
numpoints = 5
lasttime = None
pointbuf = []
wait = False
debug = False
#
# Pretend to be an LG P500
#
useragent = "Apache-HttpClient/UNAVAILABLE (java 1.4)"
inituseragent = "Dalvik/1.2.0 (Linux; U; Android 2.2.1; LG-P500 Build/FRG83)"
inituaparams = [["app_version","173"],["app_platform","android"],["screen_w","320"],["screen_h","455"],["uuid","@uuid@"],["manufacturer","LGE"],["model","LG-P500"],["os_version","2.2.1"],["clid","0"],["utf",None]]
initselector = "/startup/"
host = "mobile-partners.maps.yandex.net"
hoststr = "mobile-partners.maps.yandex.net"
conntype = "Keep-Alive"
selector = "/ymm_collect/2.x/"
logfname = "debug.log"
def __init__(self,debug=False):
if debug:
self.debug = debug
self.initlog()
def __del__(self):
if self.debug:
self.deinitlog()
#
# in-memory gzip
#
def compressbuf(self, buf):
zbuf = StringIO.StringIO()
zfile = gzip.GzipFile(mode = 'wb', fileobj = zbuf, mtime = 0, compresslevel = 6)
zfile.write(buf)
zfile.close()
return self.fixheader(zbuf.getvalue())
#
# Fix GZIP header to match java GZIPOutputStream behavior
#
def fixheader(self, buf):
return buf[:8] + '\x00\x00' + buf[10:]
#
# Compute a checksum = CRC32 of payload XORed with a key
#
def checksum(self, source, key):
i = len(source)
j = len(key) - 1
k = 0
l = 0
while l < i :
source[l] = source[l] ^ key[k]
if k == j :
k = 0
else:
k += 1
l += 1
return (zlib.crc32(str(source)) & 0XFFFFFFFF)
def set_uuid(self, uuid):
self.log("uuid = "+uuid)
self.uuid = uuid
#
# Post a request to server
#
def post(self, host, selector, fields, payload):
content_type, body = self.encode_multipart_formdata(fields, payload)
h = httplib.HTTPConnection(host)
rselector = selector + "?"+"&".join(key+"="+value for (key, value) in fields)
h.putrequest('POST', rselector, True, True)
h.putheader('Content-Type', content_type)
h.putheader('Content-Length', str(len(body)))
h.putheader('Host',self.hoststr)
h.putheader('Connection',self.conntype)
h.putheader('User-Agent',self.useragent)
h.endheaders()
h.send(body)
self.log("POST "+rselector)
response = h.getresponse()
self.log(str(response.status) + " " + str(response.reason))
return response
#
# Encode parameters and payload
#
def encode_multipart_formdata(self, fields, payload):
BOUNDARY = 'edge_here'
CRLF = '\r\n'
L = []
L.append('--' + BOUNDARY)
L.append('Content-Disposition: form-data; name="data"')
L.append('Content-Type: application/gzip')
L.append('')
L.append(payload)
for (key, value) in fields:
L.append('--' + BOUNDARY)
L.append('Content-Disposition: form-data; name="%s"' % key)
L.append('Content-Type: text/plain')
L.append('')
L.append(value)
L.append('--' + BOUNDARY + '--')
L.append('')
body = CRLF.join(L)
content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
return content_type, body
#
# Compress and compute checksum of a payload
#
def prepare_data(self, xmlstr):
print "tst2 "+ xmlstr
payload = self.compressbuf(xmlstr)
cchecksum = self.checksum(bytearray(payload), bytearray(self.uuid))
return payload, cchecksum
#
# Assemble xml
#
def prepare_bufxml(self):
strpoints = ''.join(p.get_xml() for p in self.pointbuf)
return self.xmltemplate.replace("@points@", strpoints)
#
# Assemble and send a packet using points in buffer
#
def push_packet(self):
self.push_xml(self.prepare_bufxml())
self.pointbuf = []
#
# Post the packet to server
#
def push_xml(self, xmlstr):
gzipd, cksum = self.prepare_data(xmlstr)
self.log(xmlstr)
self.log("Checksum = "+str(cksum))
self.post(pp.host, pp.selector,[["uuid",pp.uuid],["packetid",str(cksum)],["compressed","1"],["oauth_token",""]],gzipd)
#
# Add a point sing a buffer of 5 packets, push a packet when 5 points reached
#
def push_point_ring(self, point):
if len(self.pointbuf) < 4:
self.pointbuf.append(point)
elif len(self.pointbuf) == 4:
self.pointbuf.append(point)
self.push_packet()
else:
self.push_packet()
self.pointbuf.append(point)
#
# Push a point
#
def push_point(self, point):
self.pointbuf.append(point)
#
# Push points from an stream without timing information
#
def push_tsv(self, infile):
i = 0
self.log("Mode: TSV file with points")
while True:
sline = infile.readline()
if not sline:
infile.close()
break;
if sline.startswith('PUSH'):
self.push_packet()
self.randwait()
continue
if sline.startswith('WAIT'):
ar = sline.split("\t")
self.waitimm(int(ar[1]),int(ar[2]))
continue
if sline.startswith('#'):
continue
line = sline.split('\t')
if len(line) >= 4:
try:
pp = self.mpoint(float(line[0]), float(line[1]), line[2] and int(line[2]) or -1, line[3] and int(line[3]) or -1)
self.push_point(pp)
except:
raise # !!!TODO
i = i + 1
#
# Push points from a stream as is
#
def push_tsv_timed(self, infile):
i = 0
self.log("Mode: TSV file with timed points")
while True:
sline = infile.readline()
if not sline:
infile.close()
break;
if sline.startswith('PUSH'):
self.push_packet()
self.randwait()
continue
if sline.startswith('WAIT'):
ar = sline.split("\t")
self.waitimm(int(ar[1]),int(ar[2]))
continue
if sline.startswith('#'):
continue
line = sline.split('\t')
if len(line) == 5:
try:
pp = self.mpoint(float(line[0]), float(line[1]), line[2] and int(line[2]) or -1, line[3] and int(line[3]) or -1, datetime.datetime.strptime(line[4][:15],"%d%m%Y:%H%M%S"))
self.push_point(pp)
except:
raise # !!!TODO
i = i + 1
#
# Push xmls from file
#
def push_xmls(self,infile):
self.log("Mode: xml per line")
while True:
sline = infile.readline()
if not sline:
infile.close()
break;
try:
self.push_xml(sline)
except:
raise # !!!TODO
self.randwait()
#
# Pretend that we are an Android device
#
def set_wait(self, waitms = 30000, randms = 0, wait=False):
self.waitms = waitms
self.randms = randms
self.wait = wait
def randwait(self):
if(self.wait):
ms = (self.waitms + random.randint(-self.randms,self.randms))
self.log("Scheduled sleep for %d ms" % ms)
time.sleep(ms/1000.0)
def waitimm(self,waitms,randms):
ms = (waitms + random.randint(-randms,randms))
self.log("Forced sleep for %d ms" % ms)
time.sleep(ms/1000.0)
#
# Pretend that we are an Android device
#
def startup(self):
self.get_startup(self.host, self.initselector, self.inituaparams)
#
# Get startup request
#
def get_startup(self, host, selector, fields):
h = httplib.HTTPConnection(host)
rselector = selector + "?"+"&".join(value and key+"="+value or key for (key, value) in fields)
rselector = rselector.replace("@uuid@",self.uuid)
h.putrequest('GET', rselector, True, True)
h.putheader('User-Agent',self.inituseragent)
h.putheader('Host',self.hoststr)
h.putheader('Connection',self.conntype)
h.endheaders()
h.send("")
self.log("GET "+rselector)
response = h.getresponse()
self.log(str(response.status) + " " + str(response.reason))
return response
#
# Override useragent and device parms for startup
#
def set_inituseragent(self,inituseragent):
self.inituseragent = inituseragent
def set_inituaparams(self, inituaparamsstr):
if "uuid=" not in inituaparamsstr:
inituaparamsstr += "&uuid=@uuid@"
self.inituaparams = list([n.split("=") for n in inituaparamsstr.split("&")])
#
# minimal logging
#
def initlog(self):
self.logfile = open(self.logfname,"w")
self.log("Started")
def deinitlog(self):
self.log("Ended")
self.logfile.close()
def log(self, logstr):
if self.debug:
self.logfile.write("[" + str(datetime.datetime.now()) + "] " + str(logstr) + "\r\n")
def rawlog(self, logdata):
if self.debug:
self.logfile.write(logdata)
# end class pyagger
def usage():
print """Usage: pyagger -u uuid ( -i FILE | -x FILE )
[-d -g -h -a %s -p %s [-w %d -r %d] [-s %d -n %d]]]\r\n
-h This info
Mandatory arguments:
-u uuid Specify uuid, in lowercase. Mandatory
-i FILE Input points from TSV file. Specify "-" for STDIN
Format is (for timestamp see -g), no spaces, only tabs
lat \\t lon \\t avg_speed \\t direction \\t ddmmyyyy:hhmiss
To force a packet sending:
PUSH
To force a delay for wait_ms +/- rand_ms, no spaces, only tabs
WAIT \\t wait_ms \\t rand_ms
Comments:
#foo
or:
-x XMLFILE Input single-line xml from file. Specify "-" for STDIN
Format is:
<?xml >...
<?xml >...
Optional arguments:
-d Enable debug logging [default=off]
-g Generate timestamps for points [default=off, taken from file]
If enabled, timestamps from file are ingored and a packet is
sent every 5 points. Points are stored each 6 seconds.
-a USERAGENT Override startup useragent [default=hardcoded]
-p DEVICEPARMS Override startup device parms [default=hardcoded]
Format of the string is \"a=b&c=d\"
-w MILLISECS Wait for MILLISECS after each POST request [default=don't wait]
-r MILLISECS Randomize wait period by +/- MILLISECS [default=don't wait],
only effective with "-w"
effective only when generating timestamps:
-s MILLISECS point timestamp interval [default = 6000]
-n N points buffer size [default = 5]
\r\n"""
if __name__ == '__main__':
try:
infname = ""
uuid = ""
debug = False
synchro = True
useragent = None
deviceparams = None
waitmsecs = None
waitrandms = 0
waitpoint = None
numpoints = None
optlist, args = getopt.getopt(sys.argv[1:], 'u:i:x:a:p:w:r:n:s:dhg')
xmlread = False
for opt, arg in optlist:
if opt == "-u":
uuid = arg
elif opt == "-d":
debug = True
elif opt == "-h":
usage()
sys.exit(0)
elif opt == "-g":
synchro = False
elif opt == "-i":
infname = arg
xmlread = False
elif opt == "-x":
infname = arg
xmlread = True
elif opt == "-a":
useragent = arg
elif opt == "-i":
deviceparams = arg
elif opt == "-w":
waitmsecs = int(arg)
elif opt == "-r":
waitrandms = int(arg)
elif opt == "-s":
waitpoint = int(arg)
elif opt == "-n":
numpoints = int(arg)
if uuid == "":
print "Error: uuid is missing"
sys.exit(1)
if infname == "":
print "Error: input filename is missing"
sys.exit(2)
pp = pyagger(debug)
pp.set_uuid(uuid)
if useragent is not None:
pp.set_inituseragent(useragent)
if deviceparams is not None:
pp.set_inituaparams(deviceparams)
if waitmsecs is not None:
pp.set_wait(waitmsecs, waitrandms, True)
# !!!TODO
if waitpoint is not None:
pp.pointdelay = waitpoint
if numpoints is not None:
pp.numpoints = numpoints
pp.startup()
if infname == "-":
infile = sys.stdin
pp.log("Reading from STDIN")
else:
infile = open(infname,"r")
pp.log("Reading from file ["+infname+"]")
if xmlread :
pp.push_xmls(infile)
else:
if synchro :
pp.push_tsv_timed(infile)
else:
pp.push_tsv(infile)
del pp
except:
raise
@esin
Copy link

esin commented Aug 27, 2014

Hello!
Has Yandex changed algorithm of generating "packetid"?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment