aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/asyncmcd.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/asyncmcd.py')
-rw-r--r--src/py/asyncmcd.py143
1 files changed, 143 insertions, 0 deletions
diff --git a/src/py/asyncmcd.py b/src/py/asyncmcd.py
new file mode 100644
index 0000000..0a4c0e6
--- /dev/null
+++ b/src/py/asyncmcd.py
@@ -0,0 +1,143 @@
+import socket
+import struct
+import json
+
+import tornado.ioloop
+import tornado.stack_context
+import tornado.iostream
+
+import imc.async
+
+class AsyncMCD:
+ def __init__(self):
+ def _conn():
+ print('conn')
+
+ self.TYPE_INT = 0
+ self.TYPE_BYTES = 1
+ self.TYPE_STR = 2
+ self.TYPE_JSON = 3
+
+ self._ioloop = tornado.ioloop.IOLoop.instance()
+ self._opaque_count = 0
+ self._opaque_map = {}
+
+ self._stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ self._stream.connect(('10.8.0.6',11211),_conn)
+
+ self._recv_loop()
+
+ def get(self,key):
+ def _recv(opcode,status,opaque,cas,extra,key,value):
+ del self._opaque_map[opaque]
+
+ if status != 0:
+ imc.async.ret(retid,None)
+
+ else:
+ flag, = struct.unpack('!I',extra)
+ if flag == self.TYPE_INT:
+ ret, = struct.unpack('!Q',value)
+
+ elif flag == self.TYPE_BYTES:
+ ret = value
+
+ elif flag == self.TYPE_STR:
+ ret = value.decode('utf-8')
+
+ elif flag == self.TYPE_JSON:
+ ret = json.loads(value.decode('utf-8'))
+
+ imc.async.ret(retid,ret)
+
+ if not isinstance(key,str):
+ raise TypeError
+
+ key = bytes(key,'utf-8')
+ keylen = len(key)
+
+ opaque = self._get_opaque(_recv)
+ header = self._request_header(0x00,keylen,0,0,keylen,opaque,0)
+ data = bytes(bytearray().join([header,key]))
+
+ self._stream.write(data)
+
+ retid = imc.async.get_retid()
+ return imc.async.switch_top()
+
+ def set(self,key,value,expiration = 0):
+ def _recv(opcode,status,opaque,cas,extra,key,value):
+ del self._opaque_map[opaque]
+ imc.async.ret(retid,status)
+
+ if not isinstance(key,str):
+ raise TypeError
+
+ key = bytes(key,'utf-8')
+ keylen = len(key)
+
+ if isinstance(value,int):
+ value_type = self.TYPE_INT
+ value = struct.pack('!Q',value)
+
+ elif isinstance(value,bytes):
+ value_type = self.TYPE_BYTES
+
+ elif isinstance(value,str):
+ value_type = self.TYPE_STR
+ value = bytes(value,'utf-8')
+
+ else:
+ value_type = 2
+ value = bytes(json.dumps(value),'utf-8')
+
+ valuelen = len(value)
+
+ extra = struct.pack('!II',value_type,expiration)
+ extralen = len(extra)
+
+ opaque = self._get_opaque(_recv)
+ header = self._request_header(0x01,keylen,extralen,0,extralen + keylen + valuelen,opaque,0)
+ data = bytes(bytearray().join([header,extra,key,value]))
+
+ self._stream.write(data)
+
+ retid = imc.async.get_retid()
+ return imc.async.switch_top()
+
+ def _get_opaque(self,data):
+ self._opaque_count += 1
+ self._opaque_map[self._opaque_count] = data
+
+ return self._opaque_count
+
+ def _request_header(self,opcode,keylen,extralen,vid,totallen,opaque,cas):
+ return struct.pack('!BBHBBHIIQ',0x80,opcode,keylen,extralen,0x0,vid,totallen,opaque,cas)
+
+ def _recv_loop(self):
+ def __recv(data):
+ def ___recvdata(data):
+ extra = data[0:extralen]
+ key = data[extralen:extralen + keylen]
+ value = data[extralen + keylen:totallen]
+
+ self._opaque_map[opaque](opcode,status,opaque,cas,extra,key,value)
+ self._stream.read_bytes(24,__recv)
+
+ header = struct.unpack('!BBHBBHIIQ',data)
+ opcode = header[1]
+ keylen = header[2]
+ extralen = header[3]
+ status = header[5]
+ totallen = header[6]
+ opaque = header[7]
+ cas = header[8]
+
+ if totallen == 0:
+ self._opaque_map[opaque](opcode,status,opaque,cas,bytes(),bytes(),bytes())
+ self._stream.read_bytes(24,__recv)
+
+ else:
+ self._stream.read_bytes(totallen,___recvdata)
+
+ self._stream.read_bytes(24,__recv)