python分布式进程

Author Avatar
第五季 2017.10.23
字数:826字 时长:4分钟
  • 微信扫一扫分享

概述

  Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务发布到其他多个进程中,依靠网络通信。managers模块封装内部网络通信的细节,提供给我们精简的接口来编写分布式进程程序。
  managers子模块使用Queue进行多进程的通信,通过managers模块把Queue注册到网络上面,就可以让其他机器的进程访问Queue了,进而获取列表中的任务执行,执行完成后可以将结果保存到Queue,然后在调度机器上轮训读取结果处理。

调度进程

  调度进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import threading
from Queue import Queue
from multiprocessing.managers import BaseManager
import multiprocessing
import time
from job import Job

# 发送任务的队列:
task_queue = Queue(10)
# 接收结果的队列:
result_queue = Queue()
dispatch_jobs = Queue()


def return_task_queue():
global task_queue
return task_queue


def return_result_queue():
global result_queue
return result_queue


def add_job(manager):
global dispatch_jobs
dispatch_jobs = manager.get_task_queue()
job_id = 0
while 1:
job_id += 1
if not dispatch_jobs.full():
job = Job(job_id)
print 'Dispatch job: %s' % job.job_id
dispatch_jobs.put(job)
time.sleep(1)
else:
time.sleep(1)
print 'Task full, please wait',


def handle_job_result(manager):
finished_jobs = manager.get_result_queue()
while not dispatch_jobs.empty():
job = finished_jobs.get()
print 'Finished Job: %s' % job.job_id
manager.shutdown()


if __name__ == "__main__":
multiprocessing.freeze_support()
BaseManager.register('get_task_queue', callable=return_task_queue)
BaseManager.register('get_result_queue', callable=return_result_queue)

manager = BaseManager(address=("127.0.0.1", 5000), authkey="jobs")
manager.start()

threading.Thread(target=add_job, args=(manager,)).start()
threading.Thread(target=handle_job_result, args=(manager,)).start()

  当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

任务进程

  然后,在另一台机器上启动任务进程(本机上启动也可以):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from Queue import Queue
from multiprocessing.managers import BaseManager
import time
import os

class Slave:
def __init__(self):
self.dispatch_job_queue = Queue()
self.finished_job_queue = Queue()

def start(self):
# 把派发作业队列和完成作业队列注册到网络上
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')

# 连接master
server = '127.0.0.1'
print('Connect to server %s...' % server)
manager = BaseManager(address=(server, 5000), authkey='jobs')
manager.connect()

# 使用上面注册的方法获取队列
dispatch_jobs = manager.get_task_queue()
finished_jobs = manager.get_result_queue()

while 1:
job = dispatch_jobs.get()
print "%s Run job %s" % (os.getpid(), job.job_id)
time.sleep(2)
finished_jobs.put(job)

if __name__ == "__main__":
slave = Slave()
slave.start()

小结

  因为任务进程要通过网络连接到调度进程,所以每个任务进程需要指定调度进程的IP。
  注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由任务进程再去共享的磁盘上读取文件。