Files
e3-core/e3/sys.py
2019-03-22 10:35:53 +01:00

289 lines
8.5 KiB
Python

from __future__ import absolute_import, division, print_function
import ast
import os
import re
import sys
from enum import Enum
import e3.log
logger = e3.log.getLogger('e3.sys')
class RewriteNodeError(Exception):
    """Error raised when a rewrite rule rejects a node of the AST."""
class RewriteImportRule(object):
    """Rewrite Import node from AST.

    Skip or reject names imported by::

        from <module> import <names>

    or directly::

        import <module>
    """

    class RuleAction(Enum):
        # raise a RewriteNodeError as soon as a matching name is found
        reject = 0
        # drop the matching name from the import statement
        skip = 1

    def __init__(self, module, name='.*', action=None):
        """Initialize the object.

        :param module: regexp suitable for re.match() to match against the
            module name (note that the string is automatically surrounded by
            ^ and $)
        :type module: str
        :param name: regexp suitable for re.match() to match against the names
            imported via "from <module> import <names>" (note that the
            string is automatically surrounded by ^ and $)
        :type name: str
        :param action: skip (default) to avoid importing the name, or reject
            to raise a RewriteNodeError
        :type action: RewriteImportRule.RuleAction
        """
        self.module = module
        self.name = name
        self.action = action if action is not None else self.RuleAction.skip

    @staticmethod
    def _full_match(pattern, value):
        """Tell whether the whole of a string matches a regexp.

        The pattern is wrapped in a non-capturing group before being
        anchored, so that a top-level alternation such as ``a|b`` is
        anchored on both sides (``^(?:a|b)$``) instead of being read as
        ``^a`` or ``b$``.

        :param pattern: regexp to match
        :type pattern: str
        :param value: string to check
        :type value: str
        :rtype: bool
        """
        return re.match('^(?:' + pattern + ')$', value) is not None

    def rewrite_node(self, node):
        """Rewrite a node.

        Nodes that are neither Import nor ImportFrom are returned untouched.
        Otherwise the names matching the rule are removed from ``node.names``
        (the node is modified in place and may end up with no name at all).

        :param node: ast node
        :return: a modified ast node
        :raise: RewriteNodeError when the action is reject and a matching
            name is found, ValueError if the action is not a known one
        """
        check_in_names = None
        if isinstance(node, ast.ImportFrom):
            # node: ImportFrom(module, names)
            # node.module is None for relative imports such as
            # "from . import x": match the rule against an empty module name
            # rather than handing None to re.match().
            if self._full_match(self.module, node.module or ''):
                # the module matches, so the names of the 'from' list
                # (node.names) have to be checked against our 'name'
                check_in_names = self.name
        elif isinstance(node, ast.Import):
            # node: Import(names)
            # check whether our 'module' appears in the imported module names
            check_in_names = self.module

        if check_in_names is None:
            return node

        new_names = []
        for var in node.names:
            if not self._full_match(check_in_names, var.name):
                new_names.append(var)
            elif self.action == self.RuleAction.skip:
                # don't import this name
                continue
            elif self.action == self.RuleAction.reject:
                raise RewriteNodeError(
                    'Rejected import found in ast: %s' % ast.dump(node))
            else:  # defensive code
                raise ValueError('unknown action %s' % (self.action,))
        node.names = new_names
        return node
class RewriteImportNodeTransformer(ast.NodeTransformer):
    """Walk the AST applying a set of rules.

    Currently only the RewriteImportRule are supported.

    An import statement from which the rules removed every name is replaced
    by a ``pass`` statement, since an import node without names cannot be
    compiled.
    """

    def __init__(self, rules):
        """Load a set of rules.

        :param rules: list of rule objects
        :type rules: list[RewriteImportRule]
        """
        self.rules = rules

    def _apply_rules(self, node):
        """Apply all the rules, in order, to an import node.

        :param node: Import or ImportFrom ast node
        :return: the rewritten node, or a Pass node located at the same
            place when no imported name is left
        """
        for rule in self.rules:
            node = rule.rewrite_node(node)

        if isinstance(node, (ast.Import, ast.ImportFrom)) and not node.names:
            # Every name was skipped. Keep a statement in place (rather than
            # deleting the node) so that the enclosing body never becomes
            # empty.
            return ast.copy_location(ast.Pass(), node)
        return node

    def visit_ImportFrom(self, node):
        """Rewrite a "from <module> import <names>" statement."""
        return self._apply_rules(node)

    def visit_Import(self, node):
        """Rewrite an "import <module>" statement."""
        return self._apply_rules(node)
def version():
    """Return the version of the installed e3-core distribution.

    :return: the version recorded in the distribution metadata
    """
    # imported on demand, only this function needs pkg_resources
    from pkg_resources import get_distribution

    e3_core = get_distribution('e3-core')
    return e3_core.version
def sanity_check():
    """Sanity check the E3 install.

    Three checks are run one after the other (YAML, hashlib, version). Each
    one prints its name followed by PASSED or FAILED on a single line, except
    for the version check that prints the version number when it succeeds.

    :return: number of checks that failed
    :rtype: int
    """
    errors = 0

    print('YAMLCheck:', end=' ')
    try:
        import yaml
        yaml.dump({'Yaml': 'works'})
        print('PASSED')
    except Exception:  # defensive code
        print('FAILED')
        errors += 1

    print('HashlibCheck:', end=' ')
    try:
        from e3.hash import sha1, md5
        sha1(__file__)
        md5(__file__)
        print('PASSED')
    except Exception:  # defensive code
        print('FAILED')
        errors += 1

    print('Version:', end=' ')
    try:
        print(version())
    except Exception:  # defensive code
        # Report the failure like the other checks do; this also terminates
        # the "Version:" line that would otherwise be left open.
        print('FAILED')
        errors += 1

    return errors
