IPマルチキャスト

最近は分散キャッシュをさわることが多いのですが、そうなると、IPマルチキャストは避けて通れません。Ehcacheでさえ、自動のピア・ディスカバリにはマルチキャストが使われてます。ということで、IPマルチキャストを用いたデータ送受信テスト用のコードを書いてみました。

MulticastSender

マルチキャストアドレスへのデータ送信君です。

  • コンストラクタでは、マルチキャスト用のアドレスとポートを指定しています。
  • send(byte[] data)メソッドでは、バイト列を送信しています。バイト列の最大サイズは、パケットサイズの最大長(65535) - IPヘッダ長(20) - UDPヘッダ長(8) で 65507バイトとなっています。
  • void send(InetAddress mcastaddr, int port, byte[] data) は、お手軽に使うための static な便利メソッドで、インスタンス生成したりするのが面倒な場合用です。
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;

/**
 * DatagramPaket sender for IP multicast.
 * 
 * @author beyondseeker
 * 
 */
public class MulticastSender extends MulticastManipulator {

	/**
	 * Create MulticastSocket and join multicast group.
	 * 
	 * @param mcastaddr multicast address
	 * @param port multicast port
	 * @throws IOException
	 */
	public MulticastSender(InetAddress mcastaddr, int port) throws IOException {
		super(mcastaddr, port);
	}
	
	/**
	 * Send data.
	 * 
	 * @param data data to send. Max data length is 65507 because of the max length of UDP payload.
	 * @throws IOException
	 */
	public void send(byte[] data) throws IOException {
		DatagramPacket dp = new DatagramPacket(data, data.length, mcastaddr, port);
		ms.send(dp);
	}
	
	/**
	 * Convenient static method for sending a multicast packet.
	 * 
	 * @param mcastaddr multicast address
	 * @param port multicast port
	 * @param data data to send. Max data length is 65507 because of the max length of UDP payload.
	 * @throws Exception
	 */
	public static void send(InetAddress mcastaddr, int port, byte[] data) throws Exception {
		MulticastSender sender = new MulticastSender(mcastaddr, port);
		sender.send(data);
		sender.close();
	}
}

MulticastReceiver

マルチキャストアドレスからのデータ受信君です。

  • コンストラクタでは、マルチキャスト用のアドレスとポートを指定しています。
  • receive()メソッドは、UDPのデータグラムを受け取るメソッドで、データグラムを受け取るまでスレッドがブロックします。受け取ったデータグラムからはバッファとバッファ長を取得できます。
  • receiveBytes()メソッドは、receive()メソッドのラッパーで、データグラムから実データ部分を抜き出して返します。バイト列のコピーが発生する分 receive() メソッドよりは低速ですが、純粋にペイロードだけ使用したい場合は、こちらのメソッドのほうがソースが見やすくなると思います。ちなみに、おいらのノートPCで、64KBのバイト列を1万回ループさせてみたところ、一回あたり70ナノ秒程度だったので、性能低下はあまり気にしなくてもいいとおもいます。
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.util.Arrays;

/**
 * DatagramPaket receiver for IP multicast.
 * 
 * @author beyondseeker
 *
 */
public class MulticastReceiver extends MulticastManipulator {

	/** Buffer length of payload. */
	private static final int BUF_LEN = 65535 - 20 - 8;

	/**
	 * Create MulticastSocket and join multicast group.
	 * 
	 * @param mcastaddr multicast address
	 * @param port multicast port
	 * @throws IOException
	 */
	public MulticastReceiver(InetAddress mcastaddr, int port) throws IOException {
		super(mcastaddr, port);
	}
	
	/**
	 * Receive DatagramPacket.
	 * 
	 * @return received data
	 * @throws IOException
	 */
	public DatagramPacket receive() throws IOException {
	    byte[] buffer = new byte[BUF_LEN];
	    DatagramPacket dp = new DatagramPacket(buffer, buffer.length);
		ms.receive(dp);
		return dp;
	}
	
	/**
	 * Receive DatagramPacket and get its payload as byte[].
	 * 
	 * @return received data
	 * @throws IOException
	 */
	public byte[] receiveBytes() throws IOException {
	    DatagramPacket dp = receive();
		return Arrays.copyOf(dp.getData(), dp.getLength());
	}
}

MulticastManipulator

MulticastSender と MulticastReceiver の抽象クラスです。低レベルのライブラリなので、過度のカプセル化は意識せず、自由度を高くしつつ、共通部分の抜き出しをしています。

  • close()メソッドでは、MulticastSocketをマルチキャストグループから脱退させ、ソケットをクローズしています。
import java.io.IOException;
import java.net.InetAddress;
import java.net.MulticastSocket;

/**
 * Abstract class for classes to manipulate MulticastSocket.
 * 
 * @author beyondseeker
 *
 */
abstract public class MulticastManipulator {
	
	/** multicast address */
	public final InetAddress mcastaddr;
	
	/** multicast port */
	public final int port;
	
	/** MulticastSocket */
	public final MulticastSocket ms;
	
	/**
	 * Create MulticastSocket and join multicast group.
	 * 
	 * @param mcastaddr multicast address
	 * @param ms MulticastSocket
	 */
	public MulticastManipulator(InetAddress mcastaddr, int port) throws IOException {
		this.mcastaddr = mcastaddr;
		this.port = port;
		this.ms = new MulticastSocket(port);
		ms.joinGroup(mcastaddr);
	}
	
	/**
	 * Leave multicast group and close multicast socket.
	 * 
	 * @throws IOException
	 */
	public void close() throws IOException {
		ms.leaveGroup(mcastaddr);
		ms.close();
	}

}

MulticastReceiverTest

マルチキャストアドレス 228.5.6.7:6789 からパケットを受け取り、バイト列を文字列として解釈して表示するサンプル。

public class MulticastReceiverTest {
	
	public static void main(String[] args) throws Exception {
		InetAddress mcastaddr = InetAddress.getByName("228.5.6.7");
		int port = 6789;
		byte[] bytes = new MulticastReceiver(mcastaddr, port).receiveBytes();
		System.out.println(new String(bytes));		
	}
	
}

MulticastSenderTest

マルチキャストアドレス 228.5.6.7:6789 に、文字列 "This is only a test." のバイト表現を送信するサンプル。

public class MulticastSenderTest {

	public static void main(String[] args) throws Exception {		
		InetAddress mcastaddr = InetAddress.getByName("228.5.6.7");
		int port = 6789;
		byte[] data = "This is only a test.".getBytes();
		MulticastSender.send(mcastaddr, port, data);
	}

}