mirror of
https://github.com/AdaCore/cpython.git
synced 2026-02-12 12:57:15 -08:00
Improved the documentation and tests for make_archive(). Improved error handling when the corresponding compression module is not available. The external zip executable is now used if the zlib module is not available.
931 lines
34 KiB
Python
931 lines
34 KiB
Python
# Copyright (C) 2003 Python Software Foundation
|
|
|
|
import unittest
|
|
import shutil
|
|
import tempfile
|
|
import sys
|
|
import stat
|
|
import os
|
|
import os.path
|
|
import errno
|
|
import subprocess
|
|
from distutils.spawn import find_executable
|
|
from shutil import (make_archive,
|
|
register_archive_format, unregister_archive_format,
|
|
get_archive_formats)
|
|
import tarfile
|
|
import warnings
|
|
|
|
from test import test_support as support
|
|
from test.test_support import TESTFN, check_warnings, captured_stdout
|
|
|
|
# Second scratch path, derived from the shared test file name.
TESTFN2 = TESTFN + "2"

# grp and pwd are optional; record whether both could be imported.
try:
    import grp
    import pwd
except ImportError:
    UID_GID_SUPPORT = False
else:
    UID_GID_SUPPORT = True

# zlib is optional; the name is bound to None when it is missing so that
# it can be used directly as a skip condition.
try:
    import zlib
except ImportError:
    zlib = None

# ZIP support is native when zipfile and zlib both import; otherwise it
# depends on whether find_executable() locates an external `zip` command.
try:
    import zipfile
    import zlib
except ImportError:
    ZIP_SUPPORT = find_executable('zip')
else:
    ZIP_SUPPORT = True
|
|
|
|
class TestShutil(unittest.TestCase):
|
|
|
|
def setUp(self):
    """Start every test with an empty list of temporary directories."""
    super(TestShutil, self).setUp()
    self.tempdirs = list()
|
|
|
|
def tearDown(self):
    """Remove every directory recorded in self.tempdirs, newest first."""
    super(TestShutil, self).tearDown()
    while self.tempdirs:
        # Removal errors are ignored only when os.name is 'nt' or 'cygwin'.
        tolerate_errors = os.name in ('nt', 'cygwin')
        shutil.rmtree(self.tempdirs.pop(), tolerate_errors)
|
|
|
|
def write_file(self, path, content='xxx'):
    """Write *content* to the file located at *path*.

    path is either a string or a list/tuple of path components, which
    are combined with os.path.join().
    """
    if isinstance(path, (list, tuple)):
        path = os.path.join(*path)
    with open(path, 'w') as stream:
        stream.write(content)
|
|
|
|
def mkdtemp(self):
    """Make a new temporary directory and record it in self.tempdirs.

    Returns the path of the new directory.
    """
    new_dir = tempfile.mkdtemp()
    self.tempdirs.append(new_dir)
    return new_dir
|
|
def test_rmtree_errors(self):
    # mktemp() only returns a name, so nothing exists at this path
    missing_path = tempfile.mktemp()
    with self.assertRaises(OSError):
        shutil.rmtree(missing_path)
|
|
|
|
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
@unittest.skipIf(sys.platform[:6] == 'cygwin',
                 "This test can't be run on Cygwin (issue #1071513).")
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                 "This test can't be run reliably as root (issue #1076467).")
def test_on_error(self):
    self.errorState = 0
    os.mkdir(TESTFN)
    self.childpath = os.path.join(TESTFN, 'a')
    f = open(self.childpath, 'w')
    f.close()
    old_dir_mode = os.stat(TESTFN).st_mode
    old_child_mode = os.stat(self.childpath).st_mode
    # Make unwritable.
    os.chmod(self.childpath, stat.S_IREAD)
    os.chmod(TESTFN, stat.S_IREAD)

    try:
        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 2,
                         "Expected call to onerror function did not happen.")
    finally:
        # Restore the permissions and remove the tree even when the
        # assertions above fail, so that a read-only TESTFN is not left
        # behind for later tests.  The existence checks keep cleanup from
        # masking the original failure if rmtree() removed the tree.
        if os.path.isdir(TESTFN):
            # Make writable again.
            os.chmod(TESTFN, old_dir_mode)
            if os.path.exists(self.childpath):
                os.chmod(self.childpath, old_child_mode)
            # Clean up.
            shutil.rmtree(TESTFN)
|
|
|
|
def check_args_to_onerror(self, func, arg, exc):
    """onerror callback that test_on_error passes to shutil.rmtree().

    self.errorState tracks progress: 0 before any call, 1 after the
    first reported failure, 2 after a later one.
    """
    if self.errorState != 0:
        # Any later failure must be the attempt to remove the directory.
        self.assertEqual(func, os.rmdir)
        self.assertEqual(arg, TESTFN)
        self.assertTrue(issubclass(exc[0], OSError))
        self.errorState = 2
        return

    # First failure.  It is normally os.remove on the child file, but
    # some Linux machines running ZFS on FUSE failed earlier, in
    # os.listdir on the directory; either is legal.
    if func is os.remove:
        self.assertEqual(arg, self.childpath)
    else:
        self.assertIs(func, os.listdir,
                      "func must be either os.remove or os.listdir")
        self.assertEqual(arg, TESTFN)
    self.assertTrue(issubclass(exc[0], OSError))
    self.errorState = 1
|
|
|
|
def test_rmtree_dont_delete_file(self):
    # rmtree() on a regular file must raise instead of deleting it.
    fd, file_path = tempfile.mkstemp()
    os.fdopen(fd).close()
    with self.assertRaises(OSError):
        shutil.rmtree(file_path)
    os.remove(file_path)
|
|
|
|
def test_copytree_simple(self):
    join = os.path.join

    def dump(target, text):
        with open(target, "w") as out:
            out.write(text)

    def slurp(target):
        with open(target) as stream:
            return stream.read()

    src_dir = tempfile.mkdtemp()
    dst_parent = tempfile.mkdtemp()
    dst_dir = join(dst_parent, 'destination')

    # source tree: one file at the top and one inside a sub-directory
    dump(join(src_dir, 'test.txt'), '123')
    os.mkdir(join(src_dir, 'test_dir'))
    dump(join(src_dir, 'test_dir', 'test.txt'), '456')

    try:
        shutil.copytree(src_dir, dst_dir)
        self.assertTrue(os.path.isfile(join(dst_dir, 'test.txt')))
        self.assertTrue(os.path.isdir(join(dst_dir, 'test_dir')))
        self.assertTrue(os.path.isfile(join(dst_dir, 'test_dir',
                                            'test.txt')))
        self.assertEqual(slurp(join(dst_dir, 'test.txt')), '123')
        self.assertEqual(slurp(join(dst_dir, 'test_dir', 'test.txt')),
                         '456')
    finally:
        # delete the individual files first, then the directory trees
        for relative in ('test.txt', join('test_dir', 'test.txt')):
            for tree in (src_dir, dst_dir):
                leftover = join(tree, relative)
                if os.path.exists(leftover):
                    os.remove(leftover)
        for tree in (src_dir, dst_parent):
            if os.path.exists(tree):
                shutil.rmtree(tree)
|
|
|
|
def test_copytree_with_exclude(self):
    join = os.path.join
    exists = os.path.exists

    def write_data(path, data):
        with open(path, "w") as f:
            f.write(data)

    # creating data
    src_dir = tempfile.mkdtemp()
    # Stays None until the destination parent exists, so the outer
    # ``finally`` never refers to an unbound name.
    dst_parent = None
    try:
        dst_parent = tempfile.mkdtemp()
        dst_dir = join(dst_parent, 'destination')
        write_data(join(src_dir, 'test.txt'), '123')
        write_data(join(src_dir, 'test.tmp'), '123')
        os.mkdir(join(src_dir, 'test_dir'))
        write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2'))
        write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
        os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
        write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
        write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')

        # testing glob-like patterns
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertTrue(exists(join(dst_dir, 'test.txt')))
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)

        # testing callable-style
        try:
            def _filter(src, names):
                res = []
                for name in names:
                    path = join(src, name)
                    # os.path.split() is required here: str.split()
                    # splits on whitespace and never yields the last
                    # path component.
                    if (os.path.isdir(path) and
                            os.path.split(path)[-1] == 'subdir'):
                        res.append(name)
                    # One-element tuple: with the plain string '.py' the
                    # ``in`` test is a substring test, which an empty
                    # extension always satisfies.
                    elif os.path.splitext(path)[-1] in ('.py',):
                        res.append(name)
                return res

            shutil.copytree(src_dir, dst_dir, ignore=_filter)

            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                         'test.py')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
            # ...while entries the filter does not name are still copied
            self.assertTrue(exists(join(dst_dir, 'test_dir2', 'test.txt')))
            self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)
    finally:
        shutil.rmtree(src_dir)
        if dst_parent is not None:
            shutil.rmtree(dst_parent)
|
|
|
|
# Skip decorators replace the former class-level ``if`` so that these tests
# are reported as skipped instead of silently not existing.
@unittest.skipUnless(hasattr(os, 'symlink') and hasattr(os, 'link'),
                     'requires os.symlink() and os.link()')
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w') as f:
            f.write('cheddar')

        # Copying onto a hard link to the source must raise and must
        # leave the contents untouched.
        os.link(src, dst)
        self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
        with open(src, 'r') as f:
            self.assertEqual(f.read(), 'cheddar')
        os.remove(dst)

        # Using `src` here would mean we end up with a symlink pointing
        # to TESTFN/TESTFN/cheese, while it should point at
        # TESTFN/cheese.
        os.symlink('cheese', dst)
        self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
        with open(src, 'r') as f:
            self.assertEqual(f.read(), 'cheddar')
        os.remove(dst)
    finally:
        try:
            shutil.rmtree(TESTFN)
        except OSError:
            pass

@unittest.skipUnless(hasattr(os, 'symlink'), 'requires os.symlink()')
def test_rmtree_on_symlink(self):
    # bug 1669.
    os.mkdir(TESTFN)
    try:
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        os.mkdir(src)
        os.symlink(src, dst)
        # rmtree() must refuse to operate on a symbolic link
        self.assertRaises(OSError, shutil.rmtree, dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
|
|
|
|
# Issue #3002: copyfile and copytree block indefinitely on named pipes
|
|
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self):
    os.mkfifo(TESTFN)
    try:
        # the FIFO is used first as the source, then as the destination
        for source, target in ((TESTFN, TESTFN2), (__file__, TESTFN)):
            with self.assertRaises(shutil.SpecialFileError):
                shutil.copyfile(source, target)
    finally:
        os.remove(TESTFN)
|
|
|
|
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copytree_named_pipe(self):
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as caught:
            # exactly one (src, dst, message) entry, naming the pipe
            reported = caught.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, error_msg = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, error_msg)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for leftover in (TESTFN, TESTFN2):
            shutil.rmtree(leftover, ignore_errors=True)
|
|
|
|
@unittest.skipUnless(hasattr(os, 'chflags') and
                     hasattr(errno, 'EOPNOTSUPP') and
                     hasattr(errno, 'ENOTSUP'),
                     "requires os.chflags, EOPNOTSUPP & ENOTSUP")
def test_copystat_handles_harmless_chflags_errors(self):
    tmpdir = self.mkdtemp()
    file1 = os.path.join(tmpdir, 'file1')
    file2 = os.path.join(tmpdir, 'file2')
    for target in (file1, file2):
        self.write_file(target, 'xxx')

    def make_chflags_raiser(err):
        # Returns an os.chflags replacement that always fails with an
        # OSError carrying *err* as its errno.
        failure = OSError()

        def _chflags_raiser(path, flags):
            failure.errno = err
            raise failure
        return _chflags_raiser

    original_chflags = os.chflags
    try:
        # these two errno values must be tolerated by copystat()
        for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(harmless)
            shutil.copystat(file1, file2)
        # assert others errors break it
        other_errno = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(other_errno)
        with self.assertRaises(OSError):
            shutil.copystat(file1, file2)
    finally:
        os.chflags = original_chflags
|
|
|
|
@unittest.skipUnless(zlib, "requires zlib")
def test_make_tarball(self):
    # creating something to tar
    root_dir, base_dir = self._create_files('')

    scratch = self.mkdtemp()
    # remove it again to force shutil to create the directory
    os.rmdir(scratch)
    # working with relative paths
    work_dir, scratch_name = os.path.split(scratch)
    rel_base_name = os.path.join(scratch_name, 'archive')
    expected_members = ['.', './file1', './file2',
                        './sub', './sub/file3', './sub2']

    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)

    # first the compressed tarball, then an uncompressed one
    variants = (('gztar', '.tar.gz', 'r:gz'), ('tar', '.tar', 'r'))
    for archive_format, suffix, read_mode in variants:
        with support.change_cwd(work_dir):
            tarball = make_archive(rel_base_name, archive_format,
                                   root_dir, '.')

        # check if the tarball was created
        self.assertEqual(tarball, base_name + suffix)
        self.assertTrue(os.path.isfile(tarball))
        self.assertTrue(tarfile.is_tarfile(tarball))
        with tarfile.open(tarball, read_mode) as tf:
            self.assertEqual(sorted(tf.getnames()), expected_members)
|
|
|
|
def _tarinfo(self, path):
    """Return the member names of the tar archive at *path*, sorted, as a tuple."""
    with tarfile.open(path) as archive:
        return tuple(sorted(archive.getnames()))
|
|
|
|
def _create_files(self, base_dir='dist'):
    """Build a small directory tree to archive and return (root_dir, base_dir).

    Under root_dir/base_dir this creates file1, file2, sub/file3 and an
    empty sub2 directory.  When base_dir is non-empty, a file named
    'outer' is also written directly into root_dir.
    """
    root_dir = self.mkdtemp()
    dist = os.path.join(root_dir, base_dir)
    if not os.path.isdir(dist):
        os.makedirs(dist)
    for leaf in ('file1', 'file2'):
        self.write_file((dist, leaf), 'xxx')
    os.mkdir(os.path.join(dist, 'sub'))
    self.write_file((dist, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(dist, 'sub2'))
    if base_dir:
        self.write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
|
|
|
|
@unittest.skipUnless(zlib, "Requires zlib")
@unittest.skipUnless(find_executable('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    root_dir, base_dir = self._create_files()
    archive_stem = os.path.join(self.mkdtemp(), 'archive')

    # the compressed tarball must be created under the expected name
    produced = make_archive(archive_stem, 'gztar', root_dir, base_dir)
    self.assertEqual(produced, archive_stem + '.tar.gz')
    self.assertTrue(os.path.isfile(produced))

    # build a reference archive with the external `tar` command
    reference = os.path.join(root_dir, 'archive2.tar')
    subprocess.check_call(['tar', '-cf', 'archive2.tar', base_dir],
                          cwd=root_dir)
    self.assertTrue(os.path.isfile(reference))

    # both archives must list the same members
    self.assertEqual(self._tarinfo(produced), self._tarinfo(reference))

    # trying an uncompressed one
    produced = make_archive(archive_stem, 'tar', root_dir, base_dir)
    self.assertEqual(produced, archive_stem + '.tar')
    self.assertTrue(os.path.isfile(produced))

    # now for a dry_run
    produced = make_archive(archive_stem, 'tar', root_dir, base_dir,
                            dry_run=True)
    self.assertEqual(produced, archive_stem + '.tar')
    self.assertTrue(os.path.isfile(produced))
|
|
|
|
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
def test_make_zipfile(self):
    # creating something to zip
    root_dir, base_dir = self._create_files()

    scratch = self.mkdtemp()
    # remove it again to force shutil to create the directory
    os.rmdir(scratch)
    # working with relative paths
    work_dir, scratch_name = os.path.split(scratch)
    rel_base_name = os.path.join(scratch_name, 'archive')
    dist_members = ['dist/', 'dist/file1', 'dist/file2',
                    'dist/sub/', 'dist/sub/file3', 'dist/sub2/']

    # archive the whole of root_dir
    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        res = make_archive(rel_base_name, 'zip', root_dir)

    self.assertEqual(res, base_name + '.zip')
    self.assertTrue(os.path.isfile(res))
    self.assertTrue(zipfile.is_zipfile(res))
    with zipfile.ZipFile(res) as zf:
        self.assertEqual(sorted(zf.namelist()), dist_members + ['outer'])
    support.unlink(res)

    # archive only base_dir
    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        res = make_archive(rel_base_name, 'zip', root_dir, base_dir)

    self.assertEqual(res, base_name + '.zip')
    self.assertTrue(os.path.isfile(res))
    self.assertTrue(zipfile.is_zipfile(res))
    with zipfile.ZipFile(res) as zf:
        self.assertEqual(sorted(zf.namelist()), dist_members)
|
|
|
|
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
@unittest.skipUnless(find_executable('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    def member_names(zip_path):
        with zipfile.ZipFile(zip_path) as zf:
            return zf.namelist()

    root_dir, base_dir = self._create_files()
    archive_stem = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(archive_stem, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, archive_stem + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now create another ZIP file using `zip`
    reference = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir)
    self.assertTrue(os.path.isfile(reference))

    # let's compare both ZIP files
    names = member_names(archive)
    names2 = member_names(reference)
    self.assertEqual(sorted(names), sorted(names2))
|
|
|
|
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
@unittest.skipUnless(find_executable('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    root_dir, base_dir = self._create_files()
    archive_stem = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(archive_stem, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, archive_stem + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now check the ZIP file using `unzip -t`
    with support.change_cwd(root_dir):
        try:
            subprocess.check_output(['unzip', '-t', archive],
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            # include what unzip printed in the failure message
            report = "{}\n\n**Unzip Output**\n{}".format(exc, exc.output)
            self.fail(report)
|
|
|
|
def test_make_archive(self):
    # 'xxx' is not a registered archive format
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
|
|
|
|
@unittest.skipUnless(zlib, "Requires zlib")
def test_make_archive_owner_group(self):
    # testing make_archive with owner and group, with various combinations
    # this works even if there's not gid/uid support
    if not UID_GID_SUPPORT:
        group = owner = 'root'
    else:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    attempts = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for archive_format, ownership in attempts:
        res = make_archive(base_name, archive_format, root_dir, base_dir,
                           **ownership)
        self.assertTrue(os.path.isfile(res))
|
|
|
|
@unittest.skipUnless(zlib, "Requires zlib")
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    # names registered for gid 0 and uid 0
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with support.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # check if the compressed tarball was created
    self.assertTrue(os.path.isfile(archive_name))

    # every member must be recorded with uid 0 and gid 0
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
|
|
|
|
def test_make_archive_cwd(self):
    def _breaks(*args, **kw):
        raise RuntimeError()

    starting_dir = os.getcwd()
    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        try:
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        except Exception:
            # the failure itself is not under test; only the working
            # directory afterwards is checked
            pass
        self.assertEqual(os.getcwd(), starting_dir)
    finally:
        unregister_archive_format('xxx')
|
|
|
|
def test_make_tarfile_in_curdir(self):
    # Issue #21280
    work_dir = self.mkdtemp()
    original_cwd = os.getcwd()
    try:
        os.chdir(work_dir)
        archive_name = make_archive('test', 'tar')
        self.assertEqual(archive_name, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
    finally:
        os.chdir(original_cwd)
|
|
|
|
@unittest.skipUnless(zlib, "Requires zlib")
def test_make_zipfile_in_curdir(self):
    # Issue #21280
    work_dir = self.mkdtemp()
    original_cwd = os.getcwd()
    try:
        os.chdir(work_dir)
        archive_name = make_archive('test', 'zip')
        self.assertEqual(archive_name, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
    finally:
        os.chdir(original_cwd)
|
|
|
|
def test_register_archive_format(self):
    def registered_names():
        return [name for name, params in get_archive_formats()]

    # each of these argument sets must be rejected with TypeError
    rejected = (
        (1,),
        (lambda: x, 1),
        (lambda: x, [(1, 2), (1, 2, 3)]),
    )
    for bad_args in rejected:
        with self.assertRaises(TypeError):
            register_archive_format('xxx', *bad_args)

    # a valid registration is listed, and disappears once unregistered
    register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
    self.assertIn('xxx', registered_names())

    unregister_archive_format('xxx')
    self.assertNotIn('xxx', registered_names())
|
|
|
|
|
|
class TestMove(unittest.TestCase):
|
|
|
|
def setUp(self):
    """Create source and destination directories and one source file."""
    filename = "foo"
    self.src_dir = tempfile.mkdtemp()
    self.src_file = os.path.join(self.src_dir, filename)
    self.dst_dir = tempfile.mkdtemp()
    self.dst_file = os.path.join(self.dst_dir, filename)
    # Try to create a dir next to this test file, hoping that it is
    # not located on the same filesystem as the system tmp dir.
    try:
        self.dir_other_fs = tempfile.mkdtemp(dir=os.path.dirname(__file__))
    except OSError:
        self.dir_other_fs = None
    else:
        self.file_other_fs = os.path.join(self.dir_other_fs, filename)
    with open(self.src_file, "wb") as stream:
        stream.write("spam")
|
|
|
|
def tearDown(self):
    """Remove the directories created by setUp(), ignoring removal errors."""
    for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
        # dir_other_fs is None when setUp() could not create it, and
        # src_dir may already have been moved away by the test.
        # ignore_errors keeps the cleanup best-effort without the bare
        # ``except`` that also swallowed KeyboardInterrupt/SystemExit.
        if d:
            shutil.rmtree(d, ignore_errors=True)
|
|
|
|
def _check_move_file(self, src, dst, real_dst):
    """Move file *src* to *dst* and check that it arrived at *real_dst*.

    The bytes read from real_dst after the move must equal those read
    from src before it, and src must no longer exist.
    """
    with open(src, "rb") as stream:
        expected = stream.read()
    shutil.move(src, dst)
    with open(real_dst, "rb") as stream:
        moved = stream.read()
    self.assertEqual(expected, moved)
    self.assertFalse(os.path.exists(src))
|
|
|
|
def _check_move_dir(self, src, dst, real_dst):
    """Move directory *src* to *dst* and check that it arrived at *real_dst*.

    The sorted listing of real_dst after the move must equal the sorted
    listing of src before it, and src must no longer exist.
    """
    expected_entries = sorted(os.listdir(src))
    shutil.move(src, dst)
    moved_entries = sorted(os.listdir(real_dst))
    self.assertEqual(expected_entries, moved_entries)
    self.assertFalse(os.path.exists(src))
|
|
|
|
def test_move_file(self):
    # Move a file to another location on the same filesystem.
    source, target = self.src_file, self.dst_file
    self._check_move_file(source, target, target)
|
|
|
|
def test_move_file_to_dir(self):
    # Move a file inside an existing dir on the same filesystem.
    self._check_move_file(src=self.src_file, dst=self.dst_dir,
                          real_dst=self.dst_file)
|
|
|
|
def test_move_file_other_fs(self):
    # Move a file to a new path inside the dir on the other filesystem.
    if not self.dir_other_fs:
        self.skipTest('dir on other filesystem not available')
    target = self.file_other_fs
    self._check_move_file(self.src_file, target, target)
|
|
|
|
def test_move_file_to_dir_other_fs(self):
    # Move a file into the existing dir on the other filesystem.
    if not self.dir_other_fs:
        self.skipTest('dir on other filesystem not available')
    self._check_move_file(src=self.src_file, dst=self.dir_other_fs,
                          real_dst=self.file_other_fs)
|
|
|
|
def test_move_dir(self):
    # Move a dir to another location on the same filesystem.
    dst_dir = tempfile.mktemp()
    try:
        self._check_move_dir(self.src_dir, dst_dir, dst_dir)
    finally:
        # Best-effort cleanup: dst_dir does not exist if the move failed.
        # ignore_errors replaces a bare ``except`` that also swallowed
        # KeyboardInterrupt and SystemExit.
        shutil.rmtree(dst_dir, ignore_errors=True)
|
|
|
|
def test_move_dir_other_fs(self):
    # Move a dir to another location on another filesystem.
    if not self.dir_other_fs:
        self.skipTest('dir on other filesystem not available')
    dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
    try:
        self._check_move_dir(self.src_dir, dst_dir, dst_dir)
    finally:
        # Best-effort cleanup: dst_dir does not exist if the move failed.
        # ignore_errors replaces a bare ``except`` that also swallowed
        # KeyboardInterrupt and SystemExit.
        shutil.rmtree(dst_dir, ignore_errors=True)
|
|
|
|
def test_move_dir_to_dir(self):
    # Move a dir inside an existing dir on the same filesystem.
    moved_name = os.path.basename(self.src_dir)
    final_path = os.path.join(self.dst_dir, moved_name)
    self._check_move_dir(self.src_dir, self.dst_dir, final_path)
|
|
|
|
def test_move_dir_to_dir_other_fs(self):
    # Move a dir inside an existing dir on another filesystem.
    if not self.dir_other_fs:
        self.skipTest('dir on other filesystem not available')
    moved_name = os.path.basename(self.src_dir)
    final_path = os.path.join(self.dir_other_fs, moved_name)
    self._check_move_dir(self.src_dir, self.dir_other_fs, final_path)
|
|
|
|
def test_move_dir_sep_to_dir(self):
    # The source path is given with a trailing os.path.sep.
    final_path = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    source_with_sep = self.src_dir + os.path.sep
    self._check_move_dir(source_with_sep, self.dst_dir, final_path)
|
|
|
|
@unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
def test_move_dir_altsep_to_dir(self):
    # The source path is given with a trailing os.path.altsep.
    final_path = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    source_with_altsep = self.src_dir + os.path.altsep
    self._check_move_dir(source_with_altsep, self.dst_dir, final_path)
|
|
|
|
def test_existing_file_inside_dest_dir(self):
    # A file with the same name inside the destination dir already exists.
    open(self.dst_file, "wb").close()
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_file, self.dst_dir)
|
|
|
|
def test_dont_move_dir_in_itself(self):
    # Moving a dir inside itself raises an Error.
    nested_target = os.path.join(self.src_dir, "bar")
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_dir, nested_target)
|
|
|
|
def test_destinsrc_false_negative(self):
    # pairs for which _destinsrc() must report dst as lying inside src
    cases = [('srcdir', 'srcdir/dest')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in cases:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            is_inside = shutil._destinsrc(src, dst)
            self.assertTrue(is_inside,
                            msg='_destinsrc() wrongly concluded that '
                            'dst (%s) is not in src (%s)' % (dst, src))
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
|
|
|
|
def test_destinsrc_false_positive(self):
    # pairs for which _destinsrc() must report dst as lying outside src
    cases = [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in cases:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            is_inside = shutil._destinsrc(src, dst)
            self.assertFalse(is_inside,
                             msg='_destinsrc() wrongly concluded that '
                             'dst (%s) is in src (%s)' % (dst, src))
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
|
|
|
|
|
|
class TestCopyFile(unittest.TestCase):
|
|
|
|
_delete = False
|
|
|
|
class Faux(object):
    """Stand-in for a file object that records context-manager usage.

    raise_in_exit makes __exit__ raise IOError("Cannot close");
    suppress_at_exit is the value __exit__ returns otherwise.
    """
    # Set to True by __enter__.
    _entered = False
    # (exc_type, exc_val, exc_tb) as received by __exit__.
    _exited_with = None
    # Set to True just before __exit__ raises.
    _raised = False

    def __init__(self, raise_in_exit=False, suppress_at_exit=True):
        self._suppress_at_exit = suppress_at_exit
        self._raise_in_exit = raise_in_exit

    def read(self, *args):
        return ''

    def __enter__(self):
        # No value is returned, so ``with ... as name`` binds None.
        self._entered = True

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._exited_with = exc_type, exc_val, exc_tb
        if not self._raise_in_exit:
            return self._suppress_at_exit
        self._raised = True
        raise IOError("Cannot close")
|
|
|
|
def tearDown(self):
    """Remove the open() replacement from shutil if a test installed one."""
    if not self._delete:
        return
    del shutil.open
|
|
|
|
def _set_shutil_open(self, func):
    """Bind *func* as shutil.open and set self._delete to True."""
    self._delete = True
    shutil.open = func
|
|
|
|
def test_w_source_open_fails(self):
    def _open(filename, mode='r'):
        if filename != 'srcfile':
            assert 0  # shouldn't reach here.
        else:
            raise IOError('Cannot open "srcfile"')

    self._set_shutil_open(_open)

    with self.assertRaises(IOError):
        shutil.copyfile('srcfile', 'destfile')
|
|
|
|
def test_w_dest_open_fails(self):
    srcfile = self.Faux()

    def _open(filename, mode='r'):
        if filename == 'destfile':
            raise IOError('Cannot open "destfile"')
        if filename == 'srcfile':
            return srcfile
        assert 0  # shouldn't reach here.

    self._set_shutil_open(_open)

    shutil.copyfile('srcfile', 'destfile')
    self.assertTrue(srcfile._entered)
    # the source's __exit__ must have received the destination's error
    seen_type, seen_value = srcfile._exited_with[0], srcfile._exited_with[1]
    self.assertTrue(seen_type is IOError)
    self.assertEqual(seen_value.args, ('Cannot open "destfile"',))
|
|
|
|
def test_w_dest_close_fails(self):
    srcfile = self.Faux()
    destfile = self.Faux(True)
    handles = {'srcfile': srcfile, 'destfile': destfile}

    def _open(filename, mode='r'):
        if filename in handles:
            return handles[filename]
        assert 0  # shouldn't reach here.

    self._set_shutil_open(_open)

    shutil.copyfile('srcfile', 'destfile')
    self.assertTrue(srcfile._entered)
    self.assertTrue(destfile._entered)
    self.assertTrue(destfile._raised)
    # the source's __exit__ must have received the destination's error
    seen_type, seen_value = srcfile._exited_with[0], srcfile._exited_with[1]
    self.assertTrue(seen_type is IOError)
    self.assertEqual(seen_value.args, ('Cannot close',))
|
|
|
|
def test_w_source_close_fails(self):
    srcfile = self.Faux(True)
    destfile = self.Faux()
    handles = {'srcfile': srcfile, 'destfile': destfile}

    def _open(filename, mode='r'):
        if filename in handles:
            return handles[filename]
        assert 0  # shouldn't reach here.

    self._set_shutil_open(_open)

    with self.assertRaises(IOError):
        shutil.copyfile('srcfile', 'destfile')
    self.assertTrue(srcfile._entered)
    self.assertTrue(destfile._entered)
    self.assertFalse(destfile._raised)
    # the source's __exit__ received no exception, then raised its own
    self.assertTrue(srcfile._exited_with[0] is None)
    self.assertTrue(srcfile._raised)
|
|
|
|
def test_move_dir_caseinsensitive(self):
    # Renames a folder to the same name
    # but a different case.
    self.src_dir = tempfile.mkdtemp()
    parent_dir = os.path.dirname(self.src_dir)
    upper_name = os.path.basename(self.src_dir).upper()
    dst_dir = os.path.join(parent_dir, upper_name)
    self.assertNotEqual(self.src_dir, dst_dir)

    try:
        shutil.move(self.src_dir, dst_dir)
        self.assertTrue(os.path.isdir(dst_dir))
    finally:
        if os.path.exists(dst_dir):
            os.rmdir(dst_dir)
|
|
|
|
|
|
|
|
def test_main():
    """Run the three test classes of this module through test_support."""
    test_classes = (TestShutil, TestMove, TestCopyFile)
    support.run_unittest(*test_classes)


if __name__ == '__main__':
    test_main()
|