| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357 |
- # Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
- # For details: https://github.com/pylint-dev/pylint/blob/main/LICENSE
- # Copyright (c) https://github.com/pylint-dev/pylint/blob/main/CONTRIBUTORS.txt
- from __future__ import annotations
- import argparse
- import collections
- import contextlib
- import functools
- import os
- import sys
- import tokenize
- import traceback
- import warnings
- from collections import defaultdict
- from collections.abc import Callable, Iterable, Iterator, Sequence
- from io import TextIOWrapper
- from pathlib import Path
- from re import Pattern
- from types import ModuleType
- from typing import Any, Protocol
- import astroid
- import astroid.builder
- import astroid.modutils
- from astroid import nodes
- from pylint import checkers, exceptions, interfaces, reporters
- from pylint.checkers.base_checker import BaseChecker
- from pylint.config.arguments_manager import _ArgumentsManager
- from pylint.constants import (
- MAIN_CHECKER_NAME,
- MSG_TYPES,
- MSG_TYPES_STATUS,
- WarningScope,
- )
- from pylint.interfaces import HIGH
- from pylint.lint.base_options import _make_linter_options
- from pylint.lint.caching import load_results, save_results
- from pylint.lint.expand_modules import (
- _is_ignored_file,
- discover_package_path,
- expand_modules,
- )
- from pylint.lint.message_state_handler import _MessageStateHandler
- from pylint.lint.parallel import check_parallel
- from pylint.lint.report_functions import (
- report_messages_by_module_stats,
- report_messages_stats,
- report_total_messages_stats,
- )
- from pylint.lint.utils import (
- augmented_sys_path,
- get_fatal_error_message,
- prepare_crash_report,
- )
- from pylint.message import Message, MessageDefinition, MessageDefinitionStore
- from pylint.reporters.base_reporter import BaseReporter
- from pylint.reporters.progress_reporters import ProgressReporter
- from pylint.reporters.text import ColorizedTextReporter, TextReporter
- from pylint.reporters.ureports import nodes as report_nodes
- from pylint.typing import (
- DirectoryNamespaceDict,
- FileItem,
- ManagedMessage,
- MessageDefinitionTuple,
- MessageLocationTuple,
- ModuleDescriptionDict,
- Options,
- )
- from pylint.utils import ASTWalker, FileState, LinterStats, utils
# Module-level alias for ``astroid.MANAGER``.
MANAGER = astroid.MANAGER
class GetAstProtocol(Protocol):
    """Structural type for callables that produce the AST of one module.

    A conforming callable accepts the file path, the module name and,
    optionally, the source text, and returns a ``nodes.Module``.
    """

    def __call__(
        self,
        filepath: str,
        modname: str,
        data: str | None = None,
    ) -> nodes.Module: ...
def _read_stdin() -> str:
    """Re-wrap ``sys.stdin`` as UTF-8 text and return everything it contains.

    ``sys.stdin`` is replaced by a new ``TextIOWrapper`` built on the detached
    underlying buffer, so the previous wrapper is unusable afterwards.
    """
    current_stdin = sys.stdin
    # See https://github.com/python/typeshed/pull/5623 for rationale behind assertion
    assert isinstance(current_stdin, TextIOWrapper)
    utf8_stdin = TextIOWrapper(current_stdin.detach(), encoding="utf-8")
    sys.stdin = utf8_stdin
    return utf8_stdin.read()
def _load_reporter_by_class(reporter_class: str) -> type[BaseReporter]:
    """Import the module part of a dotted name and return the named reporter class.

    :param str reporter_class: Dotted, qualified name of the reporter class.
    :returns: The attribute named by the last dotted component.
    :raises AssertionError: If that attribute is not a ``BaseReporter`` subclass.
    """
    module_name = astroid.modutils.get_module_part(reporter_class)
    reporter_module = astroid.modutils.load_module_from_name(module_name)
    # The class name is whatever follows the final dot.
    _, _, attribute_name = reporter_class.rpartition(".")
    klass = getattr(reporter_module, attribute_name)
    assert issubclass(klass, BaseReporter), f"{klass} is not a BaseReporter"
    return klass  # type: ignore[no-any-return]
- # Python Linter class #########################################################
- # pylint: disable-next=consider-using-namedtuple-or-dataclass
# Message definitions owned by the main checker, keyed by message id.
# Each value is (message template, symbol, description, extra options).
MSGS: dict[str, MessageDefinitionTuple] = {
    "F0001": (
        "%s",
        "fatal",
        # Implicit concatenation (as in the other entries) rather than a
        # backslash continuation, which would embed the next source line's
        # leading characters in the description.
        "Used when an error occurred preventing the analysis of a "
        "module (unable to find it for instance).",
        {"scope": WarningScope.LINE},
    ),
    "F0002": (
        "%s: %s",
        "astroid-error",
        "Used when an unexpected error occurred while building the "
        "Astroid representation. This is usually accompanied by a "
        "traceback. Please report such errors !",
        {"scope": WarningScope.LINE},
    ),
    "F0010": (
        "error while code parsing: %s",
        "parse-error",
        "Used when an exception occurred while building the Astroid "
        "representation which could be handled by astroid.",
        {"scope": WarningScope.LINE},
    ),
    "F0011": (
        "error while parsing the configuration: %s",
        "config-parse-error",
        "Used when an exception occurred while parsing a pylint configuration file.",
        {"scope": WarningScope.LINE},
    ),
    "I0001": (
        "Unable to run raw checkers on built-in module %s",
        "raw-checker-failed",
        "Used to inform that a built-in module has not been checked "
        "using the raw checkers.",
        {
            "scope": WarningScope.LINE,
            "default_enabled": False,
        },
    ),
    "I0010": (
        "Unable to consider inline option %r",
        "bad-inline-option",
        "Used when an inline option is either badly formatted or can't "
        "be used inside modules.",
        {
            "scope": WarningScope.LINE,
            "default_enabled": False,
        },
    ),
    "I0011": (
        "Locally disabling %s (%s)",
        "locally-disabled",
        "Used when an inline option disables a message or a messages category.",
        {
            "scope": WarningScope.LINE,
            "default_enabled": False,
        },
    ),
    "I0013": (
        "Ignoring entire file",
        "file-ignored",
        "Used to inform that the file will not be checked",
        {
            "scope": WarningScope.LINE,
            "default_enabled": False,
        },
    ),
    "I0020": (
        "Suppressed %s (from line %d)",
        "suppressed-message",
        "A message was triggered on a line, but suppressed explicitly "
        "by a disable= comment in the file. This message is not "
        "generated for messages that are ignored due to configuration "
        "settings.",
        {
            "scope": WarningScope.LINE,
            "default_enabled": False,
        },
    ),
    "I0021": (
        "Useless suppression of %s",
        "useless-suppression",
        "Reported when a message is explicitly disabled for a line or "
        "a block of code, but never triggered.",
        {
            "scope": WarningScope.LINE,
            "default_enabled": False,
        },
    ),
    "I0022": (
        'Pragma "%s" is deprecated, use "%s" instead',
        "deprecated-pragma",
        "Some inline pylint options have been renamed or reworked, "
        "only the most recent form should be used. "
        "NOTE:skip-all is only available with pylint >= 0.26",
        {
            "old_names": [("I0014", "deprecated-disable-all")],
            "scope": WarningScope.LINE,
            "default_enabled": False,
        },
    ),
    "E0001": (
        "%s",
        "syntax-error",
        "Used when a syntax error is raised for a module.",
        {"scope": WarningScope.LINE},
    ),
    "E0011": (
        "Unrecognized file option %r",
        "unrecognized-inline-option",
        "Used when an unknown inline option is encountered.",
        {"scope": WarningScope.LINE},
    ),
    "W0012": (
        "Unknown option value for '%s', expected a valid pylint message and got '%s'",
        "unknown-option-value",
        "Used when an unknown value is encountered for an option.",
        {
            "scope": WarningScope.LINE,
            "old_names": [("E0012", "bad-option-value")],
        },
    ),
    "R0022": (
        "Useless option value for '%s', %s",
        "useless-option-value",
        "Used when a value for an option that is now deleted from pylint"
        " is encountered.",
        {
            "scope": WarningScope.LINE,
            "old_names": [("E0012", "bad-option-value")],
        },
    ),
    "E0013": (
        "Plugin '%s' is impossible to load, is it installed ? ('%s')",
        "bad-plugin-value",
        "Used when a bad value is used in 'load-plugins'.",
        {"scope": WarningScope.LINE},
    ),
    "E0014": (
        "Out-of-place setting encountered in top level configuration-section '%s' : '%s'",
        "bad-configuration-section",
        "Used when we detect a setting in the top level of a toml configuration that"
        " shouldn't be there.",
        {"scope": WarningScope.LINE},
    ),
    "E0015": (
        "Unrecognized option found: %s",
        "unrecognized-option",
        "Used when we detect an option that we do not recognize.",
        {"scope": WarningScope.LINE},
    ),
}
- # pylint: disable=too-many-instance-attributes,too-many-public-methods
- class PyLinter(
- _ArgumentsManager,
- _MessageStateHandler,
- reporters.ReportsHandlerMixIn,
- checkers.BaseChecker,
- ):
- """Lint Python modules using external checkers.
- This is the main checker controlling the other ones and the reports
- generation. It is itself both a raw checker and an astroid checker in order
- to:
- * handle message activation / deactivation at the module level
- * handle some basic but necessary stats' data (number of classes, methods...)
- IDE plugin developers: you may have to call
- `astroid.MANAGER.clear_cache()` across runs if you want
- to ensure the latest code version is actually checked.
- This class needs to support pickling for parallel linting to work. The exception
- is reporter member; see check_parallel function for more details.
- """
- name = MAIN_CHECKER_NAME
- msgs = MSGS
- # Will be used like this : datetime.now().strftime(crash_file_path)
- crash_file_path: str = "pylint-crash-%Y-%m-%d-%H-%M-%S.txt"
- option_groups_descs = {
- "Messages control": "Options controlling analysis messages",
- "Reports": "Options related to output formatting and reporting",
- }
def __init__(
    self,
    options: Options = (),
    reporter: reporters.BaseReporter | reporters.MultiReporter | None = None,
    option_groups: tuple[tuple[str, str], ...] = (),
    pylintrc: str | None = None,
) -> None:
    """Set up the linter's state and register it as its own first checker.

    :param options: Extra option definitions; the linter's own options
        (from ``_make_linter_options``) are appended to them.
    :param reporter: Reporter to install; a ``TextReporter`` is created
        when this is falsy.
    :param option_groups: ``(name, description)`` pairs recorded in
        ``option_groups_descs`` and placed first in ``_option_groups``.
    :param pylintrc: Deprecated; in this method it only triggers a
        ``DeprecationWarning`` when it is not ``None``.
    """
    _ArgumentsManager.__init__(self, prog="pylint")
    _MessageStateHandler.__init__(self, self)
    if pylintrc is not None:
        warnings.warn(
            "The pylintrc argument will be removed in pylint 5.0.",
            DeprecationWarning,
            stacklevel=2,
        )
    # Some stuff has to be done before initialization of other ancestors...
    # messages store / checkers / reporter / astroid manager
    # Attributes for reporters
    self.reporter: reporters.BaseReporter | reporters.MultiReporter
    if reporter:
        self.set_reporter(reporter)
    else:
        self.set_reporter(TextReporter())
    self._reporters: dict[str, type[reporters.BaseReporter]] = {}
    """Dictionary of possible but non-initialized reporters."""
    # Attributes for checkers and plugins
    self._checkers: defaultdict[str, list[checkers.BaseChecker]] = (
        collections.defaultdict(list)
    )
    """Dictionary of registered and initialized checkers."""
    self._dynamic_plugins: dict[str, ModuleType | ModuleNotFoundError | bool] = {}
    """Set of loaded plugin names."""
    # Attributes related to stats
    self.stats = LinterStats()
    # Attributes related to (command-line) options and their parsing
    self.options: Options = options + _make_linter_options(self)
    # NOTE(review): unless a base-class __init__ rebinds it on the instance,
    # ``self.option_groups_descs`` is the class-level dict here, so these
    # writes would be shared by every PyLinter instance -- confirm intent.
    for opt_group in option_groups:
        self.option_groups_descs[opt_group[0]] = opt_group[1]
    self._option_groups: tuple[tuple[str, str], ...] = (
        *option_groups,
        *PyLinter.option_groups_descs.items(),
    )
    self.fail_on_symbols: list[str] = []
    """List of message symbols on which pylint should fail, set by --fail-on."""
    self._error_mode = False
    reporters.ReportsHandlerMixIn.__init__(self)
    checkers.BaseChecker.__init__(self, self)
    # provided reports
    self.reports = (
        ("RP0001", "Messages by category", report_total_messages_stats),
        (
            "RP0002",
            "% errors / warnings by module",
            report_messages_by_module_stats,
        ),
        ("RP0003", "Messages", report_messages_stats),
    )
    # Attributes related to registering messages and their handling
    self.msgs_store = MessageDefinitionStore(self.config.py_version)
    self.msg_status = 0
    self._by_id_managed_msgs: list[ManagedMessage] = []
    # Attributes related to visiting files
    self.file_state = FileState("", self.msgs_store, is_base_filestate=True)
    self.current_name: str = ""
    self.current_file: str | None = None
    self._ignore_file = False
    self._ignore_paths: list[Pattern[str]] = []
    self.verbose = False
    # The linter is itself a checker; register it last, once the message
    # store and checker registry above exist.
    self.register_checker(self)
def load_default_plugins(self) -> None:
    """Call ``initialize(self)`` on the checkers package, then the reporters package."""
    for package in (checkers, reporters):
        package.initialize(self)
def load_plugin_modules(self, modnames: Iterable[str], force: bool = False) -> None:
    """Import each named plugin module and call its ``register`` hook.

    The outcome for every name is remembered in ``self._dynamic_plugins``:
    the module object on success, or the ``ModuleNotFoundError`` on failure,
    so that ``load_plugin_configuration`` can report it later. A name that
    already has an entry is skipped unless ``force`` is true (useful when
    multiprocessing).
    """
    for plugin_name in modnames:
        already_recorded = plugin_name in self._dynamic_plugins
        if already_recorded and not force:
            continue
        try:
            plugin = astroid.modutils.load_module_from_name(plugin_name)
            plugin.register(self)
        except ModuleNotFoundError as load_error:
            self._dynamic_plugins[plugin_name] = load_error
        else:
            self._dynamic_plugins[plugin_name] = plugin
def load_plugin_configuration(self) -> None:
    """Run the ``load_configuration`` hook of every recorded plugin.

    Works from the outcomes stored in ``self._dynamic_plugins`` by
    ``load_plugin_modules`` instead of importing again: a stored
    ``ModuleNotFoundError`` is reported as ``bad-plugin-value``, and any other
    entry exposing ``load_configuration`` gets that hook called with the linter.

    ..note::
        This function previously always tried to load modules again, which
        led to some confusion and silent failure conditions as described
        in GitHub issue #7264. Making it use the stored result is more efficient, and
        means that we avoid the ``init-hook`` problems from before.
    """
    for plugin_name, outcome in self._dynamic_plugins.items():
        if isinstance(outcome, ModuleNotFoundError):
            self.add_message(
                "bad-plugin-value", args=(plugin_name, outcome), line=0
            )
            continue
        if hasattr(outcome, "load_configuration"):
            outcome.load_configuration(self)

    # Collapse every entry to a plain bool so the dict is pickle-able.
    # This is only a problem in multiprocessing/parallel mode
    # (e.g. invoking pylint -j 2).
    picklable_status = {}
    for plugin_name, outcome in self._dynamic_plugins.items():
        picklable_status[plugin_name] = not isinstance(outcome, ModuleNotFoundError)
    self._dynamic_plugins = picklable_status
- def _load_reporters(self, reporter_names: str) -> None:
- """Load the reporters if they are available on _reporters."""
- if not self._reporters:
- return
- sub_reporters = []
- output_files = []
- with contextlib.ExitStack() as stack:
- for reporter_name in reporter_names.split(","):
- reporter_name, *reporter_output = reporter_name.split(":", 1)
- reporter = self._load_reporter_by_name(reporter_name)
- sub_reporters.append(reporter)
- if reporter_output:
- output_file = stack.enter_context(
- open(reporter_output[0], "w", encoding="utf-8")
- )
- reporter.out = output_file
- output_files.append(output_file)
- # Extend the lifetime of all opened output files
- close_output_files = stack.pop_all().close
- if len(sub_reporters) > 1 or output_files:
- self.set_reporter(
- reporters.MultiReporter(
- sub_reporters,
- close_output_files,
- )
- )
- else:
- self.set_reporter(sub_reporters[0])
- def _load_reporter_by_name(self, reporter_name: str) -> reporters.BaseReporter:
- name = reporter_name.lower()
- if name in self._reporters:
- return self._reporters[name]()
- try:
- reporter_class = _load_reporter_by_class(reporter_name)
- except (ImportError, AttributeError, AssertionError) as e:
- raise exceptions.InvalidReporterError(name) from e
- return reporter_class()
def set_reporter(
    self,
    reporter: reporters.BaseReporter | reporters.MultiReporter,
) -> None:
    """Install ``reporter`` on the linter and point its ``linter`` attribute back here."""
    self.reporter = reporter
    reporter.linter = self
def register_reporter(self, reporter_class: type[reporters.BaseReporter]) -> None:
    """Record ``reporter_class`` in ``_reporters`` under its ``name`` attribute."""
    reporter_key = reporter_class.name
    self._reporters[reporter_key] = reporter_class
def report_order(self) -> list[BaseChecker]:
    """Return the keys of ``_reports`` sorted by ``name``, with the linter moved last.

    Keys without a ``name`` attribute sort as the empty string. The linter is
    only appended when it was present among the keys.
    """
    ordered = sorted(self._reports, key=lambda entry: getattr(entry, "name", ""))
    if self in ordered:
        # Move the linter itself to the end of the list.
        ordered.remove(self)
        ordered.append(self)
    return ordered
- # checkers manipulation methods ############################################
def register_checker(self, checker: checkers.BaseChecker) -> None:
    """Add ``checker`` to the registry together with its reports and messages.

    Messages whose definition is not enabled by default are disabled at once,
    and a checker whose ``enabled`` attribute is false has everything under
    its name disabled.
    """
    self._checkers[checker.name].append(checker)

    for report in checker.reports:
        report_id, report_title, report_callback = report
        self.register_report(report_id, report_title, report_callback, checker)

    if hasattr(checker, "msgs"):
        self.msgs_store.register_messages_from_checker(checker)
        for definition in checker.messages:
            if definition.default_enabled:
                continue
            self.disable(definition.msgid)

    # Register the checker, but disable all of its messages.
    checker_is_enabled = getattr(checker, "enabled", True)
    if not checker_is_enabled:
        self.disable(checker.name)
def enable_fail_on_messages(self) -> None:
    """Resolve ``config.fail_on`` into concrete message symbols.

    Each configured value is either a message category (a key of
    ``MSG_TYPES``) or a message id / symbol. Messages matched by id or symbol
    are enabled and flagged; messages matched only by category are flagged in
    ``fail_on_symbols`` without being enabled.
    """
    requested = self.config.fail_on
    if not requested:
        return

    categories = set()
    identifiers = set()
    for value in requested:
        # If value is a category, add category, else add message
        (categories if value in MSG_TYPES else identifiers).add(value)

    # Walk every message of every registered checker.
    every_message = (
        msg
        for checker_group in self._checkers.values()
        for checker in checker_group
        for msg in checker.messages
    )
    for msg in every_message:
        if msg.msgid in identifiers or msg.symbol in identifiers:
            # message id/symbol matched, enable and flag it
            self.enable(msg.msgid)
            self.fail_on_symbols.append(msg.symbol)
        elif msg.msgid[0] in categories:
            # message starts with a category value, flag (but do not enable) it
            self.fail_on_symbols.append(msg.symbol)
def any_fail_on_issues(self) -> bool:
    """Return whether any key of ``stats.by_msg`` is listed in ``fail_on_symbols``."""
    flagged = self.fail_on_symbols
    return any(symbol in flagged for symbol in self.stats.by_msg)
def pass_fail_on_config_to_color_reporter(self) -> None:
    """Hand ``fail_on_symbols`` to every colorized text reporter in use.

    Covers both a ``ColorizedTextReporter`` installed directly and any found
    among the sub-reporters of a ``MultiReporter``.
    """
    active = self.reporter
    if isinstance(active, ColorizedTextReporter):
        active.set_fail_on_symbols(self.fail_on_symbols)
        return
    if not isinstance(active, reporters.MultiReporter):
        return
    for sub_reporter in active._sub_reporters:
        if isinstance(sub_reporter, ColorizedTextReporter):
            sub_reporter.set_fail_on_symbols(self.fail_on_symbols)
def disable_reporters(self) -> None:
    """Call ``disable_report`` for every report id stored in ``_reports``."""
    for registered_reports in self._reports.values():
        for entry in registered_reports:
            report_id, _title, _callback = entry
            self.disable_report(report_id)
- def _parse_error_mode(self) -> None:
- """Parse the current state of the error mode.
- Error mode: enable only errors; no reports, no persistent.
- """
- if not self._error_mode:
- return
- self.disable_noerror_messages()
- self.disable("miscellaneous")
- self.set_option("reports", False)
- self.set_option("persistent", False)
- self.set_option("score", False)
- # code checking methods ###################################################
def get_checkers(self) -> list[BaseChecker]:
    """Return every registered checker, flattened into one sorted list."""
    flattened = [
        checker for group in self._checkers.values() for checker in group
    ]
    flattened.sort()
    return flattened
def get_checker_names(self) -> list[str]:
    """Return the sorted, de-duplicated checker names, leaving out the main checker."""
    unique_names = {checker.name for checker in self.get_checkers()}
    unique_names.discard(MAIN_CHECKER_NAME)
    return sorted(unique_names)
def prepare_checkers(self) -> list[BaseChecker]:
    """Return the checkers that have at least one enabled message or report.

    The linter itself always comes first; the first entry of
    ``get_checkers()`` is skipped. When ``config.reports`` is off, all
    reports are disabled before the selection is made.
    """
    if not self.config.reports:
        self.disable_reporters()

    # get needed checkers
    selected: list[BaseChecker] = [self]
    for candidate in self.get_checkers()[1:]:
        enabled_messages = [
            msgid for msgid in candidate.msgs if self.is_message_enabled(msgid)
        ]
        is_needed = bool(enabled_messages) or any(
            self.report_is_enabled(report[0]) for report in candidate.reports
        )
        if is_needed:
            selected.append(candidate)
    return selected
- # pylint: disable=unused-argument
- @staticmethod
- def should_analyze_file(modname: str, path: str, is_argument: bool = False) -> bool:
- """Returns whether a module should be checked.
- This implementation returns True for all python source files (.py and .pyi),
- indicating that all files should be linted.
- Subclasses may override this method to indicate that modules satisfying
- certain conditions should not be linted.
- :param str modname: The name of the module to be checked.
- :param str path: The full path to the source code of the module.
- :param bool is_argument: Whether the file is an argument to pylint or not.
- Files which respect this property are always
- checked, since the user requested it explicitly.
- :returns: True if the module should be checked.
- """
- if is_argument:
- return True
- return path.endswith((".py", ".pyi"))
- # pylint: enable=unused-argument
def initialize(self) -> None:
    """Prepare the linter for a run.

    This method is called before any linting is done. It copies the
    configured ignore paths and marks as disabled every message that cannot
    be emitted for the configured Python version.
    """
    self._ignore_paths = self.config.ignore_paths
    # initialize msgs_state now that all messages have been registered into
    # the store
    for definition in self.msgs_store.messages:
        if definition.may_be_emitted(self.config.py_version):
            continue
        self._msgs_state[definition.msgid] = False
- def _discover_files(self, files_or_modules: Sequence[str]) -> Iterator[str]:
- """Discover python modules and packages in sub-directory.
- Returns iterator of paths to discovered modules and packages.
- """
- for something in files_or_modules:
- if os.path.isdir(something) and not os.path.isfile(
- os.path.join(something, "__init__.py")
- ):
- skip_subtrees: list[str] = []
- for root, _, files in os.walk(something):
- if any(root.startswith(s) for s in skip_subtrees):
- # Skip subtree of already discovered package.
- continue
- if _is_ignored_file(
- root,
- self.config.ignore,
- self.config.ignore_patterns,
- self.config.ignore_paths,
- ):
- skip_subtrees.append(root)
- continue
- if "__init__.py" in files:
- skip_subtrees.append(root)
- yield root
- else:
- yield from (
- os.path.join(root, file)
- for file in files
- if file.endswith((".py", ".pyi"))
- )
- else:
- yield something
def check(self, files_or_modules: Sequence[str]) -> None:
    """Main checking entry: check a list of files or modules from their name.

    :param files_or_modules: File paths or module names to check. With
        ``config.recursive`` the entries are first expanded through
        ``_discover_files``. With ``config.from_stdin`` exactly one entry is
        required and the source text is read from standard input.
    :raises exceptions.InvalidArgsError: If ``config.from_stdin`` is set and
        the number of entries is not exactly one.
    """
    self.initialize()
    if self.config.recursive:
        files_or_modules = tuple(self._discover_files(files_or_modules))
    if self.config.from_stdin:
        if len(files_or_modules) != 1:
            raise exceptions.InvalidArgsError(
                "Missing filename required for --from-stdin"
            )

    # De-duplicate the package paths while keeping first-seen order.
    extra_packages_paths = list(
        dict.fromkeys(
            discover_package_path(file_or_module, self.config.source_roots)
            for file_or_module in files_or_modules
        )
    )

    # TODO: Move the parallel invocation into step 3 of the checking process
    if not self.config.from_stdin and self.config.jobs > 1:
        original_sys_path = sys.path[:]
        try:
            check_parallel(
                self,
                self.config.jobs,
                self._iterate_file_descrs(files_or_modules),
                extra_packages_paths,
            )
        finally:
            # Restore sys.path even when the parallel run raises.
            sys.path = original_sys_path
        return

    progress_reporter = ProgressReporter(self.verbose)

    # 1) Get all FileItems
    with augmented_sys_path(extra_packages_paths):
        if self.config.from_stdin:
            fileitems = self._get_file_descr_from_stdin(files_or_modules[0])
            data: str | None = _read_stdin()
        else:
            fileitems = self._iterate_file_descrs(files_or_modules)
            data = None

    # The contextmanager also opens all checkers and sets up the PyLinter class
    with augmented_sys_path(extra_packages_paths):
        with self._astroid_module_checker() as check_astroid_module:
            # 2) Get the AST for each FileItem
            ast_per_fileitem = self._get_asts(fileitems, data, progress_reporter)

            # 3) Lint each ast
            self._lint_files(
                ast_per_fileitem, check_astroid_module, progress_reporter
            )
- def _get_asts(
- self,
- fileitems: Iterator[FileItem],
- data: str | None,
- progress_reporter: ProgressReporter,
- ) -> dict[FileItem, nodes.Module | None]:
- """Get the AST for all given FileItems."""
- ast_per_fileitem: dict[FileItem, nodes.Module | None] = {}
- progress_reporter.start_get_asts()
- for fileitem in fileitems:
- progress_reporter.get_ast_for_file(fileitem.filepath)
- self.set_current_module(fileitem.name, fileitem.filepath)
- try:
- ast_per_fileitem[fileitem] = self.get_ast(
- fileitem.filepath, fileitem.name, data
- )
- except astroid.AstroidBuildingError as ex:
- template_path = prepare_crash_report(
- ex, fileitem.filepath, self.crash_file_path
- )
- msg = get_fatal_error_message(fileitem.filepath, template_path)
- self.add_message(
- "astroid-error",
- args=(fileitem.filepath, msg),
- confidence=HIGH,
- )
- return ast_per_fileitem
def check_single_file_item(self, file: FileItem) -> None:
    """Check a single file item.

    Opens the astroid module checker context and runs ``_check_file`` on
    ``file`` with the linter's own ``get_ast``.

    initialize() should be called before calling this method
    """
    checker_context = self._astroid_module_checker()
    with checker_context as check_astroid_module:
        self._check_file(self.get_ast, check_astroid_module, file)
- def _lint_files(
- self,
- ast_mapping: dict[FileItem, nodes.Module | None],
- check_astroid_module: Callable[[nodes.Module], bool | None],
- progress_reporter: ProgressReporter,
- ) -> None:
- """Lint all AST modules from a mapping.."""
- progress_reporter.start_linting()
- for fileitem, module in ast_mapping.items():
- progress_reporter.lint_file(fileitem.filepath)
- if module is None:
- continue
- try:
- self._lint_file(fileitem, module, check_astroid_module)
- self.stats.modules_names.add(fileitem.filepath)
- except Exception as ex: # pylint: disable=broad-except
- template_path = prepare_crash_report(
- ex, fileitem.filepath, self.crash_file_path
- )
- msg = get_fatal_error_message(fileitem.filepath, template_path)
- if isinstance(ex, astroid.AstroidError):
- self.add_message(
- "astroid-error", args=(fileitem.filepath, msg), confidence=HIGH
- )
- else:
- self.add_message("fatal", args=msg, confidence=HIGH)
def _lint_file(
    self,
    file: FileItem,
    module: nodes.Module,
    check_astroid_module: Callable[[nodes.Module], bool | None],
) -> None:
    """Lint one already-parsed module with ``check_astroid_module``.

    :param FileItem file: data about the file
    :param nodes.Module module: the ast module to lint
    :param Callable check_astroid_module: callable checking an AST taking the following
           arguments
    - ast: AST of the module
    :raises AstroidError: for any exception raised by ``check_astroid_module``
    """
    self.set_current_module(file.name, file.filepath)
    self._ignore_file = False
    self.file_state = FileState(file.modpath, self.msgs_store, module)
    # fix the current file (if the source file was not available or
    # if it's actually a c extension)
    self.current_file = module.file

    try:
        check_astroid_module(module)
    except Exception as error:
        raise astroid.AstroidError from error

    # warn about spurious inline messages handling
    for spurious in self.file_state.iter_spurious_suppression_messages(
        self.msgs_store
    ):
        msgid, line, args = spurious
        self.add_message(msgid, line, None, args)
