Commit cccc1d5d by IK

json,celeryconfig

parent 5bee7535
#
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_RESULT_EXPIRES = 300
#CELERY_TIMEZONE = 'UTC'
#CELERY_ENABLE_UTC = True
#CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_ACCEPT_CONTENT = ['json', 'pickle_v2', 'pickle']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'pickle_v2'
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
...@@ -479,6 +479,7 @@ class Disk(object): ...@@ -479,6 +479,7 @@ class Disk(object):
def merge(self, task, new_disk, parent_id=None): def merge(self, task, new_disk, parent_id=None):
""" Merging a new_disk from the actual disk and its base. """ Merging a new_disk from the actual disk and its base.
""" """
logger.debug("Merge %s into %s.", self.get_path(), new_disk.get_path())
if task.is_aborted(): if task.is_aborted():
raise AbortException() raise AbortException()
......
# serializers.py
from kombu.serialization import register
import pickle
def pickle_v2_dumps(obj):
return pickle.dumps(obj, protocol=2)
def pickle_v2_loads(s):
return pickle.loads(s)
register(
'pickle_v2',
pickle_v2_dumps,
pickle_v2_loads,
content_type='application/x-pickle',
content_encoding='binary',
)
from celery import Celery from celery import Celery
import serializers
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
from argparse import ArgumentParser from argparse import ArgumentParser
import celeryconfig
parser = ArgumentParser() parser = ArgumentParser()
...@@ -21,9 +23,9 @@ celery = Celery('storagedriver', ...@@ -21,9 +23,9 @@ celery = Celery('storagedriver',
broker=AMQP_URI, broker=AMQP_URI,
include=['storagedriver']) include=['storagedriver'])
celery.config_from_object('celeryconfig')
celery.conf.update( celery.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(HOSTNAME, Exchange( Queue(HOSTNAME, Exchange(
'storagedriver', type='direct'), routing_key='storagedriver'), 'storagedriver', type='direct'), routing_key='storagedriver'),
......
...@@ -120,8 +120,7 @@ def get_storage_stat(path): ...@@ -120,8 +120,7 @@ def get_storage_stat(path):
def get_file_statistics(datastore): def get_file_statistics(datastore):
disks = [Disk.get(datastore, name).get_desc() disks = [Disk.get(datastore, name).get_desc()
for name in listdir(datastore) for name in listdir(datastore)
if not name.endswith(".dump") and if not name.endswith((".dump", "trash", "found", "mnt", "download", "pub", ))]
not path.isdir(path.join(datastore, name))]
dumps = [{'name': name, dumps = [{'name': name,
'size': path.getsize(path.join(datastore, name))} 'size': path.getsize(path.join(datastore, name))}
for name in listdir(datastore) if name.endswith(".dump")] for name in listdir(datastore) if name.endswith(".dump")]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment