
Source Code for Module gluon.scheduler

#!/usr/bin/env python
# -*- coding: utf-8 -*-

USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args,**vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))
## run worker nodes with:

   cd web2py
   python web2py.py -K myapp
or
   python gluon/scheduler.py -u sqlite://storage.sqlite \
                             -f applications/myapp/databases/ \
                             -t mytasks.py
(-h for info)
python scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0

## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
## the following into /etc/init/web2py-scheduler.conf:
## (This assumes your web2py instance is installed in <user>'s home directory,
## running as <user>, with app <myapp>, on network interface eth0.)

description "web2py task scheduler"
start on (local-filesystems and net-device-up IFACE=eth0)
stop on shutdown
respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
respawn

## You can then start/stop/restart/check status of the daemon with:
sudo start web2py-scheduler
sudo stop web2py-scheduler
sudo restart web2py-scheduler
sudo status web2py-scheduler
"""
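
# Example (a sketch, not part of the original module): besides inserting rows
# via appadmin as shown above, tasks can also be queued programmatically once
# the model file exists, e.g. from a web2py shell
# (python web2py.py -S myapp -M):
#
#   >>> scheduler.queue_task('demo1', pargs=[1, 2], pvars={'a': 3},
#   ...                      period=60, repeats=2, timeout=30)
#
# A worker started with -K myapp will pick the task up on a later loop.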

import os
import time
import multiprocessing
import sys
import threading
import traceback
import signal
import socket
import datetime
import logging
import optparse
import types
import Queue

path = os.getcwd()

if 'WEB2PY_PATH' not in os.environ:
    os.environ['WEB2PY_PATH'] = path

try:
    from gluon.contrib.simplejson import loads, dumps
except:
    from simplejson import loads, dumps

IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())

logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB
from gluon import IS_INT_IN_RANGE, IS_DATETIME
from gluon.utils import web2py_uuid
from gluon.storage import Storage


QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
TERMINATE = 'TERMINATE'
DISABLED = 'DISABLED'
KILL = 'KILL'
PICK = 'PICK'
STOP_TASK = 'STOP_TASK'
EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3 * SECONDS
MAXHIBERNATION = 10
CLEAROUT = '!clear!'

CALLABLETYPES = (types.LambdaType, types.FunctionType,
                 types.BuiltinFunctionType,
                 types.MethodType, types.BuiltinMethodType)


class Task(object):
    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
        logger.debug(' new task allocated: %s.%s', app, function)
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args  # json
        self.vars = vars  # json
        self.__dict__.update(kwargs)

    def __str__(self):
        return '<Task: %s>' % self.function


class TaskReport(object):
    def __init__(self, status, result=None, output=None, tb=None):
        logger.debug(' new task report: %s', status)
        if tb:
            logger.debug(' traceback: %s', tb)
        else:
            logger.debug(' result: %s', result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb

    def __str__(self):
        return '<TaskReport: %s>' % self.status


def demo_function(*argv, **kwargs):
    """ test function """
    for i in range(argv[0]):
        print 'click', i
        time.sleep(1)
    return 'done'


# The two functions below deal with simplejson decoding strings as unicode.
# This matters especially for the dict decode: its keys are later used as
# function keyword arguments, and unicode variable names won't work!
# Borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python


def _decode_list(lst):
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist


def _decode_dict(dct):
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict

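
# Example (a sketch, not in the original source): in Python 2, simplejson
# returns unicode keys, which cannot be splatted into keyword arguments;
# _decode_dict turns them into plain str:
#
#   >>> loads('{"a": 1}').keys()
#   [u'a']
#   >>> loads('{"a": 1}', object_hook=_decode_dict).keys()
#   ['a']
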

def executor(queue, task, out):
    """ the background process """
    logger.debug(' task started')

    class LogOutput(object):
        """Facility to log output at intervals"""
        def __init__(self, out_queue):
            self.out_queue = out_queue
            self.stdout = sys.stdout
            sys.stdout = self

        def __del__(self):
            sys.stdout = self.stdout

        def flush(self):
            pass

        def write(self, data):
            self.out_queue.put(data)

    W2P_TASK = Storage({'id': task.task_id, 'uuid': task.uuid})
    stdout = LogOutput(out)
    try:
        if task.app:
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env, parse_path_info
            from gluon import current
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            # Get controller-specific subdirectory if task.app is of
            # form 'app/controller'
            (a, c, f) = parse_path_info(task.app)
            _env = env(a=a, c=c, import_models=True)
            logging.getLogger().setLevel(level)
            f = task.function
            functions = current._scheduler.tasks
            if not functions:
                # look into env
                _function = _env.get(f)
            else:
                _function = functions.get(f)
            if not isinstance(_function, CALLABLETYPES):
                raise NameError(
                    "name '%s' not found in scheduler's environment" % f)
            # Inject W2P_TASK into environment
            _env.update({'W2P_TASK': W2P_TASK})
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args, **vars))
        else:
            ### for testing purpose only
            result = eval(task.function)(
                *loads(task.args, object_hook=_decode_dict),
                **loads(task.vars, object_hook=_decode_dict))
        queue.put(TaskReport('COMPLETED', result=result))
    except BaseException, e:
        tb = traceback.format_exc()
        queue.put(TaskReport('FAILED', tb=tb))
    del stdout

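
# Example (a sketch, not in the original source): executor can be driven
# directly, the same way MetaScheduler.async does below, using the app=None
# test branch that eval()s a module-level function name:
#
#   >>> queue = multiprocessing.Queue(maxsize=1)
#   >>> out = multiprocessing.Queue()
#   >>> t = Task(app=None, function='demo_function', timeout=7,
#   ...          args='[2]', vars='{}', task_id=1, uuid='test-uuid')
#   >>> p = multiprocessing.Process(target=executor, args=(queue, t, out))
#   >>> p.start(); p.join()
#   >>> queue.get().status
#   'COMPLETED'
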

class MetaScheduler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None           # the background process
        self.have_heartbeat = True    # set to False to kill
        self.empty_runs = 0

    def async(self, task):
        """
        starts the background process and returns:
        ('ok',result,output)
        ('error',exception,None)
        ('timeout',None,None)
        ('terminated',None,None)
        """
        db = self.db
        sr = db.scheduler_run
        out = multiprocessing.Queue()
        queue = multiprocessing.Queue(maxsize=1)
        p = multiprocessing.Process(target=executor, args=(queue, task, out))
        self.process = p
        logger.debug(' task starting')
        p.start()

        task_output = ""
        tout = ""

        try:
            if task.sync_output > 0:
                run_timeout = task.sync_output
            else:
                run_timeout = task.timeout

            start = time.time()

            while p.is_alive() and (
                    not task.timeout or time.time() - start < task.timeout):
                if tout:
                    try:
                        logger.debug(' partial output saved')
                        db(sr.id == task.run_id).update(run_output=task_output)
                        db.commit()
                    except:
                        pass
                p.join(timeout=run_timeout)
                tout = ""
                while not out.empty():
                    tout += out.get()
                if tout:
                    logger.debug(' partial output: "%s"' % str(tout))
                    if CLEAROUT in tout:
                        task_output = tout[
                            tout.rfind(CLEAROUT) + len(CLEAROUT):]
                    else:
                        task_output += tout
        except:
            p.terminate()
            p.join()
            self.have_heartbeat = False
            logger.debug(' task stopped by general exception')
            tr = TaskReport(STOPPED)
        else:
            if p.is_alive():
                p.terminate()
                logger.debug(' task timeout')
                try:
                    # we try to get a traceback here
                    tr = queue.get(timeout=2)
                    tr.status = TIMEOUT
                    tr.output = task_output
                except Queue.Empty:
                    tr = TaskReport(TIMEOUT)
            elif queue.empty():
                self.have_heartbeat = False
                logger.debug(' task stopped')
                tr = TaskReport(STOPPED)
            else:
                logger.debug(' task completed or failed')
                tr = queue.get()
                tr.output = task_output
        return tr

    def die(self):
        logger.info('die!')
        self.have_heartbeat = False
        self.terminate_process()

    def give_up(self):
        logger.info('Giving up as soon as possible!')
        self.have_heartbeat = False

    def terminate_process(self):
        try:
            self.process.terminate()
        except:
            pass  # no process to terminate

    def run(self):
        """ the thread that sends heartbeat """
        counter = 0
        while self.have_heartbeat:
            self.send_heartbeat(counter)
            counter += 1

    def start_heartbeats(self):
        self.start()

    def send_heartbeat(self, counter):
        print 'thum'
        time.sleep(1)

    def pop_task(self):
        return Task(
            app=None,
            function='demo_function',
            timeout=7,
            args='[2]',
            vars='{}')

    def report_task(self, task, task_report):
        print 'reporting task'
        pass

    def sleep(self):
        pass

    def loop(self):
        try:
            self.start_heartbeats()
            while True and self.have_heartbeat:
                logger.debug('looping...')
                task = self.pop_task()
                if task:
                    self.empty_runs = 0
                    self.report_task(task, self.async(task))
                else:
                    self.empty_runs += 1
                    logger.debug('sleeping...')
                    if self.max_empty_runs != 0:
                        logger.debug('empty runs %s/%s',
                                     self.empty_runs, self.max_empty_runs)
                        if self.empty_runs >= self.max_empty_runs:
                            logger.info(
                                'empty runs limit reached, killing myself')
                            self.die()
                    self.sleep()
        except KeyboardInterrupt:
            self.die()


TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)


class TYPE(object):
    """
    Validator that checks whether a field contains valid JSON and
    validates its type.
    """

    def __init__(self, myclass=list, parse=False):
        self.myclass = myclass
        self.parse = parse

    def __call__(self, value):
        from gluon import current
        try:
            obj = loads(value)
        except:
            return (value, current.T('invalid json'))
        else:
            if isinstance(obj, self.myclass):
                if self.parse:
                    return (obj, None)
                else:
                    return (value, None)
            else:
                return (value, current.T('Not of type: %s') % self.myclass)

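
# Example (a sketch, not in the original source): TYPE behaves like any
# web2py validator, returning a (value, error) tuple; the error branch
# relies on gluon.current.T, so it needs a request environment:
#
#   >>> TYPE(list)('[1, 2]')
#   ('[1, 2]', None)
#   >>> TYPE(dict, parse=True)('{"a": 1}')
#   ({u'a': 1}, None)
#
# Invalid JSON, or JSON of the wrong type, returns the original value
# plus a translated error message.
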

class Scheduler(MetaScheduler):
    def __init__(self, db, tasks=None, migrate=True,
                 worker_name=None, group_names=['main'], heartbeat=HEARTBEAT,
                 max_empty_runs=0, discard_results=False, utc_time=False):

        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None
        self.tasks = tasks
        self.group_names = group_names
        self.heartbeat = heartbeat
        self.worker_name = worker_name or IDENTIFIER
        # list containing the status as recorded in the table plus a boost
        # parameter for hibernation (i.e. when someone stops the worker by
        # acting on the scheduler_worker table)
        self.worker_status = [RUNNING, 1]
        self.max_empty_runs = max_empty_runs
        self.discard_results = discard_results
        self.is_a_ticker = False
        self.do_assign_tasks = False
        self.greedy = False
        self.utc_time = utc_time

        from gluon import current
        current._scheduler = self

        self.define_tables(db, migrate=migrate)

    def __get_migrate(self, tablename, migrate=True):
        if migrate is False:
            return False
        elif migrate is True:
            return True
        elif isinstance(migrate, str):
            return "%s%s.table" % (migrate, tablename)
        return True

    def now(self):
        return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now()

    def set_requirements(self, scheduler_task):
        from gluon import current
        if hasattr(current, 'request'):
            scheduler_task.application_name.default = '%s/%s' % (
                current.request.application, current.request.controller
            )

    def define_tables(self, db, migrate):
        from gluon.dal import DEFAULT
        logger.debug('defining tables (migrate=%s)', migrate)
        now = self.now
        db.define_table(
            'scheduler_task',
            Field('application_name', requires=IS_NOT_EMPTY(),
                  default=None, writable=False),
            Field('task_name', default=None),
            Field('group_name', default='main'),
            Field('status', requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED, writable=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))
                  if self.tasks else DEFAULT),
            Field('uuid', length=255,
                  requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
                  unique=True, default=web2py_uuid),
            Field('args', 'text', default='[]', requires=TYPE(list)),
            Field('vars', 'text', default='{}', requires=TYPE(dict)),
            Field('enabled', 'boolean', default=True),
            Field('start_time', 'datetime', default=now,
                  requires=IS_DATETIME()),
            Field('next_run_time', 'datetime', default=now),
            Field('stop_time', 'datetime'),
            Field('repeats', 'integer', default=1, comment="0=unlimited",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
                  requires=IS_INT_IN_RANGE(-1, None)),
            Field('period', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('timeout', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('sync_output', 'integer', default=0,
                  comment="update output every n sec: 0=never",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('times_run', 'integer', default=0, writable=False),
            Field('times_failed', 'integer', default=0, writable=False),
            Field('last_run_time', 'datetime', writable=False, readable=False),
            Field('assigned_worker_name', default='', writable=False),
            on_define=self.set_requirements,
            migrate=self.__get_migrate('scheduler_task', migrate),
            format='%(task_name)s')

        db.define_table(
            'scheduler_run',
            Field('task_id', 'reference scheduler_task'),
            Field('status', requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time', 'datetime'),
            Field('stop_time', 'datetime'),
            Field('run_output', 'text'),
            Field('run_result', 'text'),
            Field('traceback', 'text'),
            Field('worker_name', default=self.worker_name),
            migrate=self.__get_migrate('scheduler_run', migrate)
        )

        db.define_table(
            'scheduler_worker',
            Field('worker_name', length=255, unique=True),
            Field('first_heartbeat', 'datetime'),
            Field('last_heartbeat', 'datetime'),
            Field('status', requires=IS_IN_SET(WORKER_STATUS)),
            Field('is_ticker', 'boolean', default=False, writable=False),
            Field('group_names', 'list:string', default=self.group_names),
            migrate=self.__get_migrate('scheduler_worker', migrate)
        )

        if migrate is not False:
            db.commit()

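    # Example (a sketch, not in the original source): __get_migrate lets the
    # `migrate` argument be a bool or a filename prefix for the .table files:
    #
    #   >>> Scheduler(db, migrate='myapp_scheduler_')
    #
    # defines scheduler_task with migrate='myapp_scheduler_scheduler_task.table',
    # and likewise for the other two tables.
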
    def loop(self, worker_name=None):
        signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
        try:
            self.start_heartbeats()
            while True and self.have_heartbeat:
                if self.worker_status[0] == DISABLED:
                    logger.debug('Someone stopped me, sleeping until better times come (%s)',
                                 self.worker_status[1])
                    self.sleep()
                    continue
                logger.debug('looping...')
                task = self.wrapped_pop_task()
                if task:
                    self.empty_runs = 0
                    self.worker_status[0] = RUNNING
                    self.report_task(task, self.async(task))
                    self.worker_status[0] = ACTIVE
                else:
                    self.empty_runs += 1
                    logger.debug('sleeping...')
                    if self.max_empty_runs != 0:
                        logger.debug('empty runs %s/%s',
                                     self.empty_runs, self.max_empty_runs)
                        if self.empty_runs >= self.max_empty_runs:
                            logger.info(
                                'empty runs limit reached, killing myself')
                            self.die()
                    self.sleep()
        except (KeyboardInterrupt, SystemExit):
            logger.info('caught KeyboardInterrupt/SystemExit')
            self.die()

    def wrapped_assign_tasks(self, db):
        logger.debug('Assigning tasks...')
        db.commit()  # db.commit() only for MySQL
        x = 0
        while x < 10:
            try:
                self.assign_tasks(db)
                db.commit()
                logger.debug('Tasks assigned...')
                break
            except:
                db.rollback()
                logger.error('TICKER: error assigning tasks (%s)', x)
                x += 1
                time.sleep(0.5)

    def wrapped_pop_task(self):
        db = self.db
        db.commit()  # another nifty db.commit() only for MySQL
        x = 0
        while x < 10:
            try:
                rtn = self.pop_task(db)
                return rtn
            except:
                db.rollback()
                logger.error(' error popping tasks')
                x += 1
                time.sleep(0.5)

    def pop_task(self, db):
        now = self.now()
        st = self.db.scheduler_task
        if self.is_a_ticker and self.do_assign_tasks:
            # I'm a ticker, and 5 loops passed without reassigning tasks:
            # let's do that and loop again
            self.wrapped_assign_tasks(db)
            return None
        # ready to process something
        grabbed = db(st.assigned_worker_name == self.worker_name)(
            st.status == ASSIGNED)

        task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
        if task:
            task.update_record(status=RUNNING, last_run_time=now)
            # no one will touch my task!
            db.commit()
            logger.debug(' work to do %s', task.id)
        else:
            if self.greedy and self.is_a_ticker:
                # there are other tasks ready to be assigned
                logger.info('TICKER: greedy loop')
                self.wrapped_assign_tasks(db)
            else:
                logger.info('nothing to do')
            return None
        next_run_time = task.last_run_time + datetime.timedelta(
            seconds=task.period)
        times_run = task.times_run + 1
        if times_run < task.repeats or task.repeats == 0:
            # need to run again (repeating task)
            run_again = True
        else:
            # no need to run again
            run_again = False
        run_id = 0
        while True and not self.discard_results:
            logger.debug(' new scheduler_run record')
            try:
                run_id = db.scheduler_run.insert(
                    task_id=task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                time.sleep(0.5)
                db.rollback()
        logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app=task.application_name,
            function=task.function_name,
            timeout=task.timeout,
            args=task.args,  # in json
            vars=task.vars,  # in json
            task_id=task.id,
            run_id=run_id,
            run_again=run_again,
            next_run_time=next_run_time,
            times_run=times_run,
            stop_time=task.stop_time,
            retry_failed=task.retry_failed,
            times_failed=task.times_failed,
            sync_output=task.sync_output,
            uuid=task.uuid)

    def report_task(self, task, task_report):
        db = self.db
        now = self.now()
        while True:
            try:
                if not self.discard_results:
                    if task_report.result != 'null' or task_report.tb:
                        # result is 'null' as a string if the task completed;
                        # if it was stopped, result is None (NoneType), but we
                        # record the STOPPED "run" anyway
                        logger.debug(' recording task report in db (%s)',
                                     task_report.status)
                        db(db.scheduler_run.id == task.run_id).update(
                            status=task_report.status,
                            stop_time=now,
                            run_result=task_report.result,
                            run_output=task_report.output,
                            traceback=task_report.tb)
                    else:
                        logger.debug(' deleting task report in db because of no result')
                        db(db.scheduler_run.id == task.run_id).delete()
                # if there is a stop_time and the following run would exceed it
                is_expired = (task.stop_time
                              and task.next_run_time > task.stop_time
                              and True or False)
                status = (task.run_again and is_expired and EXPIRED
                          or task.run_again and not is_expired
                          and QUEUED or COMPLETED)
                if task_report.status == COMPLETED:
                    d = dict(status=status,
                             next_run_time=task.next_run_time,
                             times_run=task.times_run,
                             times_failed=0
                             )
                    db(db.scheduler_task.id == task.task_id)(
                        db.scheduler_task.status == RUNNING).update(**d)
                else:
                    st_mapping = {'FAILED': 'FAILED',
                                  'TIMEOUT': 'TIMEOUT',
                                  'STOPPED': 'QUEUED'}[task_report.status]
                    status = (task.retry_failed
                              and task.times_failed < task.retry_failed
                              and QUEUED or task.retry_failed == -1
                              and QUEUED or st_mapping)
                    db(
                        (db.scheduler_task.id == task.task_id) &
                        (db.scheduler_task.status == RUNNING)
                    ).update(
                        times_failed=db.scheduler_task.times_failed + 1,
                        next_run_time=task.next_run_time,
                        status=status
                    )
                db.commit()
                logger.info('task completed (%s)', task_report.status)
                break
            except:
                db.rollback()
                time.sleep(0.5)

    def adj_hibernation(self):
        if self.worker_status[0] == DISABLED:
            wk_st = self.worker_status[1]
            hibernation = wk_st + 1 if wk_st < MAXHIBERNATION else MAXHIBERNATION
            self.worker_status[1] = hibernation

    def send_heartbeat(self, counter):
        if not self.db_thread:
            logger.debug('thread building own DAL object')
            self.db_thread = DAL(
                self.db._uri, folder=self.db._adapter.folder)
            self.define_tables(self.db_thread, migrate=False)
        try:
            db = self.db_thread
            sw, st = db.scheduler_worker, db.scheduler_task
            now = self.now()
            # record heartbeat
            mybackedstatus = db(sw.worker_name == self.worker_name).select().first()
            if not mybackedstatus:
                sw.insert(status=ACTIVE, worker_name=self.worker_name,
                          first_heartbeat=now, last_heartbeat=now,
                          group_names=self.group_names)
                self.worker_status = [ACTIVE, 1]  # activating the process
                mybackedstatus = ACTIVE
            else:
                mybackedstatus = mybackedstatus.status
                if mybackedstatus == DISABLED:
                    # keep sleeping
                    self.worker_status[0] = DISABLED
                    if self.worker_status[1] == MAXHIBERNATION:
                        logger.debug('........recording heartbeat (%s)',
                                     self.worker_status[0])
                        db(sw.worker_name == self.worker_name).update(
                            last_heartbeat=now)
                elif mybackedstatus == TERMINATE:
                    self.worker_status[0] = TERMINATE
                    logger.debug("Waiting to terminate the current task")
                    self.give_up()
                    return
                elif mybackedstatus == KILL:
                    self.worker_status[0] = KILL
                    self.die()
                else:
                    if mybackedstatus == STOP_TASK:
                        logger.info('Asked to kill the current task')
                        self.terminate_process()
                    logger.debug('........recording heartbeat (%s)',
                                 self.worker_status[0])
                    db(sw.worker_name == self.worker_name).update(
                        last_heartbeat=now, status=ACTIVE)
                    self.worker_status[1] = 1  # re-activating the process
                    if self.worker_status[0] != RUNNING:
                        self.worker_status[0] = ACTIVE

            self.do_assign_tasks = False
            if counter % 5 == 0 or mybackedstatus == PICK:
                try:
                    # delete inactive workers
                    expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)
                    departure = now - datetime.timedelta(
                        seconds=self.heartbeat * 3 * MAXHIBERNATION)
                    logger.debug(
                        ' freeing workers that have not sent heartbeat')
                    inactive_workers = db(
                        ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
                        ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
                    )
                    db(st.assigned_worker_name.belongs(
                        inactive_workers._select(sw.worker_name)))(st.status == RUNNING)\
                        .update(assigned_worker_name='', status=QUEUED)
                    inactive_workers.delete()
                    try:
                        self.is_a_ticker = self.being_a_ticker()
                    except:
                        logger.error('Error coordinating TICKER')
                    if self.worker_status[0] == ACTIVE:
                        self.do_assign_tasks = True
                except:
                    logger.error('Error cleaning up')
            db.commit()
        except:
            logger.error('Error retrieving status')
            db.rollback()
        self.adj_hibernation()
        self.sleep()

    def being_a_ticker(self):
        db = self.db_thread
        sw = db.scheduler_worker
        all_active = db(
            (sw.worker_name != self.worker_name) & (sw.status == ACTIVE)
        ).select()
        ticker = all_active.find(lambda row: row.is_ticker is True).first()
        not_busy = self.worker_status[0] == ACTIVE
        if not ticker:
            # no other tickers are around
            if not_busy:
                # only if I'm not busy
                db(sw.worker_name == self.worker_name).update(is_ticker=True)
                db(sw.worker_name != self.worker_name).update(is_ticker=False)
                logger.info("TICKER: I'm a ticker")
            else:
                # I'm busy
                if len(all_active) >= 1:
                    # so I'll "downgrade" myself to a "poor worker"
                    db(sw.worker_name == self.worker_name).update(is_ticker=False)
                else:
                    not_busy = True
            db.commit()
            return not_busy
        else:
            logger.info(
                "%s is a ticker, I'm a poor worker" % ticker.worker_name)
            return False

    def assign_tasks(self, db):
        sw, st = db.scheduler_worker, db.scheduler_task
        now = self.now()
        all_workers = db(sw.status == ACTIVE).select()
        # build workers as dict of groups
        wkgroups = {}
        for w in all_workers:
            group_names = w.group_names
            for gname in group_names:
                if gname not in wkgroups:
                    wkgroups[gname] = dict(
                        workers=[{'name': w.worker_name, 'c': 0}])
                else:
                    wkgroups[gname]['workers'].append(
                        {'name': w.worker_name, 'c': 0})
        # set queued tasks that expired between "runs" (i.e., you turned off
        # the scheduler): then it wasn't expired, but now it is
        db(st.status.belongs(
            (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)

        all_available = db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            ((st.times_run < st.repeats) | (st.repeats == 0)) &
            (st.start_time <= now) &
            ((st.stop_time == None) | (st.stop_time > now)) &
            (st.next_run_time <= now) &
            (st.enabled == True)
        )
        limit = len(all_workers) * (50 / (len(wkgroups) or 1))
        # If there is a multitude of tasks, let's fix a maximum number of tasks
        # per worker. This could be adjusted with some added intelligence (like
        # estimating how many tasks a worker will complete before the ticker
        # reassigns them), but the gain is quite small: 50 is a sweet spot
        # also for fast tasks, with sane heartbeat values.
        # NB: the ticker reassigns tasks every 5 cycles, so if a worker
        # completes its 50 tasks in less than heartbeat*5 seconds, it won't
        # pick new tasks until heartbeat*5 seconds pass.

        # If a worker is currently elaborating a long task, all other tasks
        # assigned to it need to be reassigned "freely" to other workers that
        # may be free. This shuffles things up a bit, in order to maintain
        # the idea of a semi-linear scalability.

        # let's freeze it up
        db.commit()
        x = 0
        for group in wkgroups.keys():
            tasks = all_available(st.group_name == group).select(
                limitby=(0, limit), orderby=st.next_run_time)
            # let's break up the queue evenly among workers
            for task in tasks:
                x += 1
                gname = task.group_name
                ws = wkgroups.get(gname)
                if ws:
                    counter = 0
                    myw = 0
                    for i, w in enumerate(ws['workers']):
                        if w['c'] < counter:
                            myw = i
                            counter = w['c']
                    d = dict(
                        status=ASSIGNED,
                        assigned_worker_name=wkgroups[gname]['workers'][myw]['name']
                    )
                    if not task.task_name:
                        d['task_name'] = task.function_name
                    task.update_record(**d)
                    wkgroups[gname]['workers'][myw]['c'] += 1

        db.commit()
        # I didn't report tasks, but I'm working nonetheless!
        if x > 0:
            self.empty_runs = 0
        # I'll be greedy only if the number of tasks assigned equals the limit
        # (meaning there could be others ready to be assigned)
        self.greedy = x >= limit and True or False
        logger.info('TICKER: workers are %s', len(all_workers))
        logger.info('TICKER: tasks are %s', x)

    def sleep(self):
        time.sleep(self.heartbeat * self.worker_status[1])
        # should only sleep until next available task

    def set_worker_status(self, group_names=None, action=ACTIVE):
        if not group_names:
            group_names = self.group_names
        elif isinstance(group_names, str):
            group_names = [group_names]
        for group in group_names:
            self.db(
                self.db.scheduler_worker.group_names.contains(group)
            ).update(status=action)

    def disable(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=DISABLED)

    def resume(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=ACTIVE)

    def terminate(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=TERMINATE)

    def kill(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=KILL)

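    # Example (a sketch, not in the original source): workers watch the
    # scheduler_worker table, so a running pool can be controlled from any
    # web2py shell that shares the database:
    #
    #   >>> scheduler.disable('main')    # workers hibernate (status DISABLED)
    #   >>> scheduler.resume('main')     # back to ACTIVE
    #   >>> scheduler.terminate('main')  # finish the current task, then exit
    #   >>> scheduler.kill('main')       # exit as soon as possible
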
    def queue_task(self, function, pargs=[], pvars={}, **kwargs):
        """
        Queue a task. This takes care of validating all values.
        :param function: the function (anything callable with a __name__)
        :param pargs: "raw" args to be passed to the function. Automatically
            jsonified.
        :param pvars: "raw" kwargs to be passed to the function. Automatically
            jsonified.
        :param kwargs: all the scheduler_task columns. args and vars here
            should already be in json format; they override pargs and pvars.

        Returns a dict just like a normal validate_and_insert, plus a uuid key
        holding the uuid of the queued task. If validation does not pass, both
        id and uuid will be None, and you'll get an "errors" dict holding the
        errors found.
        """
        if hasattr(function, '__name__'):
            function = function.__name__
        targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
        tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
        tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
        tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
        immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
        rtn = self.db.scheduler_task.validate_and_insert(
            function_name=function,
            task_name=tname,
            args=targs,
            vars=tvars,
            uuid=tuuid,
            **kwargs)
        if not rtn.errors:
            rtn.uuid = tuuid
            if immediate:
                self.db(self.db.scheduler_worker.is_ticker == True).update(status=PICK)
        else:
            rtn.uuid = None
        return rtn

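    # Example (a sketch, not in the original source): the return value is the
    # usual validate_and_insert result, extended with a uuid key:
    #
    #   >>> rtn = scheduler.queue_task(demo1, pargs=[1], pvars={'a': 2},
    #   ...                            period=30, repeats=0, immediate=True)
    #   >>> if rtn.errors:
    #   ...     print 'task not queued:', rtn.errors
    #   ... else:
    #   ...     print 'queued with uuid', rtn.uuid
    #
    # immediate=True flags the ticker with status PICK, so assignment happens
    # at its next heartbeat instead of waiting up to 5 heartbeat cycles.
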
    def task_status(self, ref, output=False):
        """
        Shortcut for task status retrieval.

        :param ref: can be
            - an integer --> lookup is done by scheduler_task.id
            - a string --> lookup is done by scheduler_task.uuid
            - a Query --> lookup as you wish (e.g.
              db.scheduler_task.task_name == 'test1')
        :param output: fetch the last scheduler_run record too

        Returns a single Row object, for the last queued task.
        If output == True, the last scheduler_run record is returned as well;
        it is fetched by a left join, so all of its fields can be None.
        """
        from gluon.dal import Query
        sr, st = self.db.scheduler_run, self.db.scheduler_task
        if isinstance(ref, int):
            q = st.id == ref
        elif isinstance(ref, str):
            q = st.uuid == ref
        elif isinstance(ref, Query):
            q = ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id, uuid or Query")
        fields = [st.ALL]
        left = False
        orderby = ~st.id
        if output:
            fields = st.ALL, sr.ALL
            left = sr.on(sr.task_id == st.id)
            orderby = ~st.id | ~sr.id
        row = self.db(q).select(
            *fields,
            **dict(orderby=orderby,
                   left=left,
                   limitby=(0, 1))
        ).first()
        if row and output:
            row.result = row.scheduler_run.run_result and \
                loads(row.scheduler_run.run_result,
                      object_hook=_decode_dict) or None
        return row

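    # Example (a sketch, not in the original source): with output=True the
    # returned Row holds both tables plus a decoded result:
    #
    #   >>> row = scheduler.task_status(rtn.uuid, output=True)
    #   >>> row.scheduler_task.status, row.scheduler_run.status, row.result
    #
    # where rtn.uuid is a uuid returned by queue_task above.
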
    def stop_task(self, ref):
        """
        Experimental!!!
        Shortcut for task termination.
        If the task is RUNNING it will be terminated --> the execution will
        be set as FAILED.
        If the task is QUEUED, its stop_time will be set to "now", the
        enabled flag will be set to False, and the status to STOPPED.

        :param ref: can be
            - an integer --> lookup is done by scheduler_task.id
            - a string --> lookup is done by scheduler_task.uuid
        Returns:
            - 1 if the task was stopped (meaning an update was done)
            - None if the task was not found, or was neither RUNNING
              nor QUEUED
        """
        from gluon.dal import Query
        st, sw = self.db.scheduler_task, self.db.scheduler_worker
        if isinstance(ref, int):
            q = st.id == ref
        elif isinstance(ref, str):
            q = st.uuid == ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id or uuid")
        task = self.db(q).select(st.id, st.status, st.assigned_worker_name).first()
        rtn = None
        if not task:
            return rtn
        if task.status == 'RUNNING':
            rtn = self.db(sw.worker_name == task.assigned_worker_name).update(status=STOP_TASK)
        elif task.status == 'QUEUED':
            rtn = self.db(q).update(stop_time=self.now(), enabled=False, status=STOPPED)
        return rtn

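    # Example (a sketch, not in the original source):
    #
    #   >>> scheduler.stop_task(rtn.uuid)   # by uuid, or by integer id
    #
    # A RUNNING task is stopped through its assigned worker (status
    # STOP_TASK); a QUEUED one is simply disabled and marked STOPPED.
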

def main():
    """
    Allows running a worker without `python web2py.py -K ...`,
    by simply running this file with python.
    """
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    parser.add_option(
        "-b", "--heartbeat", dest="heartbeat", default=10,
        type='int', help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level", dest="logger_level",
        default=30,
        type='int',
        help="set debug output level (0-100, 0 means all, 100 means none; default is 30)")
    parser.add_option("-E", "--empty-runs",
                      dest="max_empty_runs",
                      type='int',
                      default=0,
                      help="max loops with no grabbed tasks permitted (0 for never check)")
    parser.add_option(
        "-g", "--group_names", dest="group_names",
        default='main',
        help="comma separated list of groups to be picked by the worker")
    parser.add_option(
        "-f", "--db_folder", dest="db_folder",
        default='/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri", dest="db_uri",
        default='sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks", dest="tasks", default=None,
        help="file containing task files, must define " +
        "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
    parser.add_option(
        "-U", "--utc-time", dest="utc_time", default=False,
        help="work with UTC timestamps"
    )
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print USAGE
    if options.tasks:
        path, filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print 'importing tasks...'
        tasks = __import__(filename, globals(), locals(), [], -1).tasks
        print 'tasks found: ' + ', '.join(tasks.keys())
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    logging.getLogger().setLevel(options.logger_level)

    print 'groups for this worker: ' + ', '.join(group_names)
    print 'connecting to database in folder: ' + (options.db_folder or './')
    print 'using URI: ' + options.db_uri
    db = DAL(options.db_uri, folder=options.db_folder)
    print 'instantiating scheduler...'
    scheduler = Scheduler(db=db,
                          worker_name=options.worker_name,
                          tasks=tasks,
                          migrate=True,
                          group_names=group_names,
                          heartbeat=options.heartbeat,
                          max_empty_runs=options.max_empty_runs,
                          utc_time=options.utc_time)
    signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
    print 'starting main worker loop...'
    scheduler.loop()

if __name__ == '__main__':
    main()