/**
 * Treats every inbound string as a file name and writes the contents of that file
 * back to the channel as a {@link DefaultFileRegion}.
 */
public class FileServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Opens the named file read-only and sends its whole content (offset 0 up to the
     * file length) to the peer.
     *
     * @param ctx      context of this handler; the file region is written through it
     * @param fileName path of the file to send, exactly as received from the pipeline
     * @throws Exception if the file cannot be opened or its length cannot be determined
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String fileName) throws Exception {
        // NOTE(review): fileName is used as-is; if it comes from an untrusted peer it should
        // be validated against a base directory before opening — confirm with the caller.
        RandomAccessFile file = new RandomAccessFile(fileName, "r");
        long length;
        try {
            length = file.length();
        } catch (Exception e) {
            // The file has not been handed over to a file region yet, so it must be
            // closed here or the descriptor leaks.
            try {
                file.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // write() alone only queues the region; flush so the data is actually sent even when
        // no later handler triggers a flush.
        ctx.writeAndFlush(new DefaultFileRegion(file.getChannel(), 0, length));
    }
}
eventLoop.schedule(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
// Какие-либо действия
}, 100, TimeUnit.MILLISECONDS);
/**
 * Polling loop that watches {@code notificationConsumer} and, whenever a poll returns at
 * least one record, fires a {@link NewMsgEvent} user event into the pipeline of every
 * channel in the supplied group.
 */
class MessageListener implements Runnable {
    private final ChannelGroup group;
    // volatile: stop() is meant to be called from a different thread than the one in run().
    private volatile boolean run = true;

    /**
     * @param group channels whose pipelines are notified about new messages
     */
    public MessageListener(ChannelGroup group) {
        this.group = group;
    }

    /**
     * Polls until {@link #stop()} is called. Each poll waits up to 5 seconds, so a stop
     * request takes effect only after the poll in progress returns.
     */
    @Override
    public void run() {
        while (run) {
            ConsumerRecords<String, String> records = notificationConsumer.poll(Duration.ofSeconds(5));
            if (!records.isEmpty()) {
                group.forEach(c -> c.pipeline().fireUserEventTriggered(new NewMsgEvent()));
            }
        }
    }

    /** Asks the polling loop to exit after its current iteration. */
    public void stop() {
        run = false;
    }
}
/**
 * Reacts to {@link NewMsgEvent} user events by polling {@code clientConsumer} and writing
 * every fetched record value to the channel; any other event is passed to the superclass.
 */
class SomeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Not our event: hand it to the default implementation and stop here.
        if (!(evt instanceof NewMsgEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        ConsumerRecords<String, String> fetched = clientConsumer.poll(Duration.ZERO);
        fetched.forEach(rec -> {
            byte[] payload = rec.value().getBytes(StandardCharsets.UTF_8);
            ctx.write(Unpooled.wrappedBuffer(payload));
        });
        // One flush for the whole batch, issued even when the poll returned nothing.
        ctx.flush();
    }
}
MessageListener мог отправить событие только в один нужный конвейер, или чтобы на событие отреагировал только нужный обработчик.
import io.netty.util.Timer;
import io.netty.util.TimerTask;
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// Отложенное действие
}
}, 30, TimeUnit.SECONDS);
eventLoop.schedule(() -> {
// Отложенное действие
}, 30, TimeUnit.MILLISECONDS);
// Starts a server on localhost:1025 and blocks until the server channel is closed.
// Every accepted connection gets a handler that answers with "Hello!" and then closes;
// the time between channel initialization and channel close is printed in milliseconds.
new ServerBootstrap().group(group)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel socketChannel) {
                // Reference point for the lifetime measurement printed on close.
                final long startTime = System.currentTimeMillis();
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new ChannelOutboundHandlerAdapter() {
                    @Override
                    public void read(ChannelHandlerContext ctx) {
                        // The read request is intercepted and not propagated further: the
                        // handler sends the greeting and closes the channel once the write
                        // has completed.
                        ctx.writeAndFlush(Unpooled.wrappedBuffer("Hello!".getBytes()))
                                .addListener(ChannelFutureListener.CLOSE);
                    }
                });
                ChannelFuture f = socketChannel.closeFuture();
                f.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // Elapsed time from initChannel to channel close, in milliseconds.
                        System.out.println(System.currentTimeMillis() - startTime);
                    }
                });
            }
        })
        .bind("localhost", 1025)
        .sync()
        .channel()
        .closeFuture()
        .syncUninterruptibly();
import java.io.Serializable;
/**
 * Serializable value holder for a person: a fixed identifier plus a changeable name.
 */
public class Person implements Serializable {

    /** Identifier assigned at construction; never changes afterwards. */
    private final Long id;

    /** Current name; may be replaced through {@link #setName(String)}. */
    private String name;

    /**
     * Creates a person.
     *
     * @param id   identifier of the person
     * @param name initial name
     */
    public Person(Long id, String name) {
        this.name = name;
        this.id = id;
    }

    /** @return the identifier given to the constructor */
    public Long getId() {
        return this.id;
    }

    /** @return the current name */
    public String getName() {
        return this.name;
    }

    /**
     * Replaces the name.
     *
     * @param name new name
     */
    public void setName(String name) {
        this.name = name;
    }

    /** @return whatever {@link #getName()} returns */
    @Override
    public String toString() {
        return getName();
    }
}
/**
 * Minimal client: connects to localhost:8080, sends one {@link Person} as soon as the
 * channel becomes active, prints every {@link Person} received, and exits once the
 * channel has been closed.
 */
public class SimpleClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());

            // Block until connected, then block again until the channel is closed.
            bootstrap.connect("localhost", 8080)
                    .sync()
                    .channel()
                    .closeFuture()
                    .sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Builds the pipeline: object encoder, object decoder, then the application handler. */
    private static final class ClientInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        public void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(
                    new ObjectEncoder(),
                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                    new PersonExchangeHandler());
        }
    }

    /** Sends the initial {@link Person} and prints whatever {@link Person} comes back. */
    private static final class PersonExchangeHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(new Person(1L, "John"));
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Person received = (Person) msg;
            System.out.println(received);
        }
    }
}
/**
 * Minimal server: listens on localhost:8080, prints each received {@link Person}, renames
 * it to "John Doe", sends it back and closes the connection after the write completes.
 */
public class SimpleServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer());

            // Block until bound, then block until the server channel is closed.
            bootstrap.bind("localhost", 8080)
                    .sync()
                    .channel()
                    .closeFuture()
                    .syncUninterruptibly();
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Builds the per-connection pipeline: object encoder, object decoder, reply handler. */
    private static final class ServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        public void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(
                    new ObjectEncoder(),
                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                    new PersonReplyHandler());
        }
    }

    /** Echoes a renamed {@link Person} back to the client and then closes the channel. */
    private static final class PersonReplyHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Person received = (Person) msg;
            System.out.println(received);
            received.setName("John Doe");
            ctx.writeAndFlush(received).addListener(ChannelFutureListener.CLOSE);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Report the failure and drop the connection.
            cause.printStackTrace();
            ctx.channel().close();
        }
    }
}