Last active
December 30, 2015 11:09
-
-
Save odashi/7820750 to your computer and use it in GitHub Desktop.
IBM Translation Model 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import codecs | |
import sys | |
from collections import defaultdict | |
# Calculate IBM Model 1 word translation probabilities.
# params:
#   fname_e: <str> name of the corpus file in the target language
#   fname_f: <str> name of the corpus file in the foreign language
#   num_iteration: <int> number of learning iterations
#   add_null: <bool> whether to insert the (null) word into the foreign corpus
# return: (t, wid_e, wid_f)
#   t: <{(int e, int f): float}> word translation probability
#   wid_e: <{str: int}> word ID table for the target language
#   wid_f: <{str: int}> word ID table for the foreign language
def ibm1(fname_e, fname_f, num_iteration, add_null=True):
    """Estimate IBM Model 1 word translation probabilities t(e|f) with EM.

    The two corpus files are read in lockstep, one sentence per line, so
    line i of ``fname_e`` is paired with line i of ``fname_f``; reading
    stops at the end of the shorter file.  Sentences are split on single
    spaces and empty tokens (from repeated spaces or blank lines) are
    discarded.  A sentence pair is skipped when either side has no tokens.

    Args:
        fname_e: path of the UTF-8 corpus file in the target language.
        fname_f: path of the UTF-8 corpus file in the foreign language.
        num_iteration: number of EM iterations to run.
        add_null: when true, the '(null)' word (id 0) is prepended to
            every foreign sentence so target words may align to nothing.

    Returns:
        A tuple ``(t, wid_e, wid_f)`` where ``t`` maps every
        ``(target id, foreign id)`` pair to its translation probability,
        and ``wid_e`` / ``wid_f`` map words to integer ids.  ``t`` is
        dense: it holds one entry per pair of known ids.
    """
    # Looking up an unseen word assigns it the next free id.
    wid_e = defaultdict(lambda: len(wid_e))
    wid_f = defaultdict(lambda: len(wid_f))
    corpus_e = []
    corpus_f = []

    # Reserve id 0 for the null word before any real word is seen.
    if add_null:
        null_id = wid_f['(null)']

    # Read the parallel corpus.
    with codecs.open(fname_e, 'r', 'utf-8') as file_e, \
            codecs.open(fname_f, 'r', 'utf-8') as file_f:
        for str_e, str_f in zip(file_e, file_f):
            # Drop empty tokens so repeated spaces or blank lines do not
            # register '' as a vocabulary word.
            words_e = [w for w in str_e.strip().split(' ') if w]
            words_f = [w for w in str_f.strip().split(' ') if w]
            if not words_e or not words_f:
                # Nothing to align; skipping before the id lookup keeps
                # unused words out of the id tables.
                continue
            es = [wid_e[w] for w in words_e]
            fs = [wid_f[w] for w in words_f]
            if add_null:
                fs = [null_id] + fs
            corpus_e.append(es)
            corpus_f.append(fs)

    num_e = len(wid_e)
    num_f = len(wid_f)

    # Uniform initialization over the target vocabulary.
    uniform = 1.0 / num_e if num_e else 0.0
    t = {}
    for e in range(num_e):
        for f in range(num_f):
            t[e, f] = uniform

    for iteration in range(num_iteration):
        sys.stderr.write(
            'iteration %d/%d...\n' % (iteration + 1, num_iteration))
        count = defaultdict(float)
        total = defaultdict(float)

        # E-step: collect expected alignment counts.
        for fs, es in zip(corpus_f, corpus_e):
            for e in es:
                # The normalizer is computed per target *position*.
                # Accumulating it per word id would make a word repeated
                # k times in a sentence contribute 1 count instead of k.
                norm = sum(t[e, f] for f in fs)
                if not norm:
                    # Every candidate probability is zero; there is no
                    # posterior to distribute for this position.
                    continue
                for f in fs:
                    x = t[e, f] / norm
                    count[e, f] += x
                    total[f] += x

        # M-step: renormalize.  ``.get`` avoids inserting a zero entry
        # into ``count`` for every pair that never co-occurred.
        for e in range(num_e):
            for f in range(num_f):
                denom = total.get(f, 0.0)
                if denom:
                    t[e, f] = count.get((e, f), 0.0) / denom
                else:
                    t[e, f] = 0.0

    return t, wid_e, wid_f
def parse_options():
    """Parse and validate the command-line options of the trainer.

    Options are taken from the process command line via ``optparse``.

    Returns:
        The parsed options object (attributes ``fname_e``, ``fname_f``,
        ``fname_o``, ``num_iteration``, ``threshold``, ``add_null``), or
        None after writing a message to stderr when a required path is
        missing or a numeric option is out of range.
    """
    import optparse

    parser = optparse.OptionParser(
        'Usage: python ibm1.py -e PATH -f PATH -o PATH [options]')

    # (short flag, long flag, settings) in the order shown by --help.
    option_specs = (
        ('-e', '--target', dict(
            action='store', type='str', dest='fname_e', default='',
            metavar='PATH',
            help='[IN] path of corpus file in target language')),
        ('-f', '--foreign', dict(
            action='store', type='str', dest='fname_f', default='',
            metavar='PATH',
            help='[IN] path of corpus file in foreign language')),
        ('-o', '--output', dict(
            action='store', type='str', dest='fname_o', default='',
            metavar='PATH',
            help='[OUT] path of translation probability file')),
        ('-I', '--iteration', dict(
            action='store', type='int', dest='num_iteration', default=100,
            metavar='INT',
            help='number of learning iteration (must be positive)')),
        ('-T', '--threshold', dict(
            action='store', type='float', dest='threshold', default=1e-8,
            metavar='FLOAT',
            help='threshold of word translation table '
                 '(must be in range [0.0, 1.0])')),
        ('-N', '--no-null', dict(
            action='store_false', dest='add_null', default=True,
            help='Never insert NULL word into foreign corpus')),
    )
    for short_flag, long_flag, settings in option_specs:
        parser.add_option(short_flag, long_flag, **settings)

    options, _ = parser.parse_args()

    # (violated?, message) pairs; only the first violation is reported.
    violations = (
        (not options.fname_e, 'option -e must be set.\n'),
        (not options.fname_f, 'option -f must be set.\n'),
        (not options.fname_o, 'option -o must be set.\n'),
        (options.num_iteration <= 0, 'option -I must be positive.\n'),
        (options.threshold < 0.0 or options.threshold > 1.0,
         'option -T must be in range [0.0, 1.0].\n'),
    )
    for violated, message in violations:
        if violated:
            sys.stderr.write(message)
            return None

    return options
def main():
    """Command-line entry point: train the model and write its outputs.

    Writes three UTF-8 files: the translation table at the ``-o`` path
    (``e_id<TAB>f_id<TAB>probability``, only entries strictly above the
    threshold), plus a ``.wid`` word-id table next to each input corpus.
    Returns without writing anything when option parsing fails.
    """
    options = parse_options()
    if options is None:
        sys.stderr.write('insufficient options.\n')
        return

    def write_word_ids(path, table):
        # One "word<TAB>id" line per entry, in ascending id order.
        with codecs.open(path, 'w', 'utf-8') as out:
            entries = sorted(table.items(), key=lambda entry: entry[1])
            for word, word_id in entries:
                out.write('%s\t%d\n' % (word, word_id))

    t, wid_e, wid_f = ibm1(
        options.fname_e,
        options.fname_f,
        options.num_iteration,
        options.add_null,
    )

    # Word translation probabilities, ordered by (target id, foreign id).
    with codecs.open(options.fname_o, 'w', 'utf-8') as out:
        for pair in sorted(t):
            prob = t[pair]
            if prob > options.threshold:
                out.write('%d\t%d\t%.10f\n' % (pair[0], pair[1], prob))

    # Word id tables for the target and foreign languages.
    write_word_ids(options.fname_e + '.wid', wid_e)
    write_word_ids(options.fname_f + '.wid', wid_f)
# Run the trainer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment