Python | Learning Python: multiprocessing explained in detail

Processes in detail

How should we understand parallelism and concurrency?

  • Concurrency: there are more tasks than processor cores, so the tasks have to take turns running on the cores.
  • Parallelism: the number of tasks is less than or equal to the number of processor cores, so every task can run at the same time on its own core.
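As a rough illustration of the two definitions above, the following sketch compares a task count with the core count reported by os.cpu_count(). The helper name scheduling_mode is hypothetical and only restates the article's rule of thumb; it is not how an operating system actually schedules work.

```python
import os


def scheduling_mode(num_tasks, num_cores=None):
    """Classify a workload using the rule of thumb from the text."""
    if num_cores is None:
        # os.cpu_count() can return None when the count is unknown
        num_cores = os.cpu_count() or 1
    return "parallel" if num_tasks <= num_cores else "concurrent"


if __name__ == "__main__":
    cores = os.cpu_count() or 1
    print("cores:", cores)
    print(cores, "tasks ->", scheduling_mode(cores))
    print(cores + 1, "tasks ->", scheduling_mode(cores + 1))
```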

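To make full use of a multi-core CPU, Python programs usually need multiple processes: in CPython the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads alone cannot spread CPU-bound work across cores.

How do we create multiple processes in Python?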

The difference between process and thread

Multi-processing and multi-threading are easy to confuse. They look alike on the surface, but they are fundamentally different. Many experts have written accessible explanations of processes and threads. Here we refer to a blog post by Ruan Yifeng; you may want to read it first to get the difference between threads and processes clear.

A simple explanation of processes and threads (http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html)

How to create multiple processes in Python

1. Use os.fork()
2. Use the Process class in multiprocessing
3. Use the Pool class in multiprocessing to create a process pool

What is the difference between the methods of creating multiple processes?

Use os.fork() in the os module to create a process

os.fork() is called once but returns twice: it returns 0 in the child process and the child's process ID in the parent process. In either process, os.getpid() returns that process's own ID and os.getppid() returns the ID of its parent. After the fork, the parent and the child both continue executing the same code from the point of the call. Note that the parent does not wait for the child: when the parent finishes its own work it exits, whether or not the child is still running. os.fork() is only available on Unix-like systems.

import os

res = os.fork()
if res == 0:  # fork() returns 0 in the child process
    print('This is the child process %s' % os.getpid())  # The child's own ID
else:  # fork() returns the child's ID in the parent process
    print('This is the parent process %s' % os.getpid())  # The parent's own ID
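Since the parent does not wait for the child on its own, one common way to make it wait is os.waitpid(). The sketch below is a minimal illustration for Unix-like systems; the function name fork_and_wait and the exit code 7 are arbitrary choices made for this example.

```python
import os
import sys


def fork_and_wait():
    """Fork once; the parent waits for the child and returns its exit code."""
    sys.stdout.flush()  # avoid duplicating buffered output in the child
    pid = os.fork()     # Unix only
    if pid == 0:
        # Child: fork() returned 0
        print("child  pid=%d parent=%d" % (os.getpid(), os.getppid()), flush=True)
        os._exit(7)     # leave immediately with an arbitrary exit code
    # Parent: fork() returned the child's process ID
    print("parent pid=%d child=%d" % (os.getpid(), pid), flush=True)
    _, status = os.waitpid(pid, 0)  # block until the child has exited
    return os.WEXITSTATUS(status)


if __name__ == "__main__":
    print("child exit code:", fork_and_wait())
```

The child leaves through os._exit() so that it never runs the code that follows the function call in the parent.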

Use the multiprocessing module to create multiple processes

The Process class is provided in multiprocessing to generate process instances
Process([group [, target [, name [, args [, kwargs]]]]])

where:

1. group is for grouping and is not actually used
2. target is the callable to run; the function that performs the task is passed in here
3. args supplies positional arguments to the target as a tuple. For example, if target is a function a with two parameters m and n, then args=(m, n); with a single parameter, write args=(m,)
4. kwargs is a dictionary of keyword arguments for the target
5. name is an alias, that is, a name given to the process

The Process class also provides the following methods for controlling a process:

1. start() starts the process, which then runs the callable given as target
2. join() blocks the caller until the child process has finished. Without it, the main process carries on with its own code right after start() instead of waiting for the child at that point
3. terminate() ends the process immediately, whether or not its task has completed

For example:

# multiprocess
import os
from multiprocessing import Process


def test(i):
    # Print this process's ID, its parent's ID, and the task number
    print('-----1111-----%s----%s' % (os.getpid(), os.getppid()), i)


if __name__ == '__main__':
    for i in range(1, 10):
        p = Process(target=test, args=(i,))
        print(os.getpid())  # ID of the main process
        p.start()


# Output
5224
5224
5224
5224
5224
5224
5224
5224
-----1111-----4704----5224 1
-----1111-----1292----5224 4
-----1111-----5276----5224 2
-----1111-----5152----5224 6
5224
-----1111-----5196----5224 5
-----1111-----5272----5224 3
-----1111-----4640----5224 7
-----1111-----5684----5224 8
-----1111-----1044----5224 9
[Finished in 1.3s]
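The start(), join() and terminate() methods described above can be sketched together as follows. This is a minimal illustration, not part of the original notes; the names sleeper and demo and the sleep durations are assumptions made for the example.

```python
import time
from multiprocessing import Process


def sleeper(seconds):
    time.sleep(seconds)


def demo():
    quick = Process(target=sleeper, args=(0.1,), name="quick")
    slow = Process(target=sleeper, args=(60,), name="slow")
    quick.start()
    slow.start()

    quick.join()      # block until "quick" finishes on its own
    slow.terminate()  # stop "slow" without letting it finish
    slow.join()       # wait until the terminated process is gone

    # exitcode is 0 for a normal exit and non-zero for a terminated process
    return quick.exitcode, slow.exitcode


if __name__ == "__main__":
    print(demo())
```

On Unix the exit code of a process killed by a signal is the negative signal number, so the second value is negative there.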

Use the Pool class in multiprocessing to create a process pool object

Creating processes with the Pool class is similar to using the Process class, but note the following points:

1. pool.apply_async() is non-blocking: it submits the task and returns immediately
2. pool.apply() is blocking: it returns only after the task has finished
3. pool.join() makes the main process wait. After tasks are submitted, the main process does not wait for the pool by default; it exits as soon as its own code finishes, and the pool's worker processes are terminated with it. Without join(), tasks in the pool may therefore never run

For example:

# pool
from multiprocessing import Pool
import os,time,random

def worker(msg):
    t_start = time.time()
    print("%s starts to execute, the process number is %d"%(msg,os.getpid()))
    #random.random() Randomly generate floating-point numbers between 0 and 1
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"Execution completed, time-consuming %0.2f"%(t_stop-t_start))


def main():
    po=Pool(3) #Define a process pool, the maximum number of processes is 3
    for i in range(0,10):
        #Pool.apply_async(target to call, (tuple of arguments passed to the target,))
        #Each iteration hands the task to an idle worker process
        po.apply_async(worker,(i,))

    print("----start----")
    po.close() #Close the process pool, po no longer receives new requests after closing
    po.join() #Wait for the completion of all child processes in po, which must be placed after the close statement
    print("-----end-----")


if __name__ =='__main__':
    main()
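The difference between the blocking and non-blocking calls shows up most clearly when the target returns a value. In the sketch below, apply() returns the result directly, while apply_async() returns an AsyncResult whose get() method delivers the result later. The function names square and run are hypothetical and chosen for this example.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def run():
    pool = Pool(3)
    try:
        # Blocking: returns the result itself once the task is done
        blocking = pool.apply(square, (3,))
        # Non-blocking: returns AsyncResult objects immediately
        pending = [pool.apply_async(square, (i,)) for i in range(5)]
        # get() waits for each result (here at most 10 seconds each)
        results = [r.get(timeout=10) for r in pending]
    finally:
        pool.close()
        pool.join()
    return blocking, results


if __name__ == "__main__":
    print(run())
```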

How to communicate between processes?

There are many ways to communicate between processes, including but not limited to named pipes, unnamed pipes, shared memory and queues. This section focuses on queues, using the Queue class.

Use of Queue

1. Queue.qsize(): returns the number of messages contained in the current queue;
2. Queue.empty(): If the queue is empty, return True, otherwise False;
3. Queue.full(): If the queue is full, return True, otherwise False;
4. Queue.get([block[, timeout]]): Get a message in the queue and then remove it from the queue. The default value of block is True;
5. Queue.get_nowait(): equivalent to Queue.get(False);
6. Queue.put(item,[block[, timeout]]): write the item message to the queue, the default value of block is True;
7. Queue.put_nowait(item): equivalent to Queue.put(item, False);

First instantiate a Queue object, for example q = Queue(num). If num is omitted or is not a positive number, the queue accepts an effectively unlimited number of messages.

Queue.get() behaves as follows when the queue is empty. If block keeps its default value (True) and no timeout (in seconds) is set, the call blocks until a message can be read from the queue. If a timeout is set, it waits at most timeout seconds and raises a queue.Empty exception if no message arrived in that time. If block is False, it raises queue.Empty immediately.

Queue.put() mirrors this when the queue is full: it blocks by default, raises queue.Full after timeout seconds if a timeout is set, and raises queue.Full immediately if block is False. Both exception classes come from the standard queue module. For example:

#coding=utf-8
import queue  # Provides the queue.Full and queue.Empty exception classes
from multiprocessing import Queue

q = Queue(3)  # Initialize a Queue object that can hold at most three messages
q.put("Message 1")
q.put("Message 2")
print(q.full())  # False
q.put("Message 3")
print(q.full())  # True

# The queue is full, so both puts below raise queue.Full:
# the first waits 2 seconds before raising, the second raises immediately.
# Note: qsize() may raise NotImplementedError on macOS.
try:
    q.put("Message 4", True, 2)
except queue.Full:
    print("The message queue is full, the number of existing messages: %s" % q.qsize())

try:
    q.put_nowait("Message 4")
except queue.Full:
    print("The message queue is full, the number of existing messages: %s" % q.qsize())

# Approach recommended here: check whether the queue is full before writing
# (with several processes the state can change between the check and the call)
if not q.full():
    q.put_nowait("Message 4")

# When reading, first check whether the queue is empty, then read
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())
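The reading side behaves the same way on an empty queue. The following sketch times a get() with a timeout and a non-blocking get() to show when queue.Empty is raised. The helper name timed_get and the 0.5 second timeout are assumptions made for this example.

```python
import queue
import time
from multiprocessing import Queue


def timed_get(q, block, timeout=None):
    """Try to read one message; report the outcome and the time spent."""
    start = time.monotonic()
    try:
        q.get(block, timeout)
        outcome = "got item"
    except queue.Empty:
        outcome = "Empty"
    return outcome, time.monotonic() - start


if __name__ == "__main__":
    q = Queue()
    print(timed_get(q, True, 0.5))  # waits about 0.5 seconds, then Empty
    print(timed_get(q, False))      # Empty immediately
```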

Here is another example:

from multiprocessing import Process, Queue
import os, time, random

# Code executed by the process that writes data:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Code executed by the process that reads data:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break

if __name__ == '__main__':
    # The parent process creates a Queue and passes it to each child process:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the child process pw, which writes:
    pw.start()
    # Wait for pw to end:
    pw.join()
    # Start the child process pr, which reads:
    pr.start()
    pr.join()
    print('All data are written and read')
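The example above runs the writer to completion before the reader starts, so the reader can stop as soon as the queue is empty. If both processes should run at the same time, one common alternative is to send an end marker. The sketch below assumes that None never occurs as a real message and uses it as that marker; the second queue, done, is a hypothetical addition used only to hand the collected messages back to the parent.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumed end-of-data marker, never a real message


def write(q, values):
    for value in values:
        q.put(value)
    q.put(SENTINEL)  # tell the reader that nothing more will come


def read(q, done):
    received = []
    while True:
        value = q.get()  # blocks until the writer sends something
        if value is SENTINEL:
            break
        received.append(value)
    done.put(received)   # report what was read back to the parent


def run(values):
    q, done = Queue(), Queue()
    pw = Process(target=write, args=(q, values))
    pr = Process(target=read, args=(q, done))
    pr.start()  # the reader may start first; it simply waits in q.get()
    pw.start()
    received = done.get(timeout=10)
    pw.join()
    pr.join()
    return received


if __name__ == "__main__":
    print(run(['A', 'B', 'C']))
```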

Using Queue with a process pool

To share a queue with the processes of a Pool, use the Queue() provided by multiprocessing.Manager() instead of multiprocessing.Queue(); otherwise you will get the following error message:

RuntimeError: Queue objects should only be shared between processes through inheritance.

For example:

#Import Manager instead of Queue
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader start (%s), parent process is (%s)"%(os.getpid(),os.getppid()))
    for i in range(q.qsize()):
        print("reader gets the message from Queue: %s"%q.get(True))

def writer(q):
    print("writer starts (%s), parent process is (%s)"%(os.getpid(),os.getppid()))
    for i in "dongGe":
        q.put(i)

if __name__=="__main__":
    print("(%s) start"%os.getpid())
    q=Manager().Queue() #Use the Queue in the Manager to initialize
    po=Pool()
    #Submit the tasks in blocking mode: the reader then starts only after the writer has finished, so it does not need an infinite loop
    po.apply(writer,(q,))
    po.apply(reader,(q,))
    po.close()
    po.join()
    print("(%s) End"%os.getpid())
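If the writer and the reader should run in the pool at the same time, apply_async() can be used instead of apply(). The sketch below is one possible arrangement, not the original author's code: it assumes None is a safe end marker, uses a pool of two workers so both tasks can run together, and returns what the reader collected through the AsyncResult. The function name run is hypothetical.

```python
from multiprocessing import Manager, Pool


def writer(q, text):
    for ch in text:
        q.put(ch)
    q.put(None)  # assumed end marker, never a real message


def reader(q):
    chars = []
    while True:
        ch = q.get()  # blocks until the writer sends something
        if ch is None:
            break
        chars.append(ch)
    return "".join(chars)


def run(text):
    with Manager() as manager:
        q = manager.Queue()  # a Manager queue can be passed to pool workers
        pool = Pool(2)       # two workers, so writer and reader overlap
        try:
            pool.apply_async(writer, (q, text))
            result = pool.apply_async(reader, (q,))
            received = result.get(timeout=10)
        finally:
            pool.close()
            pool.join()
    return received


if __name__ == "__main__":
    print(run("dongGe"))
```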

Closing notes

This is Xianyu's first set of Python study notes. There have been no updates for a long time, not out of laziness, but because Xianyu has been studying.

Reference: https://cloud.tencent.com/developer/article/1518516 Python | Detailed explanation of multiple processes of Python learning-Cloud + Community-Tencent Cloud