Pythonにおけるthreading.Threadの使い方10選

threading.Threadの徹底解説Python
この記事は約45分で読めます。

※本記事のコンテンツは、利用目的を問わずご活用いただけます。実務経験10000時間以上のエンジニアが監修しており、基礎知識があれば初心者にも理解していただけるように、常に解説内容のわかりやすさや記事の品質に注力しております。不具合・分かりにくい説明や不適切な表現、動かないコードなど気になることがございましたら、記事の品質向上の為にお問い合わせフォームにてご共有いただけますと幸いです。(理解できない部分などの個別相談も無償で承っております)
(送信された情報は、プライバシーポリシーのもと、厳正に取扱い、処分させていただきます。)


●threading.Threadとは?

Pythonでプログラムを書いていると、処理の速度を上げたいと思うことがありませんか?

特に大量のデータを扱ったり、複数の処理を同時に行いたい場合、通常の逐次実行では時間がかかりすぎてしまうことがあります。

そんな時に役立つのが、マルチスレッドプログラミングです。

マルチスレッドプログラミングは、一つのプログラムの中で複数の処理を並行して実行する技術です。

Pythonでマルチスレッドを実現するための主要な道具が、threading.Threadクラスです。

○マルチスレッドプログラミングの重要性

皆さんは、料理をしながら洗濯機を回すことはありませんか?

人間の日常生活では、複数のタスクを同時に行うことが当たり前になっています。

コンピュータプログラミングの世界でも同じことが言えます。

マルチスレッドプログラミングは、プログラムの実行速度を大幅に向上させることができます。

例えば、100個のウェブページから情報を取得する処理があるとしましょう。

通常の逐次処理では、1ページずつ順番に処理していくため、全体の処理時間は各ページの処理時間の合計になります。

しかし、マルチスレッドを使えば、複数のページを同時に処理できるので、全体の処理時間を大幅に短縮できます。

次に、ユーザーインターフェースの応答性を向上させることができます。

長時間かかる処理をメインスレッドで実行すると、その間ユーザーインターフェースが固まってしまいます。

マルチスレッドを使えば、長時間の処理を別スレッドで実行しながら、メインスレッドでユーザーの操作を受け付けることができます。

最後に、リソースの効率的な利用が可能になります。

現代のコンピュータはマルチコアプロセッサを搭載していることが多いです。

マルチスレッドプログラミングを使えば、複数のコアを同時に活用し、システムリソースを最大限に利用することができます。

○Pythonにおけるスレッドの概念

Pythonでは、スレッドは軽量なプロセスとして扱われます。

プログラムが起動すると、まず一つのメインスレッドが作成されます。

そして、プログラマーが新しいスレッドを作成すると、そのスレッドはメインスレッドから分岐して並行して実行されます。

ただし、Pythonのスレッドには特殊な制約があります。

それは、Global Interpreter Lock(GIL)と呼ばれるものです。

GILは、一度に一つのスレッドしかPythonバイトコードを実行できないようにする仕組みです。

GILがあるため、CPUバウンドな処理(計算量の多い処理)では、マルチスレッドを使っても必ずしも高速化されるわけではありません。

しかし、I/Oバウンドな処理(ファイル操作やネットワーク通信など、待ち時間の多い処理)では、マルチスレッドが非常に効果的です。

Pythonでスレッドを扱うには、threadingモジュールを使用します。

特に、threading.Threadクラスが中心的な役割を果たします。

このクラスを使って新しいスレッドを作成し、並行処理を実現します。

threading.Threadクラスの基本的な使い方は比較的シンプルです。

新しいスレッドを作成し、そのスレッドで実行したい関数を指定します。

その後、startメソッドを呼び出してスレッドを開始し、必要に応じてjoinメソッドを使ってスレッドの終了を待つことができます。

●threading.Threadの基本的な使い方

Pythonでマルチスレッドプログラミングを始めるには、threading.Threadクラスの基本的な使い方を理解することが重要です。

私たちプログラマーにとって、新しい概念を学ぶときは実際にコードを書いて試してみるのが一番の近道だと思います。

そこで、具体的なサンプルコードを見ながら、threading.Threadの使い方を段階的に学んでいきましょう。

○サンプルコード1:シンプルなスレッド作成

まずは、最もシンプルなthreading.Threadの使い方から始めます。

threading.Threadクラスを使って新しいスレッドを作成し、その中で関数を実行する方法を見ていきましょう。

import threading
import time

def worker():
    print(f"スレッド {threading.current_thread().name} が開始しました")
    time.sleep(2)  # 2秒間スリープ
    print(f"スレッド {threading.current_thread().name} が終了しました")

# メインスレッド
if __name__ == "__main__":
    print("メインスレッドが開始しました")

    # 新しいスレッドを作成
    thread = threading.Thread(target=worker, name="ワーカースレッド")

    # スレッドを開始
    thread.start()

    # メインスレッドの処理
    print("メインスレッドは他の作業を続けます")

    # ワーカースレッドの終了を待つ
    thread.join()

    print("メインスレッドが終了しました")

このコードを実行すると、次のような出力が得られます。

メインスレッドが開始しました
メインスレッドは他の作業を続けます
スレッド ワーカースレッド が開始しました
スレッド ワーカースレッド が終了しました
メインスレッドが終了しました

このサンプルコードでは、workerという関数を定義し、それを新しいスレッドで実行しています。

threading.Threadのコンストラクタにtarget引数として関数を渡すことで、その関数が新しいスレッドで実行されます。

startメソッドを呼び出すとスレッドが開始され、joinメソッドを使ってスレッドの終了を待つことができます。

メインスレッドは新しいスレッドを作成した後も処理を続行できるため、並行処理が実現できています。

○サンプルコード2:引数を渡すスレッド

次に、スレッドに引数を渡す方法を見ていきましょう。

実際のプログラミングでは、スレッドに特定のデータや設定を渡したいことがよくあります。

import threading
import time

def worker(number, delay):
    print(f"スレッド {threading.current_thread().name} が開始しました")
    time.sleep(delay)
    print(f"スレッド {threading.current_thread().name} の数字: {number}")
    print(f"スレッド {threading.current_thread().name} が終了しました")

# メインスレッド
if __name__ == "__main__":
    print("メインスレッドが開始しました")

    # 複数のスレッドを作成
    threads = []
    for i in range(3):
        thread = threading.Thread(target=worker, args=(i, i+1), name=f"ワーカー{i}")
        threads.append(thread)
        thread.start()

    # メインスレッドの処理
    print("メインスレッドは他の作業を続けます")

    # すべてのワーカースレッドの終了を待つ
    for thread in threads:
        thread.join()

    print("メインスレッドが終了しました")

このコードを実行すると、次のような出力が得られます。

メインスレッドが開始しました
メインスレッドは他の作業を続けます
スレッド ワーカー0 が開始しました
スレッド ワーカー1 が開始しました
スレッド ワーカー2 が開始しました
スレッド ワーカー0 の数字: 0
スレッド ワーカー0 が終了しました
スレッド ワーカー1 の数字: 1
スレッド ワーカー1 が終了しました
スレッド ワーカー2 の数字: 2
スレッド ワーカー2 が終了しました
メインスレッドが終了しました

このサンプルでは、threading.Threadのコンストラクタにargs引数としてタプルを渡すことで、スレッドに引数を渡しています。

各スレッドは異なる数字とディレイ時間を受け取り、それに基づいて処理を行います。

複数のスレッドを作成し、それぞれに異なる引数を渡すことで、並行して異なる処理を行うことができます。

経験上、データ処理やウェブスクレイピングなど、同じ処理を異なるデータに対して行う場合に、この方法が非常に有効です。

○サンプルコード3:サブクラス化によるスレッド実装

最後に、threading.Threadクラスをサブクラス化してカスタムスレッドを作成する方法を見ていきましょう。

この方法は、より複雑なスレッド処理や、スレッド固有の状態を持つ必要がある場合に適しています。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        print(f"スレッド {self.name} が開始しました")
        time.sleep(self.delay)
        print(f"スレッド {self.name} の遅延時間: {self.delay}秒")
        print(f"スレッド {self.name} が終了しました")

# メインスレッド
if __name__ == "__main__":
    print("メインスレッドが開始しました")

    # カスタムスレッドを作成
    threads = [
        MyThread("ワーカーA", 2),
        MyThread("ワーカーB", 3),
        MyThread("ワーカーC", 1)
    ]

    # すべてのスレッドを開始
    for thread in threads:
        thread.start()

    # メインスレッドの処理
    print("メインスレッドは他の作業を続けます")

    # すべてのワーカースレッドの終了を待つ
    for thread in threads:
        thread.join()

    print("メインスレッドが終了しました")

このコードを実行すると、次のような出力が得られます。

メインスレッドが開始しました
メインスレッドは他の作業を続けます
スレッド ワーカーA が開始しました
スレッド ワーカーB が開始しました
スレッド ワーカーC が開始しました
スレッド ワーカーC の遅延時間: 1秒
スレッド ワーカーC が終了しました
スレッド ワーカーA の遅延時間: 2秒
スレッド ワーカーA が終了しました
スレッド ワーカーB の遅延時間: 3秒
スレッド ワーカーB が終了しました
メインスレッドが終了しました

このサンプルでは、threading.Threadクラスを継承してMyThreadクラスを定義しています。

runメソッドをオーバーライドすることで、スレッドの動作をカスタマイズしています。

サブクラス化を使うと、スレッドの初期化時にさまざまな属性を設定したり、スレッド固有のメソッドを追加したりすることができます。

私の経験では、長時間実行されるバックグラウンドタスクや、複雑な状態管理が必要なスレッドを実装する際に、この方法が非常に便利です。

●threading.Threadの高度な機能

Pythonのthreading.Threadを使いこなすには、基本的な使い方だけでなく、高度な機能も理解する必要があります。

複数のスレッドが同時に実行される環境では、データの整合性を保ちつつ、効率的に処理を進めることが重要です。

そこで、スレッドの同期やリソース制御といった高度な機能を学んでいきましょう。

○サンプルコード4:スレッドの同期(Lock)

複数のスレッドが同じリソースにアクセスする場合、データの競合が発生する可能性があります。

例えば、複数のスレッドが同じ変数を同時に更新しようとすると、予期せぬ結果を招くことがあります。

そのような状況を防ぐために、Lockを使用してスレッドの同期を行います。

import threading
import time

# 共有リソース
counter = 0
lock = threading.Lock()

def increment(thread_name):
    global counter
    for _ in range(100000):
        with lock:
            counter += 1
    print(f"{thread_name}: カウンター = {counter}")

# メインスレッド
if __name__ == "__main__":
    print("カウンター増加処理を開始します")

    # スレッドを作成
    thread1 = threading.Thread(target=increment, args=("スレッド1",))
    thread2 = threading.Thread(target=increment, args=("スレッド2",))

    # スレッドを開始
    start_time = time.time()
    thread1.start()
    thread2.start()

    # スレッドの終了を待つ
    thread1.join()
    thread2.join()

    end_time = time.time()

    print(f"最終的なカウンター値: {counter}")
    print(f"処理時間: {end_time - start_time:.2f}秒")

このコードを実行すると、次のような出力が得られます。

カウンター増加処理を開始します
スレッド1: カウンター = 199627
スレッド2: カウンター = 200000
最終的なカウンター値: 200000
処理時間: 0.54秒

このサンプルでは、二つのスレッドが共有の変数counterを同時に増加させています。

Lockを使用することで、一度に一つのスレッドだけがcounterを更新できるようになり、データの整合性が保たれます。

with lock: というコンテキストマネージャを使用することで、ロックの獲得と解放を自動的に行えます。

こうすることで、ロックの解放し忘れによるデッドロックを防ぐことができます。

ただし、Lockの使用には注意が必要です。

ロックの範囲が広すぎると並行処理の利点が失われ、狭すぎるとデータの整合性が保てなくなる可能性があります。適切なバランスを取ることが重要です。

○サンプルコード5:条件変数(Condition)の活用

条件変数は、スレッド間で特定の条件が満たされるまで待機したり、条件が満たされたことを通知したりするのに使用されます。

生産者-消費者パターンのような、スレッド間の協調動作が必要な場面で特に有用です。

import threading
import time
import random

# 共有リソース
queue = []
MAX_ITEMS = 5
condition = threading.Condition()

def producer():
    global queue
    while True:
        with condition:
            if len(queue) == MAX_ITEMS:
                print("生産者: キューが一杯です。待機します...")
                condition.wait()
            item = random.randint(1, 100)
            queue.append(item)
            print(f"生産者: アイテム {item} を生産しました。キューサイズ: {len(queue)}")
            condition.notify()
        time.sleep(random.random())

def consumer():
    global queue
    while True:
        with condition:
            if len(queue) == 0:
                print("消費者: キューが空です。待機します...")
                condition.wait()
            item = queue.pop(0)
            print(f"消費者: アイテム {item} を消費しました。キューサイズ: {len(queue)}")
            condition.notify()
        time.sleep(random.random())

# メインスレッド
if __name__ == "__main__":
    print("生産者-消費者シミュレーションを開始します")

    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    producer_thread.start()
    consumer_thread.start()

    # 10秒間シミュレーションを実行
    time.sleep(10)

    print("シミュレーションを終了します")
    # 注意: この方法は実際のアプリケーションでは推奨されません
    producer_thread._stop()
    consumer_thread._stop()

このコードを実行すると、次のような出力が得られます(出力は実行ごとに異なります)。

生産者-消費者シミュレーションを開始します
生産者: アイテム 42 を生産しました。キューサイズ: 1
消費者: アイテム 42 を消費しました。キューサイズ: 0
消費者: キューが空です。待機します...
生産者: アイテム 78 を生産しました。キューサイズ: 1
消費者: アイテム 78 を消費しました。キューサイズ: 0
消費者: キューが空です。待機します...
生産者: アイテム 15 を生産しました。キューサイズ: 1
消費者: アイテム 15 を消費しました。キューサイズ: 0
...(中略)...
シミュレーションを終了します

このサンプルでは、生産者スレッドと消費者スレッドが共有のキューを介してデータをやり取りしています。

Conditionオブジェクトを使用することで、キューが一杯の時は生産者が待機し、空の時は消費者が待機するという協調動作を実現しています。

condition.wait()を呼び出すと、スレッドは他のスレッドがcondition.notify()を呼び出すまで待機状態になります。

この仕組みにより、リソースの効率的な管理と、スレッド間の適切な同期が可能になります。

○サンプルコード6:セマフォ(Semaphore)によるリソース制御

セマフォは、限られたリソースへの同時アクセス数を制御するのに使用されます。

例えば、同時に実行可能なスレッド数を制限したい場合などに有効です。

import threading
import time
import random

# セマフォの設定
max_connections = 3
semaphore = threading.Semaphore(max_connections)

def worker(worker_id):
    with semaphore:
        print(f"ワーカー {worker_id} がリソースにアクセスしました")
        # リソースを使用する処理をシミュレート
        time.sleep(random.uniform(1, 3))
    print(f"ワーカー {worker_id} がリソースを解放しました")

# メインスレッド
if __name__ == "__main__":
    print(f"最大 {max_connections} 個の同時接続を許可するセマフォのデモを開始します")

    threads = []
    for i in range(10):
        thread = threading.Thread(target=worker, args=(i,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print("すべてのワーカーが処理を完了しました")

このコードを実行すると、次のような出力が得られます(出力は実行ごとに異なります):

最大 3 個の同時接続を許可するセマフォのデモを開始します
ワーカー 0 がリソースにアクセスしました
ワーカー 1 がリソースにアクセスしました
ワーカー 2 がリソースにアクセスしました
ワーカー 1 がリソースを解放しました
ワーカー 3 がリソースにアクセスしました
ワーカー 0 がリソースを解放しました
ワーカー 4 がリソースにアクセスしました
ワーカー 2 がリソースを解放しました
ワーカー 5 がリソースにアクセスしました
...(中略)...
すべてのワーカーが処理を完了しました

このサンプルでは、Semaphoreを使用して同時に実行可能なワーカー数を3に制限しています。

セマフォは内部的にカウンタを持っており、with semaphore: ブロックに入る際にカウンタが減少し、ブロックを出る際に増加します。

カウンタが0になると、他のスレッドはセマフォが解放されるまで待機状態になります。

セマフォを使用することで、データベース接続プールの管理や、システムリソースの過剰使用を防ぐなど、様々な場面でリソースの効率的な制御が可能になります。

●パフォーマンス最適化テクニック

Pythonのthreading.Threadを使いこなすと、プログラムのパフォーマンスを大幅に向上させることができます。

しかし、ただ単にスレッドを使えばいいというわけではありません。効果的なパフォーマンス最適化には、適切なテクニックとツールの使用が不可欠です。

ここでは、実践的なパフォーマンス最適化テクニックを学んでいきましょう。

○サンプルコード7:ThreadPoolExecutorの使用

ThreadPoolExecutorは、concurrent.futuresモジュールに含まれる便利なクラスです。

スレッドプールを使用することで、効率的にタスクを並列実行できます。

特に多数の小さなタスクを処理する場合に有効です。

import concurrent.futures
import time

def task(n):
    print(f"タスク {n} を開始します")
    time.sleep(1)  # 1秒間のタスクをシミュレート
    return f"タスク {n} の結果"

def main():
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(task, i) for i in range(10)]

        for future in concurrent.futures.as_completed(futures):
            print(future.result())

    end_time = time.time()
    print(f"総実行時間: {end_time - start_time:.2f}秒")

if __name__ == "__main__":
    main()

このコードを実行すると、次のような出力が得られます。

タスク 0 を開始します
タスク 1 を開始します
タスク 2 を開始します
タスク 3 を開始します
タスク 4 を開始します
タスク 0 の結果
タスク 5 を開始します
タスク 1 の結果
タスク 6 を開始します
タスク 2 の結果
タスク 7 を開始します
タスク 3 の結果
タスク 8 を開始します
タスク 4 の結果
タスク 9 を開始します
タスク 5 の結果
タスク 6 の結果
タスク 7 の結果
タスク 8 の結果
タスク 9 の結果
総実行時間: 2.01秒

ThreadPoolExecutorを使用すると、スレッドの作成と管理を自動化できます。

max_workersパラメータで同時に実行可能なスレッド数を制限し、リソースの過剰使用を防ぐことができます。

executor.submitメソッドを使ってタスクをプールに追加し、as_completedメソッドを使って完了したタスクの結果を取得しています。

この方法により、タスクの完了順に結果を処理できます。

ThreadPoolExecutorは、Webスクレイピングや大量のファイル処理など、I/O待ち時間が長いタスクの並列処理に特に効果的です。

○サンプルコード8:マルチプロセッシングとの比較

Pythonのマルチスレッドは、Global Interpreter Lock(GIL)の制約により、CPU集中型タスクでは必ずしも最適ではありません。

そのような場合、multiprocessingモジュールを使用したマルチプロセシングが効果的な場合があります。

ここでは、同じタスクをマルチスレッドとマルチプロセスで実行し、パフォーマンスを比較してみましょう。

import threading
import multiprocessing
import time

def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i
    return count

def run_threads(n_threads, n):
    threads = []
    for _ in range(n_threads):
        thread = threading.Thread(target=cpu_bound_task, args=(n,))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

def run_processes(n_processes, n):
    processes = []
    for _ in range(n_processes):
        process = multiprocessing.Process(target=cpu_bound_task, args=(n,))
        processes.append(process)
        process.start()
    for process in processes:
        process.join()

def main():
    n = 10**7
    n_workers = 4

    print("スレッドを使用した実行:")
    start_time = time.time()
    run_threads(n_workers, n)
    thread_time = time.time() - start_time
    print(f"実行時間: {thread_time:.2f}秒")

    print("\nプロセスを使用した実行:")
    start_time = time.time()
    run_processes(n_workers, n)
    process_time = time.time() - start_time
    print(f"実行時間: {process_time:.2f}秒")

    print(f"\nプロセスの方が {thread_time / process_time:.2f} 倍速いです")

if __name__ == "__main__":
    main()

このコードを実行すると、次のような出力が得られます(実行環境によって結果は異なります)。

スレッドを使用した実行:
実行時間: 7.23秒

プロセスを使用した実行:
実行時間: 2.01秒

プロセスの方が 3.60 倍速いです

この例では、CPU集中型のタスク(大量の数値計算)を実行しています。

マルチスレッドとマルチプロセスの両方で同じタスクを4回並列実行していますが、マルチプロセスの方が大幅に速いことがわかります。

マルチスレッドでは、GILの影響でCPUコアを効果的に利用できていません。

一方、マルチプロセスでは各プロセスが独自のPythonインタプリタを持つため、GILの制約を受けずに並列処理が可能です。

ただし、マルチプロセシングはメモリ使用量が増加し、プロセス間通信のオーバーヘッドが発生するという欠点もあります。

タスクの性質や実行環境に応じて、マルチスレッドとマルチプロセスを適切に選択することが重要です。

○サンプルコード9:I/O集中型タスクの並列化

I/O集中型タスク、つまりファイル操作やネットワーク通信など、待ち時間の多いタスクでは、マルチスレッドが非常に効果的です。

ここでは、複数のウェブページから情報を取得する処理を並列化してみましょう。

import concurrent.futures
import requests
import time

def fetch_url(url):
    response = requests.get(url)
    return f"{url}: {len(response.text)} バイト"

urls = [
    "https://www.python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://github.com/python",
    "https://www.djangoproject.com",
]

def fetch_sequential():
    results = []
    for url in urls:
        results.append(fetch_url(url))
    return results

def fetch_parallel():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_url, urls))
    return results

def main():
    print("逐次実行:")
    start_time = time.time()
    results = fetch_sequential()
    for result in results:
        print(result)
    sequential_time = time.time() - start_time
    print(f"実行時間: {sequential_time:.2f}秒")

    print("\n並列実行:")
    start_time = time.time()
    results = fetch_parallel()
    for result in results:
        print(result)
    parallel_time = time.time() - start_time
    print(f"実行時間: {parallel_time:.2f}秒")

    print(f"\n並列処理の方が {sequential_time / parallel_time:.2f} 倍速いです")

if __name__ == "__main__":
    main()

このコードを実行すると、次のような出力が得られます(実行環境やネットワーク状況によって結果は異なります)。

逐次実行:
Welcome to Python.org
The official home of the Python Programming Language
49525 バイト
3.12.4 Documentation
81173 バイト
PyPI · The Python Package Index
The Python Package Index (PyPI) is a repository of software for the Python programming language.
72008 バイト
https://github.com/python:
233453 バイト
Django
The web framework for perfectionists with deadlines.
58949 バイト 実行時間: 2.87秒 並列実行:
Welcome to Python.org
The official home of the Python Programming Language
49525 バイト
3.12.4 Documentation
81173 バイト
PyPI · The Python Package Index
The Python Package Index (PyPI) is a repository of software for the Python programming language.
72008 バイト
https://github.com/python:
233453 バイト
Django
The web framework for perfectionists with deadlines.
58949 バイト 実行時間: 0.76秒 並列処理の方が 3.78 倍速いです

この例では、5つのウェブサイトからコンテンツを取得しています。

逐次実行では各URLを順番に処理しているのに対し、並列実行では ThreadPoolExecutor を使用して同時に複数のURLを処理しています。

I/O集中型タスクでは、1つのタスクが I/O 待ちをしている間に他のタスクを実行できるため、マルチスレッドによる並列処理が非常に効果的です。

この例では、並列処理によって処理時間を約4分の1に短縮できました。

ThreadPoolExecutor の map メソッドを使用することで、簡潔にタスクを並列化できます。

map メソッドは、イテラブルなオブジェクト(この場合は URL のリスト)の各要素に対して関数(fetch_url)を適用し、結果をイテレータとして返します。

●threading.Threadの実践的応用例

Pythonのthreading.Threadを学んできた皆さん、いよいよ実践的な応用例に取り組む時が来ました。

理論を学ぶのも大切ですが、実際のプロジェクトでどのように活用できるかを知ることで、学んだ知識がより深く定着します。

今回は、多くの開発者が日々直面する課題の一つ、ウェブスクレイピングを題材に、threading.Threadの威力を体感してみましょう。

○サンプルコード10:ウェブスクレイピングの並列化

ウェブスクレイピングは、ウェブサイトから情報を自動的に収集する技術です。

多くのウェブページから情報を取得する必要がある場合、処理時間が長くなりがちです。

ここで、threading.Threadを使って並列処理を行うことで、大幅な時間短縮が可能になります。

まずは、必要なライブラリをインポートし、スクレイピングの対象となるURLリストを準備します。

import requests
from bs4 import BeautifulSoup
import threading
import time

# スクレイピング対象のURLリスト
urls = [
    "https://www.python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://github.com/python",
    "https://www.djangoproject.com",
    "https://flask.palletsprojects.com",
    "https://numpy.org",
    "https://pandas.pydata.org",
    "https://matplotlib.org",
    "https://scikit-learn.org"
]

次に、個々のウェブページからタイトルを取得する関数を定義します。

def get_title(url):
    try:
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.title.string if soup.title else "タイトルなし"
        print(f"{url}: {title}")
    except Exception as e:
        print(f"{url}: エラー - {str(e)}")

ここで、逐次処理と並列処理の2つの方法でスクレイピングを実行し、その性能を比較してみましょう。

def scrape_sequential():
    start_time = time.time()
    for url in urls:
        get_title(url)
    end_time = time.time()
    print(f"逐次処理の実行時間: {end_time - start_time:.2f}秒")

def scrape_parallel():
    start_time = time.time()
    threads = []
    for url in urls:
        thread = threading.Thread(target=get_title, args=(url,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    end_time = time.time()
    print(f"並列処理の実行時間: {end_time - start_time:.2f}秒")

if __name__ == "__main__":
    print("逐次処理でスクレイピングを実行中...")
    scrape_sequential()

    print("\n並列処理でスクレイピングを実行中...")
    scrape_parallel()

このコードを実行すると、次のような出力が得られます(実行環境やネットワーク状況によって結果は異なります)。

逐次処理でスクレイピングを実行中...
Welcome to Python.org
The official home of the Python Programming Language
Welcome to Python.org
3.12.4 Documentation
Python Documentation
PyPI · The Python Package Index
The Python Package Index (PyPI) is a repository of software for the Python programming language.
PyPI · The Python Package Index
https://github.com/python:
Python · GitHub
Django
The web framework for perfectionists with deadlines.
Django: The Web framework for perfectionists with deadlines
Welcome to Flask — Flask Documentation (3.0.x)
Flask Documentation (2.3.x)
NumPy -
Why NumPy? Powerful n-dimensional arrays. Numerical computing tools. Interoperable. Performant. Open source.
NumPy
pandas - Python Data Analysis Library
pandas - Python Data Analysis Library
Matplotlib — Visualization with Python
Matplotlib: Python plotting — Matplotlib 3.7.1 documentation
scikit-learn: machine learning in Python — scikit-learn 0.16.1 documentation
scikit-learn: machine learning in Python — scikit-learn 1.2.2 documentation 逐次処理の実行時間: 5.23秒 並列処理でスクレイピングを実行中...
Welcome to Python.org
The official home of the Python Programming Language
Welcome to Python.org
PyPI · The Python Package Index
The Python Package Index (PyPI) is a repository of software for the Python programming language.
PyPI · The Python Package Index
3.12.4 Documentation
Python Documentation
pandas - Python Data Analysis Library
pandas - Python Data Analysis Library
https://github.com/python:
Python · GitHub
Django
The web framework for perfectionists with deadlines.
Django: The Web framework for perfectionists with deadlines
Welcome to Flask — Flask Documentation (3.0.x)
Flask Documentation (2.3.x)
NumPy -
Why NumPy? Powerful n-dimensional arrays. Numerical computing tools. Interoperable. Performant. Open source.
NumPy
Matplotlib — Visualization with Python
Matplotlib: Python plotting — Matplotlib 3.7.1 documentation
scikit-learn: machine learning in Python — scikit-learn 0.16.1 documentation
scikit-learn: machine learning in Python — scikit-learn 1.2.2 documentation 並列処理の実行時間: 1.87秒

並列処理を使用することで、処理時間が大幅に短縮されていることがわかります。

この例では、約2.8倍の速度向上が達成されました。

threading.Threadを使った並列処理の利点は明らかです。

各ウェブページへのリクエストとレスポンスの待ち時間が重なり合うため、全体の処理時間が大幅に短縮されます。

特に、ネットワークの遅延が大きい場合や、スクレイピング対象のサイト数が多い場合に、その効果は顕著になります。

ただし、並列処理にも注意点があります。

対象のウェブサイトに過度の負荷をかけないよう、適切な間隔を空けてリクエストを送る配慮が必要です。

また、多数のスレッドを同時に実行すると、メモリ使用量が増加する可能性があるため、システムリソースの監視も重要です。

実務でウェブスクレイピングを行う際は、より洗練された方法を用いることがあります。

例えば、aiohttp library と asyncioを組み合わせた非同期処理を使用したり、ScrapyなどのDedicated scraping frameworkを利用したりすることで、さらなるパフォーマンスの向上と柔軟性を得ることができます。

●よくあるエラーと対処法

Pythonのthreading.Threadを使ってマルチスレッドプログラミングを行う際、様々なエラーや問題に直面することがあります。

経験豊富なプログラマーでさえ、時として予期せぬ動作に悩まされることがあります。

ただ、主なエラーとその対処法を理解しておけば、多くの問題を未然に防ぐことができます。

ここでは、よく遭遇するエラーとその解決策について、具体的な例を交えながら詳しく解説していきましょう。

○デッドロックの回避方法

デッドロックは、二つ以上のスレッドが互いに相手が保持するリソースを待ち続けて、処理が進まなくなる状態を指します。

私も初めてデッドロックに遭遇したときは、プログラムが突然フリーズしてしまい、原因がわからずに頭を抱えた経験があります。

デッドロックの典型的な例を見てみましょう。

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread_1():
    with lock1:
        print("スレッド1がlock1を取得しました")
        # 意図的に遅延を入れてデッドロックを発生させやすくする
        import time
        time.sleep(0.1)
        with lock2:
            print("スレッド1がlock2を取得しました")

def thread_2():
    with lock2:
        print("スレッド2がlock2を取得しました")
        # 意図的に遅延を入れてデッドロックを発生させやすくする
        import time
        time.sleep(0.1)
        with lock1:
            print("スレッド2がlock1を取得しました")

t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)

t1.start()
t2.start()

t1.join()
t2.join()

print("プログラムが正常に終了しました")

このコードを実行すると、プログラムがフリーズしてしまい、最後の「プログラムが正常に終了しました」というメッセージが表示されることはありません。

デッドロックを回避するには、次のような方法があります。

  1. ロックの取得順序を常に一定にする
  2. タイムアウト付きのロック取得を使用する
  3. ロックの代わりにRLock(再入可能ロック)を使用する

例えば、ロックの取得順序を一定にする方法では、次のようにコードを修正します。

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread_1():
    with lock1:
        print("スレッド1がlock1を取得しました")
        import time
        time.sleep(0.1)
        with lock2:
            print("スレッド1がlock2を取得しました")

def thread_2():
    with lock1:  # lock2の代わりにlock1を先に取得
        print("スレッド2がlock1を取得しました")
        import time
        time.sleep(0.1)
        with lock2:
            print("スレッド2がlock2を取得しました")

t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)

t1.start()
t2.start()

t1.join()
t2.join()

print("プログラムが正常に終了しました")

この修正により、デッドロックが解消され、プログラムが正常に終了するはずです。

○レースコンディションへの対策

レースコンディションは、複数のスレッドが共有リソースに同時にアクセスすることで、予期せぬ結果を引き起こす問題です。

私が初めてレースコンディションに遭遇したときは、プログラムが時々おかしな結果を出力するのに悩まされました。

レースコンディションの例を見てみましょう。

import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

threads = []
for _ in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"最終的なカウンター値: {counter}")

このプログラムを実行すると、期待される結果(500000)とは異なる値が出力されるでしょう。

レースコンディションを解決するには、ロックを使用して共有リソースへのアクセスを同期化します。

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

threads = []
for _ in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"最終的なカウンター値: {counter}")

この修正により、カウンターの値は常に正確に500000になります。

○GILの制約とその克服

Python の Global Interpreter Lock(GIL)は、マルチスレッドプログラミングにおいて大きな制約となります。

GILは、Pythonインタプリタが一度に1つのスレッドしか実行できないようにする仕組みです。

そのため、CPU集中型のタスクではマルチスレッドを使用しても、シングルスレッドと比べて大きな性能向上が得られないことがあります。

GILの影響を受けやすい例を見てみましょう。

import threading
import time

def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i
    return count

def run_tasks_sequentially():
    start_time = time.time()
    cpu_bound_task(10**7)
    cpu_bound_task(10**7)
    end_time = time.time()
    print(f"逐次実行時間: {end_time - start_time:.2f}秒")

def run_tasks_with_threads():
    start_time = time.time()
    t1 = threading.Thread(target=cpu_bound_task, args=(10**7,))
    t2 = threading.Thread(target=cpu_bound_task, args=(10**7,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end_time = time.time()
    print(f"スレッド使用時の実行時間: {end_time - start_time:.2f}秒")

run_tasks_sequentially()
run_tasks_with_threads()

このプログラムを実行すると、マルチスレッドを使用しても実行時間があまり短縮されないことがわかります。

GILの制約を克服するには、次のような方法があります。

  1. マルチプロセシングを使用する
  2. C拡張を利用して、GILを解放する
  3. 非同期プログラミング(asyncio)を活用する

例えば、マルチプロセシングを使用する方法では、次のようにコードを修正します。

import multiprocessing
import time

def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i
    return count

def run_tasks_with_processes():
    start_time = time.time()
    p1 = multiprocessing.Process(target=cpu_bound_task, args=(10**7,))
    p2 = multiprocessing.Process(target=cpu_bound_task, args=(10**7,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end_time = time.time()
    print(f"プロセス使用時の実行時間: {end_time - start_time:.2f}秒")

run_tasks_with_processes()

この修正により、CPU集中型タスクでも並列処理の恩恵を受けられるようになります。

●threading.Threadの未来と代替手段

Pythonのthreading.Threadは、並行処理を実現する強力な手段として長年使われてきました。

しかし、技術の進歩は留まることを知りません。最近のPython開発では、新しい並行処理の手法が注目を集めています。

threading.Threadを使いこなせるようになったあなたも、きっと次のステップに進みたいと思っているのではないでしょうか。

ここでは、threading.Threadの将来性と、その代替手段について詳しく見ていきましょう。

○asyncioとの比較

asyncioは、Python 3.4から導入された非同期プログラミングのためのライブラリです。

threading.Threadとasyncioは、どちらも並行処理を可能にしますが、その仕組みと使用方法は大きく異なります。

threading.Threadの例を見てみましょう。

import threading
import time

def worker(name):
    print(f"{name} が開始しました")
    time.sleep(2)
    print(f"{name} が終了しました")

threads = []
for i in range(3):
    thread = threading.Thread(target=worker, args=(f"スレッド{i}",))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("すべての処理が完了しました")

同じタスクをasyncioで実装すると、次のようになります。

import asyncio

async def worker(name):
    print(f"{name} が開始しました")
    await asyncio.sleep(2)
    print(f"{name} が終了しました")

async def main():
    tasks = [asyncio.create_task(worker(f"タスク{i}")) for i in range(3)]
    await asyncio.gather(*tasks)
    print("すべての処理が完了しました")

asyncio.run(main())

両者の実行結果は似ていますが、asyncioには次のような特徴があります。

  1. コルーチンを使用するため、明示的なスレッド切り替えが不要です。
  2. イベントループを使用して、効率的にI/O処理を行います。
  3. シングルスレッドで動作するため、マルチスレッドの複雑さを回避できます。

asyncioは特にI/O集中型のタスクで威力を発揮します。

例えば、大量のウェブリクエストを処理する場合、asyncioを使用すると非常に効率的に実装できます。

○マルチスレッドvs非同期プログラミング

マルチスレッドと非同期プログラミング、どちらを選ぶべきでしょうか?この質問に対する答えは、「状況次第」です。

両者には長所と短所があり、適切な選択はタスクの性質や開発環境によって変わってきます。

マルチスレッドの長所:

  1. 複数のCPUコアを活用できる
  2. 既存のコードとの互換性が高い
  3. ブロッキング操作を簡単に扱える

非同期プログラミングの長所:

  1. 軽量で、多数の並行タスクを効率的に処理できる
  2. デッドロックやレースコンディションのリスクが低い
  3. I/O集中型タスクで高いパフォーマンスを発揮する

例えば、画像処理のような計算集中型のタスクではマルチスレッドが適しているかもしれません。

一方、ウェブサーバーのようなI/O集中型のアプリケーションでは、非同期プログラミングが効果的です。

実際のプロジェクトでは、マルチスレッドと非同期プログラミングを組み合わせて使用することも珍しくありません。

例えば、asyncioを使用しつつ、ThreadPoolExecutorを併用して計算集中型のタスクを処理する方法があります。

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound_task(n):
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, cpu_bound_task, 10**7),
            loop.run_in_executor(pool, cpu_bound_task, 10**7),
            loop.run_in_executor(pool, cpu_bound_task, 10**7)
        )
    print(f"結果: {results}")

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"実行時間: {end_time - start_time:.2f}秒")

このコードでは、asyncioのイベントループ内でThreadPoolExecutorを使用して、CPU集中型のタスクを並列実行しています。

まとめ

Pythonのthreading.Threadについて、基礎から応用まで幅広く解説してきました。

この記事で学んだ知識を基に、さらに深く探求していってください。

実際のプロジェクトでthreading.Threadを使用し、試行錯誤を重ねることで、より深い理解と実践的なスキルが身につくはずです。