Skip to content

Instantly share code, notes, and snippets.

@kemingy
Last active March 26, 2019 05:09
Show Gist options
  • Select an option

  • Save kemingy/47a74787a0e68a6fdb37e5b70da12a73 to your computer and use it in GitHub Desktop.

Select an option

Save kemingy/47a74787a0e68a6fdb37e5b70da12a73 to your computer and use it in GitHub Desktop.
Chinese phrase generator
# http://zhanghonglun.cn/blog/project/%E5%9F%BA%E4%BA%8E%E5%87%9D%E8%81%9A%E5%BA%A6%E5%92%8C%E8%87%AA%E7%94%B1%E5%BA%A6%E7%9A%84%E9%9D%9E%E7%9B%91%E7%9D%A3%E8%AF%8D%E5%BA%93%E7%94%9F%E6%88%90/
import math
from collections import Counter
import plane
class Phrase:
    """Statistics collected for a single candidate word.

    Attributes:
        word: the candidate text.
        left: Counter of the characters seen directly before the word.
        right: Counter of the characters seen directly after the word.
        doa: degree of aggregation; starts at +inf until it is computed.
        dof: degree of freedom.
        freq: number of occurrences counted so far.
        score: combined ranking value.
    """

    _REPR_TEMPLATE = '[{}]--<freq:{}><dof:{}><doa:{}><score:{}><left:{}><right:{}>'

    def __init__(self, word):
        self.word = word
        self.freq = 0
        self.score = 0
        self.dof = 0  # degree of freedom
        self.doa = float('inf')  # degree of aggregation
        self.left = Counter()
        self.right = Counter()

    def __repr__(self):
        # Neighbour counters are summarised by how many distinct characters
        # they hold, not by their full contents.
        distinct_left = len(self.left)
        distinct_right = len(self.right)
        return self._REPR_TEMPLATE.format(
            self.word,
            self.freq,
            self.dof,
            self.doa,
            self.score,
            distinct_left,
            distinct_right,
        )
class CorpusPhrase:
    """Collects candidate words from a corpus and scores them.

    Every substring of 1 to ``maxlen - 1`` characters is a candidate. Each
    candidate gets a degree of aggregation (doa), a degree of freedom (dof)
    and a score that combines both with its frequency.

    Args:
        maxlen: exclusive upper bound on the candidate length in characters.
        threshold: candidates seen fewer times than this score 0.
    """

    def __init__(self, maxlen=5, threshold=10):
        self.maxlen = maxlen
        self.threshold = threshold
        self.phrase = {}

    def build(self, data):
        """Count candidates in ``data`` and (re)compute all their metrics."""
        # clean data, keep Chinese words only
        text = plane.extract(plane.replace(data, plane.SPACE), plane.CHINESE_WORDS)
        print('Building...')
        for token in text:
            sentence = token.value
            length = len(sentence)
            for start in range(length):
                for step in range(1, self.maxlen):
                    end = start + step
                    if end > length:
                        # Every longer window overruns the sentence as well.
                        break
                    word = sentence[start:end]
                    entry = self.phrase.get(word)
                    if entry is None:
                        entry = self.phrase[word] = Phrase(word)
                    entry.freq += 1
                    # Counter.update on a one-character string counts that
                    # single character.
                    if start > 0:
                        entry.left.update(sentence[start - 1])
                    if end < length:
                        entry.right.update(sentence[end])
        print('Computing...')
        for entry in self.phrase.values():
            self.aggregation(entry)
            self.freedom(entry)
            # score reads doa and dof, so it has to run last.
            self.score(entry)
        print('Done.')

    def aggregation(self, phrase):
        """Set ``phrase.doa`` to the weakest binding over all two-way splits.

        For each split of the word into a known left and right part the value
        is ``log2(len(self.phrase) * freq / (left.freq * right.freq))``. When
        no split is available (for example a single character) doa is 0.
        """
        # Start from a fresh +inf rather than the stored value, so that a
        # recomputation after the counts changed is not capped by a stale doa.
        best = float('inf')
        total = len(self.phrase)
        for i in range(1, len(phrase.word)):
            left = self.phrase.get(phrase.word[:i])
            right = self.phrase.get(phrase.word[i:])
            if left is None or right is None:
                continue
            best = min(best, math.log2(total * phrase.freq / (left.freq * right.freq)))
        phrase.doa = 0 if best == float('inf') else best

    def freedom(self, phrase):
        """Set ``phrase.dof`` to the smaller of the two neighbour entropies."""
        phrase.dof = min(self.get_entropy(phrase.left), self.get_entropy(phrase.right))

    def get_entropy(self, counter):
        """Return the base-2 entropy of the counts in ``counter`` (0 if empty)."""
        count = sum(counter.values())
        return sum(abs(n * math.log2(n / count) / count) for n in counter.values())

    def score(self, phrase):
        """Set ``phrase.score`` to doa * dof * freq, or 0 for rejected candidates.

        A candidate is rejected when it is rarer than ``self.threshold`` or
        when either metric is negative.
        """
        if phrase.freq < self.threshold or phrase.doa < 0 or phrase.dof < 0:
            phrase.score = 0
        else:
            phrase.score = phrase.doa * phrase.dof * phrase.freq
