Bug 860091 - mirror test.py and mozdevice and mozprofile to m-c;r=jgriffin

--HG--
extra : rebase_source : 16b0b4bb8b49b0a70e0a2160c3ca6370737b0fab
This commit is contained in:
Jeff Hammel 2013-04-17 11:08:02 -07:00
parent 850194eb18
commit 904977538a
24 changed files with 517 additions and 334 deletions

View File

@ -3,6 +3,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
import hashlib
import mozlog
import socket
import os
import re
@ -46,6 +47,35 @@ class DeviceManager(object):
_logcatNeedsRoot = True
def __init__(self, logLevel=mozlog.ERROR):
self._logger = mozlog.getLogger("DeviceManager")
self._logLevel = logLevel
self._logger.setLevel(logLevel)
@property
def logLevel(self):
return self._logLevel
@logLevel.setter
def logLevel_setter(self, newLogLevel):
self._logLevel = newLogLevel
self._logger.setLevel(self._logLevel)
@property
def debug(self):
self._logger.warn("dm.debug is deprecated. Use logLevel.")
levels = {mozlog.DEBUG: 5, mozlog.INFO: 3, mozlog.WARNING: 2,
mozlog.ERROR: 1, mozlog.CRITICAL: 0}
return levels[self.logLevel]
@debug.setter
def debug_setter(self, newDebug):
self._logger.warn("dm.debug is deprecated. Use logLevel.")
newDebug = 5 if newDebug > 5 else newDebug # truncate >=5 to 5
levels = {5: mozlog.DEBUG, 3: mozlog.INFO, 2: mozlog.WARNING,
1: mozlog.ERROR, 0: mozlog.CRITICAL}
self.logLevel = levels[newDebug]
@abstractmethod
def getInfo(self, directive=None):
"""
@ -178,8 +208,7 @@ class DeviceManager(object):
Returns True if remoteDirname on device is same as localDirname on host.
"""
if (self.debug >= 2):
print "validating directory: " + localDirname + " to " + remoteDirname
self._logger.info("validating directory: %s to %s" % (localDirname, remoteDirname))
for root, dirs, files in os.walk(localDirname):
parts = root.split(localDirname)
for f in files:
@ -566,11 +595,11 @@ class NetworkTools:
break
except:
if seed > maxportnum:
print "Automation Error: Could not find open port after checking 5000 ports"
self._logger.error("Automation Error: Could not find open port after checking 5000 ports")
raise
seed += 1
except:
print "Automation Error: Socket error trying to find open port"
self._logger.error("Automation Error: Socket error trying to find open port")
return seed
@ -629,7 +658,7 @@ class ZeroconfListener(object):
return
ip = m.group(1).replace("_", ".")
if self.hwid == hwid:
self.ip = ip
self.evt.set()

View File

@ -2,6 +2,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import mozlog
import subprocess
from devicemanager import DeviceManager, DMError, _pop_last_line
import re
@ -29,7 +30,9 @@ class DeviceManagerADB(DeviceManager):
default_timeout = 300
def __init__(self, host=None, port=5555, retryLimit=5, packageName='fennec',
adbPath='adb', deviceSerial=None, deviceRoot=None, **kwargs):
adbPath='adb', deviceSerial=None, deviceRoot=None,
logLevel=mozlog.ERROR, **kwargs):
DeviceManager.__init__(self, logLevel)
self.host = host
self.port = port
self.retryLimit = retryLimit
@ -206,7 +209,7 @@ class DeviceManagerADB(DeviceManager):
if re.search("unzip: exiting", data) or re.search("Operation not permitted", data):
raise Exception("unzip failed, or permissions error")
except:
print "zip/unzip failure: falling back to normal push"
self._logger.info("zip/unzip failure: falling back to normal push")
self._useZip = False
self.pushDir(localDir, remoteDir, retryLimit=retryLimit)
else:
@ -335,7 +338,7 @@ class DeviceManagerADB(DeviceManager):
if uri != "":
acmd.append("-d")
acmd.append(''.join(['\'',uri, '\'']));
print acmd
self._logger.info(acmd)
self._checkCmd(acmd)
return outputFile
@ -429,7 +432,7 @@ class DeviceManagerADB(DeviceManager):
try:
self.mkDir(self.deviceRoot)
except:
print "Unable to create device root %s" % self.deviceRoot
self._logger.error("Unable to create device root %s" % self.deviceRoot)
raise
return
@ -512,7 +515,7 @@ class DeviceManagerADB(DeviceManager):
ret["process"] = self._runCmd(["shell", "ps"]).stdout.read()
if (directive == "systime" or directive == "all"):
ret["systime"] = self._runCmd(["shell", "date"]).stdout.read()
print ret
self._logger.info(ret)
return ret
def uninstallApp(self, appName, installPath=None):
@ -619,12 +622,12 @@ class DeviceManagerADB(DeviceManager):
self.chmodDir(remoteEntry)
else:
self._checkCmdAs(["shell", "chmod", mask, remoteEntry])
print "chmod " + remoteEntry
self._logger.info("chmod %s" % remoteEntry)
self._checkCmdAs(["shell", "chmod", mask, remoteDir])
print "chmod " + remoteDir
self._logger.info("chmod %s" % remoteDir)
else:
self._checkCmdAs(["shell", "chmod", mask, remoteDir.strip()])
print "chmod " + remoteDir.strip()
self._logger.info("chmod %s" % remoteDir.strip())
def _verifyADB(self):
"""
@ -690,7 +693,7 @@ class DeviceManagerADB(DeviceManager):
self._checkCmd(["push", tmpfile.name, tmpDir + "/tmpfile"])
self._checkCmd(["shell", "run-as", self._packageName, "dd", "if=" + tmpDir + "/tmpfile", "of=" + devroot + "/sanity/tmpfile"])
if (self.fileExists(devroot + "/sanity/tmpfile")):
print "will execute commands via run-as " + self._packageName
self._logger.info("will execute commands via run-as %s" % self._packageName)
self._useRunAs = True
self._checkCmd(["shell", "rm", devroot + "/tmp/tmpfile"])
self._checkCmd(["shell", "run-as", self._packageName, "rm", "-r", devroot + "/sanity"])
@ -743,7 +746,7 @@ class DeviceManagerADB(DeviceManager):
# optimization for large directories.
self._useZip = False
if (self._isUnzipAvailable() and self._isLocalZipAvailable()):
print "will use zip to push directories"
self._logger.info("will use zip to push directories")
self._useZip = True
else:
raise DMError("zip not available")

View File

@ -2,6 +2,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import mozlog
import select
import socket
import time
@ -22,7 +23,6 @@ class DeviceManagerSUT(DeviceManager):
app must be present and listening for connections for this to work.
"""
debug = 2
_base_prompt = '$>'
_base_prompt_re = '\$\>'
_prompt_sep = '\x00'
@ -33,7 +33,9 @@ class DeviceManagerSUT(DeviceManager):
reboot_timeout = 600
reboot_settling_time = 60
def __init__(self, host, port = 20701, retryLimit = 5, deviceRoot = None, **kwargs):
def __init__(self, host, port = 20701, retryLimit = 5,
deviceRoot = None, logLevel = mozlog.ERROR, **kwargs):
DeviceManager.__init__(self, logLevel)
self.host = host
self.port = port
self.retryLimit = retryLimit
@ -130,13 +132,12 @@ class DeviceManagerSUT(DeviceManager):
# couldn't execute it). retry otherwise
if err.fatal:
raise err
if self.debug >= 4:
print err
self._logger.debug(err)
retries += 1
# if we lost the connection or failed to establish one, wait a bit
if retries < retryLimit and not self._sock:
sleep_time = 5 * retries
print 'Could not connect; sleeping for %d seconds.' % sleep_time
self._logger.info('Could not connect; sleeping for %d seconds.' % sleep_time)
time.sleep(sleep_time)
raise DMError("Remote Device Error: unable to connect to %s after %s attempts" % (self.host, retryLimit))
@ -162,8 +163,8 @@ class DeviceManagerSUT(DeviceManager):
if not self._sock:
try:
if self.debug >= 1 and self._everConnected:
print "reconnecting socket"
if self._everConnected:
self._logger.info("reconnecting socket")
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error, msg:
self._sock = None
@ -202,13 +203,12 @@ class DeviceManagerSUT(DeviceManager):
raise DMError("Remote Device Error: we had %s bytes of data to send, but "
"only sent %s" % (len(cmd['data']), sent))
if self.debug >= 4:
print "sent cmd: " + str(cmd['cmd'])
self._logger.debug("sent cmd: %s" % cmd['cmd'])
except socket.error, msg:
self._sock.close()
self._sock = None
if self.debug >= 1:
print "Remote Device Error: Error sending data to socket. cmd="+str(cmd['cmd'])+"; err="+str(msg)
self._logger.error("Remote Device Error: Error sending data"\
" to socket. cmd=%s; err=%s" % (cmd['cmd'], msg))
return False
# Check if the command should close the socket
@ -226,16 +226,14 @@ class DeviceManagerSUT(DeviceManager):
socketClosed = False
errStr = ''
temp = ''
if self.debug >= 4:
print "recv'ing..."
self._logger.debug("recv'ing...")
# Get our response
try:
# Wait up to a second for socket to become ready for reading...
if select.select([self._sock], [], [], select_timeout)[0]:
temp = self._sock.recv(1024)
if self.debug >= 4:
print "response: " + str(temp)
self._logger.debug("response: %s" % temp)
timer = 0
if not temp:
socketClosed = True
@ -352,8 +350,7 @@ class DeviceManagerSUT(DeviceManager):
except OSError:
raise DMError("DeviceManager: Error reading file to push")
if (self.debug >= 3):
print "push returned: %s" % remoteHash
self._logger.debug("push returned: %s" % remoteHash)
localHash = self._getLocalHash(localname)
@ -367,8 +364,7 @@ class DeviceManagerSUT(DeviceManager):
def pushDir(self, localDir, remoteDir, retryLimit = None):
retryLimit = retryLimit or self.retryLimit
if (self.debug >= 2):
print "pushing directory: %s to %s" % (localDir, remoteDir)
self._logger.info("pushing directory: %s to %s" % (localDir, remoteDir))
existentDirectories = []
for root, dirs, files in os.walk(localDir, followlinks=True):
@ -418,8 +414,7 @@ class DeviceManagerSUT(DeviceManager):
return files
def removeFile(self, filename):
if (self.debug>= 2):
print "removing file: " + filename
self._logger.info("removing file: " + filename)
if self.fileExists(filename):
self._runCmds([{ 'cmd': 'rm ' + filename }])
@ -444,8 +439,8 @@ class DeviceManagerSUT(DeviceManager):
# unexpected format
raise ValueError
except ValueError:
print "ERROR: Unable to parse process list (bug 805969)"
print "Line: %s\nFull output of process list:\n%s" % (line, data)
self._logger.error("Unable to parse process list (bug 805969)")
self._logger.error("Line: %s\nFull output of process list:\n%s" % (line, data))
raise DMError("Invalid process line: %s" % line)
return processTuples
@ -461,11 +456,10 @@ class DeviceManagerSUT(DeviceManager):
if not appname:
raise DMError("Automation Error: fireProcess called with no command to run")
if (self.debug >= 2):
print "FIRE PROC: '" + appname + "'"
self._logger.info("FIRE PROC: '%s'" % appname)
if (self.processExist(appname) != None):
print "WARNING: process %s appears to be running already\n" % appname
self._logger.warn("process %s appears to be running already\n" % appname)
if (failIfRunning):
raise DMError("Automation Error: Process is already running")
@ -484,8 +478,7 @@ class DeviceManagerSUT(DeviceManager):
time.sleep(1)
waited += 1
if (self.debug >= 4):
print "got pid: %s for process: %s" % (pid, appname)
self._logger.debug("got pid: %s for process: %s" % (pid, appname))
return pid
def launchProcess(self, cmd, outputFile = "process.txt", cwd = '', env = '', failIfRunning=False):
@ -500,8 +493,7 @@ class DeviceManagerSUT(DeviceManager):
DEPRECATED: Use shell() or launchApplication() for new code
"""
if not cmd:
if (self.debug >= 1):
print "WARNING: launchProcess called without command to run"
self._logger.warn("launchProcess called without command to run")
return None
cmdline = subprocess.list2cmdline(cmd)
@ -521,7 +513,7 @@ class DeviceManagerSUT(DeviceManager):
def killProcess(self, appname, forceKill=False):
if forceKill:
print "WARNING: killProcess(): forceKill parameter unsupported on SUT"
self._logger.warn("killProcess(): forceKill parameter unsupported on SUT")
retries = 0
while retries < self.retryLimit:
try:
@ -530,10 +522,9 @@ class DeviceManagerSUT(DeviceManager):
return
except DMError, err:
retries +=1
print ("WARNING: try %d of %d failed to kill %s" %
self._logger.warn("try %d of %d failed to kill %s" %
(retries, self.retryLimit, appname))
if self.debug >= 4:
print err
self._logger.debug(err)
if retries >= self.retryLimit:
raise err
@ -549,7 +540,7 @@ class DeviceManagerSUT(DeviceManager):
def err(error_msg):
err_str = 'DeviceManager: pull unsuccessful: %s' % error_msg
print err_str
self._logger.error(err_str)
self._sock = None
raise DMError(err_str)
@ -608,8 +599,7 @@ class DeviceManagerSUT(DeviceManager):
metadata, sep, buf = read_until_char('\n', buf, 'could not find metadata')
if not metadata:
return None
if self.debug >= 3:
print 'metadata: %s' % metadata
self._logger.debug('metadata: %s' % metadata)
filename, sep, filesizestr = metadata.partition(',')
if sep == '':
@ -648,15 +638,13 @@ class DeviceManagerSUT(DeviceManager):
remoteFile)
def getDirectory(self, remoteDir, localDir, checkDir=True):
if (self.debug >= 2):
print "getting files in '" + remoteDir + "'"
self._logger.info("getting files in '%s'" % remoteDir)
if checkDir and not self.dirExists(remoteDir):
raise DMError("Automation Error: Error getting directory: %s not a directory" %
remoteDir)
filelist = self.listFiles(remoteDir)
if (self.debug >= 3):
print filelist
self._logger.debug(filelist)
if not os.path.exists(localDir):
os.makedirs(localDir)
@ -684,8 +672,7 @@ class DeviceManagerSUT(DeviceManager):
def _getRemoteHash(self, filename):
data = self._runCmds([{ 'cmd': 'hash ' + filename }]).strip()
if self.debug >= 3:
print "remote hash returned: '%s'" % data
self._logger.debug("remote hash returned: '%s'" % data)
return data
def getDeviceRoot(self):
@ -723,8 +710,7 @@ class DeviceManagerSUT(DeviceManager):
self._runCmds([{ 'cmd': 'unzp %s %s' % (filePath, destDir)}])
def _wait_for_reboot(self, host, port):
if self.debug >= 3:
print 'Creating server with %s:%d' % (host, port)
self._logger.debug('Creating server with %s:%d' % (host, port))
timeout_expires = time.time() + self.reboot_timeout
conn = None
data = ''
@ -752,15 +738,14 @@ class DeviceManagerSUT(DeviceManager):
# also up.
time.sleep(self.reboot_settling_time)
else:
print 'Automation Error: Timed out waiting for reboot callback.'
self._logger.error('Timed out waiting for reboot callback.')
s.close()
return data
def reboot(self, ipAddr=None, port=30000):
cmd = 'rebt'
if self.debug > 3:
print "INFO: sending rebt command"
self._logger.info("sending rebt command")
if ipAddr is not None:
# The update.info command tells the SUTAgent to send a TCP message
@ -778,8 +763,7 @@ class DeviceManagerSUT(DeviceManager):
if ipAddr is not None:
status = self._wait_for_reboot(ipAddr, port)
if self.debug > 3:
print "INFO: rebt- got status back: " + str(status)
self._logger.info("rebt- got status back: %s" % status)
def getInfo(self, directive=None):
data = None
@ -810,8 +794,7 @@ class DeviceManagerSUT(DeviceManager):
proclist.append(l.split('\t'))
result['process'] = proclist
if (self.debug >= 3):
print "results: " + str(result)
self._logger.debug("results: %s" % result)
return result
def installApp(self, appBundlePath, destPath=None):
@ -833,8 +816,7 @@ class DeviceManagerSUT(DeviceManager):
data = self._runCmds([{ 'cmd': cmd }])
status = data.split('\n')[0].strip()
if self.debug > 3:
print "uninstallApp: '%s'" % status
self._logger.debug("uninstallApp: '%s'" % status)
if status == 'Success':
return
raise DMError("Remote Device Error: uninstall failed for %s" % appName)
@ -845,8 +827,7 @@ class DeviceManagerSUT(DeviceManager):
cmd += ' ' + installPath
data = self._runCmds([{ 'cmd': cmd }])
if (self.debug > 3):
print "uninstallAppAndReboot: " + str(data)
self._logger.debug("uninstallAppAndReboot: %s" % data)
return
def updateApp(self, appBundlePath, processName=None, destPath=None, ipAddr=None, port=30000):
@ -865,16 +846,14 @@ class DeviceManagerSUT(DeviceManager):
ip, port = self._getCallbackIpAndPort(ipAddr, port)
cmd += " %s %s" % (ip, port)
if self.debug >= 3:
print "INFO: updateApp using command: " + str(cmd)
self._logger.debug("updateApp using command: " % cmd)
status = self._runCmds([{'cmd': cmd}])
if ipAddr is not None:
status = self._wait_for_reboot(ip, port)
if self.debug >= 3:
print "INFO: updateApp: got status back: %s" + str(status)
self._logger.debug("updateApp: got status back: %s" % status)
def getCurrentTime(self):
return self._runCmds([{ 'cmd': 'clok' }]).strip()
@ -922,14 +901,12 @@ class DeviceManagerSUT(DeviceManager):
supported resolutions: 640x480, 800x600, 1024x768, 1152x864, 1200x1024, 1440x900, 1680x1050, 1920x1080
"""
if self.getInfo('os')['os'][0].split()[0] != 'harmony-eng':
if (self.debug >= 2):
print "WARNING: unable to adjust screen resolution on non Tegra device"
self._logger.warn("unable to adjust screen resolution on non Tegra device")
return False
results = self.getInfo('screen')
parts = results['screen'][0].split(':')
if (self.debug >= 3):
print "INFO: we have a current resolution of %s, %s" % (parts[1].split()[0], parts[2].split()[0])
self._logger.debug("we have a current resolution of %s, %s" % (parts[1].split()[0], parts[2].split()[0]))
#verify screen type is valid, and set it to the proper value (https://bugzilla.mozilla.org/show_bug.cgi?id=632895#c4)
screentype = -1
@ -950,8 +927,7 @@ class DeviceManagerSUT(DeviceManager):
if (height < 100 or height > 9999):
return False
if (self.debug >= 3):
print "INFO: adjusting screen resolution to %s, %s and rebooting" % (width, height)
self._logger.debug("adjusting screen resolution to %s, %s and rebooting" % (width, height))
self._runCmds([{ 'cmd': "exec setprop persist.tegra.dpy%s.mode.width %s" % (screentype, width) }])
self._runCmds([{ 'cmd': "exec setprop persist.tegra.dpy%s.mode.height %s" % (screentype, height) }])

View File

@ -11,211 +11,191 @@ import os
import posixpath
import StringIO
import sys
import textwrap
import mozdevice
from optparse import OptionParser
import mozlog
import argparse
class DMCli(object):
def __init__(self):
# a value of None for 'max_args' means there is no limit to the number
# of arguments. 'min_args' should always have an integer value >= 0.
self.commands = { 'install': { 'function': self.install,
'min_args': 1,
'max_args': 1,
'help_args': '<file>',
'args': [ { 'name': 'file', 'nargs': None } ],
'help': 'push this package file to the device and install it' },
'uninstall': { 'function': lambda a: self.dm.uninstallApp(a),
'min_args': 1,
'max_args': 1,
'help_args': '<packagename>',
'uninstall': { 'function': self.uninstall,
'args': [ { 'name': 'packagename', 'nargs': None } ],
'help': 'uninstall the named app from the device' },
'killapp': { 'function': self.killapp,
'min_args': 1,
'max_args': 1,
'help_args': '<process name>',
'help': 'kills any processes with a particular name on device' },
'killapp': { 'function': self.kill,
'args': [ { 'name': 'process_name', 'nargs': '*' } ],
'help': 'kills any processes with name(s) on device' },
'launchapp': { 'function': self.launchapp,
'min_args': 4,
'max_args': 4,
'help_args': '<appname> <activity name> <intent> <URL>',
'help': 'launches application on device' },
'args': [ { 'name': 'appname', 'nargs': None },
{ 'name': 'activity_name',
'nargs': None },
{ 'name': '--intent',
'action': 'store',
'default': 'android.intent.action.VIEW' },
{ 'name': '--url',
'action': 'store' }
],
'help': 'launches application on device' },
'push': { 'function': self.push,
'min_args': 2,
'max_args': 2,
'help_args': '<local> <remote>',
'args': [ { 'name': 'local_file', 'nargs': None },
{ 'name': 'remote_file', 'nargs': None }
],
'help': 'copy file/dir to device' },
'pull': { 'function': self.pull,
'min_args': 1,
'max_args': 2,
'help_args': '<local> [remote]',
'args': [ { 'name': 'local_file', 'nargs': None },
{ 'name': 'remote_file', 'nargs': '?' } ],
'help': 'copy file/dir from device' },
'shell': { 'function': self.shell,
'min_args': 1,
'max_args': None,
'help_args': '<command>',
'help': 'run shell command on device' },
'args': [ { 'name': 'command', 'nargs': argparse.REMAINDER } ],
'help': 'run shell command on device' },
'info': { 'function': self.getinfo,
'min_args': 0,
'max_args': 1,
'help_args': '[os|id|uptime|systime|screen|memory|processes]',
'help': 'get information on a specified '
'args': [ { 'name': 'directive', 'nargs': '?' } ],
'help': 'get information on specified '
'aspect of the device (if no argument '
'given, print all available information)'
},
'ps': { 'function': self.processlist,
'min_args': 0,
'max_args': 0,
'help_args': '',
'help': 'get information on running processes on device'
'help': 'get information on running processes on device'
},
'logcat' : { 'function': self.logcat,
'min_args': 0,
'max_args': 0,
'help_args': '',
'help': 'get logcat from device'
},
'ls': { 'function': self.listfiles,
'min_args': 1,
'max_args': 1,
'help_args': '<remote>',
'args': [ { 'name': 'remote_dir', 'nargs': None } ],
'help': 'list files on device'
},
'rm': { 'function': lambda f: self.dm.removeFile(f),
'min_args': 1,
'max_args': 1,
'help_args': '<remote>',
'help': 'remove file from device'
'rm': { 'function': self.removefile,
'args': [ { 'name': 'remote_file', 'nargs': None } ],
'help': 'remove file from device'
},
'isdir': { 'function': self.isdir,
'min_args': 1,
'max_args': 1,
'help_args': '<remote>',
'args': [ { 'name': 'remote_dir', 'nargs': None } ],
'help': 'print if remote file is a directory'
},
'mkdir': { 'function': lambda d: self.dm.mkDir(d),
'min_args': 1,
'max_args': 1,
'help_args': '<remote>',
'mkdir': { 'function': self.mkdir,
'args': [ { 'name': 'remote_dir', 'nargs': None } ],
'help': 'makes a directory on device'
},
'rmdir': { 'function': lambda d: self.dm.removeDir(d),
'min_args': 1,
'max_args': 1,
'help_args': '<remote>',
'help': 'recursively remove directory from device'
'rmdir': { 'function': self.rmdir,
'args': [ { 'name': 'remote_dir', 'nargs': None } ],
'help': 'recursively remove directory from device'
},
'screencap': { 'function': lambda f: self.dm.saveScreenshot(f),
'min_args': 1,
'max_args': 1,
'help_args': '<png file>',
'help': 'capture screenshot of device in action'
},
'screencap': { 'function': self.screencap,
'args': [ { 'name': 'png_file', 'nargs': None } ],
'help': 'capture screenshot of device in action'
},
'sutver': { 'function': self.sutver,
'min_args': 0,
'max_args': 0,
'help_args': '',
'help': 'SUTAgent\'s product name and version (SUT only)'
},
'clearlogcat': { 'function': self.clearlogcat,
'help': 'clear the logcat'
},
'reboot': { 'function': self.reboot,
'help': 'reboot the device'
},
'isfile': { 'function': self.isfile,
'args': [ { 'name': 'remote_file', 'nargs': None } ],
'help': 'check whether a file exists on the device'
},
'launchfennec': { 'function': self.launchfennec,
'args': [ { 'name': 'appname', 'nargs': None },
{ 'name': '--intent', 'action': 'store',
'default': 'android.intent.action.VIEW' },
{ 'name': '--url', 'action': 'store' },
{ 'name': '--extra-args', 'action': 'store' },
{ 'name': '--mozenv', 'action': 'store' } ],
'help': 'launch fennec'
},
'getip': { 'function': self.getip,
'args': [ { 'name': 'interface', 'nargs': '*' } ],
'help': 'get the ip address of the device'
}
}
usage = "usage: %prog [options] <command> [<args>]\n\ndevice commands:\n"
usage += "\n".join([textwrap.fill("%s %s - %s" %
(cmdname, cmd['help_args'],
cmd['help']),
initial_indent=" ",
subsequent_indent=" ")
for (cmdname, cmd) in
sorted(self.commands.iteritems())])
self.parser = OptionParser(usage)
self.parser = argparse.ArgumentParser()
self.add_options(self.parser)
self.add_commands(self.parser)
def run(self, args=sys.argv[1:]):
(self.options, self.args) = self.parser.parse_args(args)
args = self.parser.parse_args()
if len(self.args) < 1:
self.parser.error("must specify command")
if self.options.dmtype == "sut" and not self.options.host and \
not self.options.hwid:
if args.dmtype == "sut" and not args.host and not args.hwid:
self.parser.error("Must specify device ip in TEST_DEVICE or "
"with --host option with SUT")
(command_name, command_args) = (self.args[0], self.args[1:])
if command_name not in self.commands:
self.parser.error("Invalid command. Valid commands: %s" %
" ".join(self.commands.keys()))
self.dm = self.getDevice(dmtype=args.dmtype, hwid=args.hwid,
host=args.host, port=args.port,
verbose=args.verbose)
command = self.commands[command_name]
if (len(command_args) < command['min_args'] or
(command['max_args'] is not None and len(command_args) >
command['max_args'])):
self.parser.error("Wrong number of arguments")
self.dm = self.getDevice(dmtype=self.options.dmtype,
hwid=self.options.hwid,
host=self.options.host,
port=self.options.port)
ret = command['function'](*command_args)
ret = args.func(args)
if ret is None:
ret = 0
sys.exit(ret)
def add_options(self, parser):
parser.add_option("-v", "--verbose", action="store_true",
dest="verbose",
help="Verbose output from DeviceManager",
default=False)
parser.add_option("--host", action="store",
type="string", dest="host",
help="Device hostname (only if using TCP/IP)",
default=os.environ.get('TEST_DEVICE'))
parser.add_option("-p", "--port", action="store",
type="int", dest="port",
help="Custom device port (if using SUTAgent or "
"adb-over-tcp)", default=None)
parser.add_option("-m", "--dmtype", action="store",
type="string", dest="dmtype",
help="DeviceManager type (adb or sut, defaults " \
"to adb)", default=os.environ.get('DM_TRANS',
'adb'))
parser.add_option("-d", "--hwid", action="store",
type="string", dest="hwid",
help="HWID", default=None)
parser.add_option("--package-name", action="store",
type="string", dest="packagename",
help="Packagename (if using DeviceManagerADB)",
default=None)
parser.add_argument("-v", "--verbose", action="store_true",
help="Verbose output from DeviceManager",
default=False)
parser.add_argument("--host", action="store",
help="Device hostname (only if using TCP/IP)",
default=os.environ.get('TEST_DEVICE'))
parser.add_argument("-p", "--port", action="store",
type=int,
help="Custom device port (if using SUTAgent or "
"adb-over-tcp)", default=None)
parser.add_argument("-m", "--dmtype", action="store",
help="DeviceManager type (adb or sut, defaults " \
"to adb)", default=os.environ.get('DM_TRANS',
'adb'))
parser.add_argument("-d", "--hwid", action="store",
help="HWID", default=None)
parser.add_argument("--package-name", action="store",
help="Packagename (if using DeviceManagerADB)",
default=None)
def getDevice(self, dmtype="adb", hwid=None, host=None, port=None):
def add_commands(self, parser):
subparsers = parser.add_subparsers(title="Commands", metavar="<command>")
for (commandname, commandprops) in sorted(self.commands.iteritems()):
subparser = subparsers.add_parser(commandname, help=commandprops['help'])
if commandprops.get('args'):
for arg in commandprops['args']:
subparser.add_argument(arg['name'], nargs=arg.get('nargs'),
action=arg.get('action'))
subparser.set_defaults(func=commandprops['function'])
def getDevice(self, dmtype="adb", hwid=None, host=None, port=None,
packagename=None, verbose=False):
'''
Returns a device with the specified parameters
'''
if self.options.verbose:
mozdevice.DroidSUT.debug = 4
logLevel = mozlog.ERROR
if verbose:
logLevel = mozlog.DEBUG
if hwid:
return mozdevice.DroidConnectByHWID(hwid)
return mozdevice.DroidConnectByHWID(hwid, logLevel=logLevel)
if dmtype == "adb":
if host and not port:
port = 5555
return mozdevice.DroidADB(packageName=self.options.packagename,
host=host, port=port)
return mozdevice.DroidADB(packageName=packagename,
host=host, port=port,
logLevel=logLevel)
elif dmtype == "sut":
if not host:
self.parser.error("Must specify host with SUT!")
if not port:
port = 20701
return mozdevice.DroidSUT(host=host, port=port)
return mozdevice.DroidSUT(host=host, port=port,
logLevel=logLevel)
else:
self.parser.error("Unknown device manager type: %s" % type)
def push(self, src, dest):
def push(self, args):
(src, dest) = (args.local_file, args.remote_file)
if os.path.isdir(src):
self.dm.pushDir(src, dest)
else:
@ -225,7 +205,8 @@ class DMCli(object):
dest = posixpath.join(dest, os.path.basename(src))
self.dm.pushFile(src, dest)
def pull(self, src, dest=None):
def pull(self, args):
(src, dest) = (args.local_file, args.remote_file)
if not self.dm.fileExists(src):
print 'No such file or directory'
return
@ -236,69 +217,106 @@ class DMCli(object):
else:
self.dm.getFile(src, dest)
def install(self, apkfile):
basename = os.path.basename(apkfile)
def install(self, args):
basename = os.path.basename(args.file)
app_path_on_device = posixpath.join(self.dm.getDeviceRoot(),
basename)
self.dm.pushFile(apkfile, app_path_on_device)
self.dm.pushFile(args.file, app_path_on_device)
self.dm.installApp(app_path_on_device)
def launchapp(self, appname, activity, intent, url):
self.dm.launchApplication(appname, activity, intent, url)
def uninstall(self, args):
self.dm.uninstallApp(args.packagename)
def killapp(self, *args):
for appname in args:
self.dm.killProcess(appname)
def launchapp(self, args):
self.dm.launchApplication(args.appname, args.activity_name,
args.intent, args.url)
def shell(self, *args):
def kill(self, args):
for name in args.process_name:
self.dm.killProcess(name)
def shell(self, args, root=False):
buf = StringIO.StringIO()
self.dm.shell(args, buf)
self.dm.shell(args.command, buf, root=root)
print str(buf.getvalue()[0:-1]).rstrip()
def getinfo(self, *args):
directive=None
if args:
directive=args[0]
info = self.dm.getInfo(directive=directive)
def getinfo(self, args):
info = self.dm.getInfo(directive=args.directive)
for (infokey, infoitem) in sorted(info.iteritems()):
if infokey == "process":
pass # skip process list: get that through ps
elif not directive and not infoitem:
elif not args.directive and not infoitem:
print "%s:" % infokey.upper()
elif not directive:
elif not args.directive:
for line in infoitem:
print "%s: %s" % (infokey.upper(), line)
else:
print "%s" % "\n".join(infoitem)
def logcat(self):
def logcat(self, args):
print ''.join(self.dm.getLogcat())
def processlist(self):
def clearlogcat(self, args):
self.dm.recordLogcat()
def reboot(self, args):
self.dm.reboot()
def processlist(self, args):
pslist = self.dm.getProcessList()
for ps in pslist:
print " ".join(str(i) for i in ps)
def listfiles(self, dir):
filelist = self.dm.listFiles(dir)
def listfiles(self, args):
filelist = self.dm.listFiles(args.remote_dir)
for file in filelist:
print file
def isdir(self, file):
if self.dm.dirExists(file):
def removefile(self, args):
self.dm.removeFile(args.remote_file)
def isdir(self, args):
if self.dm.dirExists(args.remote_dir):
print "TRUE"
return 0
return
print "FALSE"
return errno.ENOTDIR
def sutver(self):
if self.options.dmtype == 'sut':
def mkdir(self, args):
self.dm.mkDir(args.remote_dir)
def rmdir(self, args):
self.dm.removeDir(args.remote_dir)
def screencap(self, args):
self.dm.saveScreenshot(args.png_file)
def sutver(self, args):
if args.dmtype == 'sut':
print '%s Version %s' % (self.dm.agentProductName,
self.dm.agentVersion)
else:
print 'Must use SUT transport to get SUT version.'
def isfile(self, args):
if self.dm.fileExists(args.remote_file):
print "TRUE"
return
print "FALSE"
return errno.ENOENT
def launchfennec(self, args):
self.dm.launchFennec(args.appname, intent=args.intent,
mozEnv=args.mozenv,
extraArgs=args.extra_args, url=args.url)
def getip(self, args):
if args.interface:
print(self.dm.getIP(args.interface))
else:
print(self.dm.getIP())
def cli(args=sys.argv[1:]):
# process the command line
cli = DMCli()

View File

@ -100,7 +100,7 @@ class DroidSUT(DeviceManagerSUT, DroidMixin):
# a different process than the one that started the app. In this case,
# we need to get back the original user serial number and then pass
# that to the 'am start' command line
if not hasattr(self, 'userSerial'):
if not hasattr(self, '_userSerial'):
infoDict = self.getInfo(directive="sutuserinfo")
if infoDict.get('sutuserinfo') and \
len(infoDict['sutuserinfo']) > 0:
@ -108,12 +108,14 @@ class DroidSUT(DeviceManagerSUT, DroidMixin):
# user serial always an integer, see: http://developer.android.com/reference/android/os/UserManager.html#getSerialNumberForUser%28android.os.UserHandle%29
m = re.match('User Serial:([0-9]+)', userSerialString)
if m:
self.userSerial = m.group(1)
self._userSerial = m.group(1)
else:
self.userSerial = None
self._userSerial = None
else:
self._userSerial = None
if self.userSerial is not None:
return [ "--user", self.userSerial ]
if self._userSerial is not None:
return [ "--user", self._userSerial ]
return []

View File

@ -4,7 +4,7 @@
from setuptools import setup
PACKAGE_VERSION = '0.21'
PACKAGE_VERSION = '0.22'
setup(name='mozdevice',
version=PACKAGE_VERSION,
@ -19,7 +19,7 @@ setup(name='mozdevice',
packages=['mozdevice'],
include_package_data=True,
zip_safe=False,
install_requires=[],
install_requires=['mozlog'],
entry_points="""
# -*- Entry points: -*-
[console_scripts]

View File

@ -0,0 +1,35 @@
from sut import MockAgent
import mozdevice
import mozlog
import unittest
class LaunchTest(unittest.TestCase):
def test_nouserserial(self):
a = MockAgent(self, commands = [("ps",
"10029 549 com.android.launcher\n"
"10066 1198 com.twitter.android"),
("info sutuserinfo", ""),
("exec am start -W -n "
"org.mozilla.fennec/.App -a "
"android.intent.action.VIEW",
"OK\nreturn code [0]")])
d = mozdevice.DroidSUT("127.0.0.1", port=a.port, logLevel=mozlog.DEBUG)
d.launchFennec("org.mozilla.fennec")
a.wait()
def test_userserial(self):
a = MockAgent(self, commands = [("ps",
"10029 549 com.android.launcher\n"
"10066 1198 com.twitter.android"),
("info sutuserinfo", "User Serial:0"),
("exec am start --user 0 -W -n "
"org.mozilla.fennec/.App -a "
"android.intent.action.VIEW",
"OK\nreturn code [0]")])
d = mozdevice.DroidSUT("127.0.0.1", port=a.port, logLevel=mozlog.DEBUG)
d.launchFennec("org.mozilla.fennec")
a.wait()
if __name__ == '__main__':
unittest.main()

View File

@ -1,5 +1,9 @@
[DEFAULT]
skip-if = os == 'win'
[sut_basic.py]
[sut_mkdir.py]
[sut_push.py]
[sut_pull.py]
[sut_ps.py]
[droidsut_launch.py]

View File

@ -1,5 +1,6 @@
from sut import MockAgent
import mozdevice
import mozlog
import unittest
class BasicTest(unittest.TestCase):
@ -8,8 +9,7 @@ class BasicTest(unittest.TestCase):
"""Tests DeviceManager initialization."""
a = MockAgent(self)
mozdevice.DroidSUT.debug = 4
d = mozdevice.DroidSUT("127.0.0.1", port=a.port)
d = mozdevice.DroidSUT("127.0.0.1", port=a.port, logLevel=mozlog.DEBUG)
# all testing done in device's constructor
a.wait()
@ -21,8 +21,7 @@ class BasicTest(unittest.TestCase):
("mkdr /mnt/sdcard/tests", "/mnt/sdcard/tests successfully created"),
("ver", "SUTAgentAndroid Version 1.14")]
a = MockAgent(self, start_commands = cmds)
mozdevice.DroidSUT.debug = 4
dm = mozdevice.DroidSUT("127.0.0.1", port=a.port)
dm = mozdevice.DroidSUT("127.0.0.1", port=a.port, logLevel=mozlog.DEBUG)
a.wait()
def test_timeout_normal(self):
@ -32,8 +31,7 @@ class BasicTest(unittest.TestCase):
("ls", "test.txt"),
("rm /mnt/sdcard/tests/test.txt",
"Removed the file")])
mozdevice.DroidSUT.debug = 4
d = mozdevice.DroidSUT("127.0.0.1", port=a.port)
d = mozdevice.DroidSUT("127.0.0.1", port=a.port, logLevel=mozlog.DEBUG)
ret = d.removeFile('/mnt/sdcard/tests/test.txt')
self.assertEqual(ret, None) # if we didn't throw an exception, we're ok
a.wait()
@ -44,8 +42,7 @@ class BasicTest(unittest.TestCase):
("cd /mnt/sdcard/tests", ""),
("ls", "test.txt"),
("rm /mnt/sdcard/tests/test.txt", 0)])
mozdevice.DroidSUT.debug = 4
d = mozdevice.DroidSUT("127.0.0.1", port=a.port)
d = mozdevice.DroidSUT("127.0.0.1", port=a.port, logLevel=mozlog.DEBUG)
d.default_timeout = 1
exceptionThrown = False
try:

View File

@ -2,6 +2,7 @@
# http://creativecommons.org/publicdomain/zero/1.0/
import mozdevice
import mozlog
import unittest
from sut import MockAgent
@ -31,8 +32,8 @@ class MkDirsTest(unittest.TestCase):
exceptionThrown = False
try:
mozdevice.DroidSUT.debug = 4
d = mozdevice.DroidSUT('127.0.0.1', port=a.port)
d = mozdevice.DroidSUT('127.0.0.1', port=a.port,
logLevel=mozlog.DEBUG)
d.mkDirs('/mnt/sdcard/baz/boop/bip')
except mozdevice.DMError:
exceptionThrown = True
@ -53,8 +54,8 @@ class MkDirsTest(unittest.TestCase):
('mkdr /mnt/sdcard/foo',
'/mnt/sdcard/foo successfully created')]
a = MockAgent(self, commands=cmds)
mozdevice.DroidSUT.debug = 4
d = mozdevice.DroidSUT('127.0.0.1', port=a.port)
d = mozdevice.DroidSUT('127.0.0.1', port=a.port,
logLevel=mozlog.DEBUG)
d.mkDirs('/mnt/sdcard/foo/foo')
a.wait()

View File

@ -1,5 +1,6 @@
from sut import MockAgent
import mozdevice
import mozlog
import unittest
import hashlib
import tempfile
@ -15,14 +16,14 @@ class PullTest(unittest.TestCase):
# pull file is kind of gross, make sure we can still execute commands after it's done
remoteName = "/mnt/sdcard/cheeseburgers"
mozdevice.DroidSUT.debug = 4
a = MockAgent(self, commands = [("pull %s" % remoteName,
"%s,%s\n%s" % (remoteName,
len(cheeseburgers),
cheeseburgers)),
("isdir /mnt/sdcard", "TRUE")])
d = mozdevice.DroidSUT("127.0.0.1", port=a.port)
d = mozdevice.DroidSUT("127.0.0.1", port=a.port,
logLevel=mozlog.DEBUG)
pulledData = d.pullFile("/mnt/sdcard/cheeseburgers")
self.assertEqual(pulledData, cheeseburgers)
d.dirExists('/mnt/sdcard')
@ -35,7 +36,8 @@ class PullTest(unittest.TestCase):
a = MockAgent(self, commands = [("pull %s" % remoteName,
"%s,15\n%s" % (remoteName,
"cheeseburgh"))])
d = mozdevice.DroidSUT("127.0.0.1", port=a.port)
d = mozdevice.DroidSUT("127.0.0.1", port=a.port,
logLevel=mozlog.DEBUG)
exceptionThrown = False
try:
d.pullFile("/mnt/sdcard/cheeseburgers")

View File

@ -1,5 +1,6 @@
from sut import MockAgent
import mozdevice
import mozlog
import unittest
import hashlib
import tempfile
@ -68,8 +69,8 @@ class PushTest(unittest.TestCase):
exceptionThrown = False
try:
mozdevice.DroidSUT.debug = 4
d = mozdevice.DroidSUT("127.0.0.1", port=a.port)
d = mozdevice.DroidSUT("127.0.0.1", port=a.port,
logLevel=mozlog.DEBUG)
d.pushDir(tempdir, "/mnt/sdcard")
except mozdevice.DMError, e:
exceptionThrown = True

View File

@ -103,14 +103,14 @@ class AddonManager(object):
"""
Returns a dictionary of details about the addon.
:param addon_path: path to the addon directory
:param addon_path: path to the add-on directory or XPI
Returns::
{'id': u'rainbow@colors.org', # id of the addon
'version': u'1.4', # version of the addon
'name': u'Rainbow', # name of the addon
'unpack': False } # whether to unpack the addon
'unpack': False } # whether to unpack the addon
"""
# TODO: We don't use the unpack variable yet, but we should: bug 662683
@ -140,7 +140,15 @@ class AddonManager(object):
rc.append(node.data)
return ''.join(rc).strip()
doc = minidom.parse(os.path.join(addon_path, 'install.rdf'))
if zipfile.is_zipfile(addon_path):
compressed_file = zipfile.ZipFile(addon_path, 'r')
try:
parseable = compressed_file.read('install.rdf')
doc = minidom.parseString(parseable)
finally:
compressed_file.close()
else:
doc = minidom.parse(os.path.join(addon_path, 'install.rdf'))
# Get the namespaces abbreviations
em = get_namespace_id(doc, "http://www.mozilla.org/2004/em-rdf#")

View File

@ -21,6 +21,11 @@ except ImportError:
from pysqlite2 import dbapi2 as sqlite3
import urlparse
# http://hg.mozilla.org/mozilla-central/file/b871dfb2186f/build/automation.py.in#l28
_DEFAULT_PORTS = { 'http': '8888',
'https': '4443',
'ws': '9988',
'wss': '4443' }
class LocationError(Exception):
"""Signifies an improperly formed location."""
@ -58,7 +63,7 @@ class BadPortLocationError(LocationError):
def __init__(self, given_port):
LocationError.__init__(self, "bad value for port: %s" % given_port)
class LocationsSyntaxError(Exception):
"""Signifies a syntax error on a particular line in server-locations.txt."""
@ -182,11 +187,7 @@ class ServerLocations(object):
host, port = netloc.rsplit(':', 1)
except ValueError:
host = netloc
default_ports = {'http': '80',
'https': '443',
'ws': '443',
'wss': '443'}
port = default_ports.get(scheme, '80')
port = _DEFAULT_PORTS.get(scheme, '80')
try:
location = Location(scheme, host, port, options)
@ -232,7 +233,7 @@ class Permissions(object):
# Open database and create table
permDB = sqlite3.connect(os.path.join(self._profileDir, "permissions.sqlite"))
cursor = permDB.cursor();
cursor.execute("PRAGMA schema_version = 3;")
# SQL copied from
# http://mxr.mozilla.org/mozilla-central/source/extensions/cookie/nsPermissionManager.cpp
cursor.execute("""CREATE TABLE IF NOT EXISTS moz_hosts (
@ -252,15 +253,27 @@ class Permissions(object):
permission_type = 1
else:
permission_type = 2
cursor.execute("INSERT INTO moz_hosts values(?, ?, ?, ?, 0, 0)",
rows = cursor.execute("PRAGMA table_info(moz_hosts)")
count = len(rows.fetchall())
# if the db contains 8 columns, we're using user_version 3
if count == 8:
statement = "INSERT INTO moz_hosts values(?, ?, ?, ?, 0, 0, 0, 0)"
cursor.execute("PRAGMA user_version=3;")
else:
statement = "INSERT INTO moz_hosts values(?, ?, ?, ?, 0, 0)"
cursor.execute("PRAGMA user_version=2;")
cursor.execute(statement,
(self._num_permissions, location.host, perm,
permission_type))
permission_type))
# Commit and close
permDB.commit()
cursor.close()
def network_prefs(self, proxy=False):
def network_prefs(self, proxy=None):
"""
take known locations and generate preferences to handle permissions and proxy
returns a tuple of prefs, user_prefs
@ -277,30 +290,33 @@ class Permissions(object):
prefs.append(("capability.principal.codebase.p%s.subjectName" % i, ""))
if proxy:
user_prefs = self.pac_prefs()
user_prefs = self.pac_prefs(proxy)
else:
user_prefs = []
return prefs, user_prefs
def pac_prefs(self):
def pac_prefs(self, user_proxy=None):
"""
return preferences for Proxy Auto Config. originally taken from
http://mxr.mozilla.org/mozilla-central/source/build/automation.py.in
"""
prefs = []
proxy = _DEFAULT_PORTS
# We need to proxy every server but the primary one.
origins = ["'%s'" % l.url()
for l in self._locations]
origins = ", ".join(origins)
proxy["origins"] = origins
for l in self._locations:
if "primary" in l.options:
webServer = l.host
port = l.port
proxy["remote"] = l.host
proxy[l.scheme] = l.port
# overwrite defaults with user specified proxy
if isinstance(user_proxy, dict):
proxy.update(user_proxy)
# TODO: this should live in a template!
# TODO: So changing the 5th line of the regex below from (\\\\\\\\d+)
@ -335,14 +351,15 @@ function FindProxyForURL(url, host)
var origin = matches[1] + '://' + matches[2] + ':' + matches[3];
if (origins.indexOf(origin) < 0)
return 'DIRECT';
if (isHttp || isHttps || isWebSocket || isWebSocketSSL)
return 'PROXY %(remote)s:%(port)s';
if (isHttp) return 'PROXY %(remote)s:%(http)s';
if (isHttps) return 'PROXY %(remote)s:%(https)s';
if (isWebSocket) return 'PROXY %(remote)s:%(ws)s';
if (isWebSocketSSL) return 'PROXY %(remote)s:%(wss)s';
return 'DIRECT';
}""" % { "origins": origins,
"remote": webServer,
"port": port }
}""" % proxy
pacURL = "".join(pacURL.splitlines())
prefs = []
prefs.append(("network.proxy.type", 2))
prefs.append(("network.proxy.autoconfig_url", pacURL))

View File

@ -45,7 +45,7 @@ class Preferences(object):
def add_file(self, path):
"""a preferences from a file
:param path:
"""
self.add(self.read(path))
@ -203,26 +203,26 @@ class Preferences(object):
return retval
@classmethod
def write(cls, _file, prefs, pref_string='user_pref("%s", %s);'):
def write(cls, _file, prefs, pref_string='user_pref(%s, %s);'):
"""write preferences to a file"""
if isinstance(_file, basestring):
f = file(_file, 'w')
f = file(_file, 'a')
else:
f = _file
if isinstance(prefs, dict):
# order doesn't matter
prefs = prefs.items()
for key, value in prefs:
if value is True:
print >> f, pref_string % (key, 'true')
elif value is False:
print >> f, pref_string % (key, 'false')
elif isinstance(value, basestring):
print >> f, pref_string % (key, repr(str(value)))
else:
print >> f, pref_string % (key, value) # should be numeric!
# serialize -> JSON
_prefs = [(json.dumps(k), json.dumps(v) )
for k, v in prefs]
# write the preferences
for _pref in _prefs:
print >> f, pref_string % _pref
# close the file if opened internally
if isinstance(_file, basestring):
f.close()

View File

@ -11,6 +11,7 @@ import types
import uuid
from addons import AddonManager
from permissions import Permissions
from prefs import Preferences
from shutil import copytree, rmtree
from webapps import WebappCollection
@ -134,9 +135,8 @@ class Profile(object):
return c
def create_new_profile(self):
"""Create a new clean profile in tmp which is a simple empty folder"""
profile = tempfile.mkdtemp(suffix='.mozrunner')
return profile
"""Create a new clean temporary profile which is a simple empty folder"""
return tempfile.mkdtemp(suffix='.mozrunner')
### methods for preferences
@ -153,18 +153,15 @@ class Profile(object):
# note what files we've touched
self.written_prefs.add(filename)
if isinstance(preferences, dict):
# order doesn't matter
preferences = preferences.items()
# opening delimeter
f.write('\n%s\n' % self.delimeters[0])
# write the preferences
f.write('\n%s\n' % self.delimeters[0])
_prefs = [(json.dumps(k), json.dumps(v) )
for k, v in preferences]
for _pref in _prefs:
f.write('user_pref(%s, %s);\n' % _pref)
Preferences.write(f, preferences)
# closing delimeter
f.write('%s\n' % self.delimeters[1])
f.close()
def pop_preferences(self, filename):

View File

@ -5,7 +5,7 @@
import sys
from setuptools import setup
PACKAGE_VERSION = '0.5'
PACKAGE_VERSION = '0.7'
# we only support python 2 right now
assert sys.version_info[0] == 2

View File

@ -6,16 +6,20 @@ import unittest
import shutil
from mozprofile import addons
here = os.path.dirname(os.path.abspath(__file__))
class AddonIDTest(unittest.TestCase):
""" Test finding the addon id in a variety of install.rdf styles """
def make_install_rdf(self, filecontents):
path = tempfile.mkdtemp()
f = open(os.path.join(path, "install.rdf"), "w")
f.write(filecontents)
f.close()
return path
def test_addonID(self):
testlist = self.get_test_list()
for t in testlist:
@ -23,10 +27,15 @@ class AddonIDTest(unittest.TestCase):
p = self.make_install_rdf(t)
a = addons.AddonManager(os.path.join(p, "profile"))
addon_id = a.addon_details(p)['id']
self.assertTrue(addon_id == "winning", "We got the addon id")
self.assertEqual(addon_id, "winning", "We got the addon id")
finally:
shutil.rmtree(p)
def test_addonID_xpi(self):
a = addons.AddonManager("profile")
addon = a.addon_details(os.path.join(here, "addons", "empty.xpi"))
self.assertEqual(addon['id'], "test-empty@quality.mozilla.org", "We got the addon id")
def get_test_list(self):
""" This just returns a hardcoded list of install.rdf snippets for testing.
When adding snippets for testing, remember that the id we're looking for

View File

@ -16,5 +16,5 @@
<em:maxVersion>*</em:maxVersion>
</Description>
</em:targetApplication>
</Description>
</Description>
</RDF>

View File

@ -6,8 +6,10 @@ import shutil
import tempfile
import unittest
here = os.path.dirname(os.path.abspath(__file__))
class Bug758250(unittest.TestCase):
"""
use of --profile in mozrunner just blows away addon sources:
@ -17,7 +19,7 @@ class Bug758250(unittest.TestCase):
def test_profile_addon_cleanup(self):
# sanity check: the empty addon should be here
empty = os.path.join(here, 'empty')
empty = os.path.join(here, 'addons', 'empty')
self.assertTrue(os.path.exists(empty))
self.assertTrue(os.path.isdir(empty))
self.assertTrue(os.path.exists(os.path.join(empty, 'install.rdf')))

View File

@ -41,7 +41,7 @@ http://127.0.0.1:8888 privileged
perms_db_filename = os.path.join(self.profile_dir, 'permissions.sqlite')
perms.write_db(self.locations_file)
stmt = 'PRAGMA schema_version;'
stmt = 'PRAGMA user_version;'
con = sqlite3.connect(perms_db_filename)
cur = con.cursor()
@ -49,7 +49,7 @@ http://127.0.0.1:8888 privileged
entries = cur.fetchall()
schema_version = entries[0][0]
self.assertEqual(schema_version, 3)
self.assertEqual(schema_version, 2)
if __name__ == '__main__':
unittest.main()

View File

@ -36,7 +36,37 @@ http://127.0.0.1:8888 privileged
if self.locations_file:
self.locations_file.close()
def test_permissions_db(self):
def write_perm_db(self, version=3):
permDB = sqlite3.connect(os.path.join(self.profile_dir, "permissions.sqlite"))
cursor = permDB.cursor()
cursor.execute("PRAGMA user_version=%d;" % version)
if version == 3:
cursor.execute("""CREATE TABLE IF NOT EXISTS moz_hosts (
id INTEGER PRIMARY KEY,
host TEXT,
type TEXT,
permission INTEGER,
expireType INTEGER,
expireTime INTEGER,
appId INTEGER,
isInBrowserElement INTEGER)""")
elif version == 2:
cursor.execute("""CREATE TABLE IF NOT EXISTS moz_hosts (
id INTEGER PRIMARY KEY,
host TEXT,
type TEXT,
permission INTEGER,
expireType INTEGER,
expireTime INTEGER)""")
else:
raise Exception("version must be 2 or 3")
permDB.commit()
cursor.close()
def test_create_permissions_db(self):
perms = Permissions(self.profile_dir, self.locations_file.name)
perms_db_filename = os.path.join(self.profile_dir, 'permissions.sqlite')
@ -48,7 +78,7 @@ http://127.0.0.1:8888 privileged
entries = cur.fetchall()
self.assertEqual(len(entries), 3)
self.assertEqual(entries[0][0], 'mochi.test')
self.assertEqual(entries[0][1], 'allowXULXBL')
self.assertEqual(entries[0][2], 1)
@ -71,12 +101,17 @@ http://127.0.0.1:8888 privileged
self.assertEqual(entries[3][1], 'allowXULXBL')
self.assertEqual(entries[3][2], 2)
# when creating a DB we should default to user_version==2
cur.execute('PRAGMA user_version')
entries = cur.fetchall()
self.assertEqual(entries[0][0], 2)
perms.clean_db()
# table should be removed
cur.execute("select * from sqlite_master where type='table'")
entries = cur.fetchall()
self.assertEqual(len(entries), 0)
def test_nw_prefs(self):
perms = Permissions(self.profile_dir, self.locations_file.name)
@ -97,7 +132,6 @@ http://127.0.0.1:8888 privileged
'http://127.0.0.1:8888'))
self.assertEqual(prefs[5], ('capability.principal.codebase.p2.subjectName', ''))
prefs, user_prefs = perms.network_prefs(True)
self.assertEqual(len(user_prefs), 2)
self.assertEqual(user_prefs[0], ('network.proxy.type', 2))
@ -106,8 +140,39 @@ http://127.0.0.1:8888 privileged
origins_decl = "var origins = ['http://mochi.test:8888', 'http://127.0.0.1:80', 'http://127.0.0.1:8888'];"
self.assertTrue(origins_decl in user_prefs[1][1])
proxy_check = "if (isHttp || isHttps || isWebSocket || isWebSocketSSL) return 'PROXY mochi.test:8888';"
self.assertTrue(proxy_check in user_prefs[1][1])
proxy_check = ("if (isHttp) return 'PROXY mochi.test:8888';",
"if (isHttps) return 'PROXY mochi.test:4443';",
"if (isWebSocket) return 'PROXY mochi.test:9988';",
"if (isWebSocketSSL) return 'PROXY mochi.test:4443';")
self.assertTrue(all(c in user_prefs[1][1] for c in proxy_check))
def verify_user_version(self, version):
"""Verifies that we call INSERT statements using the correct number
of columns for existing databases.
"""
self.write_perm_db(version=version)
Permissions(self.profile_dir, self.locations_file.name)
perms_db_filename = os.path.join(self.profile_dir, 'permissions.sqlite')
select_stmt = 'select * from moz_hosts'
con = sqlite3.connect(perms_db_filename)
cur = con.cursor()
cur.execute(select_stmt)
entries = cur.fetchall()
self.assertEqual(len(entries), 3)
columns = 8 if version == 3 else 6
self.assertEqual(len(entries[0]), columns)
for x in range(4, columns):
self.assertEqual(entries[0][x], 0)
def test_existing_permissions_db_v2(self):
self.verify_user_version(2)
def test_existing_permissions_db_v3(self):
self.verify_user_version(3)
if __name__ == '__main__':

View File

@ -70,7 +70,7 @@ http://example.org:80 privileged
self.compare_location(i.next(), 'https', 'test', '80', ['privileged'])
self.compare_location(i.next(), 'http', 'example.org', '80',
['privileged'])
self.compare_location(i.next(), 'http', 'test1.example.org', '80',
self.compare_location(i.next(), 'http', 'test1.example.org', '8888',
['privileged'])
locations.add_host('mozilla.org')

View File

@ -11,6 +11,8 @@ by default https://github.com/mozilla/mozbase/blob/master/test-manifest.ini
import imp
import manifestparser
import mozinfo
import optparse
import os
import sys
import unittest
@ -38,6 +40,14 @@ def unittests(path):
def main(args=sys.argv[1:]):
# parse command line options
usage = '%prog [options] manifest.ini <manifest.ini> <...>'
parser = optparse.OptionParser(usage=usage, description=__doc__)
parser.add_option('--list', dest='list_tests',
action='store_true', default=False,
help="list paths of tests to be run")
options, args = parser.parse_args(args)
# read the manifest
if args:
manifests = args
@ -48,14 +58,21 @@ def main(args=sys.argv[1:]):
# ensure manifests exist
if not os.path.exists(manifest):
missing.append(manifest)
assert not missing, 'manifest%s not found: %s' % ((len(manifests) == 1 and '' or 's'), ', '.join(missing))
assert not missing, 'manifest(s) not found: %s' % ', '.join(missing)
manifest = manifestparser.TestManifest(manifests=manifests)
# gather the tests
tests = manifest.active_tests()
tests = manifest.active_tests(disabled=False, **mozinfo.info)
tests = [test['path'] for test in tests]
if options.list_tests:
# print test paths
print '\n'.join(tests)
sys.exit(0)
# create unittests
unittestlist = []
for test in tests:
unittestlist.extend(unittests(test['path']))
unittestlist.extend(unittests(test))
# run the tests
suite = unittest.TestSuite(unittestlist)