def _check_file(
    self,
    get_ast: GetAstProtocol,
    check_astroid_module: Callable[[nodes.Module], bool | None],
    file: FileItem,
) -> None:
    """Lint one file with the supplied AST builder and module checker.

    :param callable get_ast: callable called with ``(filepath, name)`` that
        returns the module AST, or ``None`` when there is nothing to check
    :param callable check_astroid_module: callable called with the module AST
        to run the checks on it
    :param FileItem file: data about the file
    :raises AstroidError: when ``check_astroid_module`` raises any exception
    """
    self.set_current_module(file.name, file.filepath)

    module = get_ast(file.filepath, file.name)
    if module is None:
        # No AST was returned, so there is nothing to check.
        return

    self._ignore_file = False
    self.file_state = FileState(file.modpath, self.msgs_store, module)
    # Trust the path stored on the AST: the source file may not have been
    # available, or the module may actually be a C extension.
    self.current_file = module.file

    try:
        check_astroid_module(module)
    except Exception as exc:  # pragma: no cover
        raise astroid.AstroidError from exc

    # Warn about spurious inline suppression pragmas.
    for msgid, line, args in self.file_state.iter_spurious_suppression_messages(
        self.msgs_store
    ):
        self.add_message(msgid, line, None, args)
def _get_file_descr_from_stdin(self, filepath: str) -> Iterator[FileItem]:
    """Yield the file description used when the source is standard input.

    Nothing is yielded, and the skipped counter is incremented, when
    ``filepath`` matches the configured ignore settings. Otherwise exactly one
    ``FileItem`` is yielded; ``filepath`` is used for both of its path fields.
    """
    cfg = self.config
    if _is_ignored_file(
        filepath, cfg.ignore, cfg.ignore_patterns, cfg.ignore_paths
    ):
        self.stats.skipped += 1
        return

    try:
        # modpath_from_file does not really perform an __import__, yet it may
        # still raise ImportError; fall back to the bare file name then.
        dotted_parts = astroid.modutils.modpath_from_file(filepath)
    except ImportError:
        modname = os.path.splitext(os.path.basename(filepath))[0]
    else:
        modname = ".".join(dotted_parts)

    yield FileItem(modname, filepath, filepath)
def _iterate_file_descrs(
    self, files_or_modules: Sequence[str]
) -> Iterator[FileItem]:
    """Yield one ``FileItem`` per expanded module that should be linted.

    Entries flagged as ignored only increment the skipped counter; the other
    entries are yielded when ``should_analyze_file`` accepts them.
    """
    expanded = self._expand_files(files_or_modules)
    for descr in expanded.values():
        if descr["isignored"]:
            self.stats.skipped += 1
            continue
        name = descr["name"]
        filepath = descr["path"]
        if self.should_analyze_file(name, filepath, is_argument=descr["isarg"]):
            yield FileItem(name, filepath, descr["basename"])
def _expand_files(
    self, files_or_modules: Sequence[str]
) -> dict[str, ModuleDescriptionDict]:
    """Expand the given files/modules and report every expansion error.

    Each error is emitted as a message whose id is the error's ``key``. The
    message argument is the module name, except for ``fatal`` errors where it
    is the exception text with the current working directory prefix removed.

    :returns: the module descriptions produced by ``expand_modules``
    """
    cfg = self.config
    modules, failures = expand_modules(
        files_or_modules,
        cfg.source_roots,
        cfg.ignore,
        cfg.ignore_patterns,
        self._ignore_paths,
    )
    for failure in failures:
        modname = failure["mod"]
        key = failure["key"]
        self.set_current_module(modname)
        if key == "fatal":
            message = str(failure["ex"]).replace(os.getcwd() + os.sep, "")
        else:
            message = modname
        self.add_message(key, args=message)
    return modules
def set_current_module(self, modname: str, filepath: str | None = None) -> None:
    """Record the module being analyzed and initialize its statistics.

    The call is a no-op when ``modname`` is empty and no ``filepath`` is given.
    """
    if filepath is None and not modname:
        return

    reported_name = modname or ""
    self.reporter.on_set_current_module(reported_name, filepath)
    self.current_name = modname
    self.current_file = filepath or modname
    self.stats.init_single_module(reported_name)

    if not filepath:
        return
    # With an actual file path the config attribute may need updating: switch
    # to the namespace registered for the file, if one is found.
    namespace = self._get_namespace_for_file(
        Path(filepath), self._directory_namespaces
    )
    if namespace:
        self.config = namespace
def _get_namespace_for_file(
    self, filepath: Path, namespaces: DirectoryNamespaceDict
) -> argparse.Namespace | None:
    """Return the most specific configuration namespace covering ``filepath``.

    ``namespaces`` maps a directory to a ``(namespace, nested)`` pair, where
    ``nested`` is another mapping of the same shape that is searched
    recursively (presumably holding sub-directories; confirm against the code
    that fills it).

    :param filepath: path of the file being analyzed
    :param namespaces: directory-to-namespace mapping to search
    :returns: the namespace found in the nested mapping if there is one,
        otherwise the namespace of the first directory that contains
        ``filepath``, or ``None`` when no directory contains it
    """
    for directory in namespaces:
        if not filepath.is_relative_to(directory):
            continue
        entry = namespaces[directory]
        deeper = self._get_namespace_for_file(filepath, entry[1])
        # A match from the nested mapping must win over this directory's own
        # namespace; dropping it would make the lookup fall through to None.
        return entry[0] if deeper is None else deeper
    return None
@contextlib.contextmanager
def _astroid_module_checker(
    self,
) -> Iterator[Callable[[nodes.Module], bool | None]]:
    """Context manager that prepares the checkers used to check ASTs.

    On entry every prepared checker is opened and registered on a new walker.
    The value in the context is a callable accepting an AST as its only
    argument. On normal exit the statement count is stored in the statistics
    and the checkers are closed in reverse order.
    """
    walker = ASTWalker(self)
    active_checkers = self.prepare_checkers()

    tokencheckers = []
    rawcheckers = []
    for checker in active_checkers:
        if isinstance(checker, checkers.BaseTokenChecker):
            tokencheckers.append(checker)
        if isinstance(checker, checkers.BaseRawFileChecker):
            rawcheckers.append(checker)
        checker.open()
        walker.add_checker(checker)

    check_module = functools.partial(
        self.check_astroid_module,
        walker=walker,
        tokencheckers=tokencheckers,
        rawcheckers=rawcheckers,
    )
    yield check_module

    # Notify global end.
    self.stats.statement = walker.nbstatements
    for checker in reversed(active_checkers):
        checker.close()
def get_ast(
    self, filepath: str, modname: str, data: str | None = None
) -> nodes.Module | None:
    """Return an ast(roid) representation of a module or a string.

    :param filepath: path to checked file.
    :param str modname: The name of the module to be checked.
    :param str data: optional contents of the checked file; when given, the
        AST is built from this string instead of from ``filepath``.
    :returns: the AST, or ``None`` when a ``syntax-error`` or ``parse-error``
        message was added instead
    :rtype: astroid.nodes.Module
    :raises AstroidBuildingError: Whenever we encounter an unexpected exception
    """
    try:
        if data is not None:
            builder = astroid.builder.AstroidBuilder(MANAGER)
            return builder.string_build(data, modname, filepath)
        return MANAGER.ast_from_file(filepath, modname, source=True)
    except astroid.AstroidSyntaxError as ex:
        lineno = getattr(ex.error, "lineno", None)
        self.add_message(
            "syntax-error",
            line=0 if lineno is None else lineno,
            col_offset=getattr(ex.error, "offset", None),
            args=f"Parsing failed: '{ex.error}'",
            confidence=HIGH,
        )
    except astroid.AstroidBuildingError as ex:
        self.add_message("parse-error", args=ex)
    except Exception as ex:
        traceback.print_exc()
        # This is essentially an astroid issue, hence the BuildingError.
        # Creating an issue template and adding the 'astroid-error' message
        # is left to the caller (_check_files).
        raise astroid.AstroidBuildingError(
            "Building error when trying to create ast representation of module '{modname}'",
            modname=modname,
        ) from ex
    return None
def check_astroid_module(
    self,
    ast_node: nodes.Module,
    walker: ASTWalker,
    rawcheckers: list[checkers.BaseRawFileChecker],
    tokencheckers: list[checkers.BaseTokenChecker],
) -> bool | None:
    """Check a module AST and record how many statements the walker counted.

    The difference in ``walker.nbstatements`` across the check is stored as
    the ``statement`` entry of the current module's statistics.

    For return value see _check_astroid_module
    """
    statements_before = walker.nbstatements
    outcome = self._check_astroid_module(
        ast_node, walker, rawcheckers, tokencheckers
    )
    module_stats = self.stats.by_module[self.current_name]
    module_stats["statement"] = walker.nbstatements - statements_before
    return outcome
def _check_astroid_module(
    self,
    node: nodes.Module,
    walker: ASTWalker,
    rawcheckers: list[checkers.BaseRawFileChecker],
    tokencheckers: list[checkers.BaseTokenChecker],
) -> bool | None:
    """Check given AST node with given walker and checkers.

    :param astroid.nodes.Module node: AST node of the module to check
    :param pylint.utils.ast_walker.ASTWalker walker: AST walker
    :param list rawcheckers: List of raw checkers to use; each one gets the
        module through ``process_module``
    :param list tokencheckers: List of token checkers to use; each one gets
        the token list through ``process_tokens``
    :returns: True if the module was checked, False if ignored,
        None if the module contents could not be parsed
    """
    try:
        tokens = utils.tokenize_module(node)
    except tokenize.TokenError as ex:
        position = ex.args[1]
        self.add_message(
            "syntax-error",
            line=position[0],
            col_offset=position[1],
            args=ex.args[0],
            confidence=HIGH,
        )
        return None

    if node.pure_python:
        # Parse module/block level option pragmas first: they may ask for
        # the whole file to be ignored.
        self.process_tokens(tokens)
        if self._ignore_file:
            return False
        # Run raw and token checkers.
        for raw_checker in rawcheckers:
            raw_checker.process_module(node)
        for token_checker in tokencheckers:
            token_checker.process_tokens(tokens)
    else:
        self.add_message("raw-checker-failed", args=node.name)

    # Generate events to astroid checkers.
    walker.walk(node)
    return True
def open(self) -> None:
    """Copy the relevant options onto the astroid manager and reset the
    message counters.
    """
    cfg = self.config
    MANAGER.always_load_extensions = cfg.unsafe_load_any_extension
    MANAGER.max_inferable_values = cfg.limit_inference_results
    MANAGER.prefer_stubs = cfg.prefer_stubs
    MANAGER.module_denylist.update(cfg.ignored_modules)

    allowed_extensions = MANAGER.extension_package_whitelist
    allowed_extensions.update(cfg.extension_pkg_allow_list)
    if cfg.extension_pkg_whitelist:
        # Both option spellings feed the same manager attribute.
        allowed_extensions.update(cfg.extension_pkg_whitelist)

    self.stats.reset_message_count()
def generate_reports(self, verbose: bool = False) -> int | None:
    """Close the whole package/module: it is time to make reports.

    On a persistent run the results are saved for later comparison.

    :param verbose: forwarded to the evaluation report
    :returns: the value returned by ``_report_evaluation``, or ``None`` when
        the current file state is the base file state
    """
    # Display whatever messages are left on the reporter.
    self.reporter.display_messages(report_nodes.Section())

    if self.file_state._is_base_filestate:
        self.reporter.on_close(self.stats, LinterStats())
        return None

    # Load previous results, if any.
    previous_stats = load_results(self.file_state.base_name)
    self.reporter.on_close(self.stats, previous_stats)
    if self.config.reports:
        self.reporter.display_reports(self.make_reports(self.stats, previous_stats))
    score_value = self._report_evaluation(verbose)
    # Save results if this is a persistent run.
    if self.config.persistent:
        save_results(self.stats, self.file_state.base_name)
    return score_value
def _report_evaluation(self, verbose: bool = False) -> int | None:
    """Make the global evaluation report.

    :param verbose: when true, also append how many files/modules were
        checked and how many were skipped
    :returns: the note computed by the configured ``evaluation`` expression,
        or ``None`` when no statement was analyzed or the expression raised
    """
    # Nothing can be rated without at least one statement (usually 0 when a
    # syntax error prevented pylint from further processing), so bail out
    # before loading the previous results.
    if self.stats.statement == 0:
        return None

    note = None
    previous_stats = load_results(self.file_state.base_name)
    # get a global note for the code
    evaluation = self.config.evaluation
    try:
        stats_dict = {
            "fatal": self.stats.fatal,
            "error": self.stats.error,
            "warning": self.stats.warning,
            "refactor": self.stats.refactor,
            "convention": self.stats.convention,
            "statement": self.stats.statement,
            "info": self.stats.info,
        }
        # NOTE(review): the expression comes from the configuration and is
        # evaluated as Python code; only use trusted configuration files.
        note = eval(evaluation, {}, stats_dict)  # pylint: disable=eval-used
    except Exception as ex:  # pylint: disable=broad-except
        msg = f"An exception occurred while rating: {ex}"
    else:
        self.stats.global_note = note
        msg = f"Your code has been rated at {note:.2f}/10"
        if previous_stats:
            pnote = previous_stats.global_note
            if pnote is not None:
                msg += f" (previous run: {pnote:.2f}/10, {note - pnote:+.2f})"

        if verbose:
            checked_files_count = self.stats.node_count["module"]
            # Skipped files are counted in ``stats.skipped``; the number of
            # modules without a docstring is unrelated to skipping.
            skipped_files_count = self.stats.skipped
            checked_files = ", ".join(self.stats.modules_names)
            msg += (
                f"\nChecked {checked_files_count} files/modules ({checked_files}),"
                f" skipped {skipped_files_count} files/modules"
            )

    if self.config.score:
        sect = report_nodes.EvaluationSection(msg)
        self.reporter.display_reports(sect)
    return note
def _add_one_message(
    self,
    message_definition: MessageDefinition,
    line: int | None,
    node: nodes.NodeNG | None,
    args: Any | None,
    confidence: interfaces.Confidence | None,
    col_offset: int | None,
    end_lineno: int | None,
    end_col_offset: int | None,
) -> None:
    """Pass a single message to the reporter and add it to the statistics.

    Location values the caller did not supply are taken from ``node``. When
    the message is not enabled for that location, it is only registered as an
    ignored message on the file state and nothing is reported.
    """
    message_definition.check_message_definition(line, node)

    # Look up "location" data of the node for everything not yet supplied.
    if node:
        position = node.position
        if position:
            line = line or position.lineno
            col_offset = col_offset or position.col_offset
            end_lineno = end_lineno or position.end_lineno
            end_col_offset = end_col_offset or position.end_col_offset
        else:
            line = line or node.fromlineno
            col_offset = col_offset or node.col_offset
            end_lineno = end_lineno or node.end_lineno
            end_col_offset = end_col_offset or node.end_col_offset

    msgid = message_definition.msgid
    symbol = message_definition.symbol

    # Should this message be displayed?
    if not self.is_message_enabled(msgid, line, confidence):
        scope = self._get_message_state_scope(msgid, line, confidence)
        self.file_state.handle_ignored_message(scope, msgid, line)
        return

    # Update stats; the first character of the msgid selects the category.
    category_key = msgid[0]
    msg_cat = MSG_TYPES[category_key]
    self.msg_status |= MSG_TYPES_STATUS[category_key]
    self.stats.increase_single_message_count(msg_cat, 1)
    self.stats.increase_single_module_message_count(self.current_name, msg_cat, 1)
    by_msg = self.stats.by_msg
    by_msg[symbol] = by_msg.get(symbol, 0) + 1

    # Interpolate arguments into the message string.
    msg = message_definition.msg
    if args is not None:
        msg %= args

    # Get module and object.
    if node is None:
        module, obj = self.current_name, ""
        abspath = self.current_file
    else:
        module, obj = utils.get_module_and_frameid(node)
        abspath = node.root().file

    if abspath is None:
        path = "configuration"
    else:
        path = abspath.replace(self.reporter.path_strip_prefix, "", 1)

    # Add the message.
    location = MessageLocationTuple(
        abspath or "",
        path,
        module or "",
        obj,
        line or 1,
        col_offset or 0,
        end_lineno,
        end_col_offset,
    )
    self.reporter.handle_message(Message(msgid, symbol, location, msg, confidence))
def add_message(
    self,
    msgid: str,
    line: int | None = None,
    node: nodes.NodeNG | None = None,
    args: Any | None = None,
    confidence: interfaces.Confidence | None = None,
    col_offset: int | None = None,
    end_lineno: int | None = None,
    end_col_offset: int | None = None,
) -> None:
    """Add a message given by ID or name.

    If provided, the message string is expanded using args. A missing
    confidence is replaced by ``interfaces.UNDEFINED``.

    AST checkers must provide the node argument (but may optionally
    provide line if the line number is different), raw and token checkers
    must provide the line argument.
    """
    effective_confidence = (
        interfaces.UNDEFINED if confidence is None else confidence
    )
    for definition in self.msgs_store.get_message_definitions(msgid):
        self._add_one_message(
            definition,
            line,
            node,
            args,
            effective_confidence,
            col_offset,
            end_lineno,
            end_col_offset,
        )
def add_ignored_message(
    self,
    msgid: str,
    line: int,
    node: nodes.NodeNG | None = None,
    confidence: interfaces.Confidence | None = interfaces.UNDEFINED,
) -> None:
    """Register a message in the ignored message storage.

    Some checks return early in special cases and never reach add_message(),
    even though they would normally issue a message. This creates false
    positives for useless-suppression. Recording those messages as ignored
    on the file state avoids that.
    """
    for definition in self.msgs_store.get_message_definitions(msgid):
        definition.check_message_definition(line, node)
        definition_id = definition.msgid
        scope = self._get_message_state_scope(definition_id, line, confidence)
        self.file_state.handle_ignored_message(scope, definition_id, line)
def _emit_stashed_messages(self) -> None:
    """Emit every stashed message, then replace the stash with an empty one.

    The stash is keyed by ``(modname, symbol)`` and holds the message
    arguments; each message is added at line 0 with HIGH confidence, after
    the current module has been set to ``modname``.
    """
    for (modname, symbol), stashed_args in self._stashed_messages.items():
        self.linter.set_current_module(modname)
        for args in stashed_args:
            self.add_message(symbol, line=0, args=args, confidence=HIGH)
    self._stashed_messages = collections.defaultdict(list)
|