package main
import (
"bufio"
"fmt"
"math"
"os"
"regexp"
"sort"
"time"
)
// Phrase holds the counters and derived metrics of one candidate word.
type Phrase struct {
	// word is the candidate text.
	word string
	// left counts the characters observed directly before word.
	left map[string]uint64
	// right counts the characters observed directly after word.
	right map[string]uint64
	// freq is the number of times word was counted.
	freq uint64
	// doa is the degree of aggregation.
	doa float64
	// dof is the degree of freedom.
	dof float64
	// score is the ranking value derived from freq, doa and dof.
	score float64
}
// CorpusPhrase accumulates candidate words and the settings used to score
// them.
type CorpusPhrase struct {
	// maxlen is the exclusive upper bound on the candidate length, counted
	// in characters.
	maxlen int
	// threshold is the minimum frequency a candidate needs for a non-zero
	// score.
	threshold int
	// phrase maps each candidate text to its statistics.
	phrase map[string]*Phrase
}
// KeyValue pairs a word with its score so that results can be sorted.
type KeyValue struct {
	// key is the word.
	key string
	// value is the score attached to key.
	value float64
}
// buildFromFile reads filename line by line and passes every run of Han
// characters found on a line to addWords.
//
// It panics (through check) when the file cannot be opened or when reading
// fails, including when a line exceeds the configured size limit.
func (cp *CorpusPhrase) buildFromFile(filename string) {
	file, err := os.Open(filename)
	check(err)
	defer file.Close()

	// A Scanner refuses lines above 64 KiB by default and simply stops
	// scanning; corpora with long paragraphs on one line would be cut short.
	// The buffer still starts small and only grows when a line needs it.
	const maxLineBytes = 1 << 30
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	zh := regexp.MustCompile(`\p{Han}+`)
	for scanner.Scan() {
		cp.addWords(zh.FindAllString(scanner.Text(), -1))
	}
	// Scan returns false for both end of input and failure; only Err tells
	// the two apart.
	check(scanner.Err())
}
// addWords counts every substring of 1 to maxlen-1 characters of each
// sentence in text. For each occurrence it increments the phrase frequency
// and records the single character directly before and after it, when one
// exists.
//
// Substrings are cut on real character boundaries, so characters that take
// four UTF-8 bytes (Han ideographs outside the Basic Multilingual Plane) are
// handled like the three-byte ones.
func (cp *CorpusPhrase) addWords(text []string) {
	// bounds[k] is the byte offset at which the k-th character starts; one
	// extra entry holds len(sentence). The slice is reused across sentences.
	var bounds []int
	for _, sentence := range text {
		bounds = bounds[:0]
		for offset := range sentence {
			bounds = append(bounds, offset)
		}
		bounds = append(bounds, len(sentence))
		count := len(bounds) - 1

		for i := 0; i < count; i++ {
			// Stop growing the window as soon as it would pass the end.
			for step := 1; step < cp.maxlen && i+step <= count; step++ {
				end := i + step
				word := sentence[bounds[i]:bounds[end]]
				phrase, ok := cp.phrase[word]
				if !ok {
					phrase = &Phrase{
						word:  word,
						left:  make(map[string]uint64),
						right: make(map[string]uint64),
					}
					cp.phrase[word] = phrase
				}
				phrase.freq++
				if i > 0 {
					phrase.left[sentence[bounds[i-1]:bounds[i]]]++
				}
				if end < count {
					phrase.right[sentence[bounds[end]:bounds[end+1]]]++
				}
			}
		}
	}
}
// computing fills in doa, dof and score for every collected phrase.
func (cp *CorpusPhrase) computing() {
	for key := range cp.phrase {
		candidate := cp.phrase[key]
		cp.aggregation(candidate)
		freedom(candidate)
		// get_score reads doa and dof, so it runs after both are set.
		get_score(candidate, cp.threshold)
	}
}
// aggregation sets p.doa to the smallest value of
//
//	log2(len(cp.phrase) * p.freq / (left.freq * right.freq))
//
// over every way of cutting p.word into a left and a right part that are
// both known phrases. When no such cut exists (for example a one-character
// word) p.doa is set to 0.
func (cp *CorpusPhrase) aggregation(p *Phrase) {
	total := float64(len(cp.phrase))
	doa := math.Inf(1)
	// Ranging over the string yields the byte offset of each character, so
	// the cut never lands inside a multi-byte character regardless of its
	// encoded width.
	for i := range p.word {
		if i == 0 {
			continue // an empty left part is not a split
		}
		left, leftOk := cp.phrase[p.word[:i]]
		right, rightOk := cp.phrase[p.word[i:]]
		if !leftOk || !rightOk {
			continue
		}
		// Convert before multiplying so that the product of two large
		// counts cannot wrap around in uint64.
		ratio := total * float64(p.freq) / (float64(left.freq) * float64(right.freq))
		doa = math.Min(doa, math.Log2(ratio))
	}
	if math.IsInf(doa, 1) {
		// Always write the field so a previous result never lingers.
		doa = 0
	}
	p.doa = doa
}
// freedom sets p.dof to the smaller of the entropies of the left and right
// neighbour counters.
func freedom(p *Phrase) {
	before := entropy(&p.left)
	after := entropy(&p.right)
	p.dof = math.Min(before, after)
}
// entropy returns the base-2 Shannon entropy of the distribution described
// by the counts in *counter.
//
// A nil pointer, an empty map and a map whose counts are all zero yield 0.
// Entries with a zero count are skipped: they carry no probability mass, and
// evaluating 0 * log2(0) would otherwise turn the whole result into NaN.
func entropy(counter *map[string]uint64) float64 {
	if counter == nil {
		return 0
	}
	var total uint64
	for _, n := range *counter {
		total += n
	}
	if total == 0 {
		return 0
	}
	denom := float64(total)
	var bits float64
	for _, n := range *counter {
		if n == 0 {
			continue
		}
		// Each term n/total * log2(n/total) is <= 0; Abs flips the sign.
		bits += math.Abs(float64(n) * math.Log2(float64(n)/denom) / denom)
	}
	return bits
}
// get_score stores the ranking value of p in p.score.
//
// The score is freq * doa * dof. It is 0 when p occurs fewer than threshold
// times or when either doa or dof is negative.
func get_score(p *Phrase, threshold int) {
	tooRare := int(p.freq) < threshold
	if tooRare || p.doa < 0 || p.dof < 0 {
		p.score = 0
		return
	}
	p.score = float64(p.freq) * p.doa * p.dof
}
// check panics with e when e is non-nil and does nothing otherwise.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
// main builds the phrase table from ./big.txt, scores every candidate and
// writes the candidates with a positive score to ./result.csv, best first.
//
// The CSV has a header line followed by one "word,score" row per candidate.
// Progress and the elapsed build time are printed to standard output. Any
// I/O failure panics through check.
func main() {
	start := time.Now()
	cp := CorpusPhrase{
		maxlen:    5,
		threshold: 10,
		phrase:    make(map[string]*Phrase),
	}
	cp.buildFromFile("./big.txt")
	cp.computing()
	fmt.Println(time.Since(start))
	fmt.Println("==> Get", len(cp.phrase), "candidates.")

	var kv []KeyValue
	for word, phrase := range cp.phrase {
		if phrase.score > 0 {
			kv = append(kv, KeyValue{word, phrase.score})
		}
	}
	fmt.Println("==> Get", len(kv), "words.")
	sort.Slice(kv, func(i, j int) bool {
		if kv[i].value != kv[j].value {
			return kv[i].value > kv[j].value
		}
		// Map iteration order is random; break ties on the word so that
		// repeated runs produce the same file.
		return kv[i].key < kv[j].key
	})

	file, err := os.Create("./result.csv")
	check(err)
	// Buffer the rows: writing each one straight to the file costs a system
	// call per line.
	out := bufio.NewWriter(file)
	// The header lists exactly the columns that the rows below contain.
	fmt.Fprintln(out, "word,score")
	for _, ws := range kv {
		fmt.Fprintf(out, "%s,%.6f\n", ws.key, ws.value)
	}
	// Flush reports the first error hit by any of the writes above.
	check(out.Flush())
	check(file.Close())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment