Chris Lord 1649ace873 Bug 720663 - devicemanagerADB pushDir fallback not always tried. r=gbrown
The pushDir method has two paths, one that uses zip/unzip
and the other that just pushes each file one at a time. The idea is that the
first path is chosen, and if it fails, the second is tried before bailing out,
but it's possible for the first failure not to be detected in such a way that
activates the fallback path.

Add a try/except block to catch exceptions in the first path and fall back to
the second.
2012-01-25 09:49:30 +00:00

643 lines
21 KiB

import subprocess
from devicemanager import DeviceManager, DMError
import re
import os
import sys
import tempfile
class DeviceManagerADB(DeviceManager):
def __init__(self, host=None, port=20701, retrylimit=5, packageName=None):
    """Store connection settings and probe what the device supports.

    host, port and retrylimit are stored unchanged.  When packageName is
    None a default is derived from the USER environment variable
    ('org.mozilla.fennec_' + USER, or the bare prefix when USER is unset).
    The resulting name is passed to Init(), which decides whether run-as
    and zip transfers can be used.
    """
    self.host = host
    self.port = port
    self.retrylimit = retrylimit
    self.retries = 0
    self._sock = None
    self.useRunAs = False
    self.useZip = False
    self.packageName = None
    if packageName is None:
        user = os.getenv('USER')
        if user:
            packageName = 'org.mozilla.fennec_' + user
        else:
            # Without this else the user-specific name was always overwritten.
            packageName = 'org.mozilla.fennec_'
    self.Init(packageName)
def Init(self, packageName):
    """Probe adb, run-as, zip and root support without ever raising.

    Each probe is wrapped separately so that one failure (for example adb
    not being installed) only disables the matching feature.
    """
    # Initialization code that may fail: Catch exceptions here to allow
    # successful initialization even if, for example, adb is not installed.
    try:
        self.verifyADB()
        self.verifyRunAs(packageName)
    except Exception:
        self.useRunAs = False
        self.packageName = None
    try:
        self.verifyZip()
    except Exception:
        self.useZip = False
    try:
        # a test to see if we have root privs
        files = self.listFiles("/data/data")
        if len(files) == 1 and files[0].find("Permission denied") != -1:
            print("NOT running as root")
            raise Exception("not running as root")
    except Exception:
        # NOTE(review): "adb root" is the assumed restart command, inferred
        # from the failure message below -- confirm.
        try:
            self.checkCmd(["root"])
        except Exception:
            print("restarting as root failed")
# external function
# returns:
# success: True
# failure: False
def pushFile(self, localname, destname):
    """Copy one local file to the device.

    With run-as enabled the file is pushed to self.tmpDir first and then
    copied into place under the package's uid.  If destname turns out to be
    a directory the file's final path is destname/<basename>.

    Returns True on success, False if any step raised.
    """
    try:
        if os.name == "nt":
            # Device paths always use forward slashes.
            destname = destname.replace('\\', '/')
        if self.useRunAs:
            remoteTmpFile = self.tmpDir + "/" + os.path.basename(localname)
            self.checkCmd(["push", os.path.realpath(localname), remoteTmpFile])
            self.checkCmdAs(["shell", "cp", remoteTmpFile, destname])
            self.checkCmd(["shell", "rm", remoteTmpFile])
        else:
            self.checkCmd(["push", os.path.realpath(localname), destname])
        if self.isDir(destname):
            destname = destname + "/" + os.path.basename(localname)
        # Open up permissions on what was just pushed; without this call the
        # destname computed above would be dead code.
        self.chmodDir(destname)
        return True
    except Exception:
        return False
# external function
# returns:
# success: directory name
# failure: None
def mkDir(self, name):
    """Create directory `name` on the device.

    Returns name on success, None if the mkdir command raised.
    """
    try:
        self.checkCmdAs(["shell", "mkdir", name])
        return name
    except Exception:
        return None
# make directory structure on the device
# external function
# returns:
# success: directory structure that we created
# failure: None
def mkDirs(self, filename):
    """Create every missing parent directory of `filename` on the device.

    The last path component is treated as a file name and is not created.
    Returns the directory path that now exists ("" when filename has no
    directory part), or None if a directory could not be created.
    """
    name = ""
    # Slice off the last component by position; comparing by value stopped
    # early whenever an earlier directory had the same name as the file.
    for part in filename.split('/')[:-1]:
        if part == "":
            continue
        name += '/' + part
        if not self.dirExists(name):
            if self.mkDir(name) is None:
                print("failed making directory: " + str(name))
                return None
    return name
# push localDir from host to remoteDir on the device
# external function
# returns:
# success: remoteDir
# failure: None
def pushDir(self, localDir, remoteDir):
    """Copy the tree under localDir to remoteDir on the device.

    Two strategies exist: zip locally / unzip remotely (when self.useZip is
    set), or push file by file.  Any failure in the zip path disables zip
    and retries with the file-by-file path.

    Returns remoteDir on success, None on failure.
    """
    # adb "push" accepts a directory as an argument, but if the directory
    # contains symbolic links, the links are pushed, rather than the linked
    # files; we either zip/unzip or push file-by-file to get around this
    # limitation
    try:
        if not self.dirExists(remoteDir):
            # mkDirs creates every component except the last, so append a
            # dummy leaf to get remoteDir itself created.
            self.mkDirs(remoteDir + "/x")
        if self.useZip:
            try:
                localZip = tempfile.mktemp() + ".zip"
                # The archive needs a file name inside remoteDir.
                remoteZip = remoteDir + "/adbdmtmp.zip"
                try:
                    subprocess.check_output(["zip", "-r", localZip, '.'], cwd=localDir)
                    self.pushFile(localZip, remoteZip)
                finally:
                    # Do not leave the temporary archive behind on the host.
                    if os.path.exists(localZip):
                        os.remove(localZip)
                data = self.runCmdAs(["shell", "unzip", "-o", remoteZip, "-d", remoteDir]).stdout.read()
                self.checkCmdAs(["shell", "rm", remoteZip])
                if re.search("unzip: exiting", data) or re.search("Operation not permitted", data):
                    raise Exception("unzip failed, or permissions error")
            except Exception:
                print("zip/unzip failure: falling back to normal push")
                self.useZip = False
                # Report the outcome of the fallback, not blanket success.
                return self.pushDir(localDir, remoteDir)
        else:
            for root, dirs, files in os.walk(localDir, followlinks=True):
                relRoot = os.path.relpath(root, localDir)
                prefix = remoteDir + "/"
                if relRoot != ".":
                    prefix = prefix + relRoot + "/"
                for fileName in files:
                    self.pushFile(os.path.join(root, fileName), prefix + fileName)
                for dirName in dirs:
                    targetDir = prefix + dirName
                    if not self.dirExists(targetDir):
                        self.mkDir(targetDir)
        self.checkCmdAs(["shell", "chmod", "777", remoteDir])
        return remoteDir
    except Exception:
        print("pushing " + localDir + " to " + remoteDir + " failed")
        return None
# external function
# returns:
# success: True
# failure: False
def dirExists(self, dirname):
    """Return whatever isDir() reports for `dirname`."""
    exists = self.isDir(dirname)
    return exists
# Because we always have / style paths we make this a lot easier with some
# assumptions
# external function
# returns:
# success: True
# failure: False
def fileExists(self, filepath):
    """Return True when `ls -a filepath` prints exactly the path itself.

    Any other output (an error message, a directory listing, nothing)
    yields False.
    """
    listing = self.runCmd(["shell", "ls", "-a", filepath]).stdout.readlines()
    if len(listing) != 1:
        return False
    return listing[0].rstrip() == filepath
def removeFile(self, filename):
    """Run `rm filename` on the device and return the command's output text.

    The text (not the process object) is returned because removeDir()
    concatenates the results of this call into a string.
    """
    return self.runCmd(["shell", "rm", filename]).stdout.read()
# removes a single directory on the device: rmdir remoteDir (not recursive)
# external function
# returns:
# success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
# failure: None
def removeSingleDir(self, remoteDir):
    """Run `rmdir remoteDir` on the device and return the command's output text.

    The text (not the process object) is returned because removeDir()
    concatenates the results of this call into a string.
    """
    return self.runCmd(["shell", "rmdir", remoteDir]).stdout.read()
# does a recursive delete of directory on the device: rm -Rf remoteDir
# external function
# returns:
# success: output of telnet, i.e. "removing file: /mnt/sdcard/tests/test.txt"
# failure: None
def removeDir(self, remoteDir):
    """Recursively delete remoteDir on the device.

    Directories are emptied depth-first and then removed with
    removeSingleDir(); a non-directory path is removed with removeFile().
    Returns the concatenated output of every removal command.
    """
    out = ""
    path = remoteDir.strip()
    if self.isDir(remoteDir):
        for f in self.listFiles(path):
            entry = f.strip()
            # listFiles uses "ls -a"; never descend into the self/parent
            # links if the device's ls prints them.
            if entry in ("", ".", ".."):
                continue
            child = path + "/" + entry
            if self.isDir(child):
                out += self.removeDir(child)
            else:
                out += self.removeFile(child)
        out += self.removeSingleDir(path)
    else:
        out += self.removeFile(path)
    return out
def isDir(self, remotePath):
    """Guess from `ls -a remotePath` whether the path is a directory.

    A listing with zero lines or more than one line counts as a directory.
    A single line counts as "not a directory" when it echoes the path
    itself or carries one of the two known error messages.
    """
    lines = self.runCmd(["shell", "ls", "-a", remotePath]).stdout.readlines()
    if len(lines) != 1:
        return True
    only = lines[0]
    if only.rstrip() == remotePath:
        return False
    for marker in ("No such file or directory", "Not a directory"):
        if marker in only:
            return False
    return True
def listFiles(self, rootdir):
    """Return the raw lines printed by `ls -a rootdir`.

    Returns [] when the single output line shows rootdir is a plain file
    (ls echoes the path) or does not exist / is not a directory.  Entries
    keep their line endings; callers strip them.
    """
    data = self.runCmd(["shell", "ls", "-a", rootdir]).stdout.readlines()
    if len(data) == 1:
        only = data[0]
        # Strip the line ending before comparing, as isDir()/fileExists()
        # do; the unstripped comparison could never match.
        if only.rstrip() == rootdir:
            return []
        if only.find("No such file or directory") != -1:
            return []
        if only.find("Not a directory") != -1:
            return []
    return data
# external function
# returns:
# success: array of process tuples
# failure: []
def getProcessList(self):
    """Return the device's processes as [pid, name, user] lists.

    Parses `ps` output taking column 2 as the pid, the last column as the
    name and column 1 as the user.  Returns [] when nothing is listed.
    """
    lines = self.runCmd(["shell", "ps"]).stdout.readlines()
    ret = []
    # first line is the headers
    for proc in lines[1:]:
        els = proc.split()
        if len(els) < 2:
            # Blank or truncated line: nothing to report.
            continue
        ret.append([els[1], els[-1], els[0]])
    return ret
# external function
# returns:
# success: pid
# failure: None
def fireProcess(self, appname, failIfRunning=False):
    """Launch `appname` through launchProcess() and return its result.

    appname is split on double quotes; when that yields more than two
    pieces the first two (a leading quoted environment section) are
    dropped before launching.
    """
    #strip out env vars
    parts = appname.split('"')
    if len(parts) > 2:
        parts = parts[2:]
    # Pass by keyword: positionally this value landed in launchProcess's
    # outputFile parameter.
    return self.launchProcess(parts, failIfRunning=failIfRunning)
# external function
# returns:
# success: output filename
# failure: None
def launchProcess(self, cmd, outputFile="process.txt", cwd='', env='', failIfRunning=False):
    """Start an application on the device with `am start`.

    cmd is a list of words.  If it already begins with "am" it is run
    as-is.  Otherwise the first word is the package name, a trailing word
    that looks like a URL becomes the intent data, and everything in
    between is passed as the "args" string extra.

    cwd, env and failIfRunning are accepted but not used here.
    Returns outputFile.
    """
    if cmd[0] == "am":
        self.checkCmd(["shell"] + cmd)
        return outputFile

    cmdline = ' '.join(cmd).strip()
    # Partitioning keeps a command with no arguments intact; slicing on
    # find(" ") == -1 chopped the last character off the package name.
    package, _, rest = cmdline.partition(" ")
    rest = rest.strip()

    # SUT identifies the URL by looking for :\\ -- another strategy to consider
    # An alternation group is needed; square brackets made this a character
    # class that matched any word starting with one of those letters.
    re_url = re.compile(r'^(http|file|chrome|about)')
    uri = ""
    args = rest
    if rest:
        head, _, tail = rest.rpartition(" ")
        if re_url.match(tail):
            uri = tail
            args = head.strip()

    # NOTE(review): the -n / --es / -d flags are assumed from `am start`
    # usage; the lines that appended them are not visible -- confirm.
    acmd = ["shell", "am", "start", "-n", package + "/.App"]
    if args != "":
        acmd.extend(["--es", "args", args])
    if uri != "":
        acmd.extend(["-d", ''.join(['\'', uri, '\''])])
    print(acmd)
    self.checkCmd(acmd)
    return outputFile
# external function
# returns:
# success: output from testagent
# failure: None
def killProcess(self, appname):
    """Kill the first process whose name equals appname.

    Returns the output text of the kill command, or None when no process
    with that name is running.
    """
    for (pid, name, user) in self.getProcessList():
        if name == appname:
            p = self.runCmdAs(["shell", "kill", pid])
            # Hand the command output back instead of discarding it.
            return p.stdout.read()
    return None
# external function
# returns:
# success: filecontents
# failure: None
def catFile(self, remoteFile):
    """Return the contents of remoteFile by delegating to getFile()."""
    contents = self.getFile(remoteFile)
    return contents
# external function
# returns:
# success: output of pullfile, string
# failure: None
def pullFile(self, remoteFile):
    """Return the contents of remoteFile by delegating to getFile()."""
    contents = self.getFile(remoteFile)
    return contents
# copy file from device (remoteFile) to host (localFile)
# external function
# returns:
# success: output of pullfile, string
# failure: None
def getFile(self, remoteFile, localFile='tmpfile_dm_adb'):
    """Pull remoteFile to localFile on the host and return its contents.

    Returns the file contents as a string, or None if the pull or the
    local read failed.
    """
    # TODO: add debug flags and allow for printing stdout
    try:
        # runCmd only starts adb; wait for it to finish (and drain its
        # pipes) before reading the local copy.
        self.runCmd(["pull", remoteFile, localFile]).communicate()
        with open(localFile) as f:
            return f.read()
    except Exception:
        return None
# copy directory structure from device (remoteDir) to host (localDir)
# external function
# checkDir exists so that we don't create local directories if the
# remote directory doesn't exist but also so that we don't call isDir
# twice when recursing.
# returns:
# success: list of files, string
# failure: None
def getDirectory(self, remoteDir, localDir, checkDir=True):
    """Pull remoteDir to localDir and list the top-level names received.

    The names are parsed from the last word of each line adb writes to
    stderr: the localDir prefix is removed and only the first path
    component is kept.  The final line is treated as a summary and
    dropped.  checkDir is accepted but not used here.

    Returns a list of names (possibly empty).
    """
    ret = []
    p = self.runCmd(["pull", remoteDir, localDir])
    # NOTE(review): any leading status line adb prints is not skipped
    # here -- confirm against real `adb pull` output.
    for line in p.stderr.readlines():
        els = line.split()
        if not els:
            continue
        f = els[-1]
        i = f.find(localDir)
        if localDir and i != -1:
            if localDir[-1] != '/':
                # Also step over the separator that follows localDir.
                i = i + 1
            f = f[i + len(localDir):]
        i = f.find("/")
        if i > 0:
            f = f[0:i]
        ret.append(f)
    #the last line is a summary
    if len(ret) > 0:
        ret.pop()
    return ret
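# true/false check if the two files match; the comparison uses the values
# returned by getRemoteHash/getLocalHash, which are fields of "ls -l" output
# rather than real md5 sums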
# external function
# returns:
# success: True
# failure: False
def validateFile(self, remoteFile, localFile):
    """Return True when getRemoteHash() and getLocalHash() agree."""
    remoteValue = self.getRemoteHash(remoteFile)
    localValue = self.getLocalHash(localFile)
    return remoteValue == localValue
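# return a cheap fingerprint of a remote file: the fourth field of the
# device's "ls -l" output (no md5 sum is actually computed)
# internal function
# returns:
# success: fourth "ls -l" field for the given filename
# failure: None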
def getRemoteHash(self, filename):
    """Return the fourth whitespace-separated field of `ls -l filename`.

    NOTE(review): presumably the file size on the device's ls -- confirm.
    """
    # Read the command's output text; split() cannot be applied to the
    # process object that runCmd returns.
    data = self.runCmd(["shell", "ls", "-l", filename]).stdout.read()
    return data.split()[3]
def getLocalHash(self, filename):
    """Return the size of local file `filename` in bytes, as a string.

    This is the value the fifth field of a host `ls -l` listing carries;
    asking the OS directly avoids spawning a process and parsing its
    output.  Raises OSError if the file does not exist.
    """
    return str(os.path.getsize(filename))
# Gets the device root for the testing area on the device
# For all devices we will use / type slashes and depend on the device-agent
# to sort those out. The agent will return us the device location where we
# should store things, we will then create our /tests structure relative to
# that returned path.
# Structure on the device is as follows:
# /tests
# /<fennec>|<firefox> --> approot
# /profile
# /xpcshell
# /reftest
# /mochitest
# external function
# returns:
# success: path for device root
# failure: None
def getDeviceRoot(self):
    """Return the directory on the device under which tests are stored.

    An existing /data/local/tests wins.  Otherwise <root>/tests is used,
    where <root> is /mnt/sdcard if present and /data/local if not; that
    directory is created when missing.
    """
    # /mnt/sdcard/tests is preferred to /data/local/tests, but this can be
    # over-ridden by creating /data/local/tests
    testRoot = "/data/local/tests"
    if self.dirExists(testRoot):
        return testRoot
    root = "/mnt/sdcard"
    if not self.dirExists(root):
        root = "/data/local"
    testRoot = root + "/tests"
    if not self.dirExists(testRoot):
        self.mkDir(testRoot)
    return testRoot
# Either we will have /tests/fennec or /tests/firefox but we will never have
# both. Return the one that exists
# TODO: ensure we can support org.mozilla.firefox
# external function
# returns:
# success: path for app root
# failure: None
def getAppRoot(self, packageName):
    """Return /data/data/<package> for the first package that exists.

    packageName is tried first (and remembered in self.packageName when
    found), then the previously stored self.packageName.  Returns None
    when there is no device root or neither directory exists.
    """
    if self.getDeviceRoot() is None:
        return None

    dataDir = '/data/data/'
    if packageName and self.dirExists(dataDir + packageName):
        self.packageName = packageName
        return dataDir + packageName
    if self.packageName and self.dirExists(dataDir + self.packageName):
        return dataDir + self.packageName

    # Failure (either not installed or not a recognized platform)
    print("devicemanagerADB: getAppRoot failed")
    return None
# Gets the directory location on the device for a specific test type
# Type is one of: xpcshell|reftest|mochitest
# external function
# returns:
# success: path for test root
# failure: None
def getTestRoot(self, type):
    """Return the device directory for a test type and remember it.

    type is matched case-insensitively against xpcshell, reftest and
    mochitest (in that order); the result is stored in self.testRoot.
    Returns None when there is no device root or the type is not known.
    """
    devroot = self.getDeviceRoot()
    if devroot is None:
        return None

    # Use the re.I flag for every name; the '?(i)name' patterns were not
    # valid regular expressions.
    for suite in ('xpcshell', 'reftest', 'mochitest'):
        if re.search(suite, type, re.I):
            self.testRoot = devroot + '/' + suite
            return self.testRoot
    # Unknown type: report failure rather than a stale or missing testRoot.
    return None
# external function
# returns:
# success: status from test agent
# failure: None
def reboot(self, wait=False):
    """Reboot the device, optionally waiting for it to come back.

    Without wait, returns "Success" right after issuing the reboot.
    With wait, tries up to 40 times to reach the device; returns the
    reboot command's output once the device answers, or "Success" after
    the attempts are used up.
    """
    import time  # local import: the module header does not import time

    ret = self.runCmd(["reboot"]).stdout.read()
    if not wait:
        return "Success"
    countdown = 40
    while countdown > 0:
        # Count every attempt so an unreachable device cannot loop forever.
        countdown -= 1
        try:
            self.checkCmd(["wait-for-device", "shell", "ls", "/sbin"])
            return ret
        except Exception:
            # NOTE(review): "adb root" is assumed from the message below.
            try:
                self.checkCmd(["root"])
            except Exception:
                time.sleep(1)
                print("couldn't get root")
    return "Success"
# external function
# returns:
# success: text status from command or callback server
# failure: None
def updateApp(self, appBundlePath, processName=None, destPath=None, ipAddr=None, port=30000):
    """Reinstall the application package at appBundlePath via `adb install -r`.

    processName, destPath, ipAddr and port are accepted but not used here.
    Returns the text printed by the install command.
    """
    # Return the command's output text, not the process object.
    return self.runCmd(["install", "-r", appBundlePath]).stdout.read()
# external function
# returns:
# success: time in ms
# failure: None
def getCurrentTime(self):
    """Return the device clock in milliseconds since the epoch, as a string.

    Runs `date +%s` on the device.  Returns None when the output is empty
    or not purely digits.
    """
    # Read and trim the output text; isdigit() cannot be applied to the
    # process object that runCmd returns.
    timestr = self.runCmd(["shell", "date", "+%s"]).stdout.read().strip()
    if not timestr or not timestr.isdigit():
        return None
    return str(int(timestr) * 1000)
# Returns information about the device:
# Directive indicates the information you want to get, your choices are:
# os - name of the os
# id - unique id of the device
# uptime - uptime of the device
# systime - system time of the device
# screen - screen resolution
# memory - memory stats
# process - list of running processes (same as ps)
# disk - total, free, available bytes on disk
# power - power status (charge, battery temp)
# all - all of them - or call it with no parameters to get all the information
# returns:
# success: dict of info strings by directive name
# failure: {}
def getInfo(self, directive="all"):
    """Collect information about the device.

    directive selects one of "id", "os", "uptime", "process", "systime",
    or "all" for every one of them.  Returns a dict keyed by directive
    name; each value is the command's output text, except "uptime" which
    is a one-element list holding a formatted string.

    Raises DMError when the uptime command produces no output.
    """
    def output(args):
        # Text printed by an adb command.
        return self.runCmd(args).stdout.read()

    ret = {}
    if directive == "id" or directive == "all":
        ret["id"] = output(["get-serialno"])
    if directive == "os" or directive == "all":
        # NOTE(review): the property name was blank; ro.build.display.id is
        # assumed to be the intended build identifier -- confirm.
        ret["os"] = output(["shell", "getprop", "ro.build.display.id"])
    if directive == "uptime" or directive == "all":
        utime = output(["shell", "uptime"])
        if not utime:
            raise DMError("error getting uptime")
        # Skip the 9-character label, then peel off HH, MM and SS.
        utime = utime[9:]
        hours = utime[0:utime.find(":")]
        utime = utime[utime[1:].find(":") + 2:]
        minutes = utime[0:utime.find(":")]
        utime = utime[utime[1:].find(":") + 2:]
        seconds = utime[0:utime.find(",")]
        ret["uptime"] = ["0 days " + hours + " hours " + minutes + " minutes " + seconds + " seconds"]
    if directive == "process" or directive == "all":
        ret["process"] = output(["shell", "ps"])
    if directive == "systime" or directive == "all":
        ret["systime"] = output(["shell", "date"])
    print(ret)
    return ret
def runCmd(self, args):
    """Start `adb <args...>` and return the Popen object without waiting.

    stdout and stderr are both piped so callers can read them.
    """
    # Build a new list so the caller's argument list is left untouched.
    return subprocess.Popen(["adb"] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def runCmdAs(self, args):
    """Like runCmd(), but through `run-as <package>` when enabled.

    When self.useRunAs is set, "run-as" and self.packageName are placed
    after the first argument (expected to be "shell").
    """
    if self.useRunAs:
        # Build a new list so the caller's argument list is left untouched.
        args = list(args[:1]) + ["run-as", self.packageName] + list(args[1:])
    return self.runCmd(args)
def checkCmd(self, args):
    """Run `adb <args...>` to completion.

    Returns 0 on success; raises subprocess.CalledProcessError on a
    non-zero exit status (and OSError if adb cannot be started).
    """
    # Build a new list so the caller's argument list is left untouched.
    return subprocess.check_call(["adb"] + list(args))
def checkCmdAs(self, args):
    """Like checkCmd(), but through `run-as <package>` when enabled.

    When self.useRunAs is set, "run-as" and self.packageName are placed
    after the first argument (expected to be "shell").
    """
    if self.useRunAs:
        # Build a new list so the caller's argument list is left untouched.
        args = list(args[:1]) + ["run-as", self.packageName] + list(args[1:])
    return self.checkCmd(args)
def chmodDir(self, remoteDir):
    """Recursively `chmod 777` remoteDir and everything beneath it.

    A path that is not a directory is simply chmod-ed itself.
    """
    path = remoteDir.strip()
    if self.isDir(remoteDir):
        for f in self.listFiles(path):
            entry = f.strip()
            # listFiles uses "ls -a"; never descend into the self/parent
            # links if the device's ls prints them.
            if entry in ("", ".", ".."):
                continue
            child = path + "/" + entry
            if self.isDir(child):
                self.chmodDir(child)
            else:
                # chmod the file itself, not its parent directory again.
                self.checkCmdAs(["shell", "chmod", "777", child])
                print("chmod " + child)
    self.checkCmdAs(["shell", "chmod", "777", path])
    print("chmod " + path)
def verifyADB(self):
    """Check that the adb executable can be started.

    Prints a hint and re-raises the original exception when it cannot.
    """
    # Check to see if adb itself can be executed.
    try:
        self.runCmd(["version"])
    except Exception:
        print("unable to execute ADB: ensure Android SDK is installed and adb is in your $PATH")
        raise
def isCpAvailable(self):
    """Return True when running `cp` on the device prints a usage message."""
    # Some Android systems may not have a cp command installed,
    # or it may not be executable by the user.
    data = self.runCmd(["shell", "cp"]).stdout.read()
    if re.search('Usage', data):
        return True
    print("unable to execute 'cp' on device; consider installing busybox from Android Market")
    return False
def verifyRunAs(self, packageName):
    """Decide whether file operations can go through `run-as <package>`.

    Sets self.useRunAs (and self.packageName / self.tmpDir) when a test
    copy performed via run-as succeeds.  Exceptions from the adb commands
    propagate to the caller.
    """
    # If a valid package name is available, and certain other
    # conditions are met, devicemanagerADB can execute file operations
    # via the "run-as" command, so that pushed files and directories
    # are created by the uid associated with the package, more closely
    # echoing conditions encountered by Fennec at run time.
    # Check to see if run-as can be used here, by verifying a
    # file copy via run-as.
    self.useRunAs = False
    devroot = self.getDeviceRoot()
    if not (packageName and self.isCpAvailable() and devroot):
        return

    self.tmpDir = devroot + "/tmp"
    if not self.dirExists(self.tmpDir):
        # The staging directory must exist before anything is pushed to it.
        self.mkDir(self.tmpDir)
    sanityDir = devroot + "/sanity"
    self.checkCmd(["shell", "run-as", packageName, "mkdir", sanityDir])
    # Any readable local file will do; this script is always available.
    self.checkCmd(["push", os.path.abspath(sys.argv[0]), self.tmpDir + "/tmpfile"])
    self.checkCmd(["shell", "run-as", packageName, "cp", self.tmpDir + "/tmpfile", sanityDir])
    if self.fileExists(sanityDir + "/tmpfile"):
        print("will execute commands via run-as " + packageName)
        self.packageName = packageName
        self.useRunAs = True
    self.checkCmd(["shell", "rm", devroot + "/tmp/tmpfile"])
    self.checkCmd(["shell", "run-as", packageName, "rm", "-r", sanityDir])
def isUnzipAvailable(self):
    """Return True when running `unzip` on the device prints a usage message."""
    data = self.runCmdAs(["shell", "unzip"]).stdout.read()
    if re.search('Usage', data):
        return True
    return False
def isLocalZipAvailable(self):
    """Return True when the host's `zip` program can be run successfully."""
    try:
        subprocess.check_call(["zip", "-?"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except Exception:
        # Not installed, not executable, or exited with an error.
        return False
    return True
def verifyZip(self):
    """Enable zip-based directory pushes when both ends support them.

    self.useZip is set only if unzip works on the device and zip works on
    the host; the host is not probed when the device check fails.
    """
    # If "zip" can be run locally, and "unzip" can be run remotely, then pushDir
    # can use these to push just one file per directory -- a significant
    # optimization for large directories.
    self.useZip = False
    if not self.isUnzipAvailable():
        return
    if not self.isLocalZipAvailable():
        return
    print("will use zip to push directories")
    self.useZip = True