dictionary = {
    "北京烤鸭",
    "特别",
    "喜欢"
}
sentence = "他特别喜欢北京烤鸭"


# Maximum matching, recursive flavor: repeatedly strip the longest
# dictionary prefix (falling back to a single character) off the front.
# NOTE(review): the original comment labeled this "backward" (后向) matching,
# but the scan consumes prefixes left-to-right, i.e. forward max matching.
def seg(sentence):
    res = []

    def maxmatch(res, sentence, dictionary):
        if not sentence:
            return []
        # Try the longest candidate prefix first; keep a 1-char fallback.
        cut = 1
        for size in range(len(sentence), 0, -1):
            if sentence[:size] in dictionary:
                cut = size
                break
        res.append(sentence[:cut])
        return maxmatch(res, sentence[cut:], dictionary)

    maxmatch(res, sentence, dictionary)
    return res
['他', '特别', '喜欢', '北京烤鸭']
import pnlp
# Load a plain word list, one word per line.
dictionary = pnlp.read_lines("./dic.txt")
# The dictionary must not contain single-character words, otherwise
# everything would be segmented into single characters.
dictionary = [w for w in dictionary if len(w) > 1]
dictionary = set(dictionary)
print(len(dictionary))
# Longest word length bounds the matching window for the matchers below.
MAX_LEN = max(len(w) for w in dictionary)
MAX_LEN
415870
16
# Jieba's bundled dictionary: each line is "word freq pos-tag".
dict_path = "/Users/HaoShaochun/Documents/Study/NLP/jieba/jieba/dict.txt"
dictionary = pnlp.read_lines(dict_path)
# NOTE(review): comment in the original said "must not contain
# single-character words", but this cell only keeps the first field
# (the word) and does NOT drop single-character entries.
dictionary = [line.split(" ")[0] for line in dictionary]
dictionary = set(dictionary)
print(len(dictionary))
MAX_LEN = max(len(w) for w in dictionary)
MAX_LEN
349045
16
def forward_max_match2(sent, word_dict=None, max_len=None):
    """Forward maximum matching segmentation (slicing variant).

    Scans left to right; at each position takes the longest dictionary
    word starting there, falling back to a single character.

    Args:
        sent: the text to segment.
        word_dict: set of known words; defaults to the module-level
            ``dictionary`` (backward compatible with the original).
        max_len: upper bound on word length; defaults to module ``MAX_LEN``.

    Returns:
        List of tokens in original order.
    """
    if word_dict is None:
        word_dict = dictionary
    if max_len is None:
        max_len = MAX_LEN
    res = []
    while len(sent) > 0:
        length = min(max_len, len(sent))
        try_word = sent[:length]
        # Shrink the candidate until it is a word or a single character.
        while try_word not in word_dict:
            if len(try_word) == 1:
                break
            try_word = try_word[:-1]
        res.append(try_word)
        sent = sent[len(try_word):]
    return res
def forward_max_match(sent, word_dict=None, max_len=None):
    """Forward maximum matching segmentation (index/window variant).

    Same result as ``forward_max_match2``: longest dictionary word at
    each position, single-character fallback; works on indices instead
    of slicing the remainder.

    Args:
        sent: the text to segment.
        word_dict: set of known words; defaults to module ``dictionary``.
        max_len: upper bound on word length; defaults to module ``MAX_LEN``.

    Returns:
        List of tokens in original order.
    """
    if word_dict is None:
        word_dict = dictionary
    if max_len is None:
        max_len = MAX_LEN
    res = []
    length = max_len
    text_len = len(sent)
    start = 0
    while start < text_len:
        # Clamp the window to the remaining text.
        length = min(length, text_len - start)
        while sent[start:start+length] not in word_dict:
            if length == 1:
                break
            length -= 1
        res.append(sent[start:start+length])
        start += length
        length = max_len
    return res
# Demo: both forward-max-match implementations agree on this sentence.
sen = "美国加州大学的科学家发现"
forward_max_match(sen)
['美国加州大学', '的', '科学家', '发现']
forward_max_match2(sen)
['美国加州大学', '的', '科学家', '发现']
%timeit forward_max_match(sen)
8.67 µs ± 397 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit forward_max_match2(sen)
8.51 µs ± 188 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
def forward_min_match(sent, word_dict=None, max_len=None):
    """Forward minimum matching segmentation.

    Scans left to right; at each position takes the SHORTEST dictionary
    word starting there. If no word is found before the window hits
    ``max_len`` or the end of the text, emits a single character.

    Args:
        sent: the text to segment.
        word_dict: set of known words; defaults to module ``dictionary``.
        max_len: upper bound on word length; defaults to module ``MAX_LEN``.

    Returns:
        List of tokens in original order.
    """
    if word_dict is None:
        word_dict = dictionary
    if max_len is None:
        max_len = MAX_LEN
    res = []
    length = 1
    text_len = len(sent)
    start = 0
    while start < text_len:
        # Grow the window until a word is found or the limits are hit.
        while sent[start:start+length] not in word_dict:
            if length == max_len or length == text_len - start:
                length = 1  # give up: emit a single character
                break
            length += 1
        res.append(sent[start:start+length])
        start += length
        length = 1
    return res
# Demo: minimum matching prefers the shortest dictionary word at each step.
sen = "美国加州大学的科学家发现"
forward_min_match(sen)
['美国', '加州', '大学', '的', '科学', '家', '发现']
def backward_max_match(sent, word_dict=None, max_len=None):
    """Backward maximum matching segmentation.

    Scans right to left; at each step matches the longest dictionary
    word ENDING at the current position (window shrinks from the left),
    single-character fallback. Result is returned in original order.

    Args:
        sent: the text to segment.
        word_dict: set of known words; defaults to module ``dictionary``.
        max_len: upper bound on word length; defaults to module ``MAX_LEN``.

    Returns:
        List of tokens in original order.
    """
    if word_dict is None:
        word_dict = dictionary
    if max_len is None:
        max_len = MAX_LEN
    res = []
    length = max_len
    text_len = len(sent)
    # Initial window covers the last ``length`` characters.
    start = max(0, text_len - length)
    length = min(length, text_len - start)
    while start >= 0 and length > 0:
        # Shrink from the left until the window is a word or one char.
        while sent[start: start+length] not in word_dict:
            if length == 1:
                break
            length -= 1
            start += 1
        res.append(sent[start: start+length])
        length = max_len
        # Clamp the next window so it never reaches before index 0.
        if length > start:
            length = start
        start -= length
    # Matches were collected right-to-left; restore reading order.
    return list(reversed(res))
# Recompute the window size, then demo the classic ambiguity example:
# backward matching resolves 研究生命 as 研究/生命 (forward gives 研究生/命).
MAX_LEN = max(len(w) for w in dictionary)
sen = "研究生命的起源"
len(sen), MAX_LEN
(7, 16)
backward_max_match(sen)
['研究', '生命', '的', '起源']
def backward_min_match(sent, word_dict=None, max_len=None):
    """Backward minimum matching segmentation.

    Scans right to left; at each step matches the SHORTEST dictionary
    word ending at the current position (window grows to the left).
    Falls back to a single character when the window hits ``max_len``
    or the start of the text.

    Args:
        sent: the text to segment.
        word_dict: set of known words; defaults to module ``dictionary``.
        max_len: upper bound on word length; defaults to module ``MAX_LEN``.

    Returns:
        List of tokens in original order.
    """
    if word_dict is None:
        word_dict = dictionary
    if max_len is None:
        max_len = MAX_LEN
    res = []
    length = 1
    text_len = len(sent)
    start = text_len - length
    while start >= 0:
        # Grow the window leftwards until a word is found or limits hit.
        while sent[start: start+length] not in word_dict:
            length += 1
            start -= 1
            if length == max_len or start < 0:
                # Give up: rewind to the original position, emit one char.
                start += length - 1
                length = 1
                break
        res.append(sent[start: start+length])
        start -= 1
        length = 1
    # Matches were collected right-to-left; restore reading order.
    return list(reversed(res))
