Задать вопрос
@lovly093

Почему Apache Cassandra не отвечает на сообщение запуска?

Я хочу сделать небольшую адаптацию драйвера для Apache Cassandra, чтобы понять, как обычно работает взаимодействие на уровне низкого применения. Я использую io.netty для этого, но по какой -то причине неизвестно мне, сервер не отправляет никаких сообщений об ошибке или запросе на авторизацию обратно. Если вы имеете опыт в этом вопросе, сообщите мне, как я могу разрешить свой собственный драйвер. Спасибо.

public class NativeCQLConnection extends ChannelInboundHandlerAdapter implements Runnable
{
    private final Bootstrap client;
    private final NioEventLoopGroup group;

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 9042;
    private final Initializer initializer;
    private final Bootstrap handler;

    public NativeCQLConnection()
    {
        this.client = new Bootstrap();
        this.group = new NioEventLoopGroup();
        this.initializer = new Initializer(this, "cassandra", "cassandra");

        this.handler = this.client.group(this.group).channel(NioSocketChannel.class).handler(this.initializer);
    }

    public static void main(String[] args)
    {
        new NativeCQLConnection().run();
    }

    @Override
    public void run()
    {
        handler.connect(HOST, PORT);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        ByteBuf startupMessage = this.initializer.createStartupMessage();
        ctx.writeAndFlush(startupMessage);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
    {
        this.group.shutdownGracefully();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        cause.printStackTrace();
    }

    private static final class Initializer extends ChannelInitializer<SocketChannel>
    {
        private final ChannelHandler handler;
        private final byte[] username;
        private final byte[] password;

        public Initializer(ChannelHandler handler, String username, String password)
        {
            this.handler = handler;
            this.username = username.getBytes(StandardCharsets.UTF_8);
            this.password = password.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        protected void initChannel(SocketChannel channel) throws Exception
        {
            channel.pipeline().addLast(new MessageDecoder(this));
            channel.pipeline().addLast(new MessageEncoder(this));

            channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));

            channel.pipeline().addLast(this.handler);
        }

        private static final String CQL_VERSION_OPTION = "CQL_VERSION";
        private static final String CQL_VERSION = "3.0.0";
        private static final String DRIVER_VERSION_OPTION = "DRIVER_VERSION";
        private static final String DRIVER_NAME_OPTION = "DRIVER_NAME";
        private static final String DRIVER_NAME = "Apache Cassandra Java Driver";

        static final String COMPRESSION_OPTION = "COMPRESSION";
        static final String NO_COMPACT_OPTION = "NO_COMPACT";


        public ByteBuf createAuthResponse(ChannelHandlerContext ctx) {
            byte[] initialToken = initialResponse();
            ByteBuf buffer = ctx.alloc().buffer(initialToken.length);
            buffer.writeByte(0x0F); // AUTH_RESPONSE opcode
            buffer.writeInt(0); // Stream ID
            buffer.writeInt(initialToken.length);
            buffer.writeBytes(initialToken);
            return buffer;
        }

        public ByteBuf createStartupMessage() {
            ImmutableMap.Builder<String, String> options = new ImmutableMap.Builder<>();
            options.put(CQL_VERSION_OPTION, CQL_VERSION);
            options.put(COMPRESSION_OPTION, "");
            options.put(NO_COMPACT_OPTION, "true");
            options.put(DRIVER_VERSION_OPTION, "3.12.2-SNAPSHOT");
            options.put(DRIVER_NAME_OPTION, DRIVER_NAME);

            ByteBuf body = Unpooled.buffer();
            Writer.writeStringMap(options.build(), body);

            ByteBuf buffer = Unpooled.buffer();
            buffer.writeByte(0x04); // version protocol
            buffer.writeByte(0x00); // flags
            buffer.writeByte(0x00); // stream od
            buffer.writeByte(0x01); // Opcode (STARTUP)
            buffer.writeInt(body.readableBytes()); // length body
            buffer.writeBytes(body); // body

            return buffer;
        }

        public byte[] initialResponse() {
            byte[] initialToken = new byte[username.length + password.length + 2];
            initialToken[0] = 0;
            System.arraycopy(username, 0, initialToken, 1, username.length);
            initialToken[username.length + 1] = 0;
            System.arraycopy(password, 0, initialToken, username.length + 2, password.length);
            return initialToken;
        }
    }

    private static final class MessageEncoder extends MessageToMessageEncoder<ByteBuf>
    {
        private final Initializer initializer;

        public MessageEncoder(Initializer initializer)
        {
            this.initializer = initializer;
        }

        @Override
        protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            out.add(msg.retain());
        }
    }

    private static final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

        private final Initializer initializer;

        public MessageDecoder(Initializer initializer)
        {
            this.initializer = initializer;
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            System.out.println(msg.toString(StandardCharsets.UTF_8));

            if (msg.readableBytes() > 0) {
                byte opcodeByte = msg.readByte();
                int opcode = Byte.toUnsignedInt(opcodeByte);
                System.out.println("Opcode: " + opcode);

                if (opcode == 0x03) { // AUTHENTICATE opcode
                    System.out.println("Server requires authentication");
                } else if (opcode == 0x0E) { // AUTH_CHALLENGE opcode
                    ByteBuf authResponse = this.initializer.createAuthResponse(ctx);
                    ctx.writeAndFlush(authResponse);
                } else {
                    System.out.println("Unknown opcode: " + opcode);
                }
            }
        }
    }

    public static final class Writer
    {
        public static void writeStringMap(Map<String, String> m, ByteBuf cb) {
            cb.writeShort(m.size());
            for (Map.Entry<String, String> entry : m.entrySet()) {
                writeString(entry.getKey(), cb);
                writeString(entry.getValue(), cb);
            }
        }

        public static void writeString(String str, ByteBuf cb) {
            byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
            cb.writeShort(bytes.length);
            cb.writeBytes(bytes);
        }
    }
}
  • Вопрос задан
  • 45 просмотров
Подписаться 1 Средний Комментировать
Пригласить эксперта
Ответы на вопрос 1
Vamp
@Vamp
Я не делал свой драйвер именно для кассандры, поэтому советы будут только общими.

1. Прочитайте формальную спецификацию протокола.
2. Поизучайте исходники кассандровского драйвера. Посмотрите как аутентификация сделана там.
3. Для отладки сетевых взаимодействий очень рекомендую wireshark. Мега удобный снифер, показывающий какие конкретно данные летают между клиентом и сервером. Маст хэв при отладке сетевых протоколов и проблем. Особенно бинарных протоколов. Посмотрите что шлёт официальный драйвер и что шлёт ваш. Чем отличаются ваши сообщения от эталонных.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы