# encoding: UTF-8 # Copyright 2017, 2018 Amazon.com, Inc. or its affiliates. # This module is part of Amazon Linux Extras. # # Amazon Linux Extras is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License v2 as published # by the Free Software Foundation. # # Amazon Linux Extras is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # # You should have received a copy of the GNU General Public License # along with Amazon Linux Extras. If not, see . from __future__ import print_function, unicode_literals, absolute_import from .software_catalog import get_catalog, fetch_new_catalog, VERSION_KEY, CatalogError from .repo import read_configuration, write_configuration import sys import logging as loggingmod import os import re import textwrap import subprocess from datetime import datetime if sys.version_info.major == 2: from gettext import gettext as gettext_yields_encoded _ = lambda *args: gettext_yields_encoded(*args).decode("UTF-8") else: from gettext import gettext as _ IS_TESTING = os.environ.get("AMZN2DEBUGGING", False) and True logger = loggingmod.getLogger(__name__) def host_is_sane(): """Predicate to test if it makes sense to run on this kind of machine.""" if IS_TESTING: return True try: with open("/etc/system-release") as sysrel: if not "Amazon" in sysrel.read(50): return False except IOError: return False #if not os.path.exists("/etc/yum/vars/awsdomain"): return False #if not os.path.exists("/etc/yum/vars/awsregion"): return False return True def topic_combination_was_tested(enabled_topics, catalog, additional_topic_name): """Predicate to test if adding the proposed Topic is allowed. We check whether the new topic has any conflicts with the currently enabled topics. Each catalog version has its own mechanism for performing this test. """ if additional_topic_name and additional_topic_name not in catalog["topics-by-name"]: raise KeyError(additional_topic_name) if catalog['version'] == 1: return topic_combination_v1(enabled_topics, catalog, additional_topic_name) elif catalog['version'] >= 2: return topic_combination_v2(enabled_topics, catalog, additional_topic_name) def topic_combination_v1(enabled_topics, catalog, additional_topic_name): """Predicate to test if adding the proposed Topic has been tested. We do this by looking for a known whitelist that equal or is a superset of the proposed set. Version 1 uses a set of included whitelists within the catalog file. """ proposed = set() if additional_topic_name: proposed.add(additional_topic_name) proposed.update(enabled_topics) return bool(any(proposed <= set(known_good) for known_good in catalog["whitelists-by-names"])) def topic_combination_v2(enabled_topics, catalog, additional_topic_name): """Predicate to test if there are Conflicts with an installed topic. Each topic entry within a version 2 catalog may contain a 'conflicts' key, which will be a list of topics which can not be installed with the topic in question. """ try: conflicts = catalog['topics-by-name'][additional_topic_name]['conflicts'] for conflict in conflicts: if conflict in enabled_topics: return False except KeyError: # A topic with no 'conflicts' key is always installable. pass return True def topic_info(topic_dict, machine_state): is_enabled = machine_state.get(topic_dict["n"], {}).get("enabled") version = machine_state.get(topic_dict["n"], {}).get(VERSION_KEY, "") expiry = None if "deprecated-at" in topic_dict: try: expiry = datetime.strptime(topic_dict["deprecated-at"], "%Y-%m-%d").date() except ValueError as exc: pass return "{0}={1}".format(topic_dict["n"], version) if is_enabled else topic_dict["n"], is_enabled, expiry def cmd_list(args): """Lists topics in the catalog. Some may be enabled.""" try: catalog = get_catalog() except CatalogError: # get_catalog emits its own messages. sys.exit(2) emphasis_color = "\033[94m" if sys.stdout.isatty() else "" # ANSI blue foreground warning_color = "\033[93m" if sys.stdout.isatty() else "" # ANSI yellow foreground reset_color = "\033[0m" if sys.stdout.isatty() else "" # ANSI colors reset machine_state = read_configuration() today = datetime.utcnow().date() name_width = max(len(topic_info(topic, machine_state)[0]) for topic in catalog["topics"]) status_width = max([len(_("enabled")), len(_("available"))]) + 2 contains_expired = False contains_expiring = False for i, topic in enumerate(catalog["topics"]): name, is_enabled, expiry = topic_info(topic, machine_state) status = _("enabled") if is_enabled else _("available") expired_note = " " if expiry: if today > expiry: if not is_enabled: continue else: expired_note = warning_color + "*" + reset_color contains_expired = True else: expired_note = "†" contains_expiring = True if topic_combination_was_tested([k for k, v in machine_state.items() if v.get("enabled")], catalog, topic["n"]): ord_desc = "{0: 3d}".format(i) else: #ord_desc = "{0:>3s}".format(u"\N{NO ENTRY}") ord_desc = "{0:>3s}".format(u"_") versions_display = " ".join(["=" + v for v in topic.get("versions", [])]) if versions_display: versions_display = "[ " + versions_display + " ]" optionally_emphasize = emphasis_color if is_enabled else "" print("{ord_desc:3s} {expired_note}{optionally_emphasize}{name:{name_width}} {status:{status_width}}{reset_color} ".format(**locals()), end="") if name_width + status_width + len(versions_display) > 60: # typical width minus padding print("\\") print(textwrap.fill(versions_display, initial_indent=" "*8, subsequent_indent=" "*10, break_on_hyphens=False)) else: print(versions_display) if contains_expired: print("{warning_color}*{reset_color} ".format(**locals()) + _("Extra topic has reached end of support.")) if contains_expiring: print("† ".format(**locals()) + _("Note on end-of-support. Use 'info' subcommand.")) def actually_enable(args, catalog, machine_state, fail_on_untested_combination=True): wanted = [] for arg in args: if arg.startswith("-"): continue pieces = arg.split("=", 1) arg = pieces[0] exactver = pieces[1] if len(pieces) > 1 else None if re.match(r"^[0-9]+$", arg): try: wanted.append( (catalog["topics"][int(arg)]["n"], exactver or "latest") ) except IndexError as exc: logger.error(_("Topic %s is not found."), arg) sys.exit(3) else: wanted.append( (arg, exactver or "latest") ) failures = [] for name, exact_version in wanted: if machine_state.get(name, {}).get("enabled", 0): continue # Ignore already-installed. try: if not topic_combination_was_tested([k for k, d in machine_state.items() if d.get("enabled")], catalog, name): failures.append(name) except KeyError: logger.error(_("Topic %s is not found."), name) sys.exit(4) name_and_maybe_version, is_enabled, expiry = topic_info(catalog["topics-by-name"][name], machine_state) if expiry: logger.warning(_("Topic {0} has end-of-support date of {1:%Y-%m-%d}").format(name, expiry)) if exact_version != "latest" and exact_version not in catalog["topics-by-name"][name].get("versions", []): logger.error(_("Topic %s is not found."), "%s=%s" % (name, exact_version)) sys.exit(5) if failures: for failure in failures: logger.error(_("Refusing because %s could cause an invalid combination."), failure) sys.exit(6) proposed_enabled_items = dict((k, d[VERSION_KEY]) for k, d in machine_state.items() if d.get("enabled")) proposed_enabled_items.update(wanted) if not topic_combination_was_tested(proposed_enabled_items, catalog, None): logger.error(_("Refusing because %s could cause an invalid combination."), "+".join(proposed_enabled_items.keys())) sys.exit(7) previously_enabled = machine_state.copy() for name, exact_version in wanted: if name not in machine_state: machine_state[name] = {} machine_state[name]["enabled"] = 1 machine_state[name][VERSION_KEY] = exact_version if "visible" in catalog["topics-by-name"][name] and catalog["topics-by-name"][name]["visible"]: machine_state[name]["includepkgs"] = " ".join(catalog["topics-by-name"][name]["visible"]) try: write_configuration(machine_state) except IOError: sys.exit(8) return wanted def cmd_enable(args): #"""Enables topics specified by name or number.""" if not args: logger.error(_("Specify a topic name or number.")) sys.exit(9) try: catalog = get_catalog(insist_stable_ordinal=any(re.match(r"^[0-9]+$", arg) for arg in args)) except CatalogError: # get_catalog emits its own messages. sys.exit(10) machine_state = read_configuration() is_valid_before_enabling = topic_combination_was_tested([k for k, d in machine_state.items() if d.get("enabled")], catalog, None) newly_enabled = actually_enable(args, catalog, machine_state) cmd_list([]) print() for topic, version in sorted(newly_enabled): if catalog["topics-by-name"][topic].get("inst"): cmd = "dnf" if any(os.access(os.path.join(p, "dnf"), os.X_OK) for p in os.environ.get("PATH", "").split(":")) else "yum" print(_("Now you can install:")) print(" # {0} clean metadata".format(cmd)) print(" # {0} install {1}".format(cmd, " ".join(catalog["topics-by-name"][topic]["inst"]))) def cmd_install(args): """Enables specified topics and installs their packages.""" previous_machine_state = read_configuration() try: catalog = get_catalog(insist_stable_ordinal=any(re.match(r"^[0-9]+$", arg) for arg in args)) except CatalogError: # get_catalog emits its own messages. sys.exit(11) is_valid_before_enabling = topic_combination_was_tested([k for k, d in previous_machine_state.items() if d.get("enabled")], catalog, None) actually_enable(args, catalog, previous_machine_state.copy()) try: install_args_excluding_options = [arg for arg in args if not arg.startswith("-")] packages = set() for name in install_args_excluding_options: topic = re.sub(r"=.*", "", name) try: integer_match = re.match(r"^([0-9]+)$", topic) if integer_match: i = int(integer_match.group(1)) topic = catalog["topics"][i]["n"] recommendations = catalog["topics"][i].get("inst", []) else: recommendations = catalog["topics-by-name"][topic].get("inst", []) except (IndexError, KeyError): logger.error(_("Topic %s is not found."), topic) sys.exit(12) if not recommendations: logger.warn(_("Topic %s does not install packages automatically."), topic) logger.warn(_("Use \"repoquery --repoid=%s --query --all\" to list packages."), topic) else: packages.update(set(recommendations)) automation = ["-y"] if not sys.stdin.isatty() or "-y" in args else [] verbosity = ["-v"] if IS_TESTING or logger.isEnabledFor(loggingmod.INFO) else [] try: if packages: print(_("Installing {0}").format(", ".join(packages))) cmd = "dnf" if any(os.access(os.path.join(p, "dnf"), os.X_OK) for p in os.environ.get("PATH", "").split(":")) else "yum" subprocess.check_call([cmd, "clean", "metadata"]) subprocess.check_call([cmd, "install"] + verbosity + automation + sorted(packages)) except subprocess.CalledProcessError: write_configuration(previous_machine_state) logger.error(_("Installation failed. Check that you have permissions to install.")) sys.exit(13) except Exception: write_configuration(previous_machine_state) raise cmd_list([]) def cmd_disable(args): #"""Disable specified software topics.""" if not args or any(re.match(r"^[0-9]+$", arg) for arg in args): logger.error(_("Specify a topic name.")) sys.exit(14) machine_state = read_configuration() logger.warn(_("Beware that disabling topics is not supported after they are installed.")) for item in args: if not machine_state.get(item, {}).get("enabled", 0): logger.error(_("%r was not enabled. Ignoring."), item) continue machine_state[item]["enabled"] = 0 try: write_configuration(machine_state) except IOError: sys.exit(15) cmd_list([]) def cmd_info(args): """See details of a specific package.""" if not args or any(re.match(r"^[0-9]+$", arg) for arg in args): logger.error(_("Specify a topic name.")) sys.exit(16) try: catalog = get_catalog(insist_stable_ordinal=any(re.match(r"^[0-9]+$", arg) for arg in args)) except CatalogError: # get_catalog emits its own messages. sys.exit(17) machine_state = read_configuration() cmd = "dnf" if any(os.access(os.path.join(p, "dnf"), os.X_OK) for p in os.environ.get("PATH", "").split(":")) else "yum" wanted = [catalog["topics"][int(arg)]["n"] if re.match(r"^[0-9]+$", arg) else arg for arg in args if not arg.startswith("-")] seen = set() expiration_warning = False for topic in wanted: if topic in seen: continue seen.add(topic) try: recommendations = catalog["topics-by-name"][topic].get("inst", []) except KeyError: logger.error(_("Topic %s is not found."), topic) continue name_and_maybe_version, is_enabled, expiry = topic_info(catalog["topics-by-name"][topic], machine_state) if expiry: expiration_warning = True print(_("{0} has end-of-support date of {1:%Y-%m-%d}").format(topic, expiry)) for rec in recommendations: print((_("{0} recommends {1:25s}")+" # {2} install {1}").format(topic, rec, cmd)) if expiration_warning: sys.exit(127) def cmd_suggest(args): logger.warning(_("We don't receive suggestions at present. Thank you for trying.")) sys.exit(18) # Defined in spec, but not for beta. if not args: print(_("We collect suggestions for package topics. Specify what " "you hoped to find.")) else: pass print(_("Thanks!")) def cmd_help(args): """See list of commands.""" for name in sorted(globals()): if name.startswith("cmd_"): function = globals()[name] if function.__doc__: first_line = function.__doc__.split("\n")[0].strip() if first_line: print(" {0:9s} {1}".format(name[4:], first_line)) # translations? :( print() print(textwrap.fill(_("Amazon Linux Extras software topics give you access to the most-recent " "stable software you specifically choose, without the uncertainty of " "a wholly new environment."))) print() print(textwrap.fill(_("Best practice is to enable only one or two topics. More than that, and " "you lose the benefits of working in a stable environment."))) def cmd_system_motd(args): try: catalog = fetch_new_catalog() # bypass checks a user could address and only get data message = catalog.get("system-motd") if message: print(message) except Exception: # Do not complain! pass def main(args): if not host_is_sane(): logger.error(_("This OS is not supported.")) return 1 if not args: args = ["list"] action_function_name = "cmd_" + args.pop(0) try: action = globals()[action_function_name] except KeyError: logger.error(_("That is not a valid option. Try \"help\".")) return 1 action(args) return 0 # vi: set expandtab autoindent shiftwidth=4 softtabstop=4 fileencoding=utf-8 filetype=python :