# Demo: backward minimum matching — note the 科/学家 split vs forward's 科学/家.
sen = "美国加州大学的科学家发现"
backward_min_match(sen)
['美国', '加州', '大学', '的', '科', '学家', '发现']
# Classic ambiguous sentences to compare the four matchers side by side.
sens = [
    "中华人民共和国万岁万岁万万岁",
    "乔布斯是 Apple 产品的设计者",
    "美国加州大学的科学家发现",
    "研究生命的起源",
    "长春市长春节致辞",
    "他从马上下来",
    "有意见分歧",
    "确实现在物价很高",
    "答辩结束的和尚未答辩的同学都请留在教室"
]
for sen in sens:
    res = forward_max_match(sen)
    print("最大前向匹配:", "/".join(res))
    res = backward_max_match(sen)
    print("最大后向匹配:", "/".join(res))
    res = forward_min_match(sen)
    print("最小前向匹配:", "/".join(res))
    res = backward_min_match(sen)
    print("最小后向匹配:", "/".join(res))
    print()
最大前向匹配: 中华人民共和国/万岁/万岁/万万岁 最大后向匹配: 中华人民共和国/万岁/万岁/万万岁 最小前向匹配: 中/华/人/民/共/和/国/万/岁/万/岁/万/万/岁 最小后向匹配: 中/华/人/民/共/和/国/万/岁/万/岁/万/万/岁 最大前向匹配: 乔布斯/是/ /A/p/p/l/e/ /产品/的/设计者 最大后向匹配: 乔布斯/是/ /A/p/p/l/e/ /产品/的/设计者 最小前向匹配: 乔/布/斯/是/ /A/p/p/l/e/ /产/品/的/设/计/者 最小后向匹配: 乔/布/斯/是/ /A/p/p/l/e/ /产/品/的/设/计/者 最大前向匹配: 美国加州大学/的/科学家/发现 最大后向匹配: 美国加州大学/的/科学家/发现 最小前向匹配: 美/国/加/州/大/学/的/科/学/家/发/现 最小后向匹配: 美/国/加/州/大/学/的/科/学/家/发/现 最大前向匹配: 研究生/命/的/起源 最大后向匹配: 研究/生命/的/起源 最小前向匹配: 研/究/生/命/的/起/源 最小后向匹配: 研/究/生/命/的/起/源 最大前向匹配: 长春市/长春/节/致辞 最大后向匹配: 长春/市长/春节/致辞 最小前向匹配: 长/春/市/长/春/节/致/辞 最小后向匹配: 长/春/市/长/春/节/致/辞 最大前向匹配: 他/从/马上/下来 最大后向匹配: 他/从/马上/下来 最小前向匹配: 他/从/马/上/下/来 最小后向匹配: 他/从/马/上/下/来 最大前向匹配: 有意/见/分歧 最大后向匹配: 有/意见分歧 最小前向匹配: 有/意/见/分/歧 最小后向匹配: 有/意/见/分/歧 最大前向匹配: 确实/现在/物价/很/高 最大后向匹配: 确实/现在/物价/很/高 最小前向匹配: 确/实/现/在/物/价/很/高 最小后向匹配: 确/实/现/在/物/价/很/高 最大前向匹配: 答辩/结束/的/和尚/未/答辩/的/同学/都/请留/在/教室 最大后向匹配: 答辩/结束/的/和/尚未/答辩/的/同学/都/请/留在/教室 最小前向匹配: 答/辩/结/束/的/和/尚/未/答/辩/的/同/学/都/请/留/在/教/室 最小后向匹配: 答/辩/结/束/的/和/尚/未/答/辩/的/同/学/都/请/留/在/教/室
import re
from math import log
# Runs of CJK ideographs (the same character range jieba uses).
re_han = re.compile("([\u4E00-\u9FD5]+)")
res = re_han.split("地方收到分手的饭880890df&*())(^范德萨发")
res
['', '地方收到分手的饭', '880890df&*())(^', '范德萨发', '']
# ASCII alnum runs, optionally like "12.34" or "56%".
re_skip = re.compile("([a-zA-Z0-9]+(?:\.\d+)?%?)")
# User-dictionary line: word, optional frequency, optional pos-tag.
re_userdict = re.compile('^(.+?)( [0-9]+)?( [a-z]+)?$', re.U)
# A single ASCII letter or digit.
re_eng = re.compile('[a-zA-Z0-9]', re.U)
def gen_pfdict(lines=None):
    """Build a jieba-style prefix dictionary.

    Args:
        lines: iterable of dictionary lines ("word freq [pos-tag]").
            Defaults to reading ``dict_path`` via pnlp, keeping the
            original no-argument call working.

    Returns:
        (lfreq, ltotal): ``lfreq`` maps each word to its frequency and
        each proper prefix of a word to 0 (so DAG building can stop as
        soon as a fragment is no longer any word's prefix); ``ltotal``
        is the sum of all real word frequencies.

    Raises:
        ValueError: on a malformed dictionary line.
    """
    if lines is None:
        lines = pnlp.read_lines(dict_path)
    lfreq = {}
    ltotal = 0
    for lineno, line in enumerate(lines, 1):
        try:
            word, freq = line.split(' ')[:2]
            freq = int(freq)
            lfreq[word] = freq
            ltotal += freq
            # Register every proper prefix with frequency 0.
            for ch in range(len(word)):
                wfrag = word[:ch + 1]
                if wfrag not in lfreq:
                    lfreq[wfrag] = 0
        except ValueError:
            # BUG FIX: the original formatted the message with an
            # undefined name ``f_name``, so the except branch itself
            # raised NameError instead of this ValueError.
            raise ValueError(
                'invalid dictionary entry at Line %s: %s' % (lineno, line))
    return lfreq, ltotal
# Build the global prefix dictionary and total frequency once.
FREQ, total = gen_pfdict()
def get_DAG(sentence, freq=None):
    """Build the directed acyclic graph of all dictionary words in ``sentence``.

    Args:
        sentence: text to analyse.
        freq: prefix dictionary as produced by ``gen_pfdict`` (words map
            to a positive frequency, bare prefixes to 0). Defaults to
            the module-level ``FREQ``, keeping old calls working.

    Returns:
        Dict mapping each start index k to a list of end indices i such
        that sentence[k:i+1] is a dictionary word. Positions with no
        match get [k] (the single character itself).
    """
    if freq is None:
        freq = FREQ
    DAG = {}
    N = len(sentence)
    for k in range(N):
        tmplist = []
        i = k
        frag = sentence[k]
        # Extend while the fragment is still some word's prefix;
        # only fragments with a non-zero frequency are real words.
        while i < N and frag in freq:
            if freq[frag]:
                tmplist.append(i)
            i += 1
            frag = sentence[k:i + 1]
        if not tmplist:
            tmplist.append(k)
        DAG[k] = tmplist
    return DAG
text = "他来到了网易杭研大厦"
# text = "确实现在物价很高"
text = "答辩结束的和尚未答辩的同学都请留在教室"
# text = "中华人民"
# text = "懂法萨芬dsfsdfsf^&*()7989 love"
# Sanity check: the character is known to the prefix dictionary.
"辩" in FREQ
True
DAG = get_DAG(text)
DAG
{0: [0, 1], 1: [1], 2: [2, 3], 3: [3], 4: [4], 5: [5, 6], 6: [6, 7], 7: [7], 8: [8, 9], 9: [9], 10: [10], 11: [11, 12], 12: [12], 13: [13], 14: [14, 15], 15: [15, 16], 16: [16], 17: [17, 18], 18: [18]}
log(total)
17.91155312775522
def calc(sentence, DAG, freq=None, freq_total=None):
    """Find the max-log-probability route through the word DAG.

    Dynamic programming from right to left: route[idx] is the best
    (log-probability, end-index) pair for the suffix starting at idx.
    Unseen words get a count of 1 (add-one floor inside ``log``).

    Args:
        sentence: the text being segmented.
        DAG: output of ``get_DAG`` for the same sentence.
        freq: word-frequency dict; defaults to module ``FREQ``.
        freq_total: sum of all frequencies; defaults to module ``total``.

    Returns:
        Dict idx -> (best log prob, end index of best word at idx),
        with the sentinel route[len(sentence)] == (0, 0).
    """
    if freq is None:
        freq = FREQ
    if freq_total is None:
        freq_total = total
    route = {}
    N = len(sentence)
    route[N] = (0, 0)
    # Hoist the normalization constant out of the loop.
    logtotal = log(freq_total)
    for idx in range(N - 1, -1, -1):
        tmp = []
        for x in DAG[idx]:
            # log P(word) + best log prob of the rest of the sentence.
            val = log(freq.get(sentence[idx: x+1], 1)) - logtotal + route[x+1][0]
            tmp.append((val, x))
        route[idx] = max(tmp)
    return route
# Best route per position: route[idx] = (log-probability, end index of best word).
calc(text, DAG)
{19: (0, 0), 18: (-9.486036724910885, 18), 17: (-11.413270978278785, 18), 16: (-15.826884543913714, 16), 15: (-21.514471422309715, 16), 14: (-29.3602906051659, 14), 13: (-35.05196680641455, 13), 12: (-43.194592875029336, 12), 11: (-44.57506961864625, 12), 10: (-49.81422510450637, 10), 9: (-60.645751732339, 9), 8: (-62.60778441984483, 9), 7: (-70.79002482672212, 7), 6: (-72.27710765622783, 7), 5: (-76.96046999993145, 5), 4: (-82.19962548579157, 4), 3: (-93.23079453136079, 3), 2: (-91.01244041815191, 3), 1: (-101.84396704598454, 1), 0: (-103.80599973349038, 1)}
def __cut_DAG_NO_HMM(sentence):
    """Cut ``sentence`` along the best DAG route (no HMM for unknown runs).

    Yields dictionary words in order. Consecutive single ASCII
    letters/digits are buffered and yielded as one token so things
    like "iPhone" or "2024" stay in one piece.

    Uses the module-level ``get_DAG``, ``calc`` and ``re_eng``.
    """
    DAG = get_DAG(sentence)
    route = calc(sentence, DAG)
    x = 0
    N = len(sentence)
    buf = ''
    while x < N:
        # route[x][1] is the end index of the best word starting at x.
        y = route[x][1] + 1
        l_word = sentence[x:y]
        # BUG FIX: removed leftover debug print(l_word) that spammed
        # stdout on every yielded token.
        if re_eng.match(l_word) and len(l_word) == 1:
            buf += l_word
            x = y
        else:
            # Flush any pending ASCII run before the regular word.
            if buf:
                yield buf
                buf = ''
            yield l_word
            x = y
    # Flush a trailing ASCII run.
    if buf:
        yield buf
        buf = ''
# Demo: the bare words on the next line come from the debug print
# inside the generator; the list is the actual return value.
list(__cut_DAG_NO_HMM(text))
答辩 结束 的 和 尚未 答辩 的 同学 都 请 留在 教室
['答辩', '结束', '的', '和', '尚未', '答辩', '的', '同学', '都', '请', '留在', '教室']
import jieba
# Log-probability floor used for unseen emissions/transitions.
MIN_FLOAT = -3.14e100
# Allowed predecessor states for each BMES tag: a word Begin can only
# follow an End or a Single, a Middle follows Middle or Begin, etc.
PrevStatus = {
    'B': 'ES',
    'M': 'MB',
    'S': 'SE',
    'E': 'BM'
}
# Peek at jieba's pretrained start/transition log-probability tables.
jieba.finalseg.start_P
{'B': -0.26268660809250016, 'E': -3.14e+100, 'M': -3.14e+100, 'S': -1.4652633398537678}
jieba.finalseg.trans_P
{'B': {'E': -0.51082562376599, 'M': -0.916290731874155}, 'E': {'B': -0.5897149736854513, 'S': -0.8085250474669937}, 'M': {'E': -0.33344856811948514, 'M': -1.2603623820268226}, 'S': {'B': -0.7211965654669841, 'S': -0.6658631448798212}}
PrevStatus
{'B': 'ES', 'M': 'MB', 'S': 'SE', 'E': 'BM'}
def viterbi(sent, states, start_p, trans_p, emit_p,
            min_float=-3.14e100, prev_status=None):
    """Viterbi decoding over BMES character-tagging states.

    Args:
        sent: observation sequence (string of characters, non-empty).
        states: iterable of state labels, e.g. "BMSE".
        start_p: state -> start log-probability.
        trans_p: state -> {state -> transition log-probability}.
        emit_p: state -> {char -> emission log-probability}.
        min_float: log-prob floor for unseen emissions/transitions
            (defaults to the same value as module ``MIN_FLOAT``).
        prev_status: state -> allowed predecessor states; defaults to
            the standard BMES constraints.

    Returns:
        (best_log_prob, best_state_path) where the path's last state is
        E or S (a word can only end there).
    """
    if prev_status is None:
        prev_status = {'B': 'ES', 'M': 'MB', 'S': 'SE', 'E': 'BM'}
    V = [{}]  # V[t][state] = best log prob of any path ending in state at t
    path = {}
    for y in states:  # init: start prob + emission of the first char
        V[0][y] = start_p[y] + emit_p[y].get(sent[0], min_float)
        path[y] = [y]
    for t in range(1, len(sent)):
        V.append({})
        newpath = {}
        for y in states:
            # Emission log-prob of observing sent[t] in state y.
            em_p = emit_p[y].get(sent[t], min_float)
            # Best predecessor among the states allowed to precede y:
            # previous score + transition + current emission.
            prob, state = max(
                (V[t - 1][y0] + trans_p[y0].get(y, min_float) + em_p, y0)
                for y0 in prev_status[y])
            V[t][y] = prob
            # Extend that predecessor's path with the current state.
            newpath[y] = path[state] + [y]
        path = newpath
    # BUG FIX: removed leftover debug print(path).
    # The final state can only be E (word end) or S (single character).
    prob, state = max((V[len(sent) - 1][y], y) for y in 'ES')
    return (prob, path[state])
# Decode the BMES tag sequence for a sentence using jieba's pretrained tables.
text = "我答辩结束的和尚未答辩的同学都请留在教室"
viterbi(text, "BMSE", jieba.finalseg.start_P, jieba.finalseg.trans_P, jieba.finalseg.emit_P)
{'B': ['S', 'B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E', 'S', 'S', 'B', 'E', 'S', 'S', 'S', 'S', 'S', 'B'], 'M': ['S', 'B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E', 'S', 'S', 'B', 'E', 'S', 'S', 'S', 'S', 'B', 'M'], 'S': ['S', 'B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E', 'S', 'S', 'B', 'E', 'S', 'S', 'S', 'S', 'S', 'S'], 'E': ['S', 'B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E', 'S', 'S', 'B', 'E', 'S', 'S', 'S', 'S', 'B', 'E']}
(-142.70062479570734, ['S', 'B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E', 'S', 'S', 'B', 'E', 'S', 'S', 'S', 'S', 'B', 'E'])