1
简单的Python 多进程异步处理
它启动后,监视队列,如果有新的请求进来,就fork 一个子进程去处理。
为了更易理解,删减了一些异常处理、日志等代码。
#!/usr/bin/env python
#encoding: utf-8
import logging
import os
import time
class Queue(object):
'''
基类,队列的抽象接口
'''
def pop(self):
pass
class QueueObserver:
'''
监视队列,若发现请求,建立新的进程处理之
'''
def __init__(self, queue, func, max=1, sleepInterval=0.1):
'''
queue - 必选,队列对象,必须继承自Queue 类,并实现pop 方法
func - 必选,要执行的函数引用
max - 可选,最多启动多少个进程,默认为1,单进程
sleepInterval - 可选,默认为0.1秒
'''
self.children = []
self.queue = queue
assert queue
self.func = func
assert func
self.max = max
self.sleepInterval = sleepInterval
def start(self):
while True:
item = self.queue.pop()
if item == None:
# Empty queue, sleepInterval and check it again
time.sleep(self.sleepInterval)
continue
# Got a job
pid = os.fork()
if pid:
# The parent
self.children.append(pid)
self.collect_children()
while len(self.children) >= self.max:
# Limit the number of forked processes
self.collect_children()
time.sleep(self.sleepInterval)
else:
# The child
ecode = 0
self.func(item)
logging.debug('P-%d has done: %s.' % (os.getpid(), item))
os._exit(ecode)
def collect_children(self):
'''
清理已完成的子进程
'''
while self.children:
try:
pid, status = os.waitpid(0, os.WNOHANG)
except os.error:
pid = None
if pid:
self.children.remove(pid)
else:
break
if __name__ == '__main__':
import redis
class RedisQueue(Queue):
'''
演示用的实现,基于Redis 的队列
'''
def __init__(self, host, port, key):
self.r = redis.Redis(host, port)
self.key = key
def pop(self):
return self.r.rpop(self.key)
def test(x):
logging.info(int(x) * 2)
logging.basicConfig(level=logging.DEBUG)
q = RedisQueue('localhost', 6300, 'Q')
qo = QueueObserver(q,test)
qo.start()
pid = os.fork()
if pid:
# The parent
self.children.append(pid)
self.collect_children()
while len(self.children) >= self.max:
# Limit the number of forked processes
self.collect_children()
time.sleep(self.sleepInterval)
应该改成
while len(self.children) >= self.max:
# Limit the number of forked processes
self.collect_children()
time.sleep(self.sleepInterval)
pid = os.fork()
不然还是会超,39最近出这个问题 居然搜到你博客来了 哈哈