読み込み中...

手に入れよう!Ruby並列処理の極意10選

Ruby並列処理の詳細解説とサンプルコード Ruby
この記事は約20分で読めます。

【サイト内のコードはご自由に個人利用・商用利用いただけます】

この記事では、プログラムの基礎知識を前提に話を進めています。

説明のためのコードや、サンプルコードもありますので、もちろん初心者でも理解できるように表現してあります。

本記事のサンプルコードを活用して機能追加、目的を達成できるように作ってありますので、是非ご活用ください。

※この記事は、一般的にプロフェッショナルの指標とされる『実務経験10,000時間以上』を満たす現役のプログラマチームによって監修されています。

※Japanシーモアは、常に解説内容のわかりやすさや記事の品質に注力しております。不具合、分かりにくい説明や不適切な表現、動かないコードなど気になることがございましたら、記事の品質向上の為にお問い合わせフォームにてご共有いただけますと幸いです。
(送信された情報は、プライバシーポリシーのもと、厳正に取扱い、処分させていただきます。)

はじめに

今日は、Rubyでの並列処理について詳しく解説します。

Rubyを使って並列処理を理解し、使いこなすための全てをこの記事で解説します。

初心者から経験者まで、Rubyの並列処理の10の手法を通じて、その魅力とパワーを体感してみてください。

●Rubyと並列処理の関係

Rubyはスクリプト言語の中でも非常に人気があり、その柔軟性と便利さから様々な場面で利用されています。

しかし、一方でRubyは一部の開発者から並列処理のパフォーマンスが低いと評価されることもあります。

しかし、RubyではThreadクラスやFiberクラスなどの並列処理を助ける豊富なライブラリが提供されており、適切な知識と使い方で高度な並列処理を実現することができます。

●並列処理の基本理解

並列処理は、複数の処理を同時に行うことで全体の処理時間を短縮する技術です。

単純な例としては、データベースからの大量のデータ取得や、重い計算を行う場合などがあります。

しかし、並列処理はそのまま実装するとデータの不整合や例外処理の問題など、様々な問題を引き起こす可能性があります。

そのため、適切な並列処理の手法とその理解が必要となります。

●並列処理の10の手法

この章では、Rubyでの並列処理を理解し、使いこなすための10の手法を紹介します。

各手法には具体的なサンプルコードとその解説を添えています。

○Threadクラスの基本

RubyのThreadクラスは、軽量な並列処理を実現するための基本的なクラスです。

Threadクラスを使うことで、複数のタスクを同時に処理することができます。

□サンプルコード1:基本的なThreadの使い方

このコードでは、RubyのThreadクラスを使って基本的な並列処理を行うコードを紹介しています。

この例では、2つのスレッドを作成し、それぞれで1から5までの数字を出力しています。

# スレッド1の作成
thread1 = Thread.new do
  5.times do |n|
    puts "スレッド1: #{n + 1}"
    sleep 1
  end
end

# スレッド2の作成
thread2 = Thread.new do
  5.times do |n|
    puts "スレッド2: #{n + 1}"
    sleep 1
  end
end

# スレッドの終了を待つ
thread1.join
thread2.join

このコードを実行すると、スレッド1とスレッド2が交互に数字を出力します。

それぞれのスレッドは独立して動作し、他のスレッドの状態に影響されません。

最後のjoinメソッドは、指定したスレッドが終了するまで待つメソッドで、これによりすべてのスレッドが終了するまで主スレッド(メインスレッド)が終了しないようにしています。

実行結果は次のようになります。

スレッド1: 1
スレッド2: 1
スレッド1: 2
スレッド2: 2
スレッド1: 3
スレッド2: 3
スレッド1: 4
スレッド2: 4
スレッド1: 5
スレッド2: 5

上記の出力が示すように、両方のスレッドが同時に実行されています。

これにより、複数のタスクを同時に処理することができます。

ただし、並列処理はそのまま利用するとデータの競合や不整合が発生する可能性があるため、注意が必要です。

○Mutexクラスと同期処理

Rubyで並列処理を行う際、複数のスレッドが同じリソースにアクセスしようとすると、データの競合や不整合が生じる可能性があります。

その問題を解決するための機能がMutex(相互排他)クラスです。

Mutexクラスはスレッド間でのリソースの利用をコントロールし、一度に一つのスレッドだけがリソースにアクセスできるように制限します。

Mutexを用いた排他制御のサンプルコードを紹介していきます。

□サンプルコード2:Mutexを使った排他制御

このコードではMutexクラスを使って、リソースのアクセス制御を行うコードを紹介しています。

この例では、並列に加算処理を行いつつ、Mutexを使用してデータの整合性を保つ方法を表しています。

require 'thread'

mutex = Mutex.new
count = 0

# スレッドの配列を作成
threads = 10.times.map do
  Thread.new do
    # スレッドごとに1000回加算
    1000.times do
      mutex.synchronize do
        # 加算処理
        count += 1
      end
    end
  end
end

# 全スレッドの終了を待つ
threads.each(&:join)

puts count

このコードを実行すると、最終的に変数countの値は10000となります。

10のスレッドがそれぞれ1000回加算を行っているため、理想的には10000となるべきですが、排他制御を行わない場合、スレッド間の処理の競合により期待した結果と異なる値になる可能性があります。

しかし、このコードではMutexのsynchronizeメソッドを使って排他制御を行っていますので、期待通りの結果が得られます。

○Thread#joinとThread#value

Rubyにおける並列処理で、スレッドの終了を待つための方法としてThread#joinメソッドがあります。

また、スレッドが計算した結果を取得するための方法としてThread#valueメソッドがあります。

これらのメソッドはスレッド間でのデータの受け渡しや同期を容易にします。

次に、Thread#joinとThread#valueを使ったサンプルコードを見てみましょう。

□サンプルコード3:Thread#joinとThread#valueの使い方

このコードではThread#joinとThread#valueを使って、スレッド間でのデータの受け渡しと同期を行うコードを紹介しています。

この例では、複数のスレッドで計算を行い、その結果をメインスレッドで収集しています。

# スレッドの配列を作成
threads = 10.times.map do |i|
  Thread.new do
    # スレッドごとに計算を行い、その結果を返す
    i ** 2
  end
end

# 各スレッドの結果を収集
results = threads.map do |thread|
  thread.value
end

puts results

このコードを実行すると、各スレッドが行った計算結果が配列として出力されます。具体的には、0から9までの数字をそれぞれ二乗した値が出力されます。

これは各スレッドが「i ** 2」の計算を行い、その結果をThread#valueによって取得しているからです。

このように、Thread#joinとThread#valueを使うことで、複数のスレッド間でデータの受け渡しや同期を行うことが可能となります。

○Thread#raiseとThread#kill

Thread#raiseとThread#killは、Rubyの並列処理における特殊な操作を提供します。Thread#raiseは、スレッド内で例外を発生させます。

一方、Thread#killは、スレッドを強制終了します。

これらのメソッドは、特定のスレッドが異常な状態になった場合や、即時に処理を中止する必要がある場合に用いられます。

それでは、具体的なコードを見てみましょう。

□サンプルコード4:Thread#raiseとThread#killの使い方

このコードではThread#raiseとThread#killを使って、スレッド内での例外発生とスレッドの強制終了を行うコードを紹介しています。

この例では、一つ目のスレッドで例外を発生させ、二つ目のスレッドを強制終了しています。

# 例外を発生させるスレッド
thread1 = Thread.new do
  begin
    sleep 2
  rescue => e
    puts "例外発生: #{e.message}"
  end
end

# スレッドを強制終了する
thread2 = Thread.new do
  sleep 3
end

# 例外を発生させる
thread1.raise("Thread#raiseによる例外")

# スレッドを強制終了する
thread2.kill

# 確認
puts "Thread1の状態: #{thread1.status}"
puts "Thread2の状態: #{thread2.status}"

このコードを実行すると、「例外発生: Thread#raiseによる例外」が出力され、thread1の状態が”aborting”、thread2の状態が”false”と表示されます。

これは、Thread#raiseによってthread1内で例外が発生し、Thread#killによってthread2が強制終了されたことを示しています。

このように、Thread#raiseとThread#killは、スレッドの異常終了や即時中止の際に有効な手段となります。

○並列処理と例外処理

並列処理中に例外が発生した場合、その例外の取り扱い方が非常に重要になります。

Rubyでは、Thread#raiseを使ってスレッド内部で例外を送出することができます。

しかし、例外を捕捉して適切に処理しなければ、スレッドが予期せぬ終了を迎え、システム全体に影響を与える可能性があります。

それでは、並列処理中に例外が発生した際の取り扱いを表すコードを見ていきましょう。

□サンプルコード5:並列処理中の例外の取り扱い

このコードでは、並列処理中に例外が発生した場合の取り扱いを行うコードを紹介しています。

この例では、スレッド内で例外を発生させ、その例外を捕捉して適切に処理しています。

# スレッドを生成
thread = Thread.new do
  begin
    # 例外を発生させる
    raise '並列処理中の例外'
  rescue => exception
    # 例外をキャッチして表示
    puts "キャッチした例外: #{exception.message}"
  end
end

# スレッドが終了するまで待機
thread.join

このコードを実行すると、「キャッチした例外: 並列処理中の例外」と表示されます。

このように、スレッド内部で例外が発生した場合でも、適切に例外を捕捉することでその影響を抑え、スレッドの安全な終了を実現できます。

○GVL(Global VM Lock)

GVLはRubyにおける重要な概念であり、並列処理のパフォーマンスに大きな影響を及ぼします。

GVL(Global VM Lock)は、Rubyのインタープリタが一度に一つのスレッドだけが実行されるように制御するメカニズムです。

GVLは、スレッドセーフでないリソースへの同時アクセスを防ぐために存在しますが、このロックがあるためにRubyの並列処理は真の並列性を実現できないという制限があります。

それでは、GVLがRubyの並列処理にどのような影響を及ぼすかを示すコードを見ていきましょう。

□サンプルコード6:GVLとその影響

このコードでは、GVLによってRubyの並列処理がどのように制約されているかを示すコードを紹介しています。

この例では、CPU集中型の作業を複数のスレッドで同時に実行しています。

require 'benchmark'

# CPU集中型の処理を定義
def cpu_intensive_task
  1_000_000.times { Math.sqrt(rand) }
end

# 1つのスレッドで実行
single_thread_time = Benchmark.realtime do
  cpu_intensive_task
end
puts "1つのスレッドの実行時間: #{single_thread_time}"

# 2つのスレッドで実行
two_thread_time = Benchmark.realtime do
  threads = 2.times.map do
    Thread.new { cpu_intensive_task }
  end
  threads.each(&:join)
end
puts "2つのスレッドの実行時間: #{two_thread_time}"

このコードを実行すると、「1つのスレッドの実行時間」と「2つのスレッドの実行時間」が表示されますが、一般的には、「2つのスレッドの実行時間」が「1つのスレッドの実行時間」の約2倍になることがわかります。

これはGVLの存在により、2つのスレッドが実際には並列に実行されず、一つずつ順番に実行されていることを表しています。

○ThreadGroupクラスの活用

Rubyの並列処理においては、スレッドの管理が重要になります。

この管理を容易にするのがThreadGroupクラスです。

スレッドをグループ化することで、スレッドの管理を一元化し、グループ内の全てのスレッドに対して一度に操作を行うことが可能になります。

それでは、ThreadGroupの活用法を見ていきましょう。

□サンプルコード7:ThreadGroupの使い方

このコードでは、ThreadGroupクラスを使ってスレッドをグループ化し、管理するコードを紹介しています。

この例では、3つのスレッドを作成し、それらを一つのThreadGroupに加えています。

# スレッドグループを作成
tg = ThreadGroup.new

# スレッドを作成し、グループに追加
3.times do |i|
  tg.add(Thread.new { sleep(i+1); puts "スレッド#{i+1}終了" })
end

# 全てのスレッドが終了するまで待機
tg.list.each(&:join)

このコードを実行すると、「スレッド1終了」「スレッド2終了」「スレッド3終了」というメッセージが1秒ごとに順番に表示されます。

それぞれのスレッドは指定された秒数だけ待機した後にメッセージを出力し、ThreadGroupによって全てのスレッドの終了を待つことができます。

これにより、スレッドのライフサイクルの管理が容易になります。

○スレッドプールの活用

スレッドプールとは、予め生成されたスレッドの集まりのことで、新たなスレッドを作成するオーバーヘッドを避け、リソースを効率的に使用するための方法です。

多数の短いライフサイクルのタスクを頻繁に実行する場合、それぞれにスレッドを作成するよりもスレッドプールを利用するほうが効率的です。

スレッドプールは特に、Webサーバーのように複数のクライアントからのリクエストを同時に処理する必要があるアプリケーションでよく用いられます。

それでは、スレッドプールの具体的な活用例を見ていきましょう。

□サンプルコード8:スレッドプールの活用例

このコードでは、QueueクラスとArrayクラスを使用して、独自のスレッドプールを実装するコードを紹介しています。

この例では、3つのスレッドをプールし、それぞれのスレッドにタスクを割り当て、処理結果を収集しています。

# スレッドプールとタスクキューを作成
thread_pool = Array.new(3)
task_queue = Queue.new

# タスクをキューに追加
10.times { |i| task_queue.push(i) }

# スレッドを作成し、プールに追加
thread_pool.map! do
  Thread.new do
    until task_queue.empty?
      task = task_queue.pop(true) rescue nil
      if task
        # ここで各タスクの処理を行う
        puts "タスク#{task}を処理しました"
      end
    end
  end
end

# 全てのスレッドが終了するまで待機
thread_pool.each(&:join)

このコードを実行すると、「タスク0を処理しました」から「タスク9を処理しました」までのメッセージが表示されます。

ここで重要なのは、スレッドプールから取得したスレッドが、タスクキューからタスクを取り出し、そのタスクを処理するという流れです。

これにより、スレッドの再利用と並列処理が可能になります。

○Fiberクラスとコルーチン

FiberクラスはRuby 1.9で導入され、コルーチンと呼ばれるプログラミングパターンを実装するためのクラスです。

コルーチンはサブルーチンの一種で、手続きが一時停止し、その後再開するところから進行します。

これにより、複雑な状態管理を必要とせずに、複数の処理の実行を行き来することが可能となります。

FiberはThreadと似ていますが、スケジューリングがOSによるものではなく、開発者が明示的にコントロールする点で異なります。

そのため、マルチスレッドプログラミングにおける競合状態やデッドロックなどの問題を避けることができます。

それでは、具体的なFiberの活用例を見てみましょう。

□サンプルコード9:Fiberを使った非同期処理

このコードでは、Fiberクラスを使用して非同期処理を行う例を紹介しています。

この例では、Fiberを用いて2つの非同期タスクを実行し、その結果を表示しています。

# Fiberの定義
fiber1 = Fiber.new do
  puts "Fiber1が始まります"
  Fiber.yield # 一時停止
  puts "Fiber1が再開します"
end

fiber2 = Fiber.new do
  puts "Fiber2が始まります"
  Fiber.yield # 一時停止
  puts "Fiber2が再開します"
end

# Fiberの実行
fiber1.resume
fiber2.resume
fiber1.resume
fiber2.resume

このコードを実行すると、次の出力が得られます。

Fiber1が始まります
Fiber2が始まります
Fiber1が再開します
Fiber2が再開します

この結果からわかるように、各Fiberは明示的に再開するまで一時停止します。

これにより、非同期処理を明示的にコントロールでき、複雑な状態の管理を避けることが可能になります。

それでは、次に並列処理の具体的な活用例について見ていきましょう。

ここでは大規模データ処理における並列処理の有用性について探っていきます。

○並列処理の活用例

並列処理は、大量のデータを効率よく処理する場合に特に有用です。

例えば、大規模なデータベースからのデータ取得や、CPU集約型の計算などを高速に実行するために用いられます。

ここで注意したいのは、すべてのタスクが並列処理に適しているわけではないということです。

I/Oバウンド(ネットワークやディスクI/Oがボトルネックとなる)タスクや、CPUバウンド(計算処理がボトルネックとなる)タスクにより、並列化の効果は大きく異なります。

タスクの特性を理解し、適切な並列化手法を選択することが、高パフォーマンスなシステムを構築するための鍵となります。

それでは、並列処理を用いた大規模データ処理の具体例を見ていきましょう。

□サンプルコード10:並列処理を活用した大規模データ処理

このコードでは、Threadクラスを使って大規模なデータの処理を並列に行うコードを紹介しています。

この例では、10万件のデータを10のスレッドで分割し、それぞれのスレッドでデータを並列処理しています。

require 'benchmark'

# 大規模データの模擬
data = Array.new(100_000) { rand }

# 並列処理の実行
result = Benchmark.measure do
  threads = []

  # 10分割で処理を行う
  data.each_slice(10_000) do |slice|
    threads << Thread.new do
      slice.sort!
    end
  end

  # 各スレッドの終了を待つ
  threads.each(&:join)
end

puts "処理時間: #{result.real}"

このコードを実行すると、並列化により処理時間が大幅に短縮されることが確認できます。

これにより、Rubyを用いて大規模データを効率よく処理することが可能となります。

●並列処理の注意点と対策

並列処理は大幅なパフォーマンス向上をもたらしますが、それだけではなく、いくつかの問題に直面する可能性もあります。

その代表例として「競合状態」、「デッドロック」、「データの不整合」などがあります。

