文本数据结构化在知识图谱、标签标注、chatbot等应用中占据重要地位,普通的文本是无结构的数据,进行进一步的分析和理解,需要将无结构化的文本进行“有结构化”,科班自然语言处理的专业人士多会采用词法分析、句法分析等专业方法,该系列方法不在本文讨论范畴。本文主要基于统计+“深度学习”方式进行粗暴解析,涉及语义树、word2vec、贝叶斯、威尔逊置信区间等知识对文本语料进行结构化抽取。抽取逻辑:先计算语料归属语义树中相关节点的概率和置信度,然后根据树结构抽取合并当前标注的路径。假设语料:小米科技生产的手机很实惠。抽取后的可能路径为:科技—>制造—>电子产品—>正面。当然也可以进行意图路径等抽取,只需要在语义树中设置意图类型和表达即可,剩下交给统计来处理。
语义树(图)的定义
该树的定义很关键,关系文本结构化后路径的走向,由使用者根据自己的业务方向进行制作,后期也只需维护和更新该树的内容即可,不同任务的长期积累即形成有价值的知识库,具体定义如下:
{
"domain": "代表该规则的抽象概念",
"concepts": [
"该规则的表达(关键词集)"
],
"children": ["该规则的子规则List"]
}
```
示例:
{
"domain": "体育",
"concepts": [
"健身","打篮球","瑜伽","跑步"
],
"children": [
"健身",
"篮球爱好",
"瑜伽爱好者",
]
}
{
"domain": "健身",
"concepts": [
"健身房","哑铃","跑步机"
],
"children": [
]
}
```
树状结构即为:
注意点:
对语义树中domain的抽象表达,越高的节点上概念越宽泛,如“健身”,“运动会”,“人工智能”等, 越低的层次概念越细化,如“贝叶斯信任网络”,“08奥运会”,“白鹿原”等。人工抽取出不同概念层次的词作为标签/意图等比较关键,会影响语料抽取的合理性,也就是说抽取的词概念既不要太宽泛,也不要太细化, 例如,“计算机”,“汽车”,“电视剧”等词过于宽泛; 而“健身房”,“三分球”,“白鹿原”又过于细化, 理想情况下, 抽取出“健身”,“蓝球”,“剧情/伦理”可能更为合适。
对给定语料domain的抽取
在该步骤中,我们先忘掉语义树的结构,把语义树的每个节点看作独立的个体,此时domain的抽取就退化为一个分类问题,任务变为:一个语料可以被预测为相关domain的概率,为实现该任务比较容易想到的方法是基于语料级文本分类,但该方式需要标注大量训练样本。除此之外,容易想到的方法为基于keyword的匹配,比如文本中出现篮球,即给归为体育,出现三分球即归为篮球,进一步想到通过word2vec对keyword进行扩展,即出现和三分球相似的滞空扣篮也归为篮球,可以看出这个想法看似也make sense,但细想可能禁不起推敲,举例:假如语料中出现小米,是给其分类到科技还是植物呢?没错,这是个概率问题,另外一个大段文本中被打上体育标签的有20个keyword,而被打上娱乐标签的有2个,那么针对该语料这两个被打上的标签那哪个更可信呢?所以除了需要去计算其归属概率,还要判断是否可信,或在多大概率下可信(置信区间/置信度)。通过以上分析,思路逐渐清晰,接下来第一步我们需要去train每个token/keyword对应相关domain分类的概率,然后在predict的时候给出置信度(confidence)。
- p(domain|token)的估计
容易想到的方法:bayes估计(细节可参见Naive Bayes spam filtering),首先
![][2]
[2]: http://latex.codecogs.com/gif.latex?\Pr(S|W)={\frac{\Pr(W|S)\cdot\Pr(S)}{\Pr(W|S)\cdot\Pr(S)+\Pr(W|H)\cdot\Pr(H)}}
Pr(S|W):为语料中token/similarity(token)为某domain S的概率, 比如篮球属于体育的概率;
Pr(S)为对于给定任何语料或token/similarity(token)情况下domain S的先验概率;
Pr(W|S)为token/similarity(token) W出现在domain S的概率;
Pr(H) 为对于给定任何语料或token/similarity(token)情况下不在tag S中的先验概率;
Pr(W|H)为token/similarity(token) W没有出现在标签S的概率;
在train之前,我们先用word2vec对每个domain的concepts进行扩展,具体方法在spotify开源annoy[Github]库基础上封装一层方法类,具体如下:
# coding: utf-8
"""
The MIT License (MIT)
Copyright (c) 2017 Leepand
"""
#keyord_knn.py
import gzip
import struct
import cPickle
import collections
class BaseANN(object):
def use_threads(self):
return True
class Annoy(BaseANN):
def __init__(self, metric, n_trees, search_k):
self._n_trees = n_trees
self._search_k = search_k
self._metric = metric
self.name = 'Annoy(n_trees=%d, search_k=%d)' % (n_trees, search_k)
self.id2word={}
self.word2id={}
@classmethod
def get_vectors(cls,fn, n=float('inf')):
def _get_vectors(fn):
if fn.endswith('.gz'):
f = gzip.open(fn)
fn = fn[:-3]
else:
f = open(fn)
if fn.endswith('.bin'): # word2vec format
words, size = (int(x) for x in f.readline().strip().split())
t = 'f' * size
while True:
pos = f.tell()
buf = f.read(1024)
if buf == '' or buf == '\n': return
i = buf.index(' ')
word = buf[:i]
f.seek(pos + i + 1)
vec = struct.unpack(t, f.read(4 * size))
yield word, vec
elif fn.endswith('.txt'): # Assume simple text format
for line in f:
items = line.strip().split()
yield items[0], [float(x) for x in items[1:]]
elif fn.endswith('.pkl'): # Assume pickle (MNIST)
mi = 0
for pics, labels in cPickle.load(f):
for pic in pics:
yield mi, pic
mi += 1
i = 0
for line in _get_vectors(fn):
yield line
i += 1
if i >= n:
break
def fit(self, vecpath):
import annoy
self._annoy = annoy.AnnoyIndex(f=200, metric=self._metric)
getvec=self.get_vectors(vecpath)
j=0
for i, x in getvec:
self.id2word[int(j)]=str(i.strip())
self.word2id[str(i.strip())]=int(j)
self._annoy.add_item(j, x)
j=j+1
print 'building annoy tree...'
self._annoy.build(self._n_trees)
self._annoy.save('wiki2.ann')
with open('id2word.pickle', 'wb') as f:
cPickle.dump(self.id2word, f)
with open('word2id.pickle', 'wb') as f:
cPickle.dump(self.word2id, f)
print 'building annoy tree DONE!'
def predict(self,annoytreepath):
import annoy
self._annoy = annoy.AnnoyIndex(f=200, metric=self._metric)
self._annoy.load(annoytreepath)
with open('word2id.pickle', 'rb') as f:
self.word2id = cPickle.load(f)
with open('id2word.pickle', 'rb') as f:
self.id2word = cPickle.load(f)
def query(self, v, n):
return self._annoy.get_nns_by_vector(v.tolist(), n, self._search_k)
def search(self,word):
index=self.word2id[str(word)]
knn=dict(zip((self._annoy.get_nns_by_item(index, 5,include_distances=True))[0] ,\
(self._annoy.get_nns_by_item(index, 5,include_distances=True))[1] ))
knn_word_dist=collections.OrderedDict({self.id2word[int(k)]:(1-(v**2)/2) for k,v in knn.items()})
return knn_word_dist
def get_cosine_by_items(self,word1,word2):
if word1 in self.word2id and word2 in self.word2id:
index1=self.word2id[str(word1)]
index2=self.word2id[str(word2)]
cosine=1-(self._annoy.get_distance(index1, index2)**2)/2
return cosine
else:
return 'NoItem'
#build annoy tree
an=Annoy(metric = 'angular',n_trees=10,search_k=10)
an.fit(vecpath='./data/word_vec.bin')
p(domain|token)的估计:
import json
def calculate_category_probability(self):
"""
Caches the individual probabilities for each category
"""
total_tally = 0.0
probs = {}
for category, bayes_category in \
self.categories.get_categories().items():
count = bayes_category.get_tally()
total_tally += count
probs[category] = count
# Calculating the probability
for category, count in probs.items():
if total_tally > 0:
probs[category] = float(count)/float(total_tally)
else:
probs[category] = 0.0
for category, probability in probs.items():
self.probabilities[category] = {
# Probability that any given token is of this category
'prc': probability,
# Probability that any given token is not of this category
'prnc': sum(probs.values()) - probability
}
def calculate_bayesian_probability(self, cat, token_score, token_tally):
"""
Calculates the bayesian probability for a given token/category
:param cat: The category we're scoring for this token
:type cat: str
:param token_score: The tally of this token for this category
:type token_score: float
:param token_tally: The tally total for this token from all categories
:type token_tally: float
:return: bayesian probability
:rtype: float
"""
# P that any given token IS in this category
prc = self.probabilities[cat]['prc']
# P that any given token is NOT in this category
prnc = self.probabilities[cat]['prnc']
# P that this token is NOT of this category
prtnc = (token_tally - token_score) / token_tally
# P that this token IS of this category
prtc = token_score / token_tally
# Assembling the parts of the bayes equation
numerator = (prtc * prc)
denominator = (numerator + (prtnc * prnc))
# Returning the calculated bayes probability unless the denom. is 0
return numerator / denominator if denominator != 0.0 else 0.0
###train:
def train(self, category, text):
"""
Trains a category with a sample of text
:param category: the name of the category we want to train
:type category: str
:param text: the text we want to train the category with
:type text: str
"""
try:
bayes_category = self.categories.get_category(category)
except KeyError:
bayes_category = self.categories.add_category(category)
tokens = self.tokenizer(str(text))
occurrence_counts = self.count_token_occurrences(tokens)
for word, count in occurrence_counts.items():
bayes_category.train_token(word, count)
# Updating our per-category overall probabilities
self.calculate_category_probability()
def my_tokenizer(sample):
return sample
def load_rules(path, reload=False, is_root=False):
with open('word2id.pickle', 'rb') as f:
word2id_dict = cPickle.load(f)
"""
Build the rulebase by loading the rules terms from the given file.
Args: the path of file.
"""
bn=Annoy(metric = 'angular',n_trees=10,search_k=10)
bn.predict(annoytreepath='wiki2.ann')
bayes = simplebayes.SimpleBayes(cache_path='./my/tree/')
bayes.flush()
bayes.cache_train()
with open(path, 'r') as input:
json_data = json.load(input)
# load rule and build an instance
for data in json_data:
simlist=[]
domain = data["domain"]
concepts_list = data["concepts"]
children_list = data["children"]
simstr=' '.join(concepts_list).encode('utf-8')+' '
for word in concepts_list:
if str(word.encode('utf-8')) in word2id_dict:
print 'word:',str(word.encode('utf-8'))
simstr+=' '.join([k for k,v in bn.search(str(word.encode('utf-8')) ).items() if v>0.5])
else:
continue
#' '.join(concepts_list).encode('utf-8')
print 'sim',domain,simstr
bayes.train(domain,simstr)
- predict/score过程中p(domain|tokens)的置信计算:
置信区间的实质,就是进行可信度的修正,弥补样本量过小的影响。如果样本多,就说明比较可信,不需要很大的修正,所以置信区间会比较窄,下限值会比较大;如果样本少,就说明不一定可信,必须进行较大的修正,所以置信区间会比较宽,下限值会比较小。
二项分布的置信区间有多种计算公式,最常见的是"正态区间"(Normal approximation interval),教科书里几乎都是这种方法。但是,它只适用于样本较多的情况(np > 5 且 n(1 − p) > 5),对于小样本,它的准确性很差。
1927年,美国数学家 Edwin Bidwell Wilson提出了一个修正公式,被称为"威尔逊区间",很好地解决了小样本的准确性问题。- 计算每个标签的投票率(即一个语料归属该标签的标签词的比率)
- 计算每个投票率低置信区间(95%)
- 根据置信区间的下限值作为该标签的置信度
- 原理解释:置信区间的宽窄与样本的数量有关,比如,某条语料中归属A标签的标签词1个,9个为其他标签;另外一条归属B标签的有3个,27个为其他标签,两个标签的投票比率都是10%,但是B的置信度要高于A
- 语料中token归属任一节点的概率:Wilson_score_interval*p(s|w)
- 估计公式:
![][1] - 代码示例:
@classmethod
def confidence(cls,supports, downs):
if ups == 0:
return -downs
n = supports + downs
z = 1.6
liked = float(supports) / n
return (liked+z*z/(2*n)-z*sqrt((liked*(1-liked)+z*z/(4*n))/n))/(1+z*z/n)
#z为不同置信度对应的分位值
def score(self, text):
"""
Scores a sample of text
:param text: sample text to score
:type text: str
:return: dict of scores per category
:rtype: dict
"""
occurs = self.count_token_occurrences(self.tokenizer(text))
scores = {}
for category in self.categories.get_categories().keys():
scores[category] = 0
categories = self.categories.get_categories().items()
for word, count in occurs.items():
token_scores = {}
# Adding up individual token scores
for category, bayes_category in categories:
token_scores[category] = \
float(bayes_category.get_token_count(word))
# We use this to get token-in-category probabilities
token_tally = sum(token_scores.values())
#print word,count,token_tally,sum(occurs.values()),self.confidence(count,sum(occurs.values())-count)
# If this token isn't found anywhere its probability is 0
if token_tally == 0.0:
continue
# Calculating bayes probabiltity for this token
# http://en.wikipedia.org/wiki/Naive_Bayes_spam_filtering
Wilson_score_interval=self.confidence(count,sum(occurs.values())-count)
for category, token_score in token_scores.items():
# Bayes probability * the number of occurances of this token
#print self.calculate_bayesian_probability(category,token_score,token_tally)
#using Wilson_score_interval inplace count
scores[category] += Wilson_score_interval * \
self.calculate_bayesian_probability(
category,
token_score,
token_tally
)
# Removing empty categories from the results
final_scores = {}
for category, score in scores.items():
if score > 0:
final_scores[category] = score
return final_scores
类别抽取示例:
# -*- coding:utf-8 -*-
#tagger_score.py
load_rules('/Users/leepand/Downloads/Semantic_Tree_Path/rules/rule.json')
b_p=simplebayes.SimpleBayes(cache_path='./my/tree/')
b_p.get_cache_location()
print b_p.get_cache_location()
b_p.cache_train()
print 'predict:'
import jieba
#print ' '.join(jieba.lcut('姚明喜欢打篮球和逛街住宿贝叶斯数据挖掘'))
for k,v in b_p.score(' '.join(jieba.lcut('姚明喜欢打篮球和逛街住宿贝叶斯数据挖掘晶体学')).encode('utf-8')).items():
print k,v
语义路径提取
该过程将上一步骤抽取的domain进行路径合并,用于相应业务分析和应用,具体方法:
- 树规则序列化及模型预测
# -*- coding: utf-8 -*-
import os
import json
class ParseTree(object):
"""
Store the concept terms of a rule, and calculate the rule match.
"""
def __init__(self, domain, rule_terms, children,bayes_model):
self.id_term = domain
self.terms = rule_terms
self.model = bayes_model
self.children = children
def __str__(self):
res = 'Domain:' + self.id_term
if self.has_child():
res += ' with children: '
for child in self.children:
res += ' ' + str(child)
return res
def serialize(self):
"""
Convert the instance to json format.
"""
ch_list = []
for child in self.children:
ch_list.append(child.id_term)
cp_list = []
for t in self.terms:
cp_list.append(t)
response = []
data = {
"domain": str(self.id_term),
"concepts": cp_list,
"children": ch_list
}
return data
def add_child(self,child_rule):
"""
Add child rule into children list , e.g: Purchase(Parent) -> Drinks(Child).
"""
self.children.append(child_rule)
def has_child(self):
return len(self.children)
def has_response(self):
return len(self.response)
def match(self, sentence, threshold=0):
"""
Calculate the p*confidence of the input using bayes model.
Args:
threshold: a threshold to ignore the low confidence.
sentence : a list of words.
Returns:
a dict : [domain,p*confidence]
"""
matchs_dict=self.model.score(' '.join(jieba.lcut(sentence)).encode('utf-8'))
return matchs_dict
- 解析树建立及domain路径搜索
创建树解析方法类:class ParseBase(object): """ to store rules, and load the trained bayes model. """ def __init__(self, domain="general"): self.rules = {} self.domain = domain self.model = None self.forest_base_roots = [] def __str__(self): res = "There are " + str(self.rule_amount()) + " rules in the ParseBase:" res+= "\n-------\n" for key,rulebody in self.rules.items(): res += str(rulebody) + '\n' return res def rule_amount(self): return len(self.rules) def output_as_json(self, path='rule.json'): rule_list = [] print self.rules.values() for rule in self.rules.values(): print rule.serialize() rule_list.append(rule.serialize()) with open(path,'w') as op: op.write(json.dumps(rule_list, indent=4))
规则配置数据导入
def load_rules(self, path, reload=False, is_root=False):
"""
Build the rulebase by loading the rules terms from the given file.
Args: the path of file.
"""
assert self.model is not None, "Please load the bayes model before loading rules."
if reload:
self.rules.clear()
with open(path, 'r') as input:
json_data = json.load(input)
# load rule and build an instance
for data in json_data:
domain = data["domain"]
concepts_list = data["concepts"]
children_list = data["children"]
response = data["response"]
if domain not in self.rules:
self.rule = ParseTree(domain, concepts_list, children_list,self.model)
self.rules[domain] = self.rule
if is_root:
self.forest_base_roots.append(self.rule)
else:
print("[Rules]: Detect a duplicate domain name '%s'." % domain)
def load_rules_from_dic(self,path):
"""
load all rule_files in given path
"""
for file_name in os.listdir(path):
if not file_name.startswith('.'): # escape .DS_Store on OSX.
if file_name == "rule.json": # roots of forest
self.load_rules(path + file_name, is_root=True)
else:
self.load_rules(path + file_name)
load已训练好的token-tags bayes 模型
def load_model(self,path='./my/tree/'):
"""
Load a trained bayes model(binary format only).
Args:
path: the path of the model.
"""
self.model = simplebayes.SimpleBayes(cache_path=path)
self.model.get_cache_location()
print self.model.get_cache_location()
self.model.cache_train()
路径搜索的迭代宽度优先搜索算法
@classmethod
def iterative_bfs(cls,graph, start):
'''iterative breadth first search from start'''
bfs_tree = {start: {"parents":[], "children":[], "level":0}}
bfs_path=start+'>'
q = [start]
while q:
current = q.pop(0)
for v in graph[current]:
if not v in bfs_tree:
bfs_tree[v]={"parents":[current], "children":[], "level": bfs_tree[current]["level"] + 1}
#print v
bfs_path+=v+'>'
bfs_tree[current]["children"].append(v)
q.append(v)
else:
if bfs_tree[v]["level"] > bfs_tree[current]["level"]:
bfs_tree[current]["children"].append(v)
bfs_tree[v]["parents"].append(current)
bfs_path+=current+'>'
#print bfs_path
return bfs_path
路径解析主函数
def parse(self, sentence, threshold=0, root=None):
"""
parse the sentence score order by bayes probability and confidence.
Args:
sentence: a list of domain and p*confidence
threshold: a threshold to ignore the low confidence.
Return:
a list holds the max-path rules and the classification tree travel path.
"""
log = open("matching_log.txt",'w')
result_list = []
at_leaf_node = False
assert self.model is not None, "Please load the model before any match."
if root is None: # then search from roots of forest.
focused_rule = self.forest_base_roots[:]
else:
focused_rule = [self.rules[root]]
#rule=focused_rule
rule_match2=self.rule.match(sentence, threshold)
rule_match={}
for d,w in rule_match2.items():
if w>threshold:
rule_match[d]=w
domain_graph={}
for domain,weight in rule_match.items():
term_trans = ""
if self.rules[domain].has_child():
domain_graph[domain]=[domain]
term_trans += domain+'>'
for child_id in self.rules[domain].children:
if child_id in rule_match:
term_trans += child_id+'>'
domain_graph[domain]=[child_id]
else:
continue
else:
term_trans= domain
domain_graph[domain]=[domain]
print 'all',term_trans.split('>')
print 'end',term_trans.split('>')[-2]
print 'begin',term_trans.split('>')[0]
path_begin=term_trans.split('>')[0]
path_end=term_trans.split('>')[-2]
print 'domain tree path:',term_trans
tmp_list=[]
for domain,child in domain_graph.items():
tag_path=self.iterative_bfs(domain_graph,domain)
if len(tag_path.split('>'))>2:
if tag_path not in ' '.join(tmp_list):
tmp_list.append(tag_path)
else:
continue
print 'This sentence\'s Domain Path is:','|'.join(tmp_list)
调用示例
# -*- coding: utf-8 -*-
import jieba
rb=ParseBase()
#load_bayes_model
rb.load_model()
#load domain tree
rb.load_rules_from_dic('./rules/')
#rb.output_as_json(path='rule.json')
rb.rule_amount()
sentence='贝叶斯数据挖掘人工智能时代的新算法'
rb.parse(sentence, threshold=0.02, root=None)
###############Log:######################
[Rules]: Detect a duplicate domain name '鬧鐘'.
[Rules]: Detect a duplicate domain name '觀光'.
[Rules]: Detect a duplicate domain name '天氣'.
[Rules]: Detect a duplicate domain name '吃喝玩樂'.
[Rules]: Detect a duplicate domain name '股票'.
[Rules]: Detect a duplicate domain name '住宿'.
[Rules]: Detect a duplicate domain name '購買'.
[Rules]: Detect a duplicate domain name '病症'.
domain tree path: 机器学习>算法>
domain tree path: 算法>贝叶斯>
This sentence's Domain Path is: 机器学习>算法>贝叶斯>
思考
1、 扩展部分采用词向量方式,该方式扩展时属于弱匹配,虽然弹性比较大,但很容易发生干扰情形,比如有一些高频词「嘻嘻」、「哈哈」经常在匹配过程中得到较高分数,而一些如「动漫」有明确意图的被迫做了陪衬,也不能武断的将这些词柜为停用词,思考贝叶斯+置信度+路径的方式可以规避掉这个问题吗?
2、 当domain“样本”非均衡时需要对先验概率做相应调整么?
[1]: http://latex.codecogs.com/gif.latex?{Wilson_score_interval}=\frac{\hat\p+\frac1{2n}+z2_{1-\frac{\alpha}{2}}\pm\z_{1-\frac\alpha{2}}\sqrt{\frac{\hat\p(1-\hat\p)}{n}+\frac{z2_{1-\frac\alpha{2}}}{4n2}}}{1+\frac1{n}z2_{1-\frac\alpha{2}}}