Главная > Java сниппеты > Работа с Selectors

Тема Зацепин
268

Java-разработчик 🧩
583
2 минуты

Работа с Selectors

Введение Абстрактный класс SelectorМетод select

Добавлено : 18 Mar 2009, 17:32

Содержание

Введение

Ранее "Технические советы" исследовали различные аспекты NIO библиотек, такие как socket channels (работа с каналами сокета) и programming with buffers (программирование с буферами). Следующая статья переходит к мультиплексным аспектам ввода/вывода каналов. Вместо создания нового потока для каждого нового сокета соединения (канала), у вас есть один или несколько потоков для работы со всеми каналами. Вы делаете запрос на выполнение определенных операций, и свободный поток выполняет эту операцию и ожидает следующую задачу. В этом разница большинства типичных моделей одного потока для каждого канала сокета. В мультиплексных моделях ввода/вывода поток не ждет операцию на определенном канале сокета, а может следить за несколькими каналами одновременно и управлять любым, когда там появляется определенная задача.

Абстрактный класс Selector

Основа мультиплексного ввода/вывода – это абстрактный класс Selector. Вы проверяете каналы сокета с помощью Selector и указываете какую операцию вы запрашивали. Затем у вас есть поток, который ждет Selector пока не поступит одна из регистрируемых операций. Поток «узнает», когда такая операция поступила и может ее выполнить, а потом будет ждать следующие операции. Посмотрим каждый шаг. Чтобы создать объект Selector, вы вызываете метод open (не вызывая конструктор класса):

Selector selector = Selector.open();

Это создает экземпляр платформно-зависимого подкласса, реализация которого от вас скрыта. Например, для для платформы Windows таким классом будет sun.nio.ch.WindowsSelectorImpl. Чтобы делать запрос с помощью Selector, вы вызываете функцию register объекта класса ServerSocketChannel или SocketChannel:

ServerSocketChannel serverSocketChannel = ...
serverSocketChannel.
configureBlocking
(false);
serverSocketChannel.
register
(selector, SelectionKey.OP_ACCEPT);

или

SocketChannel socketChannel = ...
serverSocketChannel.
configureBlocking
(false);
serverSocketChannel.
register
(selector, SelectionKey.OP_READ);

Обратите внимание на последний параметр в методе register. Этот ключ идентифицирует операцию, которую должен ждать поток. Ключ OP_ACCEPT используется для ожидания сервером соединения с клиентом. Ключ OP_READ используется для принятия данных сервером от клиента, установившего связь. Другие операции могут быть определены ключами OP_CONNECT и OP_WRITE. После запроса с помощью Select большинство программ выполняют такой цикл:

while (true) {
int count = selector.select();
if (count == 0) {
continue;
}
Set keys = selector.selectedKeys();

Iterator itor = keys.iterator
();
while (itor.hasNext()) {
SelectionKey selectionKey = (SelectionKey)itor.next();
selectionKey.remove
();
// process channel from key here
}
}

Метод select

Используйте метод select для ожидания поступления данных для обработки объектом Selector. Метод select возвращает номер ключа, чей соответствующий канал готов для операций ввода/вывода. Обычно значение ненулевое, но если многочисленные потоки выполняют запросы от Select, оно может принимать и нулевое значение. Далее, надо получить набор ключей, готовых для обработки. Этот набор ключей называют ready set (готовый набор) или selected set (выбранный набор). Все, что регистрирует Selector и что ждет данных, является частью этого набора ключей. После получения набора ключей, последовательно пройдите по каждому ключу, удаляя и обрабатывая его.

Для каждого ключа, с которым вы работаете, вы можете проверить его статус с помощью таких методов как isAcceptable или isReadable. Они сообщают вам, какую операцию ждет ключ. Например, если у сервера есть доступная связь от клиента, тогда вы берете канал сокета из соединения и проверяете его с помощью Selector для OP_READ операций. Не рассматривая исключительных ситуаций, код для этих операций выглядит так:

if (selectionKey.isAcceptable()) {
socket = serverSocket.accept();
channel = socket.getChannel
();
if (channel != null) {
channel.configureBlocking(false);
channel.register
(selector, SelectionKey.OP_READ);
}
}

Если же у вас исключительная ситуация, обрабатывая ключ, вы сообщаете ключу отмену, например:

} catch (SomeException e) {
selectionKey.cancel();
}

Это делает недействительным ключ и соответствующее соединение.

Чтобы посмотреть результат, рассмотрим код для «эхо» сервера. Он возвращает клиенту все данные, которые клиент отправил. Вот программа, содержащая все, что было описано в этой статье.

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class SelectorTest {
private static int PORT = 9876;
private static int BUFFER_SIZE = 1024;
public static void main (String args[]) {
ByteBuffer sharedBuffer =
ByteBuffer.allocateDirect
(BUFFER_SIZE);
Selector selector =
null;
ServerSocket serverSocket =
null;
try {
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open
();
serverSocketChannel.configureBlocking
(false);
serverSocket = serverSocketChannel.socket
();
InetSocketAddress inetSocketAddress =
new InetSocketAddress
(PORT);
serverSocket.bind
(inetSocketAddress);
selector = Selector.open
();
serverSocketChannel.register
(
selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
System.err.println("Unable to setup environment");
System.exit
(-1);
}
try {
while (true) {
int count = selector.select();
// нечего обрабатывать
if (count == 0) {
continue;
}
Set keySet = selector.selectedKeys();
Iterator itor = keySet.iterator
();
while (itor.hasNext()) {
SelectionKey selectionKey =
(SelectionKey) itor.next();
itor.remove
();
Socket socket =
null;
SocketChannel channel =
null;
if (selectionKey.isAcceptable()) {
System.out.println("Got acceptable key");
try {
socket = serverSocket.accept();
System.out.println
("Connection from: " + socket);
channel = socket.getChannel
();
} catch (IOException e) {
System.err.println("Unable to accept channel");
e.printStackTrace
();
selectionKey.cancel
();
}
if (channel != null) {
try {
System.out.println
("Watch for something to read");
channel.configureBlocking
(false);
channel.register
(selector, SelectionKey.OP_READ);
} catch (IOException e) {
System.err.println("Unable to use channel");
e.printStackTrace
();
selectionKey.cancel
();
}
}
}
if (selectionKey.isReadable()) {
System.out.println("Reading channel");
SocketChannel socketChannel =
(SocketChannel) selectionKey.channel();
sharedBuffer.clear
();
int bytes = -1;
try {
while (
(bytes = socketChannel.read(sharedBuffer)) > 0)
{
System.out.println("Reading...");
sharedBuffer.flip
();
while (sharedBuffer.hasRemaining()) {
System.out.println("Writing...");
socketChannel.write
(sharedBuffer);
}
sharedBuffer.clear();
}
} catch (IOException e) {
System.err.println("Error writing back bytes");
e.printStackTrace
();
selectionKey.cancel
();
}
try {
System.out.println("Closing...");
socketChannel.close
();
} catch (IOException e) {
e.printStackTrace();
selectionKey.cancel
();
}
}
System.out.println("Next...");
}
}
} catch (IOException e) {
System.err.println("Error during select()");
e.printStackTrace
();
}
}
}

Этот сервер будет след за портом 9876. Вы можете изменить порт или взять его из командной строки.

Вот клиент, который тестирует этот сервер. Клиент просто подключается к серверу через локальный хост, порт 9876, и отсылает строку "Hello, World" десять раз. Клиент ждет полсекунды между каждым отправлением.

import java.net.*;
import java.io.*;

public class Connect {
private static final int LOOP_COUNT = 10;
private static final int SLEEP_TIME = 500;
private static final int PORT = 9876;
public static void main(String args[])
throws IOException, InterruptedException {
for (int i=0; i<LOOP_COUNT; i++) {
Socket socket = new Socket("localhost", PORT);
InputStream is = socket.getInputStream
();
OutputStream os = socket.getOutputStream
();
Writer writer =
new OutputStreamWriter(os, "US-ASCII");
PrintWriter out =
new PrintWriter(writer, true);
out.println
("Hello, World");
BufferedReader in =
new BufferedReader
(new InputStreamReader(is, "US-ASCII"));
String line;
while ((line = in.readLine()) != null) {
System.out.println(i + ": " + line);
}
socket.close();
Thread.sleep
(SLEEP_TIME);
}
}
}

Запустите эхо сервер. Затем запустите программу клиента. Вот что вам должен показать сервер 10 раз:

Got acceptable key
Connection from: Socket
[addr= ...]
Watch for something to read
Next...
Reading channel
Reading...
Writing...
Closing...
Next...

Вот что должен показать клиент:

0: Hello, World
1: Hello, World
2: Hello, World
3: Hello, World
4: Hello, World
5: Hello, World
6: Hello, World
7: Hello, World
8: Hello, World
9: Hello, World

Тем не менее, вы по-прежнему можете использовать накопители потоков, чтобы выполнять запросы на распространенных системах, в мультиплексном вводе/выводе каждый поток не ограничен запросом только от одного клиента. Как только операцию можно выполнить, она обрабатывается непосредственно. Если есть задержки между принятием и чтением, ни одному потоку не приходится ждать, работая вхолостую.

За дополнительной информацией о NIO библиотеках смотрите New I/O APIs. Эта страничка включает в себя еще одну статью неблокирующий сервер в виде временного сервера.

Теги: javaselectors

Еще от автора

Применение WeakHashmap для списков слушателей

В статье от 11мая 1999 года Reference Objects были описаны основные идеи применения ссылочных объектов, но не приводилось детального описания. Данная статья позволит вам получить больше сведений, касающихся данной темы. В основном ссылочные объекты применяются для косвенных ссылок на память необходимую объектам. Ссылочные объекты хранятся в очереди (класс ReferenceQueue), в которой отслеживается доступность ссылочных объектов. Исходя из типа ссылочного объекта, сборщик мусора может освобождать память даже тогда, когда обычные ссылки не могут быть освобождены.

Заставки в Mustang

Согласно определению, данному в Wikipedia, заставка - это компьютерный термин, обозначающий рисунок, появляющийся во время загрузки программы или операционной системы. Заставка для пользователя является визуальным отображением инициализации программы. До выхода версии Java SE 6 (кодовое название Mustang) единственной возможностью применения заставки было создание окна, во время запуска метода main, и размещение в нем картинки. Хотя данный способ и работал, но он требовал полной инициализации исполняемой Java среды до появления окна заставки. При инициализации загружались библиотеки AWT и Swing, таким образом, появление заставки задерживалось. В Mustang появился новый аргумент командной строки, значительно облегчающий использование заставок. Этот способ позволяет выводить заставку значительно быстрее до запуска исполняемой Java среды. Окончательное добавление данной функциональности находится на рассмотрении в JCP.

Анонимные классы

1 Введение 2 Типичный пример применения 3 Сортировка списка с использованием анонимных классов 4 Примеры использования 5 Ссылки

Еще по теме

Гибкое журналирование с помощью log4j

Log4j – это инструмент для журналирования с открытым исходным кодом, разработанный под эгидой глобального проекта Jakarta Apache. Он представляет собой набор API с помощью которых, разработчики могут вставлять в свой код выражения, которые выводят некоторую информацию (отладочную, информационную, сообщения об ошибках и т.д.), и конфигурировать этот вывод с помощью внешний конфигурационных файлов. В этой статье рассматриваются основные идеи, положенные в данный инструмент, а также будут затронуты некоторые интересные моменты, касающиеся написания демонстрационного web-приложения.

Аннотации в Java (java annotation types). Пример 1

Продолжаю серию статей о нововведениях в Java (начиная с версии 1.5). На этот раз разговор пойдет об аннотациях (annotation type).

Указатели и виртуальные функции в Java

В настоящее время в Интернете можно найти множество статей как о перспективности платформы Java, так и об её ограниченности. Многих программистов, только присматривающихся к Яве, могут отпугнуть частые заявления, типа: «низкое быстродействие», «отсутствие указателей» и т.д.

Блокировки

Одной из популярных функциональных возможностей библиотек J2SE 5.0 является добавление средств обеспечения параллельной работы. Предоставленные как часть JSR 166 эти средства обеспечивают развитые возможности программирования параллельных процессов, устраняющие необходимость использования разработчиками ключевого слова synchronized и связанных с ним блокировок. Среди предлагаемых ими функциональных возможностей присутствуют: поддержка блокировочных таймаутов, множественные переменные условия для одной блокировки, блокировки чтения/записи и способность прерывать поток, ожидающий снятия блокировки. Более подробную информацию по дополнительной поддержке блокировок можно найти в документации по пакету java.util.concurrent.locks.