Я хочу сделать небольшую адаптацию драйвера для Apache Cassandra, чтобы понять, как обычно работает взаимодействие на уровне низкого применения. Я использую io.netty для этого, но по какой -то причине неизвестно мне, сервер не отправляет никаких сообщений об ошибке или запросе на авторизацию обратно. Если вы имеете опыт в этом вопросе, сообщите мне, как я могу разрешить свой собственный драйвер. Спасибо.
public class NativeCQLConnection extends ChannelInboundHandlerAdapter implements Runnable
{
private final Bootstrap client;
private final NioEventLoopGroup group;
private static final String HOST = "127.0.0.1";
private static final int PORT = 9042;
private final Initializer initializer;
private final Bootstrap handler;
public NativeCQLConnection()
{
this.client = new Bootstrap();
this.group = new NioEventLoopGroup();
this.initializer = new Initializer(this, "cassandra", "cassandra");
this.handler = this.client.group(this.group).channel(NioSocketChannel.class).handler(this.initializer);
}
public static void main(String[] args)
{
new NativeCQLConnection().run();
}
@Override
public void run()
{
handler.connect(HOST, PORT);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
ByteBuf startupMessage = this.initializer.createStartupMessage();
ctx.writeAndFlush(startupMessage);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
this.group.shutdownGracefully();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
cause.printStackTrace();
}
private static final class Initializer extends ChannelInitializer<SocketChannel>
{
private final ChannelHandler handler;
private final byte[] username;
private final byte[] password;
public Initializer(ChannelHandler handler, String username, String password)
{
this.handler = handler;
this.username = username.getBytes(StandardCharsets.UTF_8);
this.password = password.getBytes(StandardCharsets.UTF_8);
}
@Override
protected void initChannel(SocketChannel channel) throws Exception
{
channel.pipeline().addLast(new MessageDecoder(this));
channel.pipeline().addLast(new MessageEncoder(this));
channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
channel.pipeline().addLast(this.handler);
}
private static final String CQL_VERSION_OPTION = "CQL_VERSION";
private static final String CQL_VERSION = "3.0.0";
private static final String DRIVER_VERSION_OPTION = "DRIVER_VERSION";
private static final String DRIVER_NAME_OPTION = "DRIVER_NAME";
private static final String DRIVER_NAME = "Apache Cassandra Java Driver";
static final String COMPRESSION_OPTION = "COMPRESSION";
static final String NO_COMPACT_OPTION = "NO_COMPACT";
public ByteBuf createAuthResponse(ChannelHandlerContext ctx) {
byte[] initialToken = initialResponse();
ByteBuf buffer = ctx.alloc().buffer(initialToken.length);
buffer.writeByte(0x0F); // AUTH_RESPONSE opcode
buffer.writeInt(0); // Stream ID
buffer.writeInt(initialToken.length);
buffer.writeBytes(initialToken);
return buffer;
}
public ByteBuf createStartupMessage() {
ImmutableMap.Builder<String, String> options = new ImmutableMap.Builder<>();
options.put(CQL_VERSION_OPTION, CQL_VERSION);
options.put(COMPRESSION_OPTION, "");
options.put(NO_COMPACT_OPTION, "true");
options.put(DRIVER_VERSION_OPTION, "3.12.2-SNAPSHOT");
options.put(DRIVER_NAME_OPTION, DRIVER_NAME);
ByteBuf body = Unpooled.buffer();
Writer.writeStringMap(options.build(), body);
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(0x04); // version protocol
buffer.writeByte(0x00); // flags
buffer.writeByte(0x00); // stream od
buffer.writeByte(0x01); // Opcode (STARTUP)
buffer.writeInt(body.readableBytes()); // length body
buffer.writeBytes(body); // body
return buffer;
}
public byte[] initialResponse() {
byte[] initialToken = new byte[username.length + password.length + 2];
initialToken[0] = 0;
System.arraycopy(username, 0, initialToken, 1, username.length);
initialToken[username.length + 1] = 0;
System.arraycopy(password, 0, initialToken, username.length + 2, password.length);
return initialToken;
}
}
private static final class MessageEncoder extends MessageToMessageEncoder<ByteBuf>
{
private final Initializer initializer;
public MessageEncoder(Initializer initializer)
{
this.initializer = initializer;
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.retain());
}
}
private static final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
private final Initializer initializer;
public MessageDecoder(Initializer initializer)
{
this.initializer = initializer;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
System.out.println(msg.toString(StandardCharsets.UTF_8));
if (msg.readableBytes() > 0) {
byte opcodeByte = msg.readByte();
int opcode = Byte.toUnsignedInt(opcodeByte);
System.out.println("Opcode: " + opcode);
if (opcode == 0x03) { // AUTHENTICATE opcode
System.out.println("Server requires authentication");
} else if (opcode == 0x0E) { // AUTH_CHALLENGE opcode
ByteBuf authResponse = this.initializer.createAuthResponse(ctx);
ctx.writeAndFlush(authResponse);
} else {
System.out.println("Unknown opcode: " + opcode);
}
}
}
}
public static final class Writer
{
public static void writeStringMap(Map<String, String> m, ByteBuf cb) {
cb.writeShort(m.size());
for (Map.Entry<String, String> entry : m.entrySet()) {
writeString(entry.getKey(), cb);
writeString(entry.getValue(), cb);
}
}
public static void writeString(String str, ByteBuf cb) {
byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
cb.writeShort(bytes.length);
cb.writeBytes(bytes);
}
}
}