Created
October 14, 2014 16:45
-
-
Save lyoshenka/21aa2d2ce510a8dd9a0a to your computer and use it in GitHub Desktop.
Check torrents, change tracker on torrents, print invalid files in torrents
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Written by Petru Paler | |
# see LICENSE.txt for license information | |
# http://chris.ill-logic.com/sandbox/browser/python/bencode.py | |
from types import IntType, LongType, StringType, ListType, TupleType, DictType | |
import re | |
from cStringIO import StringIO | |
int_filter = re.compile('(0|-?[1-9][0-9]*)e') | |
def decode_int(x, f): | |
m = int_filter.match(x, f) | |
if m is None: | |
raise ValueError | |
return (long(m.group(1)), m.end()) | |
string_filter = re.compile('(0|[1-9][0-9]*):') | |
def decode_string(x, f): | |
m = string_filter.match(x, f) | |
if m is None: | |
raise ValueError | |
l = int(m.group(1)) | |
s = m.end() | |
return (x[s:s+l], s + l) | |
def decode_list(x, f): | |
r = [] | |
while x[f] != 'e': | |
v, f = bdecode_rec(x, f) | |
r.append(v) | |
return (r, f + 1) | |
def decode_dict(x, f): | |
r = {} | |
lastkey = None | |
try: | |
while x[f] != 'e': | |
k, f = decode_string(x, f) | |
if lastkey is not None and lastkey >= k: | |
raise ValueError | |
lastkey = k | |
v, f = bdecode_rec(x, f) | |
r[k] = v | |
except IndexError: | |
#print "+ Oh no, you took too much... too much! (busted torrent)" | |
pass | |
return (r, f + 1) | |
def bdecode_rec(x, f): | |
t = x[f] | |
if t == 'i': | |
return decode_int(x, f + 1) | |
elif t == 'l': | |
return decode_list(x, f + 1) | |
elif t == 'd': | |
return decode_dict(x, f + 1) | |
else: | |
return decode_string(x, f) | |
def bdecode(x): | |
try: | |
r, l = bdecode_rec(x, 0) | |
except: | |
print " * This file is BOGUS!" | |
raise ValueError | |
#except IndexError: | |
# raise ValueError | |
#if l != len(x): | |
# raise ValueError | |
return r | |
def test_bdecode(): | |
try: | |
bdecode('0:0:') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('ie') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('i341foo382e') | |
assert 0 | |
except ValueError: | |
pass | |
assert bdecode('i4e') == 4L | |
assert bdecode('i0e') == 0L | |
assert bdecode('i123456789e') == 123456789L | |
assert bdecode('i-10e') == -10L | |
try: | |
bdecode('i-0e') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('i123') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('i6easd') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('35208734823ljdahflajhdf') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('2:abfdjslhfld') | |
assert 0 | |
except ValueError: | |
pass | |
assert bdecode('0:') == '' | |
assert bdecode('3:abc') == 'abc' | |
assert bdecode('10:1234567890') == '1234567890' | |
try: | |
bdecode('02:xy') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('l') | |
assert 0 | |
except ValueError: | |
pass | |
assert bdecode('le') == [] | |
try: | |
bdecode('leanfdldjfh') | |
assert 0 | |
except ValueError: | |
pass | |
assert bdecode('l0:0:0:e') == ['', '', ''] | |
try: | |
bdecode('relwjhrlewjh') | |
assert 0 | |
except ValueError: | |
pass | |
assert bdecode('li1ei2ei3ee') == [1, 2, 3] | |
assert bdecode('l3:asd2:xye') == ['asd', 'xy'] | |
assert bdecode('ll5:Alice3:Bobeli2ei3eee') == [['Alice', 'Bob'], [2, 3]] | |
try: | |
bdecode('d') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('defoobar') | |
assert 0 | |
except ValueError: | |
pass | |
assert bdecode('de') == {} | |
assert bdecode('d3:agei25e4:eyes4:bluee') == {'age': 25, 'eyes': 'blue'} | |
assert bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee') == {'spam.mp3': {'author': 'Alice', 'length': 100000}} | |
try: | |
bdecode('d3:fooe') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('di1e0:e') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('d1:b0:1:a0:e') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('d1:a0:1:a0:e') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('i03e') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('l01:ae') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('9999:x') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('l0:') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('d0:0:') | |
assert 0 | |
except ValueError: | |
pass | |
try: | |
bdecode('d0:') | |
assert 0 | |
except ValueError: | |
pass | |
def bencode_rec(x, b): | |
t = type(x) | |
if t in (IntType, LongType): | |
b.write('i%de' % x) | |
elif t is StringType: | |
b.write('%d:%s' % (len(x), x)) | |
elif t in (ListType, TupleType): | |
b.write('l') | |
for e in x: | |
bencode_rec(e, b) | |
b.write('e') | |
elif t is DictType: | |
b.write('d') | |
keylist = x.keys() | |
keylist.sort() | |
for k in keylist: | |
assert type(k) is StringType | |
bencode_rec(k, b) | |
bencode_rec(x[k], b) | |
b.write('e') | |
else: | |
print "*** error *** could not encode type %s (value: %s)" % (t, x) | |
assert 0 | |
def bencode(x): | |
b = StringIO() | |
bencode_rec(x, b) | |
return b.getvalue() | |
def test_bencode(): | |
assert bencode(4) == 'i4e' | |
assert bencode(0) == 'i0e' | |
assert bencode(-10) == 'i-10e' | |
assert bencode(12345678901234567890L) == 'i12345678901234567890e' | |
assert bencode('') == '0:' | |
assert bencode('abc') == '3:abc' | |
assert bencode('1234567890') == '10:1234567890' | |
assert bencode([]) == 'le' | |
assert bencode([1, 2, 3]) == 'li1ei2ei3ee' | |
assert bencode([['Alice', 'Bob'], [2, 3]]) == 'll5:Alice3:Bobeli2ei3eee' | |
assert bencode({}) == 'de' | |
assert bencode({'age': 25, 'eyes': 'blue'}) == 'd3:agei25e4:eyes4:bluee' | |
assert bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}) == 'd8:spam.mp3d6:author5:Alice6:lengthi100000eee' | |
try: | |
bencode({1: 'foo'}) | |
assert 0 | |
except AssertionError: | |
pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os, shutil, random, sys, hashlib | |
from bencode import * | |
TORRENTS_DIR = "/media/storage/torrents/torrent files/" | |
DATA_DIR = "/media/storage/torrents/seeding/" | |
class NoMoreFilesException(Exception): | |
pass | |
class NoMoreHashesException(Exception): | |
pass | |
class FileNotFoundException(Exception): | |
pass | |
class Torrent: | |
dir = None | |
currFile = None | |
files = None | |
hashes = None | |
pieceLength = None | |
valid = True | |
def __init__(self,info): | |
self.pieceLength = info['piece length'] | |
self.initHashes(info['pieces']) | |
if 'length' in info: | |
self.setFile(DATA_DIR + info['name']) | |
else: | |
self.files = info['files'] | |
self.dir = DATA_DIR + info['name'] + '/' | |
self.nextFile() | |
def initHashes(self,hashString): | |
hashLength = 20 | |
self.hashes = [''.join(x) for x in zip(*[list(hashString[z::hashLength]) for z in range(hashLength)])] | |
def setFile(self, path): | |
if self.currFile: | |
self.currFile.close() | |
if not os.path.exists(path): | |
self.valid = False | |
else: | |
self.currFile = open(path, "rb") | |
# print "Opened " + path | |
def nextFile(self): | |
if not self.files or not len(self.files): | |
raise NoMoreFilesException | |
next = self.files.pop(0) | |
self.setFile(self.dir + '/'.join(next["path"])) | |
def nextChunk(self): | |
chunk = self.currFile.read(self.pieceLength) | |
try: | |
while len(chunk) < self.pieceLength: | |
self.nextFile() | |
chunk = chunk + self.currFile.read(self.pieceLength - len(chunk)) | |
except NoMoreFilesException: | |
pass | |
# print "Returning chunk of length %d" % len(chunk) | |
return chunk | |
def nextHash(self): | |
if not len(self.hashes): | |
return None | |
return self.hashes.pop(0) | |
def validateNext(self): | |
chunk = self.nextChunk() | |
digest = self.nextHash() | |
if not (chunk or digest): | |
return None | |
return chunk and digest and digest == hashlib.sha1(chunk).digest()#.encode('string_escape') | |
def validate(self): | |
next = self.validateNext() | |
while next == True: | |
next = self.validateNext() | |
self.valid = False if next == False else True # if next is None, its still valid | |
def isValid(self): | |
try: | |
if self.valid: | |
self.validate() | |
except FileNotFoundException: | |
self.valid = False | |
return self.valid | |
def cleanup(self): | |
if self.currFile: | |
self.currFile.close() | |
def main(): | |
for file in os.listdir(TORRENTS_DIR): | |
(name,ext) = os.path.splitext(file) | |
if ext.lower() in ['.torrent']: | |
fp = open(TORRENTS_DIR + file, "rb") | |
data = fp.read() | |
try: | |
torrent = bdecode(data) | |
except: | |
# print file + ' - BOGUS BOGUS BOGUS' | |
continue | |
# torrent = Torrent(torrent['info']) | |
# if torrent.isValid(): | |
# print 'VALID ' + file | |
# else: | |
# print '!!!!INVALID ' + file | |
valid = None | |
if 'length' in torrent['info'] and os.path.exists(DATA_DIR + torrent['info']['name']): | |
valid = True | |
elif 'files' in torrent['info']: | |
dir = DATA_DIR + torrent['info']['name'] + '/' | |
for fileInfo in torrent['info']['files']: | |
if not os.path.exists(dir + '/'.join(fileInfo["path"])): | |
valid = False | |
break | |
if valid == None: | |
valid = True | |
if valid: | |
print file | |
# else: | |
# print '!!!!INVALID ' + file | |
continue | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os, shutil, random | |
from bencode import * | |
TORRENTS_DIR = "/media/storage/torrents/torrent files/" | |
def main(): | |
for file in os.listdir(TORRENTS_DIR): | |
(name,ext) = os.path.splitext(file) | |
if ext.lower() in ['.torrent']: | |
fp = open(TORRENTS_DIR + file, "rb") | |
data = fp.read() | |
try: | |
torrent = bdecode(data) | |
except: | |
print name + ' - BOGUS BOGUS BOGUS' | |
continue | |
# if torrent["announce"].find('waffles') != -1: | |
print name + ' - ' + torrent["announce"] | |
# shutil.copy(TORRENTS_DIR + file, '/media/storage/torrents/backuptorrents/' + file) # already copied | |
# torrent["announce"] = 'http://url:port/KEY/announce' | |
# fp.close() | |
# fp = open(TORRENTS_DIR + file, "wb") | |
# fp.write(bencode(torrent)) | |
fp.close() | |
# TODO: Write a script to go through all the torrents and see if they still exist on the tracker | |
# Example: | |
# data = open("torrenttest.torrent", "rb").read() | |
# torrent = bdecode(data) | |
# print torrent["announce"] | |
# torrent["announce"] = "TESTING.com" | |
# out = open('new.torrent', 'w') | |
# out.write(bencode(torrent)) | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment