"+b+"",d);return d.appendChild(c),this._controls[b]={container:d,label:e},this},addFileChooser:function(d,e,f,g){var h=this._createContainer(),i=a(""+d+"",h),j=b("file",d,"qs_file_chooser",h);f&&(j.accept=f);var k=c("label",null,"qs_file_chooser_label",h);k.setAttribute("for",d),k.textContent=e||"Choose a file...",this._controls[d]={container:h,control:j,label:i,getValue:function(){return this.control.files[0]}};var l=this;return j.addEventListener("change",function(){j.files&&j.files.length&&(k.textContent=j.files[0].name,g&&g(j.files[0]),l._callGCH(d))}),this},addHTML:function(b,d){var e=this._createContainer(),f=a(""+b+": ",e),g=c("div",null,null,e);return g.innerHTML=d,this._controls[b]={container:e,label:f,control:g,getValue:function(){return this.control.innerHTML},setValue:function(a){this.control.innerHTML=a}},this},addImage:function(b,d,e){var f=this._createContainer(),g=a(""+b+"",f);return img=c("img",null,"qs_image",f),img.src=d,this._controls[b]={container:f,control:img,label:g,getValue:function(){return this.control.src},setValue:function(a){this.control.src=a,e&&img.addEventListener("load",function b(){img.removeEventListener("load",b),e(a)})}},this},addRange:function(a,b,c,d,e,f){return this._addNumber("range",a,b,c,d,e,f)},addNumber:function(a,b,c,d,e,f){return this._addNumber("number",a,b,c,d,e,f)},_addNumber:function(c,e,f,g,h,i,j){var k=this._createContainer(),l=a("",k),m="range"===c?"qs_range":"qs_text_input qs_number",n=b(c,e,m,k);n.min=f||0,n.max=g||100,n.step=i||1,n.value=h||0,l.innerHTML=""+e+": "+n.value,this._controls[e]={container:k,control:n,label:l,title:e,callback:j,getValue:function(){return parseFloat(this.control.value)},setValue:function(a){this.control.value=a,this.label.innerHTML=""+this.title+": "+this.control.value,j&&j(parseFloat(a))}};var o="input";"range"===c&&d()&&(o="change");var p=this;return n.addEventListener(o,function(){l.innerHTML=""+e+": "+n.value,j&&j(parseFloat(n.value)),p._callGCH(e)}),this},bindRange:function(a,b,c,d,e,f){return this.addRange(a,b,c,d,e,function(b){f[a]=b})},bindNumber:function(a,b,c,d,e,f){return this.addNumber(a,b,c,d,e,function(b){f[a]=b})},setRangeParameters:function(a,b,c,d){return this.setNumberParameters(a,b,c,d)},setNumberParameters:function(a,b,c,d){var e=this._controls[a],f=e.control.value;return e.control.min=b,e.control.max=c,e.control.step=d,e.control.value!==f&&e.callback&&e.callback(e.control.value),this},addPassword:function(a,b,c){return this._addText("password",a,b,c)},bindPassword:function(a,b,c){return this.addPassword(a,b,function(b){c[a]=b})},addProgressBar:function(b,d,e,f){var g=this._createContainer(),h=a("",g),i=c("div",null,"qs_progress",g),j=c("div",null,"qs_progress_value",i);return j.style.width=e/d*100+"%","numbers"===f?h.innerHTML=""+b+": "+e+" / "+d:"percent"===f?h.innerHTML=""+b+": "+Math.round(e/d*100)+"%":h.innerHTML=""+b+"",this._controls[b]={container:g,control:i,valueDiv:j,valueDisplay:f,label:h,value:e,max:d,title:b,getValue:function(){return this.value},setValue:function(a){this.value=Math.max(0,Math.min(a,this.max)),this.valueDiv.style.width=this.value/this.max*100+"%","numbers"===this.valueDisplay?this.label.innerHTML=""+this.title+": "+this.value+" / "+this.max:"percent"===this.valueDisplay&&(this.label.innerHTML=""+this.title+": "+Math.round(this.value/this.max*100)+"%")}},this},setProgressMax:function(a,b){var c=this._controls[a];return c.max=b,c.value=Math.min(c.value,c.max),c.valueDiv.style.width=c.value/c.max*100+"%","numbers"===c.valueDisplay?c.label.innerHTML=""+c.title+": "+c.value+" / "+c.max:"percent"===c.valueDisplay?c.label.innerHTML=""+c.title+": "+Math.round(c.value/c.max*100)+"%":c.label.innerHTML=""+c.title+"",this},addText:function(a,b,c){return this._addText("text",a,b,c)},_addText:function(d,e,f,g){var h,i=this._createContainer(),j=a(""+e+"",i);"textarea"===d?(h=c("textarea",e,"qs_textarea",i),h.rows=5):h=b(d,e,"qs_text_input",i),h.value=f||"",this._controls[e]={container:i,control:h,label:j,getValue:function(){return this.control.value},setValue:function(a){this.control.value=a,g&&g(a)}};var k=this;return h.addEventListener("input",function(){g&&g(h.value),k._callGCH(e)}),this},bindText:function(a,b,c){return this.addText(a,b,function(b){c[a]=b})},addTextArea:function(a,b,c){return this._addText("textarea",a,b,c)},setTextAreaRows:function(a,b){return this._controls[a].control.rows=b,this},bindTextArea:function(a,b,c){return this.addTextArea(a,b,function(b){c[a]=b})},addTime:function(c,e,f){var g;if(e instanceof Date){var h=e.getHours();h<10&&(h="0"+h);var i=e.getMinutes();i<10&&(i="0"+i);var j=e.getSeconds();j<10&&(j="0"+j),g=h+":"+i+":"+j}else g=e;if(d())return this.addText(c,g,f);var k=this._createContainer(),l=a(""+c+"",k),m=b("time",c,"qs_text_input",k);m.value=g||"",this._controls[c]={container:k,control:m,label:l,getValue:function(){return this.control.value},setValue:function(a){var b;if(a instanceof Date){var c=a.getHours();c<10&&(c="0"+c);var d=a.getMinutes();d<10&&(d="0"+d);var e=a.getSeconds();e<10&&(e="0"+e),b=c+":"+d+":"+e}else b=a;this.control.value=b||"",f&&f(b)}};var n=this;return m.addEventListener("input",function(){f&&f(m.value),n._callGCH(c)}),this},bindTime:function(a,b,c){return this.addTime(a,b,function(b){c[a]=b})}};"object"==typeof exports&&"object"==typeof module?module.exports=j:"function"==typeof define&&define.amd?define(j):window.QuickSettings=j}();
\ No newline at end of file
diff --git a/mrq/dashboard/static/js/views/jobs.js b/mrq/dashboard/static/js/views/jobs.js
index db96de54..f6371c59 100644
--- a/mrq/dashboard/static/js/views/jobs.js
+++ b/mrq/dashboard/static/js/views/jobs.js
@@ -105,6 +105,13 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
var action = $(evt.target).data("action");
var data = _.clone(this.filters);
+ if (action == "move"){
+ if (destination_queue == null || destination_queue == "") {
+ return ;
+ }
+ data["destination_queue"] = prompt("Enter destination queue");
+ }
+
data["action"] = action;
self.jobaction(evt, data);
@@ -223,6 +230,18 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
});
self.refreshCallStack(job_id);
+ } else if (action == "move") {
+
+ var queue = prompt("Enter destination queue");
+ if (queue != null && queue != "")
+ {
+ self.jobaction(evt, {
+ "id": job_id,
+ "action": action,
+ "destination_queue": queue
+ });
+ }
+
} else {
self.jobaction(evt, {
@@ -343,6 +362,10 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
display.push("cputime "+String(source.time).substring(0,6)+"s ("+source.switches+" switches)");
}
+ if (source.retry_count) {
+ display.push("retried " + String(source.retry_count) + " times");
+ }
+
return "" + display.join("
") + "";
} else {
@@ -392,6 +415,7 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
"
"+
""+
""+
+ ""+
"";
}
return "";
diff --git a/mrq/dashboard/static/js/views/workergroups.js b/mrq/dashboard/static/js/views/workergroups.js
index d9d96718..ad820d61 100644
--- a/mrq/dashboard/static/js/views/workergroups.js
+++ b/mrq/dashboard/static/js/views/workergroups.js
@@ -1,4 +1,4 @@
-define(["jquery", "underscore", "models", "views/generic/page"],function($, _, Models, Page) {
+define(["jquery", "underscore", "models", "views/generic/page", "quicksettings"],function($, _, Models, Page, QuickSettings) {
return Page.extend({
@@ -10,28 +10,161 @@ define(["jquery", "underscore", "models", "views/generic/page"],function($, _, M
"click .submit": "submit"
},
- render: function() {
- var self = this;
- $.get("/api/workergroups").done(function(data) {
- self.renderTemplate();
- self.$("textarea").val(JSON.stringify(data["workergroups"], null, 8));
- });
+ addCommandPanel: function() {
+ var _this = this;
+
+ if (typeof this.commandPanel === "undefined")
+ {
+ this.commandPanel = QuickSettings.create(25, 80, "Actions", $(this.el)[0])
+ .addButton("Add a Worker Group", function() {
+ _this.addPanel();
+ })
+ .addButton("Save", function() {
+ _this.save();
+ })
+ .addButton("Reload", function() {
+ _this.reload();
+ })
+ .addHTML("Status", "OK")
+ .setDraggable(false)
+ .setWidth(150);
+ }
},
- submit: function(el) {
- var self = this;
+ addPanel: function(workergroup = null, workergroupName = "") {
+ var _this = this;
+ var panelIndex = this.workergroupPanels.length;
+ var workerPanel = QuickSettings.create(225 + this.workergroupPanels.length * 350, 80, "Worker group configuration", $(this.el)[0])
+ .addText("Workgroup Name", workergroupName)
+ .hideTitle("Workgroup Name")
+ .addButton("Remove this Worker Group", function() {
+ _this.workergroupPanels[panelIndex].destroy();
+ try {
+ delete _this.workergroupPanels[panelIndex];
+ }
+ catch (e) {}
+ _this.workergroupPanels[panelIndex] = null;
+ })
+ .addText("Process Termination Timeout", workergroup ? workergroup["process_termination_timeout"] : 0)
+ .addButton("Add a Profile", function() {
+ _this.addProfileToPanel(_this.workergroupPanels[panelIndex]);
+ })
+ .setDraggable(false)
+ .setHeight(850)
+ .setWidth(300);
+ workerPanel.profilesNumber = 0;
+ this.workergroupPanels.push(workerPanel);
+ if (workergroup != null)
+ {
+ this.serials[workergroupName] = workergroup["serial"]
+ _.forEach(workergroup["profiles"], function(profile, profileName) {
+ this.addProfileToPanel(workerPanel, profile, profileName);
+ }, this);
+ }
+ },
+
+ addProfileToPanel: function(workerPanel, profile = null, profileName = "") {
+ workerPanel.profilesNumber += 1;
+ header = "Profile " + String(workerPanel.profilesNumber) + " - ";
+ workerPanel.addHTML("separator", "
")
+ .hideTitle("separator")
+ .addText(header + "Profile Name", profileName)
+ .addText(header + "Memory", profile ? profile["memory"]: 0)
+ .addText(header + "CPU", profile ? profile["cpu"] : 0)
+ .addRange(header + "MinCount", 0, 100, profile ? profile["min_count"] : 0, 1)
+ .addRange(header + "MaxCount", 0, 100, profile ? profile["max_count"] : 0, 1)
+ .addTextArea(header + "Command", profile ? profile["command"]: "")
+ .addButton("Remove this Profile");
+ },
+
+ remove_profile: function() {
+ },
+
+ reload: function(force = false) {
+ if (force || confirm('It will discard every changes that hasn\'t be saved. Are you sure?')) {
+ _.forEach(this.workergroupPanels, function(panel) {
+ if (panel !== null && panel !== undefined)
+ panel.destroy();
+ delete panel;
+ })
+ this.render();
+ }
+ },
+
+
+ // Check for "continue" usage instead of nested ifs
+ save: function() {
+ var _this = this;
+ this.commandPanel._controls["Status"].setValue("Saving...");
- self.$("button")[0].innerHTML = "Wait...";
+ data = {};
+ _.forEach(this.workergroupPanels, function(panel) {
+ // Equivalent of python's "not in"
+ if ($.inArray(panel, [null, undefined]) == -1)
+ {
+ panelJSON = panel.getValuesAsJSON();
+ if ($.inArray(panelJSON["Workgroup Name"], [null, ""]) == -1)
+ {
+ workergroup = {
+ "profiles" : {},
+ "process_termination_timeout": parseInt(panelJSON["Process Termination Timeout"], 10)
+ }
- var val = self.$("textarea").val();
+ if (panelJSON["Workgroup Name"] in _this.serials)
+ workergroup["serial"] = _this.serials[panelJSON["Workgroup Name"]];
- $.post("/api/workergroups", {"workergroups": val}).done(function(data) {
- if (data.status != "ok") {
+ _.forEach(_.range(1, panel.profilesNumber + 1), function(index) {
+ header = "Profile " + String(index) + " - ";
+ if ($.inArray(panelJSON[header + "Profile Name"], [null, ""]) == -1)
+ {
+ profile = {};
+ profile["memory"] = parseInt(panelJSON[header + "Memory"], 10);
+ profile["cpu"] = parseInt(panelJSON[header + "CPU"], 10);
+ profile["min_count"] = parseInt(panelJSON[header + "MinCount"], 10);
+ profile["max_count"] = parseInt(panelJSON[header + "MaxCount"], 10);
+ profile["command"] = panelJSON[header + "Command"];
+ workergroup["profiles"][panelJSON[header + "Profile Name"]] = profile;
+ }
+ })
+ data[panelJSON["Workgroup Name"]] = workergroup;
+ }
+ }
+ })
+
+ $.post("/api/workergroups", {"workergroups": JSON.stringify(data)}).done(function(result) {
+ if (result.status === "ok")
+ {
+ _this.commandPanel._controls["Status"].setValue("Saved");
+ _this.reload(true);
+ }
+ else if (result.status === "outdated")
+ {
+ string = "";
+ _.forEach(result.outdated_wgcs, function(wgc) {
+ string += "- " + wgc + "
";
+ })
+ _this.commandPanel._controls["Status"].setValue("These configurations were outdated and were not saved:
" + string + "
The others were saved.");
+ }
+ else
+ {
+ _this.commandPanel._controls["Status"].setValue("FAILED");
return alert("There was an error while saving!");
}
- self.$("button")[0].innerHTML = "Save";
+ });
+ },
+
+ render: function() {
+ var _this = this;
+
+ this.workergroupPanels = [];
+ this.addCommandPanel();
+ this.serials = {};
+
+ $.get("/api/workergroups").done(function(data) {
+ _.forEach(data["workergroups"], function(workergroup, workergroupName) {
+ _this.addPanel(workergroup, workergroupName);
+ });
});
}
});
-
});
diff --git a/mrq/dashboard/templates/index.html b/mrq/dashboard/templates/index.html
index 4ae9c0bc..717e8ba9 100644
--- a/mrq/dashboard/templates/index.html
+++ b/mrq/dashboard/templates/index.html
@@ -258,10 +258,6 @@ Current I/O operations
Worker groups
@@ -394,6 +390,8 @@
+
+
diff --git a/mrq/job.py b/mrq/job.py
index c25d6da7..ece30f22 100644
--- a/mrq/job.py
+++ b/mrq/job.py
@@ -249,6 +249,14 @@ def abort(self):
self._attach_original_exception(exc)
raise exc
+ def kill(self):
+ """ Kill the current job """
+ context.connections.redis.rpush("{}:wcmd:{}".format(context.get_current_config()["redis_prefix"],
+ context.get_current_worker()),
+ "kill {}".format(self.id))
+ self._save_status("killed")
+ pass
+
def cancel(self):
""" Markes the current job as cancelled. Doesn't interrupt it. """
self._save_status("cancel")
@@ -321,15 +329,15 @@ def perform(self):
# pylint: disable=protected-access
gevent.sleep(0)
- current_greenlet = gevent.getcurrent()
+ self.current_greenlet = gevent.getcurrent()
t = (datetime.datetime.utcnow() - self.datestarted).total_seconds()
context.log.debug(
"Job %s success: %0.6fs total, %0.6fs in greenlet, %s switches" %
(self.id,
t,
- current_greenlet._trace_time,
- current_greenlet._trace_switches - 1)
+ self.current_greenlet._trace_time,
+ self.current_greenlet._trace_switches - 1)
)
else:
@@ -454,13 +462,13 @@ def _save_status(self, status, updates=None, exception=False, w=None, j=None):
db_updates["totaltime"] = (now - self.datestarted).total_seconds()
if context.get_current_config().get("trace_greenlets"):
- current_greenlet = gevent.getcurrent()
+ self.current_greenlet = gevent.getcurrent()
# TODO are we sure the current job is doing the save_status() on itself?
- if hasattr(current_greenlet, "_trace_time"):
+ if hasattr(self.current_greenlet, "_trace_time"):
# pylint: disable=protected-access
- db_updates["time"] = current_greenlet._trace_time
- db_updates["switches"] = current_greenlet._trace_switches
+ db_updates["time"] = self.current_greenlet._trace_time
+ db_updates["switches"] = self.current_greenlet._trace_switches
if exception:
trace = traceback.format_exc()
diff --git a/mrq/worker.py b/mrq/worker.py
index 26922388..e67bee4e 100644
--- a/mrq/worker.py
+++ b/mrq/worker.py
@@ -328,6 +328,18 @@ def report_worker(self, w=0):
except Exception as e: # pylint: disable=broad-except
self.log.debug("Worker report failed: %s" % e)
+ def greenlet_command_handler(self):
+ """
+ This greenlet is used to execute commands directly in the worker
+ """
+ while True:
+ command = self.redis.blpop("{}:wcmd:{}".format(get_current_config()["redis_prefix"],
+ self.id)
+ ).split(" ")
+ if command[0] == "kill":
+ Job(command[1]).current_greenlet.kill(block=True)
+ pass
+
def greenlet_admin(self):
""" This greenlet is used to get status information about the worker
when --admin_port was given
@@ -436,6 +448,9 @@ def work_init(self):
if self.config["admin_port"]:
self.greenlets["admin"] = gevent.spawn(self.greenlet_admin)
+ # ehould add a condition using the config
+ self.greenlets["command_handler"] = gevent.spawn(self.greenlet_command_handler)
+
self.install_signal_handlers()
def work_loop(self, max_jobs=None, max_time=None):
diff --git a/tests/tasks/general.py b/tests/tasks/general.py
index 067d72a2..86a912d7 100644
--- a/tests/tasks/general.py
+++ b/tests/tasks/general.py
@@ -83,6 +83,12 @@ def run(self, params):
pass
+class Sleep(Task):
+ def run(self, params):
+ time.sleep(params.get("sleep", 100))
+ return True
+
+
class Retry(Task):
def run(self, params):
diff --git a/tests/test_kill.py b/tests/test_kill.py
new file mode 100644
index 00000000..93f01351
--- /dev/null
+++ b/tests/test_kill.py
@@ -0,0 +1,35 @@
+from mrq.job import Job
+import time
+
+
+def test_kill_by_id(worker):
+ worker.start()
+
+ job_id1 = worker.send_task("tests.tasks.general.Sleep", {}, block=False)
+
+ Job(job_id1).kill()
+ time.sleep(1)
+
+ job1 = Job(job_id1).fetch().data
+
+ assert job1["status"] == "killed"
+
+
+# def test_kill_by_path(worker):
+# worker.start()
+
+# job_id1 = worker.send_task("tests.tasks.general.Sleep", {}, block=False)
+# job_id2 = worker.send_task("tests.tasks.general.Sleep", {}, block=False)
+
+# worker.send_task("mrq.basetasks.utils.JobAction", {
+# "path": "tests.tasks.general.Sleep",
+# "action": "kill"
+# }, block=False)
+
+# time.sleep(1)
+
+# job1 = Job(job_id1).fetch().data
+# job2 = Job(job_id2).fetch().data
+
+# assert job1["status"] == "killed"
+# assert job2["status"] == "killed"