Bug 795943 - Mirror mozbase -> m-c for week of Oct 1 @ 22aa0aee78;r=wlach

--HG--
extra : rebase_source : 389b6c3a2923c40167698b477e4d57b230b1a063
This commit is contained in:
Jeff Hammel 2012-10-01 14:00:55 -07:00
parent ed25aa9d76
commit 63c7228bff
11 changed files with 1115 additions and 530 deletions

View File

@ -32,21 +32,26 @@ def abstractmethod(method):
def not_implemented(*args, **kwargs):
raise NotImplementedError('Abstract method %s at File "%s", line %s '
'should be implemented by a concrete class' %
(repr(method), filename,line))
(repr(method), filename, line))
return not_implemented
class DeviceManager:
@abstractmethod
def shell(self, cmd, outputfile, env=None, cwd=None, timeout=None):
def shell(self, cmd, outputfile, env=None, cwd=None, timeout=None, root=False):
"""
executes shell command on device
Executes shell command on device.
cmd - Command string to execute
outputfile - File to store output
env - Environment to pass to exec command
cwd - Directory to execute command from
timeout - specified in seconds, defaults to 'default_timeout'
root - Specifies whether command requires root privileges
timeout is specified in seconds, and if no timeout is given,
we will run until the script returns
returns:
success: Return code from command
failure: None
success: Return code from command
failure: None
"""
def shellCheckOutput(self, cmd, env=None, cwd=None, timeout=None, root=False):
@ -73,29 +78,31 @@ class DeviceManager:
@abstractmethod
def pushFile(self, localname, destname):
"""
external function
Copies localname from the host to destname on the device
returns:
success: True
failure: False
success: True
failure: False
"""
@abstractmethod
def mkDir(self, name):
"""
external function
Creates a single directory on the device file system
returns:
success: directory name
failure: None
success: directory name
failure: None
"""
def mkDirs(self, filename):
"""
make directory structure on the device
Make directory structure on the device
WARNING: does not create last part of the path
external function
returns:
success: directory structure that we created
failure: None
success: directory structure that we created
failure: None
"""
parts = filename.split('/')
name = ""
@ -113,98 +120,82 @@ class DeviceManager:
@abstractmethod
def pushDir(self, localDir, remoteDir):
"""
push localDir from host to remoteDir on the device
external function
Push localDir from host to remoteDir on the device
returns:
success: remoteDir
failure: None
success: remoteDir
failure: None
"""
@abstractmethod
def dirExists(self, dirname):
"""
external function
Checks if dirname exists and is a directory
on the device file system
returns:
success: True
failure: False
success: True
failure: False
"""
@abstractmethod
def fileExists(self, filepath):
"""
Because we always have / style paths we make this a lot easier with some
assumptions
external function
Checks if filepath exists and is a file on
the device file system
returns:
success: True
failure: False
success: True
failure: False
"""
@abstractmethod
def listFiles(self, rootdir):
"""
list files on the device, requires cd to directory first
external function
Lists files on the device rootdir
returns:
success: array of filenames, ['file1', 'file2', ...]
failure: None
success: array of filenames, ['file1', 'file2', ...]
failure: None
"""
@abstractmethod
def removeFile(self, filename):
"""
external function
Removes filename from the device
returns:
success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
failure: None
success: output of telnet
failure: None
"""
@abstractmethod
def removeDir(self, remoteDir):
"""
does a recursive delete of directory on the device: rm -Rf remoteDir
external function
Does a recursive delete of directory on the device: rm -Rf remoteDir
returns:
success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
failure: None
success: output of telnet
failure: None
"""
@abstractmethod
def getProcessList(self):
"""
external function
returns:
success: array of process tuples
failure: None
"""
Lists the running processes on the device
@abstractmethod
def fireProcess(self, appname, failIfRunning=False):
"""
external function
DEPRECATED: Use shell() or launchApplication() for new code
returns:
success: pid
failure: None
"""
@abstractmethod
def launchProcess(self, cmd, outputFile = "process.txt", cwd = '', env = '', failIfRunning=False):
"""
external function
DEPRECATED: Use shell() or launchApplication() for new code
returns:
success: output filename
failure: None
success: array of process tuples
failure: []
"""
def processExist(self, appname):
"""
iterates process list and returns pid if exists, otherwise None
external function
Iterates process list and checks if pid exists
returns:
success: pid
failure: None
success: pid
failure: None
"""
pid = None
@ -238,90 +229,92 @@ class DeviceManager:
@abstractmethod
def killProcess(self, appname, forceKill=False):
"""
external function
Kills the process named appname.
If forceKill is True, process is killed regardless of state
returns:
success: True
failure: False
success: True
failure: False
"""
@abstractmethod
def catFile(self, remoteFile):
"""
external function
Returns the contents of remoteFile
returns:
success: filecontents
failure: None
success: filecontents, string
failure: None
"""
@abstractmethod
def pullFile(self, remoteFile):
"""
external function
Returns contents of remoteFile using the "pull" command.
returns:
success: output of pullfile, string
failure: None
success: output of pullfile, string
failure: None
"""
@abstractmethod
def getFile(self, remoteFile, localFile = ''):
"""
copy file from device (remoteFile) to host (localFile)
external function
Copy file from device (remoteFile) to host (localFile)
returns:
success: output of pullfile, string
failure: None
success: contents of file, string
failure: None
"""
@abstractmethod
def getDirectory(self, remoteDir, localDir, checkDir=True):
"""
copy directory structure from device (remoteDir) to host (localDir)
external function
checkDir exists so that we don't create local directories if the
remote directory doesn't exist but also so that we don't call isDir
twice when recursing.
Copy directory structure from device (remoteDir) to host (localDir)
returns:
success: list of files, string
failure: None
success: list of files, string
failure: None
"""
@abstractmethod
def isDir(self, remotePath):
"""
external function
Checks if remotePath is a directory on the device
returns:
success: True
failure: False
Throws a FileError exception when null (invalid dir/filename)
success: True
failure: False
"""
@abstractmethod
def validateFile(self, remoteFile, localFile):
"""
true/false check if the two files have the same md5 sum
external function
Checks if the remoteFile has the same md5 hash as the localFile
returns:
success: True
failure: False
success: True
failure: False
"""
@abstractmethod
def getRemoteHash(self, filename):
def _getRemoteHash(self, filename):
"""
return the md5 sum of a remote file
internal function
Return the md5 sum of a file on the device
returns:
success: MD5 hash for given filename
failure: None
success: MD5 hash for given filename
failure: None
"""
def getLocalHash(self, filename):
@staticmethod
def _getLocalHash(filename):
"""
return the md5 sum of a file on the host
internal function
Return the MD5 sum of a file on the host
returns:
success: MD5 hash for given filename
failure: None
success: MD5 hash for given filename
failure: None
"""
f = open(filename, 'rb')
@ -341,8 +334,6 @@ class DeviceManager:
f.close()
hexval = mdsum.hexdigest()
if (self.debug >= 3):
print "local hash returned: '" + hexval + "'"
return hexval
@abstractmethod
@ -360,32 +351,32 @@ class DeviceManager:
/xpcshell
/reftest
/mochitest
external
returns:
success: path for device root
failure: None
success: path for device root
failure: None
"""
@abstractmethod
def getAppRoot(self):
def getAppRoot(self, packageName=None):
"""
Either we will have /tests/fennec or /tests/firefox but we will never have
both. Return the one that exists
TODO: ensure we can support org.mozilla.firefox
external function
Returns the app root directory
E.g /tests/fennec or /tests/firefox
returns:
success: path for app root
failure: None
success: path for app root
failure: None
"""
# TODO Support org.mozilla.firefox and B2G
def getTestRoot(self, harness):
"""
Gets the directory location on the device for a specific test type
Harness is one of: xpcshell|reftest|mochitest
external function
returns:
success: path for test root
failure: None
success: path for test root
failure: None
"""
devroot = self.getDeviceRoot()
@ -400,13 +391,23 @@ class DeviceManager:
self.testRoot = devroot + '/mochitest'
return self.testRoot
@abstractmethod
def getTempDir(self):
"""
Gets the temporary directory we are using on this device
base on our device root, ensuring also that it exists.
returns:
success: path for temporary directory
failure: None
"""
def signal(self, processID, signalType, signalAction):
"""
Sends a specific process ID a signal code and action.
For Example: SIGINT and SIGDFL to process x
"""
#currently not implemented in device agent - todo
pass
def getReturnCode(self, processID):
@ -415,31 +416,43 @@ class DeviceManager:
return 0
def getIP(self, conn_type='eth0'):
"""
Gets the IP of the device, or None if no connection exists.
"""
match = re.match(r"%s: ip (\S+)" % conn_type, self.shellCheckOutput(['ifconfig', conn_type]))
if match:
return match.group(1)
@abstractmethod
def unpackFile(self, file_path, dest_dir=None):
"""
external function
Unzips a remote bundle to a remote location
If dest_dir is not specified, the bundle is extracted
in the same directory
returns:
success: output of unzip command
failure: None
success: output of unzip command
failure: None
"""
@abstractmethod
def reboot(self, ipAddr=None, port=30000):
"""
external function
Reboots the device
returns:
success: status from test agent
failure: None
success: status from test agent
failure: None
"""
def validateDir(self, localDir, remoteDir):
"""
validate localDir from host to remoteDir on the device
external function
Validate localDir from host to remoteDir on the device
returns:
success: True
failure: False
success: True
failure: False
"""
if (self.debug >= 2):
@ -461,66 +474,89 @@ class DeviceManager:
"""
Returns information about the device:
Directive indicates the information you want to get, your choices are:
os - name of the os
id - unique id of the device
uptime - uptime of the device
uptimemillis - uptime of the device in milliseconds (NOT supported on all
implementations)
systime - system time of the device
screen - screen resolution
memory - memory stats
process - list of running processes (same as ps)
disk - total, free, available bytes on disk
power - power status (charge, battery temp)
all - all of them - or call it with no parameters to get all the information
os - name of the os
id - unique id of the device
uptime - uptime of the device
uptimemillis - uptime of the device in milliseconds (NOT supported on all implementations)
systime - system time of the device
screen - screen resolution
memory - memory stats
process - list of running processes (same as ps)
disk - total, free, available bytes on disk
power - power status (charge, battery temp)
all - all of them - or call it with no parameters to get all the information
returns:
success: dict of info strings by directive name
failure: None
success: dict of info strings by directive name
failure: None
"""
@abstractmethod
def installApp(self, appBundlePath, destPath=None):
"""
external function
Installs an application onto the device
appBundlePath - path to the application bundle on the device
destPath - destination directory of where application should be installed to (optional)
returns:
success: output from agent for inst command
failure: None
success: None
failure: error string
"""
@abstractmethod
def uninstallApp(self, appName, installPath=None):
"""
Uninstalls the named application from device and DOES NOT cause a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - the path to where the application was installed (optional)
returns:
success: None
failure: DMError exception thrown
"""
@abstractmethod
def uninstallAppAndReboot(self, appName, installPath=None):
"""
external function
Uninstalls the named application from device and causes a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - the path to where the application was installed (optional)
returns:
success: True
failure: None
success: None
failure: DMError exception thrown
"""
@abstractmethod
def updateApp(self, appBundlePath, processName=None,
destPath=None, ipAddr=None, port=30000):
def updateApp(self, appBundlePath, processName=None, destPath=None, ipAddr=None, port=30000):
"""
external function
Updates the application on the device.
appBundlePath - path to the application bundle on the device
processName - used to end the process if the applicaiton is currently running (optional)
destPath - Destination directory to where the application should be installed (optional)
ipAddr - IP address to await a callback ping to let us know that the device has updated
properly - defaults to current IP.
port - port to await a callback ping to let us know that the device has updated properly
defaults to 30000, and counts up from there if it finds a conflict
returns:
success: text status from command or callback server
failure: None
success: text status from command or callback server
failure: None
"""
@abstractmethod
def getCurrentTime(self):
"""
external function
Returns device time in milliseconds since the epoch
returns:
success: time in ms
failure: None
success: time in ms
failure: None
"""
def recordLogcat(self):
"""
external function
returns:
success: file is created in <testroot>/logcat.log
failure:
Clears the logcat file making it easier to view specific events
"""
#TODO: spawn this off in a separate thread/process so we can collect all the logcat information
@ -530,10 +566,11 @@ class DeviceManager:
def getLogcat(self):
"""
external function
returns: data from the local file
success: file is in 'filename'
failure: None
Returns the contents of the logcat file as a string
returns:
success: contents of logcat, string
failure: None
"""
buf = StringIO.StringIO()
if self.shell(["/system/bin/logcat", "-d", "dalvikvm:S", "ConnectivityService:S", "WifiMonitor:S", "WifiStateTracker:S", "wpa_supplicant:S", "NetworkStateTracker:S"], buf, root=True) != 0:
@ -542,12 +579,13 @@ class DeviceManager:
return str(buf.getvalue()[0:-1]).rstrip().split('\r')
@abstractmethod
def chmodDir(self, remoteDir):
def chmodDir(self, remoteDir, mask="777"):
"""
external function
Recursively changes file permissions in a directory
returns:
success: True
failure: False
success: True
failure: False
"""
@staticmethod
@ -630,11 +668,11 @@ class NetworkTools:
return seed
def _pop_last_line(file_obj):
'''
"""
Utility function to get the last line from a file (shared between ADB and
SUT device managers). Function also removes it from the file. Intended to
strip off the return code from a shell command.
'''
"""
bytes_from_end = 1
file_obj.seek(0, 2)
length = file_obj.tell() + 1

View File

@ -18,8 +18,9 @@ class DeviceManagerADB(DeviceManager):
self.retrylimit = retrylimit
self.retries = 0
self._sock = None
self.haveRootShell = False
self.haveSu = False
self.useRunAs = False
self.haveRoot = False
self.useDDCopy = False
self.useZip = False
self.packageName = None
@ -43,70 +44,70 @@ class DeviceManagerADB(DeviceManager):
self.packageName = packageName
# verify that we can run the adb command. can't continue otherwise
self.verifyADB()
self._verifyADB()
# try to connect to the device over tcp/ip if we have a hostname
if self.host:
self.connectRemoteADB()
self._connectRemoteADB()
# verify that we can connect to the device. can't continue
self.verifyDevice()
self._verifyDevice()
# set up device root
self.setupDeviceRoot()
self._setupDeviceRoot()
# Can we use run-as? (currently not required)
# Some commands require root to work properly, even with ADB (e.g.
# grabbing APKs out of /data). For these cases, we check whether
# we're running as root. If that isn't true, check for the
# existence of an su binary
self._checkForRoot()
# Can we use run-as? (not required)
try:
self.verifyRunAs()
self._verifyRunAs()
except DMError:
pass
# Can we run things as root? (currently not required)
useRunAsTmp = self.useRunAs
self.useRunAs = False
try:
self.verifyRoot()
except DMError:
try:
self.checkCmd(["root"])
# The root command does not fail even if ADB cannot get
# root rights (e.g. due to production builds), so we have
# to check again ourselves that we have root now.
self.verifyRoot()
except DMError:
if useRunAsTmp:
print "restarting as root failed, but run-as available"
else:
print "restarting as root failed"
self.useRunAs = useRunAsTmp
# can we use zip to speed up some file operations? (currently not
# required)
try:
self.verifyZip()
self._verifyZip()
except DMError:
pass
def __del__(self):
if self.host:
self.disconnectRemoteADB()
self._disconnectRemoteADB()
def shell(self, cmd, outputfile, env=None, cwd=None, timeout=None, root=False):
"""
external function: executes shell command on device.
timeout is specified in seconds, and if no timeout is given,
we will run until we hit the default_timeout specified in __init__
Executes shell command on device.
cmd - Command string to execute
outputfile - File to store output
env - Environment to pass to exec command
cwd - Directory to execute command from
timeout - specified in seconds, defaults to 'default_timeout'
root - Specifies whether command requires root privileges
returns:
success: <return code>
failure: None
success: Return code from command
failure: None
"""
# FIXME: this function buffers all output of the command into memory,
# always. :(
# If requested to run as root, check that we can actually do that
if root and not self.haveRootShell and not self.haveSu:
raise DMError("Shell command '%s' requested to run as root but root "
"is not available on this device. Root your device or "
"refactor the test/harness to not require root." %
self._escapedCommandLine(cmd))
# Getting the return code is more complex than you'd think because adb
# doesn't actually return the return code from a process, so we have to
# capture the output to get it
if root:
if root and not self.haveRootShell:
cmdline = "su -c \"%s\"" % self._escapedCommandLine(cmd)
else:
cmdline = self._escapedCommandLine(cmd)
@ -153,15 +154,16 @@ class DeviceManagerADB(DeviceManager):
return None
def connectRemoteADB(self):
self.checkCmd(["connect", self.host + ":" + str(self.port)])
def _connectRemoteADB(self):
self._checkCmd(["connect", self.host + ":" + str(self.port)])
def disconnectRemoteADB(self):
self.checkCmd(["disconnect", self.host + ":" + str(self.port)])
def _disconnectRemoteADB(self):
self._checkCmd(["disconnect", self.host + ":" + str(self.port)])
def pushFile(self, localname, destname):
"""
external function
Copies localname from the host to destname on the device
returns:
success: True
failure: False
@ -171,14 +173,14 @@ class DeviceManagerADB(DeviceManager):
destname = destname.replace('\\', '/')
if (self.useRunAs):
remoteTmpFile = self.getTempDir() + "/" + os.path.basename(localname)
self.checkCmd(["push", os.path.realpath(localname), remoteTmpFile])
self._checkCmd(["push", os.path.realpath(localname), remoteTmpFile])
if self.useDDCopy:
self.checkCmdAs(["shell", "dd", "if=" + remoteTmpFile, "of=" + destname])
self._checkCmdAs(["shell", "dd", "if=" + remoteTmpFile, "of=" + destname])
else:
self.checkCmdAs(["shell", "cp", remoteTmpFile, destname])
self.checkCmd(["shell", "rm", remoteTmpFile])
self._checkCmdAs(["shell", "cp", remoteTmpFile, destname])
self._checkCmd(["shell", "rm", remoteTmpFile])
else:
self.checkCmd(["push", os.path.realpath(localname), destname])
self._checkCmd(["push", os.path.realpath(localname), destname])
if (self.isDir(destname)):
destname = destname + "/" + os.path.basename(localname)
return True
@ -187,13 +189,14 @@ class DeviceManagerADB(DeviceManager):
def mkDir(self, name):
"""
external function
Creates a single directory on the device file system
returns:
success: directory name
failure: None
"""
try:
result = self.runCmdAs(["shell", "mkdir", name]).stdout.read()
result = self._runCmdAs(["shell", "mkdir", name]).stdout.read()
if 'read-only file system' in result.lower():
return None
if 'file exists' in result.lower():
@ -204,15 +207,15 @@ class DeviceManagerADB(DeviceManager):
def pushDir(self, localDir, remoteDir):
"""
push localDir from host to remoteDir on the device
external function
Push localDir from host to remoteDir on the device
returns:
success: remoteDir
failure: None
"""
# adb "push" accepts a directory as an argument, but if the directory
# contains symbolic links, the links are pushed, rather than the linked
# files; we either zip/unzip or push file-by-file to get around this
# files; we either zip/unzip or push file-by-file to get around this
# limitation
try:
if (not self.dirExists(remoteDir)):
@ -224,8 +227,8 @@ class DeviceManagerADB(DeviceManager):
subprocess.check_output(["zip", "-r", localZip, '.'], cwd=localDir)
self.pushFile(localZip, remoteZip)
os.remove(localZip)
data = self.runCmdAs(["shell", "unzip", "-o", remoteZip, "-d", remoteDir]).stdout.read()
self.checkCmdAs(["shell", "rm", remoteZip])
data = self._runCmdAs(["shell", "unzip", "-o", remoteZip, "-d", remoteDir]).stdout.read()
self._checkCmdAs(["shell", "rm", remoteZip])
if (re.search("unzip: exiting", data) or re.search("Operation not permitted", data)):
raise Exception("unzip failed, or permissions error")
except:
@ -256,7 +259,9 @@ class DeviceManagerADB(DeviceManager):
def dirExists(self, dirname):
"""
external function
Checks if dirname exists and is a directory
on the device file system
returns:
success: True
failure: False
@ -267,12 +272,14 @@ class DeviceManagerADB(DeviceManager):
# assumptions
def fileExists(self, filepath):
"""
external function
Checks if filepath exists and is a file on
the device file system
returns:
success: True
failure: False
"""
p = self.runCmd(["shell", "ls", "-a", filepath])
p = self._runCmd(["shell", "ls", "-a", filepath])
data = p.stdout.readlines()
if (len(data) == 1):
if (data[0].rstrip() == filepath):
@ -280,24 +287,31 @@ class DeviceManagerADB(DeviceManager):
return False
def removeFile(self, filename):
return self.runCmd(["shell", "rm", filename]).stdout.read()
def removeSingleDir(self, remoteDir):
"""
does a recursive delete of directory on the device: rm -Rf remoteDir
external function
Removes filename from the device
returns:
success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
success: output of telnet
failure: None
"""
return self.runCmd(["shell", "rmdir", remoteDir]).stdout.read()
return self._runCmd(["shell", "rm", filename]).stdout.read()
def _removeSingleDir(self, remoteDir):
"""
Deletes a single empty directory
returns:
success: output of telnet
failure: None
"""
return self._runCmd(["shell", "rmdir", remoteDir]).stdout.read()
def removeDir(self, remoteDir):
"""
does a recursive delete of directory on the device: rm -Rf remoteDir
external function
Does a recursive delete of directory on the device: rm -Rf remoteDir
returns:
success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
success: output of telnet
failure: None
"""
out = ""
@ -308,13 +322,20 @@ class DeviceManagerADB(DeviceManager):
out += self.removeDir(remoteDir.strip() + "/" + f.strip())
else:
out += self.removeFile(remoteDir.strip() + "/" + f.strip())
out += self.removeSingleDir(remoteDir.strip())
out += self._removeSingleDir(remoteDir.strip())
else:
out += self.removeFile(remoteDir.strip())
return out
def isDir(self, remotePath):
p = self.runCmd(["shell", "ls", "-a", remotePath + '/'])
"""
Checks if remotePath is a directory on the device
returns:
success: True
failure: False
"""
p = self._runCmd(["shell", "ls", "-a", remotePath + '/'])
data = p.stdout.readlines()
if len(data) == 1:
@ -324,7 +345,14 @@ class DeviceManagerADB(DeviceManager):
return True
def listFiles(self, rootdir):
p = self.runCmd(["shell", "ls", "-a", rootdir])
"""
Lists files on the device rootdir, requires cd to directory first
returns:
success: array of filenames, ['file1', 'file2', ...]
failure: None
"""
p = self._runCmd(["shell", "ls", "-a", rootdir])
data = p.stdout.readlines()
data[:] = [item.rstrip('\r\n') for item in data]
if (len(data) == 1):
@ -342,12 +370,13 @@ class DeviceManagerADB(DeviceManager):
def getProcessList(self):
"""
external function
Lists the running processes on the device
returns:
success: array of process tuples
failure: []
"""
p = self.runCmd(["shell", "ps"])
p = self._runCmd(["shell", "ps"])
# first line is the headers
p.stdout.readline()
proc = p.stdout.readline()
@ -360,8 +389,8 @@ class DeviceManagerADB(DeviceManager):
def fireProcess(self, appname, failIfRunning=False):
"""
external function
DEPRECATED: Use shell() or launchApplication() for new code
returns:
success: pid
failure: None
@ -374,14 +403,14 @@ class DeviceManagerADB(DeviceManager):
def launchProcess(self, cmd, outputFile = "process.txt", cwd = '', env = '', failIfRunning=False):
"""
external function
DEPRECATED: Use shell() or launchApplication() for new code
returns:
success: output filename
failure: None
"""
if cmd[0] == "am":
self.checkCmd(["shell"] + cmd)
self._checkCmd(["shell"] + cmd)
return outputFile
acmd = ["shell", "am", "start", "-W"]
@ -415,11 +444,14 @@ class DeviceManagerADB(DeviceManager):
acmd.append("-d")
acmd.append(''.join(['\'',uri, '\'']));
print acmd
self.checkCmd(acmd)
self._checkCmd(acmd)
return outputFile
def killProcess(self, appname, forceKill=False):
"""
Kills the process named appname.
If forceKill is True, process is killed regardless of state
external function
returns:
success: True
@ -433,7 +465,7 @@ class DeviceManagerADB(DeviceManager):
if forceKill:
args.append("-9")
args.append(pid)
p = self.runCmdAs(args)
p = self._runCmdAs(args)
p.communicate()
if p.returncode == 0:
didKillProcess = True
@ -442,38 +474,25 @@ class DeviceManagerADB(DeviceManager):
def catFile(self, remoteFile):
"""
external function
returns:
success: filecontents
failure: None
"""
#p = self.runCmd(["shell", "cat", remoteFile])
#return p.stdout.read()
return self.getFile(remoteFile)
Returns the contents of remoteFile
def pullFile(self, remoteFile):
"""
external function
returns:
success: output of pullfile, string
success: filecontents, string
failure: None
"""
return self.getFile(remoteFile)
return self.pullFile(remoteFile)
def getFile(self, remoteFile, localFile = 'tmpfile_dm_adb'):
def _runPull(self, remoteFile, localFile):
"""
copy file from device (remoteFile) to host (localFile)
external function
Pulls remoteFile from device to host
returns:
success: output of pullfile, string
success: path to localFile
failure: None
"""
# TODO: add debug flags and allow for printing stdout
# self.runCmd(["pull", remoteFile, localFile])
try:
# First attempt to pull file regularly
outerr = self.runCmd(["pull", remoteFile, localFile]).communicate()
outerr = self._runCmd(["pull", remoteFile, localFile]).communicate()
# Now check stderr for errors
if outerr[1]:
@ -486,32 +505,70 @@ class DeviceManagerADB(DeviceManager):
# to copy the file to a world-readable location first before attempting
# to pull it again.
remoteTmpFile = self.getTempDir() + "/" + os.path.basename(remoteFile)
self.checkCmdAs(["shell", "dd", "if=" + remoteFile, "of=" + remoteTmpFile])
self.checkCmdAs(["shell", "chmod", "777", remoteTmpFile])
self.runCmd(["pull", remoteTmpFile, localFile]).stdout.read()
self._checkCmdAs(["shell", "dd", "if=" + remoteFile, "of=" + remoteTmpFile])
self._checkCmdAs(["shell", "chmod", "777", remoteTmpFile])
self._runCmd(["pull", remoteTmpFile, localFile]).stdout.read()
# Clean up temporary file
self.checkCmdAs(["shell", "rm", remoteTmpFile])
self._checkCmdAs(["shell", "rm", remoteTmpFile])
return localFile
except (OSError, ValueError):
return None
f = open(localFile)
ret = f.read()
f.close()
return ret
def pullFile(self, remoteFile):
"""
Returns contents of remoteFile using the "pull" command.
returns:
success: output of pullfile, string
failure: None
"""
# TODO: add debug flags and allow for printing stdout
localFile = tempfile.mkstemp()[1]
localFile = self._runPull(remoteFile, localFile)
if localFile is None:
print 'Automation Error: failed to pull file %s!' % remoteFile
return None
f = open(localFile, 'r')
ret = f.read()
f.close()
os.remove(localFile)
return ret
def getFile(self, remoteFile, localFile = 'temp.txt'):
"""
Copy file from device (remoteFile) to host (localFile)
returns:
success: contents of file, string
failure: None
"""
try:
contents = self.pullFile(remoteFile)
except:
return None
if contents is None:
return None
fhandle = open(localFile, 'wb')
fhandle.write(contents)
fhandle.close()
return contents
def getDirectory(self, remoteDir, localDir, checkDir=True):
"""
copy directory structure from device (remoteDir) to host (localDir)
external function
checkDir exists so that we don't create local directories if the
remote directory doesn't exist but also so that we don't call isDir
twice when recursing.
Copy directory structure from device (remoteDir) to host (localDir)
returns:
success: list of files, string
failure: None
"""
# checkDir has no affect in devicemanagerADB
ret = []
p = self.runCmd(["pull", remoteDir, localDir])
p = self._runCmd(["pull", remoteDir, localDir])
p.stdout.readline()
line = p.stdout.readline()
while (line):
@ -536,31 +593,39 @@ class DeviceManagerADB(DeviceManager):
def validateFile(self, remoteFile, localFile):
"""
true/false check if the two files have the same md5 sum
external function
returns:
success: True
failure: False
"""
return self.getRemoteHash(remoteFile) == self.getLocalHash(localFile)
Checks if the remoteFile has the same md5 hash as the localFile
def getRemoteHash(self, filename):
returns:
success: True/False
failure: None
"""
return the md5 sum of a remote file
internal function
md5Remote = self._getRemoteHash(remoteFile)
md5Local = self._getLocalHash(localFile)
if md5Remote is None or md5Local is None:
return None
return md5Remote == md5Local
def _getRemoteHash(self, remoteFile):
"""
Return the md5 sum of a file on the device
returns:
success: MD5 hash for given filename
failure: None
"""
data = self.runCmd(["shell", "ls", "-l", filename]).stdout.read()
return data.split()[3]
localFile = tempfile.mkstemp()[1]
localFile = self._runPull(remoteFile, localFile)
def getLocalHash(self, filename):
data = subprocess.Popen(["ls", "-l", filename], stdout=subprocess.PIPE).stdout.read()
return data.split()[4]
if localFile is None:
return None
# Internal method to setup the device root and cache its value
def setupDeviceRoot(self):
md5 = self._getLocalHash(localFile)
os.remove(localFile)
return md5
# setup the device root and cache its value
def _setupDeviceRoot(self):
# if self.deviceRoot is already set, create it if necessary, and use it
if self.deviceRoot:
if not self.dirExists(self.deviceRoot):
@ -601,7 +666,6 @@ class DeviceManagerADB(DeviceManager):
/reftest
/mochitest
external function
returns:
success: path for device root
failure: None
@ -613,7 +677,6 @@ class DeviceManagerADB(DeviceManager):
Gets the temporary directory we are using on this device
base on our device root, ensuring also that it exists.
internal function
returns:
success: path for temporary directory
failure: None
@ -629,10 +692,9 @@ class DeviceManagerADB(DeviceManager):
def getAppRoot(self, packageName):
"""
Either we will have /tests/fennec or /tests/firefox but we will never have
both. Return the one that exists
TODO: ensure we can support org.mozilla.firefox
external function
Returns the app root directory
E.g /tests/fennec or /tests/firefox
returns:
success: path for app root
failure: None
@ -651,52 +713,56 @@ class DeviceManagerADB(DeviceManager):
print "devicemanagerADB: getAppRoot failed"
return None
def reboot(self, wait = False):
def reboot(self, wait = False, **kwargs):
"""
external function
Reboots the device
returns:
success: status from test agent
failure: None
"""
ret = self.runCmd(["reboot"]).stdout.read()
ret = self._runCmd(["reboot"]).stdout.read()
if (not wait):
return "Success"
countdown = 40
while (countdown > 0):
countdown
try:
self.checkCmd(["wait-for-device", "shell", "ls", "/sbin"])
self._checkCmd(["wait-for-device", "shell", "ls", "/sbin"])
return ret
except:
try:
self.checkCmd(["root"])
self._checkCmd(["root"])
except:
time.sleep(1)
print "couldn't get root"
return "Success"
def updateApp(self, appBundlePath, processName=None, destPath=None, ipAddr=None, port=30000):
def updateApp(self, appBundlePath, **kwargs):
"""
external function
Updates the application on the device.
appBundlePath - path to the application bundle on the device
returns:
success: text status from command or callback server
failure: None
"""
return self.runCmd(["install", "-r", appBundlePath]).stdout.read()
return self._runCmd(["install", "-r", appBundlePath]).stdout.read()
def getCurrentTime(self):
"""
external function
Returns device time in milliseconds since the epoch
returns:
success: time in ms
failure: None
"""
timestr = self.runCmd(["shell", "date", "+%s"]).stdout.read().strip()
timestr = self._runCmd(["shell", "date", "+%s"]).stdout.read().strip()
if (not timestr or not timestr.isdigit()):
return None
return str(int(timestr)*1000)
def getInfo(self, directive="all"):
def getInfo(self, directive=None):
"""
Returns information about the device:
Directive indicates the information you want to get, your choices are:
@ -712,17 +778,18 @@ class DeviceManagerADB(DeviceManager):
all - all of them - or call it with no parameters to get all the information
### Note that uptimemillis is NOT supported, as there is no way to get this
### data from the shell.
returns:
success: dict of info strings by directive name
failure: {}
"""
ret = {}
if (directive == "id" or directive == "all"):
ret["id"] = self.runCmd(["get-serialno"]).stdout.read()
ret["id"] = self._runCmd(["get-serialno"]).stdout.read()
if (directive == "os" or directive == "all"):
ret["os"] = self.runCmd(["shell", "getprop", "ro.build.display.id"]).stdout.read()
ret["os"] = self._runCmd(["shell", "getprop", "ro.build.display.id"]).stdout.read()
if (directive == "uptime" or directive == "all"):
utime = self.runCmd(["shell", "uptime"]).stdout.read()
utime = self._runCmd(["shell", "uptime"]).stdout.read()
if (not utime):
raise DMError("error getting uptime")
utime = utime[9:]
@ -733,39 +800,89 @@ class DeviceManagerADB(DeviceManager):
seconds = utime[0:utime.find(",")]
ret["uptime"] = ["0 days " + hours + " hours " + minutes + " minutes " + seconds + " seconds"]
if (directive == "process" or directive == "all"):
ret["process"] = self.runCmd(["shell", "ps"]).stdout.read()
ret["process"] = self._runCmd(["shell", "ps"]).stdout.read()
if (directive == "systime" or directive == "all"):
ret["systime"] = self.runCmd(["shell", "date"]).stdout.read()
ret["systime"] = self._runCmd(["shell", "date"]).stdout.read()
print ret
return ret
def runCmd(self, args):
# If we are not root but have run-as, and we're trying to execute
# a shell command then using run-as is the best we can do
def uninstallApp(self, appName, installPath=None):
"""
Uninstalls the named application from device and DOES NOT cause a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - ignored, but used for compatibility with SUTAgent
returns:
success: None
failure: DMError exception thrown
"""
data = self._runCmd(["uninstall", appName]).stdout.read().strip()
status = data.split('\n')[0].strip()
if status == 'Success':
return
raise DMError("uninstall failed for %s" % appName)
def uninstallAppAndReboot(self, appName, installPath=None):
"""
Uninstalls the named application from device and causes a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - ignored, but used for compatibility with SUTAgent
returns:
success: None
failure: DMError exception thrown
"""
results = self.uninstallApp(appName)
self.reboot()
return
def _runCmd(self, args):
"""
Runs a command using adb
returns:
returncode from subprocess.Popen
"""
finalArgs = [self.adbPath]
if self.deviceSerial:
finalArgs.extend(['-s', self.deviceSerial])
if (not self.haveRoot and self.useRunAs and args[0] == "shell" and args[1] != "run-as"):
# use run-as to execute commands as the package we're testing if
# possible
if not self.haveRootShell and self.useRunAs and args[0] == "shell" and args[1] != "run-as":
args.insert(1, "run-as")
args.insert(2, self.packageName)
finalArgs.extend(args)
return subprocess.Popen(finalArgs, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def runCmdAs(self, args):
def _runCmdAs(self, args):
"""
Runs a command using adb
If self.useRunAs is True, the command is run-as user specified in self.packageName
returns:
returncode from subprocess.Popen
"""
if self.useRunAs:
args.insert(1, "run-as")
args.insert(2, self.packageName)
return self.runCmd(args)
return self._runCmd(args)
# timeout is specified in seconds, and if no timeout is given,
# timeout is specified in seconds, and if no timeout is given,
# we will run until we hit the default_timeout specified in the __init__
def checkCmd(self, args, timeout=None):
# If we are not root but have run-as, and we're trying to execute
# a shell command then using run-as is the best we can do
def _checkCmd(self, args, timeout=None):
"""
Runs a command using adb and waits for the command to finish.
If timeout is specified, the process is killed after <timeout> seconds.
returns:
returncode from subprocess.Popen
"""
# use run-as to execute commands as the package we're testing if
# possible
finalArgs = [self.adbPath]
if self.deviceSerial:
finalArgs.extend(['-s', self.deviceSerial])
if (not self.haveRoot and self.useRunAs and args[0] == "shell" and args[1] != "run-as"):
if not self.haveRootShell and self.useRunAs and args[0] == "shell" and args[1] != "run-as":
args.insert(1, "run-as")
args.insert(2, self.packageName)
finalArgs.extend(args)
@ -782,18 +899,27 @@ class DeviceManagerADB(DeviceManager):
ret_code = proc.poll()
if ret_code == None:
proc.kill()
raise DMError("Timeout exceeded for checkCmd call")
raise DMError("Timeout exceeded for _checkCmd call")
return ret_code
def checkCmdAs(self, args, timeout=None):
def _checkCmdAs(self, args, timeout=None):
"""
Runs a command using adb and waits for command to finish
If self.useRunAs is True, the command is run-as user specified in self.packageName
If timeout is specified, the process is killed after <timeout> seconds
returns:
returncode from subprocess.Popen
"""
if (self.useRunAs):
args.insert(1, "run-as")
args.insert(2, self.packageName)
return self.checkCmd(args, timeout)
return self._checkCmd(args, timeout)
def chmodDir(self, remoteDir):
def chmodDir(self, remoteDir, mask="777"):
"""
external function
Recursively changes file permissions in a directory
returns:
success: True
failure: False
@ -805,29 +931,31 @@ class DeviceManagerADB(DeviceManager):
if (self.isDir(remoteEntry)):
self.chmodDir(remoteEntry)
else:
self.checkCmdAs(["shell", "chmod", "777", remoteEntry])
self._checkCmdAs(["shell", "chmod", mask, remoteEntry])
print "chmod " + remoteEntry
self.checkCmdAs(["shell", "chmod", "777", remoteDir])
self._checkCmdAs(["shell", "chmod", mask, remoteDir])
print "chmod " + remoteDir
else:
self.checkCmdAs(["shell", "chmod", "777", remoteDir.strip()])
self._checkCmdAs(["shell", "chmod", mask, remoteDir.strip()])
print "chmod " + remoteDir.strip()
return True
def verifyADB(self):
# Check to see if adb itself can be executed.
def _verifyADB(self):
"""
Check to see if adb itself can be executed.
"""
if self.adbPath != 'adb':
if not os.access(self.adbPath, os.X_OK):
raise DMError("invalid adb path, or adb not executable: %s", self.adbPath)
try:
self.checkCmd(["version"])
self._checkCmd(["version"])
except os.error, err:
raise DMError("unable to execute ADB (%s): ensure Android SDK is installed and adb is in your $PATH" % err)
except subprocess.CalledProcessError:
raise DMError("unable to execute ADB: ensure Android SDK is installed and adb is in your $PATH")
def verifyDevice(self):
def _verifyDevice(self):
# If there is a device serial number, see if adb is connected to it
if self.deviceSerial:
deviceStatus = None
@ -842,35 +970,25 @@ class DeviceManagerADB(DeviceManager):
if deviceStatus == None:
raise DMError("device not found: %s" % self.deviceSerial)
elif deviceStatus != "device":
raise DMError("bad status for device %s: %s" % (self.deviceSerial,
deviceStatus))
raise DMError("bad status for device %s: %s" % (self.deviceSerial, deviceStatus))
# Check to see if we can connect to device and run a simple command
try:
self.checkCmd(["shell", "echo"])
self._checkCmd(["shell", "echo"])
except subprocess.CalledProcessError:
raise DMError("unable to connect to device: is it plugged in?")
def verifyRoot(self):
# a test to see if we have root privs
p = self.runCmd(["shell", "id"])
response = p.stdout.readline()
response = response.rstrip()
response = response.split(' ')
if (response[0].find('uid=0') < 0 or response[1].find('gid=0') < 0):
print "NOT running as root ", response[0].find('uid=0')
raise DMError("not running as root")
self.haveRoot = True
def isCpAvailable(self):
def _isCpAvailable(self):
"""
Checks to see if cp command is installed
"""
# Some Android systems may not have a cp command installed,
# or it may not be executable by the user.
data = self.runCmd(["shell", "cp"]).stdout.read()
data = self._runCmd(["shell", "cp"]).stdout.read()
if (re.search('Usage', data)):
return True
else:
data = self.runCmd(["shell", "dd", "-"]).stdout.read()
data = self._runCmd(["shell", "dd", "-"]).stdout.read()
if (re.search('unknown operand', data)):
print "'cp' not found, but 'dd' was found as a replacement"
self.useDDCopy = True
@ -878,57 +996,85 @@ class DeviceManagerADB(DeviceManager):
print "unable to execute 'cp' on device; consider installing busybox from Android Market"
return False
def verifyRunAs(self):
def _verifyRunAs(self):
# If a valid package name is available, and certain other
# conditions are met, devicemanagerADB can execute file operations
# via the "run-as" command, so that pushed files and directories
# via the "run-as" command, so that pushed files and directories
# are created by the uid associated with the package, more closely
# echoing conditions encountered by Fennec at run time.
# Check to see if run-as can be used here, by verifying a
# Check to see if run-as can be used here, by verifying a
# file copy via run-as.
self.useRunAs = False
devroot = self.getDeviceRoot()
if (self.packageName and self.isCpAvailable() and devroot):
if (self.packageName and self._isCpAvailable() and devroot):
tmpDir = self.getTempDir()
# The problem here is that run-as doesn't cause a non-zero exit code
# when failing because of a non-existent or non-debuggable package :(
runAsOut = self.runCmd(["shell", "run-as", self.packageName, "mkdir", devroot + "/sanity"]).communicate()[0]
runAsOut = self._runCmd(["shell", "run-as", self.packageName, "mkdir", devroot + "/sanity"]).communicate()[0]
if runAsOut.startswith("run-as:") and ("not debuggable" in runAsOut or "is unknown" in runAsOut):
raise DMError("run-as failed sanity check")
tmpfile = tempfile.NamedTemporaryFile()
self.checkCmd(["push", tmpfile.name, tmpDir + "/tmpfile"])
self._checkCmd(["push", tmpfile.name, tmpDir + "/tmpfile"])
if self.useDDCopy:
self.checkCmd(["shell", "run-as", self.packageName, "dd", "if=" + tmpDir + "/tmpfile", "of=" + devroot + "/sanity/tmpfile"])
self._checkCmd(["shell", "run-as", self.packageName, "dd", "if=" + tmpDir + "/tmpfile", "of=" + devroot + "/sanity/tmpfile"])
else:
self.checkCmd(["shell", "run-as", self.packageName, "cp", tmpDir + "/tmpfile", devroot + "/sanity"])
self._checkCmd(["shell", "run-as", self.packageName, "cp", tmpDir + "/tmpfile", devroot + "/sanity"])
if (self.fileExists(devroot + "/sanity/tmpfile")):
print "will execute commands via run-as " + self.packageName
self.useRunAs = True
self.checkCmd(["shell", "rm", devroot + "/tmp/tmpfile"])
self.checkCmd(["shell", "run-as", self.packageName, "rm", "-r", devroot + "/sanity"])
self._checkCmd(["shell", "rm", devroot + "/tmp/tmpfile"])
self._checkCmd(["shell", "run-as", self.packageName, "rm", "-r", devroot + "/sanity"])
def isUnzipAvailable(self):
data = self.runCmdAs(["shell", "unzip"]).stdout.read()
def _checkForRoot(self):
# Check whether we _are_ root by default (some development boards work
# this way, this is also the result of some relatively rare rooting
# techniques)
proc = self._runCmd(["shell", "id"])
data = proc.stdout.read()
if data.find('uid=0(root)') >= 0:
self.haveRootShell = True
# if this returns true, we don't care about su
return
# if root shell is not available, check if 'su' can be used to gain
# root
proc = self._runCmd(["shell", "su", "-c", "id"])
# wait for response for maximum of 15 seconds, in case su prompts for a
# password or triggers the Android SuperUser prompt
start_time = time.time()
retcode = None
while (time.time() - start_time) <= 15 and retcode is None:
retcode = proc.poll()
if retcode is None: # still not terminated, kill
proc.kill()
data = proc.stdout.read()
if data.find('uid=0(root)') >= 0:
self.haveSu = True
def _isUnzipAvailable(self):
data = self._runCmdAs(["shell", "unzip"]).stdout.read()
if (re.search('Usage', data)):
return True
else:
return False
def isLocalZipAvailable(self):
def _isLocalZipAvailable(self):
try:
subprocess.check_call(["zip", "-?"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except:
return False
return True
def verifyZip(self):
def _verifyZip(self):
# If "zip" can be run locally, and "unzip" can be run remotely, then pushDir
# can use these to push just one file per directory -- a significant
# optimization for large directories.
self.useZip = False
if (self.isUnzipAvailable() and self.isLocalZipAvailable()):
if (self._isUnzipAvailable() and self._isLocalZipAvailable()):
print "will use zip to push directories"
self.useZip = True
else:

View File

@ -12,7 +12,7 @@ import posixpath
import subprocess
from threading import Thread
import StringIO
from devicemanager import DeviceManager, FileError, NetworkTools, _pop_last_line
from devicemanager import DeviceManager, FileError, DMError, NetworkTools, _pop_last_line
import errno
from distutils.version import StrictVersion
@ -27,8 +27,6 @@ class AgentError(Exception):
return self.msg
class DeviceManagerSUT(DeviceManager):
host = ''
port = 0
debug = 2
tempRoot = os.getcwd()
base_prompt = '$>'
@ -54,7 +52,7 @@ class DeviceManagerSUT(DeviceManager):
if self.getDeviceRoot() == None:
raise BaseException("Failed to connect to SUT Agent and retrieve the device root.")
try:
verstring = self.runCmds([{ 'cmd': 'ver' }])
verstring = self._runCmds([{ 'cmd': 'ver' }])
self.agentVersion = re.sub('SUTAgentAndroid Version ', '', verstring)
except AgentError, err:
raise BaseException("Failed to get SUTAgent version")
@ -77,10 +75,10 @@ class DeviceManagerSUT(DeviceManager):
return True
def _stripPrompt(self, data):
"""
"""
internal function
take a data blob and strip instances of the prompt '$>\x00'
"""
"""
promptre = re.compile(self.prompt_regex + '.*')
retVal = []
lines = data.split('\n')
@ -104,10 +102,11 @@ class DeviceManagerSUT(DeviceManager):
return '\n'.join(retVal)
def _shouldCmdCloseSocket(self, cmd):
""" Some commands need to close the socket after they are sent:
* rebt
* uninst
* quit
"""
Some commands need to close the socket after they are sent:
* rebt
* uninst
* quit
"""
socketClosingCmds = [re.compile('^quit.*'),
re.compile('^rebt.*'),
@ -116,18 +115,18 @@ class DeviceManagerSUT(DeviceManager):
for c in socketClosingCmds:
if (c.match(cmd)):
return True
return False
def sendCmds(self, cmdlist, outputfile, timeout = None):
def _sendCmds(self, cmdlist, outputfile, timeout = None):
"""
a wrapper for _doCmds that loops up to self.retrylimit iterations.
this allows us to move the retry logic outside of the _doCmds() to make it
easier for debugging in the future.
note that since cmdlist is a list of commands, they will all be retried if
one fails. this is necessary in particular for pushFile(), where we don't want
to accidentally send extra data if a failure occurs during data transmission.
Wrapper for _doCmds that loops up to self.retrylimit iterations
"""
# this allows us to move the retry logic outside of the _doCmds() to make it
# easier for debugging in the future.
# note that since cmdlist is a list of commands, they will all be retried if
# one fails. this is necessary in particular for pushFile(), where we don't want
# to accidentally send extra data if a failure occurs during data transmission.
retries = 0
while retries < self.retrylimit:
try:
@ -149,14 +148,13 @@ class DeviceManagerSUT(DeviceManager):
raise AgentError("Remote Device Error: unable to connect to %s after %s attempts" % (self.host, self.retrylimit))
def runCmds(self, cmdlist, timeout = None):
"""
similar to sendCmds, but just returns any output as a string instead of
writing to a file. this is normally what you want to call to send a set
of commands to the agent
def _runCmds(self, cmdlist, timeout = None):
"""
Similar to _sendCmds, but just returns any output as a string instead of
writing to a file
"""
outputfile = StringIO.StringIO()
self.sendCmds(cmdlist, outputfile, timeout)
self._sendCmds(cmdlist, outputfile, timeout)
outputfile.seek(0)
return outputfile.read()
@ -296,14 +294,23 @@ class DeviceManagerSUT(DeviceManager):
def shell(self, cmd, outputfile, env=None, cwd=None, timeout=None, root=False):
"""
external function: executes shell command on device
Executes shell command on device.
cmd - Command string to execute
outputfile - File to store output
env - Environment to pass to exec command
cwd - Directory to execute command from
timeout - specified in seconds, defaults to 'default_timeout'
root - Specifies whether command requires root privileges
returns:
success: <return code>
failure: None
success: Return code from command
failure: None
"""
cmdline = self._escapedCommandLine(cmd)
if env:
cmdline = '%s %s' % (self.formatEnvString(env), cmdline)
cmdline = '%s %s' % (self._formatEnvString(env), cmdline)
haveExecSu = (StrictVersion(self.agentVersion) >= StrictVersion('1.13'))
@ -321,16 +328,16 @@ class DeviceManagerSUT(DeviceManager):
try:
if cwd:
self.sendCmds([{ 'cmd': '%s %s %s' % (cmd, cwd, cmdline) }], outputfile, timeout)
self._sendCmds([{ 'cmd': '%s %s %s' % (cmd, cwd, cmdline) }], outputfile, timeout)
else:
if (not root) or haveExecSu:
self.sendCmds([{ 'cmd': '%s %s' % (cmd, cmdline) }], outputfile, timeout)
self._sendCmds([{ 'cmd': '%s %s' % (cmd, cmdline) }], outputfile, timeout)
else:
# need to manually inject su -c for backwards compatibility (this may
# not work on ICS or above!!)
# (FIXME: this backwards compatibility code is really ugly and should
# be deprecated at some point in the future)
self.sendCmds([ { 'cmd': '%s su -c "%s"' % (cmd, cmdline) }], outputfile,
self._sendCmds([ { 'cmd': '%s su -c "%s"' % (cmd, cmdline) }], outputfile,
timeout)
except AgentError:
return None
@ -347,7 +354,8 @@ class DeviceManagerSUT(DeviceManager):
def pushFile(self, localname, destname):
"""
external function
Copies localname from the host to destname on the device
returns:
success: True
failure: False
@ -379,7 +387,7 @@ class DeviceManagerSUT(DeviceManager):
f.close()
try:
retVal = self.runCmds([{ 'cmd': 'push ' + destname + ' ' + str(filesize),
retVal = self._runCmds([{ 'cmd': 'push ' + destname + ' ' + str(filesize),
'data': data }])
except AgentError, e:
print "Automation Error: error pushing file: %s" % e.msg
@ -396,7 +404,7 @@ class DeviceManagerSUT(DeviceManager):
validated = self.validateFile(destname, localname)
else:
# Then we obtained a hash from push
localHash = self.getLocalHash(localname)
localHash = self._getLocalHash(localname)
if (str(localHash) == str(retline)):
validated = True
else:
@ -414,7 +422,8 @@ class DeviceManagerSUT(DeviceManager):
def mkDir(self, name):
"""
external function
Creates a single directory on the device file system
returns:
success: directory name
failure: None
@ -423,15 +432,15 @@ class DeviceManagerSUT(DeviceManager):
return name
else:
try:
retVal = self.runCmds([{ 'cmd': 'mkdr ' + name }])
retVal = self._runCmds([{ 'cmd': 'mkdr ' + name }])
except AgentError:
retVal = None
return retVal
def pushDir(self, localDir, remoteDir):
"""
push localDir from host to remoteDir on the device
external function
Push localDir from host to remoteDir on the device
returns:
success: remoteDir
failure: None
@ -457,7 +466,9 @@ class DeviceManagerSUT(DeviceManager):
def dirExists(self, dirname):
"""
external function
Checks if dirname exists and is a directory
on the device file system
returns:
success: True
failure: False
@ -465,7 +476,7 @@ class DeviceManagerSUT(DeviceManager):
match = ".*" + dirname.replace('^', '\^') + "$"
dirre = re.compile(match)
try:
data = self.runCmds([ { 'cmd': 'cd ' + dirname }, { 'cmd': 'cwd' }])
data = self._runCmds([ { 'cmd': 'cd ' + dirname }, { 'cmd': 'cwd' }])
except AgentError:
return False
@ -480,7 +491,9 @@ class DeviceManagerSUT(DeviceManager):
# assumptions
def fileExists(self, filepath):
"""
external function
Checks if filepath exists and is a file on
the device file system
returns:
success: True
failure: False
@ -495,17 +508,17 @@ class DeviceManagerSUT(DeviceManager):
def listFiles(self, rootdir):
"""
list files on the device, requires cd to directory first
external function
Lists files on the device rootdir
returns:
success: array of filenames, ['file1', 'file2', ...]
failure: []
failure: None
"""
rootdir = rootdir.rstrip('/')
if (self.dirExists(rootdir) == False):
return []
try:
data = self.runCmds([{ 'cmd': 'cd ' + rootdir }, { 'cmd': 'ls' }])
data = self._runCmds([{ 'cmd': 'cd ' + rootdir }, { 'cmd': 'ls' }])
except AgentError:
return []
@ -517,15 +530,16 @@ class DeviceManagerSUT(DeviceManager):
def removeFile(self, filename):
"""
external function
Removes filename from the device
returns:
success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
success: output of telnet
failure: None
"""
if (self.debug>= 2):
print "removing file: " + filename
try:
retVal = self.runCmds([{ 'cmd': 'rm ' + filename }])
retVal = self._runCmds([{ 'cmd': 'rm ' + filename }])
except AgentError:
return None
@ -533,14 +547,14 @@ class DeviceManagerSUT(DeviceManager):
def removeDir(self, remoteDir):
"""
does a recursive delete of directory on the device: rm -Rf remoteDir
external function
Does a recursive delete of directory on the device: rm -Rf remoteDir
returns:
success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
success: output of telnet
failure: None
"""
try:
retVal = self.runCmds([{ 'cmd': 'rmdr ' + remoteDir }])
retVal = self._runCmds([{ 'cmd': 'rmdr ' + remoteDir }])
except AgentError:
return None
@ -548,13 +562,14 @@ class DeviceManagerSUT(DeviceManager):
def getProcessList(self):
"""
external function
Lists the running processes on the device
returns:
success: array of process tuples
failure: []
"""
try:
data = self.runCmds([{ 'cmd': 'ps' }])
data = self._runCmds([{ 'cmd': 'ps' }])
except AgentError:
return []
@ -571,8 +586,8 @@ class DeviceManagerSUT(DeviceManager):
def fireProcess(self, appname, failIfRunning=False):
"""
external function
DEPRECATED: Use shell() or launchApplication() for new code
returns:
success: pid
failure: None
@ -591,7 +606,7 @@ class DeviceManagerSUT(DeviceManager):
return None
try:
self.runCmds([{ 'cmd': 'exec ' + appname }])
self._runCmds([{ 'cmd': 'exec ' + appname }])
except AgentError:
return None
@ -605,8 +620,8 @@ class DeviceManagerSUT(DeviceManager):
def launchProcess(self, cmd, outputFile = "process.txt", cwd = '', env = '', failIfRunning=False):
"""
external function
DEPRECATED: Use shell() or launchApplication() for new code
returns:
success: output filename
failure: None
@ -625,7 +640,7 @@ class DeviceManagerSUT(DeviceManager):
cmdline += " > " + outputFile
# Prepend our env to the command
cmdline = '%s %s' % (self.formatEnvString(env), cmdline)
cmdline = '%s %s' % (self._formatEnvString(env), cmdline)
if self.fireProcess(cmdline, failIfRunning) is None:
return None
@ -633,7 +648,9 @@ class DeviceManagerSUT(DeviceManager):
def killProcess(self, appname, forceKill=False):
"""
external function
Kills the process named appname.
If forceKill is True, process is killed regardless of state
returns:
success: True
failure: False
@ -641,7 +658,7 @@ class DeviceManagerSUT(DeviceManager):
if forceKill:
print "WARNING: killProcess(): forceKill parameter unsupported on SUT"
try:
self.runCmds([{ 'cmd': 'kill ' + appname }])
self._runCmds([{ 'cmd': 'kill ' + appname }])
except AgentError:
return False
@ -649,13 +666,15 @@ class DeviceManagerSUT(DeviceManager):
def getTempDir(self):
"""
external function
Gets the temporary directory we are using on this device
base on our device root, ensuring also that it exists.
returns:
success: tmpdir, string
success: path for temporary directory
failure: None
"""
try:
data = self.runCmds([{ 'cmd': 'tmpd' }])
data = self._runCmds([{ 'cmd': 'tmpd' }])
except AgentError:
return None
@ -663,31 +682,32 @@ class DeviceManagerSUT(DeviceManager):
def catFile(self, remoteFile):
"""
external function
Returns the contents of remoteFile
returns:
success: filecontents
success: filecontents, string
failure: None
"""
try:
data = self.runCmds([{ 'cmd': 'cat ' + remoteFile }])
data = self._runCmds([{ 'cmd': 'cat ' + remoteFile }])
except AgentError:
return None
return data
def pullFile(self, remoteFile):
"""Returns contents of remoteFile using the "pull" command.
The "pull" command is different from other commands in that DeviceManager
has to read a certain number of bytes instead of just reading to the
next prompt. This is more robust than the "cat" command, which will be
confused if the prompt string exists within the file being catted.
However it means we can't use the response-handling logic in sendCMD().
external function
"""
Returns contents of remoteFile using the "pull" command.
returns:
success: output of pullfile, string
failure: None
"""
# The "pull" command is different from other commands in that DeviceManager
# has to read a certain number of bytes instead of just reading to the
# next prompt. This is more robust than the "cat" command, which will be
# confused if the prompt string exists within the file being catted.
# However it means we can't use the response-handling logic in sendCMD().
def err(error_msg):
err_str = 'DeviceManager: pull unsuccessful: %s' % error_msg
@ -752,7 +772,7 @@ class DeviceManagerSUT(DeviceManager):
# <filename>,-1\n<error message>
try:
# just send the command first, we read the response inline below
self.runCmds([{ 'cmd': 'pull ' + remoteFile }])
self._runCmds([{ 'cmd': 'pull ' + remoteFile }])
except AgentError:
return None
@ -796,10 +816,10 @@ class DeviceManagerSUT(DeviceManager):
def getFile(self, remoteFile, localFile = ''):
"""
copy file from device (remoteFile) to host (localFile)
external function
Copy file from device (remoteFile) to host (localFile)
returns:
success: output of pullfile, string
success: contents of file, string
failure: None
"""
if localFile == '':
@ -823,11 +843,8 @@ class DeviceManagerSUT(DeviceManager):
def getDirectory(self, remoteDir, localDir, checkDir=True):
"""
copy directory structure from device (remoteDir) to host (localDir)
external function
checkDir exists so that we don't create local directories if the
remote directory doesn't exist but also so that we don't call isDir
twice when recursing.
Copy directory structure from device (remoteDir) to host (localDir)
returns:
success: list of files, string
failure: None
@ -873,14 +890,14 @@ class DeviceManagerSUT(DeviceManager):
def isDir(self, remotePath):
"""
external function
Checks if remotePath is a directory on the device
returns:
success: True
failure: False
Throws a FileError exception when null (invalid dir/filename)
"""
try:
data = self.runCmds([{ 'cmd': 'isdir ' + remotePath }])
data = self._runCmds([{ 'cmd': 'isdir ' + remotePath }])
except AgentError:
# normally there should be no error here; a nonexistent file/directory will
# return the string "<filename>: No such file or directory".
@ -894,14 +911,14 @@ class DeviceManagerSUT(DeviceManager):
def validateFile(self, remoteFile, localFile):
"""
true/false check if the two files have the same md5 sum
external function
Checks if the remoteFile has the same md5 hash as the localFile
returns:
success: True
failure: False
"""
remoteHash = self.getRemoteHash(remoteFile)
localHash = self.getLocalHash(localFile)
remoteHash = self._getRemoteHash(remoteFile)
localHash = self._getLocalHash(localFile)
if (remoteHash == None):
return False
@ -911,16 +928,16 @@ class DeviceManagerSUT(DeviceManager):
return False
def getRemoteHash(self, filename):
def _getRemoteHash(self, filename):
"""
return the md5 sum of a remote file
internal function
Return the md5 sum of a file on the device
returns:
success: MD5 hash for given filename
failure: None
"""
try:
data = self.runCmds([{ 'cmd': 'hash ' + filename }])
data = self._runCmds([{ 'cmd': 'hash ' + filename }])
except AgentError:
return None
@ -946,7 +963,6 @@ class DeviceManagerSUT(DeviceManager):
/reftest
/mochitest
external function
returns:
success: path for device root
failure: None
@ -955,7 +971,7 @@ class DeviceManagerSUT(DeviceManager):
deviceRoot = self.deviceRoot
else:
try:
data = self.runCmds([{ 'cmd': 'testroot' }])
data = self._runCmds([{ 'cmd': 'testroot' }])
except:
return None
@ -969,8 +985,16 @@ class DeviceManagerSUT(DeviceManager):
return self.deviceRoot
def getAppRoot(self, packageName):
"""
Returns the app root directory
E.g /tests/fennec or /tests/firefox
returns:
success: path for app root
failure: None
"""
try:
data = self.runCmds([{ 'cmd': 'getapproot '+packageName }])
data = self._runCmds([{ 'cmd': 'getapproot ' + packageName }])
except:
return None
@ -978,7 +1002,10 @@ class DeviceManagerSUT(DeviceManager):
def unpackFile(self, file_path, dest_dir=None):
"""
external function
Unzips a remote bundle to a remote location
If dest_dir is not specified, the bundle is extracted
in the same directory
returns:
success: output of unzip command
failure: None
@ -995,7 +1022,7 @@ class DeviceManagerSUT(DeviceManager):
dest_dir += '/'
try:
data = self.runCmds([{ 'cmd': 'unzp %s %s' % (file_path, dest_dir)}])
data = self._runCmds([{ 'cmd': 'unzp %s %s' % (file_path, dest_dir)}])
except AgentError:
return None
@ -1003,7 +1030,8 @@ class DeviceManagerSUT(DeviceManager):
def reboot(self, ipAddr=None, port=30000):
"""
external function
Reboots the device
returns:
success: status from test agent
failure: None
@ -1018,18 +1046,17 @@ class DeviceManagerSUT(DeviceManager):
try:
destname = '/data/data/com.mozilla.SUTAgentAndroid/files/update.info'
data = "%s,%s\rrebooting\r" % (ipAddr, port)
self.runCmds([{ 'cmd': 'push %s %s' % (destname, len(data)),
'data': data }])
self._runCmds([{ 'cmd': 'push %s %s' % (destname, len(data)), 'data': data }])
except AgentError:
return None
ip, port = self.getCallbackIpAndPort(ipAddr, port)
ip, port = self._getCallbackIpAndPort(ipAddr, port)
cmd += " %s %s" % (ip, port)
# Set up our callback server
callbacksvr = callbackServer(ip, port, self.debug)
try:
status = self.runCmds([{ 'cmd': cmd }])
status = self._runCmds([{ 'cmd': cmd }])
except AgentError:
return None
@ -1047,18 +1074,18 @@ class DeviceManagerSUT(DeviceManager):
os - name of the os
id - unique id of the device
uptime - uptime of the device
uptimemillis - uptime of the device in milliseconds (SUTAgent 1.11+)
uptimemillis - uptime of the device in milliseconds (NOT supported on all implementations)
systime - system time of the device
screen - screen resolution
rotation - rotation of the device (in degrees)
memory - memory stats
process - list of running processes (same as ps)
disk - total, free, available bytes on disk
power - power status (charge, battery temp)
all - all of them - or call it with no parameters to get all the information
returns:
success: dict of info strings by directive name
failure: {}
failure: None
"""
data = None
result = {}
@ -1071,7 +1098,7 @@ class DeviceManagerSUT(DeviceManager):
for d in directives:
try:
data = self.runCmds([{ 'cmd': 'info ' + d }])
data = self._runCmds([{ 'cmd': 'info ' + d }])
except AgentError:
return result
@ -1098,13 +1125,10 @@ class DeviceManagerSUT(DeviceManager):
def installApp(self, appBundlePath, destPath=None):
"""
Installs the application onto the device
Application bundle - path to the application bundle on the device
Destination - destination directory of where application should be
installed to (optional)
Returns None for success, or output if known failure
Installs an application onto the device
appBundlePath - path to the application bundle on the device
destPath - destination directory of where application should be installed to (optional)
external function
returns:
success: None
failure: error string
@ -1114,7 +1138,7 @@ class DeviceManagerSUT(DeviceManager):
cmd += ' ' + destPath
try:
data = self.runCmds([{ 'cmd': cmd }])
data = self._runCmds([{ 'cmd': cmd }])
except AgentError, err:
print "Remote Device Error: Error installing app: %s" % err
return "%s" % err
@ -1125,46 +1149,64 @@ class DeviceManagerSUT(DeviceManager):
return line
return None
def uninstallApp(self, appName, installPath=None):
"""
Uninstalls the named application from device and DOES NOT cause a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - the path to where the application was installed (optional)
returns:
success: None
failure: DMError exception thrown
"""
cmd = 'uninstall ' + appName
if installPath:
cmd += ' ' + installPath
try:
data = self._runCmds([{ 'cmd': cmd }])
except AgentError, err:
raise DMError("Remote Device Error: Error uninstalling all %s" % appName)
status = data.split('\n')[0].strip()
if self.debug > 3:
print "uninstallApp: '%s'" % status
if status == 'Success':
return
raise DMError("Remote Device Error: uninstall failed for %s" % appName)
def uninstallAppAndReboot(self, appName, installPath=None):
"""
Uninstalls the named application from device and causes a reboot.
Takes an optional argument of installation path - the path to where the application
was installed.
Returns True, but it doesn't mean anything other than the command was sent,
the reboot happens and we don't know if this succeeds or not.
external function
Uninstalls the named application from device and causes a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - the path to where the application was installed (optional)
returns:
success: True
failure: None
success: None
failure: DMError exception thrown
"""
cmd = 'uninst ' + appName
if installPath:
cmd += ' ' + installPath
try:
data = self.runCmds([{ 'cmd': cmd }])
data = self._runCmds([{ 'cmd': cmd }])
except AgentError:
return None
raise DMError("Remote Device Error: uninstall failed for %s" % appName)
if (self.debug > 3):
print "uninstallAppAndReboot: " + str(data)
return True
return
def updateApp(self, appBundlePath, processName=None, destPath=None, ipAddr=None, port=30000):
"""
Updates the application on the device.
Application bundle - path to the application bundle on the device
Process name of application - used to end the process if the applicaiton is
currently running
Destination - Destination directory to where the application should be
installed (optional)
appBundlePath - path to the application bundle on the device
processName - used to end the process if the applicaiton is currently running (optional)
destPath - Destination directory to where the application should be installed (optional)
ipAddr - IP address to await a callback ping to let us know that the device has updated
properly - defaults to current IP.
properly - defaults to current IP.
port - port to await a callback ping to let us know that the device has updated properly
defaults to 30000, and counts up from there if it finds a conflict
Returns True if succeeds, False if not
defaults to 30000, and counts up from there if it finds a conflict
external function
returns:
success: text status from command or callback server
failure: None
@ -1181,7 +1223,7 @@ class DeviceManagerSUT(DeviceManager):
cmd += " " + destPath
if (ipAddr is not None):
ip, port = self.getCallbackIpAndPort(ipAddr, port)
ip, port = self._getCallbackIpAndPort(ipAddr, port)
cmd += " %s %s" % (ip, port)
# Set up our callback server
callbacksvr = callbackServer(ip, port, self.debug)
@ -1190,7 +1232,7 @@ class DeviceManagerSUT(DeviceManager):
print "INFO: updateApp using command: " + str(cmd)
try:
status = self.runCmds([{ 'cmd': cmd }])
status = self._runCmds([{ 'cmd': cmd }])
except AgentError:
return None
@ -1204,21 +1246,20 @@ class DeviceManagerSUT(DeviceManager):
def getCurrentTime(self):
"""
return the current time on the device
Returns device time in milliseconds since the epoch
external function
returns:
success: time in ms
failure: None
"""
try:
data = self.runCmds([{ 'cmd': 'clok' }])
data = self._runCmds([{ 'cmd': 'clok' }])
except AgentError:
return None
return data.strip()
def getCallbackIpAndPort(self, aIp, aPort):
def _getCallbackIpAndPort(self, aIp, aPort):
"""
Connect the ipaddress and port for a callback ping. Defaults to current IP address
And ports starting at 30000.
@ -1234,7 +1275,7 @@ class DeviceManagerSUT(DeviceManager):
port = nettools.findOpenPort(ip, 30000)
return ip, port
def formatEnvString(self, env):
def _formatEnvString(self, env):
"""
Returns a properly formatted env string for the agent.
Input - env, which is either None, '', or a dict
@ -1254,8 +1295,9 @@ class DeviceManagerSUT(DeviceManager):
"""
adjust the screen resolution on the device, REBOOT REQUIRED
NOTE: this only works on a tegra ATM
success: True
failure: False
return:
success: True
failure: False
supported resolutions: 640x480, 800x600, 1024x768, 1152x864, 1200x1024, 1440x900, 1680x1050, 1920x1080
"""
@ -1291,22 +1333,23 @@ class DeviceManagerSUT(DeviceManager):
if (self.debug >= 3):
print "INFO: adjusting screen resolution to %s, %s and rebooting" % (width, height)
try:
self.runCmds([{ 'cmd': "exec setprop persist.tegra.dpy%s.mode.width %s" % (screentype, width) }])
self.runCmds([{ 'cmd': "exec setprop persist.tegra.dpy%s.mode.height %s" % (screentype, height) }])
self._runCmds([{ 'cmd': "exec setprop persist.tegra.dpy%s.mode.width %s" % (screentype, width) }])
self._runCmds([{ 'cmd': "exec setprop persist.tegra.dpy%s.mode.height %s" % (screentype, height) }])
except AgentError:
return False
return True
def chmodDir(self, remoteDir):
def chmodDir(self, remoteDir, **kwargs):
"""
external function
Recursively changes file permissions in a directory
returns:
success: True
failure: False
"""
try:
self.runCmds([{ 'cmd': "chmod "+remoteDir }])
self._runCmds([{ 'cmd': "chmod "+remoteDir }])
except AgentError:
return False
return True

View File

@ -5,7 +5,7 @@
import os
from setuptools import setup
PACKAGE_VERSION = '0.6'
PACKAGE_VERSION = '0.9'
# take description from README
here = os.path.dirname(os.path.abspath(__file__))

View File

@ -0,0 +1,4 @@
mozfile is a convenience library for taking care of some common file-related
tasks in automated testing, such as extracting files or recursively removing
directories.

View File

@ -0,0 +1,5 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from mozfile import *

View File

@ -0,0 +1,133 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
mozfile.py:
Cointains file functions for mozbase:
https://bugzilla.mozilla.org/show_bug.cgi?id=774916
"""
import os
import tarfile
import zipfile
__all__ = ['extract_tarball', 'extract_zip', 'extract', 'rmtree']
### utilities for extracting archives
def extract_tarball(src, dest):
"""extract a .tar file"""
bundle = tarfile.open(src)
namelist = bundle.getnames()
for name in namelist:
bundle.extract(name, path=dest)
bundle.close()
return namelist
def extract_zip(src, dest):
"""extract a zip file"""
bundle = zipfile.ZipFile(src)
namelist = bundle.namelist()
for name in namelist:
filename = os.path.realpath(os.path.join(dest, name))
if name.endswith('/'):
os.makedirs(filename)
else:
path = os.path.dirname(filename)
if not os.path.isdir(path):
os.makedirs(path)
_dest = open(filename, 'wb')
_dest.write(bundle.read(name))
_dest.close()
bundle.close()
return namelist
def extract(src, dest=None):
"""
Takes in a tar or zip file and extracts it to dest
If dest is not specified, extracts to os.path.dirname(src)
Returns the list of top level files that were extracted
"""
assert os.path.exists(src), "'%s' does not exist" % src
assert not os.path.isfile(dest), "dest cannot be a file"
if dest is None:
dest = os.path.dirname(src)
elif not os.path.isdir(dest):
os.makedirs(dest)
if zipfile.is_zipfile(src):
namelist = extract_zip(src, dest)
elif tarfile.is_tarfile(src):
namelist = extract_tarball(src, dest)
else:
raise Exception("mozfile.extract: no archive format found for '%s'" %
src)
# namelist returns paths with forward slashes even in windows
top_level_files = [os.path.join(dest, name) for name in namelist
if len(name.rstrip('/').split('/')) == 1]
# namelist doesn't include folders, append these to the list
for name in namelist:
root = os.path.join(dest, name[:name.find('/')])
if root not in top_level_files:
top_level_files.append(root)
return top_level_files
def rmtree(dir):
"""This is a replacement for shutil.rmtree that works better under
windows. Thanks to Bear at the OSAF for the code."""
if not os.path.exists(dir):
return
if os.path.islink(dir):
os.remove(dir)
return
# Verify the directory is read/write/execute for the current user
os.chmod(dir, 0700)
# os.listdir below only returns a list of unicode filenames
# if the parameter is unicode.
# If a non-unicode-named dir contains a unicode filename,
# that filename will get garbled.
# So force dir to be unicode.
try:
dir = unicode(dir, "utf-8")
except:
print("rmtree: decoding from UTF-8 failed")
for name in os.listdir(dir):
full_name = os.path.join(dir, name)
# on Windows, if we don't have write permission we can't remove
# the file/directory either, so turn that on
if os.name == 'nt':
if not os.access(full_name, os.W_OK):
# I think this is now redundant, but I don't have an NT
# machine to test on, so I'm going to leave it in place
# -warner
os.chmod(full_name, 0600)
if os.path.islink(full_name):
os.remove(full_name)
elif os.path.isdir(full_name):
rmtree(full_name)
else:
if os.path.isfile(full_name):
os.chmod(full_name, 0700)
os.remove(full_name)
os.rmdir(dir)

View File

@ -0,0 +1,32 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import os
from setuptools import setup
PACKAGE_VERSION = '0.0'
# get documentation from the README
try:
here = os.path.dirname(os.path.abspath(__file__))
description = file(os.path.join(here, 'README.md')).read()
except (OSError, IOError):
description = ''
setup(name='mozfile',
version=PACKAGE_VERSION,
description="common file utilities for Mozilla python usage",
long_description=description,
classifiers=[], # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
keywords='mozilla',
author='Mozilla Automation and Tools team',
author_email='tools@lists.mozilla.org',
url='https://wiki.mozilla.org/Auto-tools/Projects/MozBase',
license='MPL',
packages=['mozfile'],
include_package_data=True,
zip_safe=False,
install_requires=[]
)

View File

@ -0,0 +1 @@
[test.py]

View File

@ -0,0 +1,182 @@
#!/usr/bin/env python
"""
tests for mozfile
"""
import mozfile
import os
import shutil
import tarfile
import tempfile
import unittest
import zipfile
# stub file paths
files = [('foo.txt',),
('foo', 'bar.txt'),
('foo', 'bar', 'fleem.txt'),
('foobar', 'fleem.txt'),
('bar.txt')]
def create_stub():
"""create a stub directory"""
tempdir = tempfile.mkdtemp()
try:
for path in files:
fullpath = os.path.join(tempdir, *path)
dirname = os.path.dirname(fullpath)
if not os.path.exists(dirname):
os.makedirs(dirname)
contents = path[-1]
f = file(fullpath, 'w')
f.write(contents)
f.close()
return tempdir
except Exception, e:
try:
shutil.rmtree(tempdir)
except:
pass
raise e
class TestExtract(unittest.TestCase):
"""test extracting archives"""
def ensure_directory_contents(self, directory):
"""ensure the directory contents match"""
for f in files:
path = os.path.join(directory, *f)
exists = os.path.exists(path)
if not exists:
print "%s does not exist" % (os.path.join(f))
self.assertTrue(exists)
if exists:
contents = file(path).read().strip()
self.assertTrue(contents == f[-1])
def test_extract_zipfile(self):
"""test extracting a zipfile"""
_zipfile = self.create_zip()
self.assertTrue(os.path.exists(_zipfile))
try:
dest = tempfile.mkdtemp()
try:
mozfile.extract_zip(_zipfile, dest)
self.ensure_directory_contents(dest)
finally:
shutil.rmtree(dest)
finally:
os.remove(_zipfile)
def test_extract_tarball(self):
"""test extracting a tarball"""
tarball = self.create_tarball()
self.assertTrue(os.path.exists(tarball))
try:
dest = tempfile.mkdtemp()
try:
mozfile.extract_tarball(tarball, dest)
self.ensure_directory_contents(dest)
finally:
shutil.rmtree(dest)
finally:
os.remove(tarball)
def test_extract(self):
"""test the generalized extract function"""
# test extracting a tarball
tarball = self.create_tarball()
self.assertTrue(os.path.exists(tarball))
try:
dest = tempfile.mkdtemp()
try:
mozfile.extract(tarball, dest)
self.ensure_directory_contents(dest)
finally:
shutil.rmtree(dest)
finally:
os.remove(tarball)
# test extracting a zipfile
_zipfile = self.create_zip()
self.assertTrue(os.path.exists(_zipfile))
try:
dest = tempfile.mkdtemp()
try:
mozfile.extract_zip(_zipfile, dest)
self.ensure_directory_contents(dest)
finally:
shutil.rmtree(dest)
finally:
os.remove(_zipfile)
# test extracting some non-archive; this should fail
fd, filename = tempfile.mkstemp()
os.write(fd, 'This is not a zipfile or tarball')
os.close(fd)
exception = None
try:
dest = tempfile.mkdtemp()
mozfile.extract(filename, dest)
except Exception, exception:
pass
finally:
os.remove(filename)
os.rmdir(dest)
self.assertTrue(isinstance(exception, Exception))
### utility functions
def create_tarball(self):
"""create a stub tarball for testing"""
tempdir = create_stub()
filename = tempfile.mktemp(suffix='.tar')
archive = tarfile.TarFile(filename, mode='w')
try:
for path in files:
archive.add(os.path.join(tempdir, *path), arcname=os.path.join(*path))
except:
os.remove(archive)
raise
finally:
shutil.rmtree(tempdir)
archive.close()
return filename
def create_zip(self):
"""create a stub zipfile for testing"""
tempdir = create_stub()
filename = tempfile.mktemp(suffix='.zip')
archive = zipfile.ZipFile(filename, mode='w')
try:
for path in files:
archive.write(os.path.join(tempdir, *path), arcname=os.path.join(*path))
except:
os.remove(filename)
raise
finally:
shutil.rmtree(tempdir)
archive.close()
return filename
class TestRemoveTree(unittest.TestCase):
"""test our ability to remove a directory tree"""
def remove_directory(self):
tempdir = create_stub()
self.assertTrue(os.path.exists(tempdir))
self.assertTrue(os.path.isdir(tempdir))
try:
mozfile.rmtree(tempdir)
except:
shutil.rmtree(tempdir)
raise
self.assertFalse(os.path.exists(tempdir))
if __name__ == '__main__':
unittest.main()

View File

@ -9,9 +9,10 @@
# https://github.com/mozilla/mozbase/blob/master/test.py
[include:manifestdestiny/tests/manifest.ini]
[include:mozcrash/tests/manifest.ini]
[include:mozdevice/tests/manifest.ini]
[include:mozfile/tests/manifest.ini]
[include:mozhttpd/tests/manifest.ini]
[include:mozprocess/tests/manifest.ini]
[include:mozprofile/tests/manifest.ini]
[include:mozhttpd/tests/manifest.ini]
[include:mozdevice/tests/manifest.ini]
[include:moztest/tests/manifest.ini]
[include:mozcrash/tests/manifest.ini]