1
2
3
# Help text printed by main() when the worker is started stand-alone
# without the mandatory -t/-u options (see main() at the bottom).
USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args,**vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))
## run worker nodes with:

    cd web2py
    python web2py.py -K myapp
or
    python gluon/scheduler.py -u sqlite://storage.sqlite \
                              -f applications/myapp/databases/ \
                              -t mytasks.py
(-h for info)
python scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0

## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
## the following into /etc/init/web2py-scheduler.conf:
## (This assumes your web2py instance is installed in <user>'s home directory,
## running as <user>, with app <myapp>, on network interface eth0.)

description "web2py task scheduler"
start on (local-filesystems and net-device-up IFACE=eth0)
stop on shutdown
respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
respawn

## You can then start/stop/restart/check status of the daemon with:
sudo start web2py-scheduler
sudo stop web2py-scheduler
sudo restart web2py-scheduler
sudo status web2py-scheduler
"""
61
62 import os
63 import time
64 import multiprocessing
65 import sys
66 import threading
67 import traceback
68 import signal
69 import socket
70 import datetime
71 import logging
72 import optparse
73 import types
74 import Queue
75
path = os.getcwd()

# Spawned task processes chdir() to WEB2PY_PATH before importing app
# models, so make sure it is set.
if 'WEB2PY_PATH' not in os.environ:
    os.environ['WEB2PY_PATH'] = path

# fix: catch ImportError only -- the original bare except would also
# swallow KeyboardInterrupt/SystemExit raised during import
try:
    from gluon.contrib.simplejson import loads, dumps
except ImportError:
    from simplejson import loads, dumps

# Unique-enough identifier for this worker process: hostname#pid
IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())

logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB
from gluon import IS_INT_IN_RANGE, IS_DATETIME
from gluon.utils import web2py_uuid
from gluon.storage import Storage
94
95
# --- task statuses, stored verbatim in scheduler_task.status ---
QUEUED = 'QUEUED'        # waiting to be assigned to a worker
ASSIGNED = 'ASSIGNED'    # picked by the ticker for a specific worker
RUNNING = 'RUNNING'      # currently executing
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
# --- worker statuses / commands, stored in scheduler_worker.status ---
ACTIVE = 'ACTIVE'
TERMINATE = 'TERMINATE'  # finish current task, then exit
DISABLED = 'DISABLED'    # keep heartbeating but pick up no tasks
KILL = 'KILL'            # exit immediately
PICK = 'PICK'            # ask the ticker to assign tasks right away
STOP_TASK = 'STOP_TASK'  # ask the worker to kill its running task
EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3 * SECONDS  # default heartbeat period, in seconds
MAXHIBERNATION = 10      # max sleep multiplier for a DISABLED worker
CLEAROUT = '!clear!'

# Callable types accepted as schedulable task functions
CALLABLETYPES = (types.LambdaType, types.FunctionType,
                 types.BuiltinFunctionType,
                 types.MethodType, types.BuiltinMethodType)
118
119
def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
    """Value object describing one task handed to the executor.

    :param app: application name, or None to run outside any app
    :param function: name of the function to execute
    :param timeout: maximum running time, in seconds
    :param args: positional arguments, as a json-encoded string
    :param vars: keyword arguments, as a json-encoded string
    :param kwargs: extra attributes to attach (task_id, uuid, ...)
    """
    logger.debug(' new task allocated: %s.%s', app, function)
    attributes = dict(app=app, function=function, timeout=timeout,
                      args=args, vars=vars)
    # extra keyword arguments may override the named ones, as before
    attributes.update(kwargs)
    self.__dict__.update(attributes)
129
131 return '<Task: %s>' % self.function
132
133
def __init__(self, status, result=None, output=None, tb=None):
    """Outcome of one task execution, passed back to report_task().

    :param status: one of COMPLETED/FAILED/TIMEOUT/STOPPED
    :param result: json-encoded return value of the task, if any
    :param output: captured stdout of the task, if any
    :param tb: formatted traceback when the task failed
    """
    logger.debug(' new task report: %s', status)
    if tb:
        logger.debug(' traceback: %s', tb)
    else:
        logger.debug(' result: %s', result)
    self.status, self.result = status, result
    self.output, self.tb = output, tb
145
147 return '<TaskReport: %s>' % self.status
148
149
151 """ test function """
152 for i in range(argv[0]):
153 print 'click', i
154 time.sleep(1)
155 return 'done'
156
157
158
159
160
161
163 newlist = []
164 for i in lst:
165 if isinstance(i, unicode):
166 i = i.encode('utf-8')
167 elif isinstance(i, list):
168 i = _decode_list(i)
169 newlist.append(i)
170 return newlist
171
172
174 newdict = {}
175 for k, v in dct.iteritems():
176 if isinstance(k, unicode):
177 k = k.encode('utf-8')
178 if isinstance(v, unicode):
179 v = v.encode('utf-8')
180 elif isinstance(v, list):
181 v = _decode_list(v)
182 newdict[k] = v
183 return newdict
184
185
187 """ the background process """
188 logger.debug(' task started')
189
190 class LogOutput(object):
191 """Facility to log output at intervals"""
192 def __init__(self, out_queue):
193 self.out_queue = out_queue
194 self.stdout = sys.stdout
195 sys.stdout = self
196
197 def __del__(self):
198 sys.stdout = self.stdout
199
200 def flush(self):
201 pass
202
203 def write(self, data):
204 self.out_queue.put(data)
205
206 W2P_TASK = Storage({'id' : task.task_id, 'uuid' : task.uuid})
207 stdout = LogOutput(out)
208 try:
209 if task.app:
210 os.chdir(os.environ['WEB2PY_PATH'])
211 from gluon.shell import env, parse_path_info
212 from gluon import current
213 level = logging.getLogger().getEffectiveLevel()
214 logging.getLogger().setLevel(logging.WARN)
215
216
217 (a, c, f) = parse_path_info(task.app)
218 _env = env(a=a, c=c, import_models=True)
219 logging.getLogger().setLevel(level)
220 f = task.function
221 functions = current._scheduler.tasks
222 if not functions:
223
224 _function = _env.get(f)
225 else:
226 _function = functions.get(f)
227 if not isinstance(_function, CALLABLETYPES):
228 raise NameError(
229 "name '%s' not found in scheduler's environment" % f)
230
231 _env.update({'W2P_TASK' : W2P_TASK})
232 globals().update(_env)
233 args = loads(task.args)
234 vars = loads(task.vars, object_hook=_decode_dict)
235 result = dumps(_function(*args, **vars))
236 else:
237
238 result = eval(task.function)(
239 *loads(task.args, object_hook=_decode_dict),
240 **loads(task.vars, object_hook=_decode_dict))
241 queue.put(TaskReport('COMPLETED', result=result))
242 except BaseException, e:
243 tb = traceback.format_exc()
244 queue.put(TaskReport('FAILED', tb=tb))
245 del stdout
246
247
397
398
# Allowed values for the status columns of the three scheduler tables
TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)
402
403
405 """
406 validator that check whether field is valid json and validate its type
407 """
408
409 - def __init__(self, myclass=list, parse=False):
412
414 from gluon import current
415 try:
416 obj = loads(value)
417 except:
418 return (value, current.T('invalid json'))
419 else:
420 if isinstance(obj, self.myclass):
421 if self.parse:
422 return (obj, None)
423 else:
424 return (value, None)
425 else:
426 return (value, current.T('Not of type: %s') % self.myclass)
427
428
def __init__(self, db, tasks=None, migrate=True,
             worker_name=None, group_names=None, heartbeat=HEARTBEAT,
             max_empty_runs=0, discard_results=False, utc_time=False):
    """Scheduler/worker bound to a DAL connection.

    :param db: DAL instance where the scheduler tables get defined
    :param tasks: dict mapping function names to callables; when None,
        task functions are looked up in the app environment instead
    :param migrate: passed on to define_tables (True/False or a prefix)
    :param worker_name: unique worker identifier (default: host#pid)
    :param group_names: list of task group names served by this worker
        (default: ['main'])
    :param heartbeat: seconds between heartbeats
    :param max_empty_runs: die after this many consecutive loops with no
        task grabbed (0 = never)
    :param discard_results: when True, scheduler_run records are dropped
    :param utc_time: use UTC timestamps instead of local time
    """
    MetaScheduler.__init__(self)

    self.db = db
    self.db_thread = None
    self.tasks = tasks
    # fix: the default used to be a shared mutable list (['main']);
    # use a None sentinel so each instance gets its own list
    self.group_names = ['main'] if group_names is None else group_names
    self.heartbeat = heartbeat
    self.worker_name = worker_name or IDENTIFIER

    # worker_status[0] is the current state; worker_status[1] is the
    # hibernation multiplier that stretches the sleep while DISABLED
    self.worker_status = [RUNNING, 1]
    self.max_empty_runs = max_empty_runs
    self.discard_results = discard_results
    self.is_a_ticker = False
    self.do_assign_tasks = False
    self.greedy = False
    self.utc_time = utc_time

    from gluon import current
    current._scheduler = self

    self.define_tables(db, migrate=migrate)
456
458 if migrate is False:
459 return False
460 elif migrate is True:
461 return True
462 elif isinstance(migrate, str):
463 return "%s%s.table" % (migrate , tablename)
464 return True
465
467 return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now()
468
475
477 from gluon.dal import DEFAULT
478 logger.debug('defining tables (migrate=%s)', migrate)
479 now = self.now
480 db.define_table(
481 'scheduler_task',
482 Field('application_name', requires=IS_NOT_EMPTY(),
483 default=None, writable=False),
484 Field('task_name', default=None),
485 Field('group_name', default='main'),
486 Field('status', requires=IS_IN_SET(TASK_STATUS),
487 default=QUEUED, writable=False),
488 Field('function_name',
489 requires=IS_IN_SET(sorted(self.tasks.keys()))
490 if self.tasks else DEFAULT),
491 Field('uuid', length=255,
492 requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
493 unique=True, default=web2py_uuid),
494 Field('args', 'text', default='[]', requires=TYPE(list)),
495 Field('vars', 'text', default='{}', requires=TYPE(dict)),
496 Field('enabled', 'boolean', default=True),
497 Field('start_time', 'datetime', default=now,
498 requires=IS_DATETIME()),
499 Field('next_run_time', 'datetime', default=now),
500 Field('stop_time', 'datetime'),
501 Field('repeats', 'integer', default=1, comment="0=unlimited",
502 requires=IS_INT_IN_RANGE(0, None)),
503 Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
504 requires=IS_INT_IN_RANGE(-1, None)),
505 Field('period', 'integer', default=60, comment='seconds',
506 requires=IS_INT_IN_RANGE(0, None)),
507 Field('timeout', 'integer', default=60, comment='seconds',
508 requires=IS_INT_IN_RANGE(0, None)),
509 Field('sync_output', 'integer', default=0,
510 comment="update output every n sec: 0=never",
511 requires=IS_INT_IN_RANGE(0, None)),
512 Field('times_run', 'integer', default=0, writable=False),
513 Field('times_failed', 'integer', default=0, writable=False),
514 Field('last_run_time', 'datetime', writable=False, readable=False),
515 Field('assigned_worker_name', default='', writable=False),
516 on_define=self.set_requirements,
517 migrate=self.__get_migrate('scheduler_task', migrate),
518 format='%(task_name)s')
519
520 db.define_table(
521 'scheduler_run',
522 Field('task_id', 'reference scheduler_task'),
523 Field('status', requires=IS_IN_SET(RUN_STATUS)),
524 Field('start_time', 'datetime'),
525 Field('stop_time', 'datetime'),
526 Field('run_output', 'text'),
527 Field('run_result', 'text'),
528 Field('traceback', 'text'),
529 Field('worker_name', default=self.worker_name),
530 migrate=self.__get_migrate('scheduler_run', migrate)
531 )
532
533 db.define_table(
534 'scheduler_worker',
535 Field('worker_name', length=255, unique=True),
536 Field('first_heartbeat', 'datetime'),
537 Field('last_heartbeat', 'datetime'),
538 Field('status', requires=IS_IN_SET(WORKER_STATUS)),
539 Field('is_ticker', 'boolean', default=False, writable=False),
540 Field('group_names', 'list:string', default=self.group_names),
541 migrate=self.__get_migrate('scheduler_worker', migrate)
542 )
543
544 if migrate is not False:
545 db.commit()
546
547 - def loop(self, worker_name=None):
548 signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
549 try:
550 self.start_heartbeats()
551 while True and self.have_heartbeat:
552 if self.worker_status[0] == DISABLED:
553 logger.debug('Someone stopped me, sleeping until better times come (%s)', self.worker_status[1])
554 self.sleep()
555 continue
556 logger.debug('looping...')
557 task = self.wrapped_pop_task()
558 if task:
559 self.empty_runs = 0
560 self.worker_status[0] = RUNNING
561 self.report_task(task, self.async(task))
562 self.worker_status[0] = ACTIVE
563 else:
564 self.empty_runs += 1
565 logger.debug('sleeping...')
566 if self.max_empty_runs != 0:
567 logger.debug('empty runs %s/%s',
568 self.empty_runs, self.max_empty_runs)
569 if self.empty_runs >= self.max_empty_runs:
570 logger.info(
571 'empty runs limit reached, killing myself')
572 self.die()
573 self.sleep()
574 except (KeyboardInterrupt, SystemExit):
575 logger.info('catched')
576 self.die()
577
593
608
610 now = self.now()
611 st = self.db.scheduler_task
612 if self.is_a_ticker and self.do_assign_tasks:
613
614
615 self.wrapped_assign_tasks(db)
616 return None
617
618 grabbed = db(st.assigned_worker_name == self.worker_name)(
619 st.status == ASSIGNED)
620
621 task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
622 if task:
623 task.update_record(status=RUNNING, last_run_time=now)
624
625 db.commit()
626 logger.debug(' work to do %s', task.id)
627 else:
628 if self.greedy and self.is_a_ticker:
629
630 logger.info('TICKER: greedy loop')
631 self.wrapped_assign_tasks(db)
632 else:
633 logger.info('nothing to do')
634 return None
635 next_run_time = task.last_run_time + datetime.timedelta(
636 seconds=task.period)
637 times_run = task.times_run + 1
638 if times_run < task.repeats or task.repeats == 0:
639
640 run_again = True
641 else:
642
643 run_again = False
644 run_id = 0
645 while True and not self.discard_results:
646 logger.debug(' new scheduler_run record')
647 try:
648 run_id = db.scheduler_run.insert(
649 task_id=task.id,
650 status=RUNNING,
651 start_time=now,
652 worker_name=self.worker_name)
653 db.commit()
654 break
655 except:
656 time.sleep(0.5)
657 db.rollback()
658 logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
659 return Task(
660 app=task.application_name,
661 function=task.function_name,
662 timeout=task.timeout,
663 args=task.args,
664 vars=task.vars,
665 task_id=task.id,
666 run_id=run_id,
667 run_again=run_again,
668 next_run_time=next_run_time,
669 times_run=times_run,
670 stop_time=task.stop_time,
671 retry_failed=task.retry_failed,
672 times_failed=task.times_failed,
673 sync_output=task.sync_output,
674 uuid=task.uuid)
675
677 db = self.db
678 now = self.now()
679 while True:
680 try:
681 if not self.discard_results:
682 if task_report.result != 'null' or task_report.tb:
683
684
685
686 logger.debug(' recording task report in db (%s)',
687 task_report.status)
688 db(db.scheduler_run.id == task.run_id).update(
689 status=task_report.status,
690 stop_time=now,
691 run_result=task_report.result,
692 run_output=task_report.output,
693 traceback=task_report.tb)
694 else:
695 logger.debug(' deleting task report in db because of no result')
696 db(db.scheduler_run.id == task.run_id).delete()
697
698 is_expired = (task.stop_time
699 and task.next_run_time > task.stop_time
700 and True or False)
701 status = (task.run_again and is_expired and EXPIRED
702 or task.run_again and not is_expired
703 and QUEUED or COMPLETED)
704 if task_report.status == COMPLETED:
705 d = dict(status=status,
706 next_run_time=task.next_run_time,
707 times_run=task.times_run,
708 times_failed=0
709 )
710 db(db.scheduler_task.id == task.task_id)(
711 db.scheduler_task.status == RUNNING).update(**d)
712 else:
713 st_mapping = {'FAILED': 'FAILED',
714 'TIMEOUT': 'TIMEOUT',
715 'STOPPED': 'QUEUED'}[task_report.status]
716 status = (task.retry_failed
717 and task.times_failed < task.retry_failed
718 and QUEUED or task.retry_failed == -1
719 and QUEUED or st_mapping)
720 db(
721 (db.scheduler_task.id == task.task_id) &
722 (db.scheduler_task.status == RUNNING)
723 ).update(
724 times_failed=db.scheduler_task.times_failed + 1,
725 next_run_time=task.next_run_time,
726 status=status
727 )
728 db.commit()
729 logger.info('task completed (%s)', task_report.status)
730 break
731 except:
732 db.rollback()
733 time.sleep(0.5)
734
736 if self.worker_status[0] == DISABLED:
737 wk_st = self.worker_status[1]
738 hibernation = wk_st + 1 if wk_st < MAXHIBERNATION else MAXHIBERNATION
739 self.worker_status[1] = hibernation
740
742 if not self.db_thread:
743 logger.debug('thread building own DAL object')
744 self.db_thread = DAL(
745 self.db._uri, folder=self.db._adapter.folder)
746 self.define_tables(self.db_thread, migrate=False)
747 try:
748 db = self.db_thread
749 sw, st = db.scheduler_worker, db.scheduler_task
750 now = self.now()
751
752 mybackedstatus = db(sw.worker_name == self.worker_name).select().first()
753 if not mybackedstatus:
754 sw.insert(status=ACTIVE, worker_name=self.worker_name,
755 first_heartbeat=now, last_heartbeat=now,
756 group_names=self.group_names)
757 self.worker_status = [ACTIVE, 1]
758 mybackedstatus = ACTIVE
759 else:
760 mybackedstatus = mybackedstatus.status
761 if mybackedstatus == DISABLED:
762
763 self.worker_status[0] = DISABLED
764 if self.worker_status[1] == MAXHIBERNATION:
765 logger.debug('........recording heartbeat (%s)', self.worker_status[0])
766 db(sw.worker_name == self.worker_name).update(
767 last_heartbeat=now)
768 elif mybackedstatus == TERMINATE:
769 self.worker_status[0] = TERMINATE
770 logger.debug("Waiting to terminate the current task")
771 self.give_up()
772 return
773 elif mybackedstatus == KILL:
774 self.worker_status[0] = KILL
775 self.die()
776 else:
777 if mybackedstatus == STOP_TASK:
778 logger.info('Asked to kill the current task')
779 self.terminate_process()
780 logger.debug('........recording heartbeat (%s)', self.worker_status[0])
781 db(sw.worker_name == self.worker_name).update(
782 last_heartbeat=now, status=ACTIVE)
783 self.worker_status[1] = 1
784 if self.worker_status[0] != RUNNING:
785 self.worker_status[0] = ACTIVE
786
787 self.do_assign_tasks = False
788 if counter % 5 == 0 or mybackedstatus == PICK:
789 try:
790
791 expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)
792 departure = now - datetime.timedelta(
793 seconds=self.heartbeat * 3 * MAXHIBERNATION)
794 logger.debug(
795 ' freeing workers that have not sent heartbeat')
796 inactive_workers = db(
797 ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
798 ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
799 )
800 db(st.assigned_worker_name.belongs(
801 inactive_workers._select(sw.worker_name)))(st.status == RUNNING)\
802 .update(assigned_worker_name='', status=QUEUED)
803 inactive_workers.delete()
804 try:
805 self.is_a_ticker = self.being_a_ticker()
806 except:
807 logger.error('Error coordinating TICKER')
808 if self.worker_status[0] == ACTIVE:
809 self.do_assign_tasks = True
810 except:
811 logger.error('Error cleaning up')
812 db.commit()
813 except:
814 logger.error('Error retrieving status')
815 db.rollback()
816 self.adj_hibernation()
817 self.sleep()
818
820 db = self.db_thread
821 sw = db.scheduler_worker
822 all_active = db(
823 (sw.worker_name != self.worker_name) & (sw.status == ACTIVE)
824 ).select()
825 ticker = all_active.find(lambda row: row.is_ticker is True).first()
826 not_busy = self.worker_status[0] == ACTIVE
827 if not ticker:
828
829 if not_busy:
830
831 db(sw.worker_name == self.worker_name).update(is_ticker=True)
832 db(sw.worker_name != self.worker_name).update(is_ticker=False)
833 logger.info("TICKER: I'm a ticker")
834 else:
835
836 if len(all_active) >= 1:
837
838 db(sw.worker_name == self.worker_name).update(is_ticker=False)
839 else:
840 not_busy = True
841 db.commit()
842 return not_busy
843 else:
844 logger.info(
845 "%s is a ticker, I'm a poor worker" % ticker.worker_name)
846 return False
847
849 sw, st = db.scheduler_worker, db.scheduler_task
850 now = self.now()
851 all_workers = db(sw.status == ACTIVE).select()
852
853 wkgroups = {}
854 for w in all_workers:
855 group_names = w.group_names
856 for gname in group_names:
857 if gname not in wkgroups:
858 wkgroups[gname] = dict(
859 workers=[{'name': w.worker_name, 'c': 0}])
860 else:
861 wkgroups[gname]['workers'].append(
862 {'name': w.worker_name, 'c': 0})
863
864
865 db(st.status.belongs(
866 (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)
867
868 all_available = db(
869 (st.status.belongs((QUEUED, ASSIGNED))) &
870 ((st.times_run < st.repeats) | (st.repeats == 0)) &
871 (st.start_time <= now) &
872 ((st.stop_time == None) | (st.stop_time > now)) &
873 (st.next_run_time <= now) &
874 (st.enabled == True)
875 )
876 limit = len(all_workers) * (50 / (len(wkgroups) or 1))
877
878
879
880
881
882
883
884
885
886
887
888
889 db.commit()
890 x = 0
891 for group in wkgroups.keys():
892 tasks = all_available(st.group_name == group).select(
893 limitby=(0, limit), orderby = st.next_run_time)
894
895 for task in tasks:
896 x += 1
897 gname = task.group_name
898 ws = wkgroups.get(gname)
899 if ws:
900 counter = 0
901 myw = 0
902 for i, w in enumerate(ws['workers']):
903 if w['c'] < counter:
904 myw = i
905 counter = w['c']
906 d = dict(
907 status=ASSIGNED,
908 assigned_worker_name=wkgroups[gname]['workers'][myw]['name']
909 )
910 if not task.task_name:
911 d['task_name'] = task.function_name
912 task.update_record(**d)
913 wkgroups[gname]['workers'][myw]['c'] += 1
914
915 db.commit()
916
917 if x > 0:
918 self.empty_runs = 0
919
920
921 self.greedy = x >= limit and True or False
922 logger.info('TICKER: workers are %s', len(all_workers))
923 logger.info('TICKER: tasks are %s', x)
924
926 time.sleep(self.heartbeat * self.worker_status[1])
927
928
930 if not group_names:
931 group_names = self.group_names
932 elif isinstance(group_names, str):
933 group_names = [group_names]
934 for group in group_names:
935 self.db(
936 self.db.scheduler_worker.group_names.contains(group)
937 ).update(status=action)
938
939 - def disable(self, group_names=None):
941
942 - def resume(self, group_names=None):
944
947
948 - def kill(self, group_names=None):
950
def queue_task(self, function, pargs=None, pvars=None, **kwargs):
    """
    Queue tasks. This takes care of handling the validation of all
    values.
    :param function: the function (anything callable with a __name__)
    :param pargs: "raw" args to be passed to the function. Automatically
        jsonified.
    :param pvars: "raw" kwargs to be passed to the function. Automatically
        jsonified
    :param kwargs: all the scheduler_task columns. args and vars here should be
        in json format already, they will override pargs and pvars

    returns a dict just as a normal validate_and_insert, plus a uuid key holding
    the uuid of the queued task. If validation is not passed, both id and uuid
    will be None, and you'll get an "error" dict holding the errors found.
    """
    # fix: pargs/pvars used to be mutable default arguments ([] and {});
    # a None sentinel avoids sharing them across calls
    if pargs is None:
        pargs = []
    if pvars is None:
        pvars = {}
    if hasattr(function, '__name__'):
        function = function.__name__
    # explicit json args/vars in kwargs override pargs/pvars; a falsy
    # value still falls back to the jsonified raw arguments, as before
    targs = kwargs.pop('args', None) or dumps(pargs)
    tvars = kwargs.pop('vars', None) or dumps(pvars)
    tuuid = kwargs.pop('uuid', None) or web2py_uuid()
    tname = kwargs.pop('task_name', None) or function
    immediate = kwargs.pop('immediate', None)
    rtn = self.db.scheduler_task.validate_and_insert(
        function_name=function,
        task_name=tname,
        args=targs,
        vars=tvars,
        uuid=tuuid,
        **kwargs)
    if not rtn.errors:
        rtn.uuid = tuuid
        if immediate:
            # wake the ticker so the task gets assigned right away
            self.db(
                self.db.scheduler_worker.is_ticker == True
            ).update(status=PICK)
    else:
        rtn.uuid = None
    return rtn
988
990 """
991 Shortcut for task status retrieval
992
993 :param ref: can be
994 - integer --> lookup will be done by scheduler_task.id
995 - string --> lookup will be done by scheduler_task.uuid
996 - query --> lookup as you wish (as in db.scheduler_task.task_name == 'test1')
997 :param output: fetch also the scheduler_run record
998
999 Returns a single Row object, for the last queued task
1000 If output == True, returns also the last scheduler_run record
1001 scheduler_run record is fetched by a left join, so it can
1002 have all fields == None
1003
1004 """
1005 from gluon.dal import Query
1006 sr, st = self.db.scheduler_run, self.db.scheduler_task
1007 if isinstance(ref, int):
1008 q = st.id == ref
1009 elif isinstance(ref, str):
1010 q = st.uuid == ref
1011 elif isinstance(ref, Query):
1012 q = ref
1013 else:
1014 raise SyntaxError(
1015 "You can retrieve results only by id, uuid or Query")
1016 fields = [st.ALL]
1017 left = False
1018 orderby = ~st.id
1019 if output:
1020 fields = st.ALL, sr.ALL
1021 left = sr.on(sr.task_id == st.id)
1022 orderby = ~st.id | ~sr.id
1023 row = self.db(q).select(
1024 *fields,
1025 **dict(orderby=orderby,
1026 left=left,
1027 limitby=(0, 1))
1028 ).first()
1029 if row and output:
1030 row.result = row.scheduler_run.run_result and \
1031 loads(row.scheduler_run.run_result,
1032 object_hook=_decode_dict) or None
1033 return row
1034
1036 """
1037 Experimental!!!
1038 Shortcut for task termination.
1039 If the task is RUNNING it will terminate it --> execution will be set as FAILED
1040 If the task is QUEUED, its stop_time will be set as to "now",
1041 the enabled flag will be set to False, status to STOPPED
1042
1043 :param ref: can be
1044 - integer --> lookup will be done by scheduler_task.id
1045 - string --> lookup will be done by scheduler_task.uuid
1046 Returns:
1047 - 1 if task was stopped (meaning an update has been done)
1048 - None if task was not found, or if task was not RUNNING or QUEUED
1049 """
1050 from gluon.dal import Query
1051 st, sw = self.db.scheduler_task, self.db.scheduler_worker
1052 if isinstance(ref, int):
1053 q = st.id == ref
1054 elif isinstance(ref, str):
1055 q = st.uuid == ref
1056 else:
1057 raise SyntaxError(
1058 "You can retrieve results only by id or uuid")
1059 task = self.db(q).select(st.id, st.status, st.assigned_worker_name).first()
1060 rtn = None
1061 if not task:
1062 return rtn
1063 if task.status == 'RUNNING':
1064 rtn = self.db(sw.worker_name == task.assigned_worker_name).update(status=STOP_TASK)
1065 elif task.status == 'QUEUED':
1066 rtn = self.db(q).update(stop_time=self.now(), enabled=False, status=STOPPED)
1067 return rtn
1068
1069
1071 """
1072 allows to run worker without python web2py.py .... by simply python this.py
1073 """
1074 parser = optparse.OptionParser()
1075 parser.add_option(
1076 "-w", "--worker_name", dest="worker_name", default=None,
1077 help="start a worker with name")
1078 parser.add_option(
1079 "-b", "--heartbeat", dest="heartbeat", default=10,
1080 type='int', help="heartbeat time in seconds (default 10)")
1081 parser.add_option(
1082 "-L", "--logger_level", dest="logger_level",
1083 default=30,
1084 type='int',
1085 help="set debug output level (0-100, 0 means all, 100 means none;default is 30)")
1086 parser.add_option("-E", "--empty-runs",
1087 dest="max_empty_runs",
1088 type='int',
1089 default=0,
1090 help="max loops with no grabbed tasks permitted (0 for never check)")
1091 parser.add_option(
1092 "-g", "--group_names", dest="group_names",
1093 default='main',
1094 help="comma separated list of groups to be picked by the worker")
1095 parser.add_option(
1096 "-f", "--db_folder", dest="db_folder",
1097 default='/Users/mdipierro/web2py/applications/scheduler/databases',
1098 help="location of the dal database folder")
1099 parser.add_option(
1100 "-u", "--db_uri", dest="db_uri",
1101 default='sqlite://storage.sqlite',
1102 help="database URI string (web2py DAL syntax)")
1103 parser.add_option(
1104 "-t", "--tasks", dest="tasks", default=None,
1105 help="file containing task files, must define" +
1106 "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
1107 parser.add_option(
1108 "-U", "--utc-time", dest="utc_time", default=False,
1109 help="work with UTC timestamps"
1110 )
1111 (options, args) = parser.parse_args()
1112 if not options.tasks or not options.db_uri:
1113 print USAGE
1114 if options.tasks:
1115 path, filename = os.path.split(options.tasks)
1116 if filename.endswith('.py'):
1117 filename = filename[:-3]
1118 sys.path.append(path)
1119 print 'importing tasks...'
1120 tasks = __import__(filename, globals(), locals(), [], -1).tasks
1121 print 'tasks found: ' + ', '.join(tasks.keys())
1122 else:
1123 tasks = {}
1124 group_names = [x.strip() for x in options.group_names.split(',')]
1125
1126 logging.getLogger().setLevel(options.logger_level)
1127
1128 print 'groups for this worker: ' + ', '.join(group_names)
1129 print 'connecting to database in folder: ' + options.db_folder or './'
1130 print 'using URI: ' + options.db_uri
1131 db = DAL(options.db_uri, folder=options.db_folder)
1132 print 'instantiating scheduler...'
1133 scheduler = Scheduler(db=db,
1134 worker_name=options.worker_name,
1135 tasks=tasks,
1136 migrate=True,
1137 group_names=group_names,
1138 heartbeat=options.heartbeat,
1139 max_empty_runs=options.max_empty_runs,
1140 utc_time=options.utc_time)
1141 signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
1142 print 'starting main worker loop...'
1143 scheduler.loop()
1144
# Entry point when run directly (python gluon/scheduler.py ...),
# as opposed to being imported by the web2py framework.
if __name__ == '__main__':
    main()
1147