123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- <?php
- /**
- * Created by PhpStorm.
- * User: stanley-king
- * Date: 2017/12/14
- * Time: 12:07 PM
- */
- namespace room;
- use event\IProcessor;
- use process_looper;
- use errcode;
- use algorithm;
- use QueueClient;
- use scope_trace;
- use Log;
- class room_processor implements IProcessor
- {
- private $mAccConnes;
- private $mFactory;
- private $mRooms;
- private $mChatwo;
/**
 * Start with no registered access connections and no loaded rooms, and
 * create the room factory and the chatwo handler used by this processor.
 */
public function __construct()
{
    $this->mRooms     = [];
    $this->mAccConnes = [];
    $this->mFactory   = new factory();
    $this->mChatwo    = new chatwo();
}
-
/**
 * Start-up hook (presumably required by IProcessor); intentionally a no-op.
 */
public function onStart()
{
    // Nothing to prepare at start-up.
}
/**
 * Connection-established hook (presumably required by IProcessor);
 * intentionally a no-op. Connections are only tracked once they send an
 * access "list" request.
 *
 * @param mixed $bufid  Connection id.
 * @param mixed $stream Unused.
 * @param mixed $host   Unused.
 * @param mixed $port   Unused.
 * @param mixed $args   Unused.
 */
public function onConnected($bufid,$stream,$host,$port,$args)
{
    // No per-connection state is created here.
}
/**
 * Connection-closed hook: forget the connection if it was registered as an
 * access connection.
 *
 * @param mixed $bufid Connection id that was closed.
 */
public function onClose($bufid)
{
    // Drop the id from the broadcast list so no further writes target it.
    $this->remove_acc($bufid);
}
/**
 * Decode one JSON request and route it by its `act` field.
 *
 * Factory, access and room requests get their handler's JSON reply written
 * back on the originating connection; chatwo requests produce no direct
 * reply on that connection.
 *
 * @param mixed  $bufid Connection id the request arrived on.
 * @param string $body  Raw JSON request body.
 * @return bool True when the request was routed to a handler, false otherwise.
 */
public function onRequest($bufid, $body)
{
    // Held for the duration of the call; presumably traces on scope exit.
    $trace = new scope_trace(__METHOD__);

    $input = json_decode($body, true);
    if ($input == false) {
        // Undecodable or empty payload: drop the connection.
        process_looper::instance()->close($bufid);
        return false;
    }

    // Valid JSON that is not an array (e.g. a bare string or number), or that
    // carries no `act`, cannot be routed. Checking here avoids indexing a
    // scalar and avoids undefined-index notices.
    if (!is_array($input) || empty($input['act'])) {
        return false;
    }
    $act = $input['act'];

    switch ($act) {
        case proto_type::act_factory:
            $reply = $this->onFactory($bufid, $input);
            break;
        case proto_type::act_access:
            $reply = $this->onAccess($bufid, $input);
            break;
        case proto_type::act_room:
            $reply = $this->onRoom($bufid, $input);
            break;
        case proto_type::act_chatwo:
            // Chatwo results are relayed to access connections by the handler.
            $this->onChatwo($bufid, $input);
            return true;
        default:
            return false;
    }

    process_looper::instance()->write($bufid, $reply);
    return true;
}
- ////////////////////////////////////////Factory Message Handler/////////////////////////////////////////////////////
/**
 * Route a factory request to the handler named by its `op` field.
 *
 * @param mixed $bufid Connection id the request arrived on.
 * @param array $input Decoded request.
 * @return string JSON reply; an ErrRoomFactoryOp error for a missing,
 *                non-string or unknown `op`.
 */
private function onFactory($bufid,$input)
{
    $op = isset($input['op']) ? $input['op'] : null;

    // Only accept string ops. With loose comparison a JSON `true` (and, before
    // PHP 8, a JSON `0`) would compare equal to 'build' and trigger it.
    if (!is_string($op)) {
        return $this->error(errcode::ErrRoomFactoryOp);
    }

    switch ($op) {
        case 'build':
            return $this->factory_build($bufid,$input);
        case 'list_room':
            return $this->success(['rooms' => $this->mRooms]);
        case 'invite':
            return $this->factory_invite($bufid,$input);
        case 'change':
            return $this->factory_change($bufid,$input);
        case 'notice_room':
            return $this->factory_notice_room($bufid,$input);
        case 'notice_users':
            return $this->factory_notice_users($bufid,$input);
        case 'notice_all':
            return $this->factory_notice_all($bufid,$input);
        case 'leave':
            return $this->factory_leave($bufid,$input);
        case 'kickout':
            return $this->factory_kickout($bufid,$input);
        default:
            return $this->error(errcode::ErrRoomFactoryOp);
    }
}
/**
 * Load a room into this processor (factory op "build").
 *
 * If the room is already loaded its id is returned immediately. Otherwise
 * the room is built by the factory, announced to all access connections and
 * stored; when `newroom` is truthy a creation push is broadcast as well.
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `room` and optional `newroom`.
 * @return string JSON reply: success with the room id, ErrRoomParam for a
 *                non-positive room id, or ErrRoomBuild when the factory fails.
 */
private function factory_build($bufid,$input)
{
    // Missing keys behave like their zero values instead of raising notices.
    $roomid  = isset($input['room']) ? intval($input['room']) : 0;
    $newroom = isset($input['newroom']) ? boolval($input['newroom']) : false;

    if ($roomid <= 0) {
        return $this->error(errcode::ErrRoomParam);
    }
    if (array_key_exists($roomid, $this->mRooms)) {
        return $this->success(['room' => $roomid]);
    }

    $room = $this->mFactory->build($roomid);
    if (!$room) {
        return $this->error(errcode::ErrRoomBuild);
    }

    // Announce the room before storing it, as the original ordering did.
    $this->broad_access(msg_builder::build_message($roomid, $room->room_type()));
    $this->mRooms[$roomid] = $room;
    if ($newroom) {
        $this->broad_access(msg_builder::create_push($room->creator(), $roomid, $room->room_info()));
    }
    return $this->success(['room' => $room->room_id()]);
}
/**
 * Invite users into a loaded room (factory op "invite").
 *
 * When the room reports newly added users, the invite is announced to all
 * access connections, the room's pending broadcast messages are relayed and
 * the stored user count is refreshed.
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `room`, `inviter`, `invitees`.
 * @return string JSON reply: success with the room's invite result,
 *                ErrRoomParam for a non-positive room id, otherwise
 *                ErrRoomInvite.
 */
private function factory_invite($bufid,$input)
{
    // Missing keys behave like their zero values instead of raising notices.
    $roomid   = isset($input['room']) ? intval($input['room']) : 0;
    $inviter  = isset($input['inviter']) ? intval($input['inviter']) : 0;
    $invitees = isset($input['invitees']) ? $input['invitees'] : null;

    if ($roomid <= 0) {
        return $this->error(errcode::ErrRoomParam);
    }

    $room = $this->room($roomid);
    if ($room == false || $inviter <= 0 || empty($invitees)) {
        return $this->error(errcode::ErrRoomInvite);
    }

    // Output argument of invite() (presumably taken by reference); initialised
    // so it is always defined afterwards.
    $newusers = [];
    $result = $room->invite($inviter, $invitees, $newusers);
    if ($result == false) {
        return $this->error(errcode::ErrRoomInvite);
    }

    if (!empty($newusers)) {
        $room_info = $room->room_info();
        $this->broad_access(msg_builder::invite_message($roomid, $room->room_type(), $newusers));
        $inviter_info = $room->userinfos($inviter);
        $this->broad_access(msg_builder::invited_push($roomid, $newusers, $inviter_info, $room_info));
        $this->broadcast_roomsg($room);

        $total    = $room->usercount();
        $added    = count($newusers);
        $previous = $total - $added;
        Log::record("ready to update -------{$previous}||{$total}||{$added}|| ",Log::DEBUG);

        if ($previous >= 0 && $previous < 10) {
            // Room had fewer than 10 users before this invite: queue a room update.
            QueueClient::push("OnUpdateRoom",['room_id' => $roomid]);
        } else {
            // Otherwise only bump the stored user counter.
            Model('room')->editRoom(['room_id' => $roomid],['users' => ['exp',"users+{$added}"]]);
        }
    }
    return $this->success($result);
}
/**
 * Remove a user from a loaded room at the user's own request (factory op
 * "leave").
 *
 * On success the departure is announced to all access connections, the
 * room's pending reply messages go to the requesting connection and its
 * pending broadcast messages to every access connection.
 *
 * @param mixed $bufid Connection id the request arrived on.
 * @param array $input Decoded request; reads `room` and `user`.
 * @return string JSON reply: success with the room's leave result,
 *                ErrRoomParam for a non-positive room id, otherwise
 *                ErrRoomLeave.
 */
private function factory_leave($bufid,$input)
{
    // Missing keys behave like their zero values instead of raising notices.
    $roomid = isset($input['room']) ? intval($input['room']) : 0;
    $userid = isset($input['user']) ? intval($input['user']) : 0;

    if ($roomid <= 0) {
        return $this->error(errcode::ErrRoomParam);
    }

    $room = $this->room($roomid);
    if ($room == false || $userid <= 0) {
        return $this->error(errcode::ErrRoomLeave);
    }

    $result = $room->leave($userid);
    if ($result == false) {
        return $this->error(errcode::ErrRoomLeave);
    }

    $this->broad_access(msg_builder::leave_message($roomid, $room->room_type(), [$userid]));
    $this->reply_roomsg($bufid,$room);
    $this->broadcast_roomsg($room);

    // Rooms that are now below 10 users get a queued room update.
    $user_count = $room->usercount();
    if ($user_count >= 0 && $user_count < 10) {
        QueueClient::push("OnUpdateRoom",['room_id' => $roomid]);
    }
    return $this->success($result);
}
/**
 * Kick users out of a loaded room (factory op "kickout").
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `room`, `user` (the requester)
 *                     and `kicks` (passed through to the room unchanged).
 * @return string JSON reply: success with the list of removed users (empty
 *                when the room removed nobody), ErrRoomParam for a
 *                non-positive room id, or ErrRoomLeave when the room is not
 *                loaded or `user` is not positive.
 */
private function factory_kickout($bufid,$input)
{
    // Missing keys behave like their zero values instead of raising notices.
    $roomid = isset($input['room']) ? intval($input['room']) : 0;
    $user   = isset($input['user']) ? intval($input['user']) : 0;
    $kicks  = isset($input['kicks']) ? $input['kicks'] : null;

    if ($roomid <= 0) {
        return $this->error(errcode::ErrRoomParam);
    }

    $room = $this->room($roomid);
    if ($room == false || $user <= 0) {
        return $this->error(errcode::ErrRoomLeave);
    }

    $users = $room->kickout($user,$kicks);
    if ($users == false) {
        // Nobody was removed; this is still reported as success.
        return $this->success(['users' => []]);
    }

    $this->broad_access(msg_builder::leave_message($roomid, $room->room_type(), $users));
    $this->broadcast_roomsg($room);

    // Rooms that are now below 10 users get a queued room update.
    $user_count = $room->usercount();
    if ($user_count >= 0 && $user_count < 10) {
        QueueClient::push("OnUpdateRoom",['room_id' => $roomid]);
    }
    return $this->success(['users' => $users]);
}
/**
 * Ask a loaded room to apply a change and push the resulting room info to
 * all access connections (factory op "change").
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `room`.
 * @return string JSON reply: success with the room's change result,
 *                ErrRoomParam for a non-positive room id, otherwise
 *                ErrRoomChange.
 */
private function factory_change($bufid,$input)
{
    // A missing `room` behaves like 0 instead of raising a notice.
    $roomid = isset($input['room']) ? intval($input['room']) : 0;
    if ($roomid <= 0) {
        return $this->error(errcode::ErrRoomParam);
    }

    $room = $this->room($roomid);
    if ($room == false) {
        return $this->error(errcode::ErrRoomChange);
    }

    $result = $room->change();
    if ($result == false) {
        return $this->error(errcode::ErrRoomChange);
    }

    $this->broad_access(msg_builder::change_push($roomid, $room->room_info()));
    return $this->success($result);
}
/**
 * Post a notice into a loaded room and relay the room's pending broadcast
 * messages (factory op "notice_room").
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `room`, `type`, `content` and
 *                     `user` (passed through to the room unchanged).
 * @return string JSON reply: success with the room's notice result,
 *                ErrRoomParam when `room`, `type` or `content` is
 *                missing/empty, otherwise ErrRoomNotice.
 */
private function factory_notice_room($bufid, $input)
{
    // Missing keys behave like empty values instead of raising notices.
    $roomid  = isset($input['room']) ? intval($input['room']) : 0;
    $type    = isset($input['type']) ? $input['type'] : null;
    $content = isset($input['content']) ? $input['content'] : null;
    $user    = isset($input['user']) ? $input['user'] : null;

    if ($roomid <= 0 || empty($type) || empty($content)) {
        return $this->error(errcode::ErrRoomParam);
    }

    $room = $this->room($roomid);
    if ($room == false) {
        // The original returned ErrRoomChange here, which looks copied from
        // factory_change; every other op reports its own error code.
        return $this->error(errcode::ErrRoomNotice);
    }

    $result = $room->notice($type,$content,$user);
    if (!$result) {
        return $this->error(errcode::ErrRoomNotice);
    }

    $this->broadcast_roomsg($room);
    return $this->success($result);
}
- ///Push message handling section////////////////////////////////////////////////////////////////////////////////////////
/**
 * Push a message to specific users (factory op "notice_users").
 *
 * Command pushes relay `content` once to the whole user list with msgid 0.
 * Notify and apply pushes are first stored per user through the room model
 * and then relayed one user at a time with the stored message id.
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `type`, `users`, `content`
 *                     and `msg`.
 * @return string JSON reply: success(NULL), or ErrParamter for an unknown
 *                `type` or for command content that is neither an array nor
 *                absent.
 */
private function factory_notice_users($bufid, $input)
{
    // Missing keys become null instead of raising notices.
    $content = isset($input['content']) ? $input['content'] : null;
    $msg     = isset($input['msg']) ? $input['msg'] : null;
    $users   = isset($input['users']) ? $input['users'] : null;
    $type    = isset($input['type']) ? $input['type'] : null;

    if ($type === proto_type::push_command) {
        $pushid = 1;
        $msg_type = proto_type::msg_type_command;
    }
    elseif ($type === proto_type::push_notify) {
        $pushid = 2;
        $msg_type = proto_type::msg_type_nofity;
    }
    elseif ($type === proto_type::push_apply) {
        $pushid = 3;
        $msg_type = proto_type::msg_type_apply;
    }
    else {
        return $this->error(errcode::ErrParamter);
    }

    if ($msg_type != proto_type::msg_type_command)
    {
        $send = ['content' => $msg];
        $mod_room = Model('room');
        $orgmsg = json_encode($content,JSON_UNESCAPED_UNICODE);

        // Iterating a non-array would only raise a warning; skip it instead.
        if (is_array($users)) {
            foreach ($users as $user) {
                $msgid = $mod_room->addRoomMsg(['room_id' => 0,'member_id' => $user, 'type' => $msg_type,'msg' => $msg, 'orgmsg' => $orgmsg, 'add_time' => time(),'msg_type' => 1]);
                $send['msgid'] = $msgid;
                $send['send_time'] = time();
                $this->write_push_users($pushid,[$user],$send);
            }
        }
    }
    else
    {
        // Command content is extended with msgid/send_time below, so it must
        // be an array; a scalar cannot take those keys.
        if ($content !== null && !is_array($content)) {
            return $this->error(errcode::ErrParamter);
        }
        $send = ($content === null) ? [] : $content;
        $send['msgid'] = 0;
        $send['send_time'] = time();
        $this->write_push_users($pushid,$users,$send);
    }
    return $this->success(NULL);
}
/**
 * Push a message to all users (factory op "notice_all").
 *
 * `type` is only validated; the relayed message does not carry it.
 * NOTE(review): the original computed a push id (1/2/3) from `type` but
 * never passed it on, and write_push() always sends push id 0. Confirm
 * whether the id was meant to be forwarded.
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `type` and `content`.
 * @return string JSON reply: success(NULL), or ErrParamter for an unknown
 *                `type`.
 */
private function factory_notice_all($bufid, $input)
{
    // Missing keys become null instead of raising notices.
    $content = isset($input['content']) ? $input['content'] : null;
    $type    = isset($input['type']) ? $input['type'] : null;

    $known_types = [
        proto_type::push_command,
        proto_type::push_notify,
        proto_type::push_apply,
    ];
    // Strict match, same as the original === chain.
    if (!in_array($type, $known_types, true)) {
        return $this->error(errcode::ErrParamter);
    }

    $this->write_push($content);
    return $this->success(NULL);
}
- ////////////////////////////////////////Access Message Handler//////////////////////////////////////////////////////
/**
 * Handle an access request. The only supported op is "list", which
 * registers the connection as an access connection and returns the ids of
 * all loaded rooms.
 *
 * @param mixed $bufid Connection id the request arrived on.
 * @param array $input Decoded request; reads `op`.
 * @return string JSON reply: success with the room id list, or
 *                ErrRoomAccessOp for any other op.
 */
private function onAccess($bufid,$input)
{
    $op = isset($input['op']) ? $input['op'] : null;

    // Strict match: with loose comparison a JSON `true` (and, before PHP 8, a
    // JSON `0`) would also be treated as "list".
    if ($op !== 'list') {
        return $this->error(errcode::ErrRoomAccessOp);
    }

    $this->add_acc($bufid);
    $reply = ['act' => 'access','op' => $op,'rooms' => $this->rooms()];
    return $this->success($reply);
}
/**
 * List the ids of all loaded rooms.
 *
 * @return array Room ids, in the order the rooms were stored.
 */
private function rooms()
{
    // Rooms are keyed by id, so the key list is the id list.
    return array_keys($this->mRooms);
}
- ////////////////////////////////////////Room Message Handler////////////////////////////////////////////////////////
/**
 * Dispatch a room request to the loaded room's `<op>Op` method.
 *
 * When the room method returns a non-false result, the room's pending reply
 * messages are sent to the requester and its pending broadcast messages to
 * all access connections.
 *
 * @param mixed $bufid Connection id the request arrived on.
 * @param array $input Decoded request; reads `room` and `op`, and is passed
 *                     whole to the room method.
 * @return string JSON reply: success with the room method's return value
 *                (even when it is false), or ErrRoom when the room is not
 *                loaded or the op does not name a callable room method.
 */
private function onRoom($bufid,$input)
{
    // Missing keys behave like empty values instead of raising notices.
    $roomid = isset($input['room']) ? intval($input['room']) : 0;
    $op     = isset($input['op']) ? $input['op'] : null;

    $room = $this->room($roomid);
    // `op` comes from the client; only a string can name a method.
    if ($room == false || !is_string($op)) {
        return $this->error(errcode::ErrRoom);
    }

    $function = $op . 'Op';
    // method_exists() is also true for private/protected methods, and calling
    // one from here would be a fatal error. Require public callability too.
    if (!method_exists($room,$function) || !is_callable([$room,$function])) {
        return $this->error(errcode::ErrRoom);
    }

    $ret = $room->$function($input);
    if ($ret != false) {
        $this->reply_roomsg($bufid,$room);
        $this->broadcast_roomsg($room);
    }
    return $this->success($ret);
}
- ////////////////////////////////////////Peer Message Handler////////////////////////////////////////////////////////
/**
 * Dispatch a chatwo request to the chatwo handler's `<op>Op` method and
 * relay any resulting message to all access connections.
 *
 * @param mixed $bufid Connection id the request arrived on (unused).
 * @param array $input Decoded request; reads `op`, and is passed whole to
 *                     the handler method.
 * @return bool True when a handler method was invoked, false otherwise
 *              (the original returned null implicitly in that case).
 */
private function onChatwo($bufid, $input)
{
    $op = isset($input['op']) ? $input['op'] : null;
    // `op` comes from the client; only a string can name a method.
    if (!is_string($op)) {
        return false;
    }

    $function = $op . 'Op';
    // method_exists() is also true for private/protected methods, and calling
    // one from here would be a fatal error. Require public callability too.
    if (!method_exists($this->mChatwo,$function) || !is_callable([$this->mChatwo,$function])) {
        return false;
    }

    $msg = $this->mChatwo->$function($input);
    if ($msg != false) {
        // Same encode-and-fan-out the original did inline.
        $this->broad_access($msg);
    }
    return true;
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
 * Build a "relay to users" push message and write it to every registered
 * access connection.
 *
 * @param mixed $pushid  Push id placed in the message body.
 * @param mixed $users   Receivers placed in the relay envelope.
 * @param mixed $content Payload placed in the message body.
 */
private function write_push_users($pushid,$users,$content)
{
    // Key order is kept as in the original so the encoded JSON is unchanged.
    $relay = [
        'act'           => "room",
        'op'            => "relay",
        'relay_type'    => "users",
        'receivers'     => $users,
        'room'          => 0,
        'receiver_type' => proto_type::sroom_push,
        'msgtype'       => "message",
        'body'          => [
            'act'     => proto_type::act_push,
            'op'      => 'message',
            'push'    => $pushid,
            'content' => $content,
            'msgtype' => "message",
        ],
    ];

    $payload = json_encode($relay);
    foreach ($this->mAccConnes as $connid) {
        process_looper::instance()->write($connid,$payload);
    }
}
/**
 * Build a "relay to all users" push message (push id 0, no receivers) and
 * write it to every registered access connection.
 *
 * NOTE(review): unlike write_push_users(), the relay message is wrapped by
 * success() into a reply envelope before sending. Confirm that receivers
 * expect that shape.
 *
 * @param mixed $content Payload placed in the message body.
 */
private function write_push($content)
{
    // Key order is kept as in the original so the encoded JSON is unchanged.
    $relay = [
        'act'           => "room",
        'op'            => "relay",
        'relay_type'    => "alluser",
        'receivers'     => [],
        'room'          => 0,
        'receiver_type' => proto_type::sroom_push,
        'msgtype'       => "message",
        'body'          => [
            'act'     => proto_type::act_push,
            'op'      => 'message',
            'push'    => 0,
            'content' => $content,
            'msgtype' => "message",
        ],
    ];

    $payload = $this->success($relay);
    foreach ($this->mAccConnes as $connid) {
        process_looper::instance()->write($connid,$payload);
    }
}
- ////////////////////////////////Reply packets///////////////////////////////////////////////////////////////////////////
/**
 * Encode a success reply.
 *
 * @param mixed $val Value placed under `data`.
 * @return string JSON with `code`, `message`, `data` and `msgtype` "reply".
 */
private function success($val)
{
    $code = errcode::Success;

    return json_encode([
        'code'    => $code,
        'message' => errcode::msg($code),
        'data'    => $val,
        'msgtype' => "reply",
    ]);
}
/**
 * Encode an error reply.
 *
 * @param mixed  $code    Error code placed under `code`.
 * @param string $message Optional message; when empty, the text registered
 *                        for $code in errcode is used.
 * @return string JSON with `code`, `message`, null `data` and `msgtype`
 *                "reply".
 */
private function error($code,$message = '')
{
    $text = empty($message) ? errcode::msg($code) : $message;

    return json_encode([
        'code'    => $code,
        'message' => $text,
        'data'    => null,
        'msgtype' => "reply",
    ]);
}
/**
 * Send each of the room's pending reply messages, JSON-encoded, to one
 * connection.
 *
 * @param mixed     $bufid Connection id to write to.
 * @param base_room $room  Room whose relay_reply_msgs() are sent.
 */
private function reply_roomsg($bufid,base_room $room)
{
    foreach ($room->relay_reply_msgs() as $reply) {
        $payload = json_encode($reply);
        process_looper::instance()->write($bufid,$payload);
    }
}
/**
 * JSON-encode a message once and write it to every registered access
 * connection.
 *
 * @param mixed $messages Value to encode and send.
 */
private function broad_access($messages)
{
    $payload = json_encode($messages);

    foreach ($this->mAccConnes as $connid) {
        process_looper::instance()->write($connid,$payload);
    }
}
/**
 * Send each of the room's pending broadcast messages, JSON-encoded, to
 * every registered access connection.
 *
 * @param base_room $room Room whose relay_broadcast_msgs() are sent.
 */
private function broadcast_roomsg(base_room $room)
{
    foreach ($room->relay_broadcast_msgs() as $pending) {
        // Encode once per message, then fan out.
        $payload = json_encode($pending);
        foreach ($this->mAccConnes as $connid) {
            process_looper::instance()->write($connid,$payload);
        }
    }
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
 * Look up a loaded room by id.
 *
 * @param mixed $roomid Room id.
 * @return mixed The stored room, or false when the id is not positive or no
 *               such room is loaded.
 */
private function room($roomid)
{
    if ($roomid > 0 && array_key_exists($roomid,$this->mRooms)) {
        return $this->mRooms[$roomid];
    }
    return false;
}
/**
 * Register a connection id as an access connection, unless it is already
 * registered.
 *
 * @param mixed $bufid Connection id; converted with intval().
 * @return bool False when the id is not positive, true otherwise.
 */
private function add_acc($bufid)
{
    $connid = intval($bufid);
    if ($connid <= 0) {
        return false;
    }

    $known = algorithm::binary_search($this->mAccConnes,$connid);
    if (!$known) {
        // Insert at the lower-bound position reported by the algorithm helper.
        $pos = algorithm::lower_bonud($this->mAccConnes,$connid);
        algorithm::array_insert($this->mAccConnes,$pos,$connid);
    }
    return true;
}
/**
 * Unregister an access connection id if it is currently registered.
 *
 * @param mixed $bufid Connection id; converted with intval().
 * @return bool False when the id is not positive, true otherwise (whether
 *              or not the id was registered).
 */
private function remove_acc($bufid)
{
    $connid = intval($bufid);
    if ($connid <= 0) {
        return false;
    }

    $known = algorithm::binary_search($this->mAccConnes,$connid);
    if ($known) {
        // Erase at the lower-bound position reported by the algorithm helper.
        $pos = algorithm::lower_bonud($this->mAccConnes,$connid);
        algorithm::array_erase($this->mAccConnes,$pos);
    }
    return true;
}
- }
|