(back to SCUBA24)
# Command-line chat client: forwards lines typed on stdin to the server at
# localhost:8123 and prints every line the server sends back.
from twisted.internet import reactor, stdio
from twisted.internet.error import ReactorNotRunning
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.protocols.basic import LineReceiver


def _stop_reactor():
    """Stop the reactor, tolerating the case where it is already stopping."""
    try:
        reactor.stop()
    except ReactorNotRunning:
        # A shutdown is already in progress (e.g. Ctrl-C closed the
        # connection, which then triggered clientConnectionLost).
        pass


class ChatClientProtocol(LineReceiver):
    """Server-facing protocol: prints each line received from the server."""

    def connectionMade(self):
        # Start reading stdin only once there is a connection to write to.
        stdio.StandardIO(ChatClientProtocolStdio(self))

    def lineReceived(self, line):
        # errors="replace" keeps a non-UTF-8 line from raising here.
        print(line.decode(errors="replace"))


class ChatClientProtocolStdio(Protocol):
    """Stdin-facing protocol: relays typed input to the server connection."""

    def __init__(self, protocol):
        # The ChatClientProtocol whose transport carries data to the server.
        self.protocol = protocol

    def dataReceived(self, data):
        # One stdin chunk may hold several lines (paste, pipe); send each as
        # its own protocol line instead of one line with embedded newlines.
        for raw_line in data.splitlines():
            self.protocol.sendLine(raw_line.strip())


class ChatClientFactory(ClientFactory):
    """Builds ChatClientProtocol instances and ends the program on disconnect."""

    protocol = ChatClientProtocol

    def clientConnectionFailed(self, connector, reason):
        print("Connection failed - goodbye!")
        _stop_reactor()

    def clientConnectionLost(self, connector, reason):
        print("Connection lost - goodbye!")
        _stop_reactor()


reactor.connectTCP("localhost", 8123, ChatClientFactory())
reactor.run()
# Minimal chat server: listens on TCP port 8123, asks each new client for a
# name and prints every line a client sends.
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver


class Chat(LineReceiver):
    """Per-connection protocol for the chat server."""

    def __init__(self, users=None):
        """Store the mapping shared between connections.

        users: dict owned by the factory and handed to every connection;
            a fresh dict is used when none is given.
        """
        super().__init__()
        self.users = {} if users is None else users

    def connectionMade(self):
        self.sendLine("What's your name?".encode())

    def lineReceived(self, line):
        # errors="replace" keeps a non-UTF-8 line from raising here.
        print(line.decode(errors="replace"))


class ChatFactory(Factory):
    """Creates one Chat protocol per incoming connection."""

    protocol = Chat

    def __init__(self):
        # buildProtocol reads self.users, so it must exist before the first
        # client connects; previously it was never defined.
        self.users = {}

    def buildProtocol(self, addr):
        proto = Chat(self.users)
        # Mirror the default Factory.buildProtocol, which sets this attribute.
        proto.factory = self
        return proto


reactor.listenTCP(8123, ChatFactory())
reactor.run()
# Pygame front end for a line-based network client. A Twisted LoopingCall
# runs tic() DESIRED_FPS times per second; tic() flushes the outgoing queue,
# drains the incoming queue and processes pygame events.
import queue

import pygame
from twisted.internet import reactor
from twisted.internet.error import ReactorNotRunning
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet.task import LoopingCall
from twisted.protocols.basic import LineReceiver

DESIRED_FPS = 30.0

send_queue = queue.Queue()      # lines waiting to be sent to the server
receive_queue = queue.Queue()   # decoded lines received from the server
my_protocol = None              # set by GameProtocol.connectionMade
connection_ready = False        # True once the TCP connection is up

pygame.init()
pygame.font.init()
SCREEN_WIDTH, SCREEN_HEIGHT = 800, 800
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
# bg = ...
# screen.blit(bg, (0,0))
pygame.display.update()
pygame.display.set_caption("Game")

# Clickable area in the top-left corner and its fill colour.
BUTTON_RECT = pygame.Rect(0, 0, 200, 200)
BUTTON_COLOR = (255, 0, 0)


class GameProtocol(LineReceiver):
    """Connection to the server; publishes itself through module globals."""

    def connectionMade(self):
        global my_protocol, connection_ready
        my_protocol = self
        connection_ready = True

    def lineReceived(self, line):
        # Decode once and reuse the text for both the log and the queue;
        # errors="replace" keeps a non-UTF-8 line from raising here.
        text = line.decode('utf-8', errors='replace')
        print(f"Received: {text}")
        receive_queue.put(text)


def empty_send_queue():
    """Send every queued outgoing action to the server as one line each."""
    while not send_queue.empty():
        action = send_queue.get()
        my_protocol.sendLine(action.encode('utf-8'))


def process_receive_queue():
    """Drain the queue of lines received from the server."""
    while not receive_queue.empty():
        action = receive_queue.get()
        #Your code here to post events


def shutdown():
    """Stop the frame loop, close pygame and stop the reactor.

    Safe to call more than once: quitting from the window also closes the
    connection, which makes the factory call this a second time.
    """
    if tick.running:
        # Otherwise tic() could fire again after pygame has been shut down.
        tick.stop()
    pygame.quit()
    try:
        reactor.stop()
    except ReactorNotRunning:
        # A shutdown is already in progress; nothing left to do.
        pass


def tic():
    """Run one frame: flush queues, draw the button and handle events."""
    if connection_ready:
        empty_send_queue()
    process_receive_queue()
    # Draw every frame so the button shows even before any event arrives.
    pygame.draw.rect(screen, BUTTON_COLOR, BUTTON_RECT)
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            shutdown()
            # pygame is closed now; skip the display update below.
            return
        if event.type == pygame.MOUSEBUTTONDOWN:
            mouse_pos = pygame.mouse.get_pos()
            if BUTTON_RECT.collidepoint(mouse_pos):
                send_queue.put('USERNAME: Nico')
    pygame.display.update()


class GameProtocolFactory(ClientFactory):
    """Builds GameProtocol instances and shuts the game down on disconnect."""

    protocol = GameProtocol

    def clientConnectionFailed(self, connector, reason):
        print("Connection failed - goodbye!")
        shutdown()

    def clientConnectionLost(self, connector, reason):
        print("Connection lost - goodbye!")
        shutdown()


tick = LoopingCall(tic)
tick.start(1/DESIRED_FPS)
reactor.connectTCP("localhost", 8122, GameProtocolFactory())
reactor.run()