Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
Gelencsér Szabolcs
/
circlestack
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Members
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
1b16674e
authored
Oct 09, 2014
by
Bach Dániel
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'feature-warm_shutdown_celery' into 'master'
Feature warm shutdown celery See merge request !234
parents
f2b2ede4
6d9a0b97
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
53 additions
and
12 deletions
+53
-12
circle/manager/mancelery.py
+10
-0
circle/manager/moncelery.py
+10
-1
circle/manager/slowcelery.py
+10
-1
circle/vm/models/activity.py
+5
-6
miscellaneous/mancelery.conf
+6
-1
miscellaneous/moncelery.conf
+4
-1
miscellaneous/slowcelery.conf
+8
-2
No files found.
circle/manager/mancelery.py
View file @
1b16674e
...
...
@@ -16,12 +16,15 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from
celery
import
Celery
from
celery.signals
import
worker_ready
from
datetime
import
timedelta
from
kombu
import
Queue
,
Exchange
from
os
import
getenv
HOSTNAME
=
"localhost"
CACHE_URI
=
getenv
(
"CACHE_URI"
,
"pylibmc://127.0.0.1:11211/"
)
QUEUE_NAME
=
HOSTNAME
+
'.man'
celery
=
Celery
(
'manager'
,
broker
=
getenv
(
"AMQP_URI"
),
...
...
@@ -57,3 +60,10 @@ celery.conf.update(
}
)
@worker_ready.connect
()
def
cleanup_tasks
(
conf
=
None
,
**
kwargs
):
'''Discard all task and clean up activity.'''
from
vm.models.activity
import
cleanup
cleanup
(
queue_name
=
QUEUE_NAME
)
circle/manager/moncelery.py
View file @
1b16674e
...
...
@@ -16,12 +16,14 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from
celery
import
Celery
from
celery.signals
import
worker_ready
from
datetime
import
timedelta
from
kombu
import
Queue
,
Exchange
from
os
import
getenv
HOSTNAME
=
"localhost"
CACHE_URI
=
getenv
(
"CACHE_URI"
,
"pylibmc://127.0.0.1:11211/"
)
QUEUE_NAME
=
HOSTNAME
+
'.monitor'
celery
=
Celery
(
'monitor'
,
broker
=
getenv
(
"AMQP_URI"
),
...
...
@@ -34,7 +36,7 @@ celery.conf.update(
CELERY_CACHE_BACKEND
=
CACHE_URI
,
CELERY_TASK_RESULT_EXPIRES
=
300
,
CELERY_QUEUES
=
(
Queue
(
HOSTNAME
+
'.monitor'
,
Exchange
(
'monitor'
,
type
=
'direct'
),
Queue
(
QUEUE_NAME
,
Exchange
(
'monitor'
,
type
=
'direct'
),
routing_key
=
"monitor"
),
),
CELERYBEAT_SCHEDULE
=
{
...
...
@@ -70,3 +72,10 @@ celery.conf.update(
}
)
@worker_ready.connect
()
def
cleanup_tasks
(
conf
=
None
,
**
kwargs
):
'''Discard all task and clean up activity.'''
from
vm.models.activity
import
cleanup
cleanup
(
queue_name
=
QUEUE_NAME
)
circle/manager/slowcelery.py
View file @
1b16674e
...
...
@@ -16,12 +16,14 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from
celery
import
Celery
from
celery.signals
import
worker_ready
from
datetime
import
timedelta
from
kombu
import
Queue
,
Exchange
from
os
import
getenv
HOSTNAME
=
"localhost"
CACHE_URI
=
getenv
(
"CACHE_URI"
,
"pylibmc://127.0.0.1:11211/"
)
QUEUE_NAME
=
HOSTNAME
+
'.man.slow'
celery
=
Celery
(
'manager.slow'
,
broker
=
getenv
(
"AMQP_URI"
),
...
...
@@ -36,7 +38,7 @@ celery.conf.update(
CELERY_CACHE_BACKEND
=
CACHE_URI
,
CELERY_TASK_RESULT_EXPIRES
=
300
,
CELERY_QUEUES
=
(
Queue
(
HOSTNAME
+
'.man.slow'
,
Exchange
(
'manager.slow'
,
type
=
'direct'
),
Queue
(
QUEUE_NAME
,
Exchange
(
'manager.slow'
,
type
=
'direct'
),
routing_key
=
"manager.slow"
),
),
CELERYBEAT_SCHEDULE
=
{
...
...
@@ -48,3 +50,10 @@ celery.conf.update(
}
)
@worker_ready.connect
()
def
cleanup_tasks
(
conf
=
None
,
**
kwargs
):
'''Discard all task and clean up activity.'''
from
vm.models.activity
import
cleanup
cleanup
(
queue_name
=
QUEUE_NAME
)
circle/vm/models/activity.py
View file @
1b16674e
...
...
@@ -20,7 +20,6 @@ from contextlib import contextmanager
from
logging
import
getLogger
from
warnings
import
warn
from
celery.signals
import
worker_ready
from
celery.contrib.abortable
import
AbortableAsyncResult
from
django.core.urlresolvers
import
reverse
...
...
@@ -278,17 +277,17 @@ def node_activity(code_suffix, node, task_uuid=None, user=None,
return
activitycontextimpl
(
act
)
@worker_ready.connect
()
def
cleanup
(
conf
=
None
,
**
kwargs
):
# TODO check if other manager workers are running
from
celery.task.control
import
discard_all
discard_all
()
msg_txt
=
ugettext_noop
(
"Manager is restarted, activity is cleaned up. "
"You can try again now."
)
message
=
create_readable
(
msg_txt
,
msg_txt
)
queue_name
=
kwargs
.
get
(
'queue_name'
,
None
)
for
i
in
InstanceActivity
.
objects
.
filter
(
finished__isnull
=
True
):
i
.
finish
(
False
,
result
=
message
)
logger
.
error
(
'Forced finishing stale activity
%
s'
,
i
)
op
=
i
.
get_operation
()
if
op
and
op
.
async_queue
==
queue_name
:
i
.
finish
(
False
,
result
=
message
)
logger
.
error
(
'Forced finishing stale activity
%
s'
,
i
)
for
i
in
NodeActivity
.
objects
.
filter
(
finished__isnull
=
True
):
i
.
finish
(
False
,
result
=
message
)
logger
.
error
(
'Forced finishing stale activity
%
s'
,
i
)
miscellaneous/mancelery.conf
View file @
1b16674e
...
...
@@ -6,9 +6,14 @@ respawn limit 30 30
setgid
cloud
setuid
cloud
kill
timeout
360
kill
signal
SIGTERM
script
cd
/
home
/
cloud
/
circle
/
circle
. /
home
/
cloud
/.
virtualenvs
/
circle
/
bin
/
activate
. /
home
/
cloud
/.
virtualenvs
/
circle
/
bin
/
postactivate
exec
./
manage
.
py
celery
--
app
=
manager
.
mancelery
worker
--
autoreload
--
loglevel
=
info
--
hostname
=
mancelery
-
B
-
c
10
./
manage
.
py
celery
-
f
--
app
=
manager
.
mancelery
purge
exec
./
manage
.
py
celery
--
app
=
manager
.
mancelery
worker
--
autoreload
--
loglevel
=
info
--
hostname
=
mancelery
-
B
-
c
3
end
script
miscellaneous/moncelery.conf
View file @
1b16674e
...
...
@@ -3,6 +3,7 @@ description "CIRCLE moncelery for monitoring jobs"
respawn
respawn
limit
30
30
setgid
cloud
setuid
cloud
...
...
@@ -10,5 +11,7 @@ script
cd
/
home
/
cloud
/
circle
/
circle
. /
home
/
cloud
/.
virtualenvs
/
circle
/
bin
/
activate
. /
home
/
cloud
/.
virtualenvs
/
circle
/
bin
/
postactivate
exec
./
manage
.
py
celery
--
app
=
manager
.
moncelery
worker
--
autoreload
--
loglevel
=
info
--
hostname
=
moncelery
-
B
-
c
3
./
manage
.
py
celery
-
f
--
app
=
manager
.
moncelery
purge
exec
./
manage
.
py
celery
--
app
=
manager
.
moncelery
worker
--
autoreload
--
loglevel
=
info
--
hostname
=
moncelery
-
B
-
c
2
end
script
miscellaneous/slowcelery.conf
View file @
1b16674e
description
"CIRCLE
mancelery for slow
jobs"
description
"CIRCLE
slowcelery for resource intensive or long
jobs"
respawn
respawn
limit
30
30
...
...
@@ -6,9 +6,15 @@ respawn limit 30 30
setgid
cloud
setuid
cloud
kill
timeout
360
kill
signal
INT
script
cd
/
home
/
cloud
/
circle
/
circle
. /
home
/
cloud
/.
virtualenvs
/
circle
/
bin
/
activate
. /
home
/
cloud
/.
virtualenvs
/
circle
/
bin
/
postactivate
exec
./
manage
.
py
celery
--
app
=
manager
.
slowcelery
worker
--
autoreload
--
loglevel
=
info
--
hostname
=
slowcelery
-
B
-
c
5
./
manage
.
py
celery
-
f
--
app
=
manager
.
slowcelery
purge
exec
./
manage
.
py
celery
--
app
=
manager
.
slowcelery
worker
--
autoreload
--
loglevel
=
info
--
hostname
=
slowcelery
-
B
-
c
1
end
script
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment