#
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is mozilla.org code.
#
# The Initial Developer of the Original Code is
# Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2008
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
#   Robert Sayre
#   Jeff Walden
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****

"""
Runs the browser from a script, and provides useful utilities
for setting up the browser environment.
"""

import codecs
from datetime import datetime
import itertools
import logging
import shutil
import os
import re
import signal
import sys
import threading

# Directory containing the script that was launched; DIST_BIN is resolved
# relative to it when building LD_LIBRARY_PATH (see environment()).
SCRIPT_DIR = os.path.abspath(os.path.realpath(os.path.dirname(sys.argv[0])))

__all__ = [
           "UNIXISH",
           "IS_WIN32",
           "IS_MAC",
           "runApp",
           "Process",
           "initializeProfile",
           "DIST_BIN",
           "DEFAULT_APP",
           "environment",
          ]

# These are generated in mozilla/build/Makefile.in
#expand DIST_BIN = __XPC_BIN_PATH__
#expand IS_WIN32 = len("__WIN32__") != 0
#expand IS_MAC = __IS_MAC__ != 0
#ifdef IS_CYGWIN
#expand IS_CYGWIN = __IS_CYGWIN__ == 1
#else
IS_CYGWIN = False
#endif
#expand IS_CAMINO = __IS_CAMINO__ != 0
#expand BIN_SUFFIX = __BIN_SUFFIX__

UNIXISH = not IS_WIN32 and not IS_MAC

#expand DEFAULT_APP = "./" + __BROWSER_PATH__
#expand CERTS_DIR = __CERTS_DIR__
#expand IS_TEST_BUILD = __IS_TEST_BUILD__

###########
# LOGGING #
###########

# We use the logging system here primarily because it'll handle multiple
# threads, which is needed to process the output of the server and application
# processes simultaneously.
log = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
log.setLevel(logging.INFO)
log.addHandler(handler)


#################
# SUBPROCESSING #
#################

def _shellQuote(s):
  """
  Returns s wrapped in single quotes for a POSIX shell command line, with any
  embedded single quote rewritten as '\\'' so the value cannot terminate the
  quoting early.
  """
  return "'" + s.replace("'", "'\\''") + "'"


class Process:
  """
  Represents a subprocess of this process.  We don't just directly use the
  subprocess module here because we want compatibility with Python 2.3 on
  non-Windows platforms.  :-(

  The child's combined stdout/stderr is read on a background thread and
  forwarded line by line to the module logger.
  """

  def __init__(self, command, args, env, inputdata = None):
    """
    Creates a process representing the execution of the given command, with
    the given arguments in the given environment, and starts it.

    command   -- path of the executable; made absolute before use
    args      -- sequence of string arguments
    env       -- dict of environment variables, or a false value for none
    inputdata -- optional string fed to the child's standard input
    """
    command = os.path.abspath(command)
    if IS_WIN32:
      import tempfile
      import subprocess

      if inputdata:
        # Feed stdin from a temporary file rather than a pipe.
        inputfile = tempfile.TemporaryFile()
        inputfile.write(inputdata)
        inputfile.seek(0)
      else:
        inputfile = None

      cmd = [command]
      cmd.extend(args)
      p = subprocess.Popen(cmd, env = env,
                           stdout = subprocess.PIPE,
                           stderr = subprocess.STDOUT,
                           stdin = inputfile)
      self._out = p.stdout
    else:
      import popen2

      # popen2 cannot pass an environment, so build a single shell command
      # of the form  K1='v1' K2='v2' '/abs/command' 'arg1' 'arg2' ...
      # Every value is quoted so embedded quotes/spaces survive the shell.
      cmd = []
      if env:
        for (k, v) in env.iteritems():
          cmd.append(k + "=" + _shellQuote(v))
      cmd.append(_shellQuote(command))
      cmd.extend([_shellQuote(x) for x in args])
      cmd = " ".join(cmd)
      p = popen2.Popen4(cmd)
      self._out = p.fromchild

      if inputdata:
        p.tochild.write(inputdata)
        p.tochild.close()

    self._process = p
    self.pid = p.pid

    # Drain the child's output on a separate thread so callers don't block.
    self._thread = threading.Thread(target = self._run)
    self._thread.start()

  def _run(self):
    "Continues execution of this process on a separate thread."
    p = self._process
    out = self._out

    # subprocess.Popen.poll() returns None while running; popen2's returns -1.
    if IS_WIN32:
      running = lambda: p.poll() is None
    else:
      running = lambda: p.poll() == -1

    # read in lines until the process finishes, then read in any last remaining
    # buffered lines
    while running():
      line = out.readline().rstrip()
      if len(line) > 0:
        log.info(line)
    for line in out:
      line = line.rstrip()
      if len(line) > 0:
        log.info(line)
    self._status = p.poll()

  def wait(self):
    "Waits for this process to finish, then returns the process's status."
    self._thread.join()
    return self._status

  def kill(self):
    "Kills this process.  Best effort: failures to kill are ignored."
    try:
      if not IS_WIN32:
        os.kill(self._process.pid, signal.SIGKILL)
      else:
        import subprocess
        pid = "%i" % self.pid
        process = subprocess.Popen(["taskkill", "/F", "/PID", pid])
        process.wait()
    except Exception:
      # The process may already have exited, or taskkill may be unavailable.
      pass


#################
# PROFILE SETUP #
#################

# NOTE: this intentionally keeps its historical name even though it shadows
# the builtin SyntaxError within this module; callers may reference it.
class SyntaxError(Exception):
  "Signifies a syntax error on a particular line in server-locations.txt."

  def __init__(self, lineno, msg = None):
    self.lineno = lineno
    self.msg = msg

  def __str__(self):
    s = "Syntax error on line " + str(self.lineno)
    if self.msg:
      s += ": %s." % self.msg
    else:
      s += "."
    return s


class Location:
  "Represents a location line in server-locations.txt."

  def __init__(self, scheme, host, port, options):
    self.scheme = scheme    # URL scheme, e.g. "http"
    self.host = host        # host name or dotted IPv4 address
    self.port = port        # port number, kept as a string
    self.options = options  # list of option strings (possibly empty)


def readLocations(locationsPath = "server-locations.txt"):
  """
  Reads the locations at which the Mochitest HTTP server is available from
  server-locations.txt.

  Returns a list of Location objects in file order.  Raises SyntaxError (the
  class defined in this module) if a line is malformed, if more than one
  location is marked "primary", or if none is.
  """

  # Perhaps more detail than necessary, but it's the easiest way to make sure
  # we get exactly the format we want.  See server-locations.txt for the exact
  # format guaranteed here.
  lineRe = re.compile(r"^(?P<scheme>[a-z][-a-z0-9+.]*)"
                      r"://"
                      r"(?P<host>"
                        r"\d+\.\d+\.\d+\.\d+"
                        r"|"
                        r"(?:[a-z0-9](?:[-a-z0-9]*[a-z0-9])?\.)*"
                        r"[a-z](?:[-a-z0-9]*[a-z0-9])?"
                      r")"
                      r":"
                      r"(?P<port>\d+)"
                      r"(?:"
                      r"\s+"
                      r"(?P<options>\S+(?:,\S+)*)"
                      r")?$")
  locations = []
  lineno = 0
  seenPrimary = False

  locationFile = codecs.open(locationsPath, "r", "UTF-8")
  try:
    for line in locationFile:
      lineno += 1
      # Skip comments and blank lines.
      if line.startswith("#") or line == "\n":
        continue

      match = lineRe.match(line)
      if not match:
        raise SyntaxError(lineno)

      options = match.group("options")
      if options:
        options = options.split(",")
        if "primary" in options:
          if seenPrimary:
            raise SyntaxError(lineno, "multiple primary locations")
          seenPrimary = True
      else:
        options = []

      locations.append(Location(match.group("scheme"), match.group("host"),
                                match.group("port"), options))
  finally:
    locationFile.close()

  if not seenPrimary:
    raise SyntaxError(lineno + 1, "missing primary location")

  return locations


def initializeProfile(profileDir):
  """
  Sets up the standard testing profile.

  Recreates profileDir from scratch and writes a user.js into it containing
  the standard test preferences, the privileges granted to every location
  marked "privileged", and a proxy auto-config script that routes every
  non-primary location to the local test server.
  """

  # Start with a clean slate.
  shutil.rmtree(profileDir, True)
  os.mkdir(profileDir)

  prefs = []

  part = """\
user_pref("browser.dom.window.dump.enabled", true);
user_pref("dom.allow_scripts_to_close_windows", true);
user_pref("dom.disable_open_during_load", false);
user_pref("dom.max_script_run_time", 0); // no slow script dialogs
user_pref("signed.applets.codebase_principal_support", true);
user_pref("security.warn_submit_insecure", false);
user_pref("browser.shell.checkDefaultBrowser", false);
user_pref("shell.checkDefaultClient", false);
user_pref("browser.warnOnQuit", false);
user_pref("accessibility.typeaheadfind.autostart", false);
user_pref("javascript.options.showInConsole", true);
user_pref("layout.debug.enable_data_xbl", true);
user_pref("browser.EULA.override", true);
user_pref("javascript.options.jit.content", true);
user_pref("gfx.color_management.force_srgb", true);

user_pref("camino.warn_when_closing", false); // Camino-only, harmless to others
"""
  prefs.append(part)

  locations = readLocations()

  # Grant God-power to all the privileged servers on which tests run.
  privileged = filter(lambda loc: "privileged" in loc.options, locations)
  for (i, l) in itertools.izip(itertools.count(1), privileged):
    part = """
user_pref("capability.principal.codebase.p%(i)d.granted",
          "UniversalXPConnect UniversalBrowserRead UniversalBrowserWrite \
UniversalPreferencesRead UniversalPreferencesWrite \
UniversalFileRead");
user_pref("capability.principal.codebase.p%(i)d.id", "%(origin)s");
user_pref("capability.principal.codebase.p%(i)d.subjectName", "");
""" % { "i": i,
        "origin": (l.scheme + "://" + l.host + ":" + l.port) }
    prefs.append(part)

  # We need to proxy every server but the primary one.
  origins = ["'%s://%s:%s'" % (l.scheme, l.host, l.port)
             for l in filter(lambda l: "primary" not in l.options, locations)]
  origins = ", ".join(origins)

  # The PAC script is embedded in a data: URL.  The line breaks below exist
  # only for readability; they are removed before the URL is written out.
  pacURL = """data:text/plain,
function FindProxyForURL(url, host)
{
  var origins = [%(origins)s];
  var regex = new RegExp('^([a-z][-a-z0-9+.]*)' +
                         '://' +
                         '(?:[^/@]*@)?' +
                         '(.*?)' +
                         '(?::(\\\\\\\\d+))?/');
  var matches = regex.exec(url);
  if (!matches)
    return 'DIRECT';
  var isHttp = matches[1] == 'http';
  var isHttps = matches[1] == 'https';
  if (!matches[3])
  {
    if (isHttp) matches[3] = '80';
    if (isHttps) matches[3] = '443';
  }

  var origin = matches[1] + '://' + matches[2] + ':' + matches[3];
  if (origins.indexOf(origin) < 0)
    return 'DIRECT';
  if (isHttp)
    return 'PROXY 127.0.0.1:8888';
  if (isHttps)
    return 'PROXY 127.0.0.1:4443';
  return 'DIRECT';
}""" % { "origins": origins }
  pacURL = "".join(pacURL.splitlines())

  part = """
user_pref("network.proxy.type", 2);
user_pref("network.proxy.autoconfig_url", "%(pacURL)s");

user_pref("camino.use_system_proxy_settings", false); // Camino-only, harmless to others
""" % {"pacURL": pacURL}
  prefs.append(part)

  # write the preferences
  prefsFile = open(profileDir + "/" + "user.js", "a")
  try:
    prefsFile.write("".join(prefs))
  finally:
    prefsFile.close()


def fillCertificateDB(profileDir):
  """
  Writes the ssltunnel configuration into CERTS_DIR and creates a certificate
  database in profileDir that trusts every "*.ca" file found in CERTS_DIR.

  Returns 0 on success, or certutil's non-zero exit status if creating the
  database failed.
  """
  # certutil reads the database password from a file; use an empty password.
  pwfilePath = os.path.join(profileDir, ".crtdbpw")
  pwfile = open(pwfilePath, "w")
  try:
    pwfile.write("\n")
  finally:
    pwfile.close()

  customCertRE = re.compile("^cert=(?P<nickname>[0-9a-zA-Z_ ]+)")

  # Create head of the ssltunnel configuration file
  sslTunnelConfigPath = os.path.join(CERTS_DIR, "ssltunnel.cfg")
  sslTunnelConfig = open(sslTunnelConfigPath, "w")
  try:
    sslTunnelConfig.write("httpproxy:1\n")
    sslTunnelConfig.write("certdbdir:%s\n" % CERTS_DIR)
    sslTunnelConfig.write("forward:127.0.0.1:8888\n")
    sslTunnelConfig.write("listen:*:4443:pgo server certificate\n")

    # Generate automatic certificate and bond custom certificates
    locations = readLocations()
    # The first location is skipped (presumably the primary one, which is
    # not proxied -- this relies on its position in server-locations.txt).
    locations.pop(0)
    for loc in locations:
      if loc.scheme == "https" and "nocert" not in loc.options:
        for option in loc.options:
          match = customCertRE.match(option)
          if match:
            customcert = match.group("nickname")
            sslTunnelConfig.write("listen:%s:%s:4443:%s\n" %
                                  (loc.host, loc.port, customcert))
            break
  finally:
    sslTunnelConfig.close()

  # Pre-create the certification database for the profile
  certutil = DIST_BIN + "/certutil" + BIN_SUFFIX
  status = Process(certutil, ["-N", "-d", profileDir, "-f", pwfilePath],
                   environment()).wait()
  if status != 0:
    return status

  # Walk the cert directory and add custom CAs as trusted
  files = os.listdir(CERTS_DIR)
  for item in files:
    root, ext = os.path.splitext(item)
    if ext == ".ca":
      # Wait for each import to finish: the password file is deleted below
      # and must still exist while certutil is running.
      Process(certutil, ["-A", "-i", os.path.join(CERTS_DIR, item),
                         "-d", profileDir, "-f", pwfilePath,
                         "-n", root, "-t", "CT,,"],
              environment()).wait()

  os.unlink(pwfilePath)
  return 0


def environment(env = None):
  """
  Returns the environment dict to launch child processes with.

  If env is None a copy of os.environ is used; otherwise env itself is
  modified and returned.  On UNIXISH platforms DIST_BIN (resolved against
  SCRIPT_DIR) is prepended to LD_LIBRARY_PATH.
  """
  if env is None:
    env = dict(os.environ)

  if UNIXISH:
    ldLibraryPath = os.path.join(SCRIPT_DIR, DIST_BIN)
    if "LD_LIBRARY_PATH" in env:
      ldLibraryPath = ldLibraryPath + ":" + env["LD_LIBRARY_PATH"]
    env["LD_LIBRARY_PATH"] = ldLibraryPath

  return env


###############
# RUN THE APP #
###############

def runApp(testURL, env, app, profileDir, extraArgs):
  """
  Run the app, returning the time at which it was started.

  testURL    -- URL the application is asked to load
  env        -- environment dict passed through environment(), or None
  app        -- path to the application binary
  profileDir -- profile directory to run with
  extraArgs  -- additional command-line arguments

  On test builds the certificate database is populated and ssltunnel is run
  for the duration of the application; if certificate setup fails, its
  non-zero status is returned instead of a start time.
  """
  ssltunnelProcess = None
  if IS_TEST_BUILD:
    # create certificate database for the profile
    certificateStatus = fillCertificateDB(profileDir)
    if certificateStatus != 0:
      log.info("ERROR FAIL Certificate integration")
      return certificateStatus

    # start ssltunnel to provide https:// URLs capability
    ssltunnel = DIST_BIN + "/ssltunnel" + BIN_SUFFIX
    ssltunnelProcess = Process(ssltunnel,
                               [os.path.join(CERTS_DIR, "ssltunnel.cfg")],
                               environment())
    log.info("SSL tunnel pid: %d", ssltunnelProcess.pid)

  try:
    # mark the start
    start = datetime.now()

    # now run with the profile we created
    cmd = app
    if IS_MAC and not IS_CAMINO and not cmd.endswith("-bin"):
      cmd += "-bin"
    cmd = os.path.abspath(cmd)

    args = []
    if IS_MAC:
      args.append("-foreground")

    if IS_CYGWIN:
      # The application is a native Windows binary and needs a Windows path.
      import commands
      profileDirectory = commands.getoutput("cygpath -w \"" + profileDir + "/\"")
    else:
      profileDirectory = profileDir + "/"

    args.extend(("-no-remote", "-profile", profileDirectory))
    if IS_CAMINO:
      args.extend(("-url", testURL))
    else:
      args.append(testURL)
    args.extend(extraArgs)

    proc = Process(cmd, args, env = environment(env))
    log.info("Application pid: %d", proc.pid)
    status = proc.wait()
    if status != 0:
      log.info("ERROR FAIL Exited with code %d during test run", status)
  finally:
    # Always tear the tunnel down, even if launching the app failed.
    if ssltunnelProcess is not None:
      ssltunnelProcess.kill()

  return start