Работа с Selectors
Содержание
Введение
Абстрактный класс Selector
Метод select
Введение
Ранее "Технические советы" исследовали различные аспекты NIO библиотек, такие как socket channels (работа с каналами сокета) и programming with buffers (программирование с буферами). Следующая статья переходит к мультиплексным аспектам ввода/вывода каналов. Вместо создания нового потока для каждого нового сокета соединения (канала), у вас есть один или несколько потоков для работы со всеми каналами. Вы делаете запрос на выполнение определенных операций, и свободный поток выполняет эту операцию и ожидает следующую задачу. В этом разница большинства типичных моделей одного потока для каждого канала сокета. В мультиплексных моделях ввода/вывода поток не ждет операцию на определенном канале сокета, а может следить за несколькими каналами одновременно и управлять любым, когда там появляется определенная задача.
Абстрактный класс Selector
Основа мультиплексного ввода/вывода – это абстрактный класс Selector. Вы проверяете каналы сокета с помощью Selector и указываете какую операцию вы запрашивали. Затем у вас есть поток, который ждет Selector пока не поступит одна из регистрируемых операций. Поток «узнает», когда такая операция поступила и может ее выполнить, а потом будет ждать следующие операции. Посмотрим каждый шаг. Чтобы создать объект Selector, вы вызываете метод open (не вызывая конструктор класса):
Selector selector = Selector.open();
Это создает экземпляр платформно-зависимого подкласса, реализация которого от вас скрыта. Например, для для платформы Windows таким классом будет sun.nio.ch.WindowsSelectorImpl. Чтобы делать запрос с помощью Selector, вы вызываете функцию register объекта класса ServerSocketChannel или SocketChannel:
ServerSocketChannel serverSocketChannel = ...
serverSocketChannel.
configureBlocking(false);
serverSocketChannel.
register(selector, SelectionKey.OP_ACCEPT);
или
SocketChannel socketChannel = ...
serverSocketChannel.
configureBlocking(false);
serverSocketChannel.
register(selector, SelectionKey.OP_READ);
Обратите внимание на последний параметр в методе register. Этот ключ идентифицирует операцию, которую должен ждать поток. Ключ OP_ACCEPT используется для ожидания сервером соединения с клиентом. Ключ OP_READ используется для принятия данных сервером от клиента, установившего связь. Другие операции могут быть определены ключами OP_CONNECT и OP_WRITE. После запроса с помощью Select большинство программ выполняют такой цикл:
while (true) {
int count = selector.select();
if (count == 0) {
continue;
}
Set keys = selector.selectedKeys();
Iterator itor = keys.iterator();
while (itor.hasNext()) {
SelectionKey selectionKey = (SelectionKey)itor.next();
selectionKey.remove();
// process channel from key here
}
}
Метод select
Используйте метод select для ожидания поступления данных для обработки объектом Selector. Метод select возвращает номер ключа, чей соответствующий канал готов для операций ввода/вывода. Обычно значение ненулевое, но если многочисленные потоки выполняют запросы от Select, оно может принимать и нулевое значение. Далее, надо получить набор ключей, готовых для обработки. Этот набор ключей называют ready set (готовый набор) или selected set (выбранный набор). Все, что регистрирует Selector и что ждет данных, является частью этого набора ключей. После получения набора ключей, последовательно пройдите по каждому ключу, удаляя и обрабатывая его.
Для каждого ключа, с которым вы работаете, вы можете проверить его статус с помощью таких методов как isAcceptable или isReadable. Они сообщают вам, какую операцию ждет ключ. Например, если у сервера есть доступная связь от клиента, тогда вы берете канал сокета из соединения и проверяете его с помощью Selector для OP_READ операций. Не рассматривая исключительных ситуаций, код для этих операций выглядит так:
if (selectionKey.isAcceptable()) {
socket = serverSocket.accept();
channel = socket.getChannel();
if (channel != null) {
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
}
Если же у вас исключительная ситуация, обрабатывая ключ, вы сообщаете ключу отмену, например:
} catch (SomeException e) {
selectionKey.cancel();
}
Это делает недействительным ключ и соответствующее соединение.
Чтобы посмотреть результат, рассмотрим код для «эхо» сервера. Он возвращает клиенту все данные, которые клиент отправил. Вот программа, содержащая все, что было описано в этой статье.
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class SelectorTest {
private static int PORT = 9876;
private static int BUFFER_SIZE = 1024;
public static void main (String args[]) {
ByteBuffer sharedBuffer =
ByteBuffer.allocateDirect(BUFFER_SIZE);
Selector selector = null;
ServerSocket serverSocket = null;
try {
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocket = serverSocketChannel.socket();
InetSocketAddress inetSocketAddress =
new InetSocketAddress(PORT);
serverSocket.bind(inetSocketAddress);
selector = Selector.open();
serverSocketChannel.register(
selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
System.err.println("Unable to setup environment");
System.exit(-1);
}
try {
while (true) {
int count = selector.select();
// нечего обрабатывать
if (count == 0) {
continue;
}
Set keySet = selector.selectedKeys();
Iterator itor = keySet.iterator();
while (itor.hasNext()) {
SelectionKey selectionKey =
(SelectionKey) itor.next();
itor.remove();
Socket socket = null;
SocketChannel channel = null;
if (selectionKey.isAcceptable()) {
System.out.println("Got acceptable key");
try {
socket = serverSocket.accept();
System.out.println
("Connection from: " + socket);
channel = socket.getChannel();
} catch (IOException e) {
System.err.println("Unable to accept channel");
e.printStackTrace();
selectionKey.cancel();
}
if (channel != null) {
try {
System.out.println
("Watch for something to read");
channel.configureBlocking(false);
channel.register
(selector, SelectionKey.OP_READ);
} catch (IOException e) {
System.err.println("Unable to use channel");
e.printStackTrace();
selectionKey.cancel();
}
}
}
if (selectionKey.isReadable()) {
System.out.println("Reading channel");
SocketChannel socketChannel =
(SocketChannel) selectionKey.channel();
sharedBuffer.clear();
int bytes = -1;
try {
while (
(bytes = socketChannel.read(sharedBuffer)) > 0)
{
System.out.println("Reading...");
sharedBuffer.flip();
while (sharedBuffer.hasRemaining()) {
System.out.println("Writing...");
socketChannel.write(sharedBuffer);
}
sharedBuffer.clear();
}
} catch (IOException e) {
System.err.println("Error writing back bytes");
e.printStackTrace();
selectionKey.cancel();
}
try {
System.out.println("Closing...");
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
selectionKey.cancel();
}
}
System.out.println("Next...");
}
}
} catch (IOException e) {
System.err.println("Error during select()");
e.printStackTrace();
}
}
}
Этот сервер будет след за портом 9876. Вы можете изменить порт или взять его из командной строки.
Вот клиент, который тестирует этот сервер. Клиент просто подключается к серверу через локальный хост, порт 9876, и отсылает строку "Hello, World" десять раз. Клиент ждет полсекунды между каждым отправлением.
import java.net.*;
import java.io.*;
public class Connect {
private static final int LOOP_COUNT = 10;
private static final int SLEEP_TIME = 500;
private static final int PORT = 9876;
public static void main(String args[])
throws IOException, InterruptedException {
for (int i=0; i<LOOP_COUNT; i++) {
Socket socket = new Socket("localhost", PORT);
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
Writer writer = new OutputStreamWriter(os, "US-ASCII");
PrintWriter out = new PrintWriter(writer, true);
out.println("Hello, World");
BufferedReader in =
new BufferedReader
(new InputStreamReader(is, "US-ASCII"));
String line;
while ((line = in.readLine()) != null) {
System.out.println(i + ": " + line);
}
socket.close();
Thread.sleep(SLEEP_TIME);
}
}
}
Запустите эхо сервер. Затем запустите программу клиента. Вот что вам должен показать сервер 10 раз:
Got acceptable key
Connection from: Socket[addr= ...]
Watch for something to read
Next...
Reading channel
Reading...
Writing...
Closing...
Next...
Вот что должен показать клиент:
0: Hello, World
1: Hello, World
2: Hello, World
3: Hello, World
4: Hello, World
5: Hello, World
6: Hello, World
7: Hello, World
8: Hello, World
9: Hello, World
Тем не менее, вы по-прежнему можете использовать накопители потоков, чтобы выполнять запросы на распространенных системах, в мультиплексном вводе/выводе каждый поток не ограничен запросом только от одного клиента. Как только операцию можно выполнить, она обрабатывается непосредственно. Если есть задержки между принятием и чтением, ни одному потоку не приходится ждать, работая вхолостую.
За дополнительной информацией о NIO библиотеках смотрите New I/O APIs. Эта страничка включает в себя еще одну статью неблокирующий сервер в виде временного сервера.