def main():
    """Command line entry point.

    Depending on the options found on the command line:

    * ``--version``: print the E3 version
    * ``--check``: run the sanity checks, print 'Everything OK!' on success,
      otherwise log an error and exit with status 1
    * ``--platform-info build|host|target``: print the corresponding
      attribute of the current Env

    Only the first of these options (in the order above) is honored. Nothing
    is printed when none of them is given.
    """
    from e3.env import Env
    import e3.main

    m = e3.main.Main(platform_args=True)
    m.argument_parser.add_argument(
        '--platform-info',
        # a sequence (not a set) so that the choices are always listed in
        # the same order in the usage and error messages
        choices=('build', 'host', 'target'),
        help='Show build/host/target platform info')
    m.argument_parser.add_argument(
        '--version',
        help='Show E3 version',
        action='store_true')
    m.argument_parser.add_argument(
        '--check',
        help='Run e3 sanity checking',
        action='store_true')
    m.parse_args()

    if m.args.version:
        print(version())
    elif m.args.check:
        errors = sanity_check()
        if errors:  # defensive code
            logger.error('sanity checking failed!')
            sys.exit(1)
        print('Everything OK!')
    elif m.args.platform_info:
        print(getattr(Env(), m.args.platform_info))
def set_python_env(prefix):
    """Set environment for a Python distribution.

    On Windows both the distribution root and its ``Scripts`` subdirectory
    are passed to ``Env.add_path``. On the other platforms ``bin`` is passed
    to ``Env.add_path`` and ``lib`` to ``Env.add_dll_path``.

    :param prefix: root directory of the python distribution
    :type prefix: str
    """
    import e3.env

    current_env = e3.env.Env()
    is_windows = sys.platform == 'win32'

    if is_windows:  # unix: no cover
        for directory in (prefix, os.path.join(prefix, 'Scripts')):
            current_env.add_path(directory)
    else:
        bin_dir = os.path.join(prefix, 'bin')
        lib_dir = os.path.join(prefix, 'lib')
        current_env.add_path(bin_dir)
        current_env.add_dll_path(lib_dir)
def interpreter(prefix=None):
    """Return location of the Python interpreter.

    The interpreter is looked up directly in ``prefix`` on Windows and in
    ``prefix/bin`` elsewhere. When a python3 binary exists there its path is
    returned, otherwise the path of the python binary is returned (without
    checking that it exists).

    :param prefix: root directory of the python distribution. if None location
        of the current interpreter is returned
    :type prefix: None | str
    :return: python executable path
    :rtype: str
    """
    if prefix is None:
        return sys.executable

    if sys.platform == 'win32':  # unix: no cover
        exe_dir = prefix
        preferred_name, fallback_name = 'python3.exe', 'python.exe'
    else:  # windows: no cover
        exe_dir = os.path.join(prefix, 'bin')
        preferred_name, fallback_name = 'python3', 'python'

    preferred = os.path.join(exe_dir, preferred_name)
    if os.path.exists(preferred):
        return preferred
    return os.path.join(exe_dir, fallback_name)
def python_script(name, prefix=None):
    """Return path to scripts contained in this Python distribution.

    On Windows the script is looked up in the ``Scripts`` directory of the
    distribution (or directly in ``prefix`` when ``prefix`` already is that
    directory), trying both ``name`` and ``name + '.exe'``. An executable
    launcher is returned on its own unless it embeds a relative ``#!python``
    shebang. In all the other cases the returned command line starts with
    the interpreter of the distribution.

    :param name: the script name
    :type name: str
    :param prefix: root directory of the Python distribution. if None the
        distribution currently used by this script will be used
    :type prefix: None | str
    :return: a list that will be the prefix of your command line
    :rtype: list[str]
    """
    def has_relative_python_shebang(file_script):  # unix: no cover
        """Return True if the script contains #!python shebang.

        When producing relocatable python distribution we change the shebang
        to #!python. In that case prefix the command line with the current
        python interpreter.

        #!/path/to/python shebang should return false, as we don't need to
        return interpreter path.

        The marker is searched in the whole content of the file, not only
        at the beginning of its first line.
        """
        with open(file_script, 'rb') as f:
            return b'#!python' in f.read()

    if prefix is None:
        if sys.platform == 'win32':  # unix: no cover
            prefix = os.path.dirname(sys.executable)
        else:
            prefix = os.path.dirname(os.path.dirname(sys.executable))

    if sys.platform == 'win32':  # unix: no cover
        # The directory is conventionally spelled 'Scripts' and Windows file
        # names are case insensitive: compare without taking case into
        # account so that a prefix already pointing to it is recognized.
        if os.path.basename(prefix).lower() == 'scripts':
            script = os.path.join(prefix, name)
        else:
            script = os.path.join(prefix, 'Scripts', name)
        script_exe = script + '.exe'

        # Find the executable launcher of the script, if any. Only a real
        # '.exe' extension qualifies, not any name ending with 'exe'.
        if script.lower().endswith('.exe') and os.path.isfile(script):
            launcher = script
        elif os.path.isfile(script_exe):
            launcher = script_exe
        else:
            launcher = None

        if launcher is None:
            # plain python script: it has to be run by the interpreter
            return [interpreter(prefix), script]
        if has_relative_python_shebang(launcher):  # all: no cover
            # relocatable python distribution
            return [interpreter(prefix), launcher]
        return [launcher]
    else:
        return [interpreter(prefix), os.path.join(prefix, 'bin', name)]