Commit 5a251e2c by Őry Máté Committed by cloud

send metrics in chunks of 100

parent f235c71d
#!/usr/bin/python #!/usr/bin/python
from datetime import datetime from datetime import datetime
from itertools import islice
from socket import gethostname from socket import gethostname
import argparse import argparse
import logging import logging
...@@ -95,13 +96,14 @@ class Client: ...@@ -95,13 +96,14 @@ class Client:
metric name given in the message body. (This option must be enabled metric name given in the message body. (This option must be enabled
on the server. Otherwise it can't parse the data sent.) on the server. Otherwise it can't parse the data sent.)
""" """
body = "\n".join(message)
try: try:
self.channel.basic_publish(exchange=self.amqp_queue, self.channel.basic_publish(exchange=self.amqp_queue,
routing_key='', body="\n".join(message)) routing_key='', body=body)
return True
except: except:
logger.error('An error has occured while sending metrics.') logger.error('An error has occured while sending metrics (%dB).',
return False len(body))
raise
def collect_node(self, metricCollectors): def collect_node(self, metricCollectors):
""" """
...@@ -198,6 +200,13 @@ class Client: ...@@ -198,6 +200,13 @@ class Client:
freqs = set([i[1] for i in items if i[1]>0]) freqs = set([i[1] for i in items if i[1]>0])
return reduce(operator.mul, freqs, 1) return reduce(operator.mul, freqs, 1)
@staticmethod
def _chunker(seq, size):
"""Yield seq in size-long chunks.
"""
for pos in xrange(0, len(seq), size):
yield islice(seq, pos, pos + size)
def run(self, metricCollectors=[]): def run(self, metricCollectors=[]):
""" """
Call this method to start reporting to the server, it needs the Call this method to start reporting to the server, it needs the
...@@ -215,9 +224,9 @@ class Client: ...@@ -215,9 +224,9 @@ class Client:
if self.debugMode == "True": if self.debugMode == "True":
for i in metrics: for i in metrics:
logger.debug('Metric to send: %s', i) logger.debug('Metric to send: %s', i)
for chunk in self._chunker(metrics, 100):
self.send(chunk)
logger.info("%d metrics sent", len(metrics)) logger.info("%d metrics sent", len(metrics))
if not self.send(metrics):
raise RuntimeError
time.sleep(1) time.sleep(1)
self.beat += 1 self.beat += 1
if self.beat % maxFrequency == 0: if self.beat % maxFrequency == 0:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment