注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

蜻蜓点水 举重若轻

君子终日乾乾

 
 
 

日志

 
 

[Python 一招鲜系列] 守护进程一招鲜  

2007-03-05 19:42:34|  分类: Python |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
[Python 一招鲜系列] 守护进程一招鲜 - 沈崴 - 蜻蜓点水 举重若轻

更多精彩请访问 Pythoner 在 Mangbar 的营地
Python 一招鲜系列文章综览

# server.py
while True:
   continue

好了现在我们已经有了一个服务器了, 接下来要做的事情就是让他跑到后台去。






第一章, 原理

本章仅作了解就可以了, 因为这些细节性的东西实在是无聊,
所以我强烈推荐你直接跳到第二章。

   当然, 如果你有足够的耐心和能力, 还是看下去吧 ……

通常我们把后台进程叫做 "守护进程"、"精灵进程", 或者 daemon。
在 Unix 下, 我们使用 fork 实现。

   import os
   def daemonize():
      # 首先 fork 出两个进程
      pid = os.fork()

      # pid 非零的是父进程,
      # 父进程就是你用命令启动的那个程序。
      #
      if pid != 0:
         # 接着我们退出父进程
         # 这样你的命令行程序就退出了。
         #
         # 父进程退出后, 子进程就被系统托管了
         # 这时子进程就转入后台执行
         os._exit(0)

      # 子进程开始
      # 我们在这里启动后台程序 (服务器)
      os.system('python server.py')
      # 当然, 你永远不会使用 os.system 来启动一个程序
      # os.system 会启动 python、server.py 还有你不希望看到的 sh 进程

当然, 真正能用的 daemonize 程序还要作更多的事情,
这些都被记载在典籍《Unix 高级环境编程》中。

我归纳了一下, 分别是一下几个方面。

   1. 切换程序的身份。
    请看 Apache 的设计, 在使用 root 得到 80 端口后旋即转换成 nobody (www-data) 用户。
    很明显, 使用 root 身份执行一个程序是危险的, 后台程序的身份不应该是 root。

       最实用的选择是: 当配置文件里没有说明以何身份执行时,
       就以当前用户的身份启动后台进程。

   2. 自定义部分信号处理程序。

       如果不是必须, 依我看还是不要做太多信号处理,
       否则你的服务器很可能将只能吃到 kill -9 才可退出了, 这显然很蠢。
      如果你不想看到太多的 undead, 处理 SIGCHLD 是必须的。

   3. 重定向 stdin、stdout 和 stderr。
    标准输入输出对 daemon 进程而言没有意义, 因为后台进程是没有终端可供使用的。

       注意, 这里是重定向而不是关闭, 这样可以避免程序在 print 的时候出错。

       我们通常会将 stdout、stderr 重定向到日志文件。
       /dev/nul 是选择之一。
       stdin 是没有意义的。

   4. setsid(), umask(), chdir() 逐一 google。这里不赘述了。

   5. 把 pid 写到 xxx.pid 文件中去。
    当我们需要结束后台进程的时候, 我们就可以从 pid 文件中得到子进程的 pid,
    然后 kill PID。

目前, Python 最好的 Daemon 库依然是 Zope 的 zdaemon,
欲了解健壮的 Daemon 实现, 可以阅读 zdaemon 的源代码。
当然, 阅读 zdaemon 源码是比较痛苦的, 我自己也比较痛恨 zdaemon 的编码风格。

   我痛恨一切没有必要的, 过度的 OO 设计。

zdaemon 的贡献是它引入了 "进程控制器" 这个概念, 它的 daemonize 使用了两次 fork。

   # zdaemon.py
   import os
   def zdaemonize():
      # 首次 fork, 进入后台
      pid = os.fork()

      if pid != 0:
         os._exit(0)

      # 第二次 fork
      pid = os.fork()

      # 父进程, Daemon 管理器
      if pid != 0:
         # 启动进程管理器服务器
         # 这里, manager_server 是假想的
         import manager_server
         manger_server.serve_forever()

      # 子进程, 启动后台服务
      os.system('python server.py')

进程管理器 manger_server 通常是一个 TCP 服务, 他使用 Unix Socket 与外界交互。
这样, pid 文件就变成了 Unix Socket 文件。
你通过 Unix Socket (TCP Socket) 接口对进程管理器发送控制指令,
比如 'start'、'status' 和 'stop',
进程管理器再对他的子进程 (后台守护进程) 发送控制信号, 启动或者关闭守护进程。

通过进程管理器, 后台进程的控制就和后台进程的运行逻辑分开了。

zdaemon 也有一个致命缺点, 其过度对象化的设计, 使封装层次失控,
加上采用了底层的 Unix Socket, 使其代码量过度膨胀。

   人类能够理解的代码行数是有限制的。
      --《Unix 编程艺术》

   所以 Python 的语法决定了 Python 能够编写出目前最复杂的程序。
   代码量失控是 Python 的最大敌人, 这说明我们可能正在用 Java 写 Python。

为此我修改了 zdaemon, 将它实现在 270 行代码中。
用 Local XMLRPC Server (Unix Socket) 取代 zdaemon 的 Socket 底层,
为我赢得了不少行数。

同样, 在编写 Local XMLRPC Server 的时候, 再次遇到 Python 标准库 SimpleXMLRPCServer.py
的过度封装问题, 有兴趣的话大家可以阅读第三章 Local XMLRPC Server 的实现部分,

   过度向对象实际上就是完全无法重用, 这是一个典型的例子。

使用 XMLRPC Server 的额外好处,
就是让我们可以更方便地增加除了 start、stop、status 这种控制命令。

   daemon = Daemon(...)
   daemon.register_function(...)

   不过, 额外的控制命令, 通常是不需要的。

我的 daemon 程序 (daemon.py) 源码放在第三章中,
同样, 阅读源码是一件枯燥的事情, 你可以忽略掉第三章。
但是第二章是必须理解的。






第二章、使用 daemon.py 的通用范例
   你可以不用修改就用于你的项目

#!/usr/bin/env python2.5
# -*- coding: utf-8 -*-

import os, sys
from os import fork
from time import sleep
from sys import stderr, stdout
from socket import error as SocketError
from traceback import print_exc

import schema # 载入配置信息, 还记得《配置文件》一招鲜吗?
import pyetc # ... 要不要复习一下 :)

from daemon import Daemon, ServerProxy, Fault, \
   error as DaemonError

## #

def usage():
   print '使用方法: %s start|stop|status' %sys.argv[0]

def start():
   # 读取配置文件
   # demo.conf
   pyetc.load(schema.ETC('demo.conf'), env=schema.env)
   conf = schema.config.daemon
   # ... 请见《配置文件一招鲜》最后一个范例

   try:
      # 创建 Daemon 对象
      daemon = Daemon(
         address = conf.address, # 进程控制器地址/pid 文件位置
         program = conf.program, # 后台进程程序位置
         verbose = conf.verbose # 调试
         )

      print '进程管理器已经启动'

      # 启动后台进程
      #    daemon(arg1, arg2, ...)
      #    参数 arg1, arg2 ... 将被用于启动后台进程,
      #    这里相当于命令行:
      #       program.py arg1 arg2 ...
      daemon()

   except DaemonError, msg:
      print '进程管理器未启动, 原因是: ', msg

   except:
      print_exc(file=stderr)

def stop():
   pyetc.load(schema.ETC('demo.conf'), env=schema.env)
   conf = schema.config.daemon

   # 取得进程控制器
   try:
      daemon = ServerProxy(conf.address)
      daemon.stop()

      print '进程已经退出'

   except SocketError:
      print '进程管理器未启动'

   except Fault:
      print '进程管理器已被强制关闭'

   except:
      print_exc(file=stderr)

