読み込み中...

Pythonでスレッド間通信を行う際の基本と活用8選

スレッド間通信 徹底解説 Python
この記事は約29分で読めます。

【サイト内のコードはご自由に個人利用・商用利用いただけます】

この記事では、プログラムの基礎知識を前提に話を進めています。

説明のためのコードや、サンプルコードもありますので、もちろん初心者でも理解できるように表現してあります。

本記事のサンプルコードを活用して機能追加、目的を達成できるように作ってありますので、是非ご活用ください。

※この記事は、一般的にプロフェッショナルの指標とされる『実務経験10,000時間以上』を満たす現役のプログラマチームによって監修されています。

※Japanシーモアは、常に解説内容のわかりやすさや記事の品質に注力しております。不具合、分かりにくい説明や不適切な表現、動かないコードなど気になることがございましたら、記事の品質向上の為にお問い合わせフォームにてご共有いただけますと幸いです。
(送信された情報は、プライバシーポリシーのもと、厳正に取扱い、処分させていただきます。)

●Pythonのスレッド間通信とは?

プログラミングで効率的なコードを書くことは、多くの開発者にとって重要な目標です。

特に、複数の処理を同時に行う必要がある場合、マルチスレッドプログラミングが非常に有効な手段となります。

Pythonでは、このマルチスレッドプログラミングを実現するための機能が豊富に用意されています。

マルチスレッドプログラミングの核心部分といえるのが、スレッド間通信です。

スレッド間通信とは、複数のスレッドが互いにデータや情報をやり取りする仕組みのことを指します。

各スレッドが独立して動作しながらも、必要に応じて協調して作業を進めるためには、この通信機能が欠かせません。

○マルチスレッドプログラミングの重要性

現代のコンピューターは、複数のコアを持つCPUが主流となっています。

この恩恵を最大限に活かすためには、マルチスレッドプログラミングの知識が不可欠です。

例えば、ウェブスクレイピングを行う際に、複数のページを同時に取得することで処理時間を大幅に短縮できます。

また、大規模なデータ処理や計算を行う場合も、タスクを分割して並列処理することで、効率的に作業を進められます。

マルチスレッドプログラミングを習得することで、プログラマーは複雑な問題をより効率的に解決できるようになります。

単一のスレッドで逐次的に処理を行う場合と比べて、処理速度が飛躍的に向上する可能性があります。

特に、入出力処理が多いアプリケーションや、複数のタスクを同時に実行する必要があるシステムで、その威力を発揮します。

○スレッド間通信の必要性と利点

スレッド間通信が必要となる典型的な状況として、複数のスレッドが共有リソースにアクセスする場合があります。

例えば、あるスレッドがデータを生成し、別のスレッドがそのデータを処理するようなシナリオです。

この場合、データの受け渡しを適切に行わないと、データの整合性が失われたり、予期せぬエラーが発生したりする可能性があります。

スレッド間通信の利点は多岐にわたります。

まず、処理の並列化により、全体的な実行速度が向上します。

また、タスクを適切に分割することで、プログラムの構造がより明確になり、保守性が高まります。

さらに、リアルタイムシステムやインタラクティブなアプリケーションの開発において、スレッド間通信は欠かせない要素となっています。

○基本的な通信メカニズム

Pythonにおけるスレッド間通信の基本的なメカニズムには、いくつかの方法があります。

最も単純なのは、グローバル変数を使用する方法です。

しかし、この方法は競合状態を引き起こす可能性があるため、注意が必要です。

より安全な方法として、Queueモジュールの使用があります。

Queueは、スレッドセーフなデータ構造を提供し、複数のスレッド間でデータを安全にやり取りできます。

また、Eventオブジェクトを使用すると、あるスレッドが別のスレッドに特定のイベントが発生したことを通知できます。

これは、スレッド間の同期を取る際に非常に有用です。

Lockメカニズムは、共有リソースへのアクセスを制御するために使用されます。

あるスレッドがリソースを使用している間、他のスレッドがそのリソースにアクセスするのを防ぎます。

●Pythonでのスレッド間通信/8つの実践テクニック

Pythonプログラミングの醍醐味は、複雑な問題をシンプルに解決できることにあります。スレッド間通信もその例外ではありません。

初心者の方々も、心配する必要はありません。

順を追って学んでいけば、誰でも習得できる技術なのです。

さて、ここからが本題です。

Pythonでのスレッド間通信には、様々な方法があります。

単純なものから高度なものまで、8つの実践的なテクニックを紹介します。

各テクニックには、それぞれ特徴があり、状況に応じて使い分けることが重要です。

○サンプルコード1:グローバル変数を使った簡単な通信

最も基本的な方法は、グローバル変数を使用することです。

シンプルですが、注意点もあります。

import threading
import time

# グローバル変数
shared_variable = 0

def increment_variable():
    global shared_variable
    for _ in range(1000000):
        shared_variable += 1

def main():
    thread1 = threading.Thread(target=increment_variable)
    thread2 = threading.Thread(target=increment_variable)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    print(f"最終的な値: {shared_variable}")

if __name__ == "__main__":
    main()

実行結果は予想外かもしれません。

最終的な値: 1345678

予想では2000000になるはずでした。しかし、実際の結果は異なります。

なぜでしょうか?

理由は単純です。

二つのスレッドが同時に変数を更新しようとするため、競合状態が発生します。

結果として、一部の更新が失われてしまいます。

○サンプルコード2:Queueを使ったデータ転送

Queueは、スレッド間で安全にデータを受け渡すための優れた方法です。

生産者・消費者パターンの実装に適しています。

import threading
import queue
import time

def producer(q):
    for i in range(5):
        q.put(i)
        print(f"生産: {i}")
        time.sleep(1)

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消費: {item}")
        q.task_done()

def main():
    q = queue.Queue()
    producer_thread = threading.Thread(target=producer, args=(q,))
    consumer_thread = threading.Thread(target=consumer, args=(q,))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    q.put(None)  # 終了シグナル
    consumer_thread.join()

if __name__ == "__main__":
    main()

実行結果を見てみましょう。

生産: 0
消費: 0
生産: 1
消費: 1
生産: 2
消費: 2
生産: 3
消費: 3
生産: 4
消費: 4

Queueを使うことで、生産者と消費者が協調して動作していることがわかります。

データの受け渡しが安全に行われています。

○サンプルコード3:Eventでスレッド間の制御を実現

Eventは、あるスレッドから別のスレッドに「何かが起こった」ことを通知するのに便利です。

例えば、特定の条件が満たされたときに処理を開始したい場合に使用できます。

import threading
import time

def worker(event):
    print("作業者:待機中...")
    event.wait()
    print("作業者:仕事開始!")

def main():
    event = threading.Event()
    thread = threading.Thread(target=worker, args=(event,))
    thread.start()

    time.sleep(3)  # 3秒待機
    print("メイン:シグナル送信")
    event.set()

    thread.join()

if __name__ == "__main__":
    main()

実行結果は次のようになります。

作業者:待機中...
メイン:シグナル送信
作業者:仕事開始!

Eventを使用することで、スレッド間で適切なタイミングを合わせることができます。

非同期プログラミングの世界では、タイミング制御が極めて重要です。

○サンプルコード4:Lockを使ったリソース保護

共有リソースへのアクセスを制御するには、Lockが効果的です。

競合状態を防ぎ、データの整合性を保つことができます。

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

def main():
    threads = []
    for _ in range(5):
        thread = threading.Thread(target=increment)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print(f"カウンター最終値: {counter}")

if __name__ == "__main__":
    main()

実行結果は次の通りです。

カウンター最終値: 500000

Lockを使用することで、複数のスレッドが同時にcounterを更新することを防ぎます。

結果として、期待通りの値が得られます。

○サンプルコード5:Conditionを用いた高度な同期処理

Conditionは、複雑な同期シナリオに対応できる強力なツールです。

例えば、生産者・消費者パターンをより洗練された形で実装できます。

import threading
import time

class Buffer:
    def __init__(self, size):
        self.buffer = []
        self.size = size
        self.condition = threading.Condition()

    def add(self, item):
        with self.condition:
            while len(self.buffer) == self.size:
                self.condition.wait()
            self.buffer.append(item)
            self.condition.notify()

    def remove(self):
        with self.condition:
            while len(self.buffer) == 0:
                self.condition.wait()
            item = self.buffer.pop(0)
            self.condition.notify()
            return item

def producer(buffer):
    for i in range(10):
        buffer.add(i)
        print(f"生産: {i}")
        time.sleep(0.1)

def consumer(buffer):
    for _ in range(10):
        item = buffer.remove()
        print(f"消費: {item}")
        time.sleep(0.2)

def main():
    buffer = Buffer(5)
    producer_thread = threading.Thread(target=producer, args=(buffer,))
    consumer_thread = threading.Thread(target=consumer, args=(buffer,))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

if __name__ == "__main__":
    main()

実行結果を見てみましょう。

生産: 0
生産: 1
消費: 0
生産: 2
生産: 3
消費: 1
生産: 4
生産: 5
消費: 2
生産: 6
消費: 3
生産: 7
消費: 4
生産: 8
消費: 5
生産: 9
消費: 6
消費: 7
消費: 8
消費: 9

Conditionを使用することで、生産者と消費者のバランスを取りながら、効率的にデータを処理できています。

○サンプルコード6:Semaphoreによるアクセス制限

Semaphoreは、同時に実行可能なスレッド数を制限したい場合に便利です。

例えば、データベース接続のプールを管理する際に使用できます。

import threading
import time
import random

class DatabaseConnection:
    def query(self):
        time.sleep(random.random())

semaphore = threading.Semaphore(3)  # 最大3つの同時接続

def worker(worker_id):
    with semaphore:
        print(f"ワーカー {worker_id} がデータベースに接続")
        connection = DatabaseConnection()
        connection.query()
        print(f"ワーカー {worker_id} が接続解除")

def main():
    threads = []
    for i in range(10):
        thread = threading.Thread(target=worker, args=(i,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

実行結果は次のようになります(実行ごとに順序が変わる可能性があります)。

ワーカー 0 がデータベースに接続
ワーカー 1 がデータベースに接続
ワーカー 2 がデータベースに接続
ワーカー 1 が接続解除
ワーカー 3 がデータベースに接続
ワーカー 0 が接続解除
ワーカー 4 がデータベースに接続
ワーカー 2 が接続解除
ワーカー 5 がデータベースに接続
ワーカー 3 が接続解除
ワーカー 6 がデータベースに接続
ワーカー 4 が接続解除
ワーカー 7 がデータベースに接続
ワーカー 5 が接続解除
ワーカー 8 がデータベースに接続
ワーカー 6 が接続解除
ワーカー 9 がデータベースに接続
ワーカー 7 が接続解除
ワーカー 8 が接続解除
ワーカー 9 が接続解除

Semaphoreを使用することで、同時に3つまでのワーカーがデータベースに接続できるよう制限しています。

○サンプルコード7:multiprocessing.Valueの活用

multiprocessing.Valueは、プロセス間で共有可能な値を作成します。

スレッド間でも使用でき、型安全性が高いという利点があります。

import threading
import multiprocessing

def increment(value):
    for _ in range(1000000):
        with value.get_lock():
            value.value += 1

def main():
    value = multiprocessing.Value('i', 0)
    threads = []
    for _ in range(4):
        thread = threading.Thread(target=increment, args=(value,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print(f"最終値: {value.value}")

if __name__ == "__main__":
    main()

実行結果は次の通りです。

最終値: 4000000

multiprocessing.Valueを使用することで、型安全性を保ちつつ、正確な結果を得ることができました。

○サンプルコード8:Pipeによる双方向通信の実装

Pipeは、双方向通信を実現するための優れた方法です。

2つのプロセスやスレッド間でデータを送受信できます。

import threading
import multiprocessing

def sender(conn):
    for i in range(5):
        conn.send(f"メッセージ {i}")
    conn.send(None)  # 終了シグナル

def receiver(conn):
    while True:
        message = conn.recv()
        if message is None:
            break
        print(f"受信: {message}")

def main():
    parent_conn, child_conn = multiprocessing.Pipe()

    sender_thread = threading.Thread(target=sender, args=(parent_conn,))
    receiver_thread = threading.Thread(target=receiver, args=(child_conn,))

    sender_thread.start()
    receiver_thread.start()

    sender_thread.join()
    receiver_thread.join()

if __name__ == "__main__":
    main()

実行結果は次のようになります。

受信: メッセージ 0
受信: メッセージ 1
受信: メッセージ 2
受信: メッセージ 3
受信: メッセージ 4

Pipeを使用することで、2つのスレッド間で効率的にデータを送受信できることがわかります。

●よくあるエラーと対処法

プログラミングでは、エラーは避けられません。

特にマルチスレッドプログラミングでは、複雑な状況が発生しやすく、予期せぬ問題に直面することがあります。

しかし、心配する必要はありません。

主なエラーとその対処法を知っておけば、多くの問題を未然に防ぐことができます。

○競合状態(Race Condition)の解決策

競合状態は、複数のスレッドが同じリソースに同時にアクセスしようとする際に発生します。

例えば、二人が同時に一つの椅子に座ろうとするようなものです。

解決策としては、ロックメカニズムの使用が効果的です。

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"カウンター: {counter}")

実行結果

カウンター: 500000

lockを使用することで、一度に一つのスレッドだけがcounterを更新できるようになります。

結果として、競合状態を回避し、正確な結果を得ることができます。

○デッドロックの回避方法

デッドロックは、複数のスレッドが互いに相手が保持しているリソースを待っている状態です。

まるで、二人が互いに道を譲り合って動けなくなるような状況です。

回避方法の一つは、リソースの獲得順序を常に一定にすることです。

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def worker1():
    with lock1:
        print("ワーカー1: lock1獲得")
        with lock2:
            print("ワーカー1: lock2獲得")

def worker2():
    with lock1:  # lock2ではなくlock1を先に獲得
        print("ワーカー2: lock1獲得")
        with lock2:
            print("ワーカー2: lock2獲得")

thread1 = threading.Thread(target=worker1)
thread2 = threading.Thread(target=worker2)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

実行結果

ワーカー1: lock1獲得
ワーカー1: lock2獲得
ワーカー2: lock1獲得
ワーカー2: lock2獲得

両方のワーカーが同じ順序でロックを獲得することで、デッドロックを回避しています。

○メモリリークの防止策

メモリリークは、プログラムが不要になったメモリを解放しない問題です。

水道の蛇口を閉め忘れて水が溢れ出すようなものです。

Pythonでは、ガベージコレクションが自動的に行われますが、循環参照には注意が必要です。

import weakref

class Node:
    def __init__(self, value):
        self.value = value
        self.parent = None
        self.children = []

    def add_child(self, child):
        self.children.append(child)
        child.parent = weakref.ref(self)  # 弱参照を使用

root = Node(1)
child = Node(2)
root.add_child(child)

# rootとchildの参照を削除
del root
del child

# ガベージコレクションが自動的に行われる

weakrefモジュールを使用することで、循環参照によるメモリリークを防ぐことができます。

親ノードへの参照を弱参照にすることで、ガベージコレクタがオブジェクトを適切に回収できるようになります。

●スレッド間通信の応用例

理論を学んだら、実践です。スレッド間通信の技術は、様々な場面で活用できます。

具体的な応用例を見ていきましょう。

○サンプルコード9:大規模データ処理での活用

大量のデータを処理する際、マルチスレッドを使うと処理速度を大幅に向上させることができます。

例えば、大きなテキストファイルから特定の単語を検索する場合を考えてみましょう。

import threading
import queue

def search_word(file_chunk, target_word, result_queue):
    count = 0
    for line in file_chunk:
        count += line.count(target_word)
    result_queue.put(count)

def parallel_search(filename, target_word, num_threads=4):
    with open(filename, 'r') as file:
        content = file.readlines()

    chunk_size = len(content) // num_threads
    chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]

    result_queue = queue.Queue()
    threads = []

    for chunk in chunks:
        thread = threading.Thread(target=search_word, args=(chunk, target_word, result_queue))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    total_count = 0
    while not result_queue.empty():
        total_count += result_queue.get()

    return total_count

# 使用例
filename = "large_text_file.txt"
target_word = "Python"
occurrences = parallel_search(filename, target_word)
print(f"'{target_word}'の出現回数: {occurrences}")

実行結果(ファイルの内容によって異なります)

'Python'の出現回数: 1250

複数のスレッドでファイルの異なる部分を同時に検索することで、処理速度を大幅に向上させることができます。

○サンプルコード10:リアルタイムシステムでの使用

リアルタイムシステムでは、データの生成と消費が同時に行われることがあります。

例えば、センサーからデータを継続的に受信し、それを処理するシステムを考えてみましょう。

import threading
import queue
import time
import random

def sensor(data_queue):
    while True:
        # センサーからデータを読み取る(ここではランダムな値を生成)
        data = random.randint(0, 100)
        data_queue.put(data)
        time.sleep(0.1)  # 100ミリ秒ごとにデータを生成

def processor(data_queue, result_queue):
    while True:
        data = data_queue.get()
        # データを処理(ここでは単純に2倍にする)
        result = data * 2
        result_queue.put(result)

def display(result_queue):
    while True:
        result = result_queue.get()
        print(f"処理結果: {result}")

data_queue = queue.Queue()
result_queue = queue.Queue()

sensor_thread = threading.Thread(target=sensor, args=(data_queue,))
processor_thread = threading.Thread(target=processor, args=(data_queue, result_queue))
display_thread = threading.Thread(target=display, args=(result_queue,))

sensor_thread.start()
processor_thread.start()
display_thread.start()

# メインスレッドで10秒間実行
time.sleep(10)

実行結果(一部)

処理結果: 138
処理結果: 116
処理結果: 72
処理結果: 18
処理結果: 180
...

マルチスレッドとキューを使用することで、データの生成、処理、表示を並行して行うことができます。

センサーからのデータ取得が処理や表示の速度に影響されません。

○サンプルコード11:ゲーム開発での並行処理

ゲーム開発では、画面の描画、ユーザー入力の処理、ゲームロジックの更新など、多くの処理を同時に行う必要があります。

簡単な例として、キャラクターが動き回る2Dゲームを考えてみましょう。

import threading
import time
import random

class Character:
    def __init__(self, name):
        self.name = name
        self.x = 0
        self.y = 0
        self.lock = threading.Lock()

    def move(self):
        with self.lock:
            self.x += random.randint(-1, 1)
            self.y += random.randint(-1, 1)

def character_movement(character):
    while True:
        character.move()
        time.sleep(0.5)  # 0.5秒ごとに移動

def game_logic(characters):
    while True:
        print("ゲーム状態:")
        for character in characters:
            with character.lock:
                print(f"{character.name}: ({character.x}, {character.y})")
        time.sleep(1)  # 1秒ごとに状態を表示

characters = [Character("プレイヤー"), Character("敵1"), Character("敵2")]

movement_threads = [threading.Thread(target=character_movement, args=(char,)) for char in characters]
logic_thread = threading.Thread(target=game_logic, args=(characters,))

for thread in movement_threads:
    thread.start()
logic_thread.start()

# メインスレッドで30秒間実行
time.sleep(30)

実行結果(一部)

ゲーム状態:
プレイヤー: (1, -2)
敵1: (0, 1)
敵2: (-1, 0)
ゲーム状態:
プレイヤー: (2, -1)
敵1: (-1, 2)
敵2: (-2, 1)
...

複数のスレッドを使用することで、各キャラクターの動きとゲーム状態の表示を同時に管理できます。

○サンプルコード12:ウェブスクレイピングの高速化

ウェブスクレイピングは、複数のウェブページから情報を収集する作業です。

マルチスレッドを使用することで、同時に複数のページにアクセスし、処理速度を大幅に向上させることができます。

import threading
import queue
import requests
from bs4 import BeautifulSoup

def scrape_url(url, result_queue):
    try:
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.title.string if soup.title else "No title"
        result_queue.put((url, title))
    except Exception as e:
        result_queue.put((url, f"Error: {str(e)}"))

def parallel_scrape(urls, max_threads=5):
    result_queue = queue.Queue()
    threads = []

    for url in urls:
        thread = threading.Thread(target=scrape_url, args=(url, result_queue))
        threads.append(thread)
        thread.start()

        # スレッド数を制限
        if len(threads) >= max_threads:
            for thread in threads:
                thread.join()
            threads = []

    # 残りのスレッドを待機
    for thread in threads:
        thread.join()

    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    return results

# 使用例
urls = [
    "https://www.python.org",
    "https://www.google.com",
    "https://www.github.com",
    "https://www.stackoverflow.com",
    "https://www.reddit.com"
]

results = parallel_scrape(urls)
for url, title in results:
    print(f"URL: {url}")
    print(f"Title: {title}")
    print()

実行結果

URL: https://www.python.org
Title: Welcome to Python.org

URL: https://www.google.com
Title: Google

URL: https://www.github.com
Title: GitHub: Let's build from here · GitHub

URL: https://www.stackoverflow.com
Title: Stack Overflow - Where Developers Learn, Share, & Build Careers

URL: https://www.reddit.com
Title: Reddit - Dive into anything

マルチスレッドを使用することで、複数のウェブサイトから同時にデータを取得し、処理速度を大幅に向上させることができます。

まとめ

Pythonにおけるスレッド間通信は、効率的なプログラミングの鍵となる重要な技術です。

本記事では、基本的な概念から実践的なテクニック、さらには応用例まで幅広く解説しました。

今回学んだ知識を土台に、さらなる高みを目指してプログラミングスキルを磨いていってください。