Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
CIRCLE
/
agentdriver
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
1
Merge Requests
1
Wiki
Members
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
f6dde5b9
authored
Feb 11, 2024
by
Szeberényi Imre
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
broken connection + inotify fix
parent
9635d802
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
80 additions
and
8 deletions
+80
-8
agentcelery.py
+1
-0
agentdriver.py
+1
-0
protocol.py
+78
-8
No files found.
agentcelery.py
View file @
f6dde5b9
...
@@ -16,6 +16,7 @@ AMQP_URI = getenv('AMQP_URI')
...
@@ -16,6 +16,7 @@ AMQP_URI = getenv('AMQP_URI')
celery
=
Celery
(
'agent'
,
broker
=
AMQP_URI
)
celery
=
Celery
(
'agent'
,
broker
=
AMQP_URI
)
celery
.
conf
.
update
(
CELERY_RESULT_BACKEND
=
'amqp'
,
celery
.
conf
.
update
(
CELERY_RESULT_BACKEND
=
'amqp'
,
CELERY_TASK_RESULT_EXPIRES
=
300
,
CELERY_TASK_RESULT_EXPIRES
=
300
,
CELERYD_PREFETCH_MULTIPLIER
=
32
,
CELERY_QUEUES
=
(
Queue
(
HOSTNAME
+
'.agent'
,
CELERY_QUEUES
=
(
Queue
(
HOSTNAME
+
'.agent'
,
Exchange
(
'agent'
,
type
=
'direct'
),
Exchange
(
'agent'
,
type
=
'direct'
),
routing_key
=
'agent'
),
))
routing_key
=
'agent'
),
))
...
...
agentdriver.py
View file @
f6dde5b9
...
@@ -26,6 +26,7 @@ Worker.install_platform_tweaks = install_platform_tweaks
...
@@ -26,6 +26,7 @@ Worker.install_platform_tweaks = install_platform_tweaks
def
reactor_started
():
def
reactor_started
():
logger
.
info
(
"reactor_started"
)
reactor
.
running_tasks
=
{}
reactor
.
running_tasks
=
{}
reactor
.
ended_tasks
=
{}
reactor
.
ended_tasks
=
{}
for
f
in
listdir
(
SOCKET_DIR
):
for
f
in
listdir
(
SOCKET_DIR
):
...
...
protocol.py
View file @
f6dde5b9
#!/usr/bin/env python
#!/usr/bin/env python
from
twisted.internet
import
protocol
,
reactor
from
twisted.internet
import
protocol
,
reactor
,
inotify
import
pickle
import
pickle
import
logging
import
logging
import
time
import
time
import
struct
import
struct
from
os
import
getenv
from
os
import
getenv
import
gc
from
utils
import
SerialLineReceiverBase
from
utils
import
SerialLineReceiverBase
...
@@ -15,9 +16,16 @@ logger = logging.getLogger()
...
@@ -15,9 +16,16 @@ logger = logging.getLogger()
reactor
.
connections
=
{}
reactor
.
connections
=
{}
def
numObjsByName
(
name
):
num
=
0
for
ob
in
gc
.
get_objects
():
if
isinstance
(
ob
,
name
):
num
+=
1
return
num
class
GraphiteClientProtocol
(
protocol
.
Protocol
):
class
GraphiteClientProtocol
(
protocol
.
Protocol
):
def
connectionMade
(
self
):
def
connectionMade
(
self
):
logger
.
info
(
"Monitor connection
%
s"
,
self
.
name
)
timestamp
=
time
.
time
()
timestamp
=
time
.
time
()
data_list
=
[]
data_list
=
[]
for
key
,
value
in
self
.
data
.
items
():
for
key
,
value
in
self
.
data
.
items
():
...
@@ -41,18 +49,37 @@ class GraphiteClientFactory(protocol.ClientFactory):
...
@@ -41,18 +49,37 @@ class GraphiteClientFactory(protocol.ClientFactory):
def
inotify_handler
(
self
,
file
,
mask
):
def
inotify_handler
(
self
,
file
,
mask
):
if
file
.
basename
()
.
startswith
(
'cloud'
):
return
vm
=
file
.
basename
()
.
replace
(
'vio-'
,
''
)
vm
=
file
.
basename
()
.
replace
(
'vio-'
,
''
)
logger
.
info
(
'inotify:
%
s (
%
s)'
,
vm
,
file
.
path
)
logger
.
info
(
'inotify:
%
s (
%
s)'
,
vm
,
file
.
path
)
for
conn
in
reactor
.
connections
.
get
(
vm
,
[]):
if
mask
:
if
file
.
path
==
conn
.
transport
.
addr
:
logger
.
info
(
"event
%
s (
%
s) on
%
s"
%
(
', '
.
join
(
inotify
.
humanReadableMask
(
mask
)),
mask
,
file
))
if
vm
in
reactor
.
running_tasks
:
for
addr
in
reactor
.
running_tasks
[
vm
]
.
get
(
'started'
,
None
)
.
copy
()
:
if
file
.
path
==
addr
:
if
mask
and
mask
==
inotify
.
IN_DELETE
and
reactor
.
running_tasks
[
vm
][
'started'
][
addr
]:
_p
=
reactor
.
running_tasks
[
vm
][
'started'
][
addr
]
logger
.
info
(
"DELETE
%
s"
,
_p
)
logger
.
info
(
'NumOBJSerialLineReceiverFactory1:
%
s'
,
numObjsByName
(
SerialLineReceiverFactory
))
del
_p
reactor
.
running_tasks
[
vm
][
'started'
]
.
pop
(
addr
)
logger
.
info
(
'NumOBJSerialLineReceiverFactory2:
%
s'
,
numObjsByName
(
SerialLineReceiverFactory
))
logger
.
info
(
'reacror_running1
%
s'
,
reactor
.
running_tasks
)
return
elif
reactor
.
running_tasks
[
vm
][
'started'
][
addr
]:
return
return
serial
=
SerialLineReceiverFactory
(
vm
)
serial
=
SerialLineReceiverFactory
(
vm
)
logger
.
info
(
"connecting to
%
s (
%
s)"
,
vm
,
file
.
path
)
logger
.
info
(
"connecting to
%
s (
%
s)"
,
vm
,
file
.
path
)
reactor
.
connectUNIX
(
file
.
path
,
serial
)
ic
=
reactor
.
connectUNIX
(
file
.
path
,
serial
,
10
)
logger
.
info
(
'IConnector state:
%
s'
,
ic
.
state
)
logger
.
info
(
'reacror_running2
%
s'
,
reactor
.
running_tasks
)
# logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory))
class
SerialLineReceiver
(
SerialLineReceiverBase
):
class
SerialLineReceiver
(
SerialLineReceiverBase
):
def
send_to_graphite
(
self
,
data
):
def
send_to_graphite
(
self
,
data
):
logger
.
info
(
"Send_TO_Graphite"
)
client
=
GraphiteClientFactory
()
client
=
GraphiteClientFactory
()
client
.
protocol
.
data
=
data
client
.
protocol
.
data
=
data
client
.
protocol
.
name
=
self
.
factory
.
vm
client
.
protocol
.
name
=
self
.
factory
.
vm
...
@@ -61,6 +88,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
...
@@ -61,6 +88,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
client
)
client
)
def
handle_command
(
self
,
command
,
args
):
def
handle_command
(
self
,
command
,
args
):
logger
.
info
(
'serial_command:
%
s
%
s'
,
command
,
args
)
if
command
==
'agent_stopped'
:
if
command
==
'agent_stopped'
:
agent_stopped
.
apply_async
(
queue
=
'localhost.man'
,
agent_stopped
.
apply_async
(
queue
=
'localhost.man'
,
args
=
(
self
.
factory
.
vm
,
))
args
=
(
self
.
factory
.
vm
,
))
...
@@ -77,6 +105,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
...
@@ -77,6 +105,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
args
=
args
)
args
=
args
)
def
handle_response
(
self
,
response
,
args
):
def
handle_response
(
self
,
response
,
args
):
logger
.
info
(
'handle_reponse:
%
s
%
s'
,
response
,
args
)
vm
=
self
.
factory
.
vm
vm
=
self
.
factory
.
vm
if
response
==
'status'
:
if
response
==
'status'
:
self
.
send_to_graphite
(
args
)
self
.
send_to_graphite
(
args
)
...
@@ -89,17 +118,25 @@ class SerialLineReceiver(SerialLineReceiverBase):
...
@@ -89,17 +118,25 @@ class SerialLineReceiver(SerialLineReceiverBase):
reactor
.
ended_tasks
[
vm
][
uuid
]
=
args
reactor
.
ended_tasks
[
vm
][
uuid
]
=
args
event
.
set
()
event
.
set
()
def
connectionMade
(
self
):
def
connectionMade
(
self
):
logger
.
info
(
"connected to
%
s (
%
s)"
,
self
.
factory
.
vm
,
logger
.
info
(
"connected to
%
s (
%
s)"
,
self
.
factory
.
vm
,
self
.
transport
.
addr
)
self
.
transport
.
addr
)
logger
.
info
(
"reactor connections:
%
s"
,
reactor
.
connections
)
if
self
.
factory
.
vm
not
in
reactor
.
connections
:
if
self
.
factory
.
vm
not
in
reactor
.
connections
:
reactor
.
connections
[
self
.
factory
.
vm
]
=
set
()
reactor
.
connections
[
self
.
factory
.
vm
]
=
set
()
logger
.
info
(
"reactor connections factory:
%
s"
,
reactor
.
connections
[
self
.
factory
.
vm
])
reactor
.
connections
[
self
.
factory
.
vm
]
.
add
(
self
)
reactor
.
connections
[
self
.
factory
.
vm
]
.
add
(
self
)
logger
.
info
(
'NumOBJSerialLineReceiver:
%
s'
,
numObjsByName
(
SerialLineReceiver
))
def
connectionLost
(
self
,
reason
):
def
connectionLost
(
self
,
reason
):
logger
.
info
(
"disconnected from
%
s (
%
s)"
,
self
.
factory
.
vm
,
logger
.
info
(
"disconnected from
%
s (
%
s)"
,
self
.
factory
.
vm
,
self
.
transport
.
addr
)
self
.
transport
.
addr
)
reactor
.
connections
[
self
.
factory
.
vm
]
.
remove
(
self
)
reactor
.
connections
[
self
.
factory
.
vm
]
.
remove
(
self
)
vm
=
self
.
factory
.
vm
# for addr in reactor.running_tasks[vm].get('started', None):
# if addr == self.transport.addr :
# reactor.running_tasks[vm]['started'][addr] = None
logger
.
info
(
"active connetions:
%
s"
,
reactor
.
running_tasks
[
vm
])
class
SerialLineReceiverFactory
(
protocol
.
ClientFactory
):
class
SerialLineReceiverFactory
(
protocol
.
ClientFactory
):
...
@@ -111,3 +148,36 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
...
@@ -111,3 +148,36 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
reactor
.
running_tasks
[
vm
]
=
{}
reactor
.
running_tasks
[
vm
]
=
{}
if
vm
not
in
reactor
.
ended_tasks
:
if
vm
not
in
reactor
.
ended_tasks
:
reactor
.
ended_tasks
[
vm
]
=
{}
reactor
.
ended_tasks
[
vm
]
=
{}
def
startedConnecting
(
self
,
connector
):
vm
=
self
.
vm
addr
=
connector
.
address
logger
.
info
(
"startedConnecting to
%
s (
%
s)"
,
vm
,
addr
)
logger
.
info
(
"started connetions:
%
s"
,
reactor
.
running_tasks
[
vm
])
if
not
reactor
.
running_tasks
[
vm
]
.
get
(
'started'
,
None
):
reactor
.
running_tasks
[
vm
][
'started'
]
=
{}
reactor
.
running_tasks
[
vm
][
'started'
][
addr
]
=
self
logger
.
info
(
'NumOBJSerialLineReceiverFactory:
%
s'
,
numObjsByName
(
SerialLineReceiverFactory
))
logger
.
info
(
'NumOBJSerialLineReceiver:
%
s'
,
numObjsByName
(
SerialLineReceiver
))
logger
.
info
(
"NumConnetions:
%
s"
,
reactor
.
running_tasks
)
def
clientrConnectionLost
(
self
,
connector
):
vm
=
self
.
vm
addr
=
connector
.
address
logger
.
info
(
"clientConnectionLost with
%
s (
%
s)"
,
vm
,
addr
)
for
_addr
in
reactor
.
running_tasks
[
vm
]
.
get
(
'started'
,
None
):
if
_addr
==
addr
:
reactor
.
running_tasks
[
vm
][
'started'
]
.
pop
(
addr
)
logger
.
info
(
"active connetions:
%
s"
,
reactor
.
running_tasks
[
vm
])
def
clientrConnectionFailed
(
self
,
connector
,
reason
):
vm
=
self
.
vm
addr
=
connector
.
address
logger
.
info
(
"clientConnectionFailed with
%
s (
%
s)"
,
vm
,
connector
.
addr
)
for
_addr
in
reactor
.
running_tasks
[
vm
]
.
get
(
'started'
,
None
):
if
_addr
==
addr
:
reactor
.
running_tasks
[
vm
][
'started'
]
.
pop
(
addr
)
logger
.
info
(
"active connetions:
%
s"
,
reactor
.
running_tasks
[
vm
])
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment