# ==============================================================================
# Copyright 2011 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from cfnbootstrap import endpoint_tool
import hashlib
from optparse import OptionGroup
import threading
import datetime
from cfnbootstrap.packages.requests.exceptions import ConnectionError, HTTPError, Timeout, SSLError
import logging
import os.path
import random
import re
from cfnbootstrap.packages import requests
import stat
import subprocess
import sys
import time
import uuid
import inspect
import tempfile
import shutil

try:
    from cfnbootstrap.packages.requests.packages.urllib3.exceptions import ProxyError
except ImportError:
    class ProxyError(Exception):
        def __init__(self, *args, **kwargs):
            super(ProxyError, self).__init__(*args, **kwargs)

REQUEST_TIMEOUT = 60.0
WINDOWS = 'nt'

log = logging.getLogger("cfn.init")
wire_log = logging.getLogger("wire")
cmd_log = logging.getLogger("cfn.init.cmd")

# ==============================================================================
# HTTP backoff-and-retry
# ==============================================================================


def exponential_backoff(max_tries, max_sleep=20):
    """
    Returns a series of floating point numbers between 0 and min(max_sleep, 2^i-1) for i in 0 to max_tries
    """
    return [random.random() * min(max_sleep, (2 ** i - 1)) for i in range(0, max_tries)]


def extend_backoff(durations, max_sleep=20):
    """
    Adds another exponential delay time to a list of delay times
    """
    durations.append(random.random() * min(max_sleep, (2 ** len(durations) - 1)))


def _extract_http_error(resp):
    if resp.status_code == 503:
        retry_mode = 'RETRIABLE_FOREVER'
    elif resp.status_code < 500 and resp.status_code not in (404, 408):
        retry_mode = 'TERMINAL'
    else:
        retry_mode = 'RETRIABLE'

    return RemoteError(resp.status_code, "HTTP Error %s : %s" % (resp.status_code, resp.text), retry_mode)
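
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): one way the backoff
# helpers above can be consumed by a hand-rolled retry loop. The function name
# `_example_call_with_backoff` and its `call` parameter are hypothetical;
# retry_on_failure() below is the decorator this module actually uses.
# ------------------------------------------------------------------------------
def _example_call_with_backoff(call, max_tries=5):
    # exponential_backoff() yields max_tries delays; the first is always 0,
    # so the first attempt happens immediately.
    durations = exponential_backoff(max_tries)
    last_exc = None
    for duration in durations:
        if duration > 0:
            time.sleep(duration)
        try:
            return call()
        except IOError as exc:
            # A caller that wants to keep retrying indefinitely for certain
            # failures could append another delay here via
            # extend_backoff(durations), as retry_on_failure() does for 503s.
            last_exc = exc
    raise last_exc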
class EtagCheckedResponse(object):

    def __init__(self, response):
        self._response = check_status(response)
        etag = response.headers['etag'].strip('"') if 'etag' in response.headers and endpoint_tool.is_service_url(
            "AmazonS3", response.url) else None
        if response.headers.get('x-amz-server-side-encryption') == 'aws:kms':
            log.warn('file uses KMS encryption; skipping checksum comparison')
            etag = None
        elif etag and '-' in etag:
            log.warn(
                'cannot check consistency of file uploaded multipart; etag has - character present')
            etag = None
        self._etag = etag
        # hashlib will throw ValueError for md5 on FIPS-enabled instances unless usedforsecurity is set to False
        # however we need to check if system's libcrypto library supports usedforsecurity flag
        # S3 only offers md5 checksums at present
        if self._etag:
            try:
                self._digest = hashlib.md5()
            except ValueError as e:
                if "disabled for fips" in str(e):
                    if 'usedforsecurity' in inspect.getargspec(hashlib.new).args:
                        self._digest = hashlib.new('md5', usedforsecurity=False)
                    else:
                        log.warn(
                            'libcrypto does not support usedforsecurity flag; cannot use md5 checksum on FIPS enabled instance')
                        self._digest = NoOpDigest()
                else:
                    raise
        else:
            self._digest = NoOpDigest()

    def _check_digest(self):
        if not self._etag:
            return
        final_digest = self._digest.hexdigest()
        if self._etag != final_digest:
            raise ChecksumError("Expected digest %s; received %s" %
                                (self._etag, final_digest))

    def write_to(self, dest):
        dest.seek(0, 0)
        dest.truncate()
        for c in self._response.iter_content(10 * 1024):
            dest.write(c)
            self._digest.update(c)
        self._check_digest()

    def contents(self):
        content = self._response.content
        self._digest.update(content)
        self._check_digest()
        return content


class ChecksumError(IOError):

    def __init__(self, msg):
        super(ChecksumError, self).__init__(None, msg)


class NoOpDigest():

    def __init__(self):
        self.digest_size = -1
        self.block_size = -1

    def update(self, content):
        pass

    def hexdigest(self):
        return None

    def digest(self):
        return None

    def copy(self):
        return self


class RemoteError(IOError):
    retry_modes = frozenset(['TERMINAL', 'RETRIABLE', 'RETRIABLE_FOREVER'])

    def __init__(self, code, msg, retry_mode='RETRIABLE'):
        super(RemoteError, self).__init__(code, msg)
        if not retry_mode in RemoteError.retry_modes:
            raise ValueError("Invalid retry mode: %s" % retry_mode)
        self.retry_mode = retry_mode


class Sleeper(object):

    def sleep(self, secs):
        time.sleep(secs)


def retry_on_failure(max_tries=5, http_error_extractor=_extract_http_error):
    def _decorate(f):
        def _retry(*args, **kwargs):
            sleeper = Sleeper()
            durations = exponential_backoff(max_tries)
            for duration in durations:
                if duration > 0:
                    log.debug(
                        "Sleeping for %f seconds before retrying", duration)
                    sleeper.sleep(duration)
                try:
                    return f(*args, **kwargs)
                except SSLError as e:
                    log.exception("SSLError")
                    last_error = RemoteError(None, str(e))
                except ChecksumError as e:
                    log.exception("Checksum mismatch")
                    last_error = RemoteError(None, str(e))
                except ConnectionError as e:
                    log.exception('ConnectionError')
                    last_error = RemoteError(None, str(e))
                except ProxyError as e:
                    log.exception('ProxyError')
                    last_error = RemoteError(None, str(e))
                    # ProxyError skips the typical 3 retries done by urllib
                    # this prevents us from taking an availability hit when there is a 'false' ProxyError
                    # which happens because Requests never passes proxies==None to urllib3,
                    # and newer versions of urllib3 always wrap socket errors in ProxyError when proxies is not None
                    if len(durations) < max_tries * 3:
                        extend_backoff(durations)
                except HTTPError as e:
                    last_error = http_error_extractor(e.response)
                    if last_error.retry_mode == 'TERMINAL':
                        raise last_error
                    elif last_error.retry_mode == 'RETRIABLE_FOREVER':
                        extend_backoff(durations)
                    log.exception(last_error.strerror)
                except Timeout as e:
                    log.exception('Timeout')
                    last_error = RemoteError(None, str(e))
                except TimeoutError as e:
                    log.exception('Client-side timeout')
                    last_error = RemoteError(None, str(e))
                except IOError as e:
                    log.exception('Generic IOError')
                    last_error = RemoteError(None, str(e))
                except Exception as e:
                    log.exception('Unexpected Exception')
                    raise RemoteError(None, str(e), 'TERMINAL')
            else:
                raise last_error
        return _retry
    return _decorate
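
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): how retry_on_failure()
# is intended to wrap a remote call. Transient failures (connection errors,
# timeouts, 5xx responses) are retried on the backoff schedule, while terminal
# HTTP errors surface immediately as RemoteError. The function name and its
# `url` parameter are hypothetical.
# ------------------------------------------------------------------------------
@retry_on_failure(max_tries=3)
def _example_fetch_url(url):
    resp = requests.get(url, timeout=REQUEST_TIMEOUT)
    # raise_for_status() raises HTTPError, which the decorator maps through
    # _extract_http_error into a RemoteError with the appropriate retry_mode.
    resp.raise_for_status()
    return resp.text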
class TimeoutError(Exception):

    def __init__(self, msg):
        super(Exception, self).__init__()
        self.msg = msg


def timeout(duration=60):
    def _decorate(f):
        def _timeout(*args, **kwargs):
            ret_val = []
            exc = []

            def value_fn():
                try:
                    ret_val.append(f(*args, **kwargs))
                except Exception as e:
                    exc.append(e)

            worker_thread = threading.Thread(target=value_fn)
            worker_thread.daemon = True
            worker_thread.start()
            worker_thread.join(duration)
            if worker_thread.is_alive():
                log.warn('Timeout of %s seconds breached', duration)
                raise TimeoutError(
                    "Execution did not succeed after %s seconds" % duration)
            if exc:
                raise exc[0]
            return ret_val[0] if ret_val else None
        return _timeout
    return _decorate

# ==============================================================================
# Instance metadata
# ==============================================================================


def _get_metadata_path(path):
    return "http://169.254.169.254/latest{}".format(path)


def _get_session_token():
    """
    Try to get a token for retrieving instance metadata
    (https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)
    """
    resp = requests.put(_get_metadata_path('/api/token'),
                        headers={'X-aws-ec2-metadata-token-ttl-seconds': '21600'},
                        timeout=REQUEST_TIMEOUT)
    return resp.text.strip() if resp.status_code == 200 else ''


def _get_metadata_response(path, token, _timeout=None):
    if not token:
        resp = requests.get(path, timeout=_timeout,
                            proxies={'no_proxy': '169.254.169.254/32'})
    else:
        resp = requests.get(path, timeout=_timeout,
                            proxies={'no_proxy': '169.254.169.254/32'},
                            headers={'X-aws-ec2-metadata-token': token})
    return resp


@retry_on_failure(max_tries=10)
def get_instance_identity_document():
    token = _get_session_token()
    metadata_path = _get_metadata_path('/dynamic/instance-identity/document')
    resp = _get_metadata_response(metadata_path, token)
    resp.raise_for_status()
    return resp.text.rstrip()


@retry_on_failure(max_tries=10)
def get_instance_identity_signature():
    token = _get_session_token()
    metadata_path = _get_metadata_path('/dynamic/instance-identity/signature')
    resp = _get_metadata_response(metadata_path, token)
    resp.raise_for_status()
    return resp.text.rstrip()


_instance_id = '__unset'


@retry_on_failure(max_tries=10)
def _fetch_instance_id():
    token = _get_session_token()
    metadata_path = _get_metadata_path('/meta-data/instance-id')
    request_timeout_in_seconds = 2
    resp = _get_metadata_response(metadata_path, token,
                                  _timeout=request_timeout_in_seconds)
    resp.raise_for_status()
    return resp.text.strip()


def get_instance_id():
    """
    Attempt to retrieve an EC2 instance id, returning None if this is not EC2
    """
    global _instance_id
    if _instance_id == '__unset':
        try:
            _instance_id = _fetch_instance_id()
        except IOError:
            log.exception("Exception retrieving InstanceId")
            _instance_id = None
    return _instance_id


def is_ec2():
    return get_instance_id() is not None


@retry_on_failure(max_tries=10)
def get_role_creds(name):
    token = _get_session_token()
    metadata_path = _get_metadata_path(
        '/meta-data/iam/security-credentials/{}'.format(name))
    resp = _get_metadata_response(metadata_path, token)
    resp.raise_for_status()
    role = resp.json()
    return Credentials(role['AccessKeyId'],
                       role['SecretAccessKey'],
                       role['Token'],
                       datetime.datetime.strptime(role['Expiration'], '%Y-%m-%dT%H:%M:%SZ'))


_trues = frozenset([True, 1, 'true', 'yes', 'y', '1'])

# ==============================================================================
# Miscellaneous
# ==============================================================================


def interpret_boolean(input):
    """
    This tries to interpret whether the user intended True.

    Python's built-in truthiness is deliberately not used here, because the
    input is most likely a string.
    """
    if not input:
        return False

    input = input.lower().strip() if isinstance(input, str) else input

    return input in _trues


_dot_split = re.compile(r'(?