def status():
   pyetc.load(schema.ETC('demo.conf'), env=schema.env)
   conf = schema.config.daemon

   try:
      daemon = ServerProxy(conf.address)
      status = daemon.status()

      if status == 'running':
         print '进程 "%s" 正在运行' %conf.program

      elif status == 'stopped':
         stdout.write( ('进程管理器正在运行, 但是进程 "%s" 已经停止, '
            '正在停止进程管理器 ... '
            ) %conf.program )

         stop()

      else:
         print '进程管理器正在运行, 进程状态未知'

   except SocketError:
      print '进程管理器未启动'

   except:
      print_exc(file=stderr)

## #

# 解析参数

if len(sys.argv) != 2:
   usage()

elif sys.argv[1] == 'start':
   start()

elif sys.argv[1] == 'stop':
   stop()

elif sys.argv[1] == 'status':
   status()

else: usage()






第三章、附件: daemon.py

import os, sys
from httplib import HTTP, HTTPConnection
from pwd import getpwnam, getpwuid
from signal import signal as setsignal, SIGCHLD, SIGTERM
from sys import stderr, stdout
from urllib import urlopen
from SocketServer import UnixStreamServer
from socket import socket, error as SocketError, \
   AF_UNIX, SOCK_STREAM
from SimpleXMLRPCServer import SimpleXMLRPCDispatcher, \
   SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
from xmlrpclib import Fault, Transport, dumps as xmlrpc_dumps, \
   _Method as _XMLRPCMethod, ServerProxy as ServerProxy_N___G
from os import execv, chdir, chmod, fork, geteuid, getpid, \
   kill, setgid, setuid, umask, unlink, waitpid, \
   error as OSError, WNOHANG

class error(Exception): pass

class nul:
   write = staticmethod(lambda s: None)
   flush = staticmethod(lambda : None)
   read = staticmethod(lambda n: '' )

class UnixStreamXMLRPCServer(UnixStreamServer, SimpleXMLRPCDispatcher):
   def __init__(self, address, requestHandler=SimpleXMLRPCRequestHandler,
      allow_none=False, encoding=None):

      self.logRequests = False

      SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding)
      UnixStreamServer.__init__(self, address, requestHandler)

class Daemon:
   def __init__(self, **args):
      address = args[ 'address' ]
      allow_none = args.get( 'allow_none', True )
      encoding = args.get( 'encoding' , 'utf-8' )

      self.verbose = args.get( 'verbose' , False )
      self.stdout = args.get( 'stdout' , nul )
      self.stderr = args.get( 'stderr' , nul )

      try:
         ServerProxy(address).ping()
      except SocketError:
         pass
      else:
         raise error, 'Another daemon is already up using socket %s' %repr(address)

      if isinstance(address, str):
         try:
            unlink(address)
         except OSError:
            pass

         self.manager = UnixStreamXMLRPCServer(address,
            allow_none=allow_none, encoding=encoding)

         self.pidfile = address
      else:
         self.manager = SimpleXMLRPCServer(address,
            allow_none=allow_none, encoding=encoding)

      self.pid = None
      self.running = True

      self.program = args[ 'program' ]

      DaemonizeTools.setuid(
         user = args.get( 'user' , None ) )

      SignalTools.setsignals(self)

      DaemonizeTools.daemonize(
         directory = args.get( 'directory' , None ),
         umask = args.get( 'umask' , 022 ) )

      self.register_function = lambda *args: (
         self.manager.register_function(*args) )

      self.register_function(lambda: self.stop() , 'stop' )
      self.register_function(lambda: self.status(), 'status')
      self.register_function(lambda: True , 'ping' )

   def __call__(self, *args):
      if not self.verbose:
         DaemonizeTools.close_files(self.stdout, self.stderr)
         del self.stdout, self.stderr

      pid = fork()

      if pid != 0:
         self.pid = pid
         while self.running:
            self.manager.handle_request()

         return pid

      else:
         try:
            for i in xrange(3, 100):
               try:
                  os.close(i)
               except OSError:
                  pass

            try:
               execv(sys.executable, tuple(
                  [sys.executable, self.program] + list(args) ) )
            except OSError, err:
               print >> stderr, ( 'can\'t exec %r: %s\n'
                  % (self.program, err) )

         finally:
            os._exit(127)

   def status(self):
      if not self.pid:
         return 'stopped'
      else:
         return 'running'

   def stop(self):
      if not self.pid:
         self.running = False
         if hasattr(self, 'pidfile'):
            try:
               unlink(self.pidfile)
            except OSError:
               pass

         raise error, 'no subprocess running'

      kill(self.pid, SIGTERM)
      self.running = False
      if hasattr(self, 'pidfile'):
         try:
            unlink(self.pidfile)
         except OSError:
            pass

