Задать вопрос
@lovly093

Почему Apache Cassandra не отвечает на сообщение запуска?

Я хочу сделать небольшую адаптацию драйвера для Apache Cassandra, чтобы понять, как обычно работает взаимодействие на уровне низкого применения. Я использую io.netty для этого, но по какой -то причине неизвестно мне, сервер не отправляет никаких сообщений об ошибке или запросе на авторизацию обратно. Если вы имеете опыт в этом вопросе, сообщите мне, как я могу разрешить свой собственный драйвер. Спасибо.

public class NativeCQLConnection extends ChannelInboundHandlerAdapter implements Runnable
{
    private final Bootstrap client;
    private final NioEventLoopGroup group;

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 9042;
    private final Initializer initializer;
    private final Bootstrap handler;

    public NativeCQLConnection()
    {
        this.client = new Bootstrap();
        this.group = new NioEventLoopGroup();
        this.initializer = new Initializer(this, "cassandra", "cassandra");

        this.handler = this.client.group(this.group).channel(NioSocketChannel.class).handler(this.initializer);
    }

    public static void main(String[] args)
    {
        new NativeCQLConnection().run();
    }

    @Override
    public void run()
    {
        handler.connect(HOST, PORT);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        ByteBuf startupMessage = this.initializer.createStartupMessage();
        ctx.writeAndFlush(startupMessage);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
    {
        this.group.shutdownGracefully();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        cause.printStackTrace();
    }

    private static final class Initializer extends ChannelInitializer<SocketChannel>
    {
        private final ChannelHandler handler;
        private final byte[] username;
        private final byte[] password;

        public Initializer(ChannelHandler handler, String username, String password)
        {
            this.handler = handler;
            this.username = username.getBytes(StandardCharsets.UTF_8);
            this.password = password.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        protected void initChannel(SocketChannel channel) throws Exception
        {
            channel.pipeline().addLast(new MessageDecoder(this));
            channel.pipeline().addLast(new MessageEncoder(this));

            channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));

            channel.pipeline().addLast(this.handler);
        }

        private static final String CQL_VERSION_OPTION = "CQL_VERSION";
        private static final String CQL_VERSION = "3.0.0";
        private static final String DRIVER_VERSION_OPTION = "DRIVER_VERSION";
        private static final String DRIVER_NAME_OPTION = "DRIVER_NAME";
        private static final String DRIVER_NAME = "Apache Cassandra Java Driver";

        static final String COMPRESSION_OPTION = "COMPRESSION";
        static final String NO_COMPACT_OPTION = "NO_COMPACT";


        public ByteBuf createAuthResponse(ChannelHandlerContext ctx) {
            byte[] initialToken = initialResponse();
            ByteBuf buffer = ctx.alloc().buffer(initialToken.length);
            buffer.writeByte(0x0F); // AUTH_RESPONSE opcode
            buffer.writeInt(0); // Stream ID
            buffer.writeInt(initialToken.length);
            buffer.writeBytes(initialToken);
            return buffer;
        }

        public ByteBuf createStartupMessage() {
            ImmutableMap.Builder<String, String> options = new ImmutableMap.Builder<>();
            options.put(CQL_VERSION_OPTION, CQL_VERSION);
            options.put(COMPRESSION_OPTION, "");
            options.put(NO_COMPACT_OPTION, "true");
            options.put(DRIVER_VERSION_OPTION, "3.12.2-SNAPSHOT");
            options.put(DRIVER_NAME_OPTION, DRIVER_NAME);

            ByteBuf body = Unpooled.buffer();
            Writer.writeStringMap(options.build(), body);

            ByteBuf buffer = Unpooled.buffer();
            buffer.writeByte(0x04); // version protocol
            buffer.writeByte(0x00); // flags
            buffer.writeByte(0x00); // stream od
            buffer.writeByte(0x01); // Opcode (STARTUP)
            buffer.writeInt(body.readableBytes()); // length body
            buffer.writeBytes(body); // body

            return buffer;
        }

        public byte[] initialResponse() {
            byte[] initialToken = new byte[username.length + password.length + 2];
            initialToken[0] = 0;
            System.arraycopy(username, 0, initialToken, 1, username.length);
            initialToken[username.length + 1] = 0;
            System.arraycopy(password, 0, initialToken, username.length + 2, password.length);
            return initialToken;
        }
    }

    private static final class MessageEncoder extends MessageToMessageEncoder<ByteBuf>
    {
        private final Initializer initializer;

        public MessageEncoder(Initializer initializer)
        {
            this.initializer = initializer;
        }

        @Override
        protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            out.add(msg.retain());
        }
    }

    private static final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

        private final Initializer initializer;

        public MessageDecoder(Initializer initializer)
        {
            this.initializer = initializer;
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            System.out.println(msg.toString(StandardCharsets.UTF_8));

            if (msg.readableBytes() > 0) {
                byte opcodeByte = msg.readByte();
                int opcode = Byte.toUnsignedInt(opcodeByte);
                System.out.println("Opcode: " + opcode);

                if (opcode == 0x03) { // AUTHENTICATE opcode
                    System.out.println("Server requires authentication");
                } else if (opcode == 0x0E) { // AUTH_CHALLENGE opcode
                    ByteBuf authResponse = this.initializer.createAuthResponse(ctx);
                    ctx.writeAndFlush(authResponse);
                } else {
                    System.out.println("Unknown opcode: " + opcode);
                }
            }
        }
    }

    public static final class Writer
    {
        public static void writeStringMap(Map<String, String> m, ByteBuf cb) {
            cb.writeShort(m.size());
            for (Map.Entry<String, String> entry : m.entrySet()) {
                writeString(entry.getKey(), cb);
                writeString(entry.getValue(), cb);
            }
        }

        public static void writeString(String str, ByteBuf cb) {
            byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
            cb.writeShort(bytes.length);
            cb.writeBytes(bytes);
        }
    }
}
  • Вопрос задан
  • 64 просмотра
Подписаться 1 Средний 1 комментарий
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы