From 98807fc866190c2863f13b146aba544d2d4d6cd1 Mon Sep 17 00:00:00 2001
From: Pruthwik
Date: Wed, 4 May 2022 22:55:39 +0530
Subject: [PATCH] updated files

---
 ..._data_from_ssf_in_conll_format_for_file.py |  66 +++
 Code/precision_recall_f1_score_chunking.py    |  68 +++
 Code/precision_recall_f1_score_pos.py         |  62 +++
 Code/ssfAPI.py                                | 443 ++++++++++++++++++
 4 files changed, 639 insertions(+)
 create mode 100644 Code/extract_data_from_ssf_in_conll_format_for_file.py
 create mode 100644 Code/precision_recall_f1_score_chunking.py
 create mode 100644 Code/precision_recall_f1_score_pos.py
 create mode 100755 Code/ssfAPI.py

diff --git a/Code/extract_data_from_ssf_in_conll_format_for_file.py b/Code/extract_data_from_ssf_in_conll_format_for_file.py
new file mode 100644
index 0000000..263b6b6
--- /dev/null
+++ b/Code/extract_data_from_ssf_in_conll_format_for_file.py
@@ -0,0 +1,66 @@
+# how to run the code:
+# python3 extract_data_from_ssf_in_conll_format_for_file.py --input InputFilePath --output OutputFilePath --level 0/1/2/3
+# level argument: 0 for token, 1 for token+pos, 2 for token+pos+morph, 3 for token+pos+chunk
+# the output file is created by the script; only supply a name for it
+# author: Pruthwik Mishra, LTRC, IIIT-H
+# requires ssfAPI.py in the same directory
+import ssfAPI as ssf
+import argparse
+import re
+
+
+def readFileAndExtractSentencesInConLL(inputFilePath, outputFilePath, level=0):
+    """Read a file and extract sentences in CoNLL format."""
+    d = ssf.Document(inputFilePath)
+    sentencesList = list()
+    print(inputFilePath)
+    for tree in d.nodeList:
+        print(tree.sentenceID)
+        if level == 0:
+            # plain tokens, one per line; NULL tokens are dropped
+            sentencesList.append('\n'.join([token for token in tree.generateSentence().split() if not re.search('^NUL', token)]) + '\n')
+        elif level == 1:
+            tokensWithPOS = [node.lex + '\t' + node.type.replace('__', '_') for chunkNode in tree.nodeList for node in chunkNode.nodeList if not re.search('^NUL', node.lex)]
+            sentencesList.append('\n'.join(tokensWithPOS) + '\n')
+        elif level == 2:
+            tokensWithPOSMorph = [node.lex + '\t' + node.type.replace('__', '_') + '\t' + node.getAttribute('af') for chunkNode in tree.nodeList for node in chunkNode.nodeList if not re.search('^NUL', node.lex)]
+            sentencesList.append('\n'.join(tokensWithPOSMorph) + '\n')
+        else:
+            # level 3: token, POS, and chunk label in BIO notation
+            tokenPOSAndChunk = list()
+            for chunkNode in tree.nodeList:
+                for indexNode, node in enumerate(chunkNode.nodeList):
+                    if not re.search('^NUL', node.lex):
+                        chunkLabel = 'B-' if indexNode == 0 else 'I-'
+                        tokenPOSAndChunk.append(node.lex + '\t' + node.type.replace('__', '_') + '\t' + chunkLabel + chunkNode.type)
+            sentencesList.append('\n'.join(tokenPOSAndChunk) + '\n')
+    writeListToFile(sentencesList, outputFilePath)
+
+
+def writeListToFile(dataList, outFilePath):
+    """Write a list of sentences to a file."""
+    with open(outFilePath, 'w', encoding='utf-8') as fileWrite:
+        fileWrite.write('\n'.join(dataList) + '\n')
+
+
+def main():
+    """Pass arguments and call functions here."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--input', dest='inp', help="Add the input file path")
+    parser.add_argument('--output', dest='out', help="Add the output file path")
+    parser.add_argument('--level', dest='level', help="0: token, 1: token + pos, 2: token + pos + morph, 3: token + pos + chunk", type=int, default=0)
+    args = parser.parse_args()
+    readFileAndExtractSentencesInConLL(args.inp, args.out, args.level)
+
+
+if __name__ == '__main__':
+    main()
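For reference, a level-3 run emits one token per line with tab-separated POS and
BIO chunk tags, and a blank line between sentences. The file names and tags in
this sketch are hypothetical:

    python3 Code/extract_data_from_ssf_in_conll_format_for_file.py --input sample.ssf --output sample.conll --level 3

    # sample.conll
    raama     NNP     B-NP
    phala     NN      B-NP
    khaataa   VM      B-VGF
    hai       VAUX    I-VGF
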
diff --git a/Code/precision_recall_f1_score_chunking.py b/Code/precision_recall_f1_score_chunking.py
new file mode 100644
index 0000000..ede966d
--- /dev/null
+++ b/Code/precision_recall_f1_score_chunking.py
@@ -0,0 +1,68 @@
+"""Evaluate chunk metrics."""
+# the input file has this structure:
+# token\tgold-pos\tgold-chunk\tpred-chunk
+# cut the predicted chunk column from the shallow parse output
+# and paste it next to the gold token-pos-chunk columns
+# if seqeval is not installed, install it with: pip install seqeval
+from seqeval.metrics import classification_report
+from seqeval.metrics import accuracy_score
+from seqeval.metrics import f1_score
+from seqeval.scheme import IOB2
+from sys import argv
+
+
+def read_lines_from_file(file_path):
+    """Read lines from a file."""
+    with open(file_path, 'r', encoding='utf-8') as file_read:
+        return file_read.readlines()
+
+
+def process_lines_prepare_gold_and_system_outputs(lines):
+    """Process input lines and prepare gold and system outputs."""
+    gold_all, pred_all, temp_gold, temp_pred = list(), list(), list(), list()
+    for line in lines:
+        line = line.strip()
+        if line:
+            gold, pred = line.split()[-2:]
+            temp_gold.append(gold)
+            temp_pred.append(pred)
+        else:
+            # a blank line marks a sentence boundary
+            if temp_gold:
+                assert len(temp_gold) == len(temp_pred)
+                gold_all.append(temp_gold)
+                pred_all.append(temp_pred)
+                temp_gold, temp_pred = list(), list()
+    if temp_gold and temp_pred:
+        # flush the last sentence if the file does not end with a blank line
+        assert len(temp_gold) == len(temp_pred)
+        gold_all.append(temp_gold)
+        pred_all.append(temp_pred)
+    return gold_all, pred_all
+
+
+def generate_classification_metrics(gold, pred):
+    """Generate classification metrics using the seqeval package."""
+    class_report = ''
+    class_report += classification_report(gold, pred, mode='strict', scheme=IOB2) + '\n'
+    class_report += 'Accuracy = ' + str(accuracy_score(gold, pred)) + '\n'
+    class_report += 'Micro_F1 = ' + str(f1_score(gold, pred))
+    return class_report
+
+
+def write_data_into_file(data, file_path):
+    """Write data into a file."""
+    with open(file_path, 'w', encoding='utf-8') as file_write:
+        file_write.write(data + '\n')
+
+
+def main():
+    """Pass arguments and call functions here."""
+    input_file = argv[1]
+    output_file = argv[2]
+    input_lines = read_lines_from_file(input_file)
+    gold_all, pred_all = process_lines_prepare_gold_and_system_outputs(input_lines)
+    class_report = generate_classification_metrics(gold_all, pred_all)
+    write_data_into_file(class_report, output_file)
+
+
+if __name__ == '__main__':
+    main()
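The chunk evaluator above reads one token per line with tab-separated columns
(token, gold POS, gold chunk, predicted chunk) and a blank line between
sentences; only the last two columns are used. A hypothetical input and
invocation:

    # eval_input.txt
    raama     NNP     B-NP     B-NP
    phala     NN      B-NP     I-NP
    khaataa   VM      B-VGF    B-VGF
    hai       VAUX    I-VGF    I-VGF

    python3 Code/precision_recall_f1_score_chunking.py eval_input.txt chunk_report.txt
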
diff --git a/Code/precision_recall_f1_score_pos.py b/Code/precision_recall_f1_score_pos.py
new file mode 100644
index 0000000..5c63b8d
--- /dev/null
+++ b/Code/precision_recall_f1_score_pos.py
@@ -0,0 +1,62 @@
+"""Precision, recall, F1 score for POS."""
+# the inputs to this program are:
+# gold pos outputs, predicted pos outputs, and a file name
+# where the classification results will be written
+# if you do not have sklearn, install it with: pip install scikit-learn
+from sys import argv
+from sklearn.metrics import classification_report
+from sklearn.metrics import f1_score
+from sklearn.metrics import precision_score
+from sklearn.metrics import recall_score
+from sklearn.metrics import accuracy_score
+
+
+def readLinesFromFile(filePath):
+    """Read lines from a file."""
+    with open(filePath, 'r', encoding='utf-8') as fileRead:
+        return [line.strip() for line in fileRead.readlines() if line.strip()]
+
+
+def findPrecisionRecallF1score(goldLabels, predictedLabels, trueLabels=None):
+    """Find precision, recall, and F1 scores."""
+    return classification_report(goldLabels, predictedLabels, target_names=trueLabels)
+
+
+def main():
+    """Pass arguments and call functions here."""
+    goldPath = argv[1]
+    predPath = argv[2]
+    outPath = argv[3]
+    gold = readLinesFromFile(goldPath)
+    predicted = readLinesFromFile(predPath)
+    allLabels = set(predicted).union(set(gold))
+    dictLabelToIndices = {label: index for index, label in enumerate(allLabels)}
+    predictedIntoIndexes = [dictLabelToIndices[item] for item in predicted]
+    goldIntoIndexes = [dictLabelToIndices[item] for item in gold]
+    classReport = ''
+    classReport += findPrecisionRecallF1score(gold, predicted)
+    if len(set(predictedIntoIndexes)) == 2:
+        # two classes: binary-averaged scores are the appropriate summary
+        print('Precision =', precision_score(goldIntoIndexes, predictedIntoIndexes, average='binary'))
+        print('Recall =', recall_score(goldIntoIndexes, predictedIntoIndexes, average='binary'))
+        print('F1 =', f1_score(goldIntoIndexes, predictedIntoIndexes, average='binary'))
+        print('Accuracy =', accuracy_score(goldIntoIndexes, predictedIntoIndexes))
+    else:
+        # multi-class: compute each micro-averaged score once, then report it
+        microPrecision = precision_score(goldIntoIndexes, predictedIntoIndexes, average='micro')
+        microRecall = recall_score(goldIntoIndexes, predictedIntoIndexes, average='micro')
+        microF1 = f1_score(goldIntoIndexes, predictedIntoIndexes, average='micro')
+        microAccuracy = accuracy_score(goldIntoIndexes, predictedIntoIndexes)
+        classReport += '\n'
+        classReport += 'Micro_Precision = ' + str(microPrecision) + '\n'
+        classReport += 'Micro_Recall = ' + str(microRecall) + '\n'
+        classReport += 'Micro_F1 = ' + str(microF1) + '\n'
+        classReport += 'Micro_Accuracy = ' + str(microAccuracy) + '\n'
+        print('Micro Precision =', microPrecision)
+        print('Micro Recall =', microRecall)
+        print('Micro F1 =', microF1)
+        print('Micro Accuracy =', microAccuracy)
+    with open(outPath, 'w', encoding='utf-8') as outDesc:
+        outDesc.write(classReport + '\n')
+
+
+if __name__ == '__main__':
+    main()
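The POS evaluator takes two label files aligned line by line (one tag per line,
blank lines ignored) plus an output path for the report. A hypothetical
invocation:

    python3 Code/precision_recall_f1_score_pos.py gold_pos.txt pred_pos.txt pos_report.txt
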
diff --git a/Code/ssfAPI.py b/Code/ssfAPI.py
new file mode 100755
index 0000000..0066d95
--- /dev/null
+++ b/Code/ssfAPI.py
@@ -0,0 +1,443 @@
+#!/usr/bin/python
+# Author: Himanshu Sharma
+# changes added by Pruthwik Mishra
+import os
+# import sys
+import codecs
+import re
+from collections import OrderedDict
+
+
+class Node():
+
+    def __init__(self, text):
+        self.text = text
+        self.lex = None
+        self.type = None
+        self.__attributes = OrderedDict()
+        self.errors = []
+        self.name = None
+        self.parent = None
+        self.parentRelation = None
+        self.alignedTo = None
+        self.fsList = None
+        self.analyzeNode(self.text)
+
+    def analyzeNode(self, text):
+        [token, tokenType, fsDict, fsList] = getTokenFeats(text.strip().split())
+        attributeUpdateStatus = self.updateAttributes(token, tokenType, fsDict, fsList)
+        if attributeUpdateStatus == 0:
+            self.errors.append("Can't update attributes for node")
+            self.probSent = True
+
+    def updateAttributes(self, token, tokenType, fsDict, fsList):
+        self.fsList = fsList
+        self.lex = token
+        self.type = tokenType
+        for attribute in fsDict.keys():
+            self.__attributes[attribute] = fsDict[attribute]
+        self.assignName()
+        return 1
+
+    def assignName(self):
+        if self.__attributes.get('name') is not None:
+            self.name = self.getAttribute('name')
+        else:
+            self.errors.append('No name for this token Node')
+
+    def printValue(self):
+        return self.lex
+
+    def printSSFValue(self, prefix, allFeat):
+        returnValue = [prefix, self.printValue(), self.type]
+        if allFeat is False:
+            # default feature structure carrying only the node name
+            fs = ['<fs name=\'' + str(self.name) + '\'>']
+        else:
+            fs = self.fsList
+        delim = '|'
+        return ['\t'.join(x for x in returnValue) + '\t' + delim.join(x for x in fs)]
+
+    def getAttribute(self, key):
+        if key in self.__attributes:
+            return self.__attributes[key]
+        else:
+            return None
+
+    def addAttribute(self, key, value):
+        self.__attributes[key] = value
+
+    def deleteAttribute(self, key):
+        del self.__attributes[key]
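+
+
+# Usage sketch for Node (the SSF token line below is hypothetical):
+#
+#     node = Node("1.1\tbaccA\tNN\t<fs af='baccA,n,m,sg,3,d,0,0' name='baccA'>")
+#     node.lex                 -> 'baccA'
+#     node.type                -> 'NN'
+#     node.getAttribute('af')  -> 'baccA,n,m,sg,3,d,0,0'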
+
+
+class ChunkNode():
+
+    def __init__(self, header):
+        self.text = []
+        self.header = header
+        self.footer = None
+        self.nodeList = []
+        self.parent = '0'
+        self.__attributes = OrderedDict()
+        self.parentRelation = 'root'
+        self.name = None
+        self.head = None
+        self.isParent = False
+        self.errors = []
+        self.upper = None
+        self.updateDrel()
+        self.type = None
+        self.fsList = None
+
+    def analyzeChunk(self):
+        [chunkType, chunkFeatDict, chunkFSList] = getChunkFeats(self.header)
+        self.fsList = chunkFSList
+        self.type = chunkType
+        self.updateAttributes(chunkFeatDict)
+        self.text = '\n'.join(self.text)
+
+    def updateAttributes(self, fsDict):
+        for attribute in fsDict.keys():
+            self.__attributes[attribute] = fsDict[attribute]
+        self.assignName()
+        self.updateDrel()
+
+    def assignName(self):
+        if 'name' in self.__attributes:
+            self.name = self.getAttribute('name')
+        else:
+            self.errors.append('No name for this chunk Node')
+
+    def updateDrel(self):
+        if 'drel' in self.__attributes:
+            drelList = self.getAttribute('drel').split(':')
+            if len(drelList) == 2:
+                self.parent = drelList[1]
+                self.parentRelation = self.getAttribute('drel').split(':')[0]
+        elif 'dmrel' in self.__attributes:
+            drelList = self.getAttribute('dmrel').split(':')
+            if len(drelList) == 2:
+                self.parent = drelList[1]
+                self.parentRelation = self.getAttribute('dmrel').split(':')[0]
+
+    def printValue(self):
+        returnString = []
+        for node in self.nodeList:
+            returnString.append(node.printValue())
+        return ' '.join(x for x in returnString)
+
+    def printSSFValue(self, prefix, allFeat):
+        returnStringList = []
+        returnValue = [prefix, '((', self.type]
+        if allFeat is False:
+            # default feature structure carrying only the chunk name
+            fs = ['<fs name=\'' + str(self.name) + '\'>']
+        else:
+            fs = self.fsList
+        delim = '|'
+        returnStringList.append('\t'.join(x for x in returnValue) + '\t' + delim.join(x for x in fs))
+        nodePosn = 0
+        for node in self.nodeList:
+            # Node and nested ChunkNode members print the same way
+            nodePosn += 1
+            returnStringList.extend(node.printSSFValue(prefix + '.' + str(nodePosn), allFeat))
+        returnStringList.append('\t' + '))')
+        return returnStringList
+
+    def getAttribute(self, key):
+        if key in self.__attributes:
+            return self.__attributes[key]
+        else:
+            return None
+
+    def addAttribute(self, key, value):
+        self.__attributes[key] = value
+
+    def deleteAttribute(self, key):
+        del self.__attributes[key]
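+
+
+# For orientation, a ChunkNode models an SSF fragment like the hypothetical one
+# below: header keeps the '((' line, nodeList the token Nodes, footer the '))'.
+#
+#     1       ((      NP      <fs name='NP'>
+#     1.1     baccA   NN      <fs name='baccA'>
+#             ))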
+
+
+class Sentence():
+
+    def __init__(self, sentence, ignoreErrors=True, nesting=True, dummySentence=False):
+        self.ignoreErrors = ignoreErrors
+        self.nesting = nesting
+        self.sentence = None
+        self.sentenceID = None
+        self.sentenceType = None
+        self.length = 0
+        self.tree = None
+        self.nodeList = []
+        self.edges = {}
+        self.nodes = {}
+        self.tokenNodes = {}
+        self.rootNode = None
+        self.fileName = None
+        self.comment = None
+        self.probSent = False
+        self.errors = []
+        self.text = sentence
+        self.dummySentence = dummySentence
+        if self.dummySentence is False:
+            # self.header = sentence.group('header')
+            # self.footer = sentence.group('footer')
+            # self.name = sentence.group('sentenceID')
+            # self.text = sentence.group('text')
+            self.analyzeSentence()
+
+    def analyzeSentence(self, ignoreErrors=False, nesting=True):
+        lastContext = self
+        for line in self.text.split('\n'):
+            stripLine = line.strip()
+            if stripLine == "":
+                continue
+            elif stripLine[0] == "<" and ignoreErrors is False:
+                self.errors.append('Encountered a line starting with "<"')
+                self.probSent = True
+            else:
+                splitLine = stripLine.split()
+                if len(splitLine) > 1 and splitLine[1] == '((':
+                    # a '((' line opens a chunk; nest it under the last context
+                    currentChunkNode = ChunkNode(line + '\n')
+                    currentChunkNode.upper = lastContext
+                    currentChunkNode.upper.nodeList.append(currentChunkNode)
+                    if currentChunkNode.upper.__class__.__name__ != 'Sentence':
+                        currentChunkNode.upper.text.append(line)
+                    lastContext = currentChunkNode
+                elif len(splitLine) > 0 and splitLine[0] == '))':
+                    # a '))' line closes the current chunk
+                    currentChunkNode.footer = line + '\n'
+                    currentChunkNode.analyzeChunk()
+                    lastContext = currentChunkNode.upper
+                    currentChunkNode = lastContext
+                else:
+                    currentNode = Node(line + '\n')
+                    lastContext.nodeList.append(currentNode)
+                    currentNode.upper = lastContext
+        # updateAttributesStatus = self.updateAttributes()
+        # if updateAttributesStatus == 0:
+        #     self.probSent = True
+        #     self.errors.append("Cannot update the Attributes for this sentence")
+
+    def addEdge(self, parent, child):
+        if parent in self.edges:
+            if child not in self.edges[parent]:
+                self.edges[parent].append(child)
+        else:
+            self.edges[parent] = [child]
+
+    def updateAttributes(self):
+        populateNodesStatus = self.populateNodes()
+        populateEdgesStatus = self.populateEdges()
+        self.sentence = self.generateSentence()
+        if populateEdgesStatus == 0 or populateNodesStatus == 0:
+            return 0
+        return 1
+
+    def printSSFValue(self, allFeat):
+        returnStringList = []
+        returnStringList.append("<Sentence id='" + str(self.sentenceID) + "'>")
+        if self.nodeList != []:
+            nodeList = self.nodeList
+            nodePosn = 0
+            for node in nodeList:
+                nodePosn += 1
+                returnStringList.extend(node.printSSFValue(str(nodePosn), allFeat))
+        returnStringList.append('</Sentence>\n')
+        return '\n'.join(x for x in returnStringList)
+
+    def populateNodes(self, naming='strict'):
+        if naming == 'strict':
+            for nodeElement in self.nodeList:
+                assert nodeElement.name is not None
+                self.nodes[nodeElement.name] = nodeElement
+        return 1
+
+    def populateEdges(self):
+        for node in self.nodeList:
+            nodeName = node.name
+            if node.parent == '0' or node == self.rootNode:
+                self.rootNode = node
+                continue
+            elif node.parent not in self.nodes:
+                # self.errors.append('Error : Bad DepRel Parent Name ' + self.fileName + ' : ' + str(self.name))
+                return 0
+            assert node.parent in self.nodes
+            self.addEdge(node.parent, node.name)
+        return 1
+
+    def generateSentence(self):
+        sentence = []
+        for nodeName in self.nodeList:
+            sentence.append(nodeName.printValue())
+        return ' '.join(x for x in sentence)
+
+
+class Document():
+
+    def __init__(self, fileName):
+        self.header = None
+        self.footer = None
+        self.text = None
+        self.nodeList = []
+        self.fileName = fileName
+        self.upper = None
+        self.analyzeDocument()
+
+    def analyzeDocument(self):
+        inputFD = codecs.open(self.fileName, 'r', encoding='utf8')
+        sentenceList = findSentences(inputFD)
+        for sentence in sentenceList:
+            tree = Sentence(sentence[1], ignoreErrors=True, nesting=True)
+            tree.text = sentence[1]
+            tree.sentenceID = int(sentence[0])
+            tree.footer = sentence[2]
+            tree.header = '<Sentence id="' + str(sentence[0]) + '">'
+            self.nodeList.append(tree)
+
+
+def getAddressNode(address, node, level='Relative'):
+    '''
+    Resolve an SSF address to a node.
+    "level" gives the level of the node from which the address is accessed.
+    Levels: "Document" -> "Sentence" -> "ChunkNode" -> "Node"
+    "Relative" value starts the base address from the node which contains the
+    address. This is also the default option.
+    '''
+    currentContext = node
+    if level != 'Relative':
+        while currentContext.__class__.__name__ != level:
+            currentContext = currentContext.upper
+        currentContext = currentContext.upper
+    stepList = address.split('%')
+    for step in stepList:
+        if step == '..':
+            currentContext = currentContext.upper
+        else:
+            refNode = [iterNode for iterNode in currentContext.nodeList if iterNode.name == step][0]
+            currentContext = refNode
+    return refNode
+
+
+def getChunkFeats(line):
+    lineList = line.strip().split()
+    returnErrors = list()
+    chunkType = None
+    fsList = []
+    if len(lineList) >= 3:
+        chunkType = lineList[2]
+    returnFeats = OrderedDict()
+    multipleFeatRE = r'<fs.*?>'
+    featRE = r'(?:\W*)(\S+)=([\'|\"])?([^ \t\n\r\f\v\'\"]*)[\'|\"]?(?:.*)'
+    fsList = re.findall(multipleFeatRE, ' '.join(lineList))
+    for x in lineList:
+        feat = re.findall(featRE, x)
+        if feat != []:
+            if len(feat) > 1:
+                returnErrors.append('Feature with more than one value')
+                continue
+            returnFeats[feat[0][0]] = feat[0][2]
+    return [chunkType, returnFeats, fsList]
+
+
+def getTokenFeats(lineList):
+    tokenType, token = None, None
+    returnFeats = OrderedDict()
+    returnErrors = list()
+    fsList = []
+    if len(lineList) >= 3:
+        tokenType = lineList[2]
+    if len(lineList) >= 2:
+        token = lineList[1]
+    multipleFeatRE = r'<fs.*?>'
+    featRE = r'(?:\W*)(\S+)=([\'|\"])?([^ \t\n\r\f\v\'\"]*)[\'|\"]?(?:.*)'
+    fsList = re.findall(multipleFeatRE, ' '.join(lineList))
+    for x in lineList:
+        feat = re.findall(featRE, x)
+        if feat != []:
+            if len(feat) > 1:
+                returnErrors.append('Feature with more than one value')
+                continue
+            returnFeats[feat[0][0]] = feat[0][2]
+    return [token, tokenType, returnFeats, fsList]
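+
+
+# Illustration of the extractors on a hypothetical token line (the feature
+# structure string is matched by multipleFeatRE, attribute pairs by featRE):
+#
+#     getTokenFeats("1.1 baccA NN <fs name='baccA'>".split())
+#     -> ['baccA', 'NN', OrderedDict([('name', 'baccA')]), ["<fs name='baccA'>"]]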
\d+)[\'\"]?>)(?P.*?)(?P