# Server script: Unix-domain-socket echo server.
import asyncio
class Server:
    """Echo server that listens on a Unix domain socket.

    Every accepted connection is handled by :meth:`chat`, which reads one
    chunk of at most 100 bytes, prints it, writes it back, and closes the
    connection.
    """

    def __init__(self, path: str):
        """Store the filesystem path of the socket to listen on.

        Args:
            path: Path passed to ``asyncio.start_unix_server``.
        """
        self.path = path

    async def serving(self) -> None:
        """Start listening on ``self.path`` and serve connections forever."""
        service = await asyncio.start_unix_server(self.chat, path=self.path)
        # The context manager closes the listening server on exit.
        async with service:
            await service.serve_forever()

    async def chat(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        """Echo one chunk (up to 100 bytes) back to the peer, then disconnect.

        Args:
            reader: Stream to read the peer's request from.
            writer: Stream used to send the echo; always closed on return.
        """
        try:
            data = await reader.read(100)
            # The payload is arbitrary bytes and a 100-byte read may split a
            # multi-byte character, so a strict decode could raise and abort
            # the echo. Escape undecodable bytes for display instead.
            text = data.decode(errors="backslashreplace")
            print(f"received: {text}")
            # Echo the raw bytes untouched, not the decoded display text.
            writer.write(data)
            await writer.drain()
        finally:
            # Release the connection even if reading or writing failed.
            writer.close()
            await writer.wait_closed()
if __name__ == "__main__":
    # Script entry point: run the echo server on its fixed socket path.
    server = Server(path="/tmp/echo.sock")
    asyncio.run(server.serving())
# Client script: connects to the echo server's Unix socket.
import asyncio
class Client:
    """Client that talks to a service over a Unix domain socket."""

    def __init__(self, path: str) -> None:
        """Store the filesystem path of the socket to connect to.

        Args:
            path: Path passed to ``asyncio.open_unix_connection``.
        """
        self.path = path

    async def chat(self, data: bytes) -> None:
        """Send ``data``, print the reply (up to 100 bytes), and disconnect.

        Args:
            data: Raw bytes to send to the server.
        """
        reader, writer = await asyncio.open_unix_connection(self.path)
        try:
            writer.write(data)
            await writer.drain()
            print("data has been sent")
            reply = await reader.read(100)
            # The reply is arbitrary bytes and may end mid-character, so a
            # strict decode could raise. Escape undecodable bytes instead.
            text = reply.decode(errors="backslashreplace")
            print(f"received: {text}")
        finally:
            # Close the connection even if the exchange failed part-way.
            writer.close()
            await writer.wait_closed()
if __name__ == "__main__":
    # Script entry point: send one greeting to the echo server and exit.
    client = Client(path="/tmp/echo.sock")
    asyncio.run(client.chat(data=b"Hello, world!"))