manager.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. <?php
  2. namespace task;
  3. use Exception;
  4. use trans_wapper;
  5. use Log;
  /**
   * Creates, de-duplicates, claims and executes rows of the `task` model.
   *
   * A task's `type` column is the name of the method that handle() invokes on
   * a `handler` instance; add_task() uses the handler's `{type}_title` method
   * to build a title when none is supplied.
   */
  class manager
  {
      /** @var handler Handler instance used by add_task() to build default titles. */
      private $mHandler;

      public function __construct()
      {
          $this->mHandler = new handler();
      }

      /**
       * Registers a task, reusing an existing row that has the same method/params hash.
       *
       * @param string $method      Handler method name; stored in the `type` column.
       * @param mixed  $params      Handler arguments; stored serialized in `params`.
       * @param int    $is_show     Stored as-is in the `is_show` column.
       * @param int    $del_timeout When > 0, an existing row older than this many seconds
       *                            is deleted and a new row is created in its place.
       * @param string $title       Task title; when empty, the handler's `{$method}_title`
       *                            method is called with $params to produce one.
       * @param int    $mchid       Stored as-is in the `mchid` column.
       *
       * @return mixed False when $method or $params is empty, or when the insert yields
       *               no id; otherwise the return value of task_wrapper::CreateTask().
       */
      public function add_task($method, $params, $is_show = 1, $del_timeout = -1, $title = '', $mchid = 0)
      {
          if (empty($method) || empty($params)) {
              return false;
          }

          // Task uniqueness: the same method with the same params maps to the same hash.
          $task_hash = md5($method . '-' . serialize($params));

          if (empty($title)) {
              $title_fun = "{$method}_title";
              $title = $this->mHandler->$title_fun($params);
          }

          $mod_task = Model('task');
          $item = $mod_task->getByHash($task_hash);

          // An existing row older than $del_timeout seconds is dropped so it gets re-created below.
          $expired = !empty($item)
              && $del_timeout > 0
              && time() - $item['add_time'] > $del_timeout;
          if ($expired) {
              $mod_task->where(['task_id' => $item['task_id']])->delete();
              $item = [];
          }

          if (empty($item)) {
              $task_id = $mod_task->insert([
                  'type'      => $method,
                  'title'     => $title,
                  'params'    => serialize($params),
                  'task_hash' => $task_hash,
                  'add_time'  => time(),
                  'is_show'   => $is_show,
                  'mchid'     => $mchid,
              ]);

              // NOTE(review): assumes insert() returns a falsy value on failure — confirm
              // against the model. Without an id there is no row to read back.
              if (empty($task_id)) {
                  return false;
              }

              // Re-read the stored row; master(true) presumably routes the read to the
              // primary database — TODO confirm.
              $item = $mod_task->where(['task_id' => $task_id])->master(true)->find();
          }

          return task_wrapper::CreateTask($item);
      }

      /**
       * Pops one waiting task with an id greater than $task_id and marks it as processing.
       *
       * The locked select and the state update happen between the creation of the
       * trans_wapper and its commit(); any failure triggers rollback().
       *
       * @param int  $task_id  Only rows with a larger `task_id` are considered.
       * @param bool $merchant Truthy selects rows with `mchid` > 0; falsy selects `mchid` = 0.
       *
       * @return mixed The row as it was read before the update (its `state` still holds the
       *               waiting value), the empty find() result when nothing matched, or []
       *               when an Exception was caught and logged.
       */
      public function pop_task($task_id, $merchant = false)
      {
          $mod_task = Model('task');
          $trans = new trans_wapper($mod_task, 'pop_task');

          try {
              $mchid_cond = $merchant ? ['gt', 0] : 0;

              // NOTE(review): the query has no explicit ordering, so which matching row is
              // returned is up to the database — confirm whether the lowest task_id is intended.
              $item = $mod_task->field('*')
                  ->where([
                      'task_id' => ['gt', $task_id],
                      'state'   => task_wrapper::Waiting,
                      'mchid'   => $mchid_cond,
                  ])
                  ->master(true)->lock(true)->find();

              if (!empty($item)) {
                  $mod_task->where(['task_id' => $item['task_id']])
                      ->update(['state' => task_wrapper::Processing, 'act_time' => time()]);
              }

              $trans->commit();
              return $item;
          } catch (Exception $ex) {
              $trans->rollback();
              Log::record($ex->getMessage(), Log::ERR);
              return [];
          } catch (\Throwable $err) {
              // PHP 7+ Errors are not Exceptions: roll back as well, then let them
              // propagate exactly as they did before.
              $trans->rollback();
              throw $err;
          }
      }

      /**
       * Intended to finish a task (the original comment is incomplete). Not implemented:
       * the body is empty and the method returns null.
       */
      public function fini_task($task_id, $succ)
      {
      }

      /**
       * Not implemented: the body is empty and the method returns null.
       */
      public function del_task()
      {
      }

      /**
       * Not implemented: the body is empty and the method returns null.
       */
      public function getResult($task)
      {
      }

      /**
       * Runs the handler method named by the task's `type` and stores its outcome.
       *
       * @param array $task Task row; reads `type`, `params` (serialized) and `task_id`.
       *
       * @return mixed The task's `task_id`.
       */
      public function handle($task)
      {
          $method = $task['type'];

          // NOTE(review): unserialize() can instantiate arbitrary classes; this is only safe
          // while the `params` column is written exclusively by trusted code such as add_task().
          $params = unserialize($task['params']);

          // A separate handler instance is used per run and is told which task it serves.
          $handler = new handler();
          $handler->set_task_id($task['task_id']);

          // The handler method is expected to return a [success flag, result] pair.
          [$succ, $result] = $handler->$method($params);

          $mod_task = Model('task');
          // An empty result leaves the row unmodified (state and result columns are not touched).
          if (!empty($result)) {
              $mod_task->where(['task_id' => $task['task_id']])->update([
                  'state'        => task_wrapper::Handled,
                  'result_state' => $succ ? 1 : 0,
                  'result'       => serialize($result),
                  'finish_time'  => time(),
              ]);
          }

          return $task['task_id'];
      }
  }