Split the lexer into PlyLexer and TokenStream components
- There are two types of token streams: file based and list based (see the usage sketch below)
- I think this has better component separation
- Doxygen parsing is a bit weirder, but I think it's more straightforward to see all the pieces?
parent 40bf05b384
commit 1eaa85ae8d
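Below is a rough usage sketch (not part of the commit) of the two token stream flavors this change introduces: LexerTokenStream lexes file content through PlyLexer, while BoundedTokenStream replays a fixed token list that must be fully consumed. The calls mirror methods visible in the diff (token_eof_ok, token, has_tokens); the header text and filename are made up for illustration.

# Hypothetical example; assumes the cxxheaderparser package at this commit.
from cxxheaderparser.lexer import LexerTokenStream, BoundedTokenStream

content = "int x;  // made-up header content\n"

# File-based stream: wraps PlyLexer, pulls tokens a line at a time and
# attaches Location info; comment/newline tokens are filtered out here.
stream = LexerTokenStream("example.h", content)
toks = []
while True:
    tok = stream.token_eof_ok()
    if tok is None:
        break
    toks.append(tok)

# List-based stream: replays an already-lexed group of tokens (e.g. a
# paren grouping) and raises if more tokens are requested than it holds.
bounded = BoundedTokenStream(toks)
while bounded.has_tokens():
    tok = bounded.token()
    print(tok.type, tok.value)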
@@ -50,7 +50,7 @@ class LexToken(Protocol):
    location: Location

    #: private
    lexer: "Lexer"
    lexer: lex.Lexer


PhonyEnding: LexToken = lex.LexToken()  # type: ignore
@@ -60,10 +60,13 @@ PhonyEnding.lineno = 0
PhonyEnding.lexpos = 0


class Lexer:
class PlyLexer:
    """
    This lexer is a combination of pieces from the PLY lexers that CppHeaderParser
    and pycparser have.

    This tokenizes the input into tokens. The other lexer classes do more complex
    things with the tokens.
    """

    keywords = {
@@ -439,13 +442,6 @@ class Lexer:
        else:
            return t

    @TOKEN(r"\/\/.*\n?")
    def t_COMMENT_SINGLELINE(self, t: LexToken) -> LexToken:
        if t.value.startswith("///") or t.value.startswith("//!"):
            self.comments.append(t.value.lstrip("\t ").rstrip("\n"))
        t.lexer.lineno += t.value.count("\n")
        return t

    t_DIVIDE = r"/(?!/)"
    t_ELLIPSIS = r"\.\.\."
    t_DBL_LBRACKET = r"\[\["
@@ -458,22 +454,20 @@ class Lexer:

    t_STRING_LITERAL = string_literal

    @TOKEN(r"\/\/.*\n?")
    def t_COMMENT_SINGLELINE(self, t: LexToken) -> LexToken:
        t.lexer.lineno += t.value.count("\n")
        return t

    # Found at http://ostermiller.org/findcomment.html
    @TOKEN(r"/\*([^*]|[\r\n]|(\*+([^*/]|[\r\n])))*\*+/\n?")
    def t_COMMENT_MULTILINE(self, t: LexToken) -> LexToken:
        if t.value.startswith("/**") or t.value.startswith("/*!"):
            # not sure why, but get double new lines
            v = t.value.replace("\n\n", "\n")
            # strip prefixing whitespace
            v = _multicomment_re.sub("\n*", v)
            self.comments = v.splitlines()
        t.lexer.lineno += t.value.count("\n")
        return t

    @TOKEN(r"\n+")
    def t_NEWLINE(self, t: LexToken) -> LexToken:
        t.lexer.lineno += len(t.value)
        del self.comments[:]
        return t

    def t_error(self, t: LexToken) -> None:
@@ -485,9 +479,8 @@ class Lexer:

    _lexer = None
    lex: lex.Lexer
    lineno: int

    def __new__(cls, *args, **kwargs) -> "Lexer":
    def __new__(cls, *args, **kwargs) -> "PlyLexer":
        # only build the lexer once
        inst = super().__new__(cls)
        if cls._lexer is None:
@@ -499,157 +492,75 @@ class Lexer:

    def __init__(self, filename: typing.Optional[str] = None):
        self.input: typing.Callable[[str], None] = self.lex.input
        self.token: typing.Callable[[], LexToken] = self.lex.token

        # For tracking current file/line position
        self.filename = filename
        self.line_offset = 0

        # Doxygen comments
        self.comments = []
    def current_location(self) -> Location:
        return Location(self.filename, self.lex.lineno - self.line_offset)

        self.lookahead = typing.Deque[LexToken]()

        # For 'set_group_of_tokens' support
        self._get_token: typing.Callable[[], LexToken] = self.lex.token
        self.lookahead_stack = typing.Deque[typing.Deque[LexToken]]()
class TokenStream:
    """
    Provides access to a stream of tokens
    """

    tokbuf: typing.Deque[LexToken]

    def _fill_tokbuf(self, tokbuf: typing.Deque[LexToken]) -> bool:
        """
        Fills tokbuf with tokens from the next line. Return True if at least
        one token was added to the buffer
        """
        raise NotImplementedError

    def current_location(self) -> Location:
        if self.lookahead:
            return self.lookahead[0].location
        return Location(self.filename, self.lex.lineno - self.line_offset)
        raise NotImplementedError

    def get_doxygen(self) -> typing.Optional[str]:
        """
        This should be called after the first element of something has
        been consumed.

        It will lookahead for comments that come after the item, if prior
        comments don't exist.
        This is called at the point that you want doxygen information
        """
        raise NotImplementedError

        # Assumption: This function is either called at the beginning of a
        # statement or at the end of a statement

        if self.comments:
            comments = self.comments
        else:
            comments = []
            # only look for comments until a newline (including lookahead)
            for tok in self.lookahead:
                if tok.type == "NEWLINE":
                    return None

            while True:
                tok = self._get_token()
                comments.extend(self.comments)

                if tok is None:
                    break

                tok.location = Location(self.filename, tok.lineno - self.line_offset)
                ttype = tok.type
                if ttype == "NEWLINE":
                    self.lookahead.append(tok)
                    break

                if ttype not in self._discard_types:
                    self.lookahead.append(tok)

                if ttype == "NAME":
                    break

                del self.comments[:]

        comment_str = "\n".join(comments)
        del self.comments[:]
        if comment_str:
            return comment_str

        return None
    def get_doxygen_after(self) -> typing.Optional[str]:
        """
        This is called to retrieve doxygen information after a statement
        """
        raise NotImplementedError

    _discard_types = {"NEWLINE", "COMMENT_SINGLELINE", "COMMENT_MULTILINE"}

    def _token_limit_exceeded(self) -> typing.NoReturn:
        from .errors import CxxParseError

        raise CxxParseError("no more tokens left in this group")

    @contextlib.contextmanager
    def set_group_of_tokens(
        self, toks: typing.List[LexToken]
    ) -> typing.Generator[typing.Deque[LexToken], None, None]:
        # intended for use when you have a set of tokens that you know
        # must be consumed, such as a paren grouping or some type of
        # lookahead case

        stack = self.lookahead_stack
        restore_fn = False

        if not stack:
            restore_fn = True
            self._get_token = self._token_limit_exceeded

        this_buf = typing.Deque[LexToken](toks)
        prev_buf = self.lookahead
        stack.append(prev_buf)
        self.lookahead = this_buf

        try:
            yield this_buf
        finally:
            buf = stack.pop()
            if prev_buf is not buf:
                raise ValueError("internal error")

            self.lookahead = prev_buf

            if restore_fn:
                self._get_token = self.lex.token

    def token(self) -> LexToken:
        tok = None
        while self.lookahead:
            tok = self.lookahead.popleft()
            if tok.type not in self._discard_types:
                return tok

        tokbuf = self.tokbuf
        while True:
            tok = self._get_token()
            if tok is None:
            while tokbuf:
                tok = tokbuf.popleft()
                if tok.type not in self._discard_types:
                    return tok

            if not self._fill_tokbuf(tokbuf):
                raise EOFError("unexpected end of file")

            if tok.type not in self._discard_types:
                tok.location = Location(self.filename, tok.lineno - self.line_offset)
                break

        return tok

    def token_eof_ok(self) -> typing.Optional[LexToken]:
        tok = None
        while self.lookahead:
            tok = self.lookahead.popleft()
            if tok.type not in self._discard_types:
                return tok

        tokbuf = self.tokbuf
        while True:
            tok = self._get_token()
            if tok is None:
                break
            while tokbuf:
                tok = tokbuf.popleft()
                if tok.type not in self._discard_types:
                    return tok

            if tok.type not in self._discard_types:
                tok.location = Location(self.filename, tok.lineno - self.line_offset)
                break

        return tok
            if not self._fill_tokbuf(tokbuf):
                return None

    def token_if(self, *types: str) -> typing.Optional[LexToken]:
        tok = self.token_eof_ok()
        if tok is None:
            return None
        if tok.type not in types:
            # put it back on the left in case it was retrieved
            # from the lookahead buffer
            self.lookahead.appendleft(tok)
            self.tokbuf.appendleft(tok)
            return None
        return tok

@@ -658,9 +569,7 @@ class Lexer:
        if tok is None:
            return None
        if tok.type not in types:
            # put it back on the left in case it was retrieved
            # from the lookahead buffer
            self.lookahead.appendleft(tok)
            self.tokbuf.appendleft(tok)
            return None
        return tok

@@ -669,9 +578,7 @@ class Lexer:
        if tok is None:
            return None
        if tok.value not in vals:
            # put it back on the left in case it was retrieved
            # from the lookahead buffer
            self.lookahead.appendleft(tok)
            self.tokbuf.appendleft(tok)
            return None
        return tok

@@ -680,9 +587,7 @@ class Lexer:
        if tok is None:
            return None
        if tok.type in types:
            # put it back on the left in case it was retrieved
            # from the lookahead buffer
            self.lookahead.appendleft(tok)
            self.tokbuf.appendleft(tok)
            return None
        return tok

@@ -690,18 +595,177 @@ class Lexer:
        tok = self.token_eof_ok()
        if not tok:
            return False
        self.lookahead.appendleft(tok)
        self.tokbuf.appendleft(tok)
        return tok.type in types

    def return_token(self, tok: LexToken) -> None:
        self.lookahead.appendleft(tok)
        self.tokbuf.appendleft(tok)

    def return_tokens(self, toks: typing.Sequence[LexToken]) -> None:
        self.lookahead.extendleft(reversed(toks))
        self.tokbuf.extendleft(reversed(toks))


class LexerTokenStream(TokenStream):
    """
    Provides tokens from using PlyLexer on the given input text
    """

    def __init__(self, filename: typing.Optional[str], content: str) -> None:
        self._lex = PlyLexer(filename)
        self._lex.input(content)
        self.tokbuf = typing.Deque[LexToken]()

    def _fill_tokbuf(self, tokbuf: typing.Deque[LexToken]) -> bool:
        get_token = self._lex.token
        tokbuf = self.tokbuf

        tok = get_token()
        if tok is None:
            return False

        while True:
            tok.location = self._lex.current_location()
            tokbuf.append(tok)

            if tok.type == "NEWLINE":
                break

            tok = get_token()
            if tok is None:
                break

        return True

    def current_location(self) -> Location:
        if self.tokbuf:
            return self.tokbuf[0].location
        return self._lex.current_location()

    def get_doxygen(self) -> typing.Optional[str]:

        tokbuf = self.tokbuf

        # fill the token buffer if it's empty (which indicates a newline)
        if not tokbuf and not self._fill_tokbuf(tokbuf):
            return None

        comments: typing.List[LexToken] = []

        # retrieve any comments in the stream right before
        # the first non-discard element
        keep_going = True
        while True:
            while tokbuf:
                tok = tokbuf.popleft()
                if tok.type == "NEWLINE":
                    comments.clear()
                elif tok.type in ("COMMENT_SINGLELINE", "COMMENT_MULTILINE"):
                    comments.append(tok)
                else:
                    tokbuf.appendleft(tok)
                    keep_going = False
                    break

            if not keep_going:
                break

            if not self._fill_tokbuf(tokbuf):
                break

        if comments:
            return self._extract_comments(comments)

        return None

    def get_doxygen_after(self) -> typing.Optional[str]:
        tokbuf = self.tokbuf

        # if there's a newline directly after a statement, we're done
        if not tokbuf:
            return None

        # retrieve comments after non-discard elements
        comments: typing.List[LexToken] = []
        new_tokbuf = typing.Deque[LexToken]()

        # This is different: we only extract tokens here
        while tokbuf:
            tok = tokbuf.popleft()
            if tok.type == "NEWLINE":
                break
            elif tok.type in ("COMMENT_SINGLELINE", "COMMENT_MULTILINE"):
                comments.append(tok)
            else:
                new_tokbuf.append(tok)
                if comments:
                    break

        new_tokbuf.extend(tokbuf)
        self.tokbuf = new_tokbuf

        if comments:
            return self._extract_comments(comments)

        return None

    def _extract_comments(self, comments: typing.List[LexToken]):
        # Now we have comments, need to extract the text from them
        comment_lines: typing.List[str] = []
        for c in comments:
            text = c.value
            if c.type == "COMMENT_SINGLELINE":
                if text.startswith("///") or text.startswith("//!"):
                    comment_lines.append(text.rstrip("\n"))
            else:
                if text.startswith("/**") or text.startswith("/*!"):
                    # not sure why, but get double new lines
                    text = text.replace("\n\n", "\n")
                    # strip prefixing whitespace
                    text = _multicomment_re.sub("\n*", text)
                    comment_lines = text.splitlines()

        comment_str = "\n".join(comment_lines)
        if comment_str:
            return comment_str

        return None


class BoundedTokenStream(TokenStream):
    """
    Provides tokens from a fixed list of tokens.

    Intended for use when you have a group of tokens that you know
    must be consumed, such as a paren grouping or some type of
    lookahead case
    """

    def __init__(self, toks: typing.List[LexToken]) -> None:
        self.tokbuf = typing.Deque[LexToken](toks)

    def has_tokens(self) -> bool:
        return len(self.tokbuf) > 0

    def _fill_tokbuf(self, tokbuf: typing.Deque[LexToken]) -> bool:
        from .errors import CxxParseError

        raise CxxParseError("no more tokens left in this group")

    def current_location(self) -> Location:
        if self.tokbuf:
            return self.tokbuf[0].location
        raise ValueError("internal error")

    def get_doxygen(self) -> typing.Optional[str]:
        # comment tokens aren't going to be in this stream
        return None

    def get_doxygen_after(self) -> typing.Optional[str]:
        return None


if __name__ == "__main__":  # pragma: no cover
    try:
        lex.runmain(lexer=Lexer(None))
        lex.runmain(lexer=PlyLexer(None))
    except EOFError:
        pass

@@ -4,8 +4,9 @@ import inspect
import re
import typing

from . import lexer
from .errors import CxxParseError
from .lexer import Lexer, LexToken, Location, PhonyEnding
from .lexer import LexToken, Location, PhonyEnding
from .options import ParserOptions
from .parserstate import (
    ClassBlockState,
@@ -80,8 +81,7 @@ class CxxParser:
        self.visitor = visitor
        self.filename = filename

        self.lex = Lexer(filename)
        self.lex.input(content)
        self.lex: lexer.TokenStream = lexer.LexerTokenStream(filename, content)

        global_ns = NamespaceDecl([], False)
        self.current_namespace = global_ns
@@ -319,13 +319,13 @@ class CxxParser:

        try:
            while True:
                if doxygen is None:
                    doxygen = get_doxygen()

                tok = get_token_eof_ok()
                if not tok:
                    break

                if doxygen is None:
                    doxygen = get_doxygen()

                fn = _translation_unit_tokens.get(tok.type)
                if fn:
                    fn(tok, doxygen)
@@ -619,7 +619,12 @@ class CxxParser:
        # append a token to make other parsing components happy
        raw_toks.append(PhonyEnding)

        with self.lex.set_group_of_tokens(raw_toks) as remainder:
        old_lex = self.lex
        try:
            # set up a temporary token stream with the tokens we need to parse
            tmp_lex = lexer.BoundedTokenStream(raw_toks)
            self.lex = tmp_lex

            try:
                parsed_type, mods = self._parse_type(None)
                if parsed_type is None:
@@ -631,9 +636,12 @@ class CxxParser:
            except CxxParseError:
                dtype = None
            else:
                if remainder:
                if tmp_lex.has_tokens():
                    dtype = None

        finally:
            self.lex = old_lex

        if self.lex.token_if("ELLIPSIS"):
            param_pack = True

@@ -948,12 +956,16 @@ class CxxParser:
        values: typing.List[Enumerator] = []

        while True:
            doxygen = self.lex.get_doxygen()

            name_tok = self._next_token_must_be("}", "NAME")
            if name_tok.value == "}":
                break

            if doxygen is None:
                doxygen = self.lex.get_doxygen_after()

            name = name_tok.value
            doxygen = self.lex.get_doxygen()
            value = None

            tok = self._next_token_must_be("}", ",", "=", "DBL_LBRACKET")
@@ -1253,7 +1265,7 @@ class CxxParser:

        if doxygen is None:
            # try checking after the var
            doxygen = self.lex.get_doxygen()
            doxygen = self.lex.get_doxygen_after()

        if is_typedef:
            if not name:

@@ -1,6 +1,6 @@
import typing

from .lexer import LexToken, Lexer
from .lexer import LexToken, PlyLexer, LexerTokenStream
from .types import Token

# key: token type, value: (left spacing, right spacing)
@@ -32,7 +32,7 @@ _want_spacing = {
    "&": (0, 2),
}

_want_spacing.update(dict.fromkeys(Lexer.keywords, (2, 2)))
_want_spacing.update(dict.fromkeys(PlyLexer.keywords, (2, 2)))


def tokfmt(toks: typing.List[Token]) -> str:
@@ -67,9 +67,9 @@ if __name__ == "__main__":  # pragma: no cover
    parser.add_argument("header")
    args = parser.parse_args()

    lexer = Lexer(args.header)
    with open(lexer.filename) as fp:
        lexer.input(fp.read())  # type: ignore
    filename: str = args.header
    with open(filename) as fp:
        lexer = LexerTokenStream(filename, fp.read())

    toks: typing.List[Token] = []
    while True:

@@ -1,6 +1,6 @@
import pytest

from cxxheaderparser.lexer import Lexer
from cxxheaderparser.lexer import PlyLexer
from cxxheaderparser.tokfmt import tokfmt
from cxxheaderparser.types import Token

@@ -40,11 +40,11 @@ def test_tokfmt(instr: str) -> None:
    Each input string is exactly what the output of tokfmt should be
    """
    toks = []
    lexer = Lexer("")
    lexer = PlyLexer("")
    lexer.input(instr)

    while True:
        tok = lexer.token_eof_ok()
        tok = lexer.token()
        if not tok:
            break
