您的位置:首页 > 热点 >

看点:多线程编程系列之线程间通信和协作

2023-06-08 09:55:30 来源:今日头条
一、 线程间通信的方式和实现

在多线程编程中,线程间通信是非常常见的需求,它指的是多个线程之间通过某种机制来交换信息,协调彼此的行为。线程间通信的方式常用的有以下几种:

共享内存:线程之间可以通过共享内存来交换信息,每个线程拥有对共享内存区域的读写权限。一般情况下需要使用锁来保证共享内存的同步与互斥。


【资料图】

using System.Threading;class Program {    private static int data = 0;    private static bool isRunning = true;    static void Main(string[] args) {        using (var mutex = new Mutex(false, "sharedMutex"))        using (var mappedFile = MemoryMappedFile.CreateOrOpen("sharedMemory", 1024))        using (var accessor = mappedFile.CreateViewAccessor()) {            var thread1 = new Thread(() => {                while (isRunning) {                    mutex.WaitOne();                    accessor.Write(0, data);                    mutex.ReleaseMutex();                    Thread.Sleep(1000);                }            });            var thread2 = new Thread(() => {                while (isRunning) {                    mutex.WaitOne();                    data = accessor.ReadInt32(0);                    mutex.ReleaseMutex();                    Console.WriteLine($"Data received: {data}");                    Thread.Sleep(1000);                }            });            thread1.Start();            thread2.Start();            Console.WriteLine("Press enter to exit.");            Console.ReadLine();            isRunning = false;            thread1.Join();            thread2.Join();        }    }}

该程序创建了一个名为 sharedMemory 的内存映射文件和一个名为 sharedMutex 的互斥对象。两个线程分别负责将数据写入内存映射文件和读取内存映射文件中的数据。在操作前,需要通过互斥对象进行同步与互斥。

消息传递:线程之间可以通过发送消息来交换信息,每个线程拥有一个接收缓冲区和一个发送缓冲区。这里的消息指的是数据包或者数据流,线程之间通过操作缓冲区来完成数据交换。

using System.Collections.Concurrent;class Program {    private static bool isRunning = true;    private static BlockingCollection messageQueue = new BlockingCollection();    static void Main(string[] args) {        var thread1 = new Thread(() => {            while (isRunning) {                var message = messageQueue.Take();                Console.WriteLine($"Thread 1 received message: {message}");            }        });        var thread2 = new Thread(() => {            while (isRunning) {                var message = messageQueue.Take();                Console.WriteLine($"Thread 2 received message: {message}");            }        });        thread1.Start();        thread2.Start();        // Simulate sending some messages        messageQueue.Add("Hello from thread 1");        messageQueue.Add("Hello from thread 2");        messageQueue.Add("Hello again from thread 1");        messageQueue.Add("Goodbye from thread 2");        Console.WriteLine("Press enter to exit.");        Console.ReadLine();        isRunning = false;        thread1.Join();        thread2.Join();    }}

该程序使用 BlockingCollection 类来实现简单的消息队列,两个线程分别负责从消息队列中取出消息并进行处理。在发送消息时,可以将其添加到消息队列中。

管道:线程之间可以通过管道来交换信息,一个线程将数据写入管道,另一个线程则从管道中读取数据。管道本质上也是一种共享内存的方式,并且会自动进行同步(管道的大小是有限制的)

信号量:线程之间可以通过信号量来同步和互斥访问资源,一个线程获取信号量后就可以进行访问操作,其他线程则需要等待。信号量可以用于实现进程之间的同步和互斥,但在多线程应用中使用时需要注意信号量的实现。

using System.Threading;class Program {    private static bool isRunning = true;    private static AutoResetEvent signal = new AutoResetEvent(false);    static void Main(string[] args) {        var thread1 = new Thread(() => {            while (isRunning) {                Console.WriteLine("Thread 1 is waiting...");                signal.WaitOne();                Console.WriteLine("Thread 1 received signal.");            }        });        var thread2 = new Thread(() => {            while (isRunning) {                Console.WriteLine("Thread 2 is waiting...");                signal.WaitOne();                Console.WriteLine("Thread 2 received signal.");            }        });        thread1.Start();        thread2.Start();        // Send signals to the threads        signal.Set(); // signals only one of the waiting threads        signal.Set(); // signals the other waiting thread        Console.WriteLine("Press enter to exit.");        Console.ReadLine();        isRunning = false;        signal.Set(); // unblock any waiting threads        thread1.Join();        thread2.Join();    }}

该程序使用 AutoResetEvent 类来实现线程间的同步。两个线程等待信号并进行处理,主线程发送信号来通知等待的线程进行处理。可以使用 Set() 方法发送信号并使用 WaitOne() 方法等待信号。

using System.Threading;class Program {    private static bool isRunning = true;    private static ManualResetEvent signal = new ManualResetEvent(false);    static void Main(string[] args) {        var thread1 = new Thread(() => {            while (isRunning) {                Console.WriteLine("Thread 1 is waiting...");                signal.WaitOne();                Console.WriteLine("Thread 1 received signal.");                signal.Reset(); // reset the signal            }        });        var thread2 = new Thread(() => {            while (isRunning) {                Console.WriteLine("Thread 2 is waiting...");                signal.WaitOne();                Console.WriteLine("Thread 2 received signal.");                signal.Reset(); // reset the signal            }        });        thread1.Start();        thread2.Start();        // Send signals to the threads        signal.Set(); // signals both of the waiting threads        Console.WriteLine("Press enter to exit.");        Console.ReadLine();        isRunning = false;        signal.Set(); // unblock any waiting threads        thread1.Join();        thread2.Join();    }}

该程序使用 ManualResetEvent 类来实现线程间的同步。两个线程等待信号并进行处理,主线程发送信号来通知等待的线程进行处理。可以使用 Set() 方法发送信号并使用 WaitOne() 方法等待信号,同时使用 Reset() 方法将信号状态重置为未发出状态,以便下次等待。

互斥锁:线程之间可以通过互斥锁来同步和互斥访问共享资源,一个线程获取锁后就可以进行访问操作,其他线程则需要等待释放锁。互斥锁是一种经典的同步和互斥机制,在多线程编程中用得比较广泛,一般和条件变量一起使用。

using System.Threading;class Program {    private static bool isRunning = true;    private static object lockObject = new object();    private static int counter = 0;    static void Main(string[] args) {        var thread1 = new Thread(() => {            while (isRunning) {                lock (lockObject) {                    while (counter % 2 == 1) {                        Monitor.Wait(lockObject);                    }                    Console.WriteLine($"Thread 1: {counter++}");                    Monitor.PulseAll(lockObject);                }            }        });        var thread2 = new Thread(() => {            while (isRunning) {                lock (lockObject) {                    while (counter % 2 == 0) {                        Monitor.Wait(lockObject);                    }                    Console.WriteLine($"Thread 2: {counter++}");                    Monitor.PulseAll(lockObject);                }            }        });        thread1.Start();        thread2.Start();        Console.WriteLine("Press enter to exit.");        Console.ReadLine();        isRunning = false;        thread1.Join();        thread2.Join();    }}
二、同步和异步线程间通信的比较

同步和异步线程间通信的主要区别在于调用者是否需要等待被调用者完成任务才能继续执行下一步操作。

同步线程间通信指的是调用者主动向被调用者请求一个任务,并等待被调用者完成后再继续执行。这种模式对于简单的应用程序来说很容易实现,但有时会引发线程死锁的问题,因为如果多个线程都在等待对方完成任务,就会形成死循环。

异步线程间通信则是被调用者在处理任务的同时,通知调用者任务的状态。这种模式可以提高程序的响应速度,因为调用者可以继续执行其他任务,而不必等待被调用者完成任务才能进行下一步操作。

C# 语言提供了多种方式来实现线程间的同步和异步通信。其中,同步通信可以使用 Mutex、Semaphore 和 Monitor 等互斥量类来实现线程锁定和等待,在获取到资源后再释放锁定。异步通信可以使用委托、事件和 Completion 是C# 5.0 开始的异步编程功能,可以使用 async 和 await 关键字来快速实现异步编程。

三、 多个线程协作完成任务

在多线程编程中,有时我们需要多个线程协作完成一个复杂的任务。这些线程需要互相通信、协调以达到同一目标。下面是一些常用的多线程协作技术:

信号量 Semaphore:Semaphore 可以用来控制某一资源的访问权,比如网络连接数限制、数据库连接池等。Semaphore 通过计数器来控制资源的数量,并提供了 Acquire 和 Release 等方法来允许或阻塞线程访问资源。多个线程可以共享一个 Semaphore,当 Semaphore 计数为 0 时,其他线程就需要等待。

Mutex:Mutex 是一种操作系统提供的同步机制,它可以保证在同一时刻只有一个线程访问共享资源。Mutex 提供了 Lock 和 Unlock 等方法来保护临界区。如果一个线程获得 Mutex,其他线程就必须等待直到该线程释放 Mutex。

AutoResetEvent 和 ManualResetEvent:这两种事件用于线程间的同步,AutoResetEvent 的 WaitOne 方法会阻塞当前线程直到事件被发出,发出后事件重置为未发出状态;ManualResetEvent 则不会自动重置,需要调用 Reset 方法手动将事件重置为未发出状态。

CountdownEvent:CountdownEvent通常用于多个线程都需要完成某个任务后才能继续执行的场景。当所有线程都完成任务后,调用Done方法通知CountdownEvent,等待的线程就会被唤醒。

关键词: