Browse Source

rewrite queue watcher

stanley-king 7 years ago
parent
commit
c287e359b5
8 changed files with 161 additions and 249 deletions
  1. 0 219
      document/queue-daemon.py
  2. 19 11
      helper/account_helper.php
  3. 1 0
      helper/push_helper.php
  4. 1 3
      mac_start.sh
  5. 1 0
      mobile/wxnotify.php
  6. 115 0
      qwatch.py
  7. 24 14
      restart.py
  8. 0 2
      test/TestFriends.php

+ 0 - 219
document/queue-daemon.py

@@ -1,219 +0,0 @@
-#!/usr/bin/env python
-import sys, os, time, signal, atexit
-
-shutdown = False
-graceful_shutdown = False
-
-def sig_handler(sig,frame):
-	global shutdown
-	global graceful_shutdown
-	
-	if sig == signal.SIGINT:
-		graceful_shutdown = True
-	elif sig == signal.SIGTERM:
-		shutdown = True		
-
-class Daemon(object):	
-	def __init__(self, cmd, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
-			self.stdin = stdin
-			self.stdout = stdout
-			self.stderr = stderr
-			self.cmd = cmd
-			self.pidfile = os.path.splitext(sys.argv[0])[0] + ".pid"
-	
-	def register_handler(self):
-		signal.signal(signal.SIGINT, sig_handler)
-		signal.signal(signal.SIGTERM, sig_handler)
-		signal.signal(signal.SIGCHLD, sig_handler)
-		
-	def daemonize(self):
-		try: 
-				pid = os.fork() 
-				if pid > 0:
-						# exit first parent
-						sys.exit(0) 
-		except OSError, e: 
-				sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
-				sys.exit(1)
-
-		# decouple from parent environment
-		os.chdir(".") 
-		os.setsid() 
-		os.umask(0) 
-
-		# do second fork
-		try: 
-			pid = os.fork() 
-			if pid > 0:
-				# exit from second parent
-				sys.exit(0) 
-		except OSError, e: 
-			sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
-			sys.exit(1) 
-		
-		# redirect standard file descriptors
-		si = file(self.stdin, 'r')
-		so = file(self.stdout, 'a+')
-		se = file(self.stderr, 'a+', 0)
-		
-		pid = str(os.getpid())
-		
-		#sys.stderr.write("\n%s\n" % pid)
-		#sys.stderr.flush()
-		
-		if self.pidfile:
-			file(self.pidfile,'w+').write("%s\n" % pid)
-		
-		#atexit.register(self.delpid)
-		
-		os.dup2(si.fileno(), sys.stdin.fileno())
-		os.dup2(so.fileno(), sys.stdout.fileno())
-		os.dup2(se.fileno(), sys.stderr.fileno())
-	
-	def get_childs_pid(self, pid):
-		li = []
-		cmd = "ps -ef|awk '{print $2,$3}'"
-		for item in os.popen(cmd).readlines():
-			item = item.strip()
-			l = item.split(' ')
-			p1 = l[0].strip()
-			p2 = l[1].strip()
-			if p2 == pid:
-				li.append(p1)
-						
-		return li		
-	
-	def get_grand_childs_pid(self,child_pids):
-		li = []
-		for pid in child_pids:
-			lii = self.get_childs_pid(pid)
-			if len(lii) == 0:
-				continue
-			li.append(lii[0])
-			
-		return li			
-				
-	def start(self):
-		try:
-			num_childs = 1
-			if num_childs > 0:
-				child = False
-				global shutdown
-				global graceful_shutdown
-				while not child and not shutdown and not graceful_shutdown:
-					if num_childs > 0:
-						pid = os.fork()
-						if pid > 0:
-							num_childs = num_childs - 1
-						elif pid == 0:
-							child = True
-						else:
-							sys.exit(1)
-					else:
-						exit_stat = os.wait()
-						pid = exit_stat[0]
-						num_childs = num_childs + 1
-						time.sleep(1)
-										 
-			os.system(self.cmd)	
-		except OSError, e:
-			sys.stderr.write("fork #3 failed: %d (%s)\n" % (e.errno, e.strerror))
-			sys.exit(1); 	
-		
-	def run(self):
-		try:
-			pf = file(self.pidfile,'r')
-			pid = int(pf.read().strip())
-			pf.close()
-		except IOError:
-			pid = None
-
-		if pid:
-			message = "pidfile %s already exist. Daemon already running?\n"
-			sys.stderr.write(message % self.pidfile)
-			sys.exit(1)
-			
-		self.daemonize()
-		self.start()
-	
-	def delpid(self):
-		os.remove(self.pidfile)
-		
-		
-	def stop(self):
-		try:
-				pf = file(self.pidfile,'r')
-				pid = int(pf.read().strip())
-				pf.close()
-		except IOError:
-				pid = None
-
-		if not pid:
-			message = "pidfile %s does not exist. Daemon not running?\n"
-			sys.stderr.write(message % self.pidfile)
-			return
-
-		li = self.get_childs_pid(str(pid))
-		lig = self.get_grand_childs_pid(li)
-		for item in lig:
-			try:
-				os.kill(int(item),signal.SIGINT)
-				os.kill(int(item),signal.SIGTERM)
-				os.kill(int(item),signal.SIGKILL)
-			except:
-				pass
-		        
-		try:
-			while 1:
-				os.kill(pid, signal.SIGTERM)
-				time.sleep(0.5)
-		except OSError, err:
-			err = str(err)
-			if err.find("No such process") > 0:
-				if os.path.exists(self.pidfile):
-					os.remove(self.pidfile)
-				else:
-					print str(err)
-					sys.exit(1)	
-		
-def main():
-	cmd = "php /Users/stanley-king/work/PHPProject/shopnc/crontab.php queue index"
-	d = Daemon(cmd)
-	d.register_handler()
-	
-	if len(sys.argv) == 2:
-		if 'start' == sys.argv[1]:
-			try:
-				d.run()
-			except:
-				pass
-	
-		elif 'stop' == sys.argv[1]:
-			print 'stopping...'
-			d.stop()
-						
-		elif 'status' == sys.argv[1]:
-			try:
-				pf = file(d.pidfile,'r')
-				pid = int(pf.read().strip())
-				pf.close()
-			except IOError:
-				pid = None	
-			except SystemExit:
-				pid = None
-			
-			if pid:		
-				print 'running as pid %s' % pid	
-			else:
-				print 'not running'
-				
-		else:
-			print "Unknown command"
-			sys.exit(2)
-		
-	else:						
-		print "usage: %s start|stop|status" % sys.argv[0]
-		sys.exit(2)
-	
-if __name__ == '__main__':
-	main()		

+ 19 - 11
helper/account_helper.php

@@ -7,16 +7,18 @@
  * Time: 下午2:46
  */
 
-require_once (BASE_ROOT_PATH . '/helper/bonus_helper.php');
-require_once (BASE_ROOT_PATH . '/helper/model_helper.php');
-require_once (BASE_ROOT_PATH . '/helper/push_helper.php');
-require_once (BASE_ROOT_PATH . '/helper/sms_helper.php');
-require_once (BASE_ROOT_PATH . '/helper/bonus/parameters.php');
-require_once (BASE_ROOT_PATH . '/helper/user_session/favorite.php');
-require_once (BASE_ROOT_PATH . '/helper/user_session/anotice.php');
-require_once (BASE_ROOT_PATH . '/helper/fcode/mfcode.php');
-require_once (BASE_ROOT_PATH . '/helper/fcode/operator.php');
-require_once (BASE_ROOT_PATH . '/helper/user_session/fcode.php');
+require_once(BASE_ROOT_PATH . '/helper/bonus_helper.php');
+require_once(BASE_ROOT_PATH . '/helper/model_helper.php');
+require_once(BASE_ROOT_PATH . '/helper/push_helper.php');
+require_once(BASE_ROOT_PATH . '/helper/sms_helper.php');
+require_once(BASE_ROOT_PATH . '/helper/bonus/parameters.php');
+require_once(BASE_ROOT_PATH . '/helper/user_session/favorite.php');
+require_once(BASE_ROOT_PATH . '/helper/user_session/anotice.php');
+require_once(BASE_ROOT_PATH . '/helper/fcode/mfcode.php');
+require_once(BASE_ROOT_PATH . '/helper/fcode/operator.php');
+require_once(BASE_ROOT_PATH . '/helper/user_session/fcode.php');
+require_once(BASE_ROOT_PATH . '/helper/search/tcp_client.php');
+
 
 class account_helper
 {
@@ -485,11 +487,16 @@ class account_helper
     //在支付成功之后,可以提醒用户收到货后,可以收到红包奖励。
     public static function onPaySuccess($pay_sn)
     {
-        Log::record("account_helper::onPaySuccess pay_sn={$pay_sn}",Log::DEBUG);
         $mod = Model('order');
         $info = $mod->getOrderInfo(array('pay_sn' => $pay_sn));
         $buyer_id = intval($info['buyer_id']);
         push_helper::paysuccess($buyer_id);
+
+        $inviters = search\relation_client::instance()->fetch_inviters(['user_id' => $buyer_id]);
+        $invitees = search\relation_client::instance()->fetch_invitees(['user_id' => $buyer_id]);
+        if(empty($inviters))
+        {
+        }
     }
 
     //在发货之后,可以提醒用户留意收货,在订单中查看物流,短信通知
@@ -508,6 +515,7 @@ class account_helper
 
     public static function onOrderSuccess($order_id)
     {
+        //todo
         if(empty($pay_sn)) {
             Log::record("account_helper::onOrderSuccess $order_id error.",Log::ERR);
         } else {

+ 1 - 0
helper/push_helper.php

@@ -13,6 +13,7 @@ class push_helper
         $thief = $to_info->nickname();
         $text = "{$thief}偷走了您{$amount}元的红包.";
 
+
         $push_param['alias'] = $from_info->member_id();
         $push_param['title'] = "摇红包";
         $push_param['text'] = $text;

+ 1 - 3
mac_start.sh

@@ -5,7 +5,5 @@ sudo nginx
 sudo pkill php-fpm
 php-fpm
 redis-server /etc/redis/6379.conf
-rm ./document/queue-daemon.pid
 sudo pkill -9 $(pidof python)
-./document/queue-daemon.py start
-cd ..
+./qwatch.py

+ 1 - 0
mobile/wxnotify.php

@@ -16,6 +16,7 @@ require_once (WXPAY_PATH . '/lib/OpenWxPay.Config.php');
 require_once (WXPAY_PATH . '/lib/OpenWxPay.Data.php');
 require_once (WXPAY_PATH . '/lib/OpenWxPay.Exception.php');
 require_once (WXPAY_PATH . '/lib/OpenWxPay.Notify.php');
+
 require_once (BASE_DATA_PATH . '/logic/delivery.logic.php');
 require_once (BASE_ROOT_PATH . '/helper/pay_helper.php');
 

+ 115 - 0
qwatch.py

@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+
+import sys, os, signal, subprocess, shlex,time,atexit
+
+shutdown = False
+queue_proc = None
+
+def file_pid(name):
+    proc1 = subprocess.Popen(shlex.split('ps aux'), stdout=subprocess.PIPE)
+    proc2 = subprocess.Popen(shlex.split('grep ' + name), stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    proc3 = subprocess.Popen(shlex.split('awk \'{print $2}\' '), stdin=proc2.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    proc1.stdout.close()
+    proc2.stdout.close()
+    out, err = proc3.communicate()
+
+    lines = out.splitlines()
+    pids = []
+    for line in lines :
+        item = line.strip()
+        pid = int(item)
+        if pid > 0:
+            pids.append(pid)
+
+    if len(pids) > 0 :
+        return pids
+    else:
+        return []
+
+
+def kill_pid(name):
+	print "start restart " + name
+	cur_pid = os.getpid()
+	pids = file_pid(name)
+	for pid in pids:
+		try:
+			if cur_pid != pid:
+				os.kill(pid, signal.SIGKILL)
+				print 'kill pid=', pid
+			else:
+				continue
+		except OSError, e:
+			print "OSError no=", e.errno, " err=", e.strerror
+			pass
+		except BaseException, be:
+			pass
+	return
+
+def sig_handler(sig,frame):
+	global shutdown
+
+	frame = None
+	if sig == signal.SIGINT:
+		shutdown = True
+	elif sig == signal.SIGTERM:
+		shutdown = True
+
+	if shutdown == True and queue_proc != None:
+		queue_proc.terminate()
+
+class Daemon(object):
+	def __init__(self,stdin='/dev/null',stdout='/dev/null',stderr='/dev/null'):
+		self.stdin = stdin
+	 	self.stdout = stdout
+	 	self.stderr = stderr
+
+	def register_handler(self):
+		signal.signal(signal.SIGINT,  sig_handler)
+		signal.signal(signal.SIGTERM, sig_handler)
+		signal.signal(signal.SIGCHLD, sig_handler)
+
+	def daemonize(self):
+		try:
+			pid = os.fork()
+			if pid != 0:
+				sys.exit(0)
+		except OSError, e:
+			sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
+			sys.exit(1)
+
+		os.setsid()
+		os.umask(0)
+
+		try:
+			pid = os.fork() 
+			if pid != 0:
+				sys.exit(0)
+		except OSError, e: 
+			sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
+			sys.exit(1)
+
+		os.close(0)
+		os.close(1)
+		os.close(2)
+
+		file(self.stdin,  'r')
+		file(self.stdout, 'a+')
+		file(self.stderr, 'a+')
+
+def main():
+	kill_pid("qwatch.py")
+	d = Daemon()
+	d.daemonize()
+	d.register_handler()
+	kill_pid("crontab.php")
+
+	while(shutdown == False):
+		cmds = ["php", "./crontab.php", "queue", "index"]
+		subproc = subprocess.Popen(cmds,subprocess.PIPE)
+		queue_proc = subproc
+		print "create queue porcess pid=",subproc.pid
+		subproc.wait()
+
+if __name__ == '__main__':
+	main()		

+ 24 - 14
restart.py

@@ -2,6 +2,7 @@
 
 import sys, os, signal, subprocess, shlex,time
 
+
 def file_pid(name):
     proc1 = subprocess.Popen(shlex.split('ps aux'), stdout=subprocess.PIPE)
     proc2 = subprocess.Popen(shlex.split('grep ' + name), stdin=proc1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -19,30 +20,36 @@ def file_pid(name):
         if pid > 0:
             pids.append(pid)
 
-    if len(pids) > 1 :
+    if len(pids) > 0 :
         return pids
-    else :
+    else:
         return []
 
-def has_proc(name) :
+
+def has_proc(name):
     pids = file_pid(name)
     return len(pids) > 0
 
-def restart(item) :
+
+def restart(item):
     name = item['file_name']
     cmd  = item['cmd']
     if not name:
         print 'file name is empty'
         return
 
+    cur_pid = os.getpid()
     pids = file_pid(name)
     print "start restart " + name
-    for pid in pids :
+    for pid in pids:
         try:
-            os.kill(pid,signal.SIGKILL)
-            print 'kill pid=' , pid
+            if cur_pid != pid:
+                os.kill(pid, signal.SIGKILL)
+                print 'kill pid=', pid
+            else:
+                continue
         except OSError, e:
-            print "OSError no=" , e.errno," err=" , e.strerror
+            print "OSError no=", e.errno, " err=", e.strerror
             pass
         except BaseException, be:
             pass
@@ -54,10 +61,12 @@ def restart(item) :
         time.sleep(1)
     return
 
+
 def mac_sys():
     plat_name = sys.platform
     return (plat_name == 'darwin')
 
+
 def main():
     print "you can input: fcgi,queue,ugc,center,all to restart server or no input it will restart fcgi,queue"
 
@@ -68,9 +77,9 @@ def main():
 
     cmds = []
     if mac_sys() :
-        if option == 'fcgi' :
+        if option == 'fcgi':
             cmds.append({'file_name': 'fcgi_run.php','cmd': 'spawn-fcgi -a 127.0.0.1 -p 9100 -F 1 -f "php fcgi_run.php"'})
-        elif option == 'queue' :
+        elif option == 'queue':
             cmds.append({'file_name': 'crontab.php','cmd': ''})
         elif option == 'ugc' :
             cmds.append({'file_name': 'mac_ugcman', 'cmd': 'sudo ./mac_ugcman'})
@@ -84,7 +93,7 @@ def main():
             cmds.append({'file_name': 'ugc_srv.php',    'cmd': 'php ugc_srv.php'})
             cmds.append({'file_name': 'centra_srv.php', 'cmd': 'php centra_srv.php'})
         else:
-            cmds.append({'file_name': 'fcgi_run.php',       'cmd': 'spawn-fcgi -a 127.0.0.1 -p 9100 -F 1 -f "php fcgi_run.php"'})
+            cmds.append({'file_name': 'fcgi_run.php', 'cmd': 'spawn-fcgi -a 127.0.0.1 -p 9100 -F 1 -f "php fcgi_run.php"'})
             cmds.append({'file_name': 'crontab.php',  'cmd': ''})
     else :
         if option == 'fcgi':
@@ -97,18 +106,19 @@ def main():
         elif option == 'center':
             cmds.append({'file_name': 'centra_srv.php', 'cmd': 'php centra_srv.php'})
         elif option == 'all':
-            cmds.append({'file_name': 'fcgi_run.php',           'cmd': 'spawn-fcgi -a 127.0.0.1 -p 9100 -F 10 -f "php fcgi_run.php"'})
+            cmds.append({'file_name': 'fcgi_run.php',   'cmd': 'spawn-fcgi -a 127.0.0.1 -p 9100 -F 10 -f "php fcgi_run.php"'})
             cmds.append({'file_name': 'crontab.php',    'cmd': ''})
             cmds.append({'file_name': 'ugcman',         'cmd': './ugcman'})
             cmds.append({'file_name': 'ugc_srv.php',    'cmd': 'php ugc_srv.php'})
             cmds.append({'file_name': 'centra_srv.php', 'cmd': 'php centra_srv.php'})
         else:
-            cmds.append({'file_name': 'fcgi_run.php',       'cmd': 'spawn-fcgi -a 127.0.0.1 -p 9100 -F 10 -f "php fcgi_run.php"'})
-            cmds.append({'file_name': 'crontab.php queue',  'cmd': ''})
+            cmds.append({'file_name': 'fcgi_run.php',   'cmd': 'spawn-fcgi -a 127.0.0.1 -p 9100 -F 10 -f "php fcgi_run.php"'})
+            cmds.append({'file_name': 'crontab.php',    'cmd': ''})
 
     for item in cmds :
         restart(item)
     sys.exit(2)
 
+
 if __name__ == '__main__':
     main()

+ 0 - 2
test/TestFriends.php

@@ -103,8 +103,6 @@ class TestFriends extends PHPUnit_Framework_TestCase
 
     public function testFetchInviters()
     {
-        $invitees = search\relation_client::instance()->fetch_invitees(['user_id' => 36490]);
-
         $mod_member = Model('member');
         $items = $mod_member->where(['inviter_id' => ['gt',0]])->field('*')->limit(false)->select();