競合状態は、複数のスレッドが同時に共有データにアクセスし、データの整合性が保たれない状況を指します。

これを解消するためには、排他制御(MutexやSemaphoreなど)を使い、一度に一つのスレッドのみがデータにアクセスできるように制御します。

しかし、排他制御が適切に行われないと、別の問題である「デッドロック」が発生する可能性があります。

デッドロックとは、複数のスレッドがお互いに必要なリソースを保持しつつ、他方のリソースの解放を待ち続ける状態を指します。

これを防ぐためには、リソースの取得順序を統一する、タイムアウトを設けるなどの対策が有効です。

また、「データの不整合」は、複数スレッドがデータを読み書きすることで起こりえます。

この解決策としては、アトミック操作(一連の操作が中断されることなく完全に実行されることを保証する操作)を用いる、またはトランザクションを適用することが考えられます。

それでは、これらの注意点と対策を具体的なコードとともに見ていきましょう。

まずは、「競合状態」を防ぐためのMutex(排他制御)の利用例を見てみましょう。

このコードでは、複数のスレッドから共有データにアクセスする際にMutexを使ってアクセス制御を行う例を紹介しています。

この例では、スレッド内でincrementメソッドを呼び出し、Mutex#synchronizeメソッドを使って排他制御を行っています。

require 'thread'

# 共有データ
counter = 0

# 排他制御用のMutex
mutex = Mutex.new

# 並列処理
threads = 10.times.map do
  Thread.new do
    1000.times do
      mutex.synchronize do
        counter += 1  # 共有データに対する操作
      end
    end


  end
end

# スレッドの終了を待つ
threads.each(&:join)

# 結果の表示
puts "カウンタの値: #{counter}"

このコードを実行すると、「カウンタの値: 10000」と表示され、期待通りの結果が得られます。

Mutexを使わない場合(排他制御を行わない場合)は、期待通りの結果が得られない可能性があることを理解してください。

●Ruby並列処理のカスタマイズ方法

Rubyでは並列処理をさらに効率的に行うための様々なカスタマイズ方法が提供されています。

その中でも、今回は「スレッドプール」の作成と使用について説明します。

スレッドプールとは、プログラムの開始時にスレッドを一定数作成し、それらを再利用することでスレッドの生成と破棄に伴うオーバーヘッドを軽減する手法です。

スレッドプールは、サーバーアプリケーションなどでよく使用されます。

それでは、具体的なコードを用いてスレッドプールの作成と利用を見ていきましょう。

このコードでは、QueueとThreadを使ってスレッドプールを作成し、それを使用して並列処理を行う例を紹介しています。

この例では、先に作成したスレッドがタスクを順次処理し、全てのタスクが終了したら終了します。

require 'thread'

# タスクのキュー
queue = Queue.new

# タスクを追加
100.times { |i| queue.push(i) }

# スレッドプールの作成
threads = 10.times.map do
  Thread.new do
    while (task = queue.pop(true) rescue nil)
      # タスクの処理(ここでは2秒スリープ)
      sleep 2
      puts "タスク #{task} 完了"
    end
  end
end

# スレッドの終了を待つ
threads.each(&:join)

このコードを実行すると、各スレッドがタスクを順次処理し、「タスク 〇〇 完了」と表示します。

ただし、タスクの処理順序はスレッドの実行順序に依存するため、タスクの番号が連続しない場合があります。

スレッドプールは、同時に実行されるタスクの数を制御することで、リソースの消費を抑えつつ、処理を効率的に行うことが可能です。

また、事前にスレッドを作成しておくことで、スレッドの生成と破棄のコストを削減できます。

これらの特性から、並列処理のパフォーマンスを向上させるための重要なテクニックの一つと言えます。

まとめ

今回の記事では、Rubyにおける並列処理の基本的な要点と注意点、さらにはそのカスタマイズ方法について解説しました。

並列処理は処理速度を劇的に向上させることが可能ですが、その一方でデッドロックやレースコンディションなど、様々な問題が発生する可能性があります。

しかし、Rubyではこれらの問題を解決するための多くの手法が提供されています。

これらの手法を学び、理解し、適切に利用することで、Rubyにおける並列処理のパワーを最大限に引き出すことが可能です。

Rubyでの並列処理は、プログラムの性能を引き出すための強力なツールです。

これを上手に扱うことで、より速く、効率的なプログラムを作成することができます。

今回学んだ知識が、あなたのRubyプログラミングにおける並列処理の理解と技術向上に役立てば幸いです。