Работа с Selectors

Содержание

Введение
Абстрактный класс Selector
Метод select

Введение

Ранее "Технические советы" исследовали различные аспекты NIO библиотек, такие как socket channels (работа с каналами сокета) и programming with buffers (программирование с буферами). Следующая статья переходит к мультиплексным аспектам ввода/вывода каналов. Вместо создания нового потока для каждого нового сокета соединения (канала), у вас есть один или несколько потоков для работы со всеми каналами. Вы делаете запрос на выполнение определенных операций, и свободный поток выполняет эту операцию и ожидает следующую задачу. В этом разница большинства типичных моделей одного потока для каждого канала сокета. В мультиплексных моделях ввода/вывода поток не ждет операцию на определенном канале сокета, а может следить за несколькими каналами одновременно и управлять любым, когда там появляется определенная задача.

Абстрактный класс Selector

Основа мультиплексного ввода/вывода – это абстрактный класс Selector. Вы проверяете каналы сокета с помощью Selector и указываете какую операцию вы запрашивали. Затем у вас есть поток, который ждет Selector пока не поступит одна из регистрируемых операций. Поток «узнает», когда такая операция поступила и может ее выполнить, а потом будет ждать следующие операции. Посмотрим каждый шаг. Чтобы создать объект Selector, вы вызываете метод open (не вызывая конструктор класса):

Selector selector = Selector.open();

Это создает экземпляр платформно-зависимого подкласса, реализация которого от вас скрыта. Например, для для платформы Windows таким классом будет sun.nio.ch.WindowsSelectorImpl. Чтобы делать запрос с помощью Selector, вы вызываете функцию register объекта класса ServerSocketChannel или SocketChannel:

ServerSocketChannel serverSocketChannel = ...
   serverSocketChannel.
     configureBlocking
(false);
   serverSocketChannel.
     register
(selector, SelectionKey.OP_ACCEPT);

или

SocketChannel socketChannel = ...
   serverSocketChannel.
     configureBlocking
(false);
   serverSocketChannel.
     register
(selector, SelectionKey.OP_READ);

Обратите внимание на последний параметр в методе register. Этот ключ идентифицирует операцию, которую должен ждать поток. Ключ OP_ACCEPT используется для ожидания сервером соединения с клиентом. Ключ OP_READ используется для принятия данных сервером от клиента, установившего связь. Другие операции могут быть определены ключами OP_CONNECT и OP_WRITE. После запроса с помощью Select большинство программ выполняют такой цикл:

  while (true) {
      int count = selector.select();
     
if (count == 0) {
         continue;
     
}
   Set keys = selector.selectedKeys();
  
      Iterator itor = keys.iterator
();
     
while (itor.hasNext()) {
        SelectionKey selectionKey = (SelectionKey)itor.next();
         selectionKey.remove
();
        
// process channel from key here
      }
   }

Метод select

Используйте метод select для ожидания поступления данных для обработки объектом Selector. Метод select возвращает номер ключа, чей соответствующий канал готов для операций ввода/вывода. Обычно значение ненулевое, но если многочисленные потоки выполняют запросы от Select, оно может принимать и нулевое значение. Далее, надо получить набор ключей, готовых для обработки. Этот набор ключей называют ready set (готовый набор) или selected set (выбранный набор).  Все, что регистрирует Selector и что ждет данных, является частью этого набора ключей. После получения набора ключей, последовательно пройдите по каждому ключу, удаляя и обрабатывая его.

Для каждого ключа, с которым вы работаете, вы можете проверить его статус с помощью таких методов как isAcceptable или isReadable. Они сообщают вам, какую операцию ждет ключ. Например, если у сервера есть доступная связь от клиента, тогда вы берете канал сокета из соединения и проверяете его с помощью Selector для OP_READ операций. Не рассматривая исключительных ситуаций, код для этих операций выглядит так:

if (selectionKey.isAcceptable()) {
     socket = serverSocket.accept();
     channel = socket.getChannel
();
    
if (channel != null) {
       channel.configureBlocking(false);
       channel.register
(selector, SelectionKey.OP_READ);
    
}
   }

Если же у вас исключительная ситуация, обрабатывая ключ, вы сообщаете ключу отмену, например:

} catch (SomeException e) {
     selectionKey.cancel();
  
}

Это делает недействительным ключ и соответствующее соединение.

Чтобы посмотреть результат, рассмотрим код для «эхо» сервера. Он возвращает клиенту все данные, которые клиент отправил. Вот программа, содержащая все, что было описано в этой статье.

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class SelectorTest {
     private static int PORT = 9876;
    
private static int BUFFER_SIZE = 1024;
    
public static void main (String args[]) {
       ByteBuffer sharedBuffer =
         ByteBuffer.allocateDirect
(BUFFER_SIZE);
       Selector selector =
null;
         ServerSocket serverSocket =
null;
      
try {
         ServerSocketChannel serverSocketChannel =
           ServerSocketChannel.open
();
         serverSocketChannel.configureBlocking
(false);
         serverSocket = serverSocketChannel.socket
();
         InetSocketAddress inetSocketAddress =
           new InetSocketAddress
(PORT);
         serverSocket.bind
(inetSocketAddress);
         selector = Selector.open
();
         serverSocketChannel.register
(
           selector, SelectionKey.OP_ACCEPT);
      
} catch (IOException e) {
         System.err.println("Unable to setup environment");
         System.exit
(-1);
      
}
       try {
         while (true) {
           int count = selector.select();
          
// нечего обрабатывать
           if (count == 0) {
             continue;
          
}
           Set keySet = selector.selectedKeys();
           Iterator itor = keySet.iterator
();
          
while (itor.hasNext()) {
             SelectionKey selectionKey =
               
(SelectionKey) itor.next();
             itor.remove
();
             Socket socket =
null;
             SocketChannel channel =
null;
            
if (selectionKey.isAcceptable()) {
               System.out.println("Got acceptable key");
              
try {
                 socket = serverSocket.accept();
                 System.out.println
                    
("Connection from: " + socket);
                 channel = socket.getChannel
();
              
} catch (IOException e) {
                 System.err.println("Unable to accept channel");
                 e.printStackTrace
();
                 selectionKey.cancel
();
              
}
               if (channel != null) {
                 try {
                   System.out.println
                      
("Watch for something to read");
                   channel.configureBlocking
(false);
                   channel.register
                      
(selector, SelectionKey.OP_READ);
                
} catch (IOException e) {
                   System.err.println("Unable to use channel");
                   e.printStackTrace
();
                   selectionKey.cancel
();
                
}
               }
             }
             if (selectionKey.isReadable()) {
               System.out.println("Reading channel");
               SocketChannel socketChannel =
                 
(SocketChannel) selectionKey.channel();
               sharedBuffer.clear
();
              
int bytes = -1;
              
try {
                 while (
                  (bytes = socketChannel.read(sharedBuffer)) > 0)
                   {
                     System.out.println("Reading...");
                     sharedBuffer.flip
();
                    
while (sharedBuffer.hasRemaining()) {
                       System.out.println("Writing...");
                       socketChannel.write
(sharedBuffer);
                    
}
                     sharedBuffer.clear();
                  
}
               } catch (IOException e) {
                 System.err.println("Error writing back bytes");
                 e.printStackTrace
();
                 selectionKey.cancel
();
              
}
               try {
                 System.out.println("Closing...");
                 socketChannel.close
();
               
} catch (IOException e) {
                 e.printStackTrace();
                 selectionKey.cancel
();
              
}
             }
             System.out.println("Next...");
          
}
         }
       } catch (IOException e) {
         System.err.println("Error during select()");
         e.printStackTrace
();
      
}
     }
   }

Этот сервер будет след за портом 9876. Вы можете изменить порт или взять его из командной строки.

Вот клиент, который тестирует этот сервер. Клиент просто подключается к серверу через локальный хост, порт 9876, и отсылает строку "Hello, World" десять раз. Клиент ждет полсекунды между каждым отправлением.

import java.net.*;
import java.io.*;

  
public class Connect {
     private static final int LOOP_COUNT = 10;
    
private static final int SLEEP_TIME = 500;
    
private static final int PORT = 9876;
    
public static void main(String args[])
         throws IOException, InterruptedException {
       for (int i=0; i&ltLOOP_COUNT; i++) {
         Socket socket = new Socket("localhost", PORT);
         InputStream is = socket.getInputStream
();
         OutputStream os = socket.getOutputStream
();
         Writer writer =
new OutputStreamWriter(os, "US-ASCII");
         PrintWriter out =
new PrintWriter(writer, true);
         out.println
("Hello, World");
         BufferedReader in =
           new BufferedReader
            
(new InputStreamReader(is, "US-ASCII"));
         String line;
        
while ((line = in.readLine()) != null) {
           System.out.println(i + ": " + line);
        
}
        socket.close();
         Thread.sleep
(SLEEP_TIME);
      
}
     }
   }
  

Запустите эхо сервер. Затем запустите программу клиента. Вот что вам должен показать сервер 10 раз:

Got acceptable key
Connection from: Socket
[addr= ...]
Watch for something to read
Next...
Reading channel
Reading...
Writing...
Closing...
Next...

Вот что должен показать клиент:

0: Hello, World
1: Hello, World
2: Hello, World
3: Hello, World
4: Hello, World
5: Hello, World
6: Hello, World
7: Hello, World
8: Hello, World
9: Hello, World

Тем не менее, вы по-прежнему можете использовать накопители потоков, чтобы выполнять запросы на распространенных системах, в мультиплексном вводе/выводе каждый поток не ограничен запросом только от одного клиента. Как только операцию можно выполнить, она обрабатывается непосредственно. Если есть задержки между принятием и чтением, ни одному потоку не приходится ждать, работая вхолостую.

За дополнительной информацией о NIO библиотеках смотрите New I/O APIs. Эта страничка включает в себя еще одну статью неблокирующий сервер в виде временного сервера.

Теги: java selectors