py

作者: sustainer123 (caster)   2024-09-28 19:01:43
利用多進程實現簡單的分散式運算
process可以分散到多台機器運行
multiprocessing的manager可以實現這功能
他把queue丟到網路上並被其他機器讀取
py:
import random, os, time
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
task_queue = Queue()
result_queue = Queue()
def get_task_queue():
return task_queue
def get_result_queue():
return result_queue
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue', callable=get_task_queue)
QueueManager.register('get_result_queue', callable=get_result_queue)
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
print(os.getpid())
s = manager.get_server()
s.serve_forever()
這邊先建立伺服器管理任務
這邊分成四個步驟:
1.建立queue queue負責進程間的交流
2.將創建的queue丟到網上,callable是能調用的對象,其他機器可以透過網路直接
調用這函數,然後透過函數取得queue
3.建立manager,設定端口與密碼,密碼需要二進位,所以前面有個b
4.啟動伺服器
運行這段程式碼 其他機器就能連進伺服器
py:
import random, time, os
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
server_addr = '127.0.0.1'
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()
for i in range(10):
n = random.randint(0, 10000)
print(f'put task {n}')
task.put(n)
print('少女祈禱中')
for i in range(10):
try:
r = result.get(timeout=10)
print(f'result {r}')
except Exception as e:
print(e)
print(os.getpid())
這段程式碼的功能是產生資料,存入任務queue,等待,讀取運算結果
這邊是負責執行任務的機器
首先使用QueueManager註冊要調用的函數
第二步 連結伺服器
第三步 取得queue
第四步 執行任務
py:
import time, queue, os
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
server_addr = '127.0.0.1'
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()
for i in range(10):
try:
n = task.get(timeout=1)
print(f'run task {n}')
r = f'{n} * {n} = {n * n}'
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty.')
print(os.getpid() )
print('worker exit.')
這段程式碼的功能是對task queue裡面的數字平方
完成後放入 result queue
大致步驟跟上面一樣 不贅述
透過這三段程式碼 我們就完成了一個簡單的分散式架構
我印象Celery dask之類的好像更常用
但我沒修過分散式系統 就請其他大師說明
另外有錯的部分歡迎指正 感謝
程式碼參考下面兩個網站 不過直接照抄跑不起來 所以我有另外修改
參考資料:
https://liaoxuefeng.com/books/python/process-thread/process-manager/index.html
https://docs.python.org/3.10/library/multiprocessing.html

Links booklink

Contact Us: admin [ a t ] ucptt.com