processor.php 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. <?php
  2. require_once(BASE_CORE_PATH . '/framework/function/http.php');
  3. class processor extends queue\ILooper
  4. {
  5. private $mProxy;
  6. public function __construct($comode = false)
  7. {
  8. if($comode) {
  9. parent::__construct(new queue\CoDispatcherServer());
  10. }
  11. else {
  12. parent::__construct(new queue\DispatcherServer());
  13. }
  14. $this->mProxy = new proxy();
  15. }
  16. protected function handle($msg)
  17. {
  18. if (!empty($msg))
  19. {
  20. foreach ($msg as $key => $params)
  21. {
  22. if (empty($params)) continue;
  23. $method = strtolower($key);
  24. Log::record("processor hanlde method={$method}", Log::DEBUG);
  25. try
  26. {
  27. if ($method == 'add') {
  28. $this->mProxy->add($params);
  29. }
  30. elseif ($method == 'notify') {
  31. $channel = $params['channel'];
  32. $input = $params['params'];
  33. if(empty($channel) || empty($params))
  34. return;
  35. $this->mProxy->notify($channel,$input);
  36. }
  37. elseif($method == 'notify_mechant') {
  38. $order_id = intval($params['order_id']);
  39. $manual = $params['manual'] ?? false;
  40. $this->mProxy->notify_merchant($order_id,$manual);
  41. }
  42. elseif($method == 'query') {
  43. $order_id = intval($params['order_id']);
  44. $this->mProxy->query($order_id);
  45. }
  46. else {
  47. Log::record("Error Method={$method}", Log::DEBUG);
  48. }
  49. }
  50. catch (Exception $x) {
  51. Log::record($x->getMessage(), Log::ERR);
  52. }
  53. }
  54. }
  55. }
  56. }