Commit 98807fc8 authored by Pruthwik

updated files

parent f864fdd5
# how to run the code
# python3 extract_data_from_ssf_in_conll_format.py --input InputFilePath --output OutputFilePath --level 0/1/2/3
# level argument: 0 for token, 1 for token+pos, 2 for token+pos+morph, 3 for token+pos+chunk
# the output file need not exist beforehand; just supply a name for it
# author : Pruthwik Mishra, LTRC, IIIT-H
# requires ssfAPI.py (included further below) in the same directory
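# illustrative example (tokens and feature values are made up, not from any corpus):
# given a tab-separated SSF sentence such as
# <Sentence id='1'>
# 1    ((    NP    <fs name='NP'>
# 1.1  raama NNP   <fs af='raama,n,m,sg,3,d,,' name='raama'>
#      ))
# 2    ((    VGF   <fs name='VGF'>
# 2.1  gayA  VM    <fs af='jA,v,m,sg,any,,yA,yA' name='gayA'>
#      ))
# </Sentence>
# level 1 emits "token<TAB>POS" lines, while level 3 emits BIO chunk tags:
# raama NNP   B-NP
# gayA  VM    B-VGF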
import ssfAPI as ssf
import argparse
import re
def readFileAndExtractSentencesInConLL(inputFilePath, outputFilePath, level=0):
    """Read a file and extract sentences in CoNLL format."""
    d = ssf.Document(inputFilePath)
    sentencesList = list()
    print(inputFilePath)
    for tree in d.nodeList:
        print(tree.sentenceID)
        if level == 0:
            # token only
            sentencesList.append('\n'.join(
                [token for token in tree.generateSentence().split()
                 if not re.search('^NUL', token)]) + '\n')
        elif level == 1:
            # token + POS
            tokensWithPOS = [node.lex + '\t' + node.type.replace('__', '_')
                             for chunkNode in tree.nodeList
                             for node in chunkNode.nodeList
                             if not re.search('^NUL', node.lex)]
            sentencesList.append('\n'.join(tokensWithPOS) + '\n')
        elif level == 2:
            # token + POS + morph (the 'af' attribute)
            tokensWithPOSMorph = [node.lex + '\t' + node.type.replace('__', '_') +
                                  '\t' + node.getAttribute('af')
                                  for chunkNode in tree.nodeList
                                  for node in chunkNode.nodeList
                                  if not re.search('^NUL', node.lex)]
            sentencesList.append('\n'.join(tokensWithPOSMorph) + '\n')
        else:
            # token + POS + chunk tag in BIO notation
            tokenPOSAndChunk = list()
            for chunkNode in tree.nodeList:
                for indexNode, node in enumerate(chunkNode.nodeList):
                    if re.search('^NUL', node.lex):
                        continue
                    # the first token of a chunk gets B-, the rest get I-
                    boundary = 'B-' if indexNode == 0 else 'I-'
                    tokenPOSAndChunk.append(node.lex + '\t' +
                                            node.type.replace('__', '_') +
                                            '\t' + boundary + chunkNode.type)
            sentencesList.append('\n'.join(tokenPOSAndChunk) + '\n')
    writeListToFile(sentencesList, outputFilePath)


def writeListToFile(dataList, outFilePath):
    """Write a list of sentences to a file."""
    with open(outFilePath, 'w', encoding='utf-8') as fileWrite:
        fileWrite.write('\n'.join(dataList) + '\n')
def main():
    """Pass arguments and call functions here."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='inp',
                        help="Add the input file path")
    parser.add_argument('--output', dest='out',
                        help="Add the output file path")
    parser.add_argument('--level', dest='level', type=int, default=0,
                        help="0: token, 1: token + pos, 2: token + pos + morph, 3: token + pos + chunk")
    args = parser.parse_args()
    readFileAndExtractSentencesInConLL(args.inp, args.out, args.level)


if __name__ == '__main__':
    main()
"""Evaluate chunk metrics."""
# the input file has this structure
# token\tgold-pos\tgold-chunk\tpred-chunk
# cut the predicted chunk output from the shallow parse output
# paste it with the gold-pos-chunk file
# if seqeval is not installed,
# install it using: pip install seqeval
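# e.g. one well-formed sentence block (tab-separated; tokens are illustrative):
# raama NNP   B-NP    B-NP
# gayA  VM    B-VGF   I-VGF
#
# sentences are separated by blank lines, as the processing below assumes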
from seqeval.metrics import classification_report
from seqeval.metrics import accuracy_score
from seqeval.metrics import f1_score
from seqeval.scheme import IOB2
from sys import argv
def read_lines_from_file(file_path):
    """Read lines from a file."""
    with open(file_path, 'r', encoding='utf-8') as file_read:
        return file_read.readlines()


def process_lines_prepare_gold_and_system_outputs(lines):
    """Process input lines and prepare gold and system outputs."""
    gold_all, pred_all, temp_gold, temp_pred = list(), list(), list(), list()
    for line in lines:
        line = line.strip()
        if line:
            # the last two columns are the gold chunk and the predicted chunk
            gold, pred = line.split()[-2:]
            temp_gold.append(gold)
            temp_pred.append(pred)
        else:
            # a blank line marks a sentence boundary
            assert len(temp_gold) == len(temp_pred)
            gold_all.append(temp_gold)
            pred_all.append(temp_pred)
            temp_gold, temp_pred = list(), list()
    if temp_gold and temp_pred:
        # flush the last sentence if the file does not end with a blank line
        assert len(temp_gold) == len(temp_pred)
        gold_all.append(temp_gold)
        pred_all.append(temp_pred)
    return gold_all, pred_all


def generate_classification_metrics(gold, pred):
    """Generate classification metrics using the seqeval package."""
    class_report = ''
    class_report += classification_report(gold, pred, mode='strict', scheme=IOB2) + '\n'
    class_report += 'Accuracy = ' + str(accuracy_score(gold, pred)) + '\n'
    class_report += 'Micro_F1 = ' + str(f1_score(gold, pred))
    return class_report


def write_data_into_file(data, file_path):
    """Write data into a file."""
    with open(file_path, 'w', encoding='utf-8') as file_write:
        file_write.write(data + '\n')


def main():
    """Pass arguments and call functions here."""
    input_file = argv[1]
    output_file = argv[2]
    input_lines = read_lines_from_file(input_file)
    gold_all, pred_all = process_lines_prepare_gold_and_system_outputs(input_lines)
    class_report = generate_classification_metrics(gold_all, pred_all)
    write_data_into_file(class_report, output_file)


if __name__ == '__main__':
    main()
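# a minimal sketch of the "cut and paste" preparation step described above,
# assuming the gold file holds token\tpos\tchunk lines and the shallow parse
# output carries the predicted chunk in its last column (file names are
# hypothetical):
# gold_lines = read_lines_from_file('gold_pos_chunk.txt')
# pred_lines = read_lines_from_file('shallow_parse_output.txt')
# with open('chunk_eval_input.txt', 'w', encoding='utf-8') as merged:
#     for gold_line, pred_line in zip(gold_lines, pred_lines):
#         if gold_line.strip():
#             merged.write(gold_line.strip() + '\t' + pred_line.split()[-1] + '\n')
#         else:
#             merged.write('\n')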
"""Precision, recall, F1 score for POS."""
# the inputs to this program are:
# gold pos outputs, pred pos outputs and a file name
# where the classification results will be written.
# if you do not have sklearn,
# install it using: pip install scikit-learn
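# e.g. (illustrative) both files hold one tag per line, line-aligned:
# gold file        pred file
# NN               NN
# VM               VAUX
# PSP              PSP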
from sys import argv
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
def readLinesFromFile(filePath):
    """Read lines from a file."""
    with open(filePath, 'r', encoding='utf-8') as fileRead:
        return [line.strip() for line in fileRead.readlines() if line.strip()]


def findPrecisionRecallF1score(goldLabels, predictedLabels, trueLabels=None):
    """Find precision, recall and F1 scores."""
    return classification_report(goldLabels,
                                 predictedLabels, target_names=trueLabels)
def main():
    """Pass arguments and call functions here."""
    goldPath = argv[1]
    predPath = argv[2]
    outPath = argv[3]
    gold = readLinesFromFile(goldPath)
    predicted = readLinesFromFile(predPath)
    allLabels = set(predicted).union(set(gold))
    dictLabelToIndices = {label: index for index,
                          label in enumerate(allLabels)}
    predictedIntoIndexes = [dictLabelToIndices[item] for item in predicted]
    goldIntoIndexes = [dictLabelToIndices[item] for item in gold]
    outDesc = open(outPath, 'w', encoding='utf-8')
    classReport = ''
    classReport += findPrecisionRecallF1score(gold, predicted)
    if len(set(predictedIntoIndexes)) == 2:
        # binary case: the micro scores are only printed, not written to the report
        print('Micro Precision =', precision_score(goldIntoIndexes, predictedIntoIndexes, average='binary'))
        print('Micro Recall =', recall_score(goldIntoIndexes, predictedIntoIndexes, average='binary'))
        print('Micro F1 =', f1_score(goldIntoIndexes, predictedIntoIndexes, average='binary'))
        print('Micro Accuracy =', accuracy_score(goldIntoIndexes, predictedIntoIndexes))
    else:
        classReport += '\n'
        classReport += 'Micro_Precision = ' + str(precision_score(goldIntoIndexes, predictedIntoIndexes, average='micro')) + '\n'
        print('Micro Precision =', precision_score(goldIntoIndexes, predictedIntoIndexes, average='micro'))
        classReport += 'Micro_Recall = ' + str(recall_score(goldIntoIndexes, predictedIntoIndexes, average='micro')) + '\n'
        print('Micro Recall =', recall_score(goldIntoIndexes, predictedIntoIndexes, average='micro'))
        classReport += 'Micro_F1 = ' + str(f1_score(goldIntoIndexes, predictedIntoIndexes, average='micro')) + '\n'
        print('Micro F1 =', f1_score(goldIntoIndexes, predictedIntoIndexes, average='micro'))
        classReport += 'Micro_Accuracy = ' + str(accuracy_score(goldIntoIndexes, predictedIntoIndexes)) + '\n'
        print('Micro Accuracy =', accuracy_score(goldIntoIndexes, predictedIntoIndexes))
    outDesc.write(classReport + '\n')
    outDesc.close()


if __name__ == '__main__':
    main()
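# a quick toy check of the metrics above (labels are illustrative):
# gold = ['NN', 'VM', 'PSP', 'NN']
# predicted = ['NN', 'VAUX', 'PSP', 'NN']
# print(classification_report(gold, predicted))
# print(f1_score(gold, predicted, average='micro'))  # -> 0.75 (3 of 4 correct)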
#!/usr/bin/python
# Author: Himanshu Sharma
# changes added by Pruthwik Mishra
import os
# import sys
import codecs
import re
from collections import OrderedDict
class Node():
    def __init__(self, text):
        self.text = text
        self.lex = None
        self.type = None
        self.__attributes = OrderedDict()
        self.errors = []
        self.name = None
        self.parent = None
        self.parentRelation = None
        self.alignedTo = None
        self.fsList = None
        self.analyzeNode(self.text)

    def analyzeNode(self, text):
        [token, tokenType, fsDict, fsList] = getTokenFeats(
            text.strip().split())
        attributeUpdateStatus = self.updateAttributes(
            token, tokenType, fsDict, fsList)
        if attributeUpdateStatus == 0:
            self.errors.append("Can't update attributes for node")
            self.probSent = True

    def updateAttributes(self, token, tokenType, fsDict, fsList):
        self.fsList = fsList
        self.lex = token
        self.type = tokenType
        for attribute in fsDict.keys():
            self.__attributes[attribute] = fsDict[attribute]
        self.assignName()
        # return a status so that the check in analyzeNode works
        return 1

    def assignName(self):
        if self.__attributes.get('name') is not None:
            self.name = self.getAttribute('name')
        else:
            self.errors.append('No name for this token Node')

    def printValue(self):
        return self.lex

    def printSSFValue(self, prefix, allFeat):
        returnValue = [prefix, self.printValue(), self.type]
        if allFeat is False:
            fs = ['<fs']
            for key in self.__attributes.keys():
                fs.append(key + "='" + self.getAttribute(key) + "'")
            delim = ' '
            fs[-1] = fs[-1] + '>'
        else:
            fs = self.fsList
            delim = '|'
        return ['\t'.join(x for x in returnValue) + '\t' + delim.join(x for x in fs)]

    def getAttribute(self, key):
        if key in self.__attributes:
            return self.__attributes[key]
        else:
            return None

    def addAttribute(self, key, value):
        self.__attributes[key] = value

    def deleteAttribute(self, key):
        del self.__attributes[key]
class ChunkNode():
    def __init__(self, header):
        self.text = []
        self.header = header
        self.footer = None
        self.nodeList = []
        self.parent = '0'
        self.__attributes = OrderedDict()
        self.parentRelation = 'root'
        self.name = None
        self.head = None
        self.isParent = False
        self.errors = []
        self.upper = None
        self.updateDrel()
        self.type = None
        self.fsList = None

    def analyzeChunk(self):
        [chunkType, chunkFeatDict, chunkFSList] = getChunkFeats(self.header)
        self.fsList = chunkFSList
        self.type = chunkType
        self.updateAttributes(chunkFeatDict)
        self.text = '\n'.join(self.text)

    def updateAttributes(self, fsDict):
        for attribute in fsDict.keys():
            self.__attributes[attribute] = fsDict[attribute]
        self.assignName()
        self.updateDrel()

    def assignName(self):
        if 'name' in self.__attributes:
            self.name = self.getAttribute('name')
        else:
            self.errors.append('No name for this chunk Node')

    def updateDrel(self):
        if 'drel' in self.__attributes:
            drelList = self.getAttribute('drel').split(':')
            if len(drelList) == 2:
                self.parent = drelList[1]
                self.parentRelation = drelList[0]
        elif 'dmrel' in self.__attributes:
            drelList = self.getAttribute('dmrel').split(':')
            if len(drelList) == 2:
                self.parent = drelList[1]
                self.parentRelation = drelList[0]

    def printValue(self):
        returnString = []
        for node in self.nodeList:
            returnString.append(node.printValue())
        return ' '.join(x for x in returnString)

    def printSSFValue(self, prefix, allFeat):
        returnStringList = []
        returnValue = [prefix, '((', self.type]
        if allFeat is False:
            fs = ['<fs']
            for key in self.__attributes.keys():
                fs.append(key + "='" + self.getAttribute(key) + "'")
            delim = ' '
            fs[-1] = fs[-1] + '>'
        else:
            fs = self.fsList
            delim = '|'
        returnStringList.append(
            '\t'.join(x for x in returnValue) + '\t' + delim.join(x for x in fs))
        nodePosn = 0
        for node in self.nodeList:
            nodePosn += 1
            # chunk and token children print the same way; recurse either way
            returnStringList.extend(
                node.printSSFValue(prefix + '.' + str(nodePosn), allFeat))
        returnStringList.append('\t' + '))')
        return returnStringList

    def getAttribute(self, key):
        if key in self.__attributes:
            return self.__attributes[key]
        else:
            return None

    def addAttribute(self, key, value):
        self.__attributes[key] = value

    def deleteAttribute(self, key):
        del self.__attributes[key]
class Sentence():
    def __init__(self, sentence, ignoreErrors=True, nesting=True, dummySentence=False):
        self.ignoreErrors = ignoreErrors
        self.nesting = nesting
        self.sentence = None
        self.sentenceID = None
        self.sentenceType = None
        self.length = 0
        self.tree = None
        self.nodeList = []
        self.edges = {}
        self.nodes = {}
        self.tokenNodes = {}
        self.rootNode = None
        self.fileName = None
        self.comment = None
        self.probSent = False
        self.errors = []
        self.text = sentence
        self.dummySentence = dummySentence
        if self.dummySentence is False:
            # self.header = sentence.group('header')
            # self.footer = sentence.group('footer')
            # self.name = sentence.group('sentenceID')
            # self.text = sentence.group('text')
            self.analyzeSentence()

    def analyzeSentence(self, ignoreErrors=False, nesting=True):
        lastContext = self
        for line in self.text.split('\n'):
            stripLine = line.strip()
            if stripLine == "":
                continue
            elif stripLine[0] == "<" and ignoreErrors is False:
                self.errors.append('Encountered a line starting with "<"')
                self.probSent = True
            else:
                splitLine = stripLine.split()
                if len(splitLine) > 1 and splitLine[1] == '((':
                    # a chunk opens: push a new ChunkNode context
                    currentChunkNode = ChunkNode(line + '\n')
                    currentChunkNode.upper = lastContext
                    currentChunkNode.upper.nodeList.append(currentChunkNode)
                    if currentChunkNode.upper.__class__.__name__ != 'Sentence':
                        currentChunkNode.upper.text.append(line)
                    lastContext = currentChunkNode
                elif len(splitLine) > 0 and splitLine[0] == '))':
                    # a chunk closes: pop back to the enclosing context
                    currentChunkNode.footer = line + '\n'
                    currentChunkNode.analyzeChunk()
                    lastContext = currentChunkNode.upper
                    currentChunkNode = lastContext
                else:
                    currentNode = Node(line + '\n')
                    lastContext.nodeList.append(currentNode)
                    currentNode.upper = lastContext
        # updateAttributesStatus = self.updateAttributes()
        # if updateAttributesStatus == 0:
        #     self.probSent = True
        #     self.errors.append("Cannot update the Attributes for this sentence")
    def addEdge(self, parent, child):
        if parent in self.edges:
            if child not in self.edges[parent]:
                self.edges[parent].append(child)
        else:
            self.edges[parent] = [child]

    def updateAttributes(self):
        populateNodesStatus = self.populateNodes()
        populateEdgesStatus = self.populateEdges()
        self.sentence = self.generateSentence()
        if populateEdgesStatus == 0 or populateNodesStatus == 0:
            return 0
        return 1

    def printSSFValue(self, allFeat):
        returnStringList = []
        returnStringList.append("<Sentence id='" + str(self.sentenceID) + "'>")
        if self.nodeList != []:
            nodeList = self.nodeList
            nodePosn = 0
            for node in nodeList:
                nodePosn += 1
                returnStringList.extend(
                    node.printSSFValue(str(nodePosn), allFeat))
        returnStringList.append('</Sentence>\n')
        return '\n'.join(x for x in returnStringList)

    def populateNodes(self, naming='strict'):
        if naming == 'strict':
            for nodeElement in self.nodeList:
                assert nodeElement.name is not None
                self.nodes[nodeElement.name] = nodeElement
        return 1

    def populateEdges(self):
        for node in self.nodeList:
            nodeName = node.name
            if node.parent == '0' or node == self.rootNode:
                self.rootNode = node
                continue
            elif node.parent not in self.nodes:
                # self.errors.append('Error : Bad DepRel Parent Name ' + self.fileName + ' : ' + str(self.name))
                return 0
            assert node.parent in self.nodes
            self.addEdge(node.parent, node.name)
        return 1

    def generateSentence(self):
        sentence = []
        for nodeName in self.nodeList:
            sentence.append(nodeName.printValue())
        return ' '.join(x for x in sentence)
class Document():
    def __init__(self, fileName):
        self.header = None
        self.footer = None
        self.text = None
        self.nodeList = []
        self.fileName = fileName
        self.analyzeDocument()
        self.upper = None

    def analyzeDocument(self):
        inputFD = codecs.open(self.fileName, 'r', encoding='utf8')
        sentenceList = findSentences(inputFD)
        for sentence in sentenceList:
            tree = Sentence(sentence[1], ignoreErrors=True, nesting=True)
            tree.text = sentence[1]
            tree.sentenceID = int(sentence[0])
            tree.footer = sentence[2]
            tree.header = "<Sentence id='" + sentence[0] + "'>"
            tree.upper = self
            self.nodeList.append(tree)
        inputFD.close()
def getAddressNode(address, node, level='ChunkNode'):
    '''Return the node referenced by the address string, relative to the node given as the second argument.

    The level argument sets the starting address base; valid values are
    "ChunkNode", "Node", "Sentence", "Document" and "Relative". The hierarchy
    of levels for interpretation is:
    "Document" -> "Sentence" -> "ChunkNode" -> "Node"
    "Relative" starts the base address from the node which contains the address.
    '''
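    # e.g. (hypothetical names) getAddressNode('NP2', tokenNode) climbs from
    # tokenNode to its enclosing ChunkNode, steps one level above it, and then
    # descends to the sibling node whose name attribute is 'NP2';
    # a '..' step in the address moves one level up instead.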
    currentContext = node
    if level != 'Relative':
        # climb to the requested base level, then one step above it
        while currentContext.__class__.__name__ != level:
            currentContext = currentContext.upper
        currentContext = currentContext.upper
    stepList = address.split('%')
    for step in stepList:
        if step == '..':
            currentContext = currentContext.upper
        else:
            refNode = [iterNode for iterNode in currentContext.nodeList
                       if iterNode.name == step][0]
            currentContext = refNode
    return currentContext
def getChunkFeats(line):
    lineList = line.strip().split()
    returnErrors = list()
    chunkType = None
    fsList = []
    if len(lineList) >= 3:
        chunkType = lineList[2]
    returnFeats = OrderedDict()
    multipleFeatRE = r'<fs.*?>'
    featRE = r'(?:\W*)(\S+)=([\'|\"])?([^ \t\n\r\f\v\'\"]*)[\'|\"]?(?:.*)'
    fsList = re.findall(multipleFeatRE, ' '.join(lineList))
    for x in lineList:
        feat = re.findall(featRE, x)
        if feat != []:
            if len(feat) > 1:
                returnErrors.append('Feature with more than one value')
                continue
            returnFeats[feat[0][0]] = feat[0][2]
    return [chunkType, returnFeats, fsList]
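# e.g. (illustrative) getChunkFeats("2\t((\tNP\t<fs name='NP2' drel='k1:VGF'>")
# -> ['NP', OrderedDict([('name', 'NP2'), ('drel', 'k1:VGF')]),
#     ["<fs name='NP2' drel='k1:VGF'>"]]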
def getTokenFeats(lineList):
    tokenType, token = None, None
    returnFeats = OrderedDict()
    fsList = []
    if len(lineList) >= 3:
        tokenType = lineList[2]
    returnErrors = list()
    token = lineList[1]
    multipleFeatRE = r'<fs.*?>'
    featRE = r'(?:\W*)(\S+)=([\'|\"])?([^ \t\n\r\f\v\'\"]*)[\'|\"]?(?:.*)'
    fsList = re.findall(multipleFeatRE, ' '.join(lineList))
    for x in lineList:
        feat = re.findall(featRE, x)
        if feat != []:
            if len(feat) > 1:
                returnErrors.append('Feature with more than one value')
                continue
            returnFeats[feat[0][0]] = feat[0][2]
    return [token, tokenType, returnFeats, fsList]
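# e.g. (illustrative) getTokenFeats("1.1\traama\tNNP\t<fs name='raama'>".split())
# -> ['raama', 'NNP', OrderedDict([('name', 'raama')]), ["<fs name='raama'>"]]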
def getSentenceIter(inpFD):
    sentenceRE = r'''(?P<complete>(?P<header><Sentence id=[\'\"]?(?P<sentenceID>\d+)[\'\"]?>)(?P<text>.*?)(?P<footer></Sentence>))'''
    text = inpFD.read()
    text = text.replace('0xe0', '')
    return re.finditer(sentenceRE, text, re.DOTALL)


def findSentences(inpFD):
    sentenceRE = "<Sentence id='?\"?(.*?)'?\"?>(.*?)(</Sentence>)"
    text = inpFD.read()
    text = text.replace('0xe0', '')
    return re.findall(sentenceRE, text, re.DOTALL)


def folderWalk(folderPath):
    fileList = []
    for dirPath, dirNames, fileNames in os.walk(folderPath):
        for fileName in fileNames:
            fileList.append(os.path.join(dirPath, fileName))
    return fileList
# if __name__ == '__main__':
#     inputPath = sys.argv[1]
#     fileList = folderWalk(inputPath)
#     newFileList = []
#     for fileName in fileList:
#         xFileName = fileName.split('/')[-1]
#         if xFileName == 'err.txt' or xFileName.split('.')[-1] in ['comments', 'bak'] or xFileName[:4] == 'task':
#             continue
#         else:
#             newFileList.append(fileName)
#     for fileName in newFileList:
#         d = Document(fileName)
#         for tree in d.nodeList:
#             for chunkNode in tree.nodeList:
#                 if chunkNode.type == 'VGF':
#                     combinedTAM = ''
#                     for node in chunkNode.nodeList:
#                         if node.type != 'VM':
#                             combinedTAM += node.lex + '+'
#                     print('TAM', combinedTAM.strip('+'))
#                     # refAddress = node.getAttribute('ref')
#                     # if refAddress is not None:
#                     #     refNode = getAddressNode(refAddress, node)
#                     #     print('Anaphor', node.printValue(), 'Reference', refNode.printValue())
#                     # print(tree.printSSFValue())
#                     # print(tree.header + tree.text + tree.footer)