class UnixStreamHTTPConnection(HTTPConnection):
   def connect(self):
      self.sock = socket(AF_UNIX, SOCK_STREAM)
      self.sock.connect(self.host)

class UnixStreamHTTP(HTTP):
   _connection_class = UnixStreamHTTPConnection

class UnixStreamTransport(Transport):
   def make_connection(self, host):
      return UnixStreamHTTP(host)

class UnixStreamServerProxy_NG:
   def __init__(self, uri, transport=None, encoding=None, verbose=0,
      allow_none=0, use_datetime=0):

      self.__host = uri
      self.__handler = '/RPC2'

      if not transport:
         self.__transport = UnixStreamTransport(use_datetime=use_datetime)

      self.__encoding = encoding
      self.__verbose = verbose
      self.__allow_none = allow_none

   def __request(self, methodname, params):
      request = xmlrpc_dumps(params, methodname, encoding=self.__encoding,
         allow_none=self.__allow_none)

      response = self.__transport.request(
         self.__host, self.__handler, request,
         verbose=self.__verbose )

      if len(response) == 1:
         response = response[0]

      return response

   def __getattr__(self, name):
      return _XMLRPCMethod(self.__request, name)

def ServerProxy(address, **args):
   if isinstance(address, str):
      return UnixStreamServerProxy_NG(address, **args)

   else:
      host, port = address
      host = (host, '127.0.0.1')[host == '0.0.0.0']
      return ServerProxy_N___G('http://%s:%d' %(host, port), **args)

class DaemonizeTools:
   @staticmethod
   def setuid(**args):
      user = args['user']
      if user is None:
         return

      try:
         uid = int(user)

      except ValueError:
         try:
            pwrec = pwd.getpwnam(user)
         except KeyError:
            raise error, 'username %r not found' % user

         uid = pwrec[2]

      else:
         try:
            pwrec = pwd.getpwuid(uid)
         except KeyError:
            raise error, 'uid %r not found' % user

      euid = geteuid()
      if euid != 0 and euid != uid:
         raise error, 'only root can change users'

      setgid(pwrec[3])
      setuid(uid)

   @staticmethod
   def daemonize(**args):
      pid = fork()
      if pid != 0:
         os._exit(0)

      if args['directory']:
         try:
            chdir(args['directory'])
         except OSError, err:
            print >> stderr, ( 'can\'t chdir into %r: %s'
               % (args['directory'], err) )
         else:
            print >> stderr, ( 'set current directory: %r'
               % args['directory'] )

      os.setsid()
      umask(args['umask'])


   @staticmethod
   def close_files(stdout, stderr):
      os.close(0)
      sys.stdin = sys.__stdin__ = nul
      os.close(1)
      sys.stdout = sys.__stdout__ = stdout
      os.close(2)
      sys.stderr = sys.__stderr__ = stderr

class SignalTools:
   daemon = None

   @staticmethod
   def setsignals(daemon):
      SignalTools.daemon = daemon
      setsignal(SIGCHLD, SignalTools.sigchild)

   @staticmethod
   def sigchild(sig, frame):
      try:
         pid, sts = waitpid(-1, WNOHANG)
         if pid == SignalTools.daemon.pid:
            SignalTools.daemon.pid = None

      except OSError:
         return
  评论这张
 
阅读(9799)| 评论(9)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017