diff options
Diffstat (limited to 'src/py/asyncmcd.py')
-rw-r--r-- | src/py/asyncmcd.py | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/src/py/asyncmcd.py b/src/py/asyncmcd.py new file mode 100644 index 0000000..0a4c0e6 --- /dev/null +++ b/src/py/asyncmcd.py @@ -0,0 +1,143 @@ +import socket +import struct +import json + +import tornado.ioloop +import tornado.stack_context +import tornado.iostream + +import imc.async + +class AsyncMCD: + def __init__(self): + def _conn(): + print('conn') + + self.TYPE_INT = 0 + self.TYPE_BYTES = 1 + self.TYPE_STR = 2 + self.TYPE_JSON = 3 + + self._ioloop = tornado.ioloop.IOLoop.instance() + self._opaque_count = 0 + self._opaque_map = {} + + self._stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + self._stream.connect(('10.8.0.6',11211),_conn) + + self._recv_loop() + + def get(self,key): + def _recv(opcode,status,opaque,cas,extra,key,value): + del self._opaque_map[opaque] + + if status != 0: + imc.async.ret(retid,None) + + else: + flag, = struct.unpack('!I',extra) + if flag == self.TYPE_INT: + ret, = struct.unpack('!Q',value) + + elif flag == self.TYPE_BYTES: + ret = value + + elif flag == self.TYPE_STR: + ret = value.decode('utf-8') + + elif flag == self.TYPE_JSON: + ret = json.loads(value.decode('utf-8')) + + imc.async.ret(retid,ret) + + if not isinstance(key,str): + raise TypeError + + key = bytes(key,'utf-8') + keylen = len(key) + + opaque = self._get_opaque(_recv) + header = self._request_header(0x00,keylen,0,0,keylen,opaque,0) + data = bytes(bytearray().join([header,key])) + + self._stream.write(data) + + retid = imc.async.get_retid() + return imc.async.switch_top() + + def set(self,key,value,expiration = 0): + def _recv(opcode,status,opaque,cas,extra,key,value): + del self._opaque_map[opaque] + imc.async.ret(retid,status) + + if not isinstance(key,str): + raise TypeError + + key = bytes(key,'utf-8') + keylen = len(key) + + if isinstance(value,int): + value_type = self.TYPE_INT + value = struct.pack('!Q',value) + + elif isinstance(value,bytes): + value_type = self.TYPE_BYTES + + elif isinstance(value,str): + value_type = self.TYPE_STR + value = bytes(value,'utf-8') + + else: + value_type = 2 + value = bytes(json.dumps(value),'utf-8') + + valuelen = len(value) + + extra = struct.pack('!II',value_type,expiration) + extralen = len(extra) + + opaque = self._get_opaque(_recv) + header = self._request_header(0x01,keylen,extralen,0,extralen + keylen + valuelen,opaque,0) + data = bytes(bytearray().join([header,extra,key,value])) + + self._stream.write(data) + + retid = imc.async.get_retid() + return imc.async.switch_top() + + def _get_opaque(self,data): + self._opaque_count += 1 + self._opaque_map[self._opaque_count] = data + + return self._opaque_count + + def _request_header(self,opcode,keylen,extralen,vid,totallen,opaque,cas): + return struct.pack('!BBHBBHIIQ',0x80,opcode,keylen,extralen,0x0,vid,totallen,opaque,cas) + + def _recv_loop(self): + def __recv(data): + def ___recvdata(data): + extra = data[0:extralen] + key = data[extralen:extralen + keylen] + value = data[extralen + keylen:totallen] + + self._opaque_map[opaque](opcode,status,opaque,cas,extra,key,value) + self._stream.read_bytes(24,__recv) + + header = struct.unpack('!BBHBBHIIQ',data) + opcode = header[1] + keylen = header[2] + extralen = header[3] + status = header[5] + totallen = header[6] + opaque = header[7] + cas = header[8] + + if totallen == 0: + self._opaque_map[opaque](opcode,status,opaque,cas,bytes(),bytes(),bytes()) + self._stream.read_bytes(24,__recv) + + else: + self._stream.read_bytes(totallen,___recvdata) + + self._stream.read_bytes(24,__recv) |