Source code for ducky.tools.ld

import ast
import collections
import logging
import os
import re
import sys
import tarfile
import tempfile

from six import iteritems, integer_types
from functools import partial
from collections import defaultdict

from ..mm import i32_t, UINT32_FMT, MalformedBinaryError, WORD_SIZE
from ..mm.binary import File, SectionTypes, SymbolEntry, SectionFlags, SymbolFlags
from ..asm import align_to_next_page
from ..errors import Error, UnalignedJumpTargetError, EncodingLargeValueError, UnknownSymbolError, PatchTooLargeError, BadLinkerScriptError, IncompatibleSectionFlagsError, UnknownDestinationSectionError, LinkerError
from ..log import get_logger

class LinkerScript(object):
  def __init__(self, filepath = None):
    self._filepath = filepath

    self._dst_section_start = collections.OrderedDict()
    self._dst_section_map = collections.OrderedDict()

    if self._filepath is None:
      return

    with open(filepath, 'r') as f:
      data = f.read()

    try:
      script = ast.literal_eval(data)

    except ValueError as e:
      raise BadLinkerScriptError(filepath, e)

    section_start = None

    for entry in script:
      if isinstance(entry, integer_types):
        section_start = entry
        continue

      if not isinstance(entry, tuple):
        continue

      dst_section, src_sections = entry

      if section_start is None:
        self._dst_section_start[dst_section] = None

      else:
        self._dst_section_start[dst_section] = section_start
        section_start = None

      for src_section in src_sections:
        self._dst_section_map[re.compile(r'^' + src_section + r'$')] = dst_section
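  # A linker script, as parsed above, is a Python literal readable by
  # ast.literal_eval(): a sequence whose items are either an integer (the
  # base address of the next destination section) or a tuple of
  # (destination section name, iterable of source-section regex patterns).
  # A minimal sketch with hypothetical section names:
  #
  #   [
  #     0x00000000,
  #     ('.text', (r'\.text(\..*)?',)),
  #     ('.data', (r'\.data', r'\.bss')),
  #   ]
  #
  # Each pattern is wrapped in '^...$' before compilation, so it must match
  # a full source-section name.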
  def section_ordering(self):
    return self._dst_section_start.keys()
  def where_to_merge(self, src_section):
    for src_section_pattern, dst_section in iteritems(self._dst_section_map):
      if not src_section_pattern.match(src_section):
        continue

      return dst_section

    raise UnknownDestinationSectionError(src_section)
  def where_to_base(self, section):
    return self._dst_section_start.get(section)
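# Usage sketch, assuming the example script above were saved as 'sample.ld'
# (a hypothetical file name):
#
#   script = LinkerScript('sample.ld')
#   script.where_to_merge('.text.boot')  # -> '.text'
#   script.where_to_base('.text')        # -> 0x00000000
#   script.where_to_base('.data')        # -> None, i.e. floating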
class LinkerInfo(object):
  def __init__(self, linker_script):
    super(LinkerInfo, self).__init__()

    self.section_offsets = collections.defaultdict(dict)
    self.relocations = collections.defaultdict(list)
    self.symbols = collections.defaultdict(list)
    self.section_bases = collections.defaultdict(dict)

    self.linker_script = linker_script
def merge_object_into(info, f_dst, f_src):
  D = get_logger().debug

  D('----- * ----- * ----- * ----- * -----')
  D('Merging %s file', f_src.name)
  D('----- * ----- * ----- * ----- * -----')

  for src_section in f_src.sections:
    src_header = src_section.header

    if src_header.type == SectionTypes.RELOC:
      info.relocations[f_src].append(src_section)
      continue

    elif src_header.type == SectionTypes.SYMBOLS:
      src_section._filename = f_src.name
      info.symbols[f_src].append(src_section)
      continue

    elif src_header.type == SectionTypes.STRINGS:
      continue

    assert src_header.type == SectionTypes.PROGBITS

    D('Merge section %s into dst file', src_section.name)

    dst_section_name = info.linker_script.where_to_merge(src_section.name)

    D('  Section "%s" will be merged into "%s"', src_section.name, dst_section_name)

    try:
      dst_section = f_dst.get_section_by_name(dst_section_name, dont_create = True)

    except MalformedBinaryError:
      D('  No such section exists in dst file yet, copy')

      info.section_offsets[f_src][src_header.index] = 0
      info.section_bases[f_src][src_header.index] = src_header.base

      dst_section = f_dst.create_section(name = dst_section_name)
      dst_header = dst_section.header
      dst_header.type = src_header.type
      dst_header.name = f_dst.string_table.put_string(dst_section_name)
      dst_header.data_size = src_header.data_size
      dst_header.file_size = src_header.data_size
      dst_header.flags = src_header.flags
      dst_section.payload = src_section.payload[:]
      continue

    else:
      dst_header = dst_section.header

    if SectionFlags.from_encoding(dst_header.flags).to_int() != SectionFlags.from_encoding(src_header.flags).to_int():
      raise IncompatibleSectionFlagsError(dst_section, src_section)

    D('  merging into an existing section')
    D('  name=%s, range=%s - %s', dst_section.name, UINT32_FMT(dst_header.base), UINT32_FMT(dst_header.base + dst_header.data_size))
    D('  dst_header=%s', dst_header)

    if dst_header.data_size % WORD_SIZE != 0:
      padding_bytes = WORD_SIZE - (dst_header.data_size % WORD_SIZE)

      D('  * unaligned data section, %d padding bytes appended', padding_bytes)

      dst_header.data_size += padding_bytes
      dst_section.payload += bytearray([0 for _ in range(0, padding_bytes)])

      D('  dst_header=%s', dst_header)

    info.section_offsets[f_src][src_header.index] = dst_header.data_size
    info.section_bases[f_src][src_header.index] = src_header.base

    dst_header.data_size += src_header.data_size
    dst_header.file_size = dst_header.data_size
    dst_section.payload += src_section.payload
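# Padding example: with WORD_SIZE == 4, appending to a destination section
# whose payload is 10 bytes long first adds 4 - (10 % 4) == 2 zero bytes, so
# every merged source section starts on a word boundary and the offsets
# recorded in info.section_offsets stay word-aligned.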
def fix_section_bases(info, f_out):
  logger, D = get_logger(), get_logger().debug

  D('----- * ----- * ----- * ----- * -----')
  D('Fixing base addresses of sections')
  D('----- * ----- * ----- * ----- * -----')

  sort_by_base = partial(sorted, key = lambda x: x.header.base)

  D('Sections to fix:')

  sections = {}
  for section in f_out.sections:
    if section.header.type != SectionTypes.PROGBITS:
      continue

    sections[section.name] = section

    base = info.linker_script.where_to_base(section.name)
    section.header.base = base if base is not None else 0xFFFFFFFF

    D('  %s - %s', section.name, section.header)

  tmp_sections = collections.OrderedDict([(section.name, section) for section in sort_by_base(sections.values())])

  for _, section in iteritems(tmp_sections):
    if section.header.base == 0xFFFFFFFF:
      D('  %s floating', section.name)

    else:
      D('  %s @ %s', section.name, UINT32_FMT(section.header.base))

  D('Order: %s', info.linker_script.section_ordering())

  def overlap(a, b):
    return max(0, min(a[1], b[1]) - max(a[0], b[0]))

  sections = []

  D('Looking for section base addresses:')

  def dump_sections():
    D('')
    D('Sections:')

    table = [['Name', 'Start', 'End']]

    for section in sections:
      # Was "table.append([name, ...])" - 'name' is the loop variable of the
      # enclosing placement loop and would mislabel every row.
      table.append([section.name, UINT32_FMT(section.header.base), UINT32_FMT(align_to_next_page(section.header.base + section.header.data_size))])

    logger.table(table, fn = logger.debug)

  for name in info.linker_script.section_ordering():
    dump_sections()

    D('')
    D('Considering %s', name)

    if name not in tmp_sections:
      D('  not present')
      continue

    section = tmp_sections[name]

    D('  section: %s', section)
    D('  header: %s', section.header)

    if sections:
      last_section = sections[-1]
      base = align_to_next_page(last_section.header.base + last_section.header.data_size)

    else:
      last_section = None
      base = 0x00000000

    D('  suggested base=%s', UINT32_FMT(base))

    if section.header.base == 0xFFFFFFFF:
      D('  base not set')

      if base + section.header.data_size >= 0xFFFFFFFF:
        logger.error('Cannot place %s to a base - no space left', name)
        logger.error('  header: %s', section.header)
        dump_sections()
        sys.exit(1)

      section.header.base = base

    else:
      D('  base requested')

      if section.header.base < base:
        logger.error('Cannot place %s to a requested base - previous section is too long', name)
        logger.error('  header: %s', section.header)
        dump_sections()
        sys.exit(1)

    sections.append(section)
    sections = sort_by_base(sections)

  dump_sections()
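# Placement sketch: sections are laid out in linker-script order. A floating
# section (base left at the 0xFFFFFFFF sentinel) is placed at the next page
# boundary past the previous section; e.g. if the previous section ends at
# 0x00001234 and pages are 0x100 bytes (an assumed page size, for
# illustration), the floating section is based at 0x00001300. An explicitly
# based section only has to start at or beyond that suggested base.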
def resolve_symbols(info, f_out, f_ins):
  D = get_logger().debug

  D('Resolve symbols - compute their new addresses')

  symbols = []
  symbol_map = defaultdict(list)

  for f_in, symbol_sections in iteritems(info.symbols):
    D('Processing file %s', f_in.name)

    for section in symbol_sections:
      D('Symbol section: %s', section.header)

      for symbol in section.payload:
        symbol_name = f_in.string_table.get_string(symbol.name)
        symbol._filename = f_in.string_table.get_string(symbol.filename)

        D('Symbol: %s', symbol_name)

        src_symbol_section = f_in.get_section_by_index(symbol.section)

        D('  src section: %s', src_symbol_section.header)
        D('  src section name: %s', src_symbol_section.name)

        dst_symbol_section = f_out.get_section_by_name(info.linker_script.where_to_merge(src_symbol_section.name))

        D('  dst section: %s', dst_symbol_section.header)
        D('  dst section name: %s', dst_symbol_section.name)

        D('src base: %s, dst base: %s, symbol addr: %s, section dst offset: %s', UINT32_FMT(src_symbol_section.header.base), UINT32_FMT(dst_symbol_section.header.base), UINT32_FMT(symbol.address), UINT32_FMT(info.section_offsets[f_in][src_symbol_section.header.index]))

        new_addr = symbol.address - src_symbol_section.header.base + info.section_offsets[f_in][src_symbol_section.header.index] + dst_symbol_section.header.base

        dst_symbol = SymbolEntry()
        dst_symbol.flags = symbol.flags
        dst_symbol.name = f_out.string_table.put_string(symbol_name)
        dst_symbol.address = new_addr
        dst_symbol.size = symbol.size
        dst_symbol.section = dst_symbol_section.header.index
        dst_symbol.type = symbol.type
        dst_symbol.filename = f_out.string_table.put_string(symbol._filename)
        dst_symbol.lineno = symbol.lineno

        symbols.append(dst_symbol)
        symbol_map[symbol_name].append((dst_symbol, f_in))

        D('New symbol: %s', dst_symbol)

  symtab = f_out.create_section(name = '.symtab')
  symtab.header.name = f_out.string_table.put_string('.symtab')
  symtab.header.type = SectionTypes.SYMBOLS
  symtab.payload = symbols

  info.symbols = symbol_map
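# Address translation example, with assumed values: a symbol at 0x00000010 in
# a source section based at 0x00000000, merged at offset 0x00000200 into a
# destination section based at 0x00001000, ends up at
# 0x10 - 0x00 + 0x200 + 0x1000 == 0x00001210.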
class RelocationPatcher(object):
  def __init__(self, re, se, symbol_name, section, original_section = None, section_offset = 0):
    self._logger = logging.getLogger('ducky')
    self.DEBUG = self._logger.debug

    original_section = original_section or section

    self.DEBUG('%s:', self.__class__.__name__)
    self.DEBUG('  symbol=%s', symbol_name)
    self.DEBUG('  re=%s', re)
    self.DEBUG('  se=%s', se)
    self.DEBUG('  dsection=%s', section.header)
    self.DEBUG('  osection=%s', original_section.header)
    self.DEBUG('  offset=%s', UINT32_FMT(section_offset))

    self._re = re
    self._se = se
    self._patch_section_header = section.header
    self._patch_section_content = section.payload

    self.DEBUG('  section.base=%s, re.address=%s, original.base=%s, section.offset=%s', UINT32_FMT(section.header.base), UINT32_FMT(re.patch_address), UINT32_FMT(original_section.header.base), UINT32_FMT(section_offset))

    # Offset of the patched word inside the merged section's payload (a first
    # assignment that was immediately overwritten has been dropped)...
    self._patch_address = re.patch_address - original_section.header.base + section_offset
    # ...and the absolute address of that word once the section is placed.
    self._ip_address = self._patch_address + section.header.base

    self.DEBUG('  section.base=%s, patch address=%s, ip=%s', UINT32_FMT(section.header.base), UINT32_FMT(self._patch_address), UINT32_FMT(self._ip_address))

    self._content_index = self._patch_address

    self.DEBUG('  content index=%d', self._content_index)

    self._patch = self._create_patch()

    self.DEBUG('  patch=%s (%s)', UINT32_FMT(self._patch), i32_t(self._patch).value)
  def _create_patch(self):
    patch = self._se.address

    if self._re.flags.relative == 1:
      patch -= (self._ip_address + 4)

    return patch + self._re.patch_add
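  # Relative patch example, with assumed values: a symbol at 0x00000120
  # referenced through a relative relocation whose patched word sits at
  # 0x00000100 yields 0x120 - (0x100 + 4) == 0x1C before patch_add is added;
  # the displacement is taken from the word following the patched one, hence
  # the +4.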
  def _apply_patch(self, value):
    re = self._re
    se = self._se

    self.DEBUG('  value=%s', UINT32_FMT(value))

    lower_mask = 0xFFFFFFFF >> (32 - re.patch_offset)
    upper_mask = 0xFFFFFFFF << (re.patch_offset + re.patch_size)
    mask = upper_mask | lower_mask

    self.DEBUG('  lmask=%s, umask=%s, mask=%s', UINT32_FMT(lower_mask), UINT32_FMT(upper_mask), UINT32_FMT(mask))

    masked = value & mask

    self.DEBUG('  masked=%s', UINT32_FMT(masked))

    patch = self._patch

    self.DEBUG('  patch: %s (%s)', UINT32_FMT(patch), patch)

    if re.flags.inst_aligned == 1:
      if patch & 0x3:
        raise UnalignedJumpTargetError(info = 'address=%s' % UINT32_FMT(patch))

      patch = patch // 4

    if patch >= 2 ** re.patch_size:
      raise EncodingLargeValueError(info = 'size=%s, value=%s' % (re.patch_size, UINT32_FMT(patch)))

    self.DEBUG('  patch: %s (%s)', UINT32_FMT(patch), patch)

    if re.flags.relative == 1:
      lower, upper = -(2 ** (re.patch_size - 1)), (2 ** (re.patch_size - 1)) - 1

    else:
      lower, upper = 0, 2 ** re.patch_size

    self.DEBUG('  patch size: %s', re.patch_size)
    self.DEBUG('  patch limits: %s %s', lower, upper)
    self.DEBUG('  patch fits? %s %s', patch < lower, patch > upper)

    if not (lower <= patch <= upper):
      raise PatchTooLargeError('Patch cannot fit into available space: re={reloc_entry}, se={symbol_entry}, patch={patch}'.format(reloc_entry = re, symbol_entry = se, patch = patch))

    patch = patch << re.patch_offset

    self.DEBUG('  shifted patch: %s', UINT32_FMT(patch))

    patch = patch & (~mask) & 0xFFFFFFFF

    self.DEBUG('  masked patch: %s', UINT32_FMT(patch))

    value = masked | patch

    self.DEBUG('  patched: %s', UINT32_FMT(value))

    return value
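  # Masking example, with an assumed encoding of patch_offset == 20 and
  # patch_size == 12: lower_mask == 0x000FFFFF preserves bits 0-19 of the
  # original word, the shifted patch lands in bits 20-31, and the final
  # '& 0xFFFFFFFF' drops the bits Python's unbounded integers carry above
  # bit 31.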
  def patch(self):
    content = self._patch_section_content
    content_index = self._content_index

    value = content[content_index] | (content[content_index + 1] << 8) | (content[content_index + 2] << 16) | (content[content_index + 3] << 24)
    value = self._apply_patch(value)

    content[content_index] = value & 0xFF
    content[content_index + 1] = (value >> 8) & 0xFF
    content[content_index + 2] = (value >> 16) & 0xFF
    content[content_index + 3] = (value >> 24) & 0xFF
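# The patched word is read from the payload as a little-endian 32-bit value
# and written back the same way; e.g. payload bytes 0x78 0x56 0x34 0x12
# decode to 0x12345678.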
def resolve_relocations(info, f_out, f_ins):
  logger, D = get_logger(), get_logger().debug

  D('')
  D('----- * ----- * ----- * ----- * -----')
  D('Resolve relocations')
  D('----- * ----- * ----- * ----- * -----')

  section_symbols = {}

  for section in f_out.sections:
    D('name=%s base=%s header=%s', section.name, UINT32_FMT(section.header.base), section.header)

    se = SymbolEntry()
    se.flags = SymbolFlags.from_int(0).to_encoding()
    se.name = f_out.string_table.put_string(section.name)
    se.address = section.header.base

    section_symbols[section.name] = se

  for f_in, reloc_sections in iteritems(info.relocations):
    D('Processing file %s', f_in.name)

    for section in reloc_sections:
      for reloc_entry in section.payload:
        D('-----*-----*-----')
        D('  %s', reloc_entry)

        symbol_name = f_in.string_table.get_string(reloc_entry.name)

        # Get all involved sections
        src_section = f_in.get_section_by_index(reloc_entry.patch_section)
        dst_section_name = info.linker_script.where_to_merge(src_section.name)
        dst_section = f_out.get_section_by_name(dst_section_name)

        D('  src section: %s', src_section.name)
        D('  src header: %s', src_section.header)
        D('  dst section: %s', dst_section.name)
        D('  dst header: %s', dst_section.header)
        D('  symbol: %s', symbol_name)
        D('  file: %s', f_in.name)

        # Find referenced symbol
        if symbol_name in info.symbols:
          symbol_family = info.symbols[symbol_name]

          if len(symbol_family) > 1:
            D('  multiple candidates:')

            for se, f_src in symbol_family:
              D('    %s from file %s', se, f_src.name)

            for se, f_src in symbol_family:
              if f_in.name == f_src.name:
                D('  found file match in %s from %s', se, f_in.name)
                break

            else:
              logger.warn('Symbol with name "%s" has multiple candidates but no definitive match', symbol_name)
              logger.warn('  file: %s', f_in.name)

              for se, f_src in symbol_family:
                logger.warn('  %s from file %s', se, f_src.name)

              continue

          else:
            se, f_src = symbol_family[0]

            if f_src != f_in and se.flags.globally_visible == 0:
              raise UnknownSymbolError('Symbol "%s" is not globally visible' % symbol_name)

        else:
          if symbol_name not in section_symbols:
            raise UnknownSymbolError('No such symbol "%s", referenced from %s' % (symbol_name, f_in.name))

          se = section_symbols[symbol_name]

        RelocationPatcher(reloc_entry, se, symbol_name, dst_section, original_section = src_section, section_offset = info.section_offsets[f_in][src_section.header.index]).patch()
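# Resolution order: a relocation's name is looked up among the merged symbols
# first, preferring a definition coming from the same input file, and only
# then among the per-section symbols synthesized above, so a bare section
# name used as a relocation target resolves to that section's base address.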
def archive_files(logger, files_in, file_out):
  D = logger.debug

  D('Merging files into an archive')

  if os.path.exists(file_out):
    D('Unlink existing archive')
    os.unlink(file_out)

  with tarfile.open(file_out, 'w:gz') as f_out:
    for f_in in files_in:
      # Log the file being added - the original logged f_out, the archive
      # object itself.
      D('  Adding %s', f_in)
      f_out.add(f_in, recursive = False)
def main():
  import optparse

  from . import add_common_options, parse_options

  parser = optparse.OptionParser()
  add_common_options(parser)

  group = optparse.OptionGroup(parser, 'File options')
  parser.add_option_group(group)
  group.add_option('-i', dest = 'file_in', action = 'append', default = [], help = 'Input file')
  group.add_option('-o', dest = 'file_out', default = None, help = 'Output file')
  group.add_option('-f', dest = 'force', default = False, action = 'store_true', help = 'Force overwrite of the output file')

  group = optparse.OptionGroup(parser, 'Linker options')
  parser.add_option_group(group)
  group.add_option('--script', dest = 'script', action = 'store', default = None, help = 'Linker script')
  group.add_option('--archive', dest = 'archive', action = 'store_true', default = False, help = 'Instead of linking, create an archive containing all input files')

  options, logger = parse_options(parser)

  if not options.file_in:
    parser.print_help()
    sys.exit(1)

  if not options.file_out:
    parser.print_help()
    sys.exit(1)

  if any(not file_in.endswith('.o') and not file_in.endswith('.tgz') for file_in in options.file_in):
    logger.error('All input files must be object files')
    sys.exit(1)

  if os.path.exists(options.file_out) and not options.force:
    logger.error('Output file %s already exists, use -f to force overwrite', options.file_out)
    sys.exit(1)

  if options.archive is True:
    archive_files(logger, options.file_in, options.file_out)

  else:
    def __cleanup(exc, msg = None):
      # The original tested the undefined name 'e' here; the parameter is
      # 'exc'.
      if hasattr(exc, 'log'):
        exc.log(logger.error)

      else:
        logger.exception(exc)

      if os.path.exists(options.file_out):
        os.unlink(options.file_out)

      sys.exit(1)

    try:
      script = LinkerScript(options.script)
      info = LinkerInfo(script)

      link_files(info, options.file_in, options.file_out)

    except LinkerError as e:
      __cleanup(e)

    except Error as e:
      __cleanup(e)
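# Invocation sketch (hypothetical file names; the actual entry-point name is
# whatever the package's setup registers for this module):
#
#   ld -i foo.o -i bar.o -o image.bin --script sample.ld
#   ld -i foo.o -i bar.o -o objects.tgz --archive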