Симфонія асинхронии: завдання JavaFX і сокети Netty

Всім доброго п'ятниці!

У нас нарешті дійшли руки до книги про Netty, яку нам рекомендували в тому числі вдячні читачі нашого хаброблога.



Зізнатися, у нас давно не виходило нічого вузькотематичного по Java. Але тема Netty викликає на Хабре найбільш живий інтерес, тому ми вирішили розмістити оглядовий матеріал по ній (автор почерпнув ідею посади цієї книги) і влаштувати самий орієнтовний опитування. Заходьте, висловлюйтеся!


У цій статті розказано, як інтегрувати клієнт/серверна фреймворк Netty в додаток JavaFX. Хоча і можна написати розподілене додаток, скориставшись простими протоколами, які працюють за моделлю запит/відповідь, наприклад, HTTP або RMI, ці протоколи часто неефективні та недостатньо функціональні для додатків, що вимагають постійних серверних оновлень, push-повідомлень, які виконують довгострокові операції. Netty використовує ефективну мережеву реалізацію, в основі якої лежить асинхронна обробка і з'єднання, що залежать від станів. Така структура дозволяє обійтися без додаткових прийомів, наприклад, не вимагає робити опитування для оновлення клієнтського коду.

Інтегруючи Netty з JavaFX, потрібно гарантувати, що взаємодії з UI у вас реалізуються в потоці interaction FX, не блокуючи UI. Таким чином, потрібно обернути виклики Netty в клас Task з FX. Клас FX Task надає потік для довгострокових операцій, і в більшості випадків можна дозволити Netty просто чекати відгуку (
wait()
). Це робиться за допомогою виклику sync(), який забезпечує блокування, але не призводить до подвисанию програми.

Цей приклад зроблений на основі програми для обміну ехо-запитів між клієнтом і сервером, яку я знайшов у книзі "Netty in Action" Нормана Маурера і Марвіна Аллена Вольфталя. Після того, як з'єднання буде встановлено, клієнт збирає рядок
java.lang.String
та надсилає її на сервер. Сервер перетворює цю рядок за допомогою
toUpperCase()
і надсилає одержану рядок назад до клієнта. Клієнт відображає рядок у інтерфейсі користувача.

Весь код до цього посту лежить на GitHub.

Проект

Для зручності я спакував весь серверний та клієнтський код в один проект Maven. Наступна UML-діаграма класів демонструє, які класи є в нашій програмі.



Діаграма класів ехо-клієнта на FX

В
EchoServer
та
EchoClient
містить функції
main()
, що є вхідними точками для серверних і клієнтських процесів. В
EchoServer
містить код Netty для початкового завантаження, зв'язування та створення конвеєра зі спеціальним обробником
EchoServerHandler
.
EchoClient
створює об'єкт користувальницького інтерфейсу
EchoClientController
, в якому міститься код Netty для створення з'єднання, розриву з'єднання, надсилання й отримання. Контролер
EchoClientController
також створює клієнтський конвеєр за допомогою
EchoClientHandler
.

На діаграмі показана послідовність з'єднання/відправка/отримання/розрив з'єднання. Вона не нормалізовано, тому деякі операції («Enter Text», «Netty Connect») номінальні і в коді відсутні. Обмін даними в програмі в основному реалізований за допомогою стандартного зв'язування JavaFX і Netty Futures.



Отже, ось як схематично виглядає наша послідовність.

  1. Користувач натискає кнопку Connect.
  2. Контролер
    EchoClientController
    виконує початкову завантаження і підключається до
    EchoServer
    .
  3. Користувач вводить текст і натискає кнопку Send.
  4. каналі викликається операція
    writeAndFlush()
    . Викликаються методи
    channelRead()
    та
    channelReadComplete()
    обробника
    EchoServerHandler
    .
  5. Метод
    channelRead() 
    обробника
    EchoServerHandler
    виконує власний метод
    write()
    , а метод
    channelReadComplete()
    виконує
    flush()
    .
  6. EchoClientHandler 
    отримує дані
  7. EchoClientHandler
    встановлює властивість
    StringProperty
    , пов'язане з UI. Автоматично оновлюється поле
    TextField
    UI.
  8. Користувач натискає кнопку Disconnect.
  9. Контролер EchoClientController закриває свій канал Channel і відключає групу EventGroup (на схемі відсутній).


Клієнтський код

Оскільки весь код є на GitHub, основну увагу в цій статті я приділю взаємодії клієнтської JavaFX і Netty interaction. Опускаю тривіальний підклас EchoClient JavaFX Application, створює сцену (Stage) і завантажує файл EchoClient.fxml. Зацікавив нас клієнтський код знаходиться в класі
EchoClientController
.

connect()

Метод
connect()
бере з UI хост і порт і створює канал Netty, який потім зберігається як поле
EchoClientController
.

З EchoClientController.java

@FXML
HBox hboxStatus;

@FXML
ProgressIndicator piStatus;

@FXML
Label lblStatus;

private BooleanProperty connected = new SimpleBooleanProperty(false);
private StringProperty receivingMessageModel = new SimpleStringProperty("");
private Channel channel;

@FXML
public void connect() {

if( connected.get() ) {
return; // з'єднання вже встановлено; попередити та відключити
}

String host = tfHost.getText();
int port = Integer.parseInt(tfPort.getText());

group = new NioEventLoopGroup();

Task<Channel> task = new Task<Channel>() {
@Override
protected Channel call() throws Exception {

updateMessage("Bootstrapping");
updateProgress(0.1 d, 1.0 d);

Bootstrap b = new Bootstrap();
b
.group(group)
.channel(NioSocketChannel.class)
.remoteAddress( new InetSocketAddress(host, port) )
.handler( new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler(receivingMessageModel));
}
});

ChannelFuture f = b.connect();
Channel chn = f.channel();

updateMessage("Connecting");
updateProgress(0.2 d, 1.0 d);

f.sync();

return chn;
}

@Override
protected void succeeded() {

channel = getValue();
connected.set(true);
}

@Override
protected void failed() {

Throwable exc = getException();
logger.error( "client connect error", exc );
Alert alert = new Alert(AlertType.ERROR);
alert.setTitle("Client");
alert.setHeaderText( exc.getClass().getName() );
alert.setContentText( exc.getMessage() );
alert.showAndWait();

connected.set(false);
}
};

hboxStatus.visibleProperty().bind( task.runningProperty() );
lblStatus.textProperty().bind( task.messageProperty() );
piStatus.progressProperty().bind(task.progressProperty());

new Thread(task).start();
}


Виклики Netty для початкового завантаження і з'єднання обгорнуті в завдання JavaFX. Завдання – ключова концепція при програмуванні на JavaFX, і у мене є правило: класти в завдання будь-код, який потенційно може виконуватися довше секунди. Таким чином, у завдання у мене потрапляє практично всі, за винятком маніпуляцій з Java-об'єктами в оперативній пам'яті.

Завдання надає кілька властивостей:
runningProperty
,
messageProperty
,
progressProperty
. Я пов'язую їх з елементами UI: контейнером HBox, мітку Label, індикатором ProgressIndicator. Завдяки зв'язуванню JavaFX відпадає необхідність реєструвати слухачі і викликати методи setter() елементів управління користувальницького інтерфейсу.

Метод
call()
повертає канал. У цій реалізації мене не хвилює асинхронне поведінка Netty – адже я вже працюю в новому
Thread()
– тому можу дочекатися, поки повернеться виклик
sync()
. Обчислене значення каналу встановлюється в поле методу
succeeded()
. Якщо Netty видасть виняток, то викликається метод
failed()
, повідомлення логируется і виводиться користувачеві в діалоговому вікні.

Методи
succeeded()
,
failed()
,
updateMessage()
та
updateProgress()
виконуються в потоці FX, а
call()
— ні. Метод
call()
жодним чином не повинен оновлювати UI. Метод call() повинен займатися лише довгостроковій експлуатацією Netty.

send()

Метод
send()
використовує збережений об'єкт
Channel
, щоб викликати метод
writeAndFlush()
. Цей
writeAndFlush()
буде запускатися за допомогою делегата
EchoClientHandler
допомогою фреймворку Netty.

Також з EchoClientController.java

@FXML
public void send() {

if( !connected.get() ) {
return;
}

final String toSend = tfSend.getText();

Task task = new Task() {

@Override
protected Void call() throws Exception {

ChannelFuture f = channel.writeAndFlush( Unpooled.copiedBuffer(toSend, CharsetUtil.UTF_8) );
f.sync();

return null;
}

@Override
protected void failed() {

Throwable exc = getException();
logger.error( "client send error", exc );
Alert alert = new Alert(AlertType.ERROR);
alert.setTitle("Client");
alert.setHeaderText( exc.getClass().getName() );
alert.setContentText( exc.getMessage() );
alert.showAndWait();

connected.set(false);
}

};

hboxStatus.visibleProperty().bind( task.runningProperty() );
lblStatus.textProperty().bind( task.messageProperty() );
piStatus.progressProperty().bind(task.progressProperty());

new Thread(task).start();
}


Зверніть увагу на схожість з
connect()
. Новоспечена завдання зв'язується все з тими ж трьома progress-об'єктами. Метод
succeeded()
немає, а метод
failed()
містить ту ж логіку, що і обробник помилок у реалізації
connect()
.

Завдання нічого не повертає (зворотний тип Void). В оптимістичному сценарії виклик повинен спрацювати відразу, але якщо б він не спрацював, то варто було б дочекатися помилку. Оскільки метод
call()
у мене вже в новому потоці, я можу дозволити собі чекати в методі
sync()
.

disconnect()

Метод
disconnect()
працює з завданням Task за тим же принципом, що і два попередніх методу. Два інших методу використовують одну пару updateMessage/Progress. У цьому методі обгортання з'єднання з Netty відбувається за два окремих етапи. На виконання
sync()
close() потрібно не так багато часу. Метод
shutdownGracefully()
виконується істотно довше. Проте, в моїх експериментах UI жодного разу не підвисав.

@FXML
public void disconnect() {

if( !connected.get() ) {
return;
}

Task<Voidgt; task = new Task<Void>() {

@Override
protected Void call() throws Exception {

updateMessage("Disconnecting");
updateProgress(0.1 d, 1.0 d);

channel.close().sync(); 

updateMessage("Closing group");
updateProgress(0.5 d, 1.0 d);
group.shutdownGracefully().sync();

return null;
}

@Override
protected void succeeded() {

connected.set(false);
}

@Override
protected void failed() {

connected.set(false);

Throwable t = getException();
logger.error( "client disconnect error", t );
Alert alert = new Alert(AlertType.ERROR);
alert.setTitle("Client");
alert.setHeaderText( t.getClass().getName() );
alert.setContentText( t.getMessage() );
alert.showAndWait();

}

};

hboxStatus.visibleProperty().bind( task.runningProperty() );
lblStatus.textProperty().bind( task.messageProperty() );
piStatus.progressProperty().bind(task.progressProperty());

new Thread(task).start();
}


Зчитування

Зчитування з сервера опосередковується через об'єкт
EchoClientHandler
. При створенні цього об'єкта ставиться посилання на властивість
StringProperty
, є елементом моделі, з яким пов'язується користувальницький інтерфейс. Я міг би передавати елементи UI безпосередньо обробникові, але при цьому порушується принцип поділу відповідальності і стає складніше застосовувати це повідомлення відразу до кількох подань. Таким чином, властивість
StringProperty
може зв'язуватися з будь-яким кількістю елементів UI, і коли надійде оновлення від обробника, оновляться всі ці UI-елементи.

Ось код EchoClientHandler.java. Зверніть увагу на захист FX Thread у методі
channelRead0()
.

@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler {

private Logger logger = LoggerFactory.getLogger( EchoClientHandler.class );

private final StringProperty receivingMessageModel;

public EchoClientHandler(StringProperty receivingMessageModel) {
this.receivingMessageModel = receivingMessageModel;
}

@Override
protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) 
throws Exception {
final String cm = in.toString(CharsetUtil.UTF_8);
Platform.runLater( () -> receivingMessageModel.set(cm) );
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error( "error in echo client", cause );
ctx.close();
} 
}


Останнє зауваження про послідовність зв'язування… ми не знаємо, коли буде викликатися
channelRead0()
(в даному випадку ми покладаємося на асинхронність Netty), але коли такий виклик відбудеться, ми встановимо об'єкт моделі. Я закінчую оновлення моделі об'єкта, забезпечуючи деякий захист FX Thread. FX – оскільки це фреймворк для зв'язування – оновить всі елементи UI, наприклад,
TextField
.

Заключні зауваження про клієнтському коді

При інтеграції Netty з JavaFX найважливіше – використовувати завдання. Завдяки завданням UI не підвисає, завдяки надаються властивостями всю роботу можна відстежувати візуально. Завдяки завданням частково відпадає необхідність асинхронної обробки з боку Netty (як мінімум, на прикладному рівні), тому завдання можуть скільки завгодно блокуватися, не блокуючи користувальницький інтерфейс. Отримуючи повідомлення про нових даних, спробуйте використовувати зв'язування JavaFX, опосередковане через виділений об'єкт моделі та оновлювати таким чином UI, а не виконувати окремі виклики до конкретних об'єктів.

Серверний код

Просто наводжу тут весь серверний код без коментарів, так як стаття присвячена клієнтським аспектів Netty. Дуже схожий приклад є в книзі Manning

З EchoServer.java

public class EchoServer {

private final int port;

public EchoServer(int port) {
this.port = порт;
}

public static void main(String[] args) throws Exception {
if( args.length != 1 ) {
System.err.println("usage: java EchoServer port");
System.exit(1);
}

int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}

public void start() throws Exception {

final EchoServerHandler echoServerHandler = new EchoServerHandler();

EventLoopGroup group = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b
.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast( echoServerHandler );
} 
});

ChannelFuture f = b.bind().sync();

f.channel().closeFuture().sync();

} finally {
group.shutdownGracefully().sync();
}
}
}

З EchoServerHandler.java
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

private Logger logger = LoggerFactory.getLogger( EchoServerHandler.class );

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf in = (ByteBuf)msg;
String in_s = in.toString(CharsetUtil.UTF_8);
String uc = in_s.toUpperCase();
if( logger.isInfoEnabled() ) {
logger.info("[READ] read " + in_s + ", writing " + uc);
}
in.setBytes(0, uc.getBytes(CharsetUtil.UTF_8));
ctx.write(in); // записує байти назад до адресата (не скидає)
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
if( logger.isDebugEnabled() ) {
logger.debug("[READ COMPLETE]");
}
ctx.flush();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
if(logger.isDebugEnabled() ) {
logger.debug("[ACTIVE CHANNEL]");
}
ctx.channel().closeFuture().addListener(f -> logger.debug("[CLOSE]"));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error( "error in echo server", cause);
ctx.close();
}
}


Хоча сучасні швидкісні комп'ютери цілком можуть витрачати частину циклів на опитування, завдяки ефективному мережевому рівню ваш додаток буде швидко реагувати і вийде динамічним, а також позбавить сервер від зайвої роботи.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.