processor.php 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. <?php
  2. require_once(BASE_CORE_PATH . '/framework/function/http.php');
  /**
   * Queue looper that forwards each entry of a received message to the proxy.
   *
   * A message is iterated as a map of method name => parameters. Recognised
   * method names (compared case-insensitively) are `add`, `notify`,
   * `notify_mechant` and `query`; anything else is logged and ignored.
   */
  class processor extends queue\ILooper
  {
      /** @var proxy Receiver of every dispatched operation. */
      private $mProxy;

      /**
       * Hands a new queue\DispatcherServer to the parent looper and creates
       * the proxy that handle() dispatches to.
       */
      public function __construct()
      {
          parent::__construct(new queue\DispatcherServer());
          $this->mProxy = new proxy();
      }

      /**
       * Dispatches every entry of one message to the matching proxy method.
       *
       * Entries with empty parameters are skipped. An Exception thrown while
       * processing one entry is logged at Log::ERR and does not stop the
       * remaining entries from being processed.
       *
       * @param mixed $msg Iterable map of method name => parameters; an empty
       *                   value is logged and ignored.
       *                   NOTE(review): presumably the decoded queue payload
       *                   passed in by queue\ILooper - confirm against the parent.
       *
       * @return void
       */
      protected function handle($msg)
      {
          if (empty($msg)) {
              Log::record('empty body', Log::DEBUG);
              return;
          }

          foreach ($msg as $key => $params) {
              if (empty($params)) {
                  continue;
              }

              $method = strtolower($key);
              Log::record("processor hanlde method={$method}", Log::DEBUG);

              try {
                  switch ($method) {
                      case 'add':
                          $this->mProxy->add($params);
                          break;

                      case 'notify':
                          // `??` avoids undefined-index warnings on partial payloads.
                          $channel = $params['channel'] ?? null;
                          $input = $params['params'] ?? null;

                          // Validate the nested payload ($input); $params itself is
                          // already known to be non-empty at this point. Only this
                          // entry is skipped, later entries are still processed.
                          if (empty($channel) || empty($input)) {
                              Log::record('notify skipped: empty channel or params', Log::DEBUG);
                              break;
                          }

                          $this->mProxy->notify($channel, $input);
                          break;

                      // The misspelled key is the name this consumer has always
                      // matched on; it is kept so existing producers keep working.
                      case 'notify_mechant':
                          $order_id = intval($params['order_id'] ?? 0);
                          $manual = $params['manual'] ?? false;
                          $this->mProxy->notify_merchant($order_id, $manual);
                          break;

                      case 'query':
                          $order_id = intval($params['order_id'] ?? 0);
                          $this->mProxy->query($order_id);
                          break;

                      default:
                          Log::record("Error Method={$method}", Log::DEBUG);
                          break;
                  }
              } catch (Exception $x) {
                  Log::record($x->getMessage(), Log::ERR);
              }
          }
      }
  }