#!/usr/bin/env python3# -*- coding: utf-8 -*-## Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#fromenumimportEnum
class Encoder:
    """
    A simple encoder to encode the data into bytes.

    Values are appended to ``self.byte_array`` in call order. Multi-byte
    numbers are written using the byte order named by ``self.endian``
    (``"little"`` or ``"big"``).
    """

    # NOTE(review): no initializer is visible in this excerpt. Every method
    # below relies on ``self.byte_array`` (a bytearray that is extended in
    # place) and ``self.endian`` already being set -- confirm where they are
    # assigned.

    # ``struct`` byte-order prefixes matching the ``byteorder`` names that
    # ``int.to_bytes`` accepts.
    _DOUBLE_FORMATS = {"little": "<d", "big": ">d"}

    def put_int(self, value: int) -> None:
        """
        Put an integer into the byte array, 4 bytes.

        The value is written unsigned: ``int.to_bytes`` raises
        ``OverflowError`` for negative values or values that do not fit.
        """
        self.byte_array.extend(value.to_bytes(4, byteorder=self.endian))

    def put_long(self, value: int) -> None:
        """
        Put a long integer into the byte array, 8 bytes.

        The value is written unsigned: ``int.to_bytes`` raises
        ``OverflowError`` for negative values or values that do not fit.
        """
        self.byte_array.extend(value.to_bytes(8, byteorder=self.endian))

    def put_string(self, value: str) -> None:
        """
        Put a string into the byte array: first the length, then the string.

        The length prefix (4 bytes, see ``put_int``) counts the UTF-8 encoded
        bytes, not the characters, so that a reader consuming exactly that
        many bytes gets the whole string even when it contains non-ASCII
        characters.
        """
        encoded = value.encode("utf-8")
        self.put_int(len(encoded))
        self.byte_array.extend(encoded)

    def put_byte(self, value: int) -> None:
        """
        Put a single byte into the byte array.

        ``value`` must be in ``range(256)``; otherwise ``int.to_bytes`` raises
        ``OverflowError``.
        """
        self.byte_array.extend(value.to_bytes(1, byteorder=self.endian))

    def put_bytes(self, value: bytes) -> None:
        """
        Put a byte array into the byte array, verbatim and without a length
        prefix.
        """
        self.byte_array.extend(value)

    def put_double(self, value: float) -> None:
        """
        Put a double into the byte array, 8 bytes (IEEE 754 binary64).

        ``float`` has no ``to_bytes`` method, so the value is packed with
        ``struct`` using the byte order selected by ``self.endian``.

        Raises:
            KeyError: if ``self.endian`` is neither ``"little"`` nor ``"big"``.
        """
        import struct  # local import keeps this class self-contained

        fmt = self._DOUBLE_FORMATS[self.endian]
        self.byte_array.extend(struct.pack(fmt, value))

    def get_bytes(self) -> bytes:
        """
        Get the bytes from the byte array.

        Returns an immutable ``bytes`` copy rather than the internal
        bytearray.
        """
        return bytes(self.byte_array)
class Decoder:
    """
    A simple decoder to decode the bytes into data.

    Reads start at ``self.index`` inside ``self.byte_array`` and advance the
    index by the number of bytes requested. Multi-byte numbers are read using
    the byte order named by ``self.endian`` (``"little"`` or ``"big"``).
    """

    # NOTE(review): no initializer is visible in this excerpt. Every method
    # below relies on ``self.byte_array``, ``self.index`` and ``self.endian``
    # already being set -- confirm where they are assigned.

    # ``struct`` byte-order prefixes matching the ``byteorder`` names that
    # ``int.from_bytes`` accepts.
    _DOUBLE_FORMATS = {"little": "<d", "big": ">d"}

    def get_int(self) -> int:
        """
        Get an integer from the byte array, 4 bytes (read as unsigned).

        returns: int
        """
        end = self.index + 4
        value = int.from_bytes(
            self.byte_array[self.index:end],  # noqa E203
            byteorder=self.endian,
        )
        self.index = end
        return value

    def get_long(self) -> int:
        """
        Get a long integer from the byte array, 8 bytes (read as unsigned).

        returns: int
        """
        end = self.index + 8
        value = int.from_bytes(
            self.byte_array[self.index:end],  # noqa E203
            byteorder=self.endian,
        )
        self.index = end
        return value

    def get_double(self) -> float:
        """
        Get a double from the byte array, 8 bytes (IEEE 754 binary64).

        ``float`` has no ``from_bytes`` constructor, so the value is unpacked
        with ``struct`` using the byte order selected by ``self.endian``.

        returns: float

        Raises:
            KeyError: if ``self.endian`` is neither ``"little"`` nor ``"big"``.
            struct.error: if fewer than 8 bytes remain; the index is left
                unchanged in that case.
        """
        import struct  # local import keeps this class self-contained

        fmt = self._DOUBLE_FORMATS[self.endian]
        end = self.index + 8
        (value,) = struct.unpack(
            fmt, self.byte_array[self.index:end]  # noqa E203
        )
        self.index = end
        return value

    def get_byte(self) -> int:
        """
        Get a single byte from the byte array.

        returns: int
        """
        end = self.index + 1
        value = int.from_bytes(
            self.byte_array[self.index:end],  # noqa E203
            byteorder=self.endian,
        )
        self.index = end
        return value

    def get_bytes(self, length: int) -> bytes:
        """
        Get a byte array from the byte array.

        The result is a slice of ``self.byte_array``, so it has the same type
        as that buffer, and it is shorter than ``length`` when fewer bytes
        remain. The index advances by ``length`` regardless.

        returns: A byte array
        """
        value = self.byte_array[self.index:self.index + length]  # noqa E203
        self.index += length
        return value

    def get_string(self) -> str:
        """
        Get a string from the byte array: first the length (4 bytes, see
        ``get_int``), then that many bytes decoded as UTF-8.

        returns: str
        """
        length = self.get_int()
        end = self.index + length
        value = self.byte_array[self.index:end].decode("utf-8")  # noqa E203
        # Advance only after a successful decode, so a UnicodeDecodeError
        # leaves the index just past the length prefix.
        self.index = end
        return value

    def is_empty(self) -> bool:
        """
        Whether the byte array is empty, i.e. no unread bytes remain.

        Reads are not bounds-checked and can move the index past the end of
        the buffer; ``>=`` makes that state count as empty as well, so a
        ``while not decoder.is_empty()`` loop terminates.
        """
        return self.index >= len(self.byte_array)
class InputFormat(Enum):
    """Input formats that ``append_format_byte`` can tag onto a payload."""

    CPP_ENCODER = 0  # raw bytes encoded by encoder/decoder
    CYPHER_JSON = 1  # json format string
    CYPHER_PROTO_ADHOC = 2  # protobuf adhoc bytes
    CYPHER_PROTO_PROCEDURE = 3  # protobuf procedure bytes


def append_format_byte(input: bytes, input_format: InputFormat):
    """
    Return ``input`` followed by one extra byte that denotes the input format.

    The extra byte holds ``input_format.value``; ``input`` itself is not
    modified.
    """
    format_tag = bytes((input_format.value,))
    return input + format_tag