Source code for benchmarkstt.csv

"""
Module providing a custom CSV file parser with support for whitespace trimming, empty lines filtering and comment lines
"""

import typing
import sys
from functools import partial
from benchmarkstt.deferred import DeferredList
from benchmarkstt.helpers import make_printable


class InvalidDialectError(ValueError):
    """Raised when the supplied dialect is not a :class:`Dialect` subclass."""
class UnknownDialectError(ValueError):
    """Raised when a dialect is requested by a name that is not registered."""
class CSVParserError(ValueError):
    """
    Some error occurred while attempting to parse the file.

    The constructor arguments are kept both in ``args`` and as attributes:

    :param message: description of the problem
    :param line: line counter value reported for the problem
    :param char: character position within that line
    :param index: count of characters read from the input so far
    """

    def __init__(self, message, line, char, index):
        # Passing the values on keeps ``args`` equal to the constructor
        # arguments, exactly as the exception machinery already records them.
        super().__init__(message, line, char, index)
        self.message = message
        self.line = line
        self.char = char
        self.index = index
class UnclosedQuoteError(CSVParserError):
    """Raised when the input ends while a quoted field is still open."""
class UnallowedQuoteError(CSVParserError):
    """Raised when a quote character shows up where the parser cannot accept one."""
class Dialect:
    """
    Base class for parser dialects.

    A dialect is used as a plain class (never instantiated); every option
    defaults to ``None`` here and is meant to be overridden by subclasses.
    """

    # Characters that separate two fields.
    delimiter = None
    # Character that opens and closes a quoted field.
    quotechar = None
    # Characters that start a comment running until the end of the line.
    commentchar = None
    # Characters skipped before a field starts.
    trimleft = None
    # Characters stripped from the end of an unquoted field.
    trimright = None
class DefaultDialect(Dialect):
    """Comma-delimited dialect with double-quote quoting and ``#`` comments."""

    delimiter = ','
    quotechar = '"'
    commentchar = '#'
    # The same set (space, tab, line feed, carriage return) is trimmed on
    # both sides of a field.
    trimleft = trimright = ' \t\n\r'
    # NOTE(review): declared here, but nothing in this module reads the flag.
    ignoreemptylines = True
class WhitespaceDialect(DefaultDialect):
    """Variant of :class:`DefaultDialect` whose fields are separated by spaces or tabs."""

    delimiter = ' \t'
# Registry of the dialect names that can be requested as a string.
known_dialects = {
    "default": DefaultDialect,
    "whitespace": WhitespaceDialect,
}

# Parser states. The integer values matter: the reader's debug output adds
# them to 32 to pick a colour code for each state.
MODE_FIRST = 0  # at the start of a row, nothing collected yet
MODE_OUTSIDE = 1  # between two fields of a row
MODE_INSIDE = 2  # inside an unquoted field
MODE_INSIDE_QUOTED = 3  # inside a quoted field
MODE_INSIDE_QUOTED_QUOTE = 4  # just saw a quote char while inside a quoted field
MODE_COMMENT = 5  # inside a comment, skipping until the next newline

# Field values are plain strings. (The former ``Line = list`` alias was dead
# code: the ``Line`` class defined right after this block replaced it.)
Field = str
class Line(list):
    """A list of fields that additionally exposes the line number it was given."""

    @property
    def lineno(self):
        """
        Value stored under ``'lineno'`` in the instance ``__dict__``.

        The property is read-only; the value has to be placed in ``__dict__``
        directly, and reading it before that raises :class:`KeyError`.
        """
        return vars(self)['lineno']
# Whether a field was quoted is not tracked at the moment, so the Field
# subclass sketched below stays disabled:
#
# class Field(str, object):
#     @property
#     def quoted(self):
#         return self.__dict__['quoted']
#
#     @quoted.setter
#     def quoted(self, value):
#         self.__dict__['quoted'] = bool(value)
class Reader:
    """
    CSV-like file reader with support for comment chars, ignoring empty lines
    and whitespace trimming on both sides of each field.

    Iterating over a reader consumes the file one character at a time and
    yields a :class:`Line` for every row on which at least one field was
    started; rows holding only ignored characters or a comment yield nothing.

    :param file: object whose ``read(1)`` returns the next character, or
        ``''`` once the input is exhausted
    :param dialect: a :class:`Dialect` subclass (the class, not an instance)
    :param debug: when truthy, each character read is echoed to stdout,
        coloured according to the current parser mode
    :raises InvalidDialectError: if ``dialect`` is not a :class:`Dialect`
        subclass
    """

    def __init__(self, file: typing.TextIO, dialect: typing.Type[Dialect], debug=None):
        # ``issubclass`` itself raises TypeError for non-class arguments
        # (e.g. a dialect name or an instance); report those as an invalid
        # dialect as well.
        if not (isinstance(dialect, type) and issubclass(dialect, Dialect)):
            raise InvalidDialectError("Invalid dialect", dialect)
        # NOTE(review): initialised here but never updated by this class.
        self.line_num = 0
        self._dialect = dialect
        self._file = file
        self._debug = bool(debug)

    def _trimright(self, data: str):
        """Return ``data`` without trailing ``trimright`` characters."""
        chars = self._dialect.trimright
        if chars is None:
            return data
        return data.rstrip(chars)

    def _is_ignore_left(self, char: str):
        """Tell whether ``char`` is skipped before a field starts."""
        if self._dialect.trimleft is None:
            return False
        return char in self._dialect.trimleft

    def _is_ignore_right(self, char: str):
        """Tell whether ``char`` may be skipped after a closing quote."""
        if self._dialect.trimright is None:
            # currently no dialect with no trimright
            return False  # pragma: nocover
        return char in self._dialect.trimright

    def _is_comment(self, char: str):
        """Tell whether ``char`` starts a comment."""
        if self._dialect.commentchar is None:
            return False
        return char in self._dialect.commentchar

    def _is_quote(self, char: str):
        """Tell whether ``char`` is the dialect's quote character."""
        if self._dialect.quotechar is None:
            return False
        return char == self._dialect.quotechar

    def _is_delimiter(self, char: str):
        """Tell whether ``char`` separates two fields."""
        delimiter = self._dialect.delimiter
        if delimiter is None:
            # Same convention as the other checks: an unset option never
            # matches (the base Dialect leaves the delimiter unset).
            return False
        return char in delimiter

    def __iter__(self):
        """
        Parse the file and yield one :class:`Line` per row.

        :raises UnallowedQuoteError: on a quote character inside an unquoted
            field, or on an unexpected character right after a quote inside
            a quoted field
        :raises UnclosedQuoteError: if the input ends inside a quoted field;
            the reported position is that of the opening quote
        """
        # Calls file.read(1) repeatedly until it returns the empty string.
        readchar = iter(partial(self._file.read, 1), '')
        cur_line = 1

        if self._debug:
            current_module = sys.modules[__name__]
            # print the color key the different modes
            print('MODES: ', end='')
            print(' '.join(['\033[1;%d;40m%s\033[0;0m' %
                            (32 + getattr(current_module, name), name[5:])
                            for name in dir(current_module)
                            if name.startswith('MODE_')
                            ]))

            def debug(txt='', args=(), **kwargs):
                # Anything that is not already a tuple (the parser passes a
                # lambda) is resolved through DeferredList at print time.
                if not isinstance(args, tuple):
                    args = tuple(DeferredList(args))
                print(txt % args, **kwargs)
        else:
            def debug(*args, **kwargs):
                pass

        newlinechars = '\n\r'
        mode = MODE_FIRST
        field = []
        line = Line()

        # Substring test: true when the whole delimiter string occurs inside
        # the right-trim characters, as it does for WhitespaceDialect. Unset
        # options count as "not whitespace".
        delimiter = self._dialect.delimiter
        trimright = self._dialect.trimright
        delimiter_is_whitespace = (
            delimiter is not None
            and trimright is not None
            and delimiter in trimright
        )

        def yield_line():
            """Finish the current row, reset the row state and return the row."""
            nonlocal line, field, mode
            # With a whitespace delimiter, trailing whitespace must not add
            # an empty last field.
            if not (mode == MODE_OUTSIDE and delimiter_is_whitespace):
                next_field()
            field = []
            finished = line
            # NOTE(review): when the row was ended by a newline character,
            # cur_line has already been advanced past that row at this point.
            finished.__dict__['lineno'] = cur_line
            line = Line()
            mode = MODE_FIRST
            return finished

        def next_field():
            """Append the collected characters to the row as one field."""
            nonlocal field, mode
            value = ''.join(field)
            # A field that was just closed by a quote keeps its trailing
            # whitespace; everything else is right-trimmed.
            if mode != MODE_INSIDE_QUOTED_QUOTE:
                value = self._trimright(value)
            line.append(Field(value))
            field = []
            mode = MODE_OUTSIDE

        cur_char = 0
        # Position of the most recent opening quote, for error reporting.
        last_quote_line = None
        last_quote_char = None
        last_quote_idx = None

        idx = 0
        for char in readchar:
            cur_char += 1
            idx += 1

            # print char to stdout with color defining mode
            debug('\033[1;%d;40m%s\033[0;0m', lambda: (32+mode, make_printable(char)), end='')

            is_newline = char in newlinechars
            if is_newline:
                # NOTE(review): '\r' and '\n' each count as a newline, so a
                # "\r\n" pair advances the line counter twice.
                cur_line += 1
                cur_char = 0

            if mode == MODE_COMMENT:
                # Everything up to the end of the line is dropped.
                if is_newline:
                    mode = MODE_FIRST
                continue

            if mode in (MODE_OUTSIDE, MODE_FIRST):
                if is_newline:
                    # A row without any field (MODE_FIRST) is skipped.
                    if mode != MODE_FIRST:
                        yield yield_line()
                    continue

                if self._is_ignore_left(char):
                    continue

                if self._is_comment(char):
                    # Fields collected before the comment still form a row.
                    if mode == MODE_OUTSIDE:
                        yield yield_line()
                    mode = MODE_COMMENT
                    continue

                if self._is_quote(char):
                    mode = MODE_INSIDE_QUOTED
                    last_quote_line = cur_line
                    last_quote_char = cur_char
                    last_quote_idx = idx
                    continue

                if self._is_delimiter(char):
                    next_field()
                    continue

                mode = MODE_INSIDE
                field.append(char)
                continue

            if mode == MODE_INSIDE:
                if self._is_quote(char):
                    raise UnallowedQuoteError("Quote not allowed here", cur_line, cur_char, idx)

                if is_newline:
                    yield yield_line()
                    continue

                if self._is_delimiter(char):
                    next_field()
                    continue

                field.append(char)
                continue

            if mode == MODE_INSIDE_QUOTED_QUOTE:
                if self._is_quote(char):
                    # Doubled quote: a literal quote character in the field.
                    field.append(char)
                    mode = MODE_INSIDE_QUOTED
                    continue

                if self._is_delimiter(char):
                    next_field()
                    continue

                if is_newline:
                    yield yield_line()
                    continue

                if not delimiter_is_whitespace:
                    if self._is_ignore_right(char):
                        continue

                    if self._is_comment(char):
                        yield yield_line()
                        mode = MODE_COMMENT
                        continue

                raise UnallowedQuoteError("Single quote inside quoted field", cur_line, cur_char, idx)

            if mode == MODE_INSIDE_QUOTED:
                if self._is_quote(char):
                    mode = MODE_INSIDE_QUOTED_QUOTE
                    continue

                field.append(char)
                continue

        # In debug mode this ends the echoed character stream with a newline.
        debug()
        if mode == MODE_INSIDE_QUOTED:
            raise UnclosedQuoteError("Unexpected end", last_quote_line, last_quote_char, last_quote_idx)

        # Input that does not end in a newline may leave a pending row.
        if mode in (MODE_INSIDE_QUOTED_QUOTE, MODE_OUTSIDE, MODE_INSIDE):
            yield yield_line()
def reader(file: typing.TextIO,
           dialect: typing.Union[None, str, typing.Type[Dialect]] = None,
           **kwargs) -> Reader:
    """
    Create a :class:`Reader` for ``file``.

    :param file: text stream handed to the :class:`Reader`
    :param dialect: ``None`` for :class:`DefaultDialect`, the name of an
        entry in ``known_dialects``, or a dialect class, which is passed on
        unchanged
    :param kwargs: additional keyword arguments forwarded to :class:`Reader`
    :raises UnknownDialectError: if ``dialect`` is a string that is not a key
        of ``known_dialects``
    """
    if dialect is None:
        dialect = DefaultDialect
    elif isinstance(dialect, str):
        if dialect not in known_dialects:
            raise UnknownDialectError("Dialect not known", dialect)
        dialect = known_dialects[dialect]
    return Reader(file, dialect, **kwargs)