Logo Search packages:      
Sourcecode: s3ql version File versions  Download package

def s3ql::upload_manager::UploadManager::add (   self,
  el 
)
Upload cache entry `el` asynchronously

Return (uncompressed) size of cache entry.

This method releases the global lock.

Definition at line 60 of file upload_manager.py.

                     :
        '''Upload cache entry `el` asynchronously
        
        Return (uncompressed) size of cache entry.
        
        This method releases the global lock.
        '''
        
        log.debug('UploadManager.add(%s): start', el)

        if (el.inode, el.blockno) in self.in_transit:
            raise ValueError('Block already in transit')
        
        old_obj_id = el.obj_id
        size = os.fstat(el.fileno()).st_size
        el.seek(0)
        if log.isEnabledFor(logging.DEBUG):
            time_ = time.time()
            hash_ = sha256_fh(el)
            time_ = time.time() - time_
            if time_ != 0:
                rate = size / (1024**2 * time_)
            else:
                rate = 0
            log.debug('UploadManager(inode=%d, blockno=%d): '
                     'hashed %d bytes in %.3f seconds, %.2f MB/s',
                      el.inode, el.blockno, size, time_, rate)             
        else:
            hash_ = sha256_fh(el)
        
        try:
            el.obj_id = self.db.get_val('SELECT id FROM objects WHERE hash=?', (hash_,))

        except NoSuchRowError:
            need_upload = True
            el.obj_id = self.db.rowid('INSERT INTO objects (refcount, hash, size) VALUES(?, ?, ?)',
                                  (1, hash_, size))
            log.debug('add(inode=%d, blockno=%d): created new object %d',
                      el.inode, el.blockno, el.obj_id)

        else:
            need_upload = False
            if old_obj_id == el.obj_id:
                log.debug('add(inode=%d, blockno=%d): unchanged, obj_id=%d',
                          el.inode, el.blockno, el.obj_id)
                el.dirty = False
                el.modified_after_upload = False
                os.rename(el.name + '.d', el.name)
                return size
                  
            log.debug('add(inode=%d, blockno=%d): (re)linking to %d',
                      el.inode, el.blockno, el.obj_id)
            self.db.execute('UPDATE objects SET refcount=refcount+1 WHERE id=?',
                         (el.obj_id,))
            
        to_delete = False
        if old_obj_id is None:
            log.debug('add(inode=%d, blockno=%d): no previous object',
                      el.inode, el.blockno)
            self.db.execute('INSERT INTO blocks (obj_id, inode, blockno) VALUES(?,?,?)',
                         (el.obj_id, el.inode, el.blockno))    
        else:
            self.db.execute('UPDATE blocks SET obj_id=? WHERE inode=? AND blockno=?',
                         (el.obj_id, el.inode, el.blockno))
            refcount = self.db.get_val('SELECT refcount FROM objects WHERE id=?',
                                    (old_obj_id,))
            if refcount > 1:
                log.debug('add(inode=%d, blockno=%d): '
                          'decreased refcount for prev. obj: %d',
                          el.inode, el.blockno, old_obj_id)
                self.db.execute('UPDATE objects SET refcount=refcount-1 WHERE id=?',
                             (old_obj_id,))
            else:
                log.debug('add(inode=%d, blockno=%d): '
                          'prev. obj %d marked for removal',
                          el.inode, el.blockno, old_obj_id)
                self.db.execute('DELETE FROM objects WHERE id=?', (old_obj_id,))
                to_delete = True

        if need_upload:
            log.debug('add(inode=%d, blockno=%d): starting compression thread', 
                      el.inode, el.blockno)
            el.modified_after_upload = False
            self.in_transit.add((el.inode, el.blockno))
            
            # Create a new fd so that we don't get confused if another
            # thread repositions the cursor (and do so before unlocking)
            fh = open(el.name + '.d', 'rb')
            self.compress_threads.add_thread(CompressThread(el, fh, self, size)) # Releases global lock

        else:
            el.dirty = False
            el.modified_after_upload = False
            os.rename(el.name + '.d', el.name)
                
        if to_delete:
            log.debug('add(inode=%d, blockno=%d): removing object %d', 
                      el.inode, el.blockno, old_obj_id)
            
            try:
                # Note: Old object can not be in transit
                # Releases global lock
                self.removal_queue.add_thread(RemoveThread(old_obj_id, self.bucket))
            except EmbeddedException as exc:
                exc = exc.exc
                if isinstance(exc, NoSuchObject):
                    log.warn('Backend seems to have lost object %s', exc.key)
                    self.encountered_errors = True
                else:
                    raise
                                                
        log.debug('add(inode=%d, blockno=%d): end', el.inode, el.blockno)
        return size


Generated by  Doxygen 1.6.0   Back to index