Logo Search packages:      
Sourcecode: s3ql version File versions  Download package

t4_fuse.py

00001 '''
t4_fuse.py - this file is part of S3QL (http://s3ql.googlecode.com)

Copyright (C) 2008-2009 Nikolaus Rath <Nikolaus@rath.org>

This program can be distributed under the terms of the GNU LGPL.
'''

from __future__ import division, print_function
from _common import TestCase
from cStringIO import StringIO
from os.path import basename
from s3ql.common import retry, AsyncFn
import filecmp
import os.path
import s3ql.cli.fsck
import s3ql.cli.mkfs
import s3ql.cli.mount
import s3ql.cli.umount
import shutil
import stat
import llfuse
import subprocess
import sys
import tempfile
import time
import unittest2 as unittest

# For debugging
USE_VALGRIND = False

00032 class fuse_tests(TestCase):
    
    def setUp(self):
        # We need this to test multi block operations
        self.src = __file__
        if os.path.getsize(self.src) < 1048:
            raise RuntimeError("test file %s should be bigger than 1 kb" % self.src)

        self.mnt_dir = tempfile.mkdtemp()
        self.cache_dir = tempfile.mkdtemp()
        self.bucket_dir = tempfile.mkdtemp()

        self.bucketname = 'local://' + os.path.join(self.bucket_dir, 'mybucket')
        self.passphrase = 'oeut3d'

        self.mount_thread = None
        self.name_cnt = 0

    def tearDown(self):
        # Umount if still mounted
        if os.path.ismount(self.mnt_dir):
            subprocess.call(['fusermount', '-z', '-u', self.mnt_dir])

        # Try to wait for mount thread to prevent spurious errors
        # because the db file is being removed
        if self.mount_thread and USE_VALGRIND:
            retry(60, lambda: self.mount_thread.poll() is not None)
        elif self.mount_thread:
            self.mount_thread.join(60)
                
        shutil.rmtree(self.mnt_dir)
        shutil.rmtree(self.cache_dir)
        shutil.rmtree(self.bucket_dir)

        if not USE_VALGRIND and not self.mount_thread.is_alive():
            self.mount_thread.join_and_raise()
                    
    def mount(self):
        
        sys.stdin = StringIO('%s\n%s\n' % (self.passphrase, self.passphrase))
        try:
            s3ql.cli.mkfs.main(['-L', 'test fs', '--blocksize', '500',
                                '--homedir', self.cache_dir, self.bucketname ])
        except BaseException as exc:
            self.fail("mkfs.s3ql failed: %s" % exc)

        
        # Note: When running inside test suite, we have less available
        # file descriptors        
        if USE_VALGRIND:
            if __name__ == '__main__':
                mypath = sys.argv[0]
            else:
                mypath = __file__
            basedir = os.path.abspath(os.path.join(os.path.dirname(mypath), '..'))
            self.mount_thread = subprocess.Popen(['valgrind', 'python-dbg', 
                                                  os.path.join(basedir, 'bin', 'mount.s3ql'), 
                                                  "--fg", '--homedir', self.cache_dir, 
                                                  '--max-cache-entries', '500',
                                                  self.bucketname, self.mnt_dir],
                                                  stdin=subprocess.PIPE)
            print(self.passphrase, file=self.mount_thread.stdin)
            retry(30, os.path.ismount, self.mnt_dir)                              
        else:
            sys.stdin = StringIO('%s\n' % self.passphrase)
            self.mount_thread = AsyncFn(s3ql.cli.mount.main,
                                        ["--fg", '--homedir', self.cache_dir, 
                                         '--max-cache-entries', '500',
                                         self.bucketname, self.mnt_dir])
            self.mount_thread.start()

            # Wait for mountpoint to come up
            try:
                retry(3, os.path.ismount, self.mnt_dir)
            except:
                self.mount_thread.join_and_raise()

    def umount(self):
        time.sleep(0.5)
        devnull = open('/dev/null', 'wb')
        retry(5, lambda: subprocess.call(['fuser', '-m', self.mnt_dir],
                                         stdout=devnull, stderr=devnull) == 1)
        s3ql.cli.umount.DONTWAIT = True
        try:
            s3ql.cli.umount.main([self.mnt_dir])
        except BaseException as exc:
            self.fail("Umount failed: %s" % exc)

        # Now wait for server process
        if USE_VALGRIND:
            self.assertEqual(self.mount_thread.wait(), 0)
        else:
            exc = self.mount_thread.join_get_exc()
            self.assertIsNone(exc)
        self.assertFalse(os.path.ismount(self.mnt_dir))

        # Now run an fsck
        sys.stdin = StringIO('%s\n' % self.passphrase)
        try:
            s3ql.cli.fsck.main(['--force', '--homedir', self.cache_dir, 
                                self.bucketname])
        except BaseException as exc:
            self.fail("fsck failed: %s" % exc)

    def runTest(self):
        # Run all tests in same environment, mounting and umounting
        # just takes too long otherwise

        self.mount()
        self.tst_chown()
        self.tst_link()
        self.tst_mkdir()
        self.tst_mknod()
        self.tst_readdir()
        self.tst_statvfs()
        self.tst_symlink()
        self.tst_truncate()
        self.tst_write()
        self.umount()

    def newname(self):
        self.name_cnt += 1
        return "s3ql_%d" % self.name_cnt

    def tst_mkdir(self):
        dirname = self.newname()
        fullname = self.mnt_dir + "/" + dirname
        os.mkdir(fullname)
        fstat = os.stat(fullname)
        self.assertTrue(stat.S_ISDIR(fstat.st_mode))
        self.assertEquals(llfuse.listdir(fullname), [])
        self.assertEquals(fstat.st_nlink, 1)
        self.assertTrue(dirname in llfuse.listdir(self.mnt_dir))
        os.rmdir(fullname)
        self.assertRaises(OSError, os.stat, fullname)
        self.assertTrue(dirname not in llfuse.listdir(self.mnt_dir))

    def tst_symlink(self):
        linkname = self.newname()
        fullname = self.mnt_dir + "/" + linkname
        os.symlink("/imaginary/dest", fullname)
        fstat = os.lstat(fullname)
        self.assertTrue(stat.S_ISLNK(fstat.st_mode))
        self.assertEquals(os.readlink(fullname), "/imaginary/dest")
        self.assertEquals(fstat.st_nlink, 1)
        self.assertTrue(linkname in llfuse.listdir(self.mnt_dir))
        os.unlink(fullname)
        self.assertRaises(OSError, os.lstat, fullname)
        self.assertTrue(linkname not in llfuse.listdir(self.mnt_dir))

    def tst_mknod(self):
        filename = os.path.join(self.mnt_dir, self.newname())
        src = self.src
        shutil.copyfile(src, filename)
        fstat = os.lstat(filename)
        self.assertTrue(stat.S_ISREG(fstat.st_mode))
        self.assertEquals(fstat.st_nlink, 1)
        self.assertTrue(basename(filename) in llfuse.listdir(self.mnt_dir))
        self.assertTrue(filecmp.cmp(src, filename, False))
        os.unlink(filename)
        self.assertRaises(OSError, os.stat, filename)
        self.assertTrue(basename(filename) not in llfuse.listdir(self.mnt_dir))

    def tst_chown(self):
        filename = os.path.join(self.mnt_dir, self.newname())
        os.mkdir(filename)
        fstat = os.lstat(filename)
        uid = fstat.st_uid
        gid = fstat.st_gid

        uid_new = uid + 1
        os.chown(filename, uid_new, -1)
        fstat = os.lstat(filename)
        self.assertEquals(fstat.st_uid, uid_new)
        self.assertEquals(fstat.st_gid, gid)

        gid_new = gid + 1
        os.chown(filename, -1, gid_new)
        fstat = os.lstat(filename)
        self.assertEquals(fstat.st_uid, uid_new)
        self.assertEquals(fstat.st_gid, gid_new)

        os.rmdir(filename)
        self.assertRaises(OSError, os.stat, filename)
        self.assertTrue(basename(filename) not in llfuse.listdir(self.mnt_dir))


    def tst_write(self):
        name = os.path.join(self.mnt_dir, self.newname())
        src = self.src
        shutil.copyfile(src, name)
        self.assertTrue(filecmp.cmp(name, src, False))

        # Don't unlink file, we want to see if cache flushing
        # works

    def tst_statvfs(self):
        os.statvfs(self.mnt_dir)

    def tst_link(self):
        name1 = os.path.join(self.mnt_dir, self.newname())
        name2 = os.path.join(self.mnt_dir, self.newname())
        src = self.src
        shutil.copyfile(src, name1)
        self.assertTrue(filecmp.cmp(name1, src, False))
        os.link(name1, name2)

        fstat1 = os.lstat(name1)
        fstat2 = os.lstat(name2)

        self.assertEquals(fstat1, fstat2)
        self.assertEquals(fstat1.st_nlink, 2)

        self.assertTrue(basename(name2) in llfuse.listdir(self.mnt_dir))
        self.assertTrue(filecmp.cmp(name1, name2, False))
        os.unlink(name2)
        fstat1 = os.lstat(name1)
        self.assertEquals(fstat1.st_nlink, 1)
        os.unlink(name1)

    def tst_readdir(self):
        dir_ = os.path.join(self.mnt_dir, self.newname())
        file_ = dir_ + "/" + self.newname()
        subdir = dir_ + "/" + self.newname()
        subfile = subdir + "/" + self.newname()
        src = self.src

        os.mkdir(dir_)
        shutil.copyfile(src, file_)
        os.mkdir(subdir)
        shutil.copyfile(src, subfile)

        listdir_is = llfuse.listdir(dir_)
        listdir_is.sort()
        listdir_should = [ basename(file_), basename(subdir) ]
        listdir_should.sort()
        self.assertEquals(listdir_is, listdir_should)

        os.unlink(file_)
        os.unlink(subfile)
        os.rmdir(subdir)
        os.rmdir(dir_)

    def tst_truncate(self):
        filename = os.path.join(self.mnt_dir, self.newname())
        src = self.src
        shutil.copyfile(src, filename)
        self.assertTrue(filecmp.cmp(filename, src, False))
        fstat = os.stat(filename)
        size = fstat.st_size
        fd = os.open(filename, os.O_RDWR)

        os.ftruncate(fd, size + 1024) # add > 1 block
        self.assertEquals(os.stat(filename).st_size, size + 1024)

        os.ftruncate(fd, size - 1024) # Truncate > 1 block
        self.assertEquals(os.stat(filename).st_size, size - 1024)

        os.close(fd)
        os.unlink(filename)
            
        
# Somehow important according to pyunit documentation
def suite():
    return unittest.makeSuite(fuse_tests)


# Allow calling from command line
if __name__ == "__main__":
    unittest.main()

Generated by  Doxygen 1.6